| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #ifndef SOPHIAR2_CORO_WORKER_HPP
- #define SOPHIAR2_CORO_WORKER_HPP
- #include "core/global_defs.h"
- #include "third_party/scope_guard.hpp"
- #include "utility/coro_signal2.hpp"
- #include <boost/asio/awaitable.hpp>
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/experimental/awaitable_operators.hpp>
- #include <boost/asio/high_resolution_timer.hpp>
- #include <chrono>
- #include <memory>
- #include <type_traits>
- namespace sophiar {
- class coro_worker {
- public:
- using pointer = std::unique_ptr<coro_worker>;
- static constexpr auto empty_func = []() {};
- virtual ~coro_worker() {
- assert(!is_running);
- }
- void run() {
- assert(!is_running);
- request_stop_watcher.sync();
- boost::asio::co_spawn(*run_ctx, run_impl(), boost::asio::detached);
- }
- void cancel() {
- if (!is_running) return;
- request_stop_signal.try_notify_all();
- }
- boost::asio::awaitable<void> coro_wait_stop() {
- if (!is_running) co_return;
- auto stop_finished_watcher = stop_finished_signal.new_watcher();
- co_await stop_finished_watcher.coro_wait();
- assert(!is_running);
- co_return;
- }
- protected:
- explicit coro_worker(boost::asio::io_context *ctx)
- : request_stop_signal(ctx),
- stop_finished_signal(ctx),
- request_stop_watcher(request_stop_signal.new_watcher()) {
- run_ctx = ctx;
- }
- coro_signal2 request_stop_signal;
- coro_signal2 stop_finished_signal;
- signal_watcher request_stop_watcher;
- boost::asio::io_context *run_ctx = nullptr;
- bool is_running = false;
- virtual boost::asio::awaitable<void> run_impl() = 0;
- };
- template<typename FuncType, typename ExitFuncType>
- class coro_worker_impl : public coro_worker {
- public:
- static_assert(std::is_convertible_v<
- decltype(std::declval<FuncType>()()),
- boost::asio::awaitable<bool>>);
- static_assert(std::is_void<
- decltype(std::declval<ExitFuncType>()())>());
- coro_worker_impl(FuncType &&_func, ExitFuncType &&_exit_func,
- boost::asio::io_context *ctx)
- : coro_worker(ctx),
- func(std::forward<FuncType>(_func)),
- exit_func(std::forward<ExitFuncType>(_exit_func)) {}
- ~coro_worker_impl() override = default;
- private:
- using func_store_type = std::remove_cvref_t<FuncType>;
- using exit_func_store_type = std::remove_cvref_t<ExitFuncType>;
- func_store_type func;
- exit_func_store_type exit_func;
- boost::asio::awaitable<void> run_impl() override {
- is_running = true;
- auto closer = sg::make_scope_guard([this]() {
- is_running = false;
- stop_finished_signal.try_notify_all();
- exit_func();
- });
- for (;;) {
- using namespace boost::asio::experimental::awaitable_operators;
- auto result = co_await (func() || request_stop_watcher.coro_wait(false));
- if (result.index() != 0) break; // else index() == 0
- if (std::get<0>(result) == false) break;
- }
- co_return;
- }
- };
- template<typename FuncType,
- typename ExitFuncType = decltype(coro_worker::empty_func) const &>
- inline auto make_infinite_coro_worker(FuncType &&func,
- ExitFuncType &&exit_func = coro_worker::empty_func,
- boost::asio::io_context *ctx = global_context) {
- return coro_worker::pointer(new coro_worker_impl<FuncType, ExitFuncType>(
- std::forward<FuncType>(func), std::forward<ExitFuncType>(exit_func), ctx)
- );
- }
- template<typename FuncType,
- typename ExitFuncType = decltype(coro_worker::empty_func) const &>
- inline auto make_interval_coro_worker(std::chrono::milliseconds interval,
- FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func,
- boost::asio::io_context *ctx = global_context) {
- auto worker_func = [
- interval,
- func = std::forward<FuncType>(func),
- timer = boost::asio::high_resolution_timer(*ctx)]() mutable
- -> boost::asio::awaitable<bool> {
- timer.expires_after(interval);
- auto ret = co_await func();
- co_await timer.async_wait(boost::asio::use_awaitable);
- co_return ret;
- };
- return make_infinite_coro_worker(std::move(worker_func),
- std::forward<ExitFuncType>(exit_func), ctx);
- }
/// Exception handler that ignores the exception it is given.
/// Declared `inline` (not `static`) so that every translation unit including
/// this header refers to one and the same object.
inline constexpr auto empty_exception_handler = [](std::exception &) {};
- template<typename FuncType,
- typename ErrorHandlerType = decltype(empty_exception_handler) const &>
- inline auto make_noexcept_func(FuncType &&func,
- ErrorHandlerType &&error_handler = empty_exception_handler) {
- static_assert(std::is_convertible_v<
- decltype(std::declval<FuncType>()()),
- boost::asio::awaitable<bool>>);
- static_assert(std::is_void<
- decltype(std::declval<ErrorHandlerType>()(
- std::declval<std::exception &>()))>());
- auto noexcept_func = [
- real_func = std::forward<FuncType>(func),
- error_handler = std::forward<ErrorHandlerType>(error_handler)]() mutable noexcept
- -> boost::asio::awaitable<bool> {
- try {
- auto ok = co_await real_func();
- co_return ok;
- } catch (std::exception &e) {
- error_handler(e);
- co_return false;
- }
- assert(false);
- co_return false;
- };
- return std::move(noexcept_func);
- }
- }
- #endif //SOPHIAR2_CORO_WORKER_HPP
|