#include "tristate_obj.h"
#include "core/global_defs.h"
#include "utility/coro_signal2.hpp"
#include "utility/debug_utility.hpp"
#include "utility/name_translator.hpp"

// NOTE(review): the two directives below have no operand in this listing.
// Angle-bracket contents appear to have been lost (the same seems to apply to
// `awaitable` and `std::make_unique` further down) - restore them from the
// repository copy before building.
#include
#include

#include "utility/assert_utility.h"

namespace sophiar {

    // Maps state_type values (cast to uint8_t) to the display names used in log
    // messages. Allocated in impl::initialize_translator(); no matching delete
    // is visible in this file.
    // NOTE(review): name_translator may have lost a template argument list in
    // this listing - confirm.
    name_translator *state_name_translator;

    using boost::asio::awaitable;
    using namespace boost::asio::experimental::awaitable_operators;

    // Private implementation of tristate_obj: holds the current state plus the
    // signals that let concurrent callers of init/start/stop/reset wait for, or
    // cancel, a transition that is already in progress.
    struct tristate_obj::impl {

        // Back pointer to the owning object; assigned in the tristate_obj
        // constructor. Used to call the on_init/on_start/on_stop/on_reset hooks.
        tristate_obj *q_this = nullptr;

        // Current position in the state machine.
        state_type state = state_type::INITIAL;

        // Notified by reset() / stop() to interrupt an in-flight init() / start().
        coro_signal2 init_cancel_signal;
        coro_signal2 start_cancel_signal;

        // Notified when the corresponding transition has completed. Note that a
        // failed init() notifies reset_finished_signal and a failed start()
        // notifies stop_finished_signal, because both roll back through
        // on_reset() / on_stop().
        coro_signal2 init_finished_signal;
        coro_signal2 start_finished_signal;
        coro_signal2 stop_finished_signal;
        coro_signal2 reset_finished_signal;

        // Creates the global state_name_translator and registers a display name
        // for every state_type value used in this file.
        static void initialize_translator() {
            // presumably RUN_ONCE makes the statements below execute a single
            // time across all impl instances - the macro is defined elsewhere,
            // confirm.
            RUN_ONCE
            state_name_translator = new name_translator;
            state_name_translator->register_item("Initial", (uint8_t) tristate_obj::state_type::INITIAL);
            state_name_translator->register_item("Initializing", (uint8_t) tristate_obj::state_type::INITIALIZING);
            state_name_translator->register_item("Resetting", (uint8_t) tristate_obj::state_type::RESETTING);
            state_name_translator->register_item("Pending", (uint8_t) tristate_obj::state_type::PENDING);
            state_name_translator->register_item("Starting", (uint8_t) tristate_obj::state_type::STARTING);
            state_name_translator->register_item("Stopping", (uint8_t) tristate_obj::state_type::STOPPING);
            state_name_translator->register_item("Running", (uint8_t) tristate_obj::state_type::RUNNING);
            state_name_translator->register_item("Unknown", (uint8_t) tristate_obj::state_type::UNKNOWN);
        }

        // Logs a state transition at info level, using the object's name and the
        // translated names of both states. Does not modify `state` itself.
        void log_state_change(state_type old_state, state_type new_state) const {
            SPDLOG_INFO("Object [name = {}], State: {} => {}.",
                        QUERY_OBJECT_NAME(q_this),
                        state_name_translator->translate((uint8_t) old_state),
                        state_name_translator->translate((uint8_t) new_state));
        }

