| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- #include "sophiar_pool.h"
- #include "core/basic_obj_types.hpp"
- #include "core/global_defs.h"
- #include "utility/config_utility.hpp"
- #include "utility/named_vector.hpp"
- #include "utility/string_map.hpp"
- #include "utility/versatile_buffer2.hpp"
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/post.hpp>
- #include <list>
- #include <vector>
- #include <unordered_map>
- using boost::asio::co_spawn;
- using boost::asio::post;
- namespace sophiar {
- struct sophiar_pool::impl {
- struct variable_info_impl;
- struct callback_info;
- using callback_queue_type = std::list<callback_info *>;
- using callback_queue_iter_type = callback_queue_type::iterator;
- struct attach_info {
- variable_info_impl *var_info = nullptr;
- callback_info *callback = nullptr;
- void *callback_list_iter = nullptr; // callback_list_iter_type
- void *attach_iter = nullptr; // attach_iter_type
- };
- using attach_list_type = std::list<attach_info>;
- using attach_iter_type = attach_list_type::iterator;
- static_assert(sizeof(attach_iter_type) == sizeof(void *));
- static_assert(sizeof(attach_iter_type) == sizeof(attach_token_type));
- struct callback_info {
- enum status_type {
- IDLE, PENDING, RUNNING
- } status = IDLE;
- using callback_store_type = std::variant<function_callback_type, coroutine_callback_type>;
- callback_store_type func_store;
- function_callback_type exit_func;
- attach_list_type attach_list;
- callback_queue_iter_type queue_iter;
- bool exit_flag = false;
- void *pool_iter = nullptr; // callback_iter_type
- };
- using callback_pool_type = std::list<callback_info>;
- using callback_iter_type = callback_pool_type::iterator;
- static_assert(sizeof(callback_iter_type) == sizeof(callback_token_type));
- static_assert(sizeof(callback_iter_type) == sizeof(void *));
- using callback_list_type = std::list<callback_info *>;
- using callback_list_iter_type = callback_list_type::iterator;
- static_assert(sizeof(callback_list_iter_type) == sizeof(void *));
- struct variable_type_info {
- std::type_index type = typeid(void);
- variable_type_index_type type_index = -1;
- std::string type_name;
- // function list
- using creator_func_type = void *(*)(const nlohmann::json &);
- creator_func_type creator_func = nullptr;
- };
- struct variable_info_impl {
- void *placeholder = nullptr;
- coro_signal2 *update_signal = nullptr;
- const variable_type_info *type_info = nullptr;
- timestamp_type last_update_ts = 0;
- callback_list_type callback_list;
- };
- string_map<variable_type_index_type> variable_type_name_index_map; // typename -> index
- std::unordered_map<std::type_index, variable_type_index_type> variable_type_index_index_map; // type_index -> index
- std::vector<variable_type_info> variable_type_info_pool;
- named_vector<variable_index_type, variable_info_impl> variable_pool;
- callback_pool_type callback_pool;
- callback_queue_type callback_queue;
- bool is_callback_handler_active = false;
- template<typename SmallObjType>
- static void *create_variable_pointer(const nlohmann::json &config) {
- auto placeholder = new SmallObjType::pointer{};
- if (!config.contains("value")) {
- return (void *) placeholder;
- }
- // load default value
- *placeholder = SmallObjType::new_instance();
- (*placeholder)->fill_from_json_array(config["value"]);
- return (void *) placeholder;
- }
- template<typename SmallObjType>
- void register_variable_type(std::string_view type_name) {
- static_assert(SmallObjType::binary_length() <= std::numeric_limits<uint8_t>::max());
- auto var_type_index = variable_type_info_pool.size();
- variable_type_info_pool.emplace_back();
- assert(!variable_type_name_index_map.contains(type_name));
- variable_type_name_index_map.insert(type_name, var_type_index);
- const std::type_index type = typeid(SmallObjType);
- assert(!variable_type_index_index_map.contains(type));
- variable_type_index_index_map[type] = var_type_index;
- auto &type_info = variable_type_info_pool[var_type_index];
- type_info.type = type;
- type_info.type_index = var_type_index;
- type_info.type_name = type_name;
- type_info.creator_func = create_variable_pointer<SmallObjType>;
- }
- void register_basic_variable_types() {
- #define REGISTER_TYPE(var_type) \
- register_variable_type<var_type>(#var_type)
- REGISTER_TYPE(bool_obj);
- REGISTER_TYPE(u64int_obj);
- REGISTER_TYPE(double_obj);
- REGISTER_TYPE(scalarxyz_obj);
- REGISTER_TYPE(transform_obj);
- REGISTER_TYPE(array6_obj);
- REGISTER_TYPE(array7_obj);
- #undef REGISTER_TYPE
- // check registered variable type index
- assert(variable_type_index_index_map.at(typeid(bool_obj)) == bool_var_type_index);
- assert(variable_type_index_index_map.at(typeid(u64int_obj)) == u64int_var_type_index);
- assert(variable_type_index_index_map.at(typeid(double_obj)) == double_var_type_index);
- assert(variable_type_index_index_map.at(typeid(scalarxyz_obj)) == scalarxyz_var_type_index);
- assert(variable_type_index_index_map.at(typeid(transform_obj)) == transform_var_type_index);
- assert(variable_type_index_index_map.at(typeid(array6_obj)) == array6_var_type_index);
- assert(variable_type_index_index_map.at(typeid(array7_obj)) == array7_var_type_index);
- }
- void register_variable(std::string_view name,
- std::string_view type_name,
- const nlohmann::json &config = {}) {
- auto index = variable_pool.new_elem(name);
- auto &info = variable_pool[index];
- assert(variable_type_name_index_map.contains(type_name));
- auto var_type_index = variable_type_name_index_map.query(type_name);
- const auto &type_info = variable_type_info_pool[var_type_index];
- info.placeholder = type_info.creator_func(config);
- info.update_signal = new coro_signal2{};
- info.type_info = &type_info;
- info.last_update_ts = 0;
- }
- callback_token_type register_callback(callback_info::callback_store_type &&callback,
- function_callback_type &&exit_func) {
- auto &item = callback_pool.emplace_front();
- item.func_store = std::move(callback);
- item.exit_func = std::move(exit_func);
- item.pool_iter = std::bit_cast<void *>(callback_pool.begin());
- return std::bit_cast<callback_token_type>(item.pool_iter);
- }
- void unregister_callback(callback_iter_type callback) {
- while (!callback->attach_list.empty()) {
- detach_callback(callback->attach_list.begin());
- }
- if (callback->status == callback_info::RUNNING) {
- callback->exit_flag = true;
- return;
- }
- if (callback->status == callback_info::PENDING) {
- callback_queue.erase(callback->queue_iter);
- }
- callback->exit_func();
- callback_pool.erase(callback);
- }
- attach_token_type attach_callback(variable_index_type var_index, callback_iter_type callback) {
- auto &var_info = variable_pool[var_index];
- var_info.callback_list.push_front(&(*callback));
- auto &attach_item = callback->attach_list.emplace_front();
- attach_item.var_info = &var_info;
- attach_item.callback = &(*callback);
- attach_item.callback_list_iter = std::bit_cast<void *>(var_info.callback_list.begin());
- attach_item.attach_iter = std::bit_cast<void *>(callback->attach_list.begin());
- return std::bit_cast<attach_token_type>(callback->attach_list.begin());
- }
- static void detach_callback(attach_iter_type iter) {
- auto callback_iter = std::bit_cast<callback_list_iter_type>(iter->callback_list_iter);
- auto attach_iter = std::bit_cast<attach_iter_type>(iter->attach_iter);
- iter->var_info->callback_list.erase(callback_iter);
- iter->callback->attach_list.erase(attach_iter);
- }
- void handle_callback_func() {
- while (!callback_queue.empty()) {
- auto callback = callback_queue.back();
- callback_queue.pop_back();
- callback->status = callback_info::RUNNING;
- callback->queue_iter = {};
- if (callback->func_store.index() == 0) {
- std::get<0>(callback->func_store)();
- callback->status = callback_info::IDLE;
- } else {
- assert(callback->func_store.index() == 1);
- co_spawn(*global_context, std::get<1>(callback->func_store)(), [=, this](std::exception_ptr eptr) {
- assert(eptr == nullptr);
- callback->status = callback_info::IDLE;
- if (callback->exit_flag) {
- callback->exit_func();
- auto real_iter = std::bit_cast<callback_iter_type>(callback->pool_iter);
- callback_pool.erase(real_iter);
- }
- });
- }
- }
- is_callback_handler_active = false;
- }
- void trigger_callback(variable_info_impl *var_info) {
- if (var_info->callback_list.empty()) return;
- bool has_callback = false;
- for (auto &callback: var_info->callback_list) {
- if (callback->status != callback_info::IDLE) continue;
- callback_queue.push_front(callback);
- callback->status = callback_info::PENDING;
- callback->queue_iter = callback_queue.begin();
- has_callback = true;
- }
- if (!has_callback || is_callback_handler_active) return;
- post(*global_context, [this]() { handle_callback_func(); });
- is_callback_handler_active = true;
- }
- void load_config(const nlohmann::json &config) {
- #ifdef SOPHIAR_TEST
- if (config.empty()) return;
- #endif // SOPHIAR_TEST
- ENSURE_ARRAY("variable_list");
- for (auto &var_config: config["variable_list"]) {
- auto var_name = LOAD_STRING_ITEM2(var_config, "name");
- auto type_name = LOAD_STRING_ITEM2(var_config, "type");
- register_variable(var_name, type_name, var_config);
- }
- }
- impl() {
- register_basic_variable_types();
- }
- };
- sophiar_pool::sophiar_pool()
- : pimpl(std::make_unique<impl>()) {}
- signal_watcher sophiar_pool::require_variable_watcher(variable_index_type var_index) {
- assert(pimpl->variable_pool.contains(var_index));
- return pimpl->variable_pool[var_index].update_signal->new_watcher();
- }
- std::string sophiar_pool::query_variable_name(variable_index_type var_index) {
- assert(pimpl->variable_pool.contains(var_index));
- return pimpl->variable_pool.to_name_by_index(var_index);
- }
- timestamp_type sophiar_pool::query_variable_update_ts(variable_index_type var_index) {
- assert(pimpl->variable_pool.contains(var_index));
- return pimpl->variable_pool[var_index].last_update_ts;
- }
- sophiar_pool::variable_info sophiar_pool::query_variable_information(std::string_view var_name) {
- assert(pimpl->variable_pool.contains(var_name));
- const auto &info = pimpl->variable_pool[var_name];
- variable_info ret{
- .type_index = info.type_info->type_index,
- .index = pimpl->variable_pool.to_index_by_name(var_name),
- .placeholder = info.placeholder
- };
- return ret;
- }
- variable_index_type sophiar_pool::require_variable_impl(std::string_view var_name,
- std::type_index var_type) {
- assert(pimpl->variable_pool.contains(var_name));
- const auto &info = pimpl->variable_pool[var_name];
- assert(info.type_info->type == var_type);
- return pimpl->variable_pool.to_index_by_name(var_name);
- }
- void *sophiar_pool::require_variable_placeholder_impl(variable_index_type var_index,
- std::type_index var_type,
- timestamp_type *ts_out) {
- assert(pimpl->variable_pool.contains(var_index));
- const auto &info = pimpl->variable_pool[var_index];
- assert(info.type_info->type == var_type); // ensure type consistency
- if (ts_out != nullptr) {
- *ts_out = info.last_update_ts;
- }
- return info.placeholder;
- }
- void sophiar_pool::update_variable_timestamp_impl(variable_index_type var_index,
- timestamp_type ts) {
- assert(pimpl->variable_pool.contains(var_index));
- auto &info = pimpl->variable_pool[var_index];
- assert(ts > info.last_update_ts);
- info.last_update_ts = ts;
- info.update_signal->try_notify_all(ts);
- pimpl->trigger_callback(&info);
- }
- void sophiar_pool::load_config(const nlohmann::json &config) {
- pimpl->load_config(config);
- }
- sophiar_pool::callback_token_type
- sophiar_pool::register_callback(function_callback_type &&callback,
- function_callback_type &&exit_func) {
- return pimpl->register_callback(std::move(callback), std::move(exit_func));
- }
- sophiar_pool::callback_token_type
- sophiar_pool::register_coro_callback(coroutine_callback_type &&callback,
- function_callback_type &&exit_func) {
- return pimpl->register_callback(std::move(callback), std::move(exit_func));
- }
- void sophiar_pool::unregister_callback(callback_token_type token) {
- if (token == nullptr) return;
- return pimpl->unregister_callback(std::bit_cast<impl::callback_iter_type>(token));
- }
- sophiar_pool::attach_token_type sophiar_pool::attach_callback(variable_index_type var_index,
- callback_token_type token) {
- assert(token != nullptr);
- return pimpl->attach_callback(var_index, std::bit_cast<impl::callback_iter_type>(token));
- }
- void sophiar_pool::detach_callback(attach_token_type token) {
- if (token == nullptr) return;
- return pimpl->detach_callback(std::bit_cast<impl::attach_iter_type>(token));
- }
- sophiar_pool::~sophiar_pool() = default;
#ifdef SOPHIAR_TEST

/// Test-only entry point: registers a variable of the given type without any
/// extra configuration (the implementation's default config is used).
void sophiar_pool::register_variable(std::string_view var_name,
                                     std::string_view type_name) {
    auto &self = *pimpl;
    self.register_variable(var_name, type_name);
}

#endif
- }
|