瀏覽代碼

Fixed ZeroMQ client and FastSAM module.

jcsyshc 1 年之前
父節點
當前提交
607a8768f1
共有 2 個文件被更改,包括 23 次插入15 次删除
  1. 7 4
      src/utility/coro_signal2.hpp
  2. 16 11
      src/utility/coro_worker.hpp

+ 7 - 4
src/utility/coro_signal2.hpp

@@ -43,7 +43,7 @@ namespace sophiar {
 
 
     public:
     public:
 
 
-        explicit signal_watcher(coro_signal2 *_sig)
+        explicit signal_watcher(coro_signal2 *_sig, boost::asio::io_context *ctx = global_context)
 
 
 #ifdef CORO_SIGNAL2_USE_TIMER
 #ifdef CORO_SIGNAL2_USE_TIMER
 
 
@@ -54,7 +54,7 @@ namespace sophiar {
 #else
 #else
 
 
         : sig(_sig),
         : sig(_sig),
-          chan(*global_context, 1) {
+          chan(*ctx, 1) {
     assert(sig != nullptr);
     assert(sig != nullptr);
 }
 }
 
 
@@ -121,12 +121,14 @@ namespace sophiar {
 
 
 #else
 #else
 
 
-        coro_signal2() = default;
+        explicit coro_signal2(boost::asio::io_context *ctx = global_context) {
+            run_ctx = ctx;
+        }
 
 
 #endif
 #endif
 
 
         auto new_watcher() {
         auto new_watcher() {
-            return signal_watcher{this};
+            return signal_watcher{this, run_ctx};
         }
         }
 
 
         void try_notify_all(timestamp_type ts = current_timestamp()) {
         void try_notify_all(timestamp_type ts = current_timestamp()) {
@@ -161,6 +163,7 @@ namespace sophiar {
         friend class signal_watcher;
         friend class signal_watcher;
 
 
         timestamp_type last_notify_ts = 0;
         timestamp_type last_notify_ts = 0;
+        boost::asio::io_context *run_ctx = nullptr;
 
 
 #ifdef CORO_SIGNAL2_USE_TIMER
 #ifdef CORO_SIGNAL2_USE_TIMER
 
 

+ 16 - 11
src/utility/coro_worker.hpp

@@ -31,7 +31,7 @@ namespace sophiar {
         void run() {
         void run() {
             assert(!is_running);
             assert(!is_running);
             request_stop_watcher.sync();
             request_stop_watcher.sync();
-            boost::asio::co_spawn(*global_context, run_impl(), boost::asio::detached);
+            boost::asio::co_spawn(*run_ctx, run_impl(), boost::asio::detached);
         }
         }
 
 
         void cancel() {
         void cancel() {
@@ -49,16 +49,18 @@ namespace sophiar {
 
 
     protected:
     protected:
 
 
-        explicit coro_worker()
-                : request_stop_signal(),
-                  stop_finished_signal(),
+        explicit coro_worker(boost::asio::io_context *ctx)
+                : request_stop_signal(ctx),
+                  stop_finished_signal(ctx),
                   request_stop_watcher(request_stop_signal.new_watcher()) {
                   request_stop_watcher(request_stop_signal.new_watcher()) {
+            run_ctx = ctx;
         }
         }
 
 
         coro_signal2 request_stop_signal;
         coro_signal2 request_stop_signal;
         coro_signal2 stop_finished_signal;
         coro_signal2 stop_finished_signal;
         signal_watcher request_stop_watcher;
         signal_watcher request_stop_watcher;
 
 
+        boost::asio::io_context *run_ctx = nullptr;
         bool is_running = false;
         bool is_running = false;
 
 
         virtual boost::asio::awaitable<void> run_impl() = 0;
         virtual boost::asio::awaitable<void> run_impl() = 0;
@@ -76,8 +78,9 @@ namespace sophiar {
         static_assert(std::is_void<
         static_assert(std::is_void<
                 decltype(std::declval<ExitFuncType>()())>());
                 decltype(std::declval<ExitFuncType>()())>());
 
 
-        coro_worker_impl(FuncType &&_func, ExitFuncType &&_exit_func)
-                : coro_worker(),
+        coro_worker_impl(FuncType &&_func, ExitFuncType &&_exit_func,
+                         boost::asio::io_context *ctx)
+                : coro_worker(ctx),
                   func(std::forward<FuncType>(_func)),
                   func(std::forward<FuncType>(_func)),
                   exit_func(std::forward<ExitFuncType>(_exit_func)) {}
                   exit_func(std::forward<ExitFuncType>(_exit_func)) {}
 
 
@@ -111,20 +114,22 @@ namespace sophiar {
     template<typename FuncType,
     template<typename FuncType,
             typename ExitFuncType = decltype(coro_worker::empty_func) const &>
             typename ExitFuncType = decltype(coro_worker::empty_func) const &>
     inline auto make_infinite_coro_worker(FuncType &&func,
     inline auto make_infinite_coro_worker(FuncType &&func,
-                                          ExitFuncType &&exit_func = coro_worker::empty_func) {
+                                          ExitFuncType &&exit_func = coro_worker::empty_func,
+                                          boost::asio::io_context *ctx = global_context) {
         return coro_worker::pointer(new coro_worker_impl<FuncType, ExitFuncType>(
         return coro_worker::pointer(new coro_worker_impl<FuncType, ExitFuncType>(
-                std::forward<FuncType>(func), std::forward<ExitFuncType>(exit_func))
+                std::forward<FuncType>(func), std::forward<ExitFuncType>(exit_func), ctx)
         );
         );
     }
     }
 
 
     template<typename FuncType,
     template<typename FuncType,
             typename ExitFuncType = decltype(coro_worker::empty_func) const &>
             typename ExitFuncType = decltype(coro_worker::empty_func) const &>
     inline auto make_interval_coro_worker(std::chrono::milliseconds interval,
     inline auto make_interval_coro_worker(std::chrono::milliseconds interval,
-                                          FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func) {
+                                          FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func,
+                                          boost::asio::io_context *ctx = global_context) {
         auto worker_func = [
         auto worker_func = [
                 interval,
                 interval,
                 func = std::forward<FuncType>(func),
                 func = std::forward<FuncType>(func),
-                timer = boost::asio::high_resolution_timer(*global_context)]() mutable
+                timer = boost::asio::high_resolution_timer(*ctx)]() mutable
                 -> boost::asio::awaitable<bool> {
                 -> boost::asio::awaitable<bool> {
             timer.expires_from_now(interval);
             timer.expires_from_now(interval);
             auto ret = co_await func();
             auto ret = co_await func();
@@ -132,7 +137,7 @@ namespace sophiar {
             co_return ret;
             co_return ret;
         };
         };
         return make_infinite_coro_worker(std::move(worker_func),
         return make_infinite_coro_worker(std::move(worker_func),
-                                         std::forward<ExitFuncType>(exit_func));
+                                         std::forward<ExitFuncType>(exit_func), ctx);
     }
     }
 
 
     static constexpr auto empty_exception_handler = [](std::exception &) {};
     static constexpr auto empty_exception_handler = [](std::exception &) {};