#include "sophiar_manager.h"

#include "core/sophiar_obj.hpp"
#include "core/tristate_obj.h"
#include "utility/debug_utility.hpp"
#include "utility/named_vector.hpp"

// NOTE(review): the names of the 14 system/third-party headers were lost in the
// copy of this file that was reviewed; the list below is reconstructed from the
// names this translation unit uses -- confirm against the original file.
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/iterator/counting_iterator.hpp>

#include <fmt/format.h>

#include <spdlog/spdlog.h>

#include <cassert>
#include <list>
#include <stack>
#include <string>
#include <typeindex>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace sophiar {

    using boost::asio::awaitable;
    using boost::asio::co_spawn;
    using boost::asio::detached;

    /// Private implementation of sophiar_manager: owns the object / signal / slot /
    /// connection / mode tables and performs mode switching.
    ///
    /// NOTE(review): template arguments in this struct were reconstructed from usage.
    /// In particular `named_vector<IndexType, ElemType>` assumes that parameter order --
    /// confirm against utility/named_vector.hpp.
    struct sophiar_manager::impl {

        enum class manager_states {
            INITIAL,        // graph not built yet
            NORMAL,         // graph built, no mode switch in progress
            SWITCHING_MODE, // a mode switch (or degrade) is currently running
        };

        manager_states current_states = manager_states::INITIAL;

        using config_index_type = uint16_t;
        config_index_type next_config_index = 0;

        /// A JSON config shared between modes, with a manual reference count.
        struct config_info {
            nlohmann::json config;
            int16_t ref_count = 0;
        };

        using config_pool_type = std::unordered_map<config_index_type, config_info>;
        config_pool_type config_pool;

        using mode_index_type = uint8_t;
        /// Maps a mode to the config that must be used in that mode.
        using mode_config_pool_type = std::unordered_map<mode_index_type, config_index_type>;

        using obj_type_index_type = uint8_t;
        using obj_factory_func_pool_type = named_vector<obj_type_index_type, obj_factory_func_type>;

        struct obj_info {
            // NOTE(review): created by the factory in create_object() and never freed in this file.
            sophiar_obj *ptr = nullptr;
            config_index_type last_init_config_index = 0;  // config used by the last successful init()
            config_index_type last_start_config_index = 0; // config used by the last successful start()
            mode_config_pool_type init_config_pool;
            mode_config_pool_type start_config_pool;
        };

        using obj_index_type = uint16_t;
        using obj_pool_type = named_vector<obj_index_type, obj_info>;
        using obj_set_type = std::unordered_set<obj_index_type>;
        using obj_ptr_index_map_type = std::unordered_map<sophiar_obj *, obj_index_type>;

        using slot_list_type = std::list<tiny_slot_base *>;
        using slot_iter_type = slot_list_type::iterator;

        struct slot_info {
            slot_demuxer_base *demuxer = nullptr;
            tiny_slot_base *direct_slot = nullptr;        // the real slot
            std::stack<tiny_slot_base *> free_slot_pool;  // slots created by the demuxer, currently unused
            slot_list_type used_slot_pool;                // slots created by the demuxer, currently in use
        };

        using signal_index_type = uint16_t;
        using slot_index_type = uint16_t;
        using signal_pool_type = named_vector<signal_index_type, tiny_signal_base *>;
        using slot_pool_type = named_vector<slot_index_type, slot_info>;

        struct connection_info {
            bool is_connected = false;
            bool is_direct_connected = false; // if false, slot_iter is valid
            signal_index_type signal_index = 0;
            slot_index_type slot_index = 0;
            slot_iter_type slot_iter{};
        };

        using connection_index_type = uint32_t;
        using connection_pool_type = named_vector<connection_index_type, connection_info>;
        using connection_set_type = std::unordered_set<connection_index_type>;

        struct global_obj_info {
            void *placeholder = nullptr;
            // NOTE(review): allocated with `new` in register_global_obj_impl() and never freed in this file.
            coro_signal2 *update_signal = nullptr;
            timestamp_type last_update_ts{};
            std::type_index obj_type = typeid(void);
        };

        using global_obj_pool_type = named_vector<global_obj_index_type, global_obj_info>;

        struct mode_info {
            obj_set_type obj_set;                      // objects that must be running in this mode
            connection_set_type connection_set;        // connections that must be established in this mode
            std::vector<mode_index_type> degrade_list; // modes to try, in order, when degrading
        };

        using mode_pool_type = named_vector<mode_index_type, mode_info>;

        sophiar_manager *q_this = nullptr;

        obj_factory_func_pool_type obj_factory_func_pool;
        obj_pool_type obj_pool;
        obj_ptr_index_map_type obj_ptr_index_map;

        signal_pool_type signal_pool;
        slot_pool_type slot_pool;
        connection_pool_type connection_pool;

        global_obj_pool_type global_obj_pool;

        mode_index_type current_mode = 0; // all_down
        mode_pool_type mode_pool;

        /// Reserves config index 0 for the default empty config (kept alive by ref_count = 1).
        impl() {
            config_pool[next_config_index++] = config_info{.config={}, .ref_count=1}; // default empty config
        }

        /// Returns the registered name of @p obj; the object must be known to the manager.
        std::string get_obj_name_by_ptr(sophiar_obj *obj) const {
            assert(obj_ptr_index_map.contains(obj));
            auto obj_index = obj_ptr_index_map.at(obj);
            return obj_pool.to_name_by_index(obj_index);
        }

        /// Returns the object viewed as a tristate_obj, or nullptr if it is not one.
        auto get_tristate_ptr(obj_index_type obj_index) {
            return dynamic_cast<tristate_obj *>(obj_pool[obj_index].ptr);
        }

        static auto get_signal_uri(const std::string &signal_obj_name,
                                   const std::string &signal_name) {
            return fmt::format("signal://{}/{}", signal_obj_name, signal_name);
        }

        static auto get_slot_uri(const std::string &slot_obj_name,
                                 const std::string &slot_name) {
            return fmt::format("slot://{}/{}", slot_obj_name, slot_name);
        }

        static auto get_connection_uri(const signal_index_type signal_index,
                                       const slot_index_type slot_index) {
            return fmt::format("connection://{}/{}", signal_index, slot_index);
        }

        /// Increments the reference count of an existing config.
        void acquire_config(config_index_type config_index) {
            assert(config_pool.contains(config_index));
            ++config_pool.at(config_index).ref_count;
        }

        /// Decrements the reference count of a config and erases it when the count reaches zero.
        void release_config(config_index_type config_index) {
            assert(config_pool.contains(config_index));
            auto ref_count_after = --config_pool.at(config_index).ref_count;
            assert(ref_count_after >= 0);
            if (ref_count_after == 0) { // delete config
                config_pool.erase(config_index);
                assert(!config_pool.contains(config_index));
            }
        }

        /// Links the signal of a connection to its slot. The slot object itself is used
        /// when it is not linked yet; otherwise an extra slot is taken from (or created
        /// by) the slot's demuxer.
        void enable_connection(connection_index_type conn_index) {
            auto &conn_info = connection_pool[conn_index];
            assert(!conn_info.is_connected);
            auto signal = signal_pool[conn_info.signal_index];
            auto &slot_info = slot_pool[conn_info.slot_index];
            if (!slot_info.direct_slot->is_linked()) { // direct connection
                signal->add_slot_base(slot_info.direct_slot);
                conn_info.is_direct_connected = true;
            } else { // connect through a slot created by the demuxer
                if (slot_info.free_slot_pool.empty()) { // obtain a new slot
                    auto new_slot = slot_info.demuxer->new_slot();
                    slot_info.free_slot_pool.push(new_slot);
                }
                auto target_slot = slot_info.free_slot_pool.top();
                slot_info.free_slot_pool.pop();
                signal->add_slot_base(target_slot);
                slot_info.used_slot_pool.push_front(target_slot);
                conn_info.is_direct_connected = false;
                // remember the list position so disable_connection() can erase it
                conn_info.slot_iter = slot_info.used_slot_pool.begin();
            }
            SPDLOG_DEBUG("Connection [uri = {}] is enabled.",
                         get_connection_uri(conn_info.signal_index, conn_info.slot_index));
            conn_info.is_connected = true;
        }

        /// Unlinks a connection; a demuxer-created slot is returned to the free pool.
        void disable_connection(connection_index_type conn_index) {
            auto &conn_info = connection_pool[conn_index];
            assert(conn_info.is_connected);
            auto &slot_info = slot_pool[conn_info.slot_index];
            if (conn_info.is_direct_connected) {
                slot_info.direct_slot->disconnect();
            } else {
                auto target_slot = *(conn_info.slot_iter);
                target_slot->disconnect();
                slot_info.used_slot_pool.erase(conn_info.slot_iter);
                slot_info.free_slot_pool.push(target_slot);
            }
            SPDLOG_DEBUG("Connection [uri = {}] is disabled.",
                         get_connection_uri(conn_info.signal_index, conn_info.slot_index));
            conn_info.is_connected = false;
        }

        /// Tries to switch to @p mode_index.
        /// Order: stop objects not in the mode, disable connections not in the mode,
        /// enable the mode's connections, then (re)init / (re)start the mode's objects.
        /// @return true on success. Returns false as soon as an object of the mode is
        /// not stable, or (presumably, via CO_ENSURE -- TODO confirm macro semantics)
        /// when init()/start() fails. current_mode is only updated on success.
        awaitable<bool> try_switch_mode(mode_index_type mode_index) { // try to switch mode
            auto &mode_info = mode_pool[mode_index];
            using state_type = tristate_obj::state_type;

            SPDLOG_INFO("Try switch to mode {}...", mode_pool.to_name_by_index(mode_index));

            // stop objects that are not needed
            for (obj_index_type obj_index = 0; obj_index < obj_pool.size(); ++obj_index) {
                auto tristate_ptr = get_tristate_ptr(obj_index);
                if (tristate_ptr != nullptr && // nullptr means not a tristate_obj
                    tristate_ptr->get_state() == state_type::RUNNING &&
                    !mode_info.obj_set.contains(obj_index)) {
                    co_await tristate_ptr->stop();
                    assert(tristate_ptr->get_state() != state_type::RUNNING &&
                           tristate_ptr->get_state() != state_type::STOPPING);
                }
            }

            // disable connections that are not needed
            for (connection_index_type conn_index = 0; conn_index < connection_pool.size(); ++conn_index) {
                auto &conn_info = connection_pool[conn_index];
                if (conn_info.is_connected &&
                    !mode_info.connection_set.contains(conn_index)) {
                    disable_connection(conn_index);
                    assert(conn_info.is_connected == false);
                }
            }

            // enable connections that are needed
            for (auto conn_index: mode_info.connection_set) {
                auto &conn_info = connection_pool[conn_index];
                if (!conn_info.is_connected) {
                    enable_connection(conn_index);
                    assert(conn_info.is_connected == true);
                }
            }

            // start objects that are needed
            for (auto obj_index: mode_info.obj_set) {
                auto &obj_info = obj_pool[obj_index];
                auto tristate_ptr = get_tristate_ptr(obj_index);
                if (tristate_ptr == nullptr) continue; // not tristate
                if (!tristate_ptr->is_stable()) {
                    // TODO show log, cannot switch mode because some obj is unstable.
                    co_return false;
                }

                assert(obj_info.init_config_pool.contains(mode_index));
                assert(obj_info.start_config_pool.contains(mode_index));
                auto init_config_index = obj_info.init_config_pool.at(mode_index);
                auto start_config_index = obj_info.start_config_pool.at(mode_index);

                // check if start config is updated
                if (tristate_ptr->get_state() == state_type::RUNNING) {
                    if (obj_info.last_start_config_index != start_config_index) {
                        SPDLOG_DEBUG("New start config found for object [name = {}].",
                                     obj_pool.to_name_by_index(obj_index));
                        co_await tristate_ptr->stop();
                        assert(tristate_ptr->is_stable());
                    }
                }

                // check if init config is updated
                if (tristate_ptr->get_state() != state_type::INITIAL) {
                    assert(tristate_ptr->get_state() == state_type::PENDING ||
                           tristate_ptr->get_state() == state_type::RUNNING);
                    if (obj_info.last_init_config_index != init_config_index) {
                        SPDLOG_DEBUG("New init config found for object [name = {}].",
                                     obj_pool.to_name_by_index(obj_index));
                        co_await tristate_ptr->reset();
                        assert(tristate_ptr->is_stable());
                    }
                }

                // is still running, do nothing
                if (tristate_ptr->get_state() == state_type::RUNNING) continue;

                // if not initialized, make it to be pending
                if (tristate_ptr->get_state() == state_type::INITIAL) {
                    assert(config_pool.contains(init_config_index));
                    CO_ENSURE(tristate_ptr->init(config_pool.at(init_config_index).config))
                    obj_info.last_init_config_index = init_config_index;
                }

                assert(tristate_ptr->get_state() == state_type::PENDING);
                assert(config_pool.contains(start_config_index));
                CO_ENSURE(tristate_ptr->start(config_pool.at(start_config_index).config))
                obj_info.last_start_config_index = start_config_index;
                assert(tristate_ptr->get_state() == state_type::RUNNING);
            }

            current_mode = mode_index;
            SPDLOG_INFO("Current mode switched to {}.", mode_pool.to_name_by_index(mode_index));
            co_return true;
        }

        /// Walks the degrade list of @p target_mode and stops at the first mode that can
        /// be entered. build_mode() appends mode 0 (all_down) to every list built from config.
        awaitable<void> try_degrade_mode(mode_index_type target_mode) {
            for (auto degrade_mode: mode_pool[target_mode].degrade_list) {
                auto ok = co_await try_switch_mode(degrade_mode);
                if (ok) co_return;
                SPDLOG_ERROR("Switch to degrade mode {} failed, degrading...",
                             mode_pool.to_name_by_index(degrade_mode));
            }
            assert(false); // all_down will always success
            co_return;
        }

        /// Tries to switch mode and degrades on failure.
        /// @return true only if @p target_mode itself was entered.
        awaitable<bool> switch_mode_impl(mode_index_type target_mode) {
            bool ok = co_await try_switch_mode(target_mode);
            if (ok) co_return true;
            SPDLOG_ERROR("Switch to target mode {} failed, degrading...",
                         mode_pool.to_name_by_index(target_mode));
            co_await try_degrade_mode(target_mode);
            co_return false;
        }

        /// Guarded entry point: refuses (returns false) unless the manager is in the
        /// NORMAL state, and marks the manager as SWITCHING_MODE while the switch runs.
        awaitable<bool> switch_mode(mode_index_type target_mode) {
            if (current_states != manager_states::NORMAL) co_return false;
            current_states = manager_states::SWITCHING_MODE;
            auto ret = co_await switch_mode_impl(target_mode);
            current_states = manager_states::NORMAL;
            co_return ret;
        }

        /// Called when an object reports that it stopped. Outside of a mode switch this
        /// is treated as abnormal and a degrade from the current mode is spawned.
        void on_object_stopped(sophiar_obj *obj_ptr) {
            if (current_states != manager_states::NORMAL) return; // maybe triggered by mode switching
            assert(obj_ptr_index_map.contains(obj_ptr));
            auto obj_index = obj_ptr_index_map.at(obj_ptr);
            auto &mode_info = mode_pool[current_mode];
            assert(mode_info.obj_set.contains(obj_index));
            SPDLOG_ERROR("Abnormal object stop detected, degrading...");
            // Only members are used inside the coroutine, so capture `this` explicitly
            // (implicit capture of `this` through [=] is deprecated in C++20).
            co_spawn(global_context, [this]() -> awaitable<void> {
                // state is re-checked because the coroutine runs later than this call
                if (current_states != manager_states::NORMAL) co_return;
                current_states = manager_states::SWITCHING_MODE;
                co_await try_degrade_mode(current_mode);
                current_states = manager_states::NORMAL;
                co_return;
            }, detached);
        }

        /// Instantiates an object of the registered type @p type_name and records it
        /// under @p obj_name. @return the index of the new object.
        obj_index_type create_object(const std::string &type_name,
                                     const std::string &obj_name) {
            auto type_index = obj_factory_func_pool.to_index_by_name(type_name);
            auto factory_func = obj_factory_func_pool[type_index];
            auto obj_ptr = factory_func();
            auto obj_index = obj_pool.new_elem(obj_name);
            auto &obj_info = obj_pool[obj_index];
            obj_info.ptr = obj_ptr;
            assert(!obj_ptr_index_map.contains(obj_ptr));
            obj_ptr_index_map[obj_ptr] = obj_index;
            return obj_index;
        }

        /// Records @p signal_base under the URI "signal://<object name>/<signal name>".
        void register_signal(sophiar_obj *obj,
                             const std::string &signal_name,
                             tiny_signal_base *signal_base) {
            auto obj_name = get_obj_name_by_ptr(obj);
            auto signal_uri = get_signal_uri(obj_name, signal_name);
            auto signal_index = signal_pool.new_elem(signal_uri);
            signal_pool[signal_index] = signal_base;
        }

        /// Records a slot and its demuxer under the URI "slot://<object name>/<slot name>".
        void register_slot(sophiar_obj *obj,
                           const std::string &slot_name,
                           tiny_slot_base *slot_base,
                           slot_demuxer_base *demuxer_base) {
            auto obj_name = get_obj_name_by_ptr(obj);
            auto slot_uri = get_slot_uri(obj_name, slot_name);
            auto slot_index = slot_pool.new_elem(slot_uri);
            auto &slot_info = slot_pool[slot_index];
            slot_info.demuxer = demuxer_base;
            slot_info.direct_slot = slot_base;
        }

        /// Resolves a "modes" config entry to mode indices.
        /// The string "all" selects every mode except mode 0 (all_down); otherwise the
        /// entry must be an array of mode names.
        std::vector<mode_index_type> get_valid_modes(const nlohmann::json &config) const {
            if (config.is_string()) {
                assert(config.get<std::string>() == "all");
                // both iterators must have the same type for the range constructor
                return {boost::make_counting_iterator(static_cast<mode_index_type>(1)), // 0 is reserved for all_down
                        boost::make_counting_iterator(static_cast<mode_index_type>(mode_pool.size()))};
            }
            std::vector<mode_index_type> ret;
            assert(config.is_array());
            for (auto &mode_json: config) {
                assert(mode_json.is_string());
                auto mode_name = mode_json.get<std::string>();
                ret.push_back(mode_pool.to_index_by_name(mode_name));
            }
            return ret;
        }

        /// Same as get_valid_modes(), restricted to modes in which @p obj_index is enabled.
        std::vector<mode_index_type> get_valid_modes_for_obj(const nlohmann::json &config,
                                                             obj_index_type obj_index) const {
            auto candidate = get_valid_modes(config);
            std::vector<mode_index_type> ret;
            std::copy_if(candidate.begin(), candidate.end(), std::back_inserter(ret),
                         [this, obj_index](mode_index_type mode_index) {
                             return mode_pool[mode_index].obj_set.contains(obj_index);
                         });
            return ret;
        }

        /// Creates a mode from its JSON description ("name", optional "degrade_list").
        /// Modes named in "degrade_list" are looked up by name in the mode pool.
        void build_mode(const nlohmann::json &config) {
            assert(config.contains("name"));
            assert(config["name"].is_string());
            auto mode_name = config["name"].get<std::string>();
            auto mode_index = mode_pool.new_elem(mode_name);
            auto &mode_info = mode_pool[mode_index];
            if (config.contains("degrade_list")) {
                assert(config["degrade_list"].is_array());
                for (auto &degrade_mode_json: config["degrade_list"]) {
                    assert(degrade_mode_json.is_string());
                    auto degrade_mode_name = degrade_mode_json.get<std::string>();
                    auto degrade_mode_index = mode_pool.to_index_by_name(degrade_mode_name);
                    mode_info.degrade_list.push_back(degrade_mode_index);
                }
            }
            mode_info.degrade_list.push_back(0); // all_down
        }

        /// Creates an object from its JSON description and records, per enabled mode,
        /// which init / start config it must use. Modes without an explicit config fall
        /// back to config 0 (the empty config).
        void build_object(const nlohmann::json &config) {
            assert(config.contains("type"));
            assert(config.contains("name"));
            assert(config["type"].is_string());
            assert(config["name"].is_string());
            auto type_name = config["type"].get<std::string>();
            auto obj_name = config["name"].get<std::string>();
            auto obj_index = create_object(type_name, obj_name);
            auto &obj_info = obj_pool[obj_index];
            auto obj_ptr = obj_info.ptr;

            assert(config.contains("enabled_modes"));
            auto enabled_modes = get_valid_modes(config["enabled_modes"]);
            for (auto mode_index: enabled_modes) {
                mode_pool[mode_index].obj_set.insert(obj_index);
            }

            assert(config.contains("construct_config"));
            obj_ptr->load_construct_config(config["construct_config"]);

            auto tristate_ptr = dynamic_cast<tristate_obj *>(obj_ptr);
            if (tristate_ptr == nullptr) return; // not tristate obj

            assert(config.contains("init_configs"));
            assert(config["init_configs"].is_array());
            for (auto &config_json: config["init_configs"]) {
                assert(config_json.contains("modes"));
                auto mode_list = get_valid_modes_for_obj(config_json["modes"], obj_index);
                if (mode_list.empty()) {
                    // TODO show log, invalid mode
                    continue;
                }
                assert(config_json.contains("config"));
                auto &init_config = config_json["config"];
                auto cur_config_index = next_config_index++;
                auto &config_info = config_pool[cur_config_index];
                config_info.config = init_config;
                for (auto mode_index: mode_list) {
                    obj_info.init_config_pool[mode_index] = cur_config_index;
                    acquire_config(cur_config_index);
                }
            }

            assert(config.contains("start_configs"));
            assert(config["start_configs"].is_array());
            for (auto &config_json: config["start_configs"]) {
                assert(config_json.contains("modes"));
                auto mode_list = get_valid_modes_for_obj(config_json["modes"], obj_index);
                if (mode_list.empty()) {
                    // TODO show log, invalid mode config
                    continue;
                }
                assert(config_json.contains("config"));
                auto &start_config = config_json["config"];
                auto cur_config_index = next_config_index++;
                auto &config_info = config_pool[cur_config_index];
                config_info.config = start_config;
                for (auto mode_index: mode_list) {
                    obj_info.start_config_pool[mode_index] = cur_config_index;
                    acquire_config(cur_config_index);
                }
            }

            // fill default configs
            for (auto mode_index: enabled_modes) {
                if (!obj_info.init_config_pool.contains(mode_index)) {
                    // TODO show log
                    obj_info.init_config_pool[mode_index] = 0; // empty config
                    acquire_config(0);
                }
                if (!obj_info.start_config_pool.contains(mode_index)) {
                    // TODO show log
                    obj_info.start_config_pool[mode_index] = 0; // empty config
                    acquire_config(0);
                }
            }
        }

        /// Builds modes, objects and connections from the top-level JSON config
        /// ("mode_list", "object_list", "connection_list"). Mode 0 is always "all_down".
        void build_graph(const nlohmann::json &config) {
            mode_pool.new_elem("all_down");
            assert(mode_pool.to_index_by_name("all_down") == 0);

            assert(config.contains("mode_list"));
            assert(config["mode_list"].is_array());
            for (auto &mode_json: config["mode_list"]) {
                build_mode(mode_json);
            }

            assert(config.contains("object_list"));
            assert(config["object_list"].is_array());
            for (auto &obj_json: config["object_list"]) {
                build_object(obj_json);
            }

            assert(config.contains("connection_list"));
            assert(config["connection_list"].is_array());
            for (auto &part_json: config["connection_list"]) {
                assert(part_json.contains("modes"));
                auto mode_list = get_valid_modes(part_json["modes"]);
                assert(part_json.contains("connections"));
                assert(part_json["connections"].is_array());
                for (auto &conn_json: part_json["connections"]) {
                    assert(conn_json.contains("signal_object"));
                    assert(conn_json.contains("signal_name"));
                    assert(conn_json.contains("slot_object"));
                    assert(conn_json.contains("slot_name"));
                    assert(conn_json["signal_object"].is_string());
                    assert(conn_json["signal_name"].is_string());
                    assert(conn_json["slot_object"].is_string());
                    assert(conn_json["slot_name"].is_string());
                    auto signal_obj_name = conn_json["signal_object"].get<std::string>();
                    auto signal_name = conn_json["signal_name"].get<std::string>();
                    auto slot_obj_name = conn_json["slot_object"].get<std::string>();
                    auto slot_name = conn_json["slot_name"].get<std::string>();
                    auto signal_uri = get_signal_uri(signal_obj_name, signal_name);
                    auto slot_uri = get_slot_uri(slot_obj_name, slot_name);
                    auto signal_index = signal_pool.to_index_by_name(signal_uri);
                    auto slot_index = slot_pool.to_index_by_name(slot_uri);
                    auto conn_uri = get_connection_uri(signal_index, slot_index);
                    // the same signal/slot pair may be listed for several mode groups
                    auto conn_index = connection_pool.contains(conn_uri) ?
                                      connection_pool.to_index_by_name(conn_uri) :
                                      connection_pool.new_elem(conn_uri);
                    auto &conn_info = connection_pool[conn_index];
                    conn_info.signal_index = signal_index;
                    conn_info.slot_index = slot_index;
                    for (auto mode_index: mode_list) {
                        mode_pool[mode_index].connection_set.insert(conn_index);
                    }
                }
            }
        }

    };

    sophiar_manager::sophiar_manager()
            : pimpl(std::make_unique<impl>()) {
        pimpl->q_this = this;
    }

    /// Registers a factory function under @p type_name.
    void sophiar_manager::register_object_type_impl(const std::string &type_name,
                                                    obj_factory_func_type func) {
        auto index = pimpl->obj_factory_func_pool.new_elem(type_name);
        pimpl->obj_factory_func_pool[index] = func;
    }

    void sophiar_manager::register_signal_impl(sophiar_obj *obj,
                                               const std::string &signal_name,
                                               tiny_signal_base *signal_base) {
        pimpl->register_signal(obj, signal_name, signal_base);
    }

    void sophiar_manager::register_slot_impl(sophiar_obj *obj,
                                             const std::string &slot_name,
                                             tiny_slot_base *slot_base,
                                             slot_demuxer_base *demuxer_base) {
        pimpl->register_slot(obj, slot_name, slot_base, demuxer_base);
    }

    /// Builds the whole graph from @p config; only valid once, in the INITIAL state.
    void sophiar_manager::build_from_config(const nlohmann::json &config) {
        assert(pimpl->current_states == impl::manager_states::INITIAL);
        pimpl->build_graph(config);
        pimpl->current_states = impl::manager_states::NORMAL;
    }

    /// Switches to the mode called @p mode_name.
    /// @return true if that mode was entered; false if the switch was refused or degraded.
    boost::asio::awaitable<bool> sophiar_manager::switch_mode(const std::string &mode_name) {
        auto mode_index = pimpl->mode_pool.to_index_by_name(mode_name);
        co_return co_await pimpl->switch_mode(mode_index);
    }

    /// Returns the name under which @p obj was created by this manager.
    std::string sophiar_manager::get_object_name(sophiar_obj *obj) const {
#ifdef SOPHIAR_TEST // make test without manager happy
        if (!pimpl->obj_ptr_index_map.contains(obj)) {
            return "unknown";
        }
#endif
        return pimpl->get_obj_name_by_ptr(obj);
    }

    void sophiar_manager::notify_object_stop(sophiar_obj *obj) {
        pimpl->on_object_stopped(obj);
    }

    /// Looks up or creates a global object entry.
    /// With a null @p placeholder this is a lookup: it returns the existing index, or
    /// the all-ones index to tell the caller to create a placeholder and call again.
    /// With a non-null @p placeholder a new entry is created.
    global_obj_index_type sophiar_manager::register_global_obj_impl(const std::string &obj_name,
                                                                    std::type_index obj_type,
                                                                    void *placeholder) {
        if (placeholder == nullptr) {
            if (!pimpl->global_obj_pool.contains(obj_name)) {
                // indicate the caller to create a placeholder
                return static_cast<global_obj_index_type>(~static_cast<global_obj_index_type>(0));
            }
            assert(pimpl->global_obj_pool[obj_name].obj_type == obj_type);
            return pimpl->global_obj_pool.to_index_by_name(obj_name);
        }
        // create new one
        auto obj_index = pimpl->global_obj_pool.new_elem(obj_name);
        auto &obj_info = pimpl->global_obj_pool[obj_index];
        obj_info.obj_type = obj_type;
        obj_info.placeholder = placeholder;
        // NOTE(review): never deleted in this file; ownership/lifetime should be confirmed.
        obj_info.update_signal = new coro_signal2{global_context};
        return obj_index;
    }

    void *sophiar_manager::get_global_obj_placeholder(global_obj_index_type obj_index,
                                                      std::type_index obj_type) {
        auto &obj_info = pimpl->global_obj_pool[obj_index];
        assert(obj_info.obj_type == obj_type);
        return obj_info.placeholder;
    }

    /// Stores the new timestamp and notifies the entry's update signal.
    void sophiar_manager::update_global_obj_timestamp(global_obj_index_type obj_index,
                                                      timestamp_type ts) {
        auto &obj_info = pimpl->global_obj_pool[obj_index];
        obj_info.last_update_ts = ts;
        obj_info.update_signal->try_notify_all();
    }

    signal_watcher sophiar_manager::request_global_obj_update_watcher(global_obj_index_type obj_index) {
        return pimpl->global_obj_pool[obj_index]
                .update_signal->new_watcher(global_context);
    }

#ifdef SOPHIAR_TEST

    /// Test helper: returns the object called @p obj_name, or nullptr if unknown.
    sophiar_obj *sophiar_manager::get_object(const std::string &obj_name) const {
        if (!pimpl->obj_pool.contains(obj_name)) return nullptr;
        return pimpl->obj_pool[obj_name].ptr;
    }

    /// Test helper: returns the direct slot registered for the given object/slot name.
    tiny_slot_base *sophiar_manager::get_slot_impl(const std::string &obj_name,
                                                   const std::string &slot_name) {
        auto slot_uri = pimpl->get_slot_uri(obj_name, slot_name);
        assert(pimpl->slot_pool.contains(slot_uri));
        return pimpl->slot_pool[slot_uri].direct_slot;
    }

#endif // SOPHIAR_TEST

    sophiar_manager::~sophiar_manager() = default;

}