datanode_base.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. #define BOOST_TEST_DYN_LINK
  2. #include "core/datanode_base.h"
  3. #include "utility/bit_operations.hpp"
  4. #include "utility/debug_utility.hpp"
  5. #include <boost/asio/co_spawn.hpp>
  6. #include <boost/asio/detached.hpp>
  7. #include <boost/asio/high_resolution_timer.hpp>
  8. #include <boost/asio/this_coro.hpp>
  9. #include <boost/asio/use_awaitable.hpp>
  10. #include <boost/test/unit_test.hpp>
  11. #include <fmt/format.h>
  12. #include <algorithm>
  13. #include <chrono>
  14. #include <iostream>
  15. using boost::asio::awaitable;
  16. using boost::asio::co_spawn;
  17. using boost::asio::detached;
  18. using boost::asio::use_awaitable;
  19. using namespace sophiar;
  20. using namespace std::chrono_literals;
  21. awaitable<void> test_datanode_base_1() {
  22. struct type_a : public datanode_base<high_freq_tag> {
  23. unsigned int cnt = 0;
  24. awaitable<bool> exec() {
  25. ++cnt;
  26. co_return true;
  27. }
  28. } node_a;
  29. BOOST_TEST(co_await node_a.init());
  30. BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::MANUAL));
  31. BOOST_TEST(co_await node_a.start());
  32. BOOST_TEST(co_await node_a.trigger());
  33. BOOST_TEST(node_a.cnt == 1);
  34. co_await coro_sleep(50ms);
  35. node_a.set_minimal_exec_interval(10ms);
  36. for (int i = 0; i < 5; ++i) {
  37. co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached);
  38. }
  39. BOOST_TEST(node_a.cnt == 2);
  40. co_await coro_sleep(50ms);
  41. BOOST_TEST(node_a.cnt == 3);
  42. node_a.cnt = 0;
  43. co_await coro_sleep(50ms);
  44. node_a.set_minimal_exec_interval(10ms);
  45. for (int i = 0; i < 5; ++i) {
  46. co_spawn(co_await boost::asio::this_coro::executor, node_a.trigger(), detached);
  47. }
  48. BOOST_TEST(node_a.cancel_pending());
  49. BOOST_TEST(node_a.cnt == 1);
  50. co_await coro_sleep(50ms);
  51. BOOST_TEST(node_a.cnt == 1);
  52. co_await node_a.stop();
  53. node_a.cnt = 0;
  54. BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::INPUT));
  55. node_a.set_trigger_input_mask(1);
  56. BOOST_TEST(co_await node_a.start());
  57. node_a.update_input_data(0, type_a::data_packet::new_instance());
  58. BOOST_TEST(node_a.cnt == 1);
  59. co_await coro_sleep(50ms);
  60. for (int i = 0; i < 5; ++i) {
  61. node_a.update_input_data(0, type_a::data_packet::new_instance());
  62. }
  63. BOOST_TEST(node_a.cnt == 2);
  64. co_await coro_sleep(50ms);
  65. BOOST_TEST(node_a.cnt == 3);
  66. co_await node_a.stop();
  67. node_a.cnt = 0;
  68. BOOST_TEST(node_a.set_trigger_mode(type_a::trigger_mode_type::PERIODIC));
  69. node_a.set_trigger_interval(50ms);
  70. co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable<void> {
  71. co_await coro_sleep(275ms);
  72. co_await node_a.stop();
  73. }, detached);
  74. co_await node_a.start();
  75. co_await coro_sleep(350ms);
  76. BOOST_TEST(node_a.cnt == 6);
  77. BOOST_TEST((node_a.get_state() == type_a::state_type::PENDING));
  78. }
  79. awaitable<void> test_datanode_base_2() {
  80. struct test_type : public datanode_base<high_freq_tag> {
  81. unsigned int cnt = 0;
  82. awaitable<bool> exec() {
  83. auto input_data = get_input_data(0);
  84. cnt = (*input_data)[0];
  85. auto output_data = data_packet::new_instance();
  86. output_data->copy_content(*input_data);
  87. (*output_data)[0] += 1.0;
  88. set_output_data(0, output_data);
  89. co_return true;
  90. }
  91. };
  92. test_type node_a, node_b;
  93. test_type::connect(node_a, 0, node_b, 0);
  94. co_await node_a.init();
  95. co_await node_b.init();
  96. node_a.set_trigger_mode(test_type::trigger_mode_type::INPUT);
  97. node_b.set_trigger_mode(test_type::trigger_mode_type::INPUT);
  98. node_a.set_trigger_input_mask(0b1);
  99. node_b.set_trigger_input_mask(0b1);
  100. co_await node_a.start();
  101. co_await node_b.start();
  102. auto packet = test_type::data_packet::new_instance();
  103. (*packet)[0] = 101;
  104. node_a.update_input_data(0, packet);
  105. BOOST_TEST(node_a.cnt == 101);
  106. BOOST_TEST(node_b.cnt == 102);
  107. }
  108. awaitable<void> test_datanode_base_speed(size_t length, size_t repeat) {
  109. struct test_type : public datanode_base<high_freq_tag> {
  110. uint64_t cnt = 0;
  111. awaitable<bool> exec() {
  112. auto input_data = get_input_data(0);
  113. auto output_data = data_packet::new_instance();
  114. output_data->copy_content(*input_data);
  115. for (size_t i = 0; i < test_type::data_packet::FLOAT_FILED_LENGTH; ++i) {
  116. (*output_data)[i] += i;
  117. }
  118. std::reverse(output_data->floats, output_data->floats + test_type::data_packet::FLOAT_FILED_LENGTH);
  119. output_data->timestamp = current_timestamp();
  120. cnt = (*output_data)[0];
  121. set_output_data(0, output_data);
  122. co_return true;
  123. }
  124. };
  125. auto pool = new test_type[length];
  126. for (size_t i = 0; i < length; ++i) {
  127. auto &node = pool[i];
  128. co_await node.init();
  129. node.set_trigger_mode(test_type::trigger_mode_type::INPUT);
  130. node.set_trigger_input_mask(0b1);
  131. if (i != 0) {
  132. test_type::connect(pool[i - 1], 0, node, 0);
  133. }
  134. co_await node.start();
  135. }
  136. auto test_data = test_type::data_packet::new_instance();
  137. auto start_ts = current_timestamp();
  138. for (int i = 0; i < repeat; ++i) {
  139. pool[0].update_input_data(0, test_data);
  140. // assert(pool[length - 1].cnt == length);
  141. }
  142. auto time_used = (current_timestamp() - start_ts) / 1000.0;
  143. auto time_left = 1000.0 / repeat - time_used / repeat;
  144. std::cout << fmt::format("Length = {}, Repeat = {}, "
  145. "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)",
  146. length, repeat, time_used,
  147. time_left, time_left / (10.0 / repeat))
  148. << std::endl;
  149. BOOST_TEST(true);
  150. }
  151. BOOST_AUTO_TEST_CASE(test_datanode_base) {
  152. // co_spawn(high_freq_context, test_datanode_base_1(), detached);
  153. // co_spawn(high_freq_context, test_datanode_base_2(), detached);
  154. // high_freq_context.run();
  155. for (auto length: {1, 4, 16, 64, 128})
  156. for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000})
  157. co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached);
  158. high_freq_context.run();
  159. }