Jelajahi Sumber

重新实现了 coro_signal

jcsyshc 3 tahun lalu
induk
melakukan
eb52d4c710

+ 5 - 0
src/core/sophiar_manager.cpp

@@ -613,6 +613,11 @@ namespace sophiar {
     }
 
     std::string sophiar_manager::get_object_name(sophiar_obj *obj) const {
+#ifdef SOPHIAR_TEST // make test without manager happy
+        if (!pimpl->obj_ptr_index_map.contains(obj)) {
+            return "unknown";
+        }
+#endif
         auto obj_index = pimpl->obj_ptr_index_map.at(obj);
         return pimpl->obj_pool.to_name_by_index(obj_index);
     }

+ 33 - 19
src/core/tristate_obj.cpp

@@ -1,7 +1,9 @@
 #include "tristate_obj.h"
 
+#include "utility/debug_utility.hpp"
+
 #include "third_party/static_block.hpp"
-#include "utility/coro_signal.hpp"
+#include "utility/coro_signal2.hpp"
 #include "utility/name_translator.hpp"
 
 #include <spdlog/spdlog.h>
@@ -34,12 +36,12 @@ namespace sophiar {
         tristate_obj *q_this = nullptr;
 
         state_type state = state_type::INITIAL;
-        coro_signal init_cancel_signal;
-        coro_signal start_cancel_signal;
-        coro_signal init_finished_signal;
-        coro_signal start_finished_signal;
-        coro_signal stop_finished_signal;
-        coro_signal reset_finished_signal;
+        coro_signal2 init_cancel_signal;
+        coro_signal2 start_cancel_signal;
+        coro_signal2 init_finished_signal;
+        coro_signal2 start_finished_signal;
+        coro_signal2 stop_finished_signal;
+        coro_signal2 reset_finished_signal;
 
         impl()
                 : init_cancel_signal(get_context()),
@@ -58,12 +60,15 @@ namespace sophiar {
 
         awaitable<bool> init(const nlohmann::json &config) {
             if (state == state_type::INITIALIZING) {
-                auto result = co_await (init_finished_signal.coro_wait() ||
-                                        reset_finished_signal.coro_wait());
+                auto init_finished_watcher = init_finished_signal.new_watcher(get_context());
+                auto reset_finished_watcher = reset_finished_signal.new_watcher(get_context());
+                auto result = co_await (init_finished_watcher.coro_wait() ||
+                                        reset_finished_watcher.coro_wait());
                 co_return result.index() == 0;
             }
             if (state == state_type::RESETTING) {
-                co_await reset_finished_signal.coro_wait();
+                auto reset_finished_watcher = reset_finished_signal.new_watcher(get_context());
+                co_await reset_finished_watcher.coro_wait();
                 co_return false;
             }
             if (state != state_type::INITIAL) co_return true; // >= PENDING
@@ -71,7 +76,8 @@ namespace sophiar {
             log_state_change(state_type::INITIAL, state_type::INITIALIZING);
             SPDLOG_TRACE("Initializing object [name = {}] with config {}.",
                          get_manager().get_object_name(q_this), config.dump());
-            auto result = co_await (q_this->on_init(config) || init_cancel_signal.coro_wait());
+            auto init_cancel_watcher = init_cancel_signal.new_watcher(get_context());
+            auto result = co_await (q_this->on_init(config) || init_cancel_watcher.coro_wait());
             if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
                 state = state_type::PENDING;
                 log_state_change(state_type::INITIALIZING, state_type::PENDING);
@@ -90,12 +96,15 @@ namespace sophiar {
 
         awaitable<bool> start(const nlohmann::json &config) {
             if (state == state_type::STARTING) {
-                auto result = co_await (start_finished_signal.coro_wait() ||
-                                        stop_finished_signal.coro_wait());
+                auto start_finished_watcher = start_finished_signal.new_watcher(get_context());
+                auto stop_finished_watcher = stop_finished_signal.new_watcher(get_context());
+                auto result = co_await (start_finished_watcher.coro_wait() ||
+                                        stop_finished_watcher.coro_wait());
                 co_return result.index() == 0;
             }
             if (state == state_type::STOPPING) {
-                co_await stop_finished_signal.coro_wait();
+                auto stop_finished_watcher = stop_finished_signal.new_watcher(get_context());
+                co_await stop_finished_watcher.coro_wait();
                 co_return false;
             }
             if (state == state_type::RUNNING) co_return true;
@@ -104,7 +113,8 @@ namespace sophiar {
             log_state_change(state_type::PENDING, state_type::STARTING);
             SPDLOG_TRACE("Starting object [name = {}] with config {}.",
                          get_manager().get_object_name(q_this), config.dump());
-            auto result = co_await (q_this->on_start(config) || start_cancel_signal.coro_wait());
+            auto start_cancel_watcher = start_cancel_signal.new_watcher(get_context());
+            auto result = co_await (q_this->on_start(config) || start_cancel_watcher.coro_wait());
             if (result.index() == 0 && std::get<0>(result) == true) { // succeeded
                 state = state_type::RUNNING;
                 log_state_change(state_type::STARTING, state_type::RUNNING);
@@ -132,10 +142,12 @@ namespace sophiar {
                 stop_finished_signal.try_notify_all();
                 get_manager().notify_object_stop(q_this);
             } else if (state == state_type::STOPPING) {
-                co_await stop_finished_signal.coro_wait();
+                auto stop_finished_watcher = stop_finished_signal.new_watcher(get_context());
+                co_await stop_finished_watcher.coro_wait();
             } else if (state == state_type::STARTING) {
                 start_cancel_signal.try_notify_all();
-                co_await stop_finished_signal.coro_wait();
+                auto stop_finished_watcher = stop_finished_signal.new_watcher(get_context());
+                co_await stop_finished_watcher.coro_wait();
             }
             co_return;
         }
@@ -158,10 +170,12 @@ namespace sophiar {
                 SPDLOG_DEBUG("Reset object [name = {}].", get_manager().get_object_name(q_this));
                 reset_finished_signal.try_notify_all();
             } else if (state == state_type::RESETTING) {
-                co_await reset_finished_signal.coro_wait();
+                auto reset_finished_watcher = reset_finished_signal.new_watcher(get_context());
+                co_await reset_finished_watcher.coro_wait();
             } else if (state == state_type::INITIALIZING) {
                 init_cancel_signal.try_notify_all();
-                co_await reset_finished_signal.coro_wait();
+                auto reset_finished_watcher = reset_finished_signal.new_watcher(get_context());
+                co_await reset_finished_watcher.coro_wait();
             }
             co_return;
         }

+ 29 - 4
src/main.cpp

@@ -11,14 +11,39 @@
 #include <boost/property_tree/ptree.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include <boost/asio/experimental/channel.hpp>
+#include <boost/intrusive/list.hpp>
+
 using boost::asio::co_spawn;
 using boost::asio::detached;
 
 using namespace sophiar;
 
-template<typename Derived>
-auto test_func(const Eigen::MatrixBase<Derived> &m) {
-    return sizeof(std::remove_cv<decltype(m)>);
+void test() {
+    using token_hook_type = boost::intrusive::list_base_hook<
+            boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
+    struct test_type : token_hook_type {
+        int val;
+
+        test_type(int v) : val(v) {}
+    };
+
+    using list_type = boost::intrusive::list<
+            test_type, boost::intrusive::constant_time_size<false>>;
+
+    list_type my_list;
+    test_type a(1), b(2), c(3);
+
+    my_list.push_back(a);
+    my_list.push_back(b);
+    my_list.push_back(c);
+
+    while (!my_list.empty()) {
+        auto &node = my_list.front();
+        node.unlink();
+        std::cout << node.val << std::endl;
+    }
+
 }
 
 int main() {
@@ -43,7 +68,7 @@ int main() {
 //    ur.get_context().run();
 //    std::cout << listener.cnt << std::endl;
 
-    std::cout << sizeof(std::string) << std::endl;
+    test();
 
     return 0;
 }

+ 184 - 0
src/third_party/scope_guard.hpp

@@ -0,0 +1,184 @@
+/*
+ *  Created on: 13/02/2018
+ *      Author: ricab
+ *
+ * See README.md for documentation of this header's public interface.
+ */
+
+#ifndef SCOPE_GUARD_HPP_
+#define SCOPE_GUARD_HPP_
+
+#include <type_traits>
+#include <utility>
+
+#if __cplusplus >= 201703L && defined(SG_REQUIRE_NOEXCEPT_IN_CPP17)
+#define SG_REQUIRE_NOEXCEPT
+#endif
+
+namespace sg
+{
+    namespace detail
+    {
+        /* --- Some custom type traits --- */
+
+        // Type trait determining whether a type is callable with no arguments
+        template<typename T, typename = void>
+        struct is_noarg_callable_t
+                : public std::false_type
+        {}; // in general, false
+
+        template<typename T>
+        struct is_noarg_callable_t<T, decltype(std::declval<T&&>()())>
+                : public std::true_type
+        {}; // only true when call expression valid
+
+        // Type trait determining whether a no-argument callable returns void
+        template<typename T>
+        struct returns_void_t
+                : public std::is_same<void, decltype(std::declval<T&&>()())>
+        {};
+
+        /* Type trait determining whether a no-arg callable is nothrow invocable if
+        required. This is where SG_REQUIRE_NOEXCEPT logic is encapsulated. */
+        template<typename T>
+        struct is_nothrow_invocable_if_required_t
+                : public
+#ifdef SG_REQUIRE_NOEXCEPT
+                  std::is_nothrow_invocable<T> /* Note: _r variants not enough to
+                                          confirm void return: any return can be
+                                          discarded so all returns are
+                                          compatible with void */
+#else
+                  std::true_type
+#endif
+        {};
+
+        // logic AND of two or more type traits
+        template<typename A, typename B, typename... C>
+        struct and_t : public and_t<A, and_t<B, C...>>
+        {}; // for more than two arguments
+
+        template<typename A, typename B>
+        struct and_t<A, B> : public std::conditional<A::value, B, A>::type
+        {}; // for two arguments
+
+        // Type trait determining whether a type is a proper scope_guard callback.
+        template<typename T>
+        struct is_proper_sg_callback_t
+                : public and_t<is_noarg_callable_t<T>,
+                        returns_void_t<T>,
+                        is_nothrow_invocable_if_required_t<T>,
+                        std::is_nothrow_destructible<T>>
+        {};
+
+
+        /* --- The actual scope_guard template --- */
+
+        template<typename Callback,
+                typename = typename std::enable_if<
+                        is_proper_sg_callback_t<Callback>::value>::type>
+        class scope_guard;
+
+
+        /* --- Now the friend maker --- */
+
+        template<typename Callback>
+        detail::scope_guard<Callback> make_scope_guard(Callback&& callback)
+        noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value); /*
+    we need this in the inner namespace due to MSVC bugs preventing
+    sg::detail::scope_guard from befriending a sg::make_scope_guard
+    template instance in the parent namespace (see https://is.gd/xFfFhE). */
+
+
+        /* --- The template specialization that actually defines the class --- */
+
+        template<typename Callback>
+        class scope_guard<Callback> final
+        {
+        public:
+            typedef Callback callback_type;
+
+            scope_guard(scope_guard&& other)
+            noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value);
+
+            ~scope_guard() noexcept; // highlight noexcept dtor
+
+            void dismiss() noexcept;
+
+        public:
+            scope_guard() = delete;
+            scope_guard(const scope_guard&) = delete;
+            scope_guard& operator=(const scope_guard&) = delete;
+            scope_guard& operator=(scope_guard&&) = delete;
+
+        private:
+            explicit scope_guard(Callback&& callback)
+            noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value); /*
+                                                      meant for friends only */
+
+            friend scope_guard<Callback> make_scope_guard<Callback>(Callback&&)
+            noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value); /*
+      only make_scope_guard can create scope_guards from scratch (i.e. non-move)
+      */
+
+        private:
+            Callback m_callback;
+            bool m_active;
+
+        };
+
+    } // namespace detail
+
+
+    /* --- Now the single public maker function --- */
+
+    using detail::make_scope_guard; // see comment on declaration above
+
+} // namespace sg
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::scope_guard(Callback&& callback)
+noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value)
+        : m_callback(std::forward<Callback>(callback)) /* use () instead of {} because
+    of DR 1467 (https://is.gd/WHmWuo), which still impacts older compilers
+    (e.g. GCC 4.x and clang <=3.6, see https://godbolt.org/g/TE9tPJ and
+    https://is.gd/Tsmh8G) */
+        , m_active{true}
+{}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::~scope_guard() noexcept
+{
+    if(m_active)
+        m_callback();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::scope_guard(scope_guard&& other)
+noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value)
+        : m_callback(std::forward<Callback>(other.m_callback)) // idem
+        , m_active{std::move(other.m_active)}
+{
+    other.m_active = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+inline void sg::detail::scope_guard<Callback>::dismiss() noexcept
+{
+    m_active = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+inline auto sg::detail::make_scope_guard(Callback&& callback)
+noexcept(std::is_nothrow_constructible<Callback, Callback&&>::value)
+-> detail::scope_guard<Callback>
+{
+    return detail::scope_guard<Callback>{std::forward<Callback>(callback)};
+}
+
+#endif /* SCOPE_GUARD_HPP_ */

+ 4 - 4
src/tracker/ndi/ndi_interface.h

@@ -13,17 +13,17 @@ namespace sophiar {
 
         ~ndi_interface() override;
 
-        void load_construct_config(const nlohmann::json &config) override;
+//        void load_construct_config(const nlohmann::json &config) override;
 
     protected:
 
-        boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override;
+//        boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override;
 
-        boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override;
+//        boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override;
 
         boost::asio::awaitable<void> on_stop() override;
 
-        boost::asio::awaitable<void> on_reset() override;
+//        boost::asio::awaitable<void> on_reset() override;
 
     private:
         struct impl;

+ 1 - 1
src/utility/coro_signal.hpp

@@ -12,7 +12,7 @@
 
 namespace sophiar {
 
-    class coro_signal : private boost::noncopyable {
+    class [[deprecated]] coro_signal : private boost::noncopyable {
     public:
 
         class signal_token : private boost::noncopyable {

+ 236 - 0
src/utility/coro_signal2.hpp

@@ -0,0 +1,236 @@
+#ifndef SOPHIAR2_CORO_SIGNAL2_HPP
+#define SOPHIAR2_CORO_SIGNAL2_HPP
+
+#ifndef CORO_SIGNAL2_USE_TIMER // channel based implementation, ~ 6% faster than timer
+
+#include "core/timestamp_helper.hpp"
+#include "third_party/scope_guard.hpp"
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/experimental/channel.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/core/noncopyable.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include <cassert>
+
+namespace sophiar {
+
+    class coro_signal2;
+
+    using singal_watcher_base =
+            boost::intrusive::list_base_hook<
+                    boost::intrusive::link_mode<
+                            boost::intrusive::auto_unlink>>;
+
+    class signal_watcher : public singal_watcher_base {
+    public:
+
+        template<typename Executor>
+        explicit signal_watcher(Executor &executor, coro_signal2 &_sig)
+                : sig(_sig),
+                  chan(executor, 1) {}
+
+        signal_watcher(signal_watcher &&other) noexcept
+                : sig(other.sig),
+                  chan(std::move(other.chan)),
+                  last_watch_ts(other.last_watch_ts) {}
+
+        bool try_wait();
+
+        boost::asio::awaitable<void> coro_wait(bool auto_sync = true);
+
+        void sync();
+
+    private:
+
+        friend class coro_signal2;
+
+        using channel_type = boost::asio::experimental::channel<
+                void(boost::system::error_code, bool)>;
+
+        timestamp_type last_watch_ts = 0;
+        coro_signal2 &sig;
+        channel_type chan;
+
+    };
+
+    class coro_signal2 : private boost::noncopyable {
+    public:
+
+        template<typename Executor>
+        explicit coro_signal2(Executor &) {}
+
+        template<typename Executor>
+        auto new_watcher(Executor &executor) {
+            return signal_watcher{executor, *this};
+        }
+
+        void try_notify_all(timestamp_type ts = current_timestamp()) {
+            last_notify_ts = ts;
+            list_type requeue_list;
+            while (!watcher_list.empty()) {
+                auto &node = watcher_list.front();
+                assert(node.is_linked());
+                node.unlink();
+                if (node.last_watch_ts < ts) {
+                    assert(!node.chan.ready());
+                    node.chan.try_send(boost::system::error_code{}, true);
+                } else {
+                    requeue_list.push_back(node);
+                }
+            }
+            watcher_list.swap(requeue_list);
+        }
+
+    private:
+
+        friend class signal_watcher;
+
+        using list_type =
+                boost::intrusive::list<
+                        signal_watcher,
+                        boost::intrusive::constant_time_size<false>>;
+
+        timestamp_type last_notify_ts = 0;
+        list_type watcher_list;
+
+    };
+
+    inline bool signal_watcher::try_wait() {
+        if (last_watch_ts < sig.last_notify_ts) {
+            sync();
+            return true;
+        }
+        return false;
+    }
+
+    inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
+        assert(!chan.ready());
+        if (auto_sync) {
+            sync();
+        } else {
+            if (last_watch_ts < sig.last_notify_ts) {
+                sync();
+                co_return;
+            }
+        }
+        auto closer = sg::make_scope_guard([&]() {
+            if (is_linked()) unlink();
+            if (chan.ready()) chan.reset();
+        });
+        assert(!is_linked());
+        sig.watcher_list.push_back(*this);
+        co_await chan.async_receive(boost::asio::use_awaitable);
+        sync();
+        co_return;
+    }
+
+    inline void signal_watcher::sync() {
+        last_watch_ts = sig.last_notify_ts;
+    }
+
+}
+
+#else // timer based implementation
+
+#include "core/timestamp_helper.hpp"
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/redirect_error.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/core/noncopyable.hpp>
+
+#include <cassert>
+
+namespace sophiar {
+
+    class coro_signal2;
+
+    class signal_watcher {
+    public:
+
+        explicit signal_watcher(coro_signal2 &_sig)
+                : sig(_sig) {}
+
+        signal_watcher(signal_watcher &&other) noexcept
+                : sig(other.sig),
+                  last_watch_ts(other.last_watch_ts) {}
+
+        bool try_wait();
+
+        boost::asio::awaitable<void> coro_wait(bool auto_sync = true);
+
+        void sync();
+
+    private:
+
+        friend class coro_signal2;
+
+        timestamp_type last_watch_ts = 0;
+        coro_signal2 &sig;
+
+    };
+
+    class coro_signal2 : private boost::noncopyable {
+    public:
+
+        template<typename Executor>
+        explicit coro_signal2(Executor &executor)
+                :timer(executor) {
+            timer.expires_at(boost::posix_time::pos_infin);
+        }
+
+        template<typename Executor>
+        auto new_watcher(Executor &) {
+            return signal_watcher{*this};
+        }
+
+        void try_notify_all(timestamp_type ts = current_timestamp()) {
+            timer.cancel();
+        }
+
+    private:
+
+        friend class signal_watcher;
+
+        timestamp_type last_notify_ts = 0;
+        boost::asio::deadline_timer timer;
+
+    };
+
+    inline bool signal_watcher::try_wait() {
+        if (last_watch_ts < sig.last_notify_ts) {
+            sync();
+            return true;
+        }
+        return false;
+    }
+
+    inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
+        if (auto_sync) {
+            sync();
+        } else {
+            if (last_watch_ts < sig.last_notify_ts) {
+                sync();
+                co_return;
+            }
+        }
+        boost::system::error_code ec;
+        co_await sig.timer.async_wait(
+                boost::asio::redirect_error(boost::asio::use_awaitable, ec));
+        assert(ec == boost::asio::error::operation_aborted);
+        sync();
+        co_return;
+    }
+
+    inline void signal_watcher::sync() {
+        last_watch_ts = sig.last_notify_ts;
+    }
+
+}
+
+#endif
+
+#endif //SOPHIAR2_CORO_SIGNAL2_HPP

+ 11 - 2
tests/CMakeLists.txt

@@ -2,7 +2,7 @@ set (Boost_USE_STATIC_LIBS OFF)
 find_package (Boost REQUIRED COMPONENTS unit_test_framework)
 include_directories (${Boost_INCLUDE_DIRS})
 
-add_executable (test_core
+add_executable(test_core
         core/datanode_base.cpp
         core/geometry_types.cpp
         core/small_obj.cpp
@@ -12,4 +12,13 @@ add_executable (test_core
         ${EXTERN_DEF_FILES}
         ${CORE_IMPL_FILES})
 target_compile_definitions(test_core PUBLIC SOPHIAR_TEST)
-target_link_libraries (test_core ${Boost_LIBRARIES} ${EXTRA_LIBS})
+#target_compile_definitions(test_core PUBLIC CORO_SIGNAL2_USE_TIMER)
+target_link_libraries(test_core ${Boost_LIBRARIES} ${EXTRA_LIBS})
+
+add_executable(test_utility
+        utility/coro_signal2.cpp
+        ${EXTERN_DEF_FILES}
+        ${CORE_IMPL_FILES})
+target_compile_definitions(test_utility PUBLIC SOPHIAR_TEST)
+target_compile_definitions(test_utility PUBLIC CORO_SIGNAL2_USE_TIMER)
+target_link_libraries(test_utility ${Boost_LIBRARIES} ${EXTRA_LIBS})

+ 82 - 0
tests/utility/coro_signal2.cpp

@@ -0,0 +1,82 @@
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN  // in only one cpp file
+
+#include "utility/coro_signal2.hpp"
+#include "core/sophiar_obj.hpp"
+#include "utility/debug_utility.hpp"
+
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/this_coro.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/test/unit_test.hpp>
+
+#include <chrono>
+#include <iostream>
+#include <vector>
+
+using boost::asio::awaitable;
+using boost::asio::co_spawn;
+using boost::asio::detached;
+using boost::asio::use_awaitable;
+
+using namespace sophiar;
+using namespace std::chrono_literals;
+
+coro_signal2 sig_a{global_context}, sig_b{global_context};
+
+awaitable<void> coro_a() {
+    auto watcher_b = sig_b.new_watcher(global_context);
+    FILE_LINE_TRACE
+    for (int i = 0; i < 1000; ++i) {
+        sig_a.try_notify_all();
+        co_await watcher_b.coro_wait();
+    }
+    FILE_LINE_TRACE
+    sig_a.try_notify_all();
+    co_return;
+}
+
+awaitable<void> coro_b() {
+    co_await coro_sleep(10ms);
+    auto watcher_a = sig_a.new_watcher(global_context);
+    FILE_LINE_TRACE
+    for (int i = 0; i < 1000; ++i) {
+        sig_b.try_notify_all();
+        co_await watcher_a.coro_wait();
+    }
+    FILE_LINE_TRACE
+    sig_b.try_notify_all();
+    co_return;
+}
+
+awaitable<void> coro_signal2_bench() {
+    constexpr auto length = 100000;
+    std::vector<std::unique_ptr<coro_signal2>> sig_pool;
+    std::vector<signal_watcher> watcher_pool;
+    for (int i = 0; i < length; ++i) {
+        auto new_sig = std::make_unique<coro_signal2>(global_context);
+        watcher_pool.push_back(new_sig->new_watcher(global_context));
+        sig_pool.push_back(std::move(new_sig));
+    }
+    for (int i = 0; i < length - 1; ++i) {
+        co_spawn(global_context, [&]() -> awaitable<void> {
+            int index = i;
+            co_await watcher_pool[index].coro_wait();
+            sig_pool[index + 1]->try_notify_all();
+        }, detached);
+    }
+    co_await coro_sleep(10ms);
+    FILE_LINE_TRACE
+    sig_pool[0]->try_notify_all();
+    co_await watcher_pool[length - 1].coro_wait();
+    FILE_LINE_TRACE
+    co_return;
+}
+
+BOOST_AUTO_TEST_CASE(test_coro_signal2) {
+//    co_spawn(global_context, coro_a(), detached);
+//    co_spawn(global_context, coro_b(), detached);
+    co_spawn(global_context, coro_signal2_bench(), detached);
+    global_context.run();
+}