| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- #ifndef SOPHIAR2_CORO_WORKER_HPP
- #define SOPHIAR2_CORO_WORKER_HPP
- #include "third_party/scope_guard.hpp"
- #include "utility/coro_signal2.hpp"
- #include <boost/asio/awaitable.hpp>
- #include <boost/asio/co_spawn.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/experimental/awaitable_operators.hpp>
- #include <boost/asio/high_resolution_timer.hpp>
- #include <chrono>
- #include <memory>
- #include <type_traits>
- namespace sophiar {
- class coro_worker {
- public:
- using pointer = std::shared_ptr<coro_worker>;
- virtual ~coro_worker() {
- assert(!is_running);
- }
- virtual void run() = 0;
- void cancel() {
- if (!is_running) return;
- request_stop_signal.try_notify_all();
- }
- virtual boost::asio::awaitable<void> coro_wait_stop() = 0;
- protected:
- template<typename Executor>
- explicit coro_worker(Executor &executor)
- :request_stop_signal(executor),
- stop_finished_signal(executor),
- request_stop_watcher(request_stop_signal.new_watcher(executor)) {
- }
- coro_signal2 request_stop_signal;
- coro_signal2 stop_finished_signal;
- signal_watcher request_stop_watcher;
- bool is_running = false;
- };
- template<typename Executor, typename FuncType>
- class coro_worker_impl : public coro_worker {
- public:
- static_assert(std::is_convertible_v<
- decltype(std::declval<FuncType>()()),
- boost::asio::awaitable<bool>>);
- coro_worker_impl(Executor &_executor, FuncType &&_func)
- : coro_worker(_executor),
- executor(_executor),
- func(std::forward<FuncType>(_func)) {}
- ~coro_worker_impl() override = default;
- void run() override {
- assert(!is_running);
- request_stop_watcher.sync();
- boost::asio::co_spawn(executor, run_impl(), boost::asio::detached);
- }
- boost::asio::awaitable<void> coro_wait_stop() override {
- if (!is_running) co_return;
- auto stop_finished_watcher = stop_finished_signal.new_watcher(executor);
- co_await stop_finished_watcher.coro_wait();
- assert(!is_running);
- co_return;
- }
- private:
- using store_type = std::remove_cvref_t<FuncType>;
- store_type func;
- Executor &executor;
- boost::asio::awaitable<void> run_impl() {
- is_running = true;
- auto closer = sg::make_scope_guard([this]() {
- is_running = false;
- stop_finished_signal.try_notify_all();
- });
- for (;;) {
- using namespace boost::asio::experimental::awaitable_operators;
- auto result = co_await (func() || request_stop_watcher.coro_wait(false));
- if (result.index() != 0) break; // else index() == 0
- if (std::get<0>(result) == false) break;
- }
- co_return;
- }
- };
- template<typename Executor, typename FuncType>
- auto make_infinite_coro_worker(Executor &executor, FuncType &&func) {
- return coro_worker::pointer(new coro_worker_impl<Executor, FuncType>(
- executor, std::forward<FuncType>(func)));
- }
- template<typename Executor, typename FuncType>
- auto make_interval_coro_worker(Executor &executor, std::chrono::milliseconds interval, FuncType &&func) {
- auto worker_func = [
- interval,
- func = std::forward<FuncType>(func),
- timer = boost::asio::high_resolution_timer(executor)]() mutable
- -> boost::asio::awaitable<bool> {
- timer.expires_from_now(interval);
- auto ret = co_await func();
- co_await timer.async_wait(boost::asio::use_awaitable);
- co_return ret;
- };
- return make_infinite_coro_worker(executor, std::move(worker_func));
- }
- }
- #endif //SOPHIAR2_CORO_WORKER_HPP
|