sophiar_pool.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. #include "sophiar_pool.h"
  2. #include "core/basic_obj_types.hpp"
  3. #include "core/global_defs.h"
  4. #include "utility/config_utility.hpp"
  5. #include "utility/named_vector.hpp"
  6. #include "utility/string_map.hpp"
  7. #include "utility/versatile_buffer2.hpp"
  8. #include <boost/asio/co_spawn.hpp>
  9. #include <boost/asio/post.hpp>
  10. #include <list>
  11. #include <vector>
  12. #include <unordered_map>
  13. using boost::asio::co_spawn;
  14. using boost::asio::post;
  15. namespace sophiar {
  16. struct sophiar_pool::impl {
  17. struct variable_info_impl;
  18. struct callback_info;
  19. using callback_queue_type = std::list<callback_info *>;
  20. using callback_queue_iter_type = callback_queue_type::iterator;
  21. struct attach_info {
  22. variable_info_impl *var_info = nullptr;
  23. callback_info *callback = nullptr;
  24. void *callback_list_iter = nullptr; // callback_list_iter_type
  25. void *attach_iter = nullptr; // attach_iter_type
  26. };
  27. using attach_list_type = std::list<attach_info>;
  28. using attach_iter_type = attach_list_type::iterator;
  29. static_assert(sizeof(attach_iter_type) == sizeof(void *));
  30. static_assert(sizeof(attach_iter_type) == sizeof(attach_token_type));
  31. struct callback_info {
  32. enum status_type {
  33. IDLE, PENDING, RUNNING
  34. } status = IDLE;
  35. using callback_store_type = std::variant<function_callback_type, coroutine_callback_type>;
  36. callback_store_type func_store;
  37. function_callback_type exit_func;
  38. attach_list_type attach_list;
  39. callback_queue_iter_type queue_iter;
  40. bool exit_flag = false;
  41. void *pool_iter = nullptr; // callback_iter_type
  42. };
  43. using callback_pool_type = std::list<callback_info>;
  44. using callback_iter_type = callback_pool_type::iterator;
  45. static_assert(sizeof(callback_iter_type) == sizeof(callback_token_type));
  46. static_assert(sizeof(callback_iter_type) == sizeof(void *));
  47. using callback_list_type = std::list<callback_info *>;
  48. using callback_list_iter_type = callback_list_type::iterator;
  49. static_assert(sizeof(callback_list_iter_type) == sizeof(void *));
  50. struct variable_type_info {
  51. std::type_index type = typeid(void);
  52. variable_type_index_type type_index = -1;
  53. std::string type_name;
  54. // function list
  55. using creator_func_type = void *(*)(const nlohmann::json &);
  56. creator_func_type creator_func = nullptr;
  57. };
  58. struct variable_info_impl {
  59. void *placeholder = nullptr;
  60. coro_signal2 *update_signal = nullptr;
  61. const variable_type_info *type_info = nullptr;
  62. timestamp_type last_update_ts = 0;
  63. callback_list_type callback_list;
  64. };
  65. string_map<variable_type_index_type> variable_type_name_index_map; // typename -> index
  66. std::unordered_map<std::type_index, variable_type_index_type> variable_type_index_index_map; // type_index -> index
  67. std::vector<variable_type_info> variable_type_info_pool;
  68. named_vector<variable_index_type, variable_info_impl> variable_pool;
  69. callback_pool_type callback_pool;
  70. callback_queue_type callback_queue;
  71. bool is_callback_handler_active = false;
  72. template<typename SmallObjType>
  73. static void *create_variable_pointer(const nlohmann::json &config) {
  74. auto placeholder = new SmallObjType::pointer{};
  75. if (!config.contains("value")) {
  76. return (void *) placeholder;
  77. }
  78. // load default value
  79. *placeholder = SmallObjType::new_instance();
  80. (*placeholder)->fill_from_json_array(config["value"]);
  81. return (void *) placeholder;
  82. }
  83. template<typename SmallObjType>
  84. void register_variable_type(std::string_view type_name) {
  85. static_assert(SmallObjType::binary_length() <= std::numeric_limits<uint8_t>::max());
  86. auto var_type_index = variable_type_info_pool.size();
  87. variable_type_info_pool.emplace_back();
  88. assert(!variable_type_name_index_map.contains(type_name));
  89. variable_type_name_index_map.insert(type_name, var_type_index);
  90. const std::type_index type = typeid(SmallObjType);
  91. assert(!variable_type_index_index_map.contains(type));
  92. variable_type_index_index_map[type] = var_type_index;
  93. auto &type_info = variable_type_info_pool[var_type_index];
  94. type_info.type = type;
  95. type_info.type_index = var_type_index;
  96. type_info.type_name = type_name;
  97. type_info.creator_func = create_variable_pointer<SmallObjType>;
  98. }
  99. void register_basic_variable_types() {
  100. #define REGISTER_TYPE(var_type) \
  101. register_variable_type<var_type>(#var_type)
  102. REGISTER_TYPE(bool_obj);
  103. REGISTER_TYPE(u64int_obj);
  104. REGISTER_TYPE(double_obj);
  105. REGISTER_TYPE(scalarxyz_obj);
  106. REGISTER_TYPE(transform_obj);
  107. REGISTER_TYPE(array6_obj);
  108. REGISTER_TYPE(array7_obj);
  109. #undef REGISTER_TYPE
  110. // check registered variable type index
  111. assert(variable_type_index_index_map.at(typeid(bool_obj)) == bool_var_type_index);
  112. assert(variable_type_index_index_map.at(typeid(u64int_obj)) == u64int_var_type_index);
  113. assert(variable_type_index_index_map.at(typeid(double_obj)) == double_var_type_index);
  114. assert(variable_type_index_index_map.at(typeid(scalarxyz_obj)) == scalarxyz_var_type_index);
  115. assert(variable_type_index_index_map.at(typeid(transform_obj)) == transform_var_type_index);
  116. assert(variable_type_index_index_map.at(typeid(array6_obj)) == array6_var_type_index);
  117. assert(variable_type_index_index_map.at(typeid(array7_obj)) == array7_var_type_index);
  118. }
  119. void register_variable(std::string_view name,
  120. std::string_view type_name,
  121. const nlohmann::json &config = {}) {
  122. auto index = variable_pool.new_elem(name);
  123. auto &info = variable_pool[index];
  124. assert(variable_type_name_index_map.contains(type_name));
  125. auto var_type_index = variable_type_name_index_map.query(type_name);
  126. const auto &type_info = variable_type_info_pool[var_type_index];
  127. info.placeholder = type_info.creator_func(config);
  128. info.update_signal = new coro_signal2{};
  129. info.type_info = &type_info;
  130. info.last_update_ts = 0;
  131. }
  132. callback_token_type register_callback(callback_info::callback_store_type &&callback,
  133. function_callback_type &&exit_func) {
  134. auto &item = callback_pool.emplace_front();
  135. item.func_store = std::move(callback);
  136. item.exit_func = std::move(exit_func);
  137. item.pool_iter = std::bit_cast<void *>(callback_pool.begin());
  138. return std::bit_cast<callback_token_type>(item.pool_iter);
  139. }
  140. void unregister_callback(callback_iter_type callback) {
  141. while (!callback->attach_list.empty()) {
  142. detach_callback(callback->attach_list.begin());
  143. }
  144. if (callback->status == callback_info::RUNNING) {
  145. callback->exit_flag = true;
  146. return;
  147. }
  148. if (callback->status == callback_info::PENDING) {
  149. callback_queue.erase(callback->queue_iter);
  150. }
  151. callback->exit_func();
  152. callback_pool.erase(callback);
  153. }
  154. attach_token_type attach_callback(variable_index_type var_index, callback_iter_type callback) {
  155. auto &var_info = variable_pool[var_index];
  156. var_info.callback_list.push_front(&(*callback));
  157. auto &attach_item = callback->attach_list.emplace_front();
  158. attach_item.var_info = &var_info;
  159. attach_item.callback = &(*callback);
  160. attach_item.callback_list_iter = std::bit_cast<void *>(var_info.callback_list.begin());
  161. attach_item.attach_iter = std::bit_cast<void *>(callback->attach_list.begin());
  162. return std::bit_cast<attach_token_type>(callback->attach_list.begin());
  163. }
  164. static void detach_callback(attach_iter_type iter) {
  165. auto callback_iter = std::bit_cast<callback_list_iter_type>(iter->callback_list_iter);
  166. auto attach_iter = std::bit_cast<attach_iter_type>(iter->attach_iter);
  167. iter->var_info->callback_list.erase(callback_iter);
  168. iter->callback->attach_list.erase(attach_iter);
  169. }
  170. void handle_callback_func() {
  171. while (!callback_queue.empty()) {
  172. auto callback = callback_queue.back();
  173. callback_queue.pop_back();
  174. callback->status = callback_info::RUNNING;
  175. callback->queue_iter = {};
  176. if (callback->func_store.index() == 0) {
  177. std::get<0>(callback->func_store)();
  178. callback->status = callback_info::IDLE;
  179. } else {
  180. assert(callback->func_store.index() == 1);
  181. co_spawn(*global_context, std::get<1>(callback->func_store)(), [=, this](std::exception_ptr eptr) {
  182. assert(eptr == nullptr);
  183. callback->status = callback_info::IDLE;
  184. if (callback->exit_flag) {
  185. callback->exit_func();
  186. auto real_iter = std::bit_cast<callback_iter_type>(callback->pool_iter);
  187. callback_pool.erase(real_iter);
  188. }
  189. });
  190. }
  191. }
  192. is_callback_handler_active = false;
  193. }
  194. void trigger_callback(variable_info_impl *var_info) {
  195. if (var_info->callback_list.empty()) return;
  196. bool has_callback = false;
  197. for (auto &callback: var_info->callback_list) {
  198. if (callback->status != callback_info::IDLE) continue;
  199. callback_queue.push_front(callback);
  200. callback->status = callback_info::PENDING;
  201. callback->queue_iter = callback_queue.begin();
  202. has_callback = true;
  203. }
  204. if (!has_callback || is_callback_handler_active) return;
  205. post(*global_context, [this]() { handle_callback_func(); });
  206. is_callback_handler_active = true;
  207. }
  208. void load_config(const nlohmann::json &config) {
  209. #ifdef SOPHIAR_TEST
  210. if (config.empty()) return;
  211. #endif // SOPHIAR_TEST
  212. ENSURE_ARRAY("variable_list");
  213. for (auto &var_config: config["variable_list"]) {
  214. auto var_name = LOAD_STRING_ITEM2(var_config, "name");
  215. auto type_name = LOAD_STRING_ITEM2(var_config, "type");
  216. register_variable(var_name, type_name, var_config);
  217. }
  218. }
  219. impl() {
  220. register_basic_variable_types();
  221. }
  222. };
  223. sophiar_pool::sophiar_pool()
  224. : pimpl(std::make_unique<impl>()) {}
  225. signal_watcher sophiar_pool::require_variable_watcher(variable_index_type var_index) {
  226. assert(pimpl->variable_pool.contains(var_index));
  227. return pimpl->variable_pool[var_index].update_signal->new_watcher();
  228. }
  229. std::string sophiar_pool::query_variable_name(variable_index_type var_index) {
  230. assert(pimpl->variable_pool.contains(var_index));
  231. return pimpl->variable_pool.to_name_by_index(var_index);
  232. }
  233. timestamp_type sophiar_pool::query_variable_update_ts(variable_index_type var_index) {
  234. assert(pimpl->variable_pool.contains(var_index));
  235. return pimpl->variable_pool[var_index].last_update_ts;
  236. }
  237. sophiar_pool::variable_info sophiar_pool::query_variable_information(std::string_view var_name) {
  238. assert(pimpl->variable_pool.contains(var_name));
  239. const auto &info = pimpl->variable_pool[var_name];
  240. variable_info ret{
  241. .type_index = info.type_info->type_index,
  242. .index = pimpl->variable_pool.to_index_by_name(var_name),
  243. .placeholder = info.placeholder
  244. };
  245. return ret;
  246. }
  247. variable_index_type sophiar_pool::require_variable_impl(std::string_view var_name,
  248. std::type_index var_type) {
  249. assert(pimpl->variable_pool.contains(var_name));
  250. const auto &info = pimpl->variable_pool[var_name];
  251. assert(info.type_info->type == var_type);
  252. return pimpl->variable_pool.to_index_by_name(var_name);
  253. }
  254. void *sophiar_pool::require_variable_placeholder_impl(variable_index_type var_index,
  255. std::type_index var_type,
  256. timestamp_type *ts_out) {
  257. assert(pimpl->variable_pool.contains(var_index));
  258. const auto &info = pimpl->variable_pool[var_index];
  259. assert(info.type_info->type == var_type); // ensure type consistency
  260. if (ts_out != nullptr) {
  261. *ts_out = info.last_update_ts;
  262. }
  263. return info.placeholder;
  264. }
  265. void sophiar_pool::update_variable_timestamp_impl(variable_index_type var_index,
  266. timestamp_type ts) {
  267. assert(pimpl->variable_pool.contains(var_index));
  268. auto &info = pimpl->variable_pool[var_index];
  269. assert(ts > info.last_update_ts);
  270. info.last_update_ts = ts;
  271. info.update_signal->try_notify_all(ts);
  272. pimpl->trigger_callback(&info);
  273. }
  274. void sophiar_pool::load_config(const nlohmann::json &config) {
  275. pimpl->load_config(config);
  276. }
  277. sophiar_pool::callback_token_type
  278. sophiar_pool::register_callback(function_callback_type &&callback,
  279. function_callback_type &&exit_func) {
  280. return pimpl->register_callback(std::move(callback), std::move(exit_func));
  281. }
  282. sophiar_pool::callback_token_type
  283. sophiar_pool::register_coro_callback(coroutine_callback_type &&callback,
  284. function_callback_type &&exit_func) {
  285. return pimpl->register_callback(std::move(callback), std::move(exit_func));
  286. }
  287. void sophiar_pool::unregister_callback(callback_token_type token) {
  288. if (token == nullptr) return;
  289. return pimpl->unregister_callback(std::bit_cast<impl::callback_iter_type>(token));
  290. }
  291. sophiar_pool::attach_token_type sophiar_pool::attach_callback(variable_index_type var_index,
  292. callback_token_type token) {
  293. assert(token != nullptr);
  294. return pimpl->attach_callback(var_index, std::bit_cast<impl::callback_iter_type>(token));
  295. }
  296. void sophiar_pool::detach_callback(attach_token_type token) {
  297. if (token == nullptr) return;
  298. return pimpl->detach_callback(std::bit_cast<impl::attach_iter_type>(token));
  299. }
  300. sophiar_pool::~sophiar_pool() = default;
  301. #ifdef SOPHIAR_TEST
  302. void sophiar_pool::register_variable(std::string_view var_name,
  303. std::string_view type_name) {
  304. pimpl->register_variable(var_name, type_name);
  305. }
  306. #endif
  307. }