Jelajahi Sumber

实现了 coro_signal_group

jcsyshc 3 tahun lalu
induk
melakukan
d9d9c0f159

+ 1 - 0
src/core/transform_tree.cpp

@@ -2,6 +2,7 @@
 
 #include "core/sophiar_manager.h"
 #include "utility/coro_worker.hpp"
+#include "utility/coro_signal_group.hpp"
 #include "utility/global_obj_helper.hpp"
 #include "utility/named_vector.hpp"
 #include "utility/signal_muxer.hpp"

+ 88 - 0
src/utility/coro_signal_group.hpp

@@ -0,0 +1,88 @@
+#ifndef SOPHIAR2_CORO_SIGNAL_GROUP_HPP
+#define SOPHIAR2_CORO_SIGNAL_GROUP_HPP
+
+#include "utility/coro_signal2.hpp"
+#include "utility/coro_worker.hpp"
+
+#include <boost/dynamic_bitset.hpp>
+
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+namespace sophiar {
+
+    template<auto CondFunc>
+    class coro_signal_group {
+    public:
+
+        template<typename Executor>
+        coro_signal_group(Executor &executor)
+                :final_signal(executor) {}
+
+        ~coro_signal_group() {
+            assert(!is_running);
+        }
+
+        void add_watcher(signal_watcher &&watcher) {
+            assert(!is_running);
+            watcher_list.push_back(std::move(watcher));
+        }
+
+        template<typename Executor>
+        auto new_watcher(Executor &executor) {
+            return final_signal.new_watcher(executor);
+        }
+
+        template<typename Executor>
+        void start(Executor &executor, bool auto_sync = true) { // 思考如果 auto_sync == false 有什么副作用?
+            item_mask.resize(watcher_list.size());
+            is_running = true;
+            for (size_t index = 0; index < watcher_list.size(); ++index) {
+                auto worker = make_infinite_coro_worker(executor, [this, index, auto_sync]()
+                        -> boost::asio::awaitable<bool> {
+                    co_await watcher_list[index].coro_wait(auto_sync);
+                    item_mask.set(index);
+                    check_and_notify();
+                    co_return true;
+                });
+                worker->run();
+                worker_list.push_back(std::move(worker));
+            }
+        }
+
+        boost::asio::awaitable<void> stop() {
+            for (auto &worker: worker_list) {
+                worker->cancel();
+            }
+            for (auto &worker: worker_list) {
+                co_await worker->coro_wait_stop();
+            }
+            worker_list.clear();
+            is_running = false;
+        }
+
+    private:
+        using bitset_type = boost::dynamic_bitset<>;
+        static_assert(std::is_convertible_v<decltype(CondFunc(std::declval<bitset_type &>())), bool>);
+
+        bool is_running = false;
+        coro_signal2 final_signal;
+        bitset_type item_mask;
+        std::vector<signal_watcher> watcher_list;
+        std::vector<coro_worker::pointer> worker_list;
+
+        void check_and_notify() {
+            if (!CondFunc(item_mask)) return;
+            item_mask.reset();
+            final_signal.try_notify_all();
+        }
+
+    };
+
+    using coro_signal_all_group = coro_signal_group<[](auto &mask) { return mask.all(); }>;
+    using coro_signal_any_group = coro_signal_group<[](auto &mask) { return mask.any(); }>;
+
+}
+
+#endif //SOPHIAR2_CORO_SIGNAL_GROUP_HPP

+ 1 - 1
src/utility/coro_worker.hpp

@@ -39,7 +39,7 @@ namespace sophiar {
                 auto is_stopped = stop_finished_watcher.try_wait();
                 assert(is_stopped);
             }
-            assert(false);
+            assert(!is_running);
             co_return;
         }
 

+ 4 - 11
src/utility/global_obj_helper.hpp

