| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- #ifndef SOPHIAR2_CORO_SIGNAL2_HPP
- #define SOPHIAR2_CORO_SIGNAL2_HPP
- #include "core/global_defs.h"
- #include "core/timestamp_helper.hpp"
- #include <boost/asio/awaitable.hpp>
- #include <boost/asio/use_awaitable.hpp>
- #include <boost/core/noncopyable.hpp>
- #ifdef CORO_SIGNAL2_USE_TIMER // timer based implementation
- #include <boost/asio/deadline_timer.hpp>
- #include <boost/asio/redirect_error.hpp>
- #else // channel based implementation, ~ 6% faster than timer
- #include "third_party/scope_guard.hpp"
- #include <boost/asio/experimental/channel.hpp>
- #include <boost/intrusive/list.hpp>
- #endif
- namespace sophiar {
- class coro_signal2;
- #ifdef CORO_SIGNAL2_USE_TIMER
- class signal_watcher {
- #else
- using singal_watcher_base =
- boost::intrusive::list_base_hook<
- boost::intrusive::link_mode<
- boost::intrusive::auto_unlink>>;
- class signal_watcher : public singal_watcher_base {
- #endif
- public:
- explicit signal_watcher(coro_signal2 *_sig, boost::asio::io_context *ctx = global_context)
- #ifdef CORO_SIGNAL2_USE_TIMER
- : sig(_sig) {
- assert(sig != nullptr);
- }
- #else
- : sig(_sig),
- chan(*ctx, 1) {
- assert(sig != nullptr);
- }
- #endif
- signal_watcher(signal_watcher &&other) noexcept
- #ifdef CORO_SIGNAL2_USE_TIMER
- : sig(other.sig),
- last_watch_ts(other.last_watch_ts) {}
- #else
- : sig(other.sig),
- chan(std::move(other.chan)),
- last_watch_ts(other.last_watch_ts) {
- assert(!other.is_linked());
- }
- #endif
- bool try_wait();
- boost::asio::awaitable<void> coro_wait(bool auto_sync = true);
- void sync();
- timestamp_type get_last_update_ts() const {
- return last_watch_ts;
- }
- private:
- friend class coro_signal2;
- #ifndef CORO_SIGNAL2_USE_TIMER
- using channel_type = boost::asio::experimental::channel<
- void(boost::system::error_code, bool)>;
- channel_type chan;
- #endif
- timestamp_type last_watch_ts = 0;
- coro_signal2 *sig;
- };
- class coro_signal2 : private boost::noncopyable {
- public:
- // if coro signal is moved, signal watcher will not work
- coro_signal2(coro_signal2 &&other) noexcept = delete;
- #ifdef CORO_SIGNAL2_USE_TIMER
- coro_signal2()
- : timer(*global_context) {
- timer.expires_at(boost::posix_time::pos_infin);
- }
- #else
- explicit coro_signal2(boost::asio::io_context *ctx = global_context) {
- run_ctx = ctx;
- }
- #endif
- auto new_watcher() {
- return signal_watcher{this, run_ctx};
- }
- void try_notify_all(timestamp_type ts = current_timestamp()) {
- last_notify_ts = ts;
- #ifdef CORO_SIGNAL2_USE_TIMER
- timer.cancel();
- #else
- list_type requeue_list;
- while (!watcher_list.empty()) {
- auto &node = watcher_list.front();
- assert(node.is_linked());
- node.unlink();
- if (node.last_watch_ts < ts) {
- assert(!node.chan.ready());
- node.chan.try_send(boost::system::error_code{}, true);
- } else {
- requeue_list.push_back(node);
- }
- }
- watcher_list.swap(requeue_list);
- #endif
- }
- private:
- friend class signal_watcher;
- timestamp_type last_notify_ts = 0;
- boost::asio::io_context *run_ctx = nullptr;
- #ifdef CORO_SIGNAL2_USE_TIMER
- boost::asio::deadline_timer timer;
- #else
- using list_type =
- boost::intrusive::list<
- signal_watcher,
- boost::intrusive::constant_time_size<false>>;
- list_type watcher_list;
- #endif
- };
- inline bool signal_watcher::try_wait() {
- if (last_watch_ts < sig->last_notify_ts) {
- sync();
- return true;
- }
- return false;
- }
- inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
- #ifndef CORO_SIGNAL2_USE_TIMER
- assert(!chan.ready());
- #endif
- if (auto_sync) {
- sync();
- } else {
- if (last_watch_ts < sig->last_notify_ts) {
- sync();
- co_return;
- }
- }
- #ifndef CORO_SIGNAL2_USE_TIMER
- auto closer = sg::make_scope_guard([&]() {
- if (is_linked()) unlink();
- if (chan.ready()) chan.reset();
- });
- assert(!is_linked());
- sig->watcher_list.push_back(*this);
- #endif
- for (;;) {
- #ifdef CORO_SIGNAL2_USE_TIMER
- boost::system::error_code ec;
- co_await sig->timer.async_wait(
- boost::asio::redirect_error(boost::asio::use_awaitable, ec));
- assert(ec == boost::asio::error::operation_aborted);
- #else
- co_await chan.async_receive(boost::asio::use_awaitable);
- #endif
- if (last_watch_ts < sig->last_notify_ts) break;
- }
- sync();
- co_return;
- }
- inline void signal_watcher::sync() {
- last_watch_ts = sig->last_notify_ts;
- }
- }
- #endif //SOPHIAR2_CORO_SIGNAL2_HPP
|