// Source file: sophiar_manager.cpp
  1. #define BOOST_TEST_DYN_LINK
  2. #include "core/sophiar_manager.h"
  3. #include "core/tristate_obj.h"
  4. #include "core/basic_obj_types.hpp"
  5. #include "utility/coro_signal_group.hpp"
  6. #include "utility/coro_worker.hpp"
  7. #include "utility/global_obj_helper.hpp"
  8. #include <boost/asio/co_spawn.hpp>
  9. #include <boost/asio/detached.hpp>
  10. #include <boost/asio/high_resolution_timer.hpp>
  11. #include <boost/asio/this_coro.hpp>
  12. #include <boost/asio/use_awaitable.hpp>
  13. #include <boost/test/unit_test.hpp>
  14. #include <spdlog/spdlog.h>
  15. #include <chrono>
  16. #include <fstream>
  17. #include <iostream>
  18. using boost::asio::awaitable;
  19. using boost::asio::co_spawn;
  20. using boost::asio::detached;
  21. using boost::asio::use_awaitable;
  22. using namespace sophiar;
  23. using namespace std::chrono_literals;
  24. struct source_node_type : public tristate_obj {
  25. DEFAULT_NEW_INSTANCE(source_node_type)
  26. global_obj_index_type output_obj_index;
  27. coro_worker::pointer source_worker;
  28. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
  29. output_obj_index = get_manager().
  30. register_global_obj<u64int_obj>(
  31. config["output_obj_name"].get<std::string>());
  32. co_return true;
  33. }
  34. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
  35. source_worker = make_interval_coro_worker(
  36. get_context(), std::chrono::seconds(1), [
  37. output_obj = global_obj_auto_sync_delegate<u64int_obj>(
  38. get_manager(), output_obj_index),
  39. start_value = config["start_value"].get<std::uint64_t>()]() mutable
  40. -> boost::asio::awaitable<bool> {
  41. auto new_out = u64int_obj::new_instance(start_value);
  42. output_obj.set_value(new_out);
  43. SPDLOG_WARN("New value from source {}", new_out->value);
  44. ++start_value;
  45. co_return true;
  46. });
  47. source_worker->run();
  48. co_return true;
  49. }
  50. boost::asio::awaitable<void> on_stop() override {
  51. source_worker->cancel();
  52. co_await source_worker->coro_wait_stop();
  53. source_worker.reset();
  54. co_return;
  55. }
  56. };
  57. struct proxy_node_type : public tristate_obj {
  58. DEFAULT_NEW_INSTANCE(proxy_node_type);
  59. global_obj_index_type input_obj_index;
  60. global_obj_index_type output_obj_index;
  61. coro_worker::pointer proxy_worker;
  62. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
  63. input_obj_index = get_manager().
  64. register_global_obj<u64int_obj>(
  65. config["input_obj_name"].get<std::string>());
  66. output_obj_index = get_manager().
  67. register_global_obj<u64int_obj>(
  68. config["output_obj_name"].get<std::string>());
  69. co_return true;
  70. }
  71. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
  72. proxy_worker = make_infinite_coro_worker(
  73. get_context(), [
  74. input_obj = global_obj_auto_sync_delegate<u64int_obj>(
  75. get_manager(), input_obj_index),
  76. output_obj = global_obj_auto_sync_delegate<u64int_obj>(
  77. get_manager(), output_obj_index),
  78. start_value = config["start_value"].get<std::uint64_t>()]() mutable
  79. -> boost::asio::awaitable<bool> {
  80. co_await input_obj.coro_wait_update();
  81. auto new_out = u64int_obj::new_instance(input_obj->value + start_value);
  82. output_obj.set_value(new_out);
  83. SPDLOG_WARN("New value from proxy {}", new_out->value);
  84. ++start_value;
  85. co_return true;
  86. });
  87. proxy_worker->run();
  88. co_return true;
  89. }
  90. boost::asio::awaitable<void> on_stop() override {
  91. proxy_worker->cancel();
  92. co_await proxy_worker->coro_wait_stop();
  93. proxy_worker.reset();
  94. co_return;
  95. }
  96. };
  97. struct target_node_type : public tristate_obj {
  98. DEFAULT_NEW_INSTANCE(target_node_type);
  99. global_obj_index_type source_obj_index;
  100. global_obj_index_type proxy_obj_index;
  101. coro_worker::pointer target_worker;
  102. coro_signal_any_group::pointer watch_group;
  103. boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
  104. source_obj_index = get_manager().
  105. register_global_obj<u64int_obj>(
  106. config["source_obj_name"].get<std::string>());
  107. proxy_obj_index = get_manager().
  108. register_global_obj<u64int_obj>(
  109. config["proxy_obj_name"].get<std::string>());
  110. co_return true;
  111. }
  112. boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
  113. watch_group = std::make_unique<coro_signal_any_group>(get_manager());
  114. watch_group->add_watcher(get_manager().request_global_obj_update_watcher(source_obj_index));
  115. watch_group->add_watcher(get_manager().request_global_obj_update_watcher(proxy_obj_index));
  116. watch_group->start(get_context());
  117. target_worker = make_infinite_coro_worker(
  118. get_context(), [
  119. watcher = watch_group->new_watcher(get_context()),
  120. source_obj = global_obj_auto_sync_delegate<u64int_obj>(
  121. get_manager(), source_obj_index),
  122. proxy_obj = global_obj_auto_sync_delegate<u64int_obj>(
  123. get_manager(), proxy_obj_index)]() mutable
  124. -> boost::asio::awaitable<bool> {
  125. co_await watcher.coro_wait();
  126. SPDLOG_ERROR("source: {}, proxy: {}", source_obj->value, proxy_obj->value);
  127. co_return true;
  128. });
  129. target_worker->run();
  130. co_return true;
  131. }
  132. boost::asio::awaitable<void> on_stop() override {
  133. co_await watch_group->stop();
  134. target_worker->cancel();
  135. co_await target_worker->coro_wait_stop();
  136. target_worker.reset();
  137. co_return;
  138. }
  139. };
  140. BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
  141. spdlog::set_level(spdlog::level::trace);
  142. REGISTER_TYPE(source_node_type);
  143. REGISTER_TYPE(proxy_node_type);
  144. REGISTER_TYPE(target_node_type);
  145. std::ifstream config_file("data/sophiar_manager_config.json");
  146. BOOST_TEST(config_file.is_open());
  147. global_sophiar_manager.load_config_and_start(nlohmann::json::parse(config_file));
  148. global_context.run();
  149. }