| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
#define BOOST_TEST_DYN_LINK

#include "core/datanode_base.h"
#include "utility/bit_operations.hpp"
#include "utility/debug_utility.hpp"

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/test/unit_test.hpp>

#include <fmt/format.h>

#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>

using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;

using namespace sophiar;
using namespace std::chrono_literals;
- awaitable<void> test_datanode_base_1() {
- struct type_a : public datanode_base<high_freq_tag> {
- unsigned int cnt = 0;
- awaitable<bool> exec() {
- ++cnt;
- co_return true;
- }
- } node_a;
- BOOST_TEST(co_await node_a.init());
- BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::MANUAL));
- BOOST_TEST(co_await node_a.start());
- BOOST_TEST(co_await node_a.trigger());
- BOOST_TEST(node_a.cnt == 1);
- co_await coro_sleep(50ms);
- node_a.set_minimal_exec_interval(10ms);
- for (int i = 0; i < 5; ++i) {
- co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached);
- }
- BOOST_TEST(node_a.cnt == 2);
- co_await coro_sleep(50ms);
- BOOST_TEST(node_a.cnt == 3);
- node_a.cnt = 0;
- co_await coro_sleep(50ms);
- node_a.set_minimal_exec_interval(10ms);
- for (int i = 0; i < 5; ++i) {
- co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached);
- }
- BOOST_TEST(node_a.cancel_pending());
- BOOST_TEST(node_a.cnt == 1);
- co_await coro_sleep(50ms);
- BOOST_TEST(node_a.cnt == 1);
- co_await node_a.stop();
- node_a.cnt = 0;
- BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::INPUT));
- node_a.set_trigger_input_mask(1);
- BOOST_TEST(co_await node_a.start());
- node_a.update_input_data(0, type_a::data_packet::new_instance());
- BOOST_TEST(node_a.cnt == 1);
- co_await coro_sleep(50ms);
- for (int i = 0; i < 5; ++i) {
- node_a.update_input_data(0, type_a::data_packet::new_instance());
- }
- BOOST_TEST(node_a.cnt == 2);
- co_await coro_sleep(50ms);
- BOOST_TEST(node_a.cnt == 3);
- co_await node_a.stop();
- node_a.cnt = 0;
- BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::PERIODIC));
- node_a.set_trigger_interval(50ms);
- co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable<void> {
- co_await coro_sleep(275ms);
- co_await node_a.stop();
- }, detached);
- co_await node_a.start();
- co_await coro_sleep(350ms);
- BOOST_TEST(node_a.cnt == 6);
- BOOST_TEST((node_a.get_state() == type_a::state_type::PENDING));
- }
- awaitable<void> test_datanode_base_2() {
- struct test_type : public datanode_base<high_freq_tag> {
- unsigned int cnt = 0;
- awaitable<bool> exec() {
- auto input_data = get_input_data(0);
- cnt = (*input_data)[0];
- auto output_data = data_packet::new_instance();
- output_data->copy_content(*input_data);
- (*output_data)[0] += 1.0;
- set_output_data(0, output_data);
- co_return true;
- }
- };
- test_type node_a, node_b;
- test_type::connect(node_a, 0, node_b, 0);
- co_await node_a.init();
- co_await node_b.init();
- node_a.set_trigger_mode(test_type::trigger_mode_type::INPUT);
- node_b.set_trigger_mode(test_type::trigger_mode_type::INPUT);
- node_a.set_trigger_input_mask(0b1);
- node_b.set_trigger_input_mask(0b1);
- co_await node_a.start();
- co_await node_b.start();
- auto packet = test_type::data_packet::new_instance();
- packet->refresh();
- (*packet)[0] = 101;
- node_a.update_input_data(0, packet);
- BOOST_TEST(node_a.cnt == 101);
- BOOST_TEST(node_b.cnt == 102);
- }
- awaitable<void> test_datanode_base_speed(size_t length, size_t repeat) {
- struct test_type : public datanode_base<high_freq_tag> {
- uint64_t cnt = 0;
- awaitable<bool> exec() {
- auto input_data = get_input_data(0);
- auto output_data = data_packet::new_instance();
- output_data->copy_content(*input_data);
- for (size_t i = 0; i < test_type::data_packet::DATA_PACKET_LENGTH; ++i) {
- (*output_data)[i] += i;
- }
- std::reverse(output_data->data, output_data->data + test_type::data_packet::DATA_PACKET_LENGTH);
- output_data->timestamp = current_timestamp();
- cnt = (*output_data)[0];
- set_output_data(0, output_data);
- co_return true;
- }
- };
- auto pool = new test_type[length];
- for (size_t i = 0; i < length; ++i) {
- auto &node = pool[i];
- co_await node.init();
- node.set_trigger_mode(test_type::trigger_mode_type::INPUT);
- node.set_trigger_input_mask(0b1);
- if (i != 0) {
- test_type::connect(pool[i - 1], 0, node, 0);
- }
- co_await node.start();
- }
- auto test_data = test_type::data_packet::new_instance();
- test_data->refresh();
- auto start_ts = current_timestamp();
- for (int i = 0; i < repeat; ++i) {
- pool[0].update_input_data(0, test_data);
- // assert(pool[length - 1].cnt == length);
- }
- auto time_used = (current_timestamp() - start_ts) / 1000.0;
- auto time_left = 1000.0 / repeat - time_used / repeat;
- std::cout << fmt::format("Length = {}, Repeat = {}, "
- "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)",
- length, repeat, time_used,
- time_left, time_left / (10.0 / repeat))
- << std::endl;
- BOOST_TEST(true);
- }
- BOOST_AUTO_TEST_CASE(test_datanode_base) {
- // co_spawn(high_freq_context, test_datanode_base_1(), detached);
- // co_spawn(high_freq_context, test_datanode_base_2(), detached);
- // high_freq_context.run();
- for (auto length: {1, 4, 16, 64, 128})
- for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000})
- co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached);
- high_freq_context.run();
- }
|