#ifndef SOPHIAR2_CORO_WORKER_HPP #define SOPHIAR2_CORO_WORKER_HPP #include "core/global_defs.h" #include "third_party/scope_guard.hpp" #include "utility/coro_signal2.hpp" #include #include #include #include #include #include #include #include namespace sophiar { class coro_worker { public: using pointer = std::unique_ptr; static constexpr auto empty_func = []() {}; virtual ~coro_worker() { assert(!is_running); } void run() { assert(!is_running); request_stop_watcher.sync(); boost::asio::co_spawn(*run_ctx, run_impl(), boost::asio::detached); } void cancel() { if (!is_running) return; request_stop_signal.try_notify_all(); } boost::asio::awaitable coro_wait_stop() { if (!is_running) co_return; auto stop_finished_watcher = stop_finished_signal.new_watcher(); co_await stop_finished_watcher.coro_wait(); assert(!is_running); co_return; } protected: explicit coro_worker(boost::asio::io_context *ctx) : request_stop_signal(ctx), stop_finished_signal(ctx), request_stop_watcher(request_stop_signal.new_watcher()) { run_ctx = ctx; } coro_signal2 request_stop_signal; coro_signal2 stop_finished_signal; signal_watcher request_stop_watcher; boost::asio::io_context *run_ctx = nullptr; bool is_running = false; virtual boost::asio::awaitable run_impl() = 0; }; template class coro_worker_impl : public coro_worker { public: static_assert(std::is_convertible_v< decltype(std::declval()()), boost::asio::awaitable>); static_assert(std::is_void< decltype(std::declval()())>()); coro_worker_impl(FuncType &&_func, ExitFuncType &&_exit_func, boost::asio::io_context *ctx) : coro_worker(ctx), func(std::forward(_func)), exit_func(std::forward(_exit_func)) {} ~coro_worker_impl() override = default; private: using func_store_type = std::remove_cvref_t; using exit_func_store_type = std::remove_cvref_t; func_store_type func; exit_func_store_type exit_func; boost::asio::awaitable run_impl() override { is_running = true; auto closer = sg::make_scope_guard([this]() { is_running = false; stop_finished_signal.try_notify_all(); exit_func(); }); for (;;) { using namespace boost::asio::experimental::awaitable_operators; auto result = co_await (func() || request_stop_watcher.coro_wait(false)); if (result.index() != 0) break; // else index() == 0 if (std::get<0>(result) == false) break; } co_return; } }; template inline auto make_infinite_coro_worker(FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func, boost::asio::io_context *ctx = global_context) { return coro_worker::pointer(new coro_worker_impl( std::forward(func), std::forward(exit_func), ctx) ); } template inline auto make_interval_coro_worker(std::chrono::milliseconds interval, FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func, boost::asio::io_context *ctx = global_context) { auto worker_func = [ interval, func = std::forward(func), timer = boost::asio::high_resolution_timer(*ctx)]() mutable -> boost::asio::awaitable { timer.expires_after(interval); auto ret = co_await func(); co_await timer.async_wait(boost::asio::use_awaitable); co_return ret; }; return make_infinite_coro_worker(std::move(worker_func), std::forward(exit_func), ctx); } static constexpr auto empty_exception_handler = [](std::exception &) {}; template inline auto make_noexcept_func(FuncType &&func, ErrorHandlerType &&error_handler = empty_exception_handler) { static_assert(std::is_convertible_v< decltype(std::declval()()), boost::asio::awaitable>); static_assert(std::is_void< decltype(std::declval()( std::declval()))>()); auto noexcept_func = [ real_func = std::forward(func), error_handler = std::forward(error_handler)]() mutable noexcept -> boost::asio::awaitable { try { auto ok = co_await real_func(); co_return ok; } catch (std::exception &e) { error_handler(e); co_return false; } assert(false); co_return false; }; return std::move(noexcept_func); } } #endif //SOPHIAR2_CORO_WORKER_HPP