#ifndef SOPHIAR2_CORO_SIGNAL_HPP #define SOPHIAR2_CORO_SIGNAL_HPP #include #include #include #include #include #include #include namespace sophiar { class coro_signal : private boost::noncopyable { public: class signal_token : private boost::noncopyable { public: explicit signal_token(coro_signal &_s) : signal(_s) { ++signal.waiting_count; } ~signal_token() { if (!is_fulfilled) { --signal.waiting_count; } // handle leaking signal if (signal.waiting_count == 0 && signal.channel.ready()) { signal.channel.reset(); signal.is_notifying = false; } } void fulfill() { is_fulfilled = true; --signal.waiting_count; } private: coro_signal &signal; bool is_fulfilled = false; }; coro_signal(boost::asio::io_context &context) : channel(context, 1) {} auto new_token() { return signal_token{*this}; } boost::asio::awaitable coro_wait(signal_token &token) { co_await channel.async_receive(boost::asio::use_awaitable); token.fulfill(); try_notify_more(); co_return; } boost::asio::awaitable coro_wait() { signal_token token(*this); co_await coro_wait(token); co_return; } bool try_wait(signal_token &token) { if (!channel.ready()) return false; token.fulfill(); try_notify_more(); return true; } void try_notify_all() { if (is_notifying || waiting_count == 0) return; is_notifying = true; send_signal(); } private: using channel_type = boost::asio::experimental::channel< void(boost::system::error_code, bool)>; channel_type channel; size_t waiting_count = 0; bool is_notifying = false; void send_signal() { assert(!channel.ready()); channel.try_send(boost::system::error_code{}, true); } void try_notify_more() { if (waiting_count > 0) { send_signal(); } else { is_notifying = false; } } }; } #endif //SOPHIAR2_CORO_SIGNAL_HPP