coro_signal2.hpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. #ifndef SOPHIAR2_CORO_SIGNAL2_HPP
  2. #define SOPHIAR2_CORO_SIGNAL2_HPP
  3. #include "core/global_defs.h"
  4. #include "core/timestamp_helper.hpp"
  5. #include <boost/asio/awaitable.hpp>
  6. #include <boost/asio/use_awaitable.hpp>
  7. #include <boost/core/noncopyable.hpp>
  8. #ifdef CORO_SIGNAL2_USE_TIMER // timer based implementation
  9. #include <boost/asio/deadline_timer.hpp>
  10. #include <boost/asio/redirect_error.hpp>
  11. #else // channel based implementation, ~ 6% faster than timer
  12. #include "third_party/scope_guard.hpp"
  13. #include <boost/asio/experimental/channel.hpp>
  14. #include <boost/intrusive/list.hpp>
  15. #endif
  16. namespace sophiar {
  17. class coro_signal2;
  18. #ifdef CORO_SIGNAL2_USE_TIMER
  19. class signal_watcher {
  20. #else
  21. using singal_watcher_base =
  22. boost::intrusive::list_base_hook<
  23. boost::intrusive::link_mode<
  24. boost::intrusive::auto_unlink>>;
  25. class signal_watcher : public singal_watcher_base {
  26. #endif
  27. public:
  28. explicit signal_watcher(coro_signal2 *_sig, boost::asio::io_context *ctx = global_context)
  29. #ifdef CORO_SIGNAL2_USE_TIMER
  30. : sig(_sig) {
  31. assert(sig != nullptr);
  32. }
  33. #else
  34. : sig(_sig),
  35. chan(*ctx, 1) {
  36. assert(sig != nullptr);
  37. }
  38. #endif
  39. signal_watcher(signal_watcher &&other) noexcept
  40. #ifdef CORO_SIGNAL2_USE_TIMER
  41. : sig(other.sig),
  42. last_watch_ts(other.last_watch_ts) {}
  43. #else
  44. : sig(other.sig),
  45. chan(std::move(other.chan)),
  46. last_watch_ts(other.last_watch_ts) {
  47. assert(!other.is_linked());
  48. }
  49. #endif
  50. bool try_wait();
  51. boost::asio::awaitable<void> coro_wait(bool auto_sync = true);
  52. void sync();
  53. timestamp_type get_last_update_ts() const {
  54. return last_watch_ts;
  55. }
  56. private:
  57. friend class coro_signal2;
  58. #ifndef CORO_SIGNAL2_USE_TIMER
  59. using channel_type = boost::asio::experimental::channel<
  60. void(boost::system::error_code, bool)>;
  61. channel_type chan;
  62. #endif
  63. timestamp_type last_watch_ts = 0;
  64. coro_signal2 *sig;
  65. };
  66. class coro_signal2 : private boost::noncopyable {
  67. public:
  68. // if coro signal is moved, signal watcher will not work
  69. coro_signal2(coro_signal2 &&other) noexcept = delete;
  70. #ifdef CORO_SIGNAL2_USE_TIMER
  71. coro_signal2()
  72. : timer(*global_context) {
  73. timer.expires_at(boost::posix_time::pos_infin);
  74. }
  75. #else
  76. explicit coro_signal2(boost::asio::io_context *ctx = global_context) {
  77. run_ctx = ctx;
  78. }
  79. #endif
  80. auto new_watcher() {
  81. return signal_watcher{this, run_ctx};
  82. }
  83. void try_notify_all(timestamp_type ts = current_timestamp()) {
  84. last_notify_ts = ts;
  85. #ifdef CORO_SIGNAL2_USE_TIMER
  86. timer.cancel();
  87. #else
  88. list_type requeue_list;
  89. while (!watcher_list.empty()) {
  90. auto &node = watcher_list.front();
  91. assert(node.is_linked());
  92. node.unlink();
  93. if (node.last_watch_ts < ts) {
  94. assert(!node.chan.ready());
  95. node.chan.try_send(boost::system::error_code{}, true);
  96. } else {
  97. requeue_list.push_back(node);
  98. }
  99. }
  100. watcher_list.swap(requeue_list);
  101. #endif
  102. }
  103. private:
  104. friend class signal_watcher;
  105. timestamp_type last_notify_ts = 0;
  106. boost::asio::io_context *run_ctx = nullptr;
  107. #ifdef CORO_SIGNAL2_USE_TIMER
  108. boost::asio::deadline_timer timer;
  109. #else
  110. using list_type =
  111. boost::intrusive::list<
  112. signal_watcher,
  113. boost::intrusive::constant_time_size<false>>;
  114. list_type watcher_list;
  115. #endif
  116. };
  117. inline bool signal_watcher::try_wait() {
  118. if (last_watch_ts < sig->last_notify_ts) {
  119. sync();
  120. return true;
  121. }
  122. return false;
  123. }
  124. inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
  125. #ifndef CORO_SIGNAL2_USE_TIMER
  126. assert(!chan.ready());
  127. #endif
  128. if (auto_sync) {
  129. sync();
  130. } else {
  131. if (last_watch_ts < sig->last_notify_ts) {
  132. sync();
  133. co_return;
  134. }
  135. }
  136. #ifndef CORO_SIGNAL2_USE_TIMER
  137. auto closer = sg::make_scope_guard([&]() {
  138. if (is_linked()) unlink();
  139. if (chan.ready()) chan.reset();
  140. });
  141. assert(!is_linked());
  142. sig->watcher_list.push_back(*this);
  143. #endif
  144. for (;;) {
  145. #ifdef CORO_SIGNAL2_USE_TIMER
  146. boost::system::error_code ec;
  147. co_await sig->timer.async_wait(
  148. boost::asio::redirect_error(boost::asio::use_awaitable, ec));
  149. assert(ec == boost::asio::error::operation_aborted);
  150. #else
  151. co_await chan.async_receive(boost::asio::use_awaitable);
  152. #endif
  153. if (last_watch_ts < sig->last_notify_ts) break;
  154. }
  155. sync();
  156. co_return;
  157. }
  158. inline void signal_watcher::sync() {
  159. last_watch_ts = sig->last_notify_ts;
  160. }
  161. }
  162. #endif //SOPHIAR2_CORO_SIGNAL2_HPP