#define BOOST_TEST_DYN_LINK #include "core/sophiar_manager.h" #include "core/tristate_obj.h" #include "core/basic_obj_types.hpp" #include "core/external_controller.h" #include "utility/config_utility.hpp" #include "utility/coro_signal_group.hpp" #include "utility/coro_worker.hpp" #include "utility/variable_helper.hpp" #include #include #include #include #include #include #include #include #include #include using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::use_awaitable; using namespace sophiar; using namespace std::chrono_literals; struct source_node_type : public tristate_obj { DEFAULT_NEW_INSTANCE(source_node_type) variable_index_type output_var_index; coro_worker::pointer source_worker; boost::asio::awaitable on_init(const nlohmann::json &config) noexcept override { output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name"); co_return true; } boost::asio::awaitable on_start(const nlohmann::json &config) noexcept override { source_worker = make_interval_coro_worker( std::chrono::seconds(1), [ output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index), start_value = LOAD_UINT_ITEM("start_value")]() mutable -> boost::asio::awaitable { output_var = start_value++; SPDLOG_WARN("New value from source {}", output_var->value); co_return true; }); source_worker->run(); co_return true; } boost::asio::awaitable on_stop() noexcept override { source_worker->cancel(); co_await source_worker->coro_wait_stop(); source_worker.reset(); co_return; } }; struct proxy_node_type : public tristate_obj { DEFAULT_NEW_INSTANCE(proxy_node_type); variable_index_type input_var_index; variable_index_type output_var_index; coro_worker::pointer proxy_worker; boost::asio::awaitable on_init(const nlohmann::json &config) noexcept override { input_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "input_var_name"); output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name"); co_return true; } boost::asio::awaitable on_start(const nlohmann::json &config) noexcept override { proxy_worker = make_infinite_coro_worker( [ input_var = VARIABLE_AUTO_DELEGATE(u64int_obj, input_var_index), output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index), start_value = LOAD_UINT_ITEM("start_value")]() mutable -> boost::asio::awaitable { co_await input_var.coro_wait_update(); output_var = input_var->value + (start_value++); SPDLOG_WARN("New value from proxy {}", output_var->value); co_return true; }); proxy_worker->run(); co_return true; } boost::asio::awaitable on_stop() noexcept override { proxy_worker->cancel(); co_await proxy_worker->coro_wait_stop(); proxy_worker.reset(); co_return; } }; struct target_node_type : public tristate_obj { DEFAULT_NEW_INSTANCE(target_node_type); variable_index_type source_var_index; variable_index_type proxy_var_index; coro_worker::pointer target_worker; coro_signal_any_group::pointer watch_group; boost::asio::awaitable on_init(const nlohmann::json &config) noexcept override { source_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "source_var_name"); proxy_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "proxy_var_name"); co_return true; } boost::asio::awaitable on_start(const nlohmann::json &config) noexcept override { watch_group = std::make_unique(); watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(source_var_index)); watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(proxy_var_index)); watch_group->start(); target_worker = make_infinite_coro_worker( [ watcher = watch_group->new_watcher(), source_var = VARIABLE_AUTO_DELEGATE(u64int_obj, source_var_index), proxy_var = VARIABLE_AUTO_DELEGATE(u64int_obj, proxy_var_index)]() mutable -> boost::asio::awaitable { co_await watcher.coro_wait(); SPDLOG_ERROR("source: {}, proxy: {}", source_var.empty() ? 0 : source_var->value, proxy_var.empty() ? 0 : proxy_var->value); co_return true; }); target_worker->run(); co_return true; } boost::asio::awaitable on_stop() noexcept override { co_await watch_group->stop(); target_worker->cancel(); co_await target_worker->coro_wait_stop(); target_worker.reset(); co_return; } }; BOOST_AUTO_TEST_CASE(test_sophiar_manager) { spdlog::set_level(spdlog::level::trace); std::ifstream config_file("data/sophiar_manager_config.json"); BOOST_TEST(config_file.is_open()); auto config = nlohmann::json::parse(config_file); initialize({}); REGISTER_TYPE(source_node_type); REGISTER_TYPE(proxy_node_type); REGISTER_TYPE(target_node_type); global_sophiar_pool->load_config(config); global_sophiar_manager->load_config_and_start(config); global_external_controller->load_config_and_start(config); global_context->run(); }