Browse Source

实现了 coro_worker

jcsyshc 3 years ago
parent
commit
d3db17427c

+ 6 - 6
src/core/sophiar_manager.cpp

@@ -1,7 +1,6 @@
 #include "sophiar_manager.h"
 #include "core/sophiar_obj.hpp"
 #include "core/tristate_obj.h"
-#include "utility/coro_signal.hpp"
 #include "utility/debug_utility.hpp"
 #include "utility/named_vector.hpp"
 
@@ -97,7 +96,7 @@ namespace sophiar {
 
         struct global_obj_info {
             void *placeholder;
-            coro_signal *update_signal;
+            coro_signal2 *update_signal;
             timestamp_type last_update_ts;
             std::type_index obj_type = typeid(void);
         };
@@ -618,6 +617,7 @@ namespace sophiar {
             return "unknown";
         }
 #endif
+        assert(pimpl->obj_ptr_index_map.contains(obj));
         auto obj_index = pimpl->obj_ptr_index_map.at(obj);
         return pimpl->obj_pool.to_name_by_index(obj_index);
     }
@@ -645,7 +645,7 @@ namespace sophiar {
         auto &obj_info = pimpl->global_obj_pool[obj_index];
         obj_info.obj_type = obj_type;
         obj_info.placeholder = placeholder;
-        obj_info.update_signal = new coro_signal(global_context);
+        obj_info.update_signal = new coro_signal2{global_context};
         return obj_index;
     }
 
@@ -663,9 +663,9 @@ namespace sophiar {
         obj_info.update_signal->try_notify_all();
     }
 
-    boost::asio::awaitable<void> sophiar_manager::await_global_obj_update(global_obj_index_type obj_index) {
-        co_await pimpl->global_obj_pool[obj_index].update_signal->coro_wait();
-        co_return;
+    signal_watcher sophiar_manager::request_global_obj_update_watcher(global_obj_index_type obj_index) {
+        return pimpl->global_obj_pool[obj_index]
+                .update_signal->new_watcher(global_context);
     }
 
 #ifdef SOPHIAR_TEST

+ 2 - 8
src/core/sophiar_manager.h

@@ -2,6 +2,7 @@
 #define SOPHIAR2_SOPHIAR_MANAGER_H
 
 #include "core/timestamp_helper.hpp"
+#include "utility/coro_signal2.hpp"
 #include "utility/tiny_signal.hpp"
 #include "utility/signal_muxer.hpp"
 #include "utility/slot_demuxer.hpp"
@@ -77,12 +78,7 @@ namespace sophiar {
 
         timestamp_type get_global_obj_update_timestamp(global_obj_index_type obj_index);
 
-        template<typename SmallObjType>
-        boost::asio::awaitable<typename SmallObjType::pointer>
-        await_next_global_obj(global_obj_index_type obj_index) {
-            co_await await_global_obj_update(obj_index);
-            co_return get_global_obj<SmallObjType>(obj_index);
-        }
+        signal_watcher request_global_obj_update_watcher(global_obj_index_type obj_index);
 
         template<typename ...Args>
         [[deprecated("Use obj pool system instead.")]]
@@ -159,8 +155,6 @@ namespace sophiar {
         void update_global_obj_timestamp(global_obj_index_type obj_index,
                                          timestamp_type ts);
 
-        boost::asio::awaitable<void> await_global_obj_update(global_obj_index_type obj_index);
-
     };
 
     extern sophiar_manager global_sophiar_manager;

+ 1 - 0
src/core/transform_tree.cpp

@@ -3,6 +3,7 @@
 #include "core/sophiar_manager.h"
 #include "utility/named_vector.hpp"
 #include "utility/signal_muxer.hpp"
+#include "utility/coro_worker.hpp"
 
 #include <fmt/format.h>
 

+ 0 - 2
src/core/tristate_obj.cpp

@@ -1,7 +1,5 @@
 #include "tristate_obj.h"
 
-#include "utility/debug_utility.hpp"
-
 #include "third_party/static_block.hpp"
 #include "utility/coro_signal2.hpp"
 #include "utility/name_translator.hpp"

+ 12 - 5
src/utility/coro_signal2.hpp

@@ -121,7 +121,10 @@ namespace sophiar {
         });
         assert(!is_linked());
         sig.watcher_list.push_back(*this);
-        co_await chan.async_receive(boost::asio::use_awaitable);
+        for (;;) {
+            co_await chan.async_receive(boost::asio::use_awaitable);
+            if (last_watch_ts < sig.last_notify_ts) break;
+        }
         sync();
         co_return;
     }
