#ifndef SOPHIAR2_CORO_SIGNAL2_HPP #define SOPHIAR2_CORO_SIGNAL2_HPP #include "core/global_defs.h" #include "core/timestamp_helper.hpp" #include #include #include #ifdef CORO_SIGNAL2_USE_TIMER // timer based implementation #include #include #else // channel based implementation, ~ 6% faster than timer #include "third_party/scope_guard.hpp" #include #include #endif namespace sophiar { class coro_signal2; #ifdef CORO_SIGNAL2_USE_TIMER class signal_watcher { #else using singal_watcher_base = boost::intrusive::list_base_hook< boost::intrusive::link_mode< boost::intrusive::auto_unlink>>; class signal_watcher : public singal_watcher_base { #endif public: explicit signal_watcher(coro_signal2 *_sig, boost::asio::io_context *ctx = global_context) #ifdef CORO_SIGNAL2_USE_TIMER : sig(_sig) { assert(sig != nullptr); } #else : sig(_sig), chan(*ctx, 1) { assert(sig != nullptr); } #endif signal_watcher(signal_watcher &&other) noexcept #ifdef CORO_SIGNAL2_USE_TIMER : sig(other.sig), last_watch_ts(other.last_watch_ts) {} #else : sig(other.sig), chan(std::move(other.chan)), last_watch_ts(other.last_watch_ts) { assert(!other.is_linked()); } #endif bool try_wait(); boost::asio::awaitable coro_wait(bool auto_sync = true); void sync(); timestamp_type get_last_update_ts() const { return last_watch_ts; } private: friend class coro_signal2; #ifndef CORO_SIGNAL2_USE_TIMER using channel_type = boost::asio::experimental::channel< void(boost::system::error_code, bool)>; channel_type chan; #endif timestamp_type last_watch_ts = 0; coro_signal2 *sig; }; class coro_signal2 : private boost::noncopyable { public: // if coro signal is moved, signal watcher will not work coro_signal2(coro_signal2 &&other) noexcept = delete; #ifdef CORO_SIGNAL2_USE_TIMER coro_signal2() : timer(*global_context) { timer.expires_at(boost::posix_time::pos_infin); } #else explicit coro_signal2(boost::asio::io_context *ctx = global_context) { run_ctx = ctx; } #endif auto new_watcher() { return signal_watcher{this, run_ctx}; } void try_notify_all(timestamp_type ts = current_timestamp()) { last_notify_ts = ts; #ifdef CORO_SIGNAL2_USE_TIMER timer.cancel(); #else list_type requeue_list; while (!watcher_list.empty()) { auto &node = watcher_list.front(); assert(node.is_linked()); node.unlink(); if (node.last_watch_ts < ts) { assert(!node.chan.ready()); node.chan.try_send(boost::system::error_code{}, true); } else { requeue_list.push_back(node); } } watcher_list.swap(requeue_list); #endif } private: friend class signal_watcher; timestamp_type last_notify_ts = 0; boost::asio::io_context *run_ctx = nullptr; #ifdef CORO_SIGNAL2_USE_TIMER boost::asio::deadline_timer timer; #else using list_type = boost::intrusive::list< signal_watcher, boost::intrusive::constant_time_size>; list_type watcher_list; #endif }; inline bool signal_watcher::try_wait() { if (last_watch_ts < sig->last_notify_ts) { sync(); return true; } return false; } inline boost::asio::awaitable signal_watcher::coro_wait(bool auto_sync) { #ifndef CORO_SIGNAL2_USE_TIMER assert(!chan.ready()); #endif if (auto_sync) { sync(); } else { if (last_watch_ts < sig->last_notify_ts) { sync(); co_return; } } #ifndef CORO_SIGNAL2_USE_TIMER auto closer = sg::make_scope_guard([&]() { if (is_linked()) unlink(); if (chan.ready()) chan.reset(); }); assert(!is_linked()); sig->watcher_list.push_back(*this); #endif for (;;) { #ifdef CORO_SIGNAL2_USE_TIMER boost::system::error_code ec; co_await sig->timer.async_wait( boost::asio::redirect_error(boost::asio::use_awaitable, ec)); assert(ec == boost::asio::error::operation_aborted); #else co_await chan.async_receive(boost::asio::use_awaitable); #endif if (last_watch_ts < sig->last_notify_ts) break; } sync(); co_return; } inline void signal_watcher::sync() { last_watch_ts = sig->last_notify_ts; } } #endif //SOPHIAR2_CORO_SIGNAL2_HPP