| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #define BOOST_TEST_DYN_LINK
- #include "core/sophiar_manager.h"
- #include "core/tristate_obj.h"
- #include "core/basic_obj_types.hpp"
- #include "utility/coro_signal_group.hpp"
- #include "utility/coro_worker.hpp"
- #include "utility/debug_utility.hpp"
- #include "utility/global_obj_helper.hpp"
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/high_resolution_timer.hpp>
- #include <boost/asio/this_coro.hpp>
- #include <boost/asio/use_awaitable.hpp>
- #include <boost/test/unit_test.hpp>
- #include <chrono>
- #include <fstream>
- #include <iostream>
- using boost::asio::awaitable;
- using boost::asio::co_spawn;
- using boost::asio::detached;
- using boost::asio::use_awaitable;
- using namespace sophiar;
- using namespace std::chrono_literals;
- struct source_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(source_node_type)
- global_obj_index_type output_obj_index;
- coro_worker::pointer source_worker;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
- output_obj_index = get_manager().
- register_global_obj<u64int_obj>(
- config["output_obj_name"].get<std::string>());
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
- source_worker = make_interval_coro_worker(
- get_context(), std::chrono::seconds(1), [
- output_obj = global_obj_auto_sync_delegate<u64int_obj>(
- get_manager(), output_obj_index),
- start_value = config["start_value"].get<std::uint64_t>()]() mutable
- -> boost::asio::awaitable<bool> {
- auto new_out = u64int_obj::new_instance(start_value);
- output_obj.set_value(new_out);
- SPDLOG_WARN("New value from source {}", new_out->value);
- ++start_value;
- co_return true;
- });
- source_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() override {
- source_worker->cancel();
- co_await source_worker->coro_wait_stop();
- source_worker.reset();
- co_return;
- }
- };
- struct proxy_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(proxy_node_type);
- global_obj_index_type input_obj_index;
- global_obj_index_type output_obj_index;
- coro_worker::pointer proxy_worker;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
- input_obj_index = get_manager().
- register_global_obj<u64int_obj>(
- config["input_obj_name"].get<std::string>());
- output_obj_index = get_manager().
- register_global_obj<u64int_obj>(
- config["output_obj_name"].get<std::string>());
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
- proxy_worker = make_infinite_coro_worker(
- get_context(), [
- input_obj = global_obj_auto_sync_delegate<u64int_obj>(
- get_manager(), input_obj_index),
- output_obj = global_obj_auto_sync_delegate<u64int_obj>(
- get_manager(), output_obj_index),
- start_value = config["start_value"].get<std::uint64_t>()]() mutable
- -> boost::asio::awaitable<bool> {
- co_await input_obj.coro_wait_update();
- auto new_out = u64int_obj::new_instance(input_obj->value + start_value);
- output_obj.set_value(new_out);
- SPDLOG_WARN("New value from proxy {}", new_out->value);
- ++start_value;
- co_return true;
- });
- proxy_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() override {
- proxy_worker->cancel();
- co_await proxy_worker->coro_wait_stop();
- proxy_worker.reset();
- co_return;
- }
- };
- struct target_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(target_node_type);
- global_obj_index_type source_obj_index;
- global_obj_index_type proxy_obj_index;
- coro_worker::pointer target_worker;
- coro_signal_any_group::pointer watch_group;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
- source_obj_index = get_manager().
- register_global_obj<u64int_obj>(
- config["source_obj_name"].get<std::string>());
- proxy_obj_index = get_manager().
- register_global_obj<u64int_obj>(
- config["proxy_obj_name"].get<std::string>());
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
- watch_group = std::make_unique<coro_signal_any_group>(get_manager());
- watch_group->add_watcher(get_manager().request_global_obj_update_watcher(source_obj_index));
- watch_group->add_watcher(get_manager().request_global_obj_update_watcher(proxy_obj_index));
- watch_group->start(get_context());
- target_worker = make_infinite_coro_worker(
- get_context(), [
- watcher = watch_group->new_watcher(get_context()),
- source_obj = global_obj_auto_sync_delegate<u64int_obj>(
- get_manager(), source_obj_index),
- proxy_obj = global_obj_auto_sync_delegate<u64int_obj>(
- get_manager(), proxy_obj_index)]() mutable
- -> boost::asio::awaitable<bool> {
- co_await watcher.coro_wait();
- SPDLOG_ERROR("source: {}, proxy: {}", source_obj->value, proxy_obj->value);
- co_return true;
- });
- target_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() override {
- co_await watch_group->stop();
- target_worker->cancel();
- co_await target_worker->coro_wait_stop();
- target_worker.reset();
- co_return;
- }
- };
- BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
- spdlog::set_level(spdlog::level::trace);
- REGISTER_TYPE(source_node_type);
- REGISTER_TYPE(proxy_node_type);
- REGISTER_TYPE(target_node_type);
- std::ifstream config_file("data/sophiar_manager_config.json");
- BOOST_TEST(config_file.is_open());
- global_sophiar_manager.load_config_and_start(nlohmann::json::parse(config_file));
- global_context.run();
- }
|