        // Drives INITIAL -> INITIALIZING -> PENDING.
        //
        // config is forwarded unchanged to q_this->on_init().
        //
        // Yields true when the object ends up initialized (or already was), and
        // false when initialization failed, was cancelled, or a reset was in
        // progress.
        // NOTE(review): `awaitable` has no template argument here; the
        // co_return statements yield bool, so presumably this was
        // awaitable<bool> - confirm.
        awaitable init(const nlohmann::json &config) {
            // Another caller is already initializing: wait for its outcome
            // instead of running on_init() a second time.
            if (state == state_type::INITIALIZING) {
                auto init_finished_watcher = init_finished_signal.new_watcher();
                auto reset_finished_watcher = reset_finished_signal.new_watcher();
                auto result = co_await (init_finished_watcher.coro_wait() ||
                                        reset_finished_watcher.coro_wait());
                // index 0 <=> the init-finished wait completed first (success);
                // index 1 <=> the reset-finished wait did (failed or cancelled).
                co_return result.index() == 0;
            }
            // A reset is running: wait for it to finish, then report "not
            // initialized".
            if (state == state_type::RESETTING) {
                auto reset_finished_watcher = reset_finished_signal.new_watcher();
                co_await reset_finished_watcher.coro_wait();
                co_return false;
            }
            if (state != state_type::INITIAL) co_return true; // >= PENDING: already initialized

            state = state_type::INITIALIZING;
            log_state_change(state_type::INITIAL, state_type::INITIALIZING);
            SPDLOG_TRACE("Initializing object [name = {}] with config {}.",
                         QUERY_OBJECT_NAME(q_this), config.dump());

            // Run the user hook, but let reset() interrupt it through
            // init_cancel_signal.
            auto init_cancel_watcher = init_cancel_signal.new_watcher();
            auto result = co_await (q_this->on_init(config) || init_cancel_watcher.coro_wait());
            if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
                state = state_type::PENDING;
                log_state_change(state_type::INITIALIZING, state_type::PENDING);
                SPDLOG_DEBUG("Initialize object [name = {}] succeeded.", QUERY_OBJECT_NAME(q_this));
                init_finished_signal.try_notify_all();
                co_return true;
            } else { // failed: on_init() yielded false, or the cancel wait won
                // Roll back through the reset hook before returning to INITIAL.
                co_await q_this->on_reset();
                state = state_type::INITIAL;
                log_state_change(state_type::INITIALIZING, state_type::INITIAL);
                SPDLOG_WARN("Initialize object [name = {}] failed.", QUERY_OBJECT_NAME(q_this));
                // Wakes both waiting init() callers and a reset() that
                // requested the cancellation.
                reset_finished_signal.try_notify_all();
                co_return false;
            }
        }

