variable_utility.hpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #ifndef SOPHIAR2_VARIABLE_UTILITY_HPP
  2. #define SOPHIAR2_VARIABLE_UTILITY_HPP
  #include "core/basic_obj_types.hpp"
  #include "core/timestamp_helper.hpp"
  #include "utility/config_utility.hpp"
  #include "utility/simple_tristate_obj.hpp"
  #include "utility/variable_helper.hpp"
  #include "utility/versatile_buffer2.hpp"
  #include <boost/asio/awaitable.hpp>
  #include <spdlog/spdlog.h>
  #include <chrono>
  #include <cmath>
  #include <coroutine>
  #include <cstring>
  #include <fstream>
  15. namespace sophiar {
  16. using boost::asio::awaitable;
  17. template<typename SmallObjType>
  18. inline std::string variable_to_string(const SmallObjType &var) {
  19. auto writer = string_writer{};
  20. var.write_to(writer);
  21. return writer.get_string_and_reset();
  22. }
  23. template<typename SmallObjType>
  24. inline coro_worker::pointer variable_debug_watcher_func(const nlohmann::json &config) {
  25. std::string var_name;
  26. auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name);
  27. auto worker = make_infinite_coro_worker(
  28. [=,
  29. buffer = string_writer(),
  30. var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable
  31. -> awaitable<bool> {
  32. co_await var_helper.coro_wait_update();
  33. if (var_helper.empty()) {
  34. SPDLOG_DEBUG("{} is empty.", var_name);
  35. } else {
  36. var_helper->write_to(buffer);
  37. SPDLOG_DEBUG("{} = {}", var_name, buffer.get_string_and_reset());
  38. }
  39. co_return true;
  40. });
  41. return std::move(worker);
  42. }
  43. template<typename SmallObjType>
  44. inline coro_worker::pointer variable_validity_watcher_func(const nlohmann::json &config) {
  45. enum class status_type {
  46. UNKNOWN,
  47. VALID,
  48. INVALID
  49. };
  50. std::string var_name;
  51. auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name);
  52. auto worker = make_infinite_coro_worker(
  53. [=,
  54. status = status_type{},
  55. var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable
  56. -> awaitable<bool> {
  57. if (status != status_type::UNKNOWN) {
  58. co_await var_helper.coro_wait_update();
  59. } else {
  60. var_helper.manual_sync();
  61. }
  62. if (var_helper.empty()) {
  63. if (status == status_type::INVALID)[[likely]] co_return true;
  64. SPDLOG_DEBUG("{} becomes invalid.", var_name);
  65. status = status_type::INVALID;
  66. } else {
  67. if (status == status_type::VALID)[[likely]] co_return true;
  68. SPDLOG_DEBUG("{} becomes valid.", var_name);
  69. status = status_type::VALID;
  70. }
  71. co_return true;
  72. });
  73. return std::move(worker);
  74. }
  75. template<typename SmallObjType>
  76. inline coro_worker::pointer variable_recorder_func(const nlohmann::json &config) {
  77. auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name");
  78. auto save_file_path = LOAD_STRING_ITEM("save_file");
  79. auto ofs = std::ofstream(save_file_path, std::ofstream::out);
  80. assert(ofs.is_open());
  81. // create worker
  82. auto worker = make_infinite_coro_worker(
  83. [=,
  84. buffer = string_writer(","), ofs = std::move(ofs),
  85. var_helper = VARIABLE_AUTO_DELEGATE(SmallObjType, var_index)]() mutable
  86. -> awaitable<bool> {
  87. co_await var_helper.coro_wait_update();
  88. auto ts = var_helper.get_last_update_ts();
  89. buffer << (static_cast<double>(ts) / 1000.0); // us -> ms
  90. if (!var_helper.empty()) {
  91. var_helper->write_to(buffer);
  92. }
  93. ofs << buffer.get_string_and_reset() << std::endl;
  94. co_return true;
  95. });
  96. return std::move(worker);
  97. }
  98. template<typename SmallObjType>
  99. inline coro_worker::pointer variable_replayer_func(const nlohmann::json &config) {
  100. auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name");
  101. auto record_file_path = LOAD_STRING_ITEM("record_file");
  102. auto ifs = std::ifstream(record_file_path, std::ifstream::in);
  103. assert(ifs.is_open());
  104. // create worker
  105. auto worker = make_infinite_coro_worker(
  106. [=,
  107. timer = boost::asio::high_resolution_timer(*global_context),
  108. buffer = string_reader(), ifs = std::move(ifs)]() mutable
  109. -> awaitable<bool> {
  110. std::string str_buf;
  111. if (!std::getline(ifs, str_buf)) co_return false; // EOF
  112. buffer.set_string(std::move(str_buf));
  113. auto next_ts_ms = buffer.read_value<double>();
  114. auto next_ts = static_cast<timestamp_type>(next_ts_ms * 1000); // ms -> us
  115. auto cur_ts = current_timestamp();
  116. if (next_ts > cur_ts) {
  117. timer.expires_at(get_time_from_timestamp(next_ts));
  118. co_await timer.async_wait(boost::asio::use_awaitable);
  119. } else if (next_ts != cur_ts) { // if time is missed, simply ignore
  120. co_return true;
  121. }
  122. if (buffer.empty()) { // empty obj
  123. UPDATE_VARIABLE(SmallObjType, var_index, nullptr);
  124. } else {
  125. auto new_var = SmallObjType::new_instance();
  126. new_var->fill_from(buffer);
  127. assert(buffer.empty());
  128. UPDATE_VARIABLE(SmallObjType, var_index, std::move(new_var));
  129. }
  130. co_return true;
  131. });
  132. return std::move(worker);
  133. }
  134. template<typename SmallObjType>
  135. using variable_debug_watcher = simple_tristate_obj_wrapper<variable_debug_watcher_func<SmallObjType>>;
  136. template<typename SmallObjType>
  137. using variable_validity_watcher = simple_tristate_obj_wrapper<variable_validity_watcher_func<SmallObjType>>;
  138. template<typename SmallObjType>
  139. using variable_recorder = simple_tristate_obj_wrapper<variable_recorder_func<SmallObjType>>;
  140. template<typename SmallObjType>
  141. using variable_replayer = simple_tristate_obj_wrapper<variable_replayer_func<SmallObjType>>;
  142. }
  143. #endif //SOPHIAR2_VARIABLE_UTILITY_HPP