Browse Source

实现了基本的 mode degrade 功能

jcsyshc 3 years ago
parent
commit
c613fc8b4e

+ 59 - 8
src/core/sophiar_manager.cpp

@@ -5,6 +5,9 @@
 #include "utility/debug_utility.hpp"
 #include "utility/named_vector.hpp"
 
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/detached.hpp>
 #include <boost/iterator/counting_iterator.hpp>
 
 #include <fmt/format.h>
@@ -18,14 +21,23 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <vector>
-#include <boost/asio/awaitable.hpp>
 
 namespace sophiar {
 
     using boost::asio::awaitable;
+    using boost::asio::co_spawn;
+    using boost::asio::detached;
 
     struct sophiar_manager::impl {
 
+        enum class manager_states {
+            INITIAL,
+            NORMAL,
+            SWITCHING_MODE,
+        };
+
+        manager_states current_states = manager_states::INITIAL;
+
         using config_index_type = uint16_t;
         config_index_type next_config_index = 0;
 
@@ -287,23 +299,56 @@ namespace sophiar {
                 assert(tristate_ptr->get_state() == state_type::RUNNING);
             }
 
+            current_mode = mode_index;
+            SPDLOG_INFO("Current mode switched to {}.",
+                        mode_pool.to_name_by_index(mode_index));
             co_return true;
         }
 
+        awaitable<void> try_degrade_mode(mode_index_type target_mode) {
+            for (auto degrade_mode: mode_pool[target_mode].degrade_list) {
+                auto ok = co_await try_switch_mode(degrade_mode);
+                if (ok) co_return;
+                SPDLOG_ERROR("Switch to degrade mode {} failed, degrading...",
+                             mode_pool.to_name_by_index(degrade_mode));
+            }
+            assert(false); // all_down will always success
+            co_return;
+        }
+
         awaitable<bool> switch_mode_impl(mode_index_type target_mode) { // 尝试切换模式,如果失败就降级
             bool ok = co_await try_switch_mode(target_mode);
             if (ok) co_return true;
             SPDLOG_ERROR("Switch to target mode {} failed, degrading...",
                          mode_pool.to_name_by_index(target_mode));
-            for (auto degrade_mode: mode_pool[target_mode].degrade_list) {
-                ok = co_await try_switch_mode(degrade_mode);
-                if (ok) break;
-                SPDLOG_ERROR("Switch to degrade mode {} failed, degrading...",
-                             mode_pool.to_name_by_index(degrade_mode));
-            }
+            co_await try_degrade_mode(target_mode);
             co_return false;
         }
 
+        awaitable<bool> switch_mode(mode_index_type target_mode) {
+            if (current_states != manager_states::NORMAL) co_return false;
+            current_states = manager_states::SWITCHING_MODE;
+            auto ret = co_await switch_mode_impl(target_mode);
+            current_states = manager_states::NORMAL;
+            co_return ret;
+        }
+
+        void on_object_stopped(sophiar_obj *obj_ptr) {
+            if (current_states != manager_states::NORMAL) return; // maybe triggered by mode switching
+            assert(obj_ptr_index_map.contains(obj_ptr));
+            auto obj_index = obj_ptr_index_map.at(obj_ptr);
+            auto &mode_info = mode_pool[current_mode];
+            assert(mode_info.obj_set.contains(obj_index));
+            SPDLOG_ERROR("Abnormal object stop detected, degrading...");
+            co_spawn(global_context, [=]() -> awaitable<void> {
+                if (current_states != manager_states::NORMAL) co_return;
+                current_states = manager_states::SWITCHING_MODE;
+                co_await try_degrade_mode(current_mode);
+                current_states = manager_states::NORMAL;
+                co_return;
+            }, detached);
+        }
+
         obj_index_type create_object(const std::string &type_name,
                                      const std::string &obj_name) {
 
@@ -547,7 +592,9 @@ namespace sophiar {
     }
 
     void sophiar_manager::build_from_config(const nlohmann::json &config) {
+        assert(pimpl->current_states == impl::manager_states::INITIAL);
         pimpl->build_graph(config);
+        pimpl->current_states = impl::manager_states::NORMAL;
     }
 
     boost::asio::awaitable<bool> sophiar_manager::switch_mode(const std::string &mode_name) {
@@ -556,7 +603,7 @@ namespace sophiar {
             co_return false;
         }
         auto mode_index = pimpl->mode_pool.to_index_by_name(mode_name);
-        co_return co_await pimpl->switch_mode_impl(mode_index);
+        co_return co_await pimpl->switch_mode(mode_index);
     }
 
     std::string sophiar_manager::get_object_name(sophiar_obj *obj) const {
@@ -564,6 +611,10 @@ namespace sophiar {
         return pimpl->obj_pool.to_name_by_index(obj_index);
     }
 
+    void sophiar_manager::notify_object_stop(sophiar_obj *obj) {
+        pimpl->on_object_stopped(obj);
+    }
+
     sophiar_manager::~sophiar_manager() = default;
 
 }

+ 2 - 0
src/core/sophiar_manager.h

@@ -32,6 +32,8 @@ namespace sophiar {
 
         std::string get_object_name(sophiar_obj *obj) const;
 
+        void notify_object_stop(sophiar_obj *obj);
+
         template<typename Derived>
         void register_object_type(const std::string &type_name) {
             static_assert(std::is_convertible_v<decltype(Derived::new_instance()), sophiar_obj *>);

+ 1 - 0
src/core/tristate_obj.cpp

@@ -130,6 +130,7 @@ namespace sophiar {
                 log_state_change(state_type::STOPPING, state_type::PENDING);
                 SPDLOG_DEBUG("Stopped object [name = {}].", get_manager().get_object_name(q_this));
                 stop_finished_signal.try_notify_all();
+                get_manager().notify_object_stop(q_this);
             } else if (state == state_type::STOPPING) {
                 co_await stop_finished_signal.coro_wait();
             } else if (state == state_type::STARTING) {

+ 6 - 2
tests/core/sophiar_manager.cpp

@@ -87,6 +87,10 @@ struct proxy_node_type : public tristate_obj {
 
     boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
         BOOST_TEST(config.empty());
+        co_spawn(get_context(), [=]() -> awaitable<void> {
+            co_await coro_sleep(100ms);
+            co_await this->stop();
+        }, detached);
         co_return true;
     }
 
@@ -140,8 +144,8 @@ BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
         co_await global_sophiar_manager.switch_mode("mode_a");
         co_await coro_sleep(100ms);
         co_await global_sophiar_manager.switch_mode("mode_b");
-        co_await coro_sleep(100ms);
-        co_await global_sophiar_manager.switch_mode("mode_a");
+//        co_await coro_sleep(100ms);
+//        co_await global_sophiar_manager.switch_mode("mode_a");
     }, detached);
 
     global_context.run();