#ifndef SOPHIAR2_CORO_WORKER_HPP #define SOPHIAR2_CORO_WORKER_HPP #include "third_party/scope_guard.hpp" #include "utility/coro_signal2.hpp" #include #include #include #include #include #include #include #include namespace sophiar { class coro_worker { public: using pointer = std::shared_ptr; virtual ~coro_worker() { assert(!is_running); } virtual void run() = 0; void cancel() { if (!is_running) return; request_stop_signal.try_notify_all(); } virtual boost::asio::awaitable coro_wait_stop() = 0; protected: template explicit coro_worker(Executor &executor) :request_stop_signal(executor), stop_finished_signal(executor), request_stop_watcher(request_stop_signal.new_watcher(executor)) { } coro_signal2 request_stop_signal; coro_signal2 stop_finished_signal; signal_watcher request_stop_watcher; bool is_running = false; }; template class coro_worker_impl : public coro_worker { public: static_assert(std::is_convertible_v< decltype(std::declval()()), boost::asio::awaitable>); coro_worker_impl(Executor &_executor, FuncType &&_func) : coro_worker(_executor), executor(_executor), func(std::forward(_func)) {} ~coro_worker_impl() override = default; void run() override { assert(!is_running); request_stop_watcher.sync(); boost::asio::co_spawn(executor, run_impl(), boost::asio::detached); } boost::asio::awaitable coro_wait_stop() override { if (!is_running) co_return; auto stop_finished_watcher = stop_finished_signal.new_watcher(executor); co_await stop_finished_watcher.coro_wait(); assert(!is_running); co_return; } private: using store_type = std::remove_cvref_t; store_type func; Executor &executor; boost::asio::awaitable run_impl() { is_running = true; auto closer = sg::make_scope_guard([this]() { is_running = false; stop_finished_signal.try_notify_all(); }); for (;;) { using namespace boost::asio::experimental::awaitable_operators; auto result = co_await (func() || request_stop_watcher.coro_wait(false)); if (result.index() != 0) break; // else index() == 0 if (std::get<0>(result) == false) break; } co_return; } }; template auto make_infinite_coro_worker(Executor &executor, FuncType &&func) { return coro_worker::pointer(new coro_worker_impl( executor, std::forward(func))); } template auto make_interval_coro_worker(Executor &executor, std::chrono::milliseconds interval, FuncType &&func) { auto worker_func = [ interval, func = std::forward(func), timer = boost::asio::high_resolution_timer(executor)]() mutable -> boost::asio::awaitable { timer.expires_from_now(interval); auto ret = co_await func(); co_await timer.async_wait(boost::asio::use_awaitable); co_return ret; }; return make_infinite_coro_worker(executor, std::move(worker_func)); } } #endif //SOPHIAR2_CORO_WORKER_HPP