|
|
@@ -0,0 +1,290 @@
|
|
|
+#include "local_connection.h"
|
|
|
+#include "core/basic_obj_types.hpp"
|
|
|
+#include "core/global_defs.h"
|
|
|
+#include "core/sophiar_manager.h"
|
|
|
+#include "core/sophiar_pool.h"
|
|
|
+
|
|
|
+#include <boost/asio/awaitable.hpp>
|
|
|
+#include <boost/asio/co_spawn.hpp>
|
|
|
+#include <boost/asio/detached.hpp>
|
|
|
+#include <boost/asio/post.hpp>
|
|
|
+
|
|
|
+#include <fmt/ostream.h>
|
|
|
+#include <spdlog/spdlog.h>
|
|
|
+
|
|
|
+#include <atomic>
|
|
|
+#include <condition_variable>
|
|
|
+#include <coroutine>
|
|
|
+#include <map>
|
|
|
+#include <thread>
|
|
|
+
|
|
|
+namespace sophiar {
|
|
|
+
|
|
|
+ namespace local_connection_impl {
|
|
|
+
|
|
|
+ enum command_type {
|
|
|
+ INIT, START, STOP, RESET, QUERY, INIT_CONF, START_CONF,
|
|
|
+ QUERY_VARIABLE, REGISTER_CALLBACK, ATTACH_CALLBACK
|
|
|
+ };
|
|
|
+
|
|
|
+ // https://en.cppreference.com/w/cpp/atomic/atomic_flag
|
|
|
+ struct spin_lock {
|
|
|
+
|
|
|
+ void lock() {
|
|
|
+ while (flag.test_and_set(std::memory_order_acquire)) // acquire lock
|
|
|
+ {
|
|
|
+#if defined(__cpp_lib_atomic_flag_test)
|
|
|
+ while (flag.test(std::memory_order_relaxed)) // test lock
|
|
|
+#endif
|
|
|
+ ; // spin
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void unlock() {
|
|
|
+ flag.clear(std::memory_order_release); // release lock
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+ std::atomic_flag flag = ATOMIC_FLAG_INIT;
|
|
|
+ };
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ using namespace local_connection_impl;
|
|
|
+
|
|
|
+ using boost::asio::awaitable;
|
|
|
+ using boost::asio::co_spawn;
|
|
|
+ using boost::asio::detached;
|
|
|
+ using boost::asio::post;
|
|
|
+
|
|
|
+ using callback_token_type = sophiar_pool::callback_token_type;
|
|
|
+ using attach_token_type = sophiar_pool::attach_token_type;
|
|
|
+
|
|
|
+ struct local_connection::impl {
|
|
|
+
|
|
|
+ // for command
|
|
|
+
|
|
|
+ struct attach_info {
|
|
|
+ variable_index_type index;
|
|
|
+ callback_token_type cb_token;
|
|
|
+ };
|
|
|
+
|
|
|
+ std::mutex mu;
|
|
|
+ std::condition_variable cv;
|
|
|
+ command_type cmd;
|
|
|
+ std::any cmd_param, cmd_result;
|
|
|
+ bool cmd_finish = true;
|
|
|
+
|
|
|
+ // for variable
|
|
|
+
|
|
|
+ struct variable_store_info {
|
|
|
+ using variable_info = sophiar_pool::variable_info;
|
|
|
+
|
|
|
+ std::any value;
|
|
|
+ variable_info var_info;
|
|
|
+ spin_lock mu;
|
|
|
+ callback_token_type cb_token;
|
|
|
+ attach_token_type bind_token;
|
|
|
+ };
|
|
|
+
|
|
|
+ std::map<std::string, variable_store_info> variable_pool;
|
|
|
+
|
|
|
+ awaitable<void> process_command() {
|
|
|
+// SPDLOG_INFO("Process in thread: {}.", std::this_thread::get_id());
|
|
|
+ {
|
|
|
+ auto lock = std::lock_guard{mu};
|
|
|
+ assert(!cmd_finish);
|
|
|
+ if (cmd < INIT_CONF) {
|
|
|
+ auto obj_name = std::any_cast<std::string>(cmd_param);
|
|
|
+ switch (cmd) {
|
|
|
+ case INIT: {
|
|
|
+ cmd_result = co_await global_sophiar_manager->init_object(
|
|
|
+ std::any_cast<std::string>(cmd_param));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case START: {
|
|
|
+ cmd_result = co_await global_sophiar_manager->start_object(
|
|
|
+ std::any_cast<std::string>(cmd_param));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case STOP: {
|
|
|
+ cmd_result = co_await global_sophiar_manager->stop_object(
|
|
|
+ std::any_cast<std::string>(cmd_param));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case RESET: {
|
|
|
+ cmd_result = co_await global_sophiar_manager->reset_object(
|
|
|
+ std::any_cast<std::string>(cmd_param));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case QUERY: {
|
|
|
+ cmd_result = global_sophiar_manager->query_object_state(
|
|
|
+ std::any_cast<std::string>(cmd_param));
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ default: {
|
|
|
+ assert(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (cmd < QUERY_VARIABLE) {
|
|
|
+ auto patch = std::any_cast<config_patch_type>(cmd_param);
|
|
|
+ switch (cmd) {
|
|
|
+ case INIT_CONF: {
|
|
|
+ cmd_result = global_sophiar_manager->patch_init_config(
|
|
|
+ patch.name, patch.config);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case START_CONF: {
|
|
|
+ cmd_result = global_sophiar_manager->patch_start_config(
|
|
|
+ patch.name, patch.config);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ default: {
|
|
|
+ assert(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (cmd == QUERY_VARIABLE) {
|
|
|
+ auto var_name = std::any_cast<std::string>(cmd_param);
|
|
|
+ cmd_result = global_sophiar_pool->query_variable_information(var_name);
|
|
|
+ } else if (cmd == REGISTER_CALLBACK) {
|
|
|
+ auto func = std::any_cast<std::function<void()>>(cmd_param);
|
|
|
+ cmd_result = REGISTER_CALLBACK(std::move(func));
|
|
|
+ } else if (cmd == ATTACH_CALLBACK) {
|
|
|
+ auto info = std::any_cast<attach_info>(cmd_param);
|
|
|
+ cmd_result = ATTACH_CALLBACK(info.index, info.cb_token);
|
|
|
+ }
|
|
|
+ cmd_finish = true;
|
|
|
+ }
|
|
|
+ cv.notify_one();
|
|
|
+ co_return;
|
|
|
+ }
|
|
|
+
|
|
|
+ void send_and_wait_command(command_type _cmd, const std::any &_param) {
|
|
|
+ // copy command information
|
|
|
+ {
|
|
|
+ auto lock = std::lock_guard{mu};
|
|
|
+ assert(cmd_finish);
|
|
|
+ cmd = _cmd;
|
|
|
+ cmd_param = _param;
|
|
|
+ cmd_finish = false;
|
|
|
+ }
|
|
|
+ co_spawn(*global_context, process_command(), detached);
|
|
|
+ auto lock = std::unique_lock{mu};
|
|
|
+ cv.wait(lock, [this] { return cmd_finish; });
|
|
|
+ }
|
|
|
+
|
|
|
+ std::any command_helper(command_type _cmd, const std::any &_param) {
|
|
|
+ send_and_wait_command(_cmd, _param);
|
|
|
+ return cmd_result;
|
|
|
+ }
|
|
|
+
|
|
|
+ void register_variable(const std::string &name) {
|
|
|
+ auto var_info = std::any_cast<sophiar_pool::variable_info>(
|
|
|
+ command_helper(QUERY_VARIABLE, name));
|
|
|
+ auto [iter, ok] = variable_pool.emplace(std::piecewise_construct,
|
|
|
+ std::forward_as_tuple(name),
|
|
|
+ std::forward_as_tuple());
|
|
|
+ assert(ok);
|
|
|
+ auto store_ptr = &iter->second;
|
|
|
+ store_ptr->var_info = var_info;
|
|
|
+ auto var_type_index = var_info.type_index;
|
|
|
+ auto raw_ptr = var_info.placeholder;
|
|
|
+ std::function<void()> cb_func = [=] {
|
|
|
+ auto lock = std::lock_guard{store_ptr->mu};
|
|
|
+ LOOP_BASIC_VAR_TYPE(
|
|
|
+ {
|
|
|
+ if (real_ptr == nullptr) {
|
|
|
+ store_ptr->value = {};
|
|
|
+ } else {
|
|
|
+ store_ptr->value = real_ptr->value;
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ })
|
|
|
+ };
|
|
|
+ store_ptr->cb_token = std::any_cast<callback_token_type>(
|
|
|
+ command_helper(REGISTER_CALLBACK, cb_func));
|
|
|
+ store_ptr->bind_token = std::any_cast<attach_token_type>(
|
|
|
+ command_helper(ATTACH_CALLBACK, attach_info{var_info.index, store_ptr->cb_token}));
|
|
|
+ post(*global_context, cb_func); // query current value
|
|
|
+ }
|
|
|
+
|
|
|
+ variable_store_info *get_variable_store(const std::string &name) {
|
|
|
+ if (!variable_pool.contains(name)) [[unlikely]] {
|
|
|
+ register_variable(name);
|
|
|
+ }
|
|
|
+ auto iter = variable_pool.find(name);
|
|
|
+ assert(iter != variable_pool.end());
|
|
|
+ return &iter->second;
|
|
|
+ }
|
|
|
+
|
|
|
+ std::any query_variable(const std::string &name) {
|
|
|
+ auto store = get_variable_store(name);
|
|
|
+ auto lock = std::lock_guard{store->mu};
|
|
|
+ return store->value;
|
|
|
+ }
|
|
|
+
|
|
|
+ void update_variable(const std::string &name, const std::any &value) {
|
|
|
+ auto store = get_variable_store(name);
|
|
|
+ auto var_type_index = store->var_info.type_index;
|
|
|
+ auto raw_ptr = store->var_info.placeholder;
|
|
|
+ post(*global_context, [=] {
|
|
|
+ LOOP_BASIC_VAR_TYPE(
|
|
|
+ {
|
|
|
+ using ValueType = decltype(real_ptr->value);
|
|
|
+ if (!value.has_value()) [[unlikely]] {
|
|
|
+ real_ptr = nullptr;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (real_ptr == nullptr) [[unlikely]] {
|
|
|
+ real_ptr = real_ptr->new_instance();
|
|
|
+ }
|
|
|
+ real_ptr->value = std::any_cast<ValueType>(value);
|
|
|
+ return;
|
|
|
+ })
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+ local_connection::local_connection()
|
|
|
+ : pimpl(std::make_unique<impl>()) {}
|
|
|
+
|
|
|
+ local_connection::~local_connection() = default;
|
|
|
+
|
|
|
+ bool local_connection::init_object(const std::string &name) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(INIT, name));
|
|
|
+ }
|
|
|
+
|
|
|
+ bool local_connection::start_object(const std::string &name) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(START, name));
|
|
|
+ }
|
|
|
+
|
|
|
+ bool local_connection::stop_object(const std::string &name) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(STOP, name));
|
|
|
+ }
|
|
|
+
|
|
|
+ bool local_connection::reset_object(const std::string &name) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(RESET, name));
|
|
|
+ }
|
|
|
+
|
|
|
+ tristate_obj::state_type local_connection::query_object(const std::string &name) {
|
|
|
+ return std::any_cast<tristate_obj::state_type>(pimpl->command_helper(QUERY, name));
|
|
|
+ }
|
|
|
+
|
|
|
+ bool local_connection::patch_init_config(const config_patch_type &patch) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(INIT_CONF, patch));
|
|
|
+ }
|
|
|
+
|
|
|
+ bool local_connection::patch_start_config(const config_patch_type &patch) {
|
|
|
+ return std::any_cast<bool>(pimpl->command_helper(START_CONF, patch));
|
|
|
+ }
|
|
|
+
|
|
|
+ std::any local_connection::query_variable_impl(const std::string &name) {
|
|
|
+ return pimpl->query_variable(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ void local_connection::update_variable_impl(const std::string &name, const std::any &value) {
|
|
|
+ pimpl->update_variable(name, value);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|