sophiar_manager.cpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #define BOOST_TEST_DYN_LINK
  2. #include "core/sophiar_manager.h"
  3. #include "core/tristate_obj.h"
  4. #include "core/basic_obj_types.hpp"
  5. #include "core/external_controller.h"
  6. #include "utility/config_utility.hpp"
  7. #include "utility/coro_signal_group.hpp"
  8. #include "utility/coro_worker.hpp"
  9. #include "utility/variable_helper.hpp"
  10. #include <boost/asio/co_spawn.hpp>
  11. #include <boost/asio/detached.hpp>
  12. #include <boost/asio/high_resolution_timer.hpp>
  13. #include <boost/asio/this_coro.hpp>
  14. #include <boost/asio/use_awaitable.hpp>
  15. #include <boost/test/unit_test.hpp>
  16. #include <spdlog/spdlog.h>
  17. #include <chrono>
  18. #include <fstream>
  19. #include <iostream>
  20. using boost::asio::awaitable;
  21. using boost::asio::co_spawn;
  22. using boost::asio::detached;
  23. using boost::asio::use_awaitable;
  24. using namespace sophiar;
  25. using namespace std::chrono_literals;
  26. struct source_node_type : public tristate_obj {
  27. DEFAULT_NEW_INSTANCE(source_node_type)
  28. variable_index_type output_var_index;
  29. coro_worker::pointer source_worker;
  30. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
  31. output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name");
  32. co_return true;
  33. }
  34. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
  35. source_worker = make_interval_coro_worker(
  36. std::chrono::seconds(1), [
  37. output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index),
  38. start_value = LOAD_UINT_ITEM("start_value")]() mutable
  39. -> boost::asio::awaitable<bool> {
  40. output_var = start_value++;
  41. SPDLOG_WARN("New value from source {}", output_var->value);
  42. co_return true;
  43. });
  44. source_worker->run();
  45. co_return true;
  46. }
  47. boost::asio::awaitable<void> on_stop() noexcept override {
  48. source_worker->cancel();
  49. co_await source_worker->coro_wait_stop();
  50. source_worker.reset();
  51. co_return;
  52. }
  53. };
  54. struct proxy_node_type : public tristate_obj {
  55. DEFAULT_NEW_INSTANCE(proxy_node_type);
  56. variable_index_type input_var_index;
  57. variable_index_type output_var_index;
  58. coro_worker::pointer proxy_worker;
  59. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
  60. input_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "input_var_name");
  61. output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name");
  62. co_return true;
  63. }
  64. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
  65. proxy_worker = make_infinite_coro_worker(
  66. [
  67. input_var = VARIABLE_AUTO_DELEGATE(u64int_obj, input_var_index),
  68. output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index),
  69. start_value = LOAD_UINT_ITEM("start_value")]() mutable
  70. -> boost::asio::awaitable<bool> {
  71. co_await input_var.coro_wait_update();
  72. output_var = input_var->value + (start_value++);
  73. SPDLOG_WARN("New value from proxy {}", output_var->value);
  74. co_return true;
  75. });
  76. proxy_worker->run();
  77. co_return true;
  78. }
  79. boost::asio::awaitable<void> on_stop() noexcept override {
  80. proxy_worker->cancel();
  81. co_await proxy_worker->coro_wait_stop();
  82. proxy_worker.reset();
  83. co_return;
  84. }
  85. };
  86. struct target_node_type : public tristate_obj {
  87. DEFAULT_NEW_INSTANCE(target_node_type);
  88. variable_index_type source_var_index;
  89. variable_index_type proxy_var_index;
  90. coro_worker::pointer target_worker;
  91. coro_signal_any_group::pointer watch_group;
  92. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
  93. source_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "source_var_name");
  94. proxy_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "proxy_var_name");
  95. co_return true;
  96. }
  97. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
  98. watch_group = std::make_unique<coro_signal_any_group>();
  99. watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(source_var_index));
  100. watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(proxy_var_index));
  101. watch_group->start();
  102. target_worker = make_infinite_coro_worker(
  103. [
  104. watcher = watch_group->new_watcher(),
  105. source_var = VARIABLE_AUTO_DELEGATE(u64int_obj, source_var_index),
  106. proxy_var = VARIABLE_AUTO_DELEGATE(u64int_obj, proxy_var_index)]() mutable
  107. -> boost::asio::awaitable<bool> {
  108. co_await watcher.coro_wait();
  109. SPDLOG_ERROR("source: {}, proxy: {}",
  110. source_var.empty() ? 0 : source_var->value,
  111. proxy_var.empty() ? 0 : proxy_var->value);
  112. co_return true;
  113. });
  114. target_worker->run();
  115. co_return true;
  116. }
  117. boost::asio::awaitable<void> on_stop() noexcept override {
  118. co_await watch_group->stop();
  119. target_worker->cancel();
  120. co_await target_worker->coro_wait_stop();
  121. target_worker.reset();
  122. co_return;
  123. }
  124. };
  125. BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
  126. spdlog::set_level(spdlog::level::trace);
  127. std::ifstream config_file("data/sophiar_manager_config.json");
  128. BOOST_TEST(config_file.is_open());
  129. auto config = nlohmann::json::parse(config_file);
  130. initialize({});
  131. REGISTER_TYPE(source_node_type);
  132. REGISTER_TYPE(proxy_node_type);
  133. REGISTER_TYPE(target_node_type);
  134. global_sophiar_pool->load_config(config);
  135. global_sophiar_manager->load_config_and_start(config);
  136. global_external_controller->load_config_and_start(config);
  137. global_context->run();
  138. }