// Integration-style test for the sophiar manager.
//
// Three small tristate nodes are defined and wired together through global
// objects whose names come from the JSON configuration:
//   * source_node_type - publishes a u64int_obj from an interval worker.
//   * proxy_node_type  - waits for its input object to update and republishes
//                        the input value plus a running offset.
//   * target_node_type - waits on a signal group fed by two update watchers and
//                        logs both objects' values.
// The test case registers the three types, loads the configuration file and
// runs the global context.
//
// NOTE(review): this copy of the file looks damaged - several `#include`
// directives have no header name and angle-bracket argument lists appear to be
// missing (e.g. `boost::asio::awaitable`, `.get()`, `std::make_unique(...)`).
// Restore them from the original file before building.

// Must be defined before the Boost.Test header is included.
#define BOOST_TEST_DYN_LINK

#include "core/sophiar_manager.h"
#include "core/tristate_obj.h"
#include "core/basic_obj_types.hpp"
#include "utility/coro_signal_group.hpp"
#include "utility/coro_worker.hpp"
#include "utility/global_obj_helper.hpp"

// NOTE(review): the ten directives below have lost their header names;
// presumably Boost.Asio / Boost.Test / spdlog / standard headers - TODO confirm.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;

using namespace sophiar;
using namespace std::chrono_literals;

// Node that periodically publishes a counter value to one global object.
struct source_node_type : public tristate_obj {

    // NOTE(review): no trailing semicolon here, unlike the two structs below.
    DEFAULT_NEW_INSTANCE(source_node_type)

    // Index returned by the manager for the object named by "output_obj_name".
    global_obj_index_type output_obj_index;
    // Worker created in on_start() and released in on_stop().
    coro_worker::pointer source_worker;

    // Registers the output global object using config["output_obj_name"] and
    // stores the returned index. Always co_returns true.
    boost::asio::awaitable on_init(const nlohmann::json &config) override {
        output_obj_index = get_manager().
                register_global_obj(
                config["output_obj_name"].get());
        co_return true;
    }

    // Creates and runs an interval worker (interval argument: one second).
    // The lambda owns its own delegate for the output object and its own
    // counter, seeded from config["start_value"]. Always co_returns true.
    boost::asio::awaitable on_start(const nlohmann::json &config) override {
        source_worker = make_interval_coro_worker(
                get_context(), std::chrono::seconds(1), [
                        output_obj = global_obj_auto_sync_delegate(
                                get_manager(), output_obj_index),
                        start_value = config["start_value"].get()]() mutable
                        -> boost::asio::awaitable {
                    // Publish the current counter value, log it, then advance
                    // the counter for the next invocation.
                    auto new_out = u64int_obj::new_instance(start_value);
                    output_obj.set_value(new_out);
                    SPDLOG_WARN("New value from source {}", new_out->value);
                    ++start_value;
                    // presumably `true` asks the worker to keep going -
                    // confirm against coro_worker.
                    co_return true;
                });
        source_worker->run();
        co_return true;
    }

    // Cancels the worker, waits until it reports stopped, then drops the
    // pointer.
    boost::asio::awaitable on_stop() override {
        source_worker->cancel();
        co_await source_worker->coro_wait_stop();
        source_worker.reset();
        co_return;
    }
};

// Node that forwards one global object to another, adding a running offset.
struct proxy_node_type : public tristate_obj {

    DEFAULT_NEW_INSTANCE(proxy_node_type);

    // Index of the object named by "input_obj_name".
    global_obj_index_type input_obj_index;
    // Index of the object named by "output_obj_name".
    global_obj_index_type output_obj_index;
    // Worker created in on_start() and released in on_stop().
    coro_worker::pointer proxy_worker;

    // Registers the input and output global objects using the names in
    // config and stores both indices. Always co_returns true.
    boost::asio::awaitable on_init(const nlohmann::json &config) override {
        input_obj_index = get_manager().
                register_global_obj(
                config["input_obj_name"].get());
        output_obj_index = get_manager().
                register_global_obj(
                config["output_obj_name"].get());
        co_return true;
    }