@@ -7,12 +7,7 @@
 
 namespace sophiar {
 
-    struct global_obj_helper_auto_sync {
-    };
-    struct global_obj_helper_manual_sync {
-    };
-
-    template<typename SmallObjType, typename SyncMode>
+    template<typename SmallObjType, bool AutoSync = true>
     class global_obj_delegate {
     public:
 
@@ -41,7 +36,7 @@ namespace sophiar {
         }
 
         pointer_type get_value() {
-            if constexpr (std::is_same_v<SyncMode, global_obj_helper_auto_sync>) {
+            if constexpr (AutoSync) {
                 if (watcher.try_wait()) { // new value available
                     obj_ptr = manager.get_global_obj<SmallObjType>(obj_index);
                 }
@@ -67,9 +62,7 @@ namespace sophiar {
             return get_value().get();
         }
 
-        template<typename OtherSyncMode=SyncMode>
         boost::asio::awaitable<void> coro_wait_update(bool auto_sync = true) {
-            static_assert(std::is_same_v<OtherSyncMode, global_obj_helper_manual_sync>);
             co_await watcher.coro_wait(auto_sync);
             obj_ptr = manager.get_global_obj<SmallObjType>(obj_index);
             co_return;
@@ -84,10 +77,10 @@ namespace sophiar {
     };
 
     template<typename SmallObjType>
-    using global_obj_auto_sync_delegate = global_obj_delegate<SmallObjType, global_obj_helper_auto_sync>;
+    using global_obj_auto_sync_delegate = global_obj_delegate<SmallObjType, true>;
 
     template<typename SmallObjType>
-    using global_obj_manual_sync_delegate = global_obj_delegate<SmallObjType, global_obj_helper_manual_sync>;
+    using global_obj_manual_sync_delegate = global_obj_delegate<SmallObjType, false>;
 
 }
 

+ 2 - 1
tests/CMakeLists.txt

@@ -17,10 +17,11 @@ target_link_libraries(test_core ${Boost_LIBRARIES} ${EXTRA_LIBS})
 
 add_executable(test_utility
         utility/coro_signal2.cpp
+        utility/coro_signal_group.cpp
         utility/coro_worker.cpp
         utility/global_obj_helper.cpp
         ${EXTERN_DEF_FILES}
         ${CORE_IMPL_FILES})
 target_compile_definitions(test_utility PUBLIC SOPHIAR_TEST)
-#target_compile_definitions(test_utility PUBLIC CORO_SIGNAL2_USE_TIMER)
+target_compile_definitions(test_utility PUBLIC CORO_SIGNAL2_USE_TIMER)
 target_link_libraries(test_utility ${Boost_LIBRARIES} ${EXTRA_LIBS})

+ 89 - 0
tests/utility/coro_signal_group.cpp

@@ -0,0 +1,89 @@
+#define BOOST_TEST_DYN_LINK
+
+#include "core/sophiar_obj.hpp"
+#include "utility/coro_signal_group.hpp"
+#include "utility/debug_utility.hpp"
+
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/this_coro.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/test/unit_test.hpp>
+
+#include <chrono>
+#include <iostream>
+#include <vector>
+
+using boost::asio::awaitable;
+using boost::asio::co_spawn;
+using boost::asio::detached;
+using boost::asio::use_awaitable;
+
+using namespace sophiar;
+using namespace std::chrono_literals;
+
+awaitable<void> test_coro_signal_group_1() {
+    coro_signal2 sig_a(global_context);
+    coro_signal2 sig_b(global_context);
+
+    coro_signal_any_group group_any(global_context);
+    coro_signal_all_group group_all(global_context);
+
+    group_any.add_watcher(sig_a.new_watcher(global_context));
+    group_any.add_watcher(sig_b.new_watcher(global_context));
+    group_all.add_watcher(sig_a.new_watcher(global_context));
+    group_all.add_watcher(sig_b.new_watcher(global_context));
+
+    int cnt_any = 0, cnt_all = 0;
+
+    auto worker_any = make_infinite_coro_worker(global_context, [
+            &cnt_any,
+            watcher = group_any.new_watcher(global_context)]() mutable
+            -> awaitable<bool> {
+        co_await watcher.coro_wait(false);
+        ++cnt_any;
+    });
+    auto worker_all = make_infinite_coro_worker(global_context, [
+            &cnt_all,
+            watcher = group_all.new_watcher(global_context)]() mutable
+            -> awaitable<bool> {
+        co_await watcher.coro_wait(false);
+        ++cnt_all;
+    });
+
+    worker_any->run();
+    worker_all->run();
+
+    group_any.start(global_context);
+    group_all.start(global_context);
+
+    sig_a.try_notify_all();
+    co_await coro_sleep(10ms);
+    BOOST_TEST(cnt_any == 1);
+    BOOST_TEST(cnt_all == 0);
+
+    sig_b.try_notify_all();
+    co_await coro_sleep(10ms);
+    BOOST_TEST(cnt_any == 2);
+    BOOST_TEST(cnt_all == 1);
+
+    sig_a.try_notify_all();
+    sig_b.try_notify_all();
+    co_await coro_sleep(10ms);
+    BOOST_TEST(cnt_any == 3);
+    BOOST_TEST(cnt_all == 2);
+
+    worker_any->cancel();
+    worker_all->cancel();
+    co_await worker_any->coro_wait_stop();
+    co_await worker_all->coro_wait_stop();
+    co_await group_any.stop();
+    co_await group_all.stop();
+
+    co_return;
+}
+
+BOOST_AUTO_TEST_CASE(test_coro_signal_group) {
+    co_spawn(global_context, test_coro_signal_group_1(), detached);
+    global_context.run();
+}