datanode_base.cpp 8.7 KB


  1. #define BOOST_TEST_DYN_LINK
  2. #include "core/datanode_base.h"
  3. #include "utility/bit_operations.hpp"
  4. #include "utility/debug_utility.hpp"
  5. #include <boost/asio/co_spawn.hpp>
  6. #include <boost/asio/detached.hpp>
  7. #include <boost/asio/high_resolution_timer.hpp>
  8. #include <boost/asio/this_coro.hpp>
  9. #include <boost/asio/use_awaitable.hpp>
  10. #include <boost/test/unit_test.hpp>
  11. #include <fmt/format.h>
  12. #include <algorithm>
  13. #include <chrono>
  14. #include <fstream>
  15. #include <iostream>
  16. using boost::asio::awaitable;
  17. using boost::asio::co_spawn;
  18. using boost::asio::detached;
  19. using boost::asio::use_awaitable;
  20. using namespace sophiar;
  21. using namespace std::chrono_literals;
  22. struct cnt_node_type : public datanode_base {
  23. DEFAULT_NEW_INSTANCE(cnt_node_type);
  24. unsigned int cnt = 0;
  25. void load_construct_config(const nlohmann::json &config) override {
  26. auto input_index = register_input("input_1");
  27. BOOST_TEST(input_index == 0);
  28. datanode_base::load_construct_config(config);
  29. }
  30. awaitable<bool> exec() {
  31. ++cnt;
  32. co_return true;
  33. }
  34. };
  35. struct trigger_node_type : public sophiar_obj {
  36. DEFAULT_NEW_INSTANCE(trigger_node_type);
  37. tiny_signal<> trigger_signal;
  38. void load_construct_config(const nlohmann::json &config) override {
  39. get_manager().register_signal(this, "triggered", trigger_signal);
  40. }
  41. void trigger() {
  42. trigger_signal.emit();
  43. }
  44. };
  45. struct update_node_type : public sophiar_obj {
  46. DEFAULT_NEW_INSTANCE(update_node_type);
  47. tiny_signal<versatile_obj::pointer> trigger_signal;
  48. void load_construct_config(const nlohmann::json &config) override {
  49. get_manager().register_signal(this, "triggered", trigger_signal);
  50. }
  51. void trigger() {
  52. trigger_signal.emit(versatile_obj::new_instance());
  53. }
  54. };
  55. struct test_node_type : public datanode_base {
  56. DEFAULT_NEW_INSTANCE(test_node_type);
  57. unsigned int cnt = 0;
  58. void load_construct_config(const nlohmann::json &config) override {
  59. auto input_index = register_input("input_1");
  60. BOOST_TEST(input_index == 0);
  61. auto output_index = register_output("output_1");
  62. BOOST_TEST(output_index == 0);
  63. datanode_base::load_construct_config(config);
  64. }
  65. awaitable<bool> exec() {
  66. auto input_data = get_input_data(0);
  67. cnt = (*input_data)[0];
  68. auto output_data = versatile_obj::new_instance();
  69. output_data->copy_from(*input_data);
  70. (*output_data)[0] += 1.0;
  71. set_output_data(0, output_data);
  72. co_return true;
  73. }
  74. };
  75. struct speedtest_node_type : public datanode_base {
  76. uint64_t cnt = 0;
  77. void load_construct_config(const nlohmann::json &config) override {
  78. auto input_index = register_input("input_1");
  79. BOOST_TEST(input_index == 0);
  80. auto output_index = register_output("output_1");
  81. BOOST_TEST(output_index == 0);
  82. datanode_base::load_construct_config(config);
  83. }
  84. awaitable<bool> exec() {
  85. auto input_data = get_input_data(0);
  86. auto output_data = versatile_obj::new_instance();
  87. output_data->copy_from(*input_data);
  88. for (size_t i = 0; i < versatile_data::FLOAT_FILED_LENGTH; ++i) {
  89. (*output_data)[i] += i;
  90. }
  91. std::reverse(output_data->floats, output_data->floats + versatile_data::FLOAT_FILED_LENGTH);
  92. output_data->timestamp = current_timestamp();
  93. cnt = (*output_data)[0];
  94. set_output_data(0, output_data);
  95. co_return true;
  96. }
  97. };
  98. awaitable<void> test_datanode_base_1() {
  99. auto trigger_node = dynamic_cast<trigger_node_type *>(global_sophiar_manager.get_object("trigger_node"));
  100. auto cnt_node = dynamic_cast<cnt_node_type *>(global_sophiar_manager.get_object("cnt_node"));
  101. assert(trigger_node != nullptr);
  102. assert(cnt_node != nullptr);
  103. co_await global_sophiar_manager.switch_mode("mode_a");
  104. trigger_node->trigger();
  105. co_await coro_sleep(10ms);
  106. BOOST_TEST(cnt_node->cnt == 1);
  107. co_await coro_sleep(50ms);
  108. for (int i = 0; i < 5; ++i) {
  109. trigger_node->trigger();
  110. }
  111. BOOST_TEST(cnt_node->cnt == 2);
  112. co_await coro_sleep(50ms);
  113. BOOST_TEST(cnt_node->cnt == 3);
  114. co_await global_sophiar_manager.switch_mode("mode_b");
  115. auto update_node = dynamic_cast<update_node_type *>(global_sophiar_manager.get_object("update_node"));
  116. assert(update_node != nullptr);
  117. cnt_node->cnt = 0;
  118. update_node->trigger();
  119. co_await coro_sleep(10ms);
  120. BOOST_TEST(cnt_node->cnt == 1);
  121. co_await coro_sleep(50ms);
  122. for (int i = 0; i < 5; ++i) {
  123. update_node->trigger();
  124. }
  125. BOOST_TEST(cnt_node->cnt == 2);
  126. co_await coro_sleep(50ms);
  127. BOOST_TEST(cnt_node->cnt == 3);
  128. cnt_node->cnt = 0;
  129. co_spawn(co_await boost::asio::this_coro::executor, [&]() -> awaitable<void> {
  130. co_await coro_sleep(275ms);
  131. co_await cnt_node->stop();
  132. }, detached);
  133. co_await global_sophiar_manager.switch_mode("mode_c");
  134. co_await coro_sleep(350ms);
  135. BOOST_TEST(cnt_node->cnt == 6);
  136. BOOST_TEST((cnt_node->get_state() == datanode_base::state_type::PENDING));
  137. auto test_node_a = dynamic_cast<test_node_type *>(global_sophiar_manager.get_object("test_node_a"));
  138. auto test_node_b = dynamic_cast<test_node_type *>(global_sophiar_manager.get_object("test_node_b"));
  139. assert(test_node_a != nullptr);
  140. assert(test_node_b != nullptr);
  141. auto &test_slot = global_sophiar_manager.get_slot<versatile_obj::pointer>("test_node_a", "input_1");
  142. co_await global_sophiar_manager.switch_mode("mode_d");
  143. auto packet = versatile_obj::new_instance();
  144. (*packet)[0] = 100;
  145. test_slot.signal_received(std::move(packet));
  146. BOOST_TEST(test_node_a->cnt == 100);
  147. BOOST_TEST(test_node_b->cnt == 101);
  148. co_return;
  149. }
  150. awaitable<void> test_datanode_base_speed(size_t length, size_t repeat) {
  151. std::destroy_at(&global_sophiar_manager);
  152. std::construct_at(&global_sophiar_manager);
  153. nlohmann::json config_json;
  154. config_json["mode_list"].push_back({{"name", "test_mode"}});
  155. nlohmann::json object_config;
  156. object_config["type"] = "speedtest_node_type";
  157. object_config["enabled_modes"].push_back("test_mode");
  158. object_config["construct_config"] = nlohmann::json({});
  159. object_config["init_configs"] = nlohmann::json::array();
  160. nlohmann::json start_config;
  161. start_config["modes"].push_back("test_mode");
  162. start_config["config"]["trigger_mode"] = "input";
  163. start_config["config"]["input_mask"] = 1;
  164. object_config["start_configs"].push_back(start_config);
  165. for (auto i = 0; i < length; ++i) {
  166. object_config["name"] = fmt::format("speedtest_node_{}", i);
  167. config_json["object_list"].push_back(object_config);
  168. }
  169. std::cout << config_json.dump() << std::endl;
  170. // TODO
  171. //
  172. // auto pool = new test_type[length];
  173. // for (size_t i = 0; i < length; ++i) {
  174. // auto &node = pool[i];
  175. // co_await node.init();
  176. // node.set_trigger_mode(test_type::trigger_mode_type::INPUT);
  177. // node.set_trigger_input_mask(0b1);
  178. // if (i != 0) {
  179. // test_type::connect(pool[i - 1], 0, node, 0);
  180. // }
  181. // co_await node.start();
  182. // }
  183. //
  184. // auto test_data = test_type::data_packet::new_instance();
  185. // auto start_ts = current_timestamp();
  186. // for (int i = 0; i < repeat; ++i) {
  187. // pool[0].update_input_data(0, test_data);
  188. //// assert(pool[length - 1].cnt == length);
  189. // }
  190. //
  191. // auto time_used = (current_timestamp() - start_ts) / 1000.0;
  192. // auto time_left = 1000.0 / repeat - time_used / repeat;
  193. // std::cout << fmt::format("Length = {}, Repeat = {}, "
  194. // "Time used = {:.3f}ms ({:.3f}ms, {:.2f}% left)",
  195. // length, repeat, time_used,
  196. // time_left, time_left / (10.0 / repeat))
  197. // << std::endl;
  198. //
  199. // BOOST_TEST(true);
  200. co_return;
  201. }
  202. BOOST_AUTO_TEST_CASE(test_datanode_base) {
  203. spdlog::set_level(spdlog::level::trace);
  204. REGISTER_TYPE(cnt_node_type);
  205. REGISTER_TYPE(trigger_node_type);
  206. REGISTER_TYPE(update_node_type);
  207. REGISTER_TYPE(test_node_type);
  208. std::ifstream config_file("data/datanode_base_config.json");
  209. BOOST_TEST(config_file.is_open());
  210. global_sophiar_manager.build_from_config(nlohmann::json::parse(config_file));
  211. co_spawn(global_context, test_datanode_base_1(), detached);
  212. // co_spawn(high_freq_context, test_datanode_base_2(), detached);
  213. // for (auto length: {1, 4, 16, 64, 128})
  214. // for (auto repeat: {1, 50, 125, 500, 1000, 2000, 5000, 10000})
  215. // co_spawn(high_freq_context, test_datanode_base_speed(length, repeat), detached);
  216. global_context.run();
  217. }
  218. BOOST_AUTO_TEST_CASE(test_datanode_speed) {
  219. co_spawn(global_context, test_datanode_base_speed(3, 1), detached);
  220. global_context.run();
  221. }