|
|
@@ -0,0 +1,350 @@
|
|
|
+#include "external_variable_io.h"
|
|
|
+#include "core/global_defs.h"
|
|
|
+#include "core/sophiar_pool.h"
|
|
|
+#include "utility/config_utility.hpp"
|
|
|
+#include "utility/coro_signal_group.hpp"
|
|
|
+#include "utility/coro_worker.hpp"
|
|
|
+#include "utility/dynamic_pool.hpp"
|
|
|
+#include "utility/versatile_buffer2.hpp"
|
|
|
+
|
|
|
+#include <boost/asio/awaitable.hpp>
|
|
|
+#include <boost/asio/defer.hpp>
|
|
|
+#include <boost/asio/detached.hpp>
|
|
|
+#include <boost/asio/co_spawn.hpp>
|
|
|
+#include <boost/asio/ip/tcp.hpp>
|
|
|
+#include <boost/asio/redirect_error.hpp>
|
|
|
+#include <boost/asio/use_awaitable.hpp>
|
|
|
+#include <boost/system/error_code.hpp>
|
|
|
+
|
|
|
+#include <spdlog/spdlog.h>
|
|
|
+
|
|
|
+namespace sophiar {
|
|
|
+
|
|
|
+ using namespace boost::asio::ip;
|
|
|
+ using boost::asio::awaitable;
|
|
|
+ using boost::asio::co_spawn;
|
|
|
+ using boost::asio::detached;
|
|
|
+ using boost::asio::redirect_error;
|
|
|
+ using boost::asio::use_awaitable;
|
|
|
+ using boost::system::error_code;
|
|
|
+
|
|
|
+ struct external_variable_io::impl {
|
|
|
+
|
|
|
+ static constexpr uint16_t DEFAULT_LISTEN_PORT = 5278;
|
|
|
+
|
|
|
+ using header_type = uint16_t;
|
|
|
+ using variable_size_type = uint8_t;
|
|
|
+ static constexpr size_t header_offset = sizeof(header_type);
|
|
|
+ static constexpr size_t variable_size_type_offset = sizeof(variable_size_type);
|
|
|
+
|
|
|
+ using variable_info_list = dynamic_vector<sophiar_pool::variable_info>;
|
|
|
+
|
|
|
+ struct binary_in_client;
|
|
|
+ struct binary_out_client;
|
|
|
+
|
|
|
+ uint16_t listen_port = DEFAULT_LISTEN_PORT;
|
|
|
+ coro_worker::pointer listen_worker;
|
|
|
+
|
|
|
+ static awaitable<void> create_client_worker(tcp::socket &&s);
|
|
|
+
|
|
|
+ void load_config(const nlohmann::json &config) {
|
|
|
+ if (config.contains("variable_io_port")) {
|
|
|
+ listen_port = LOAD_UINT_ITEM("variable_io_port");
|
|
|
+ } else {
|
|
|
+ listen_port = DEFAULT_LISTEN_PORT;
|
|
|
+ // TODO show log
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ bool start() {
|
|
|
+ assert(listen_worker == nullptr);
|
|
|
+ auto listen_endpoint = tcp::endpoint(tcp::v4(), listen_port);
|
|
|
+ try {
|
|
|
+ auto listen_func = [=,
|
|
|
+ acceptor = tcp::acceptor(*global_context, listen_endpoint)
|
|
|
+ ]() mutable -> awaitable<bool> {
|
|
|
+ error_code ec;
|
|
|
+ auto client = co_await acceptor.async_accept(redirect_error(use_awaitable, ec));
|
|
|
+ if (ec) {
|
|
|
+ SPDLOG_ERROR("Error while waiting for client: {}", ec.what());
|
|
|
+ co_return false;
|
|
|
+ } else {
|
|
|
+ co_spawn(*global_context, create_client_worker(std::move(client)), detached);
|
|
|
+ }
|
|
|
+ co_return true;
|
|
|
+ };
|
|
|
+ listen_worker = make_infinite_coro_worker(std::move(listen_func));
|
|
|
+ listen_worker->run();
|
|
|
+ SPDLOG_INFO("External variable I/O is listen on {}:{}",
|
|
|
+ listen_endpoint.address().to_string(),
|
|
|
+ listen_endpoint.port());
|
|
|
+ } catch (std::exception &e) {
|
|
|
+ // TODO show log, maybe port is occupied.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ static sophiar_pool::variable_info query_variable_information(std::string_view var_name) {
|
|
|
+ return global_sophiar_pool->query_variable_information(var_name);
|
|
|
+ }
|
|
|
+
|
|
|
+ static void update_variable_timestamp(variable_index_type var_index, timestamp_type ts) {
|
|
|
+ return global_sophiar_pool->update_variable_timestamp_impl(var_index, ts);
|
|
|
+ }
|
|
|
+
|
|
|
+ template<typename WriterType>
|
|
|
+ static auto require_raw_pointer_writer(std::type_index var_type) {
|
|
|
+ return global_sophiar_pool->require_raw_pointer_writer<WriterType>(var_type);
|
|
|
+ }
|
|
|
+
|
|
|
+ template<typename ReaderType>
|
|
|
+ static auto require_raw_pointer_reader(std::type_index var_type) {
|
|
|
+ return global_sophiar_pool->require_raw_pointer_reader<ReaderType>(var_type);
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+ external_variable_io::external_variable_io()
|
|
|
+ : pimpl(std::make_unique<impl>()) {
|
|
|
+ }
|
|
|
+
|
|
|
+ external_variable_io::~external_variable_io() = default;
|
|
|
+
|
|
|
+ bool external_variable_io::load_config_and_start(const nlohmann::json &config) {
|
|
|
+ pimpl->load_config(config);
|
|
|
+ return pimpl->start();
|
|
|
+ }
|
|
|
+
|
|
|
+ struct external_variable_io::impl::binary_in_client {
|
|
|
+
|
|
|
+ using reader_type = versatile_reader<sophiar_endian>;
|
|
|
+ using reader_func_type = void (*)(reader_type &, void *);
|
|
|
+
|
|
|
+ struct variable_io_info {
|
|
|
+ void *placeholder;
|
|
|
+ reader_func_type reader_func;
|
|
|
+ };
|
|
|
+
|
|
|
+ using info_pool_type = std::unordered_map<variable_index_type, variable_io_info>;
|
|
|
+
|
|
|
+ tcp::socket s;
|
|
|
+ info_pool_type info_pool;
|
|
|
+ dynamic_memory::pointer buf;
|
|
|
+
|
|
|
+ binary_in_client(tcp::socket &&_s, const variable_info_list &infos)
|
|
|
+ : s(std::move(_s)),
|
|
|
+ buf(dynamic_memory::new_instance()) {
|
|
|
+ for (auto &info: infos) {
|
|
|
+ assert(!info_pool.contains(info.var_index));
|
|
|
+ info_pool.emplace(info.var_index,
|
|
|
+ variable_io_info{
|
|
|
+ .placeholder = info.placeholder,
|
|
|
+ .reader_func = require_raw_pointer_reader<reader_type>(info.type)
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ awaitable<bool> work_once() {
|
|
|
+ // receive message
|
|
|
+ auto length = co_await async_read_value<sophiar_endian, header_type>(s);
|
|
|
+ buf->resize(length);
|
|
|
+ co_await async_fill_memory_from(s, *buf);
|
|
|
+
|
|
|
+ // parse and update variables
|
|
|
+ auto ts = current_timestamp();
|
|
|
+ auto reader = reader_type{*buf};
|
|
|
+ while (!reader.empty()) {
|
|
|
+ auto var_index = reader.read_value<variable_index_type>();
|
|
|
+ auto val_size = reader.read_value<variable_size_type>();
|
|
|
+ auto cur_data = buf->data() + reader.current_offset();
|
|
|
+ reader.manual_offset(val_size);
|
|
|
+ auto val_buf = const_extern_memory{cur_data, val_size};
|
|
|
+ auto val_reader = reader_type{val_buf};
|
|
|
+
|
|
|
+ assert(info_pool.contains(var_index));
|
|
|
+ auto &info = info_pool[var_index];
|
|
|
+ info.reader_func(val_reader, info.placeholder);
|
|
|
+ update_variable_timestamp(var_index, ts);
|
|
|
+ }
|
|
|
+
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+ struct external_variable_io::impl::binary_out_client {
|
|
|
+
|
|
|
+ using writer_type = dynamic_memory_writer<sophiar_endian>;
|
|
|
+ using writer_func_type = void (*)(writer_type &, void *);
|
|
|
+
|
|
|
+ struct variable_io_info {
|
|
|
+ variable_index_type var_index;
|
|
|
+ void *placeholder;
|
|
|
+ writer_func_type writer_func;
|
|
|
+ timestamp_type last_update_ts;
|
|
|
+ };
|
|
|
+
|
|
|
+ using info_pool_type = dynamic_vector<variable_io_info>;
|
|
|
+
|
|
|
+ tcp::socket s;
|
|
|
+ info_pool_type info_pool;
|
|
|
+ dynamic_memory::pointer buf;
|
|
|
+
|
|
|
+ coro_signal_any_group::pointer signal_group;
|
|
|
+ signal_watcher watcher;
|
|
|
+
|
|
|
+ bool is_first_run = true;
|
|
|
+
|
|
|
+ binary_out_client(tcp::socket &&_s, const variable_info_list &infos)
|
|
|
+ : s(std::move(_s)),
|
|
|
+ buf(dynamic_memory::new_instance()),
|
|
|
+ signal_group(coro_signal_any_group::new_instance()),
|
|
|
+ watcher(signal_group->new_watcher()) {
|
|
|
+ for (auto &info: infos) {
|
|
|
+ signal_group->add_watcher(REQUIRE_VARIABLE_WATCHER(info.var_index));
|
|
|
+ info_pool.push_back(
|
|
|
+ {
|
|
|
+ .var_index = info.var_index,
|
|
|
+ .placeholder = info.placeholder,
|
|
|
+ .writer_func = require_raw_pointer_writer<writer_type>(info.type),
|
|
|
+ .last_update_ts = 0
|
|
|
+ });
|
|
|
+ }
|
|
|
+ signal_group->start();
|
|
|
+ }
|
|
|
+
|
|
|
+ awaitable<bool> work_once() {
|
|
|
+ if (!is_first_run) {
|
|
|
+ co_await watcher.coro_wait(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ auto writer = writer_type{buf.get(), header_offset};
|
|
|
+ for (auto &info: info_pool) {
|
|
|
+ auto cur_update_ts = QUERY_VARIABLE_TS(info.var_index);
|
|
|
+ if (!is_first_run
|
|
|
+ && cur_update_ts <= info.last_update_ts) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ info.last_update_ts = cur_update_ts;
|
|
|
+
|
|
|
+ // write variable value
|
|
|
+ writer << info.var_index;
|
|
|
+ auto size_offset = buf->size();
|
|
|
+ buf->increase_size(variable_size_type_offset);
|
|
|
+ info.writer_func(writer, info.placeholder);
|
|
|
+ auto length = buf->size() - size_offset - variable_size_type_offset;
|
|
|
+ assert(length <= std::numeric_limits<variable_size_type>::max());
|
|
|
+ write_binary_value<sophiar_endian>(buf->data() + size_offset,
|
|
|
+ (variable_size_type) length);
|
|
|
+ }
|
|
|
+
|
|
|
+ // fill message size
|
|
|
+ auto length = buf->size() - header_offset;
|
|
|
+ assert(length <= std::numeric_limits<header_type>::max());
|
|
|
+ write_binary_value<sophiar_endian>(buf->data(), (header_type) length);
|
|
|
+
|
|
|
+ is_first_run = false;
|
|
|
+ watcher.sync(); // manual sync to prevent duplicated notification
|
|
|
+
|
|
|
+ co_await async_write_memory_to(s, *buf);
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+ awaitable<void> external_variable_io::impl::create_client_worker(tcp::socket &&_s) {
|
|
|
+ // receive command
|
|
|
+ tcp::socket s = std::move(_s); // 避免这玩意被莫名其妙的析构掉
|
|
|
+ auto length = co_await async_read_value<sophiar_endian, header_type>(s);
|
|
|
+ auto buf_in = dynamic_memory::new_instance(length);
|
|
|
+ co_await async_fill_memory_from(s, *buf_in);
|
|
|
+
|
|
|
+ // load command and params
|
|
|
+ auto reader = versatile_reader<sophiar_endian>(*buf_in);
|
|
|
+ auto params = dynamic_vector<std::string_view>{};
|
|
|
+ auto cmd = reader.read_string_until(' ');
|
|
|
+ params.clear();
|
|
|
+ while (!reader.empty()) {
|
|
|
+ params.push_back(reader.read_string_until(','));
|
|
|
+ }
|
|
|
+
|
|
|
+ // send initial reply
|
|
|
+ auto writer = string_writer{","};
|
|
|
+ variable_info_list infos;
|
|
|
+ for (auto var_name: params) {
|
|
|
+ auto info = query_variable_information(var_name);
|
|
|
+ writer << info.var_index;
|
|
|
+ infos.push_back(info);
|
|
|
+ }
|
|
|
+ auto buf_str = writer.get_string_and_reset();
|
|
|
+ auto buf_out = dynamic_memory::new_instance(buf_str.length() + header_offset);
|
|
|
+ auto buf_writer = versatile_writer<sophiar_endian>(*buf_out);
|
|
|
+ assert(buf_str.length() <= std::numeric_limits<header_type>::max());
|
|
|
+ buf_writer << (header_type) buf_str.length() << buf_str;
|
|
|
+ assert(buf_writer.remaining_bytes() == 0);
|
|
|
+ co_await async_write_memory_to(s, *buf_out);
|
|
|
+
|
|
|
+ // create worker
|
|
|
+ auto re = s.remote_endpoint();
|
|
|
+ auto error_handler = [=](std::exception &e) {
|
|
|
+ SPDLOG_WARN("Client {}:{} left: {}",
|
|
|
+ re.address().to_string(), re.port(), e.what());
|
|
|
+ };
|
|
|
+
|
|
|
+ auto worker_ptr_ptr = new coro_worker *;
|
|
|
+ coro_worker::pointer worker;
|
|
|
+ auto exit_func = [=]() {
|
|
|
+ assert(worker_ptr_ptr != nullptr);
|
|
|
+ boost::asio::defer(*global_context, [=]() {
|
|
|
+ delete *worker_ptr_ptr;
|
|
|
+ delete worker_ptr_ptr;
|
|
|
+ });
|
|
|
+ };
|
|
|
+
|
|
|
+ if (cmd == "VARINB") {
|
|
|
+ auto worker_func = [
|
|
|
+ client = binary_in_client(std::move(s), infos)]() mutable
|
|
|
+ -> awaitable<bool> {
|
|
|
+ return client.work_once();
|
|
|
+ };
|
|
|
+ auto noexcept_worker_func = make_noexcept_func(
|
|
|
+ std::move(worker_func), std::move(error_handler));
|
|
|
+ worker = make_infinite_coro_worker(std::move(noexcept_worker_func),
|
|
|
+ std::move(exit_func));
|
|
|
+ } else if (cmd == "VAROUTB") {
|
|
|
+ auto client = binary_out_client(std::move(s), infos);
|
|
|
+ auto exit_func_ext = [
|
|
|
+ old_exit_func = std::move(exit_func),
|
|
|
+ signal_group = std::move(client.signal_group)]() mutable {
|
|
|
+ co_spawn(*global_context, [ // binary_out_client 的非平凡的析构函数会导致编译错误
|
|
|
+ signal_group = std::move(signal_group)]() -> awaitable<void> {
|
|
|
+ co_await signal_group->stop();
|
|
|
+ co_return;
|
|
|
+ }, detached);
|
|
|
+ old_exit_func();
|
|
|
+ };
|
|
|
+ auto worker_func = [
|
|
|
+ client = std::move(client)]() mutable
|
|
|
+ -> awaitable<bool> {
|
|
|
+ return client.work_once();
|
|
|
+ };
|
|
|
+ auto noexcept_worker_func = make_noexcept_func(
|
|
|
+ std::move(worker_func), std::move(error_handler));
|
|
|
+ worker = make_infinite_coro_worker(std::move(noexcept_worker_func),
|
|
|
+ std::move(exit_func_ext));
|
|
|
+ } else {
|
|
|
+ assert(false);
|
|
|
+ co_return;
|
|
|
+ }
|
|
|
+
|
|
|
+ worker->run();
|
|
|
+ SPDLOG_INFO("Working with client {}:{}",
|
|
|
+ re.address().to_string(), re.port());
|
|
|
+ *worker_ptr_ptr = worker.release();
|
|
|
+
|
|
|
+ co_return;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|