        // Drives PENDING -> STARTING -> RUNNING.
        //
        // config is forwarded unchanged to q_this->on_start().
        //
        // Yields true when the object ends up running (or already was), and
        // false when starting failed, was cancelled, a stop was in progress, or
        // the object has not reached PENDING yet.
        // NOTE(review): presumably awaitable<bool>, as for init() - confirm.
        awaitable start(const nlohmann::json &config) {
            // Another caller is already starting: wait for its outcome.
            if (state == state_type::STARTING) {
                auto start_finished_watcher = start_finished_signal.new_watcher();
                auto stop_finished_watcher = stop_finished_signal.new_watcher();
                auto result = co_await (start_finished_watcher.coro_wait() ||
                                        stop_finished_watcher.coro_wait());
                // index 0 <=> the start-finished wait completed first (success).
                co_return result.index() == 0;
            }
            // A stop is running: wait for it to finish, then report "not running".
            if (state == state_type::STOPPING) {
                auto stop_finished_watcher = stop_finished_signal.new_watcher();
                co_await stop_finished_watcher.coro_wait();
                co_return false;
            }
            if (state == state_type::RUNNING) co_return true;
            if (state != state_type::PENDING) co_return false; // INITIAL, INITIALIZING, RESETTING

            state = state_type::STARTING;
            log_state_change(state_type::PENDING, state_type::STARTING);
            SPDLOG_TRACE("Starting object [name = {}] with config {}.",
                         QUERY_OBJECT_NAME(q_this), config.dump());

            // Run the user hook, but let stop() interrupt it through
            // start_cancel_signal.
            auto start_cancel_watcher = start_cancel_signal.new_watcher();
            auto result = co_await (q_this->on_start(config) || start_cancel_watcher.coro_wait());
            if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
                state = state_type::RUNNING;
                log_state_change(state_type::STARTING, state_type::RUNNING);
                SPDLOG_DEBUG("Start object [name = {}] succeeded.", QUERY_OBJECT_NAME(q_this));
                start_finished_signal.try_notify_all();
                co_return true;
            } else { // failed: on_start() yielded false, or the cancel wait won
                // Roll back through the stop hook before returning to PENDING.
                co_await q_this->on_stop();
                state = state_type::PENDING;
                log_state_change(state_type::STARTING, state_type::PENDING);
                SPDLOG_WARN("Start object [name = {}] failed.", QUERY_OBJECT_NAME(q_this));
                // Wakes both waiting start() callers and a stop() that
                // requested the cancellation.
                stop_finished_signal.try_notify_all();
                co_return false;
            }
        }

        // Brings a RUNNING / STOPPING / STARTING object back to PENDING.
        // In any other state this is a no-op.
        // NOTE(review): presumably awaitable<void> (bare co_return) - confirm.
        awaitable stop() {
            if (state == state_type::RUNNING) {
                state = state_type::STOPPING;
                log_state_change(state_type::RUNNING, state_type::STOPPING);
                co_await q_this->on_stop();
                state = state_type::PENDING;
                log_state_change(state_type::STOPPING, state_type::PENDING);
                SPDLOG_DEBUG("Stopped object [name = {}].", QUERY_OBJECT_NAME(q_this));
                stop_finished_signal.try_notify_all();
                // Only this branch reports the stop to the global manager; the
                // rollback path of a failed start() does not.
                global_sophiar_manager->notify_object_stop(q_this);
            } else if (state == state_type::STOPPING) {
                // Someone else is already stopping: just wait for them.
                auto stop_finished_watcher = stop_finished_signal.new_watcher();
                co_await stop_finished_watcher.coro_wait();
            } else if (state == state_type::STARTING) {
                // Cancel the in-flight start(); its failure path runs on_stop()
                // and then notifies stop_finished_signal.
                // NOTE(review): the watcher is created after the cancel signal
                // is sent; whether that can miss the notification depends on
                // coro_signal2 semantics not visible here - confirm.
                start_cancel_signal.try_notify_all();
                auto stop_finished_watcher = stop_finished_signal.new_watcher();
                co_await stop_finished_watcher.coro_wait();
            }
            co_return;
        }

        // Forces the object back to INITIAL from whatever state it is in,
        // stopping it first when necessary.
        // NOTE(review): presumably awaitable<void> (bare co_return) - confirm.
        awaitable reset() {
            // force reset: anything at or beyond STARTING is first taken down
            // to PENDING.
            if (state == state_type::RUNNING ||
                state == state_type::STOPPING ||
                state == state_type::STARTING) {
                co_await stop();
                assert(state == state_type::PENDING);
            }
            if (state == state_type::PENDING) {
                state = state_type::RESETTING;
                log_state_change(state_type::PENDING, state_type::RESETTING);
                co_await q_this->on_reset();
                state = state_type::INITIAL;
                log_state_change(state_type::RESETTING, state_type::INITIAL);
                SPDLOG_DEBUG("Reset object [name = {}].", QUERY_OBJECT_NAME(q_this));
                reset_finished_signal.try_notify_all();
            } else if (state == state_type::RESETTING) {
                // Someone else is already resetting: just wait for them.
                auto reset_finished_watcher = reset_finished_signal.new_watcher();
                co_await reset_finished_watcher.coro_wait();
            } else if (state == state_type::INITIALIZING) {
                // Cancel the in-flight init(); its failure path runs on_reset()
                // and then notifies reset_finished_signal.
                // NOTE(review): same watcher-after-notify ordering as in stop()
                // - confirm against coro_signal2.
                init_cancel_signal.try_notify_all();
                auto reset_finished_watcher = reset_finished_signal.new_watcher();
                co_await reset_finished_watcher.coro_wait();
            }
            co_return;
        }

        // Makes sure the state-name table is set up before any state change
        // can be logged.
        impl() {
            initialize_translator();
        }

    };

    // Returns the current state stored in the implementation object.
    tristate_obj::state_type tristate_obj::get_state() const {
        return pimpl->state;
    }

    // The four public operations below forward directly to the implementation.

    awaitable tristate_obj::init(const nlohmann::json &config) noexcept {
        return pimpl->init(config);
    }

    awaitable tristate_obj::start(const nlohmann::json &config) noexcept {
        return pimpl->start(config);
    }

    awaitable tristate_obj::stop() noexcept {
        return pimpl->stop();
    }

    awaitable tristate_obj::reset() noexcept {
        return pimpl->reset();
    }

    // Creates the implementation object and hands it the back pointer it needs
    // to reach the on_* hooks.
    // NOTE(review): std::make_unique has no type argument in this listing;
    // presumably std::make_unique<impl>() - confirm.
    tristate_obj::tristate_obj()
            : pimpl(std::make_unique()) {
        pimpl->q_this = this;
    }

    // Defaulted in this translation unit, where impl is a complete type.
    tristate_obj::~tristate_obj() = default;

}