#define BOOST_TEST_DYN_LINK #include "core/datanode_base.h" #include "utility/bit_operations.hpp" #include "utility/debug_utility.hpp" #include #include #include #include #include #include #include #include #include #include using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::use_awaitable; using namespace sophiar; using namespace std::chrono_literals; awaitable test_datanode_base_1() { struct type_a : public datanode_base { unsigned int cnt = 0; awaitable exec() { ++cnt; co_return true; } } node_a; BOOST_TEST(co_await node_a.init()); BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::MANUAL)); BOOST_TEST(co_await node_a.start()); BOOST_TEST(co_await node_a.trigger()); BOOST_TEST(node_a.cnt == 1); co_await coro_sleep(50ms); node_a.set_minimal_exec_interval(10ms); for (int i = 0; i < 5; ++i) { co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached); } BOOST_TEST(node_a.cnt == 2); co_await coro_sleep(50ms); BOOST_TEST(node_a.cnt == 3); node_a.cnt = 0; co_await coro_sleep(50ms); node_a.set_minimal_exec_interval(10ms); for (int i = 0; i < 5; ++i) { co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached); } BOOST_TEST(node_a.cancel_pending()); BOOST_TEST(node_a.cnt == 1); co_await coro_sleep(50ms); BOOST_TEST(node_a.cnt == 1); co_await node_a.stop(); node_a.cnt = 0; BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::INPUT)); node_a.set_trigger_input_mask(1); BOOST_TEST(co_await node_a.start()); node_a.update_input_data(0, type_a::data_packet::new_instance()); BOOST_TEST(node_a.cnt == 1); co_await coro_sleep(50ms); for (int i = 0; i < 5; ++i) { node_a.update_input_data(0, type_a::data_packet::new_instance()); } BOOST_TEST(node_a.cnt == 2); co_await coro_sleep(50ms); BOOST_TEST(node_a.cnt == 3); co_await node_a.stop(); node_a.cnt = 0; BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::PERIODIC)); node_a.set_trigger_interval(50ms); co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable { co_await coro_sleep(275ms); co_await node_a.stop(); }, detached); co_await node_a.start(); co_await coro_sleep(350ms); BOOST_TEST(node_a.cnt == 6); BOOST_TEST((node_a.get_state() == type_a::state_type::PENDING)); } awaitable test_datanode_base_2() { struct test_type : public datanode_base { unsigned int cnt = 0; awaitable exec() { auto input_data = get_input_data(0); cnt = (*input_data)[0]; auto output_data = data_packet::new_instance(); output_data->copy_content(*input_data); (*output_data)[0] += 1.0; set_output_data(0, output_data); co_return true; } }; test_type node_a, node_b; test_type::connect(node_a, 0, node_b, 0); co_await node_a.init(); co_await node_b.init(); node_a.set_trigger_mode(test_type::trigger_mode_type::INPUT); node_b.set_trigger_mode(test_type::trigger_mode_type::INPUT); node_a.set_trigger_input_mask(0b1); node_b.set_trigger_input_mask(0b1); co_await node_a.start(); co_await node_b.start(); auto packet = test_type::data_packet::new_instance(); packet->refresh(); (*packet)[0] = 101; node_a.update_input_data(0, packet); BOOST_TEST(node_a.cnt == 101); BOOST_TEST(node_b.cnt == 102); } awaitable test_datanode_base_speed(size_t length, size_t repeat) { struct test_type : public datanode_base { uint64_t cnt = 0; awaitable exec() { auto input_data = get_input_data(0); auto output_data = data_packet::new_instance(); output_data->copy_content(*input_data); for (size_t i = 0; i < test_type::data_packet::DATA_PACKET_LENGTH; ++i) { (*output_data)[i] += i; } std::reverse(output_data->data, output_data->data + test_type::data_packet::DATA_PACKET_LENGTH); output_data->timestamp = current_timestamp(); cnt = (*output_data)[0]; set_output_data(0, output_data); co_return true; } }; auto pool = new test_type[length]; for (size_t i = 0; i < length; ++i) { auto &node = pool[i]; co_await node.init(); node.set_trigger_mode(test_type::trigger_mode_type::INPUT); node.set_trigger_input_mask(0b1); if (i != 0) { test_type::connect(pool[i - 1], 0, node, 0); } co_await node.start(); } auto test_data = test_type::data_packet::new_instance(); test_data->refresh(); auto start_ts = current_timestamp(); for (int i = 0; i < repeat; ++i) { pool[0].update_input_data(0, test_data); // assert(pool[length - 1].cnt == length); } auto time_used = (current_timestamp() - start_ts) / 1000.0; auto time_left = 1000.0 / repeat - time_used / repeat; std::cout << fmt::format("Length = {}, Repeat = {}, " "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)", length, repeat, time_used, time_left, time_left / (10.0 / repeat)) << std::endl; BOOST_TEST(true); } BOOST_AUTO_TEST_CASE(test_datanode_base) { // co_spawn(high_freq_context, test_datanode_base_1(), detached); // co_spawn(high_freq_context, test_datanode_base_2(), detached); // high_freq_context.run(); for (auto length: {1, 4, 16, 64, 128}) for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000}) co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached); high_freq_context.run(); }