|
|
@@ -1,40 +1,80 @@
|
|
|
#ifndef SOPHIAR2_CORO_SIGNAL2_HPP
|
|
|
#define SOPHIAR2_CORO_SIGNAL2_HPP
|
|
|
|
|
|
-#ifndef CORO_SIGNAL2_USE_TIMER // channel based implementation, ~ 6% faster than timer
|
|
|
-
|
|
|
#include "core/timestamp_helper.hpp"
|
|
|
-#include "third_party/scope_guard.hpp"
|
|
|
|
|
|
#include <boost/asio/awaitable.hpp>
|
|
|
-#include <boost/asio/experimental/channel.hpp>
|
|
|
#include <boost/asio/use_awaitable.hpp>
|
|
|
#include <boost/core/noncopyable.hpp>
|
|
|
-#include <boost/intrusive/list.hpp>
|
|
|
|
|
|
#include <cassert>
|
|
|
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER // timer based implementation
|
|
|
+
|
|
|
+#include <boost/asio/deadline_timer.hpp>
|
|
|
+#include <boost/asio/redirect_error.hpp>
|
|
|
+
|
|
|
+#else // channel based implementation, ~ 6% faster than timer
|
|
|
+
|
|
|
+#include "third_party/scope_guard.hpp"
|
|
|
+
|
|
|
+#include <boost/asio/experimental/channel.hpp>
|
|
|
+#include <boost/intrusive/list.hpp>
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
namespace sophiar {
|
|
|
|
|
|
class coro_signal2;
|
|
|
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ class signal_watcher {
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
using singal_watcher_base =
|
|
|
boost::intrusive::list_base_hook<
|
|
|
boost::intrusive::link_mode<
|
|
|
boost::intrusive::auto_unlink>>;
|
|
|
|
|
|
class signal_watcher : public singal_watcher_base {
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
public:
|
|
|
|
|
|
template<typename Executor>
|
|
|
explicit signal_watcher(Executor &executor, coro_signal2 &_sig)
|
|
|
+
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ : sig(_sig) {}
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
: sig(_sig),
|
|
|
chan(executor, 1) {}
|
|
|
|
|
|
+#endif
|
|
|
+
|
|
|
+
|
|
|
signal_watcher(signal_watcher &&other) noexcept
|
|
|
+
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ : sig(other.sig),
|
|
|
+ last_watch_ts(other.last_watch_ts) {}
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
: sig(other.sig),
|
|
|
chan(std::move(other.chan)),
|
|
|
- last_watch_ts(other.last_watch_ts) {}
|
|
|
+ last_watch_ts(other.last_watch_ts) {
|
|
|
+ assert(!other.is_linked());
|
|
|
+ }
|
|
|
+
|
|
|
+#endif
|
|
|
|
|
|
bool try_wait();
|
|
|
|
|
|
@@ -42,16 +82,24 @@ namespace sophiar {
|
|
|
|
|
|
void sync();
|
|
|
|
|
|
+ timestamp_type get_last_update_ts() const {
|
|
|
+ return last_watch_ts;
|
|
|
+ }
|
|
|
+
|
|
|
private:
|
|
|
|
|
|
friend class coro_signal2;
|
|
|
|
|
|
+#ifndef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
using channel_type = boost::asio::experimental::channel<
|
|
|
void(boost::system::error_code, bool)>;
|
|
|
+ channel_type chan;
|
|
|
+
|
|
|
+#endif
|
|
|
|
|
|
timestamp_type last_watch_ts = 0;
|
|
|
coro_signal2 &sig;
|
|
|
- channel_type chan;
|
|
|
|
|
|
};
|
|
|
|
|
|
@@ -59,7 +107,19 @@ namespace sophiar {
|
|
|
public:
|
|
|
|
|
|
template<typename Executor>
|
|
|
- explicit coro_signal2(Executor &) {}
|
|
|
+ explicit coro_signal2(Executor &executor)
|
|
|
+
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ :timer(executor) {
|
|
|
+ timer.expires_at(boost::posix_time::pos_infin);
|
|
|
+}
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
+ {}
|
|
|
+
|
|
|
+#endif
|
|
|
|
|
|
template<typename Executor>
|
|
|
auto new_watcher(Executor &executor) {
|
|
|
@@ -68,6 +128,13 @@ namespace sophiar {
|
|
|
|
|
|
void try_notify_all(timestamp_type ts = current_timestamp()) {
|
|
|
last_notify_ts = ts;
|
|
|
+
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ timer.cancel();
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
list_type requeue_list;
|
|
|
while (!watcher_list.empty()) {
|
|
|
auto &node = watcher_list.front();
|
|
|
@@ -81,20 +148,31 @@ namespace sophiar {
|
|
|
}
|
|
|
}
|
|
|
watcher_list.swap(requeue_list);
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private:
|
|
|
|
|
|
friend class signal_watcher;
|
|
|
|
|
|
+ timestamp_type last_notify_ts = 0;
|
|
|
+
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
+ boost::asio::deadline_timer timer;
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
using list_type =
|
|
|
boost::intrusive::list<
|
|
|
signal_watcher,
|
|
|
boost::intrusive::constant_time_size<false>>;
|
|
|
-
|
|
|
- timestamp_type last_notify_ts = 0;
|
|
|
list_type watcher_list;
|
|
|
|
|
|
+#endif
|
|
|
+
|
|
|
};
|
|
|
|
|
|
inline bool signal_watcher::try_wait() {
|
|
|
@@ -106,7 +184,13 @@ namespace sophiar {
|
|
|
}
|
|
|
|
|
|
inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
|
|
|
+
|
|
|
+#ifndef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
assert(!chan.ready());
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
if (auto_sync) {
|
|
|
sync();
|
|
|
} else {
|
|
|
@@ -115,117 +199,33 @@ namespace sophiar {
|
|
|
co_return;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+#ifndef CORO_SIGNAL2_USE_TIMER
|
|
|
+
|
|
|
auto closer = sg::make_scope_guard([&]() {
|
|
|
if (is_linked()) unlink();
|
|
|
if (chan.ready()) chan.reset();
|
|
|
});
|
|
|
assert(!is_linked());
|
|
|
sig.watcher_list.push_back(*this);
|
|
|
- for (;;) {
|
|
|
- co_await chan.async_receive(boost::asio::use_awaitable);
|
|
|
- if (last_watch_ts < sig.last_notify_ts) break;
|
|
|
- }
|
|
|
- sync();
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- inline void signal_watcher::sync() {
|
|
|
- last_watch_ts = sig.last_notify_ts;
|
|
|
- }
|
|
|
|
|
|
-}
|
|
|
-
|
|
|
-#else // timer based implementation
|
|
|
-
|
|
|
-#include "core/timestamp_helper.hpp"
|
|
|
-
|
|
|
-#include <boost/asio/awaitable.hpp>
|
|
|
-#include <boost/asio/deadline_timer.hpp>
|
|
|
-#include <boost/asio/redirect_error.hpp>
|
|
|
-#include <boost/asio/use_awaitable.hpp>
|
|
|
-#include <boost/core/noncopyable.hpp>
|
|
|
-
|
|
|
-#include <cassert>
|
|
|
-
|
|
|
-namespace sophiar {
|
|
|
-
|
|
|
- class coro_signal2;
|
|
|
-
|
|
|
- class signal_watcher {
|
|
|
- public:
|
|
|
-
|
|
|
- explicit signal_watcher(coro_signal2 &_sig)
|
|
|
- : sig(_sig) {}
|
|
|
-
|
|
|
- signal_watcher(signal_watcher &&other) noexcept
|
|
|
- : sig(other.sig),
|
|
|
- last_watch_ts(other.last_watch_ts) {}
|
|
|
-
|
|
|
- bool try_wait();
|
|
|
-
|
|
|
- boost::asio::awaitable<void> coro_wait(bool auto_sync = true);
|
|
|
-
|
|
|
- void sync();
|
|
|
-
|
|
|
- private:
|
|
|
-
|
|
|
- friend class coro_signal2;
|
|
|
-
|
|
|
- timestamp_type last_watch_ts = 0;
|
|
|
- coro_signal2 &sig;
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
- class coro_signal2 : private boost::noncopyable {
|
|
|
- public:
|
|
|
-
|
|
|
- template<typename Executor>
|
|
|
- explicit coro_signal2(Executor &executor)
|
|
|
- :timer(executor) {
|
|
|
- timer.expires_at(boost::posix_time::pos_infin);
|
|
|
- }
|
|
|
-
|
|
|
- template<typename Executor>
|
|
|
- auto new_watcher(Executor &) {
|
|
|
- return signal_watcher{*this};
|
|
|
- }
|
|
|
-
|
|
|
- void try_notify_all(timestamp_type ts = current_timestamp()) {
|
|
|
- last_notify_ts = ts;
|
|
|
- timer.cancel();
|
|
|
- }
|
|
|
-
|
|
|
- private:
|
|
|
-
|
|
|
- friend class signal_watcher;
|
|
|
-
|
|
|
- timestamp_type last_notify_ts = 0;
|
|
|
- boost::asio::deadline_timer timer;
|
|
|
+#endif
|
|
|
|
|
|
- };
|
|
|
+ for (;;) {
|
|
|
|
|
|
- inline bool signal_watcher::try_wait() {
|
|
|
- if (last_watch_ts < sig.last_notify_ts) {
|
|
|
- sync();
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
+#ifdef CORO_SIGNAL2_USE_TIMER
|
|
|
|
|
|
- inline boost::asio::awaitable<void> signal_watcher::coro_wait(bool auto_sync) {
|
|
|
- if (auto_sync) {
|
|
|
- sync();
|
|
|
- } else {
|
|
|
- if (last_watch_ts < sig.last_notify_ts) {
|
|
|
- sync();
|
|
|
- co_return;
|
|
|
- }
|
|
|
- }
|
|
|
- for (;;) {
|
|
|
boost::system::error_code ec;
|
|
|
co_await sig.timer.async_wait(
|
|
|
boost::asio::redirect_error(boost::asio::use_awaitable, ec));
|
|
|
assert(ec == boost::asio::error::operation_aborted);
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
+ co_await chan.async_receive(boost::asio::use_awaitable);
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
if (last_watch_ts < sig.last_notify_ts) break;
|
|
|
}
|
|
|
sync();
|
|
|
@@ -238,6 +238,4 @@ namespace sophiar {
|
|
|
|
|
|
}
|
|
|
|
|
|
-#endif
|
|
|
-
|
|
|
#endif //SOPHIAR2_CORO_SIGNAL2_HPP
|