#include "sophiar_pool.h" #include "core/basic_obj_types.hpp" #include "core/global_defs.h" #include "utility/config_utility.hpp" #include "utility/named_vector.hpp" #include "utility/string_map.hpp" #include "utility/versatile_buffer2.hpp" #include #include #include #include #include using boost::asio::co_spawn; using boost::asio::post; namespace sophiar { struct sophiar_pool::impl { struct variable_info_impl; struct callback_info; using callback_queue_type = std::list; using callback_queue_iter_type = callback_queue_type::iterator; struct attach_info { variable_info_impl *var_info = nullptr; callback_info *callback = nullptr; void *callback_list_iter = nullptr; // callback_list_iter_type void *attach_iter = nullptr; // attach_iter_type }; using attach_list_type = std::list; using attach_iter_type = attach_list_type::iterator; static_assert(sizeof(attach_iter_type) == sizeof(void *)); static_assert(sizeof(attach_iter_type) == sizeof(attach_token_type)); struct callback_info { enum status_type { IDLE, PENDING, RUNNING } status = IDLE; using callback_store_type = std::variant; callback_store_type func_store; function_callback_type exit_func; attach_list_type attach_list; callback_queue_iter_type queue_iter; bool exit_flag = false; void *pool_iter = nullptr; // callback_iter_type }; using callback_pool_type = std::list; using callback_iter_type = callback_pool_type::iterator; static_assert(sizeof(callback_iter_type) == sizeof(callback_token_type)); static_assert(sizeof(callback_iter_type) == sizeof(void *)); using callback_list_type = std::list; using callback_list_iter_type = callback_list_type::iterator; static_assert(sizeof(callback_list_iter_type) == sizeof(void *)); struct variable_type_info { std::type_index type = typeid(void); variable_type_index_type type_index = -1; std::string type_name; // function list using creator_func_type = void *(*)(const nlohmann::json &); creator_func_type creator_func = nullptr; }; struct variable_info_impl { void *placeholder = nullptr; coro_signal2 *update_signal = nullptr; const variable_type_info *type_info = nullptr; timestamp_type last_update_ts = 0; callback_list_type callback_list; }; string_map variable_type_name_index_map; // typename -> index std::unordered_map variable_type_index_index_map; // type_index -> index std::vector variable_type_info_pool; named_vector variable_pool; callback_pool_type callback_pool; callback_queue_type callback_queue; bool is_callback_handler_active = false; template static void *create_variable_pointer(const nlohmann::json &config) { auto placeholder = new SmallObjType::pointer{}; if (!config.contains("value")) { return (void *) placeholder; } // load default value *placeholder = SmallObjType::new_instance(); (*placeholder)->fill_from_json_array(config["value"]); return (void *) placeholder; } template void register_variable_type(std::string_view type_name) { static_assert(SmallObjType::binary_length() <= std::numeric_limits::max()); auto var_type_index = variable_type_info_pool.size(); variable_type_info_pool.emplace_back(); assert(!variable_type_name_index_map.contains(type_name)); variable_type_name_index_map.insert(type_name, var_type_index); const std::type_index type = typeid(SmallObjType); assert(!variable_type_index_index_map.contains(type)); variable_type_index_index_map[type] = var_type_index; auto &type_info = variable_type_info_pool[var_type_index]; type_info.type = type; type_info.type_index = var_type_index; type_info.type_name = type_name; type_info.creator_func = create_variable_pointer; } void register_basic_variable_types() { #define REGISTER_TYPE(var_type) \ register_variable_type(#var_type) REGISTER_TYPE(bool_obj); REGISTER_TYPE(u64int_obj); REGISTER_TYPE(double_obj); REGISTER_TYPE(scalarxyz_obj); REGISTER_TYPE(transform_obj); REGISTER_TYPE(array6_obj); REGISTER_TYPE(array7_obj); #undef REGISTER_TYPE // check registered variable type index assert(variable_type_index_index_map.at(typeid(bool_obj)) == bool_var_type_index); assert(variable_type_index_index_map.at(typeid(u64int_obj)) == u64int_var_type_index); assert(variable_type_index_index_map.at(typeid(double_obj)) == double_var_type_index); assert(variable_type_index_index_map.at(typeid(scalarxyz_obj)) == scalarxyz_var_type_index); assert(variable_type_index_index_map.at(typeid(transform_obj)) == transform_var_type_index); assert(variable_type_index_index_map.at(typeid(array6_obj)) == array6_var_type_index); assert(variable_type_index_index_map.at(typeid(array7_obj)) == array7_var_type_index); } void register_variable(std::string_view name, std::string_view type_name, const nlohmann::json &config = {}) { auto index = variable_pool.new_elem(name); auto &info = variable_pool[index]; assert(variable_type_name_index_map.contains(type_name)); auto var_type_index = variable_type_name_index_map.query(type_name); const auto &type_info = variable_type_info_pool[var_type_index]; info.placeholder = type_info.creator_func(config); info.update_signal = new coro_signal2{}; info.type_info = &type_info; info.last_update_ts = 0; } callback_token_type register_callback(callback_info::callback_store_type &&callback, function_callback_type &&exit_func) { auto &item = callback_pool.emplace_front(); item.func_store = std::move(callback); item.exit_func = std::move(exit_func); item.pool_iter = std::bit_cast(callback_pool.begin()); return std::bit_cast(item.pool_iter); } void unregister_callback(callback_iter_type callback) { while (!callback->attach_list.empty()) { detach_callback(callback->attach_list.begin()); } if (callback->status == callback_info::RUNNING) { callback->exit_flag = true; return; } if (callback->status == callback_info::PENDING) { callback_queue.erase(callback->queue_iter); } callback->exit_func(); callback_pool.erase(callback); } attach_token_type attach_callback(variable_index_type var_index, callback_iter_type callback) { auto &var_info = variable_pool[var_index]; var_info.callback_list.push_front(&(*callback)); auto &attach_item = callback->attach_list.emplace_front(); attach_item.var_info = &var_info; attach_item.callback = &(*callback); attach_item.callback_list_iter = std::bit_cast(var_info.callback_list.begin()); attach_item.attach_iter = std::bit_cast(callback->attach_list.begin()); return std::bit_cast(callback->attach_list.begin()); } static void detach_callback(attach_iter_type iter) { auto callback_iter = std::bit_cast(iter->callback_list_iter); auto attach_iter = std::bit_cast(iter->attach_iter); iter->var_info->callback_list.erase(callback_iter); iter->callback->attach_list.erase(attach_iter); } void handle_callback_func() { while (!callback_queue.empty()) { auto callback = callback_queue.back(); callback_queue.pop_back(); callback->status = callback_info::RUNNING; callback->queue_iter = {}; if (callback->func_store.index() == 0) { std::get<0>(callback->func_store)(); callback->status = callback_info::IDLE; } else { assert(callback->func_store.index() == 1); co_spawn(*global_context, std::get<1>(callback->func_store)(), [=, this](std::exception_ptr eptr) { assert(eptr == nullptr); callback->status = callback_info::IDLE; if (callback->exit_flag) { callback->exit_func(); auto real_iter = std::bit_cast(callback->pool_iter); callback_pool.erase(real_iter); } }); } } is_callback_handler_active = false; } void trigger_callback(variable_info_impl *var_info) { if (var_info->callback_list.empty()) return; bool has_callback = false; for (auto &callback: var_info->callback_list) { if (callback->status != callback_info::IDLE) continue; callback_queue.push_front(callback); callback->status = callback_info::PENDING; callback->queue_iter = callback_queue.begin(); has_callback = true; } if (!has_callback || is_callback_handler_active) return; post(*global_context, [this]() { handle_callback_func(); }); is_callback_handler_active = true; } void load_config(const nlohmann::json &config) { #ifdef SOPHIAR_TEST if (config.empty()) return; #endif // SOPHIAR_TEST ENSURE_ARRAY("variable_list"); for (auto &var_config: config["variable_list"]) { auto var_name = LOAD_STRING_ITEM2(var_config, "name"); auto type_name = LOAD_STRING_ITEM2(var_config, "type"); register_variable(var_name, type_name, var_config); } } impl() { register_basic_variable_types(); } }; sophiar_pool::sophiar_pool() : pimpl(std::make_unique()) {} signal_watcher sophiar_pool::require_variable_watcher(variable_index_type var_index) { assert(pimpl->variable_pool.contains(var_index)); return pimpl->variable_pool[var_index].update_signal->new_watcher(); } std::string sophiar_pool::query_variable_name(variable_index_type var_index) { assert(pimpl->variable_pool.contains(var_index)); return pimpl->variable_pool.to_name_by_index(var_index); } timestamp_type sophiar_pool::query_variable_update_ts(variable_index_type var_index) { assert(pimpl->variable_pool.contains(var_index)); return pimpl->variable_pool[var_index].last_update_ts; } sophiar_pool::variable_info sophiar_pool::query_variable_information(std::string_view var_name) { assert(pimpl->variable_pool.contains(var_name)); const auto &info = pimpl->variable_pool[var_name]; variable_info ret{ .type_index = info.type_info->type_index, .index = pimpl->variable_pool.to_index_by_name(var_name), .placeholder = info.placeholder }; return ret; } variable_index_type sophiar_pool::require_variable_impl(std::string_view var_name, std::type_index var_type) { assert(pimpl->variable_pool.contains(var_name)); const auto &info = pimpl->variable_pool[var_name]; assert(info.type_info->type == var_type); return pimpl->variable_pool.to_index_by_name(var_name); } void *sophiar_pool::require_variable_placeholder_impl(variable_index_type var_index, std::type_index var_type, timestamp_type *ts_out) { assert(pimpl->variable_pool.contains(var_index)); const auto &info = pimpl->variable_pool[var_index]; assert(info.type_info->type == var_type); // ensure type consistency if (ts_out != nullptr) { *ts_out = info.last_update_ts; } return info.placeholder; } void sophiar_pool::update_variable_timestamp_impl(variable_index_type var_index, timestamp_type ts) { assert(pimpl->variable_pool.contains(var_index)); auto &info = pimpl->variable_pool[var_index]; assert(ts > info.last_update_ts); info.last_update_ts = ts; info.update_signal->try_notify_all(ts); pimpl->trigger_callback(&info); } void sophiar_pool::load_config(const nlohmann::json &config) { pimpl->load_config(config); } sophiar_pool::callback_token_type sophiar_pool::register_callback(function_callback_type &&callback, function_callback_type &&exit_func) { return pimpl->register_callback(std::move(callback), std::move(exit_func)); } sophiar_pool::callback_token_type sophiar_pool::register_coro_callback(coroutine_callback_type &&callback, function_callback_type &&exit_func) { return pimpl->register_callback(std::move(callback), std::move(exit_func)); } void sophiar_pool::unregister_callback(callback_token_type token) { if (token == nullptr) return; return pimpl->unregister_callback(std::bit_cast(token)); } sophiar_pool::attach_token_type sophiar_pool::attach_callback(variable_index_type var_index, callback_token_type token) { assert(token != nullptr); return pimpl->attach_callback(var_index, std::bit_cast(token)); } void sophiar_pool::detach_callback(attach_token_type token) { if (token == nullptr) return; return pimpl->detach_callback(std::bit_cast(token)); } sophiar_pool::~sophiar_pool() = default; #ifdef SOPHIAR_TEST void sophiar_pool::register_variable(std::string_view var_name, std::string_view type_name) { pimpl->register_variable(var_name, type_name); } #endif }