| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- #define BOOST_TEST_DYN_LINK
- #include "core/sophiar_manager.h"
- #include "core/tristate_obj.h"
- #include "core/basic_obj_types.hpp"
- #include "core/external_controller.h"
- #include "utility/config_utility.hpp"
- #include "utility/coro_signal_group.hpp"
- #include "utility/coro_worker.hpp"
- #include "utility/variable_helper.hpp"
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/high_resolution_timer.hpp>
- #include <boost/asio/this_coro.hpp>
- #include <boost/asio/use_awaitable.hpp>
- #include <boost/test/unit_test.hpp>
- #include <spdlog/spdlog.h>
- #include <chrono>
- #include <fstream>
- #include <iostream>
- using boost::asio::awaitable;
- using boost::asio::co_spawn;
- using boost::asio::detached;
- using boost::asio::use_awaitable;
- using namespace sophiar;
- using namespace std::chrono_literals;
- struct source_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(source_node_type)
- variable_index_type output_var_index;
- coro_worker::pointer source_worker;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
- output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name");
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
- source_worker = make_interval_coro_worker(
- std::chrono::seconds(1), [
- output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index),
- start_value = LOAD_UINT_ITEM("start_value")]() mutable
- -> boost::asio::awaitable<bool> {
- output_var = start_value++;
- SPDLOG_WARN("New value from source {}", output_var->value);
- co_return true;
- });
- source_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() noexcept override {
- source_worker->cancel();
- co_await source_worker->coro_wait_stop();
- source_worker.reset();
- co_return;
- }
- };
- struct proxy_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(proxy_node_type);
- variable_index_type input_var_index;
- variable_index_type output_var_index;
- coro_worker::pointer proxy_worker;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
- input_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "input_var_name");
- output_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "output_var_name");
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
- proxy_worker = make_infinite_coro_worker(
- [
- input_var = VARIABLE_AUTO_DELEGATE(u64int_obj, input_var_index),
- output_var = VARIABLE_AUTO_DELEGATE(u64int_obj, output_var_index),
- start_value = LOAD_UINT_ITEM("start_value")]() mutable
- -> boost::asio::awaitable<bool> {
- co_await input_var.coro_wait_update();
- output_var = input_var->value + (start_value++);
- SPDLOG_WARN("New value from proxy {}", output_var->value);
- co_return true;
- });
- proxy_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() noexcept override {
- proxy_worker->cancel();
- co_await proxy_worker->coro_wait_stop();
- proxy_worker.reset();
- co_return;
- }
- };
- struct target_node_type : public tristate_obj {
- DEFAULT_NEW_INSTANCE(target_node_type);
- variable_index_type source_var_index;
- variable_index_type proxy_var_index;
- coro_worker::pointer target_worker;
- coro_signal_any_group::pointer watch_group;
- boost::asio::awaitable<bool> on_init(const nlohmann::json &config) noexcept override {
- source_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "source_var_name");
- proxy_var_index = LOAD_VARIABLE_INDEX(u64int_obj, "proxy_var_name");
- co_return true;
- }
- boost::asio::awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
- watch_group = std::make_unique<coro_signal_any_group>();
- watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(source_var_index));
- watch_group->add_watcher(REQUIRE_VARIABLE_WATCHER(proxy_var_index));
- watch_group->start();
- target_worker = make_infinite_coro_worker(
- [
- watcher = watch_group->new_watcher(),
- source_var = VARIABLE_AUTO_DELEGATE(u64int_obj, source_var_index),
- proxy_var = VARIABLE_AUTO_DELEGATE(u64int_obj, proxy_var_index)]() mutable
- -> boost::asio::awaitable<bool> {
- co_await watcher.coro_wait();
- SPDLOG_ERROR("source: {}, proxy: {}",
- source_var.empty() ? 0 : source_var->value,
- proxy_var.empty() ? 0 : proxy_var->value);
- co_return true;
- });
- target_worker->run();
- co_return true;
- }
- boost::asio::awaitable<void> on_stop() noexcept override {
- co_await watch_group->stop();
- target_worker->cancel();
- co_await target_worker->coro_wait_stop();
- target_worker.reset();
- co_return;
- }
- };
- BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
- spdlog::set_level(spdlog::level::trace);
- std::ifstream config_file("data/sophiar_manager_config.json");
- BOOST_TEST(config_file.is_open());
- auto config = nlohmann::json::parse(config_file);
- initialize({});
- REGISTER_TYPE(source_node_type);
- REGISTER_TYPE(proxy_node_type);
- REGISTER_TYPE(target_node_type);
- global_sophiar_pool->load_config(config);
- global_sophiar_manager->load_config_and_start(config);
- global_external_controller->load_config_and_start(config);
- global_context->run();
- }
|