sophiar_manager.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. #include "sophiar_manager.h"
  2. #include "core/sophiar_obj.hpp"
  3. #include "core/tristate_obj.h"
  4. #include "utility/debug_utility.hpp"
  5. #include "utility/named_vector.hpp"
  6. #include <boost/asio/awaitable.hpp>
  7. #include <boost/asio/co_spawn.hpp>
  8. #include <boost/asio/detached.hpp>
  9. #include <boost/iterator/counting_iterator.hpp>
  10. #include <fmt/format.h>
  11. #include <spdlog/spdlog.h>
  12. #include <algorithm>
  13. #include <coroutine>
  14. #include <iterator>
  15. #include <list>
  16. #include <stack>
  17. #include <unordered_map>
  18. #include <unordered_set>
  19. #include <vector>
  20. namespace sophiar {
  21. using boost::asio::awaitable;
  22. using boost::asio::co_spawn;
  23. using boost::asio::detached;
  24. struct sophiar_manager::impl {
  25. enum class manager_states {
  26. INITIAL,
  27. NORMAL,
  28. SWITCHING_MODE,
  29. };
  30. manager_states current_states = manager_states::INITIAL;
  31. using config_index_type = uint16_t;
  32. config_index_type next_config_index = 0;
  33. struct config_info {
  34. nlohmann::json config;
  35. int16_t ref_count = 0;
  36. };
  37. using config_pool_type = std::unordered_map<config_index_type, config_info>;
  38. config_pool_type config_pool;
  39. using mode_index_type = uint8_t;
  40. using mode_config_pool_type = std::unordered_map<mode_index_type, config_index_type>;
  41. using obj_type_index_type = uint8_t;
  42. using obj_factory_func_pool_type = named_vector<obj_type_index_type, obj_factory_func_type>;
  43. struct obj_info {
  44. sophiar_obj *ptr;
  45. config_index_type last_init_config_index;
  46. config_index_type last_start_config_index;
  47. mode_config_pool_type init_config_pool;
  48. mode_config_pool_type start_config_pool;
  49. };
  50. using obj_index_type = uint16_t;
  51. using obj_pool_type = named_vector<obj_index_type, obj_info>;
  52. using obj_set_type = std::unordered_set<obj_index_type>;
  53. using obj_ptr_index_map_type = std::unordered_map<sophiar_obj *, obj_index_type>;
  54. using slot_list_type = std::list<tiny_slot_base *>;
  55. using slot_iter_type = slot_list_type::iterator;
  56. struct slot_info {
  57. slot_demuxer_base *demuxer = nullptr;
  58. tiny_slot_base *direct_slot = nullptr; // 真正的 slot
  59. std::stack<tiny_slot_base *> free_slot_pool; // 从 demuxer 创建的,未使用的 slot
  60. slot_list_type used_slot_pool; // 从 demuxer 创建的,正在使用的 slot
  61. };
  62. using signal_index_type = uint16_t;
  63. using slot_index_type = uint16_t;
  64. using signal_pool_type = named_vector<signal_index_type, tiny_signal_base *>;
  65. using slot_pool_type = named_vector<slot_index_type, slot_info>;
  66. struct connection_info {
  67. bool is_connected = false;
  68. bool is_direct_connected = false; // if false, slot_iter is valid
  69. signal_index_type signal_index;
  70. slot_index_type slot_index;
  71. slot_iter_type slot_iter;
  72. };
  73. using connection_index_type = uint32_t;
  74. using connection_pool_type = named_vector<connection_index_type, connection_info>;
  75. using connection_set_type = std::unordered_set<connection_index_type>;
  76. struct global_obj_info {
  77. void *placeholder;
  78. coro_signal2 *update_signal;
  79. timestamp_type last_update_ts;
  80. std::type_index obj_type = typeid(void);
  81. };
  82. using global_obj_pool_type = named_vector<global_obj_index_type, global_obj_info>;
  83. struct mode_info {
  84. obj_set_type obj_set; // 需要运行的对象
  85. connection_set_type connection_set; // 需要建立的连接
  86. std::vector<mode_index_type> degrade_list; // 降级尝试列表
  87. };
  88. using mode_pool_type = named_vector<mode_index_type, mode_info>;
  89. sophiar_manager *q_this = nullptr;
  90. obj_factory_func_pool_type obj_factory_func_pool;
  91. obj_pool_type obj_pool;
  92. obj_ptr_index_map_type obj_ptr_index_map;
  93. signal_pool_type signal_pool;
  94. slot_pool_type slot_pool;
  95. connection_pool_type connection_pool;
  96. global_obj_pool_type global_obj_pool;
  97. mode_index_type current_mode = 0; // all_down
  98. mode_pool_type mode_pool;
  99. impl() {
  100. config_pool[next_config_index++] =
  101. config_info{.config={}, .ref_count=1}; // default empty config
  102. }
  103. std::string get_obj_name_by_ptr(sophiar_obj *obj) const {
  104. assert(obj_ptr_index_map.contains(obj));
  105. auto obj_index = obj_ptr_index_map.at(obj);
  106. return obj_pool.to_name_by_index(obj_index);
  107. }
  108. auto get_tristate_ptr(obj_index_type obj_index) {
  109. return dynamic_cast<tristate_obj *>(obj_pool[obj_index].ptr);
  110. }
  111. static auto get_signal_uri(const std::string &signal_obj_name,
  112. const std::string &signal_name) {
  113. return fmt::format("signal://{}/{}", signal_obj_name, signal_name);
  114. }
  115. static auto get_slot_uri(const std::string &slot_obj_name,
  116. const std::string &slot_name) {
  117. return fmt::format("slot://{}/{}", slot_obj_name, slot_name);
  118. }
  119. static auto get_connection_uri(const signal_index_type signal_index,
  120. const slot_index_type slot_index) {
  121. return fmt::format("connection://{}/{}", signal_index, slot_index);
  122. }
  123. void acquire_config(config_index_type config_index) {
  124. assert(config_pool.contains(config_index));
  125. ++config_pool.at(config_index).ref_count;
  126. }
  127. void release_config(config_index_type config_index) {
  128. assert(config_pool.contains(config_index));
  129. auto ref_count_after = --config_pool.at(config_index).ref_count;
  130. assert(ref_count_after >= 0);
  131. if (ref_count_after == 0) { // delete config
  132. config_pool.erase(config_index);
  133. assert(!config_pool.contains(config_index));
  134. }
  135. }
  136. void enable_connection(connection_index_type conn_index) {
  137. auto &conn_info = connection_pool[conn_index];
  138. assert(!conn_info.is_connected);
  139. auto signal = signal_pool[conn_info.signal_index];
  140. auto &slot_info = slot_pool[conn_info.slot_index];
  141. if (!slot_info.direct_slot->is_linked()) { // 直接连接
  142. signal->add_slot_base(slot_info.direct_slot);
  143. conn_info.is_direct_connected = true;
  144. } else { // 从 muxer 创建的 slot 连接
  145. if (slot_info.free_slot_pool.empty()) { // 获得一个新的 slot
  146. auto new_slot = slot_info.demuxer->new_slot();
  147. slot_info.free_slot_pool.push(new_slot);
  148. }
  149. auto target_slot = slot_info.free_slot_pool.top();
  150. slot_info.free_slot_pool.pop();
  151. signal->add_slot_base(target_slot);
  152. slot_info.used_slot_pool.push_front(target_slot);
  153. conn_info.is_direct_connected = false;
  154. conn_info.slot_iter = slot_info.used_slot_pool.begin();
  155. }
  156. SPDLOG_DEBUG("Connection [uri = {}] is enabled.",
  157. get_connection_uri(conn_info.signal_index, conn_info.slot_index));
  158. conn_info.is_connected = true;
  159. }
  160. void disable_connection(connection_index_type conn_index) {
  161. auto &conn_info = connection_pool[conn_index];
  162. assert(conn_info.is_connected);
  163. auto &slot_info = slot_pool[conn_info.slot_index];
  164. if (conn_info.is_direct_connected) {
  165. slot_info.direct_slot->disconnect();
  166. } else {
  167. auto target_slot = *(conn_info.slot_iter);
  168. target_slot->disconnect();
  169. slot_info.used_slot_pool.erase(conn_info.slot_iter);
  170. slot_info.free_slot_pool.push(target_slot);
  171. }
  172. SPDLOG_DEBUG("Connection [uri = {}] is disabled.",
  173. get_connection_uri(conn_info.signal_index, conn_info.slot_index));
  174. conn_info.is_connected = false;
  175. }
  176. awaitable<bool> try_switch_mode(mode_index_type mode_index) { // 尝试切换模式
  177. auto &mode_info = mode_pool[mode_index];
  178. using state_type = tristate_obj::state_type;
  179. SPDLOG_INFO("Try switch to mode {}...",
  180. mode_pool.to_name_by_index(mode_index));
  181. // 停止不需要的对象
  182. for (obj_index_type obj_index = 0; obj_index < obj_pool.size(); ++obj_index) {
  183. auto tristate_ptr = get_tristate_ptr(obj_index);
  184. if (tristate_ptr != nullptr && // not tristate_obj
  185. tristate_ptr->get_state() == state_type::RUNNING &&
  186. !mode_info.obj_set.contains(obj_index)) {
  187. co_await tristate_ptr->stop();
  188. assert(tristate_ptr->get_state() != state_type::RUNNING &&
  189. tristate_ptr->get_state() != state_type::STOPPING);
  190. }
  191. }
  192. // 禁用不需要的连接
  193. for (connection_index_type conn_index = 0; conn_index < connection_pool.size(); ++conn_index) {
  194. auto &conn_info = connection_pool[conn_index];
  195. if (conn_info.is_connected &&
  196. !mode_info.connection_set.contains(conn_index)) {
  197. disable_connection(conn_index);
  198. assert(conn_info.is_connected == false);
  199. }
  200. }
  201. // 启用需要的连接
  202. for (auto conn_index: mode_info.connection_set) {
  203. auto &conn_info = connection_pool[conn_index];
  204. if (!conn_info.is_connected) {
  205. enable_connection(conn_index);
  206. assert(conn_info.is_connected == true);
  207. }
  208. }
  209. // 启动需要的对象
  210. for (auto obj_index: mode_info.obj_set) {
  211. auto &obj_info = obj_pool[obj_index];
  212. auto tristate_ptr = get_tristate_ptr(obj_index);
  213. if (tristate_ptr == nullptr) continue; // not tristate
  214. if (!tristate_ptr->is_stable()) {
  215. // TODO show low, cannot switch mode because some obj is unstable.
  216. co_return false;
  217. }
  218. assert(obj_info.init_config_pool.contains(mode_index));
  219. assert(obj_info.start_config_pool.contains(mode_index));
  220. auto init_config_index = obj_info.init_config_pool.at(mode_index);
  221. auto start_config_index = obj_info.start_config_pool.at(mode_index);
  222. // check if start config is updated
  223. if (tristate_ptr->get_state() == state_type::RUNNING) {
  224. if (obj_info.last_start_config_index != start_config_index) {
  225. SPDLOG_DEBUG("New start config found for object [name = {}].",
  226. obj_pool.to_name_by_index(obj_index));
  227. co_await tristate_ptr->stop();
  228. assert(tristate_ptr->is_stable());
  229. }
  230. }
  231. // check if init config is updated
  232. if (tristate_ptr->get_state() != state_type::INITIAL) {
  233. assert(tristate_ptr->get_state() == state_type::PENDING ||
  234. tristate_ptr->get_state() == state_type::RUNNING);
  235. if (obj_info.last_init_config_index != init_config_index) {
  236. SPDLOG_DEBUG("New init config found for object [name = {}].",
  237. obj_pool.to_name_by_index(obj_index));
  238. co_await tristate_ptr->reset();
  239. assert(tristate_ptr->is_stable());
  240. }
  241. }
  242. // is still running, do nothing
  243. if (tristate_ptr->get_state() == state_type::RUNNING) continue;
  244. // if not initialized, make it to be pending
  245. if (tristate_ptr->get_state() == state_type::INITIAL) {
  246. assert(config_pool.contains(init_config_index));
  247. CO_ENSURE(tristate_ptr->init(config_pool.at(init_config_index).config))
  248. obj_info.last_init_config_index = init_config_index;
  249. }
  250. assert(tristate_ptr->get_state() == state_type::PENDING);
  251. assert(config_pool.contains(start_config_index));
  252. CO_ENSURE(tristate_ptr->start(config_pool.at(start_config_index).config))
  253. obj_info.last_start_config_index = start_config_index;
  254. assert(tristate_ptr->get_state() == state_type::RUNNING);
  255. }
  256. current_mode = mode_index;
  257. SPDLOG_INFO("Current mode switched to {}.",
  258. mode_pool.to_name_by_index(mode_index));
  259. co_return true;
  260. }
  261. awaitable<void> try_degrade_mode(mode_index_type target_mode) {
  262. for (auto degrade_mode: mode_pool[target_mode].degrade_list) {
  263. auto ok = co_await try_switch_mode(degrade_mode);
  264. if (ok) co_return;
  265. SPDLOG_ERROR("Switch to degrade mode {} failed, degrading...",
  266. mode_pool.to_name_by_index(degrade_mode));
  267. }
  268. assert(false); // all_down will always success
  269. co_return;
  270. }
  271. awaitable<bool> switch_mode_impl(mode_index_type target_mode) { // 尝试切换模式,如果失败就降级
  272. bool ok = co_await try_switch_mode(target_mode);
  273. if (ok) co_return true;
  274. SPDLOG_ERROR("Switch to target mode {} failed, degrading...",
  275. mode_pool.to_name_by_index(target_mode));
  276. co_await try_degrade_mode(target_mode);
  277. co_return false;
  278. }
  279. awaitable<bool> switch_mode(mode_index_type target_mode) {
  280. if (current_states != manager_states::NORMAL) co_return false;
  281. current_states = manager_states::SWITCHING_MODE;
  282. auto ret = co_await switch_mode_impl(target_mode);
  283. current_states = manager_states::NORMAL;
  284. co_return ret;
  285. }
  286. void on_object_stopped(sophiar_obj *obj_ptr) {
  287. if (current_states != manager_states::NORMAL) return; // maybe triggered by mode switching
  288. assert(obj_ptr_index_map.contains(obj_ptr));
  289. auto obj_index = obj_ptr_index_map.at(obj_ptr);
  290. auto &mode_info = mode_pool[current_mode];
  291. assert(mode_info.obj_set.contains(obj_index));
  292. SPDLOG_ERROR("Abnormal object stop detected, degrading...");
  293. co_spawn(global_context, [=]() -> awaitable<void> {
  294. if (current_states != manager_states::NORMAL) co_return;
  295. current_states = manager_states::SWITCHING_MODE;
  296. co_await try_degrade_mode(current_mode);
  297. current_states = manager_states::NORMAL;
  298. co_return;
  299. }, detached);
  300. }
  301. obj_index_type create_object(const std::string &type_name,
  302. const std::string &obj_name) {
  303. auto type_index = obj_factory_func_pool.to_index_by_name(type_name);
  304. auto factory_func = obj_factory_func_pool[type_index];
  305. auto obj_ptr = factory_func();
  306. auto obj_index = obj_pool.new_elem(obj_name);
  307. auto &obj_info = obj_pool[obj_index];
  308. obj_info.ptr = obj_ptr;
  309. assert(!obj_ptr_index_map.contains(obj_ptr));
  310. obj_ptr_index_map[obj_ptr] = obj_index;
  311. return obj_index;
  312. }
  313. void register_signal(sophiar_obj *obj,
  314. const std::string &signal_name,
  315. tiny_signal_base *signal_base) {
  316. auto obj_name = get_obj_name_by_ptr(obj);
  317. auto signal_uri = get_signal_uri(obj_name, signal_name);
  318. auto signal_index = signal_pool.new_elem(signal_uri);
  319. signal_pool[signal_index] = signal_base;
  320. }
  321. void register_slot(sophiar_obj *obj,
  322. const std::string &slot_name,
  323. tiny_slot_base *slot_base,
  324. slot_demuxer_base *demuxer_base) {
  325. auto obj_name = get_obj_name_by_ptr(obj);
  326. auto slot_uri = get_slot_uri(obj_name, slot_name);
  327. auto slot_index = slot_pool.new_elem(slot_uri);
  328. auto &slot_info = slot_pool[slot_index];
  329. slot_info.demuxer = demuxer_base;
  330. slot_info.direct_slot = slot_base;
  331. }
  332. std::vector<mode_index_type> get_valid_modes(const nlohmann::json &config) const {
  333. if (config.is_string()) {
  334. assert(config.get<std::string>() == "all");
  335. return {boost::make_counting_iterator(static_cast<mode_index_type>(1)), // 0 is reserved for all_down
  336. boost::make_counting_iterator(mode_pool.size())};
  337. }
  338. std::vector<mode_index_type> ret;
  339. assert(config.is_array());
  340. for (auto &mode_json: config) {
  341. assert(mode_json.is_string());
  342. auto mode_name = mode_json.get<std::string>();
  343. ret.push_back(mode_pool.to_index_by_name(mode_name));
  344. }
  345. return ret;
  346. }
  347. std::vector<mode_index_type> get_valid_modes_for_obj(const nlohmann::json &config,
  348. obj_index_type obj_index) const {
  349. auto candidate = get_valid_modes(config);
  350. std::vector<mode_index_type> ret;
  351. std::copy_if(candidate.begin(), candidate.end(),
  352. std::back_inserter(ret),
  353. [=](mode_index_type mode_index) {
  354. return mode_pool[mode_index].obj_set.contains(obj_index);
  355. });
  356. return ret;
  357. }
  358. void build_mode(const nlohmann::json &config) {
  359. assert(config.contains("name"));
  360. assert(config["name"].is_string());
  361. auto mode_name = config["name"].get<std::string>();
  362. auto mode_index = mode_pool.new_elem(mode_name);
  363. auto &mode_info = mode_pool[mode_index];
  364. if (config.contains("degrade_list")) {
  365. assert(config["degrade_list"].is_array());
  366. for (auto &degrade_mode_json: config["degrade_list"]) {
  367. assert(degrade_mode_json.is_string());
  368. auto degrade_mode_name = degrade_mode_json.get<std::string>();
  369. auto degrade_mode_index = mode_pool.to_index_by_name(degrade_mode_name);
  370. mode_info.degrade_list.push_back(degrade_mode_index);
  371. }
  372. }
  373. mode_info.degrade_list.push_back(0); // all_down
  374. }
  375. void build_object(const nlohmann::json &config) {
  376. assert(config.contains("type"));
  377. assert(config.contains("name"));
  378. assert(config["type"].is_string());
  379. assert(config["name"].is_string());
  380. auto type_name = config["type"].get<std::string>();
  381. auto obj_name = config["name"].get<std::string>();
  382. auto obj_index = create_object(type_name, obj_name);
  383. auto &obj_info = obj_pool[obj_index];
  384. auto obj_ptr = obj_info.ptr;
  385. assert(config.contains("enabled_modes"));
  386. auto enabled_modes = get_valid_modes(config["enabled_modes"]);
  387. for (auto mode_index: enabled_modes) {
  388. mode_pool[mode_index].obj_set.insert(obj_index);
  389. }
  390. assert(config.contains("construct_config"));
  391. obj_ptr->load_construct_config(config["construct_config"]);
  392. auto tristate_ptr = dynamic_cast<tristate_obj *>(obj_ptr);
  393. if (tristate_ptr == nullptr) return; // not tristate obj
  394. assert(config.contains("init_configs"));
  395. assert(config["init_configs"].is_array());
  396. for (auto &config_json: config["init_configs"]) {
  397. assert(config_json.contains("modes"));
  398. auto mode_list = get_valid_modes_for_obj(config_json["modes"], obj_index);
  399. if (mode_list.empty()) {
  400. // TODO show log, invalid mode
  401. continue;
  402. }
  403. assert(config_json.contains("config"));
  404. auto &init_config = config_json["config"];
  405. auto cur_config_index = next_config_index++;
  406. auto &config_info = config_pool[cur_config_index];
  407. config_info.config = init_config;
  408. for (auto mode_index: mode_list) {
  409. obj_info.init_config_pool[mode_index] = cur_config_index;
  410. acquire_config(cur_config_index);
  411. }
  412. }
  413. assert(config.contains("start_configs"));
  414. assert(config["start_configs"].is_array());
  415. for (auto &config_json: config["start_configs"]) {
  416. assert(config_json.contains("modes"));
  417. auto mode_list = get_valid_modes_for_obj(config_json["modes"], obj_index);
  418. if (mode_list.empty()) {
  419. // TODO show log, invalid mode config
  420. continue;
  421. }
  422. assert(config_json.contains("config"));
  423. auto &start_config = config_json["config"];
  424. auto cur_config_index = next_config_index++;
  425. auto &config_info = config_pool[cur_config_index];
  426. config_info.config = start_config;
  427. for (auto mode_index: mode_list) {
  428. obj_info.start_config_pool[mode_index] = cur_config_index;
  429. acquire_config(cur_config_index);
  430. }
  431. }
  432. // fill default configs
  433. for (auto mode_index: enabled_modes) {
  434. if (!obj_info.init_config_pool.contains(mode_index)) {
  435. // TODO show log
  436. obj_info.init_config_pool[mode_index] = 0; // empty config
  437. acquire_config(0);
  438. }
  439. if (!obj_info.start_config_pool.contains(mode_index)) {
  440. // TODO show log
  441. obj_info.start_config_pool[mode_index] = 0; // empty config
  442. acquire_config(0);
  443. }
  444. }
  445. }
  446. void build_graph(const nlohmann::json &config) {
  447. mode_pool.new_elem("all_down");
  448. assert(mode_pool.to_index_by_name("all_down") == 0);
  449. assert(config.contains("mode_list"));
  450. assert(config["mode_list"].is_array());
  451. for (auto &mode_json: config["mode_list"]) {
  452. build_mode(mode_json);
  453. }
  454. assert(config.contains("object_list"));
  455. assert(config["object_list"].is_array());
  456. for (auto &obj_json: config["object_list"]) {
  457. build_object(obj_json);
  458. }
  459. assert(config.contains("connection_list"));
  460. assert(config["connection_list"].is_array());
  461. for (auto &part_json: config["connection_list"]) {
  462. assert(part_json.contains("modes"));
  463. auto mode_list = get_valid_modes(part_json["modes"]);
  464. assert(part_json.contains("connections"));
  465. assert(part_json["connections"].is_array());
  466. for (auto &conn_json: part_json["connections"]) {
  467. assert(conn_json.contains("signal_object"));
  468. assert(conn_json.contains("signal_name"));
  469. assert(conn_json.contains("slot_object"));
  470. assert(conn_json.contains("slot_name"));
  471. assert(conn_json["signal_object"].is_string());
  472. assert(conn_json["signal_name"].is_string());
  473. assert(conn_json["slot_object"].is_string());
  474. assert(conn_json["slot_name"].is_string());
  475. auto signal_obj_name = conn_json["signal_object"].get<std::string>();
  476. auto signal_name = conn_json["signal_name"].get<std::string>();
  477. auto slot_obj_name = conn_json["slot_object"].get<std::string>();
  478. auto slot_name = conn_json["slot_name"].get<std::string>();
  479. auto signal_uri = get_signal_uri(signal_obj_name, signal_name);
  480. auto slot_uri = get_slot_uri(slot_obj_name, slot_name);
  481. auto signal_index = signal_pool.to_index_by_name(signal_uri);
  482. auto slot_index = slot_pool.to_index_by_name(slot_uri);
  483. auto conn_uri = get_connection_uri(signal_index, slot_index);
  484. auto conn_index = connection_pool.contains(conn_uri) ?
  485. connection_pool.to_index_by_name(conn_uri) :
  486. connection_pool.new_elem(conn_uri);
  487. auto &conn_info = connection_pool[conn_index];
  488. conn_info.signal_index = signal_index;
  489. conn_info.slot_index = slot_index;
  490. for (auto mode_index: mode_list) {
  491. mode_pool[mode_index].connection_set.insert(conn_index);
  492. }
  493. }
  494. }
  495. }
  496. };
  497. sophiar_manager::sophiar_manager()
  498. : pimpl(std::make_unique<impl>()) {
  499. pimpl->q_this = this;
  500. }
  501. void sophiar_manager::register_object_type_impl(const std::string &type_name,
  502. obj_factory_func_type func) {
  503. auto index = pimpl->obj_factory_func_pool.new_elem(type_name);
  504. pimpl->obj_factory_func_pool[index] = func;
  505. }
  506. void sophiar_manager::register_signal_impl(sophiar_obj *obj,
  507. const std::string &signal_name,
  508. tiny_signal_base *signal_base) {
  509. pimpl->register_signal(obj, signal_name, signal_base);
  510. }
  511. void sophiar_manager::register_slot_impl(sophiar_obj *obj,
  512. const std::string &slot_name,
  513. tiny_slot_base *slot_base,
  514. slot_demuxer_base *demuxer_base) {
  515. pimpl->register_slot(obj, slot_name, slot_base, demuxer_base);
  516. }
  517. void sophiar_manager::build_from_config(const nlohmann::json &config) {
  518. assert(pimpl->current_states == impl::manager_states::INITIAL);
  519. pimpl->build_graph(config);
  520. pimpl->current_states = impl::manager_states::NORMAL;
  521. }
  522. boost::asio::awaitable<bool> sophiar_manager::switch_mode(const std::string &mode_name) {
  523. auto mode_index = pimpl->mode_pool.to_index_by_name(mode_name);
  524. co_return co_await pimpl->switch_mode(mode_index);
  525. }
  526. std::string sophiar_manager::get_object_name(sophiar_obj *obj) const {
  527. #ifdef SOPHIAR_TEST // make test without manager happy
  528. if (!pimpl->obj_ptr_index_map.contains(obj)) {
  529. return "unknown";
  530. }
  531. #endif
  532. assert(pimpl->obj_ptr_index_map.contains(obj));
  533. auto obj_index = pimpl->obj_ptr_index_map.at(obj);
  534. return pimpl->obj_pool.to_name_by_index(obj_index);
  535. }
  536. void sophiar_manager::notify_object_stop(sophiar_obj *obj) {
  537. pimpl->on_object_stopped(obj);
  538. }
  539. global_obj_index_type sophiar_manager::register_global_obj_impl(const std::string &obj_name,
  540. std::type_index obj_type,
  541. void *placeholder) {
  542. if (placeholder == nullptr) {
  543. if (!pimpl->global_obj_pool.contains(obj_name))
  544. return ~(global_obj_index_type) 0; // indicate the caller to create a placeholder
  545. assert(pimpl->global_obj_pool[obj_name].obj_type == obj_type);
  546. return pimpl->global_obj_pool.to_index_by_name(obj_name);
  547. }
  548. // create new one
  549. auto obj_index = pimpl->global_obj_pool.new_elem(obj_name);
  550. auto &obj_info = pimpl->global_obj_pool[obj_index];
  551. obj_info.obj_type = obj_type;
  552. obj_info.placeholder = placeholder;
  553. obj_info.update_signal = new coro_signal2{global_context};
  554. return obj_index;
  555. }
  556. void *sophiar_manager::get_global_obj_placeholder(global_obj_index_type obj_index,
  557. std::type_index obj_type) {
  558. auto &obj_info = pimpl->global_obj_pool[obj_index];
  559. assert(obj_info.obj_type == obj_type);
  560. return obj_info.placeholder;
  561. }
  562. void sophiar_manager::update_global_obj_timestamp(global_obj_index_type obj_index,
  563. timestamp_type ts) {
  564. auto &obj_info = pimpl->global_obj_pool[obj_index];
  565. obj_info.last_update_ts = ts;
  566. obj_info.update_signal->try_notify_all();
  567. }
  568. signal_watcher sophiar_manager::request_global_obj_update_watcher(global_obj_index_type obj_index) {
  569. return pimpl->global_obj_pool[obj_index]
  570. .update_signal->new_watcher(global_context);
  571. }
  572. #ifdef SOPHIAR_TEST
  573. sophiar_obj *sophiar_manager::get_object(const std::string &obj_name) const {
  574. if (!pimpl->obj_pool.contains(obj_name)) return nullptr;
  575. return pimpl->obj_pool[obj_name].ptr;
  576. }
  577. tiny_slot_base *sophiar_manager::get_slot_impl(const std::string &obj_name,
  578. const std::string &slot_name) {
  579. auto slot_uri = pimpl->get_slot_uri(obj_name, slot_name);
  580. assert(pimpl->slot_pool.contains(slot_uri));
  581. return pimpl->slot_pool[slot_uri].direct_slot;
  582. }
  583. #endif // SOPHIAR_TEST
// Defaulted in this translation unit, where `impl` is fully defined - presumably so the
// pimpl smart pointer can destroy it (confirm against the declaration in the header).
sophiar_manager::~sophiar_manager() = default;
  585. }