tristate_obj.cpp 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #include "tristate_obj.h"
  2. #include "core/global_defs.h"
  3. #include "utility/coro_signal2.hpp"
  4. #include "utility/debug_utility.hpp"
  5. #include "utility/name_translator.hpp"
  6. #include <spdlog/spdlog.h>
  7. #include <boost/asio/experimental/awaitable_operators.hpp>
  8. #include "utility/assert_utility.h"
  9. namespace sophiar {
  10. name_translator<uint8_t> *state_name_translator;
  11. using boost::asio::awaitable;
  12. using namespace boost::asio::experimental::awaitable_operators;
  13. struct tristate_obj::impl {
  14. tristate_obj *q_this = nullptr;
  15. state_type state = state_type::INITIAL;
  16. coro_signal2 init_cancel_signal;
  17. coro_signal2 start_cancel_signal;
  18. coro_signal2 init_finished_signal;
  19. coro_signal2 start_finished_signal;
  20. coro_signal2 stop_finished_signal;
  21. coro_signal2 reset_finished_signal;
  22. static void initialize_translator() {
  23. RUN_ONCE
  24. state_name_translator = new name_translator<uint8_t>;
  25. state_name_translator->register_item("Initial", (uint8_t) tristate_obj::state_type::INITIAL);
  26. state_name_translator->register_item("Initializing", (uint8_t) tristate_obj::state_type::INITIALIZING);
  27. state_name_translator->register_item("Resetting", (uint8_t) tristate_obj::state_type::RESETTING);
  28. state_name_translator->register_item("Pending", (uint8_t) tristate_obj::state_type::PENDING);
  29. state_name_translator->register_item("Starting", (uint8_t) tristate_obj::state_type::STARTING);
  30. state_name_translator->register_item("Stopping", (uint8_t) tristate_obj::state_type::STOPPING);
  31. state_name_translator->register_item("Running", (uint8_t) tristate_obj::state_type::RUNNING);
  32. state_name_translator->register_item("Unknown", (uint8_t) tristate_obj::state_type::UNKNOWN);
  33. }
  34. void log_state_change(state_type old_state, state_type new_state) const {
  35. SPDLOG_INFO("Object [name = {}], State: {} => {}.",
  36. QUERY_OBJECT_NAME(q_this),
  37. state_name_translator->translate((uint8_t) old_state),
  38. state_name_translator->translate((uint8_t) new_state));
  39. }
  40. awaitable<bool> init(const nlohmann::json &config) {
  41. if (state == state_type::INITIALIZING) {
  42. auto init_finished_watcher = init_finished_signal.new_watcher();
  43. auto reset_finished_watcher = reset_finished_signal.new_watcher();
  44. auto result = co_await (init_finished_watcher.coro_wait() ||
  45. reset_finished_watcher.coro_wait());
  46. co_return result.index() == 0;
  47. }
  48. if (state == state_type::RESETTING) {
  49. auto reset_finished_watcher = reset_finished_signal.new_watcher();
  50. co_await reset_finished_watcher.coro_wait();
  51. co_return false;
  52. }
  53. if (state != state_type::INITIAL) co_return true; // >= PENDING
  54. state = state_type::INITIALIZING;
  55. log_state_change(state_type::INITIAL, state_type::INITIALIZING);
  56. SPDLOG_TRACE("Initializing object [name = {}] with config {}.",
  57. QUERY_OBJECT_NAME(q_this), config.dump());
  58. auto init_cancel_watcher = init_cancel_signal.new_watcher();
  59. auto result = co_await (q_this->on_init(config) || init_cancel_watcher.coro_wait());
  60. if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
  61. state = state_type::PENDING;
  62. log_state_change(state_type::INITIALIZING, state_type::PENDING);
  63. SPDLOG_DEBUG("Initialize object [name = {}] succeeded.", QUERY_OBJECT_NAME(q_this));
  64. init_finished_signal.try_notify_all();
  65. co_return true;
  66. } else { // failed
  67. co_await q_this->on_reset();
  68. state = state_type::INITIAL;
  69. log_state_change(state_type::INITIALIZING, state_type::INITIAL);
  70. SPDLOG_WARN("Initialize object [name = {}] failed.", QUERY_OBJECT_NAME(q_this));
  71. reset_finished_signal.try_notify_all();
  72. co_return false;
  73. }
  74. }
  75. awaitable<bool> start(const nlohmann::json &config) {
  76. if (state == state_type::STARTING) {
  77. auto start_finished_watcher = start_finished_signal.new_watcher();
  78. auto stop_finished_watcher = stop_finished_signal.new_watcher();
  79. auto result = co_await (start_finished_watcher.coro_wait() ||
  80. stop_finished_watcher.coro_wait());
  81. co_return result.index() == 0;
  82. }
  83. if (state == state_type::STOPPING) {
  84. auto stop_finished_watcher = stop_finished_signal.new_watcher();
  85. co_await stop_finished_watcher.coro_wait();
  86. co_return false;
  87. }
  88. if (state == state_type::RUNNING) co_return true;
  89. if (state != state_type::PENDING) co_return false; // INITIAL, INITIALIZING, RESETTING
  90. state = state_type::STARTING;
  91. log_state_change(state_type::PENDING, state_type::STARTING);
  92. SPDLOG_TRACE("Starting object [name = {}] with config {}.",
  93. QUERY_OBJECT_NAME(q_this), config.dump());
  94. auto start_cancel_watcher = start_cancel_signal.new_watcher();
  95. auto result = co_await (q_this->on_start(config) || start_cancel_watcher.coro_wait());
  96. if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
  97. state = state_type::RUNNING;
  98. log_state_change(state_type::STARTING, state_type::RUNNING);
  99. SPDLOG_DEBUG("Start object [name = {}] succeeded.", QUERY_OBJECT_NAME(q_this));
  100. start_finished_signal.try_notify_all();
  101. co_return true;
  102. } else { // failed
  103. co_await q_this->on_stop();
  104. state = state_type::PENDING;
  105. log_state_change(state_type::STARTING, state_type::PENDING);
  106. SPDLOG_WARN("Start object [name = {}] failed.", QUERY_OBJECT_NAME(q_this));
  107. stop_finished_signal.try_notify_all();
  108. co_return false;
  109. }
  110. }
  111. awaitable<void> stop() {
  112. if (state == state_type::RUNNING) {
  113. state = state_type::STOPPING;
  114. log_state_change(state_type::RUNNING, state_type::STOPPING);
  115. co_await q_this->on_stop();
  116. state = state_type::PENDING;
  117. log_state_change(state_type::STOPPING, state_type::PENDING);
  118. SPDLOG_DEBUG("Stopped object [name = {}].", QUERY_OBJECT_NAME(q_this));
  119. stop_finished_signal.try_notify_all();
  120. global_sophiar_manager->notify_object_stop(q_this);
  121. } else if (state == state_type::STOPPING) {
  122. auto stop_finished_watcher = stop_finished_signal.new_watcher();
  123. co_await stop_finished_watcher.coro_wait();
  124. } else if (state == state_type::STARTING) {
  125. start_cancel_signal.try_notify_all();
  126. auto stop_finished_watcher = stop_finished_signal.new_watcher();
  127. co_await stop_finished_watcher.coro_wait();
  128. }
  129. co_return;
  130. }
  131. awaitable<void> reset() {
  132. // force reset
  133. if (state == state_type::RUNNING ||
  134. state == state_type::STOPPING ||
  135. state == state_type::STARTING) {
  136. co_await stop();
  137. assert(state == state_type::PENDING);
  138. }
  139. if (state == state_type::PENDING) {
  140. state = state_type::RESETTING;
  141. log_state_change(state_type::PENDING, state_type::RESETTING);
  142. co_await q_this->on_reset();
  143. state = state_type::INITIAL;
  144. log_state_change(state_type::RESETTING, state_type::INITIAL);
  145. SPDLOG_DEBUG("Reset object [name = {}].", QUERY_OBJECT_NAME(q_this));
  146. reset_finished_signal.try_notify_all();
  147. } else if (state == state_type::RESETTING) {
  148. auto reset_finished_watcher = reset_finished_signal.new_watcher();
  149. co_await reset_finished_watcher.coro_wait();
  150. } else if (state == state_type::INITIALIZING) {
  151. init_cancel_signal.try_notify_all();
  152. auto reset_finished_watcher = reset_finished_signal.new_watcher();
  153. co_await reset_finished_watcher.coro_wait();
  154. }
  155. co_return;
  156. }
  157. impl() {
  158. initialize_translator();
  159. }
  160. };
  161. tristate_obj::state_type tristate_obj::get_state() const {
  162. return pimpl->state;
  163. }
  164. awaitable<bool> tristate_obj::init(const nlohmann::json &config) noexcept {
  165. return pimpl->init(config);
  166. }
  167. awaitable<bool> tristate_obj::start(const nlohmann::json &config) noexcept {
  168. return pimpl->start(config);
  169. }
  170. awaitable<void> tristate_obj::stop() noexcept {
  171. return pimpl->stop();
  172. }
  173. awaitable<void> tristate_obj::reset() noexcept {
  174. return pimpl->reset();
  175. }
  176. tristate_obj::tristate_obj()
  177. : pimpl(std::make_unique<impl>()) {
  178. pimpl->q_this = this;
  179. }
  180. tristate_obj::~tristate_obj() = default;
  181. }