Procházet zdrojové kódy

实现了 Global Obj 系统

jcsyshc před 3 roky
rodič
revize
761df9648f

+ 1 - 1
src/core/datanode_base.h

@@ -12,7 +12,7 @@
 
 namespace sophiar {
 
-    class datanode_base : public tristate_obj {
+    class [[deprecated]] datanode_base : public tristate_obj {
     public:
 
         enum class exec_state_type {

+ 54 - 5
src/core/sophiar_manager.cpp

@@ -1,7 +1,7 @@
 #include "sophiar_manager.h"
 #include "core/sophiar_obj.hpp"
-#include "core/timestamp_helper.hpp"
 #include "core/tristate_obj.h"
+#include "utility/coro_signal.hpp"
 #include "utility/debug_utility.hpp"
 #include "utility/named_vector.hpp"
 
@@ -95,6 +95,15 @@ namespace sophiar {
         using connection_pool_type = named_vector<connection_index_type, connection_info>;
         using connection_set_type = std::unordered_set<connection_index_type>;
 
+        struct global_obj_info {
+            void *placeholder;
+            coro_signal *update_signal;
+            timestamp_type last_update_ts;
+            std::type_index obj_type = typeid(void);
+        };
+
+        using global_obj_pool_type = named_vector<global_obj_index_type, global_obj_info>;
+
         struct mode_info {
             obj_set_type obj_set; // 需要运行的对象
             connection_set_type connection_set; // 需要建立的连接
@@ -114,6 +123,8 @@ namespace sophiar {
 
         connection_pool_type connection_pool;
 
+        global_obj_pool_type global_obj_pool;
+
         mode_index_type current_mode = 0; // all_down
         mode_pool_type mode_pool;
 
@@ -597,10 +608,6 @@ namespace sophiar {
     }
 
     boost::asio::awaitable<bool> sophiar_manager::switch_mode(const std::string &mode_name) {
-        if (!pimpl->mode_pool.contains(mode_name)) {
-            // TODO show log
-            co_return false;
-        }
         auto mode_index = pimpl->mode_pool.to_index_by_name(mode_name);
         co_return co_await pimpl->switch_mode(mode_index);
     }
@@ -614,6 +621,48 @@ namespace sophiar {
         pimpl->on_object_stopped(obj);
     }
 
+    timestamp_type sophiar_manager::get_global_obj_update_timestamp(global_obj_index_type obj_index) {
+        return pimpl->global_obj_pool[obj_index].last_update_ts;
+    }
+
+    global_obj_index_type sophiar_manager::register_global_obj_impl(const std::string &obj_name,
+                                                                    std::type_index obj_type,
+                                                                    void *placeholder) {
+        if (placeholder == nullptr) {
+            if (!pimpl->global_obj_pool.contains(obj_name))
+                return ~(global_obj_index_type) 0; // indicate the caller to create a placeholder
+            assert(pimpl->global_obj_pool[obj_name].obj_type == obj_type);
+            return pimpl->global_obj_pool.to_index_by_name(obj_name);
+        }
+
+        // create new one
+        auto obj_index = pimpl->global_obj_pool.new_elem(obj_name);
+        auto &obj_info = pimpl->global_obj_pool[obj_index];
+        obj_info.obj_type = obj_type;
+        obj_info.placeholder = placeholder;
+        obj_info.update_signal = new coro_signal(global_context);
+        return obj_index;
+    }
+
+    void *sophiar_manager::get_global_obj_placeholder(global_obj_index_type obj_index,
+                                                      std::type_index obj_type) {
+        auto &obj_info = pimpl->global_obj_pool[obj_index];
+        assert(obj_info.obj_type == obj_type);
+        return obj_info.placeholder;
+    }
+
+    void sophiar_manager::update_global_obj_timestamp(global_obj_index_type obj_index,
+                                                      timestamp_type ts) {
+        auto &obj_info = pimpl->global_obj_pool[obj_index];
+        obj_info.last_update_ts = ts;
+        obj_info.update_signal->try_notify_all();
+    }
+
+    boost::asio::awaitable<void> sophiar_manager::await_global_obj_update(global_obj_index_type obj_index) {
+        co_await pimpl->global_obj_pool[obj_index].update_signal->coro_wait();
+        co_return;
+    }
+
 #ifdef SOPHIAR_TEST
 
     sophiar_obj *sophiar_manager::get_object(const std::string &obj_name) const {

+ 58 - 0
src/core/sophiar_manager.h

@@ -1,6 +1,7 @@
 #ifndef SOPHIAR2_SOPHIAR_MANAGER_H
 #define SOPHIAR2_SOPHIAR_MANAGER_H
 
+#include "core/timestamp_helper.hpp"
 #include "utility/tiny_signal.hpp"
 #include "utility/signal_muxer.hpp"
 #include "utility/slot_demuxer.hpp"
@@ -12,13 +13,18 @@
 #include <nlohmann/json.hpp>
 
 #include <cassert>
+#include <coroutine>
 #include <exception>
 #include <memory>
 #include <string>
 #include <type_traits>
+#include <typeindex>
+#include <typeinfo>
 
 namespace sophiar {
 
+    using global_obj_index_type = uint16_t;
+
     class sophiar_obj;
 
     class sophiar_manager {
@@ -40,7 +46,46 @@ namespace sophiar {
             register_object_type_impl(type_name, &Derived::new_instance);
         }
 
+        template<typename SmallObjType>
+        global_obj_index_type register_global_obj(const std::string &obj_name) {
+            auto ret = register_global_obj_impl(obj_name, typeid(SmallObjType), nullptr);
+            if ((~ret) == 0) { // first time of register
+                using ptr_type = typename SmallObjType::pointer;
+                auto placeholder = new ptr_type;
+                ret = register_global_obj_impl(obj_name, typeid(SmallObjType), placeholder);
+            }
+            return ret;
+        }
+
+        template<typename SmallObjType>
+        void update_global_obj(global_obj_index_type obj_index,
+                               const typename SmallObjType::pointer &value,
+                               timestamp_type ts = current_timestamp()) {
+            using ptr_type = typename SmallObjType::pointer;
+            auto placeholder = get_global_obj_placeholder(obj_index, typeid(SmallObjType));
+            auto &inner_ptr = *static_cast<ptr_type *>(placeholder);
+            inner_ptr = value;
+            update_global_obj_timestamp(obj_index, ts);
+        }
+
+        template<typename SmallObjType>
+        typename SmallObjType::pointer get_global_obj(global_obj_index_type obj_index) {
+            using ptr_type = typename SmallObjType::pointer;
+            auto placeholder = get_global_obj_placeholder(obj_index, typeid(SmallObjType));
+            return *static_cast<ptr_type *>(placeholder);
+        }
+
+        timestamp_type get_global_obj_update_timestamp(global_obj_index_type obj_index);
+
+        template<typename SmallObjType>
+        boost::asio::awaitable<typename SmallObjType::pointer>
+        await_next_global_obj(global_obj_index_type obj_index) {
+            co_await await_global_obj_update(obj_index);
+            co_return get_global_obj<SmallObjType>(obj_index);
+        }
+
         template<typename ...Args>
+        [[deprecated("Use obj pool system instead.")]]
         void register_signal(sophiar_obj *obj,
                              const std::string &signal_name,
                              tiny_signal<Args...> &signal) {
@@ -50,6 +95,7 @@ namespace sophiar {
 
         // 将套用 slot_demuxer
         template<typename ...Args>
+        [[deprecated("Use obj pool system instead.")]]
         void register_slot(sophiar_obj *obj,
                            const std::string &slot_name,
                            typename tiny_signal<Args...>::slot_type &slot) {
@@ -103,6 +149,18 @@ namespace sophiar {
                                 tiny_slot_base *slot_base,
                                 slot_demuxer_base *demuxer_base);
 
+        global_obj_index_type register_global_obj_impl(const std::string &obj_name,
+                                                       std::type_index obj_type,
+                                                       void *placeholder); // small_obj<Derive>::pointer
+
+        void *get_global_obj_placeholder(global_obj_index_type obj_index,
+                                         std::type_index obj_type);
+
+        void update_global_obj_timestamp(global_obj_index_type obj_index,
+                                         timestamp_type ts);
+
+        boost::asio::awaitable<void> await_global_obj_update(global_obj_index_type obj_index);
+
     };
 
     extern sophiar_manager global_sophiar_manager;

+ 2 - 2
src/core/sophiar_obj.hpp

@@ -16,8 +16,8 @@ namespace sophiar {
 
         virtual ~sophiar_obj() = default;
 
-        // 创建所有的 slot 和 signal
-        virtual void load_construct_config(const nlohmann::json &config) {};
+        // 加载初始化配置
+        [[deprecated]] virtual void load_construct_config(const nlohmann::json &config) {};
 
         static constexpr auto &get_context() {
             return global_context;

+ 1 - 1
src/tracker/ndi/ndi_interface.cpp

@@ -68,7 +68,7 @@ namespace sophiar {
             FORWARD_CONSTRUCT(binary_reply_obj, binary_reply_content)
         };
 
-        static constexpr auto reply_queue_size = 4;
+        static constexpr auto reply_queue_size = 1;
         static constexpr auto ndi_endian = boost::endian::order::little;
 
         using crc_checker_type = boost::crc_16_type;

+ 1 - 1
src/utility/coro_signal.hpp

@@ -12,7 +12,6 @@
 
 namespace sophiar {
 
-    // TODO 为多线程改造
     class coro_signal : private boost::noncopyable {
     public:
 
@@ -31,6 +30,7 @@ namespace sophiar {
                 // handle leaking signal
                 if (signal.waiting_count == 0 &&
                         signal.channel.ready()) {
+                    assert(false);
                     signal.channel.reset();
                     signal.is_notifying = false;
                 }

+ 1 - 1
src/utility/signal_muxer.hpp

@@ -7,7 +7,7 @@
 namespace sophiar {
 
     template<typename... Args>
-    class signal_muxer : public tiny_signal<Args...> {
+    class [[deprecated]] signal_muxer : public tiny_signal<Args...> {
     public:
 
         using signal_type = tiny_signal<Args...>;

+ 1 - 1
src/utility/slot_demuxer.hpp

@@ -15,7 +15,7 @@ namespace sophiar {
     };
 
     template<typename... Args>
-    class slot_demuxer : public slot_demuxer_base {
+    class [[deprecated]] slot_demuxer : public slot_demuxer_base {
     public:
 
         using signal_type = tiny_signal<Args...>;

+ 2 - 2
src/utility/tiny_signal.hpp

@@ -15,7 +15,7 @@ namespace sophiar {
             boost::intrusive::link_mode<
                     boost::intrusive::auto_unlink>>;
 
-    struct tiny_slot_base : public slot_hook_type {
+    struct [[deprecated]] tiny_slot_base : public slot_hook_type {
 
         bool is_enabled = true;
 
@@ -36,7 +36,7 @@ namespace sophiar {
     };
 
     template<typename... Args>
-    class tiny_signal : public tiny_signal_base {
+    class [[deprecated]] tiny_signal : public tiny_signal_base {
     public:
 
         class slot_type : public tiny_slot_base {