coro_worker.hpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #ifndef SOPHIAR2_CORO_WORKER_HPP
  2. #define SOPHIAR2_CORO_WORKER_HPP
  3. #include "core/global_defs.h"
  4. #include "third_party/scope_guard.hpp"
  5. #include "utility/coro_signal2.hpp"
  6. #include <boost/asio/awaitable.hpp>
  7. #include <boost/asio/co_spawn.hpp>
  8. #include <boost/asio/detached.hpp>
  9. #include <boost/asio/experimental/awaitable_operators.hpp>
  10. #include <boost/asio/high_resolution_timer.hpp>
  11. #include <chrono>
  12. #include <memory>
  13. #include <type_traits>
  14. namespace sophiar {
  15. class coro_worker {
  16. public:
  17. using pointer = std::unique_ptr<coro_worker>;
  18. static constexpr auto empty_func = []() {};
  19. virtual ~coro_worker() {
  20. assert(!is_running);
  21. }
  22. void run() {
  23. assert(!is_running);
  24. request_stop_watcher.sync();
  25. boost::asio::co_spawn(*run_ctx, run_impl(), boost::asio::detached);
  26. }
  27. void cancel() {
  28. if (!is_running) return;
  29. request_stop_signal.try_notify_all();
  30. }
  31. boost::asio::awaitable<void> coro_wait_stop() {
  32. if (!is_running) co_return;
  33. auto stop_finished_watcher = stop_finished_signal.new_watcher();
  34. co_await stop_finished_watcher.coro_wait();
  35. assert(!is_running);
  36. co_return;
  37. }
  38. protected:
  39. explicit coro_worker(boost::asio::io_context *ctx)
  40. : request_stop_signal(ctx),
  41. stop_finished_signal(ctx),
  42. request_stop_watcher(request_stop_signal.new_watcher()) {
  43. run_ctx = ctx;
  44. }
  45. coro_signal2 request_stop_signal;
  46. coro_signal2 stop_finished_signal;
  47. signal_watcher request_stop_watcher;
  48. boost::asio::io_context *run_ctx = nullptr;
  49. bool is_running = false;
  50. virtual boost::asio::awaitable<void> run_impl() = 0;
  51. };
  52. template<typename FuncType, typename ExitFuncType>
  53. class coro_worker_impl : public coro_worker {
  54. public:
  55. static_assert(std::is_convertible_v<
  56. decltype(std::declval<FuncType>()()),
  57. boost::asio::awaitable<bool>>);
  58. static_assert(std::is_void<
  59. decltype(std::declval<ExitFuncType>()())>());
  60. coro_worker_impl(FuncType &&_func, ExitFuncType &&_exit_func,
  61. boost::asio::io_context *ctx)
  62. : coro_worker(ctx),
  63. func(std::forward<FuncType>(_func)),
  64. exit_func(std::forward<ExitFuncType>(_exit_func)) {}
  65. ~coro_worker_impl() override = default;
  66. private:
  67. using func_store_type = std::remove_cvref_t<FuncType>;
  68. using exit_func_store_type = std::remove_cvref_t<ExitFuncType>;
  69. func_store_type func;
  70. exit_func_store_type exit_func;
  71. boost::asio::awaitable<void> run_impl() override {
  72. is_running = true;
  73. auto closer = sg::make_scope_guard([this]() {
  74. is_running = false;
  75. stop_finished_signal.try_notify_all();
  76. exit_func();
  77. });
  78. for (;;) {
  79. using namespace boost::asio::experimental::awaitable_operators;
  80. auto result = co_await (func() || request_stop_watcher.coro_wait(false));
  81. if (result.index() != 0) break; // else index() == 0
  82. if (std::get<0>(result) == false) break;
  83. }
  84. co_return;
  85. }
  86. };
  87. template<typename FuncType,
  88. typename ExitFuncType = decltype(coro_worker::empty_func) const &>
  89. inline auto make_infinite_coro_worker(FuncType &&func,
  90. ExitFuncType &&exit_func = coro_worker::empty_func,
  91. boost::asio::io_context *ctx = global_context) {
  92. return coro_worker::pointer(new coro_worker_impl<FuncType, ExitFuncType>(
  93. std::forward<FuncType>(func), std::forward<ExitFuncType>(exit_func), ctx)
  94. );
  95. }
  96. template<typename FuncType,
  97. typename ExitFuncType = decltype(coro_worker::empty_func) const &>
  98. inline auto make_interval_coro_worker(std::chrono::milliseconds interval,
  99. FuncType &&func, ExitFuncType &&exit_func = coro_worker::empty_func,
  100. boost::asio::io_context *ctx = global_context) {
  101. auto worker_func = [
  102. interval,
  103. func = std::forward<FuncType>(func),
  104. timer = boost::asio::high_resolution_timer(*ctx)]() mutable
  105. -> boost::asio::awaitable<bool> {
  106. timer.expires_after(interval);
  107. auto ret = co_await func();
  108. co_await timer.async_wait(boost::asio::use_awaitable);
  109. co_return ret;
  110. };
  111. return make_infinite_coro_worker(std::move(worker_func),
  112. std::forward<ExitFuncType>(exit_func), ctx);
  113. }
  114. static constexpr auto empty_exception_handler = [](std::exception &) {};
  115. template<typename FuncType,
  116. typename ErrorHandlerType = decltype(empty_exception_handler) const &>
  117. inline auto make_noexcept_func(FuncType &&func,
  118. ErrorHandlerType &&error_handler = empty_exception_handler) {
  119. static_assert(std::is_convertible_v<
  120. decltype(std::declval<FuncType>()()),
  121. boost::asio::awaitable<bool>>);
  122. static_assert(std::is_void<
  123. decltype(std::declval<ErrorHandlerType>()(
  124. std::declval<std::exception &>()))>());
  125. auto noexcept_func = [
  126. real_func = std::forward<FuncType>(func),
  127. error_handler = std::forward<ErrorHandlerType>(error_handler)]() mutable noexcept
  128. -> boost::asio::awaitable<bool> {
  129. try {
  130. auto ok = co_await real_func();
  131. co_return ok;
  132. } catch (std::exception &e) {
  133. error_handler(e);
  134. co_return false;
  135. }
  136. assert(false);
  137. co_return false;
  138. };
  139. return std::move(noexcept_func);
  140. }
  141. }
  142. #endif //SOPHIAR2_CORO_WORKER_HPP