#define BOOST_TEST_DYN_LINK #include "core/datanode_base.h" #include "utility/bit_operations.hpp" #include "utility/debug_utility.hpp" #include #include #include #include #include #include #include #include #include #include #include using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::use_awaitable; using namespace sophiar; using namespace std::chrono_literals; struct cnt_node_type : public datanode_base { DEFAULT_NEW_INSTANCE(cnt_node_type); unsigned int cnt = 0; void load_construct_config(const nlohmann::json &config) override { auto input_index = register_input("input_1"); BOOST_TEST(input_index == 0); datanode_base::load_construct_config(config); } awaitable exec() { ++cnt; co_return true; } }; struct trigger_node_type : public sophiar_obj { DEFAULT_NEW_INSTANCE(trigger_node_type); tiny_signal<> trigger_signal; void load_construct_config(const nlohmann::json &config) override { get_manager().register_signal(this, "triggered", trigger_signal); } void trigger() { trigger_signal.emit(); } }; struct update_node_type : public sophiar_obj { DEFAULT_NEW_INSTANCE(update_node_type); tiny_signal trigger_signal; void load_construct_config(const nlohmann::json &config) override { get_manager().register_signal(this, "triggered", trigger_signal); } void trigger() { trigger_signal.emit(versatile_obj::new_instance()); } }; struct test_node_type : public datanode_base { DEFAULT_NEW_INSTANCE(test_node_type); unsigned int cnt = 0; void load_construct_config(const nlohmann::json &config) override { auto input_index = register_input("input_1"); BOOST_TEST(input_index == 0); auto output_index = register_output("output_1"); BOOST_TEST(output_index == 0); datanode_base::load_construct_config(config); } awaitable exec() { auto input_data = get_input_data(0); cnt = (*input_data)[0]; auto output_data = versatile_obj::new_instance(); output_data->copy_from(*input_data); (*output_data)[0] += 1.0; set_output_data(0, output_data); co_return true; } }; struct speedtest_node_type : public datanode_base { uint64_t cnt = 0; void load_construct_config(const nlohmann::json &config) override { auto input_index = register_input("input_1"); BOOST_TEST(input_index == 0); auto output_index = register_output("output_1"); BOOST_TEST(output_index == 0); datanode_base::load_construct_config(config); } awaitable exec() { auto input_data = get_input_data(0); auto output_data = versatile_obj::new_instance(); output_data->copy_from(*input_data); for (size_t i = 0; i < versatile_data::FLOAT_FILED_LENGTH; ++i) { (*output_data)[i] += i; } std::reverse(output_data->floats, output_data->floats + versatile_data::FLOAT_FILED_LENGTH); output_data->timestamp = current_timestamp(); cnt = (*output_data)[0]; set_output_data(0, output_data); co_return true; } }; awaitable test_datanode_base_1() { auto trigger_node = dynamic_cast(global_sophiar_manager.get_object("trigger_node")); auto cnt_node = dynamic_cast(global_sophiar_manager.get_object("cnt_node")); assert(trigger_node != nullptr); assert(cnt_node != nullptr); co_await global_sophiar_manager.switch_mode("mode_a"); trigger_node->trigger(); co_await coro_sleep(10ms); BOOST_TEST(cnt_node->cnt == 1); co_await coro_sleep(50ms); for (int i = 0; i < 5; ++i) { trigger_node->trigger(); } BOOST_TEST(cnt_node->cnt == 2); co_await coro_sleep(50ms); BOOST_TEST(cnt_node->cnt == 3); co_await global_sophiar_manager.switch_mode("mode_b"); auto update_node = dynamic_cast(global_sophiar_manager.get_object("update_node")); assert(update_node != nullptr); cnt_node->cnt = 0; update_node->trigger(); co_await coro_sleep(10ms); BOOST_TEST(cnt_node->cnt == 1); co_await coro_sleep(50ms); for (int i = 0; i < 5; ++i) { update_node->trigger(); } BOOST_TEST(cnt_node->cnt == 2); co_await coro_sleep(50ms); BOOST_TEST(cnt_node->cnt == 3); cnt_node->cnt = 0; co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable { co_await coro_sleep(275ms); co_await cnt_node->stop(); }, detached); co_await global_sophiar_manager.switch_mode("mode_c"); co_await coro_sleep(350ms); BOOST_TEST(cnt_node->cnt == 6); BOOST_TEST((cnt_node->get_state() == datanode_base::state_type::PENDING)); auto test_node_a = dynamic_cast(global_sophiar_manager.get_object("test_node_a")); auto test_node_b = dynamic_cast(global_sophiar_manager.get_object("test_node_b")); assert(test_node_a != nullptr); assert(test_node_b != nullptr); auto &test_slot = global_sophiar_manager.get_slot("test_node_a", "input_1"); co_await global_sophiar_manager.switch_mode("mode_d"); auto packet = versatile_obj::new_instance(); (*packet)[0] = 100; test_slot.signal_received(std::move(packet)); BOOST_TEST(test_node_a->cnt == 100); BOOST_TEST(test_node_b->cnt == 101); co_return; } awaitable test_datanode_base_speed(size_t length, size_t repeat) { std::destroy_at(&global_sophiar_manager); std::construct_at(&global_sophiar_manager); nlohmann::json config_json; config_json["mode_list"].push_back({{"name", "test_mode"}}); nlohmann::json object_config; object_config["type"] = "speedtest_node_type"; object_config["enabled_modes"].push_back("test_mode"); object_config["construct_config"] = nlohmann::json({}); object_config["init_configs"] = nlohmann::json::array(); nlohmann::json start_config; start_config["modes"].push_back("test_mode"); start_config["config"]["trigger_mode"] = "input"; start_config["config"]["input_mask"] = 1; object_config["start_configs"].push_back(start_config); for (auto i = 0; i < length; ++i) { object_config["name"] = fmt::format("speedtest_node_{}", i); config_json["object_list"].push_back(object_config); } std::cout << config_json.dump() << std::endl; // TODO // // auto pool = new test_type[length]; // for (size_t i = 0; i < length; ++i) { // auto &node = pool[i]; // co_await node.init(); // node.set_trigger_mode(test_type::trigger_mode_type::INPUT); // node.set_trigger_input_mask(0b1); // if (i != 0) { // test_type::connect(pool[i - 1], 0, node, 0); // } // co_await node.start(); // } // // auto test_data = test_type::data_packet::new_instance(); // auto start_ts = current_timestamp(); // for (int i = 0; i < repeat; ++i) { // pool[0].update_input_data(0, test_data); //// assert(pool[length - 1].cnt == length); // } // // auto time_used = (current_timestamp() - start_ts) / 1000.0; // auto time_left = 1000.0 / repeat - time_used / repeat; // std::cout << fmt::format("Length = {}, Repeat = {}, " // "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)", // length, repeat, time_used, // time_left, time_left / (10.0 / repeat)) // << std::endl; // // BOOST_TEST(true); co_return; } BOOST_AUTO_TEST_CASE(test_datanode_base) { spdlog::set_level(spdlog::level::trace); REGISTER_TYPE(cnt_node_type); REGISTER_TYPE(trigger_node_type); REGISTER_TYPE(update_node_type); REGISTER_TYPE(test_node_type); std::ifstream config_file("data/datanode_base_config.json"); BOOST_TEST(config_file.is_open()); global_sophiar_manager.build_from_config(nlohmann::json::parse(config_file)); co_spawn(global_context, test_datanode_base_1(), detached); // co_spawn(high_freq_context, test_datanode_base_2(), detached); // for (auto length: {1, 4, 16, 64, 128}) // for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000}) // co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached); global_context.run(); } BOOST_AUTO_TEST_CASE(test_datanode_speed) { co_spawn(global_context, test_datanode_base_speed(3, 1), detached); global_context.run(); }