| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- #define BOOST_TEST_DYN_LINK
- #include "core/datanode_base.h"
- #include "utility/bit_operations.hpp"
- #include "utility/debug_utility.hpp"
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/high_resolution_timer.hpp>
- #include <boost/asio/this_coro.hpp>
- #include <boost/asio/use_awaitable.hpp>
- #include <boost/test/unit_test.hpp>
- #include <fmt/format.h>
- #include <algorithm>
- #include <chrono>
- #include <fstream>
- #include <iostream>
- using boost::asio::awaitable;
- using boost::asio::co_spawn;
- using boost::asio::detached;
- using boost::asio::use_awaitable;
- using namespace sophiar;
- using namespace std::chrono_literals;
- struct cnt_node_type : public datanode_base {
- DEFAULT_NEW_INSTANCE(cnt_node_type);
- unsigned int cnt = 0;
- void load_construct_config(const nlohmann::json &config) override {
- auto input_index = register_input("input_1");
- BOOST_TEST(input_index == 0);
- datanode_base::load_construct_config(config);
- }
- awaitable<bool> exec() {
- ++cnt;
- co_return true;
- }
- };
- struct trigger_node_type : public sophiar_obj {
- DEFAULT_NEW_INSTANCE(trigger_node_type);
- tiny_signal<> trigger_signal;
- void load_construct_config(const nlohmann::json &config) override {
- get_manager().register_signal(this, "triggered", trigger_signal);
- }
- void trigger() {
- trigger_signal.emit();
- }
- };
- struct update_node_type : public sophiar_obj {
- DEFAULT_NEW_INSTANCE(update_node_type);
- tiny_signal<versatile_obj::pointer> trigger_signal;
- void load_construct_config(const nlohmann::json &config) override {
- get_manager().register_signal(this, "triggered", trigger_signal);
- }
- void trigger() {
- trigger_signal.emit(versatile_obj::new_instance());
- }
- };
- struct test_node_type : public datanode_base {
- DEFAULT_NEW_INSTANCE(test_node_type);
- unsigned int cnt = 0;
- void load_construct_config(const nlohmann::json &config) override {
- auto input_index = register_input("input_1");
- BOOST_TEST(input_index == 0);
- auto output_index = register_output("output_1");
- BOOST_TEST(output_index == 0);
- datanode_base::load_construct_config(config);
- }
- awaitable<bool> exec() {
- auto input_data = get_input_data(0);
- cnt = (*input_data)[0];
- auto output_data = versatile_obj::new_instance();
- output_data->copy_from(*input_data);
- (*output_data)[0] += 1.0;
- set_output_data(0, output_data);
- co_return true;
- }
- };
- struct speedtest_node_type : public datanode_base {
- uint64_t cnt = 0;
- void load_construct_config(const nlohmann::json &config) override {
- auto input_index = register_input("input_1");
- BOOST_TEST(input_index == 0);
- auto output_index = register_output("output_1");
- BOOST_TEST(output_index == 0);
- datanode_base::load_construct_config(config);
- }
- awaitable<bool> exec() {
- auto input_data = get_input_data(0);
- auto output_data = versatile_obj::new_instance();
- output_data->copy_from(*input_data);
- for (size_t i = 0; i < versatile_data::FLOAT_FILED_LENGTH; ++i) {
- (*output_data)[i] += i;
- }
- std::reverse(output_data->floats, output_data->floats + versatile_data::FLOAT_FILED_LENGTH);
- output_data->timestamp = current_timestamp();
- cnt = (*output_data)[0];
- set_output_data(0, output_data);
- co_return true;
- }
- };
- awaitable<void> test_datanode_base_1() {
- auto trigger_node = dynamic_cast<trigger_node_type *>(global_sophiar_manager.get_object("trigger_node"));
- auto cnt_node = dynamic_cast<cnt_node_type *>(global_sophiar_manager.get_object("cnt_node"));
- assert(trigger_node != nullptr);
- assert(cnt_node != nullptr);
- co_await global_sophiar_manager.switch_mode("mode_a");
- trigger_node->trigger();
- co_await coro_sleep(10ms);
- BOOST_TEST(cnt_node->cnt == 1);
- co_await coro_sleep(50ms);
- for (int i = 0; i < 5; ++i) {
- trigger_node->trigger();
- }
- BOOST_TEST(cnt_node->cnt == 2);
- co_await coro_sleep(50ms);
- BOOST_TEST(cnt_node->cnt == 3);
- co_await global_sophiar_manager.switch_mode("mode_b");
- auto update_node = dynamic_cast<update_node_type *>(global_sophiar_manager.get_object("update_node"));
- assert(update_node != nullptr);
- cnt_node->cnt = 0;
- update_node->trigger();
- co_await coro_sleep(10ms);
- BOOST_TEST(cnt_node->cnt == 1);
- co_await coro_sleep(50ms);
- for (int i = 0; i < 5; ++i) {
- update_node->trigger();
- }
- BOOST_TEST(cnt_node->cnt == 2);
- co_await coro_sleep(50ms);
- BOOST_TEST(cnt_node->cnt == 3);
- cnt_node->cnt = 0;
- co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable<void> {
- co_await coro_sleep(275ms);
- co_await cnt_node->stop();
- }, detached);
- co_await global_sophiar_manager.switch_mode("mode_c");
- co_await coro_sleep(350ms);
- BOOST_TEST(cnt_node->cnt == 6);
- BOOST_TEST((cnt_node->get_state() == datanode_base::state_type::PENDING));
- auto test_node_a = dynamic_cast<test_node_type *>(global_sophiar_manager.get_object("test_node_a"));
- auto test_node_b = dynamic_cast<test_node_type *>(global_sophiar_manager.get_object("test_node_b"));
- assert(test_node_a != nullptr);
- assert(test_node_b != nullptr);
- auto &test_slot = global_sophiar_manager.get_slot<versatile_obj::pointer>("test_node_a", "input_1");
- co_await global_sophiar_manager.switch_mode("mode_d");
- auto packet = versatile_obj::new_instance();
- (*packet)[0] = 100;
- test_slot.signal_received(std::move(packet));
- BOOST_TEST(test_node_a->cnt == 100);
- BOOST_TEST(test_node_b->cnt == 101);
- co_return;
- }
- awaitable<void> test_datanode_base_speed(size_t length, size_t repeat) {
- std::destroy_at(&global_sophiar_manager);
- std::construct_at(&global_sophiar_manager);
- nlohmann::json config_json;
- config_json["mode_list"].push_back({{"name", "test_mode"}});
- nlohmann::json object_config;
- object_config["type"] = "speedtest_node_type";
- object_config["enabled_modes"].push_back("test_mode");
- object_config["construct_config"] = nlohmann::json({});
- object_config["init_configs"] = nlohmann::json::array();
- nlohmann::json start_config;
- start_config["modes"].push_back("test_mode");
- start_config["config"]["trigger_mode"] = "input";
- start_config["config"]["input_mask"] = 1;
- object_config["start_configs"].push_back(start_config);
- for (auto i = 0; i < length; ++i) {
- object_config["name"] = fmt::format("speedtest_node_{}", i);
- config_json["object_list"].push_back(object_config);
- }
- std::cout << config_json.dump() << std::endl;
- // TODO
- //
- // auto pool = new test_type[length];
- // for (size_t i = 0; i < length; ++i) {
- // auto &node = pool[i];
- // co_await node.init();
- // node.set_trigger_mode(test_type::trigger_mode_type::INPUT);
- // node.set_trigger_input_mask(0b1);
- // if (i != 0) {
- // test_type::connect(pool[i - 1], 0, node, 0);
- // }
- // co_await node.start();
- // }
- //
- // auto test_data = test_type::data_packet::new_instance();
- // auto start_ts = current_timestamp();
- // for (int i = 0; i < repeat; ++i) {
- // pool[0].update_input_data(0, test_data);
- //// assert(pool[length - 1].cnt == length);
- // }
- //
- // auto time_used = (current_timestamp() - start_ts) / 1000.0;
- // auto time_left = 1000.0 / repeat - time_used / repeat;
- // std::cout << fmt::format("Length = {}, Repeat = {}, "
- // "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)",
- // length, repeat, time_used,
- // time_left, time_left / (10.0 / repeat))
- // << std::endl;
- //
- // BOOST_TEST(true);
- co_return;
- }
- BOOST_AUTO_TEST_CASE(test_datanode_base) {
- spdlog::set_level(spdlog::level::trace);
- REGISTER_TYPE(cnt_node_type);
- REGISTER_TYPE(trigger_node_type);
- REGISTER_TYPE(update_node_type);
- REGISTER_TYPE(test_node_type);
- std::ifstream config_file("data/datanode_base_config.json");
- BOOST_TEST(config_file.is_open());
- global_sophiar_manager.build_from_config(nlohmann::json::parse(config_file));
- co_spawn(global_context, test_datanode_base_1(), detached);
- // co_spawn(high_freq_context, test_datanode_base_2(), detached);
- // for (auto length: {1, 4, 16, 64, 128})
- // for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000})
- // co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached);
- global_context.run();
- }
- BOOST_AUTO_TEST_CASE(test_datanode_speed) {
- co_spawn(global_context, test_datanode_base_speed(3, 1), detached);
- global_context.run();
- }
|