coro_worker.hpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #ifndef SOPHIAR2_CORO_WORKER_HPP
  2. #define SOPHIAR2_CORO_WORKER_HPP
  #include "third_party/scope_guard.hpp"
  #include "utility/coro_signal2.hpp"
  #include <boost/asio/awaitable.hpp>
  #include <boost/asio/co_spawn.hpp>
  #include <boost/asio/detached.hpp>
  #include <boost/asio/experimental/awaitable_operators.hpp>
  #include <boost/asio/high_resolution_timer.hpp>
  #include <boost/asio/use_awaitable.hpp>
  #include <cassert>
  #include <chrono>
  #include <memory>
  #include <type_traits>
  #include <utility>
  #include <variant>
  13. namespace sophiar {
  14. class coro_worker {
  15. public:
  16. using pointer = std::shared_ptr<coro_worker>;
  17. virtual ~coro_worker() {
  18. assert(!is_running);
  19. }
  20. virtual void run() = 0;
  21. void cancel() {
  22. if (!is_running) return;
  23. request_stop_signal.try_notify_all();
  24. }
  25. virtual boost::asio::awaitable<void> coro_wait_stop() = 0;
  26. protected:
  27. template<typename Executor>
  28. explicit coro_worker(Executor &executor)
  29. :request_stop_signal(executor),
  30. stop_finished_signal(executor),
  31. request_stop_watcher(request_stop_signal.new_watcher(executor)) {
  32. }
  33. coro_signal2 request_stop_signal;
  34. coro_signal2 stop_finished_signal;
  35. signal_watcher request_stop_watcher;
  36. bool is_running = false;
  37. };
  38. template<typename Executor, typename FuncType>
  39. class coro_worker_impl : public coro_worker {
  40. public:
  41. static_assert(std::is_convertible_v<
  42. decltype(std::declval<FuncType>()()),
  43. boost::asio::awaitable<bool>>);
  44. coro_worker_impl(Executor &_executor, FuncType &&_func)
  45. : coro_worker(_executor),
  46. executor(_executor),
  47. func(std::forward<FuncType>(_func)) {}
  48. ~coro_worker_impl() override = default;
  49. void run() override {
  50. assert(!is_running);
  51. request_stop_watcher.sync();
  52. boost::asio::co_spawn(executor, run_impl(), boost::asio::detached);
  53. }
  54. boost::asio::awaitable<void> coro_wait_stop() override {
  55. if (!is_running) co_return;
  56. auto stop_finished_watcher = stop_finished_signal.new_watcher(executor);
  57. co_await stop_finished_watcher.coro_wait();
  58. assert(!is_running);
  59. co_return;
  60. }
  61. private:
  62. using store_type = std::remove_cvref_t<FuncType>;
  63. store_type func;
  64. Executor &executor;
  65. boost::asio::awaitable<void> run_impl() {
  66. is_running = true;
  67. auto closer = sg::make_scope_guard([this]() {
  68. is_running = false;
  69. stop_finished_signal.try_notify_all();
  70. });
  71. for (;;) {
  72. using namespace boost::asio::experimental::awaitable_operators;
  73. auto result = co_await (func() || request_stop_watcher.coro_wait(false));
  74. if (result.index() != 0) break; // else index() == 0
  75. if (std::get<0>(result) == false) break;
  76. }
  77. co_return;
  78. }
  79. };
  80. template<typename Executor, typename FuncType>
  81. auto make_infinite_coro_worker(Executor &executor, FuncType &&func) {
  82. return coro_worker::pointer(new coro_worker_impl<Executor, FuncType>(
  83. executor, std::forward<FuncType>(func)));
  84. }
  85. template<typename Executor, typename FuncType>
  86. auto make_interval_coro_worker(Executor &executor, std::chrono::milliseconds interval, FuncType &&func) {
  87. auto worker_func = [
  88. interval,
  89. func = std::forward<FuncType>(func),
  90. timer = boost::asio::high_resolution_timer(executor)]() mutable
  91. -> boost::asio::awaitable<bool> {
  92. timer.expires_from_now(interval);
  93. auto ret = co_await func();
  94. co_await timer.async_wait(boost::asio::use_awaitable);
  95. co_return ret;
  96. };
  97. return make_infinite_coro_worker(executor, std::move(worker_func));
  98. }
  99. }
  100. #endif //SOPHIAR2_CORO_WORKER_HPP