    // Creates and runs an infinite worker. The lambda owns delegates for the
    // input and output objects plus an offset seeded from
    // config["start_value"]. Always co_returns true.
    boost::asio::awaitable on_start(const nlohmann::json &config) override {
        proxy_worker = make_infinite_coro_worker(
                get_context(), [
                        input_obj = global_obj_auto_sync_delegate(
                                get_manager(), input_obj_index),
                        output_obj = global_obj_auto_sync_delegate(
                                get_manager(), output_obj_index),
                        start_value = config["start_value"].get()]() mutable
                        -> boost::asio::awaitable {
                    // Suspend until the input delegate reports an update.
                    co_await input_obj.coro_wait_update();
                    // Output = input value + current offset; the offset is
                    // incremented after every publication.
                    auto new_out = u64int_obj::new_instance(input_obj->value + start_value);
                    output_obj.set_value(new_out);
                    SPDLOG_WARN("New value from proxy {}", new_out->value);
                    ++start_value;
                    co_return true;
                });
        proxy_worker->run();
        co_return true;
    }

    // Cancels the worker, waits until it reports stopped, then drops the
    // pointer.
    boost::asio::awaitable on_stop() override {
        proxy_worker->cancel();
        co_await proxy_worker->coro_wait_stop();
        proxy_worker.reset();
        co_return;
    }
};

// Node that observes two global objects and logs their values.
struct target_node_type : public tristate_obj {

    DEFAULT_NEW_INSTANCE(target_node_type);

    // Index of the object named by "source_obj_name".
    global_obj_index_type source_obj_index;
    // Index of the object named by "proxy_obj_name".
    global_obj_index_type proxy_obj_index;
    // Worker created in on_start() and released in on_stop().
    coro_worker::pointer target_worker;
    // Signal group created in on_start() and stopped in on_stop().
    coro_signal_any_group::pointer watch_group;

    // Registers both observed global objects using the names in config and
    // stores both indices. Always co_returns true.
    boost::asio::awaitable on_init(const nlohmann::json &config) override {
        source_obj_index = get_manager().
                register_global_obj(
                config["source_obj_name"].get());
        proxy_obj_index = get_manager().
                register_global_obj(
                config["proxy_obj_name"].get());
        co_return true;
    }

    // Builds a signal group from the update watchers of both objects, starts
    // it, then creates and runs an infinite worker that waits on the group.
    // `config` is not read here. Always co_returns true.
    boost::asio::awaitable on_start(const nlohmann::json &config) override {
        watch_group = std::make_unique(get_manager());
        watch_group->add_watcher(get_manager().request_global_obj_update_watcher(source_obj_index));
        watch_group->add_watcher(get_manager().request_global_obj_update_watcher(proxy_obj_index));
        watch_group->start(get_context());
        target_worker = make_infinite_coro_worker(
                get_context(), [
                        watcher = watch_group->new_watcher(get_context()),
                        source_obj = global_obj_auto_sync_delegate(
                                get_manager(), source_obj_index),
                        proxy_obj = global_obj_auto_sync_delegate(
                                get_manager(), proxy_obj_index)]() mutable
                        -> boost::asio::awaitable {
                    // presumably resumes when either watched object updates
                    // ("any" group) - confirm against coro_signal_any_group.
                    co_await watcher.coro_wait();
                    // Logged at error level, so it is emitted at any
                    // configured level short of critical/off.
                    SPDLOG_ERROR("source: {}, proxy: {}", source_obj->value, proxy_obj->value);
                    co_return true;
                });
        target_worker->run();
        co_return true;
    }

    // Stops the signal group first, then cancels the worker, waits until it
    // reports stopped and drops the worker pointer.
    // NOTE(review): watch_group itself is not reset here, unlike
    // target_worker - confirm this is intended.
    boost::asio::awaitable on_stop() override {
        co_await watch_group->stop();
        target_worker->cancel();
        co_await target_worker->coro_wait_stop();
        target_worker.reset();
        co_return;
    }
};

// Registers the three node types, loads "data/sophiar_manager_config.json"
// (path as written, so it depends on the working directory), starts the
// manager with it and runs the global context.
BOOST_AUTO_TEST_CASE(test_sophiar_manager) {

    spdlog::set_level(spdlog::level::trace);

    REGISTER_TYPE(source_node_type);
    REGISTER_TYPE(proxy_node_type);
    REGISTER_TYPE(target_node_type);

    std::ifstream config_file("data/sophiar_manager_config.json");
    // NOTE(review): BOOST_TEST is a non-fatal check, so on failure execution
    // continues and json::parse below receives an unopened stream; a
    // REQUIRE-level assertion may be more appropriate - confirm.
    BOOST_TEST(config_file.is_open());

    global_sophiar_manager.load_config_and_start(nlohmann::json::parse(config_file));
    // presumably blocks until the context runs out of work; the workers above
    // never finish on their own, so this may not return - confirm.
    global_context.run();
}