@@ -188,6 +191,7 @@ namespace sophiar {
         }
 
         void try_notify_all(timestamp_type ts = current_timestamp()) {
+            last_notify_ts = ts;
             timer.cancel();
         }
 
@@ -217,10 +221,13 @@ namespace sophiar {
                 co_return;
             }
         }
-        boost::system::error_code ec;
-        co_await sig.timer.async_wait(
-                boost::asio::redirect_error(boost::asio::use_awaitable, ec));
-        assert(ec == boost::asio::error::operation_aborted);
+        for (;;) {
+            boost::system::error_code ec;
+            co_await sig.timer.async_wait(
+                    boost::asio::redirect_error(boost::asio::use_awaitable, ec));
+            assert(ec == boost::asio::error::operation_aborted);
+            if (last_watch_ts < sig.last_notify_ts) break;
+        }
         sync();
         co_return;
     }

+ 135 - 0
src/utility/coro_worker.hpp

@@ -0,0 +1,135 @@
+#ifndef SOPHIAR2_CORO_WORKER_HPP
+#define SOPHIAR2_CORO_WORKER_HPP
+
+#include "third_party/scope_guard.hpp"
+#include "utility/coro_signal2.hpp"
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/experimental/awaitable_operators.hpp>
+#include <boost/asio/high_resolution_timer.hpp>
+
+#include <chrono>
+#include <memory>
+#include <type_traits>
+
+namespace sophiar {
+
+    class coro_worker {
+    public:
+
+        using pointer = std::unique_ptr<coro_worker>;
+
+        virtual ~coro_worker() {
+            assert(!is_running);
+        }
+
+        virtual void run() = 0;
+
+        void cancel() {
+            if (!is_running) return;
+            request_stop_signal.try_notify_all();
+        }
+
+        boost::asio::awaitable<void> coro_wait_stop() {
+            if (is_running) {
+                co_await stop_finished_watcher.coro_wait(false);
+            } else {
+                auto is_stopped = stop_finished_watcher.try_wait();
+                assert(is_stopped);
+            }
+            assert(false);
+            co_return;
+        }
+
+    protected:
+
+        template<typename Executor>
+        explicit coro_worker(Executor &executor)
+                :request_stop_signal(executor),
+                 stop_finished_signal(executor),
+                 request_stop_watcher(request_stop_signal.new_watcher(executor)),
+                 stop_finished_watcher(stop_finished_signal.new_watcher(executor)) {
+        }
+
+        signal_watcher request_stop_watcher;
+        signal_watcher stop_finished_watcher;
+
+        coro_signal2 request_stop_signal;
+        coro_signal2 stop_finished_signal;
+
+        bool is_running = false;
+
+    };
+
+    template<typename Executor, typename FuncType>
+    class coro_worker_impl : public coro_worker {
+    public:
+
+        static_assert(std::is_convertible_v<
+                decltype(std::declval<FuncType>()()),
+                boost::asio::awaitable<bool>>);
+
+        coro_worker_impl(Executor &_executor, FuncType &&_func)
+                : coro_worker(_executor),
+                  executor(_executor),
+                  func(std::forward<FuncType>(_func)) {}
+
+        ~coro_worker_impl() override = default;
+
+        void run() override {
+            assert(!is_running);
+            request_stop_watcher.sync();
+            stop_finished_watcher.sync();
+            boost::asio::co_spawn(executor, run_impl(), boost::asio::detached);
+        }
+
+    private:
+
+        using store_type = std::remove_cvref_t<FuncType>;
+        store_type func;
+
+        Executor &executor;
+
+        boost::asio::awaitable<void> run_impl() {
+            is_running = true;
+            auto closer = sg::make_scope_guard([this]() {
+                is_running = false;
+                stop_finished_signal.try_notify_all();
+            });
+            for (;;) {
+                using namespace boost::asio::experimental::awaitable_operators;
+                auto result = co_await (func() || request_stop_watcher.coro_wait(false));
+                if (result.index() != 0) break; // else index() == 0
+                if (std::get<0>(result) == false) break;
+            }
+            co_return;
+        }
+
+    };
+
+    template<typename Executor, typename FuncType>
+    auto make_infinite_coro_worker(Executor &executor, FuncType &&func) {
+        return coro_worker::pointer(new coro_worker_impl<Executor, FuncType>(
+                executor, std::forward<FuncType>(func)));
+    }
+
+    template<typename Executor, typename FuncType>
+    auto make_interval_coro_worker(Executor &executor, std::chrono::milliseconds interval, FuncType &&func) {
+        auto worker_func = [
+                interval,
+                func = std::forward<FuncType>(func),
+                timer = boost::asio::high_resolution_timer(executor)]() mutable
+                -> boost::asio::awaitable<bool> {
+            timer.expires_from_now(interval);
+            auto ret = co_await func();
+            co_await timer.async_wait(boost::asio::use_awaitable);
+            co_return ret;
+        };
+        return make_infinite_coro_worker(executor, std::move(worker_func));
+    }
+
+}
+
+#endif //SOPHIAR2_CORO_WORKER_HPP

+ 2 - 1
tests/CMakeLists.txt

@@ -17,8 +17,9 @@ target_link_libraries(test_core ${Boost_LIBRARIES} ${EXTRA_LIBS})
 
 add_executable(test_utility
         utility/coro_signal2.cpp
+        utility/coro_worker.cpp
         ${EXTERN_DEF_FILES}
         ${CORE_IMPL_FILES})
 target_compile_definitions(test_utility PUBLIC SOPHIAR_TEST)
-target_compile_definitions(test_utility PUBLIC CORO_SIGNAL2_USE_TIMER)
+#target_compile_definitions(test_utility PUBLIC CORO_SIGNAL2_USE_TIMER)
 target_link_libraries(test_utility ${Boost_LIBRARIES} ${EXTRA_LIBS})

+ 105 - 0
tests/utility/coro_worker.cpp

@@ -0,0 +1,105 @@
+#define BOOST_TEST_DYN_LINK
+
+#include "core/sophiar_obj.hpp"
+#include "utility/coro_worker.hpp"
+#include "utility/debug_utility.hpp"
+
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/this_coro.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/test/unit_test.hpp>
+
+#include <chrono>
+#include <iostream>
+#include <vector>
+
+using boost::asio::awaitable;
+using boost::asio::co_spawn;
+using boost::asio::detached;
+using boost::asio::use_awaitable;
+
+using namespace sophiar;
+using namespace std::chrono_literals;
+
+awaitable<void> test_coro_worker_1() {
+    int cnt = 0;
+    auto worker = make_infinite_coro_worker(global_context, [&]() -> awaitable<bool> {
+        co_await coro_sleep(50ms);
+        ++cnt;
+        co_return true;
+    });
+    worker->run();
+    co_await coro_sleep(210ms);
+    worker->cancel();
+    co_await worker->coro_wait_stop();
+    BOOST_TEST(cnt == 4);
+    co_return;
+}
+
+awaitable<void> test_coro_worker_2() {
+    int cnt = 0;
+    auto worker = make_infinite_coro_worker(global_context, [&]() -> awaitable<bool> {
+        co_await coro_sleep(50ms);
+        ++cnt;
+        co_return false;
+    });
+    worker->run();
+    co_await coro_sleep(210ms);
+    worker->cancel();
+    co_await worker->coro_wait_stop();
+    BOOST_TEST(cnt == 1);
+    co_return;
+}
+
+awaitable<void> test_coro_worker_3() {
+    int cnt = 0;
+    auto worker = make_interval_coro_worker(global_context, 50ms, [&]() -> awaitable<bool> {
+        ++cnt;
+        co_return true;
+    });
+    worker->run();
+    co_await coro_sleep(190ms);
+    worker->cancel();
+    co_await worker->coro_wait_stop();
+    BOOST_TEST(cnt == 4);
+    co_return;
+}
+
+awaitable<void> test_coro_worker_4() {
+    int cnt = 0;
+    auto worker = make_interval_coro_worker(global_context, 50ms, [&]() -> awaitable<bool> {
+        ++cnt;
+        co_return false;
+    });
+    worker->run();
+    co_await coro_sleep(210ms);
+    worker->cancel();
+    co_await worker->coro_wait_stop();
+    BOOST_TEST(cnt == 1);
+    co_return;
+}
+
+awaitable<void> test_coro_worker_5() {
+    int cnt = 0;
+    auto worker = make_interval_coro_worker(global_context, 10ms, [&]() -> awaitable<bool> {
+        co_await coro_sleep(50ms);
+        ++cnt;
+        co_return true;
+    });
+    worker->run();
+    co_await coro_sleep(210ms);
+    worker->cancel();
+    co_await worker->coro_wait_stop();
+    BOOST_TEST(cnt == 4);
+    co_return;
+}
+
+BOOST_AUTO_TEST_CASE(test_coro_worker) {
+    co_spawn(global_context, test_coro_worker_1(), detached);
+    co_spawn(global_context, test_coro_worker_2(), detached);
+    co_spawn(global_context, test_coro_worker_3(), detached);
+    co_spawn(global_context, test_coro_worker_4(), detached);
+    co_spawn(global_context, test_coro_worker_5(), detached);
+    global_context.run();
+}