|
|
@@ -1,371 +0,0 @@
|
|
|
-#include "datanode_base.h"
|
|
|
-
|
|
|
-#include "core/sophiar_manager.h"
|
|
|
-#include "third_party/static_block.hpp"
|
|
|
-#include "utility/bit_operations.hpp"
|
|
|
-#include "utility/coro_signal.hpp"
|
|
|
-#include "utility/name_translator.hpp"
|
|
|
-#include "utility/statistic_timer.hpp"
|
|
|
-#include "utility/tiny_signal.hpp"
|
|
|
-
|
|
|
-#include <boost/asio/co_spawn.hpp>
|
|
|
-#include <boost/asio/detached.hpp>
|
|
|
-#include <boost/asio/experimental/awaitable_operators.hpp>
|
|
|
-#include <boost/asio/high_resolution_timer.hpp>
|
|
|
-#include <boost/asio/this_coro.hpp>
|
|
|
-#include <boost/asio/use_awaitable.hpp>
|
|
|
-
|
|
|
-#include <fmt/format.h>
|
|
|
-
|
|
|
-#include <cassert>
|
|
|
-#include <functional>
|
|
|
-#include <string>
|
|
|
-
|
|
|
-namespace sophiar {
|
|
|
-
|
|
|
- using boost::asio::awaitable;
|
|
|
- using boost::asio::co_spawn;
|
|
|
- using boost::asio::detached;
|
|
|
- using boost::asio::use_awaitable;
|
|
|
-
|
|
|
- using namespace boost::asio::experimental::awaitable_operators;
|
|
|
-
|
|
|
- name_translator<datanode_base::trigger_mode_type> trigger_mode_translator;
|
|
|
-
|
|
|
- static_block {
|
|
|
- using mode_type = datanode_base::trigger_mode_type;
|
|
|
- trigger_mode_translator.register_item("manual", mode_type::MANUAL);
|
|
|
- trigger_mode_translator.register_item("input", mode_type::INPUT);
|
|
|
- trigger_mode_translator.register_item("periodic", mode_type::PERIODIC);
|
|
|
- };
|
|
|
-
|
|
|
- struct datanode_base::impl {
|
|
|
-
|
|
|
- using update_signal_type = tiny_signal<data_pointer_type>;
|
|
|
- using update_slot_type = update_signal_type::slot_type;
|
|
|
- using trigger_slot_type = tiny_signal<>::slot_type;
|
|
|
-
|
|
|
- struct update_slot_impl_type : public update_slot_type {
|
|
|
- impl *p_this = nullptr;
|
|
|
- uint8_t channel_index;
|
|
|
-
|
|
|
- void on_signal_received(data_pointer_type packet) override {
|
|
|
- assert(p_this != nullptr);
|
|
|
- p_this->update_input_data(channel_index, std::move(packet));
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- struct trigger_slot_impl_type : public trigger_slot_type {
|
|
|
- impl *p_this = nullptr;
|
|
|
-
|
|
|
- void on_signal_received() override {
|
|
|
- assert(p_this != nullptr);
|
|
|
- co_spawn(get_context(), p_this->trigger(), detached);
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- datanode_base *q_this = nullptr;
|
|
|
-
|
|
|
- data_pointer_type input_data[MAX_CHANNEL_CNT];
|
|
|
- data_pointer_type output_data[MAX_CHANNEL_CNT];
|
|
|
-
|
|
|
- uint8_t input_total = 0, output_total = 0;
|
|
|
- update_signal_type output_signal[MAX_CHANNEL_CNT];
|
|
|
-
|
|
|
- channel_mask_type input_update_mask = 0;
|
|
|
- channel_mask_type output_updated_mask = 0;
|
|
|
-
|
|
|
- trigger_mode_type trigger_mode = trigger_mode_type::MANUAL;
|
|
|
- channel_mask_type trigger_input_mask = 0;
|
|
|
- std::chrono::microseconds trigger_interval{0};
|
|
|
- std::chrono::microseconds minimal_exec_interval{0};
|
|
|
- boost::asio::high_resolution_timer trigger_timer;
|
|
|
-
|
|
|
- exec_state_type exec_state = exec_state_type::IDLE;
|
|
|
- bool last_exec_success = false;
|
|
|
- timestamp_type last_exec_ts = 0;
|
|
|
-
|
|
|
- bool enable_stimer = false; // 是否启用统计时钟
|
|
|
-
|
|
|
- using trigger_stimer_type = statistic_timer<statistic_timer_single_shot_tag>;
|
|
|
- using exec_stimer_type = statistic_timer<statistic_timer_start_stop_tag>;
|
|
|
- trigger_stimer_type trigger_stimer;
|
|
|
- exec_stimer_type exec_stimer;
|
|
|
-
|
|
|
- coro_signal exec_cancel_signal;
|
|
|
- coro_signal run_finished_signal;
|
|
|
- bool exec_block = false;
|
|
|
-
|
|
|
- impl()
|
|
|
- : trigger_timer(get_context()),
|
|
|
- exec_cancel_signal(get_context()),
|
|
|
- run_finished_signal(get_context()) {}
|
|
|
-
|
|
|
- void create_trigger_slot() {
|
|
|
- auto trigger_slot = new trigger_slot_impl_type;
|
|
|
- trigger_slot->p_this = this;
|
|
|
-// get_manager().register_slot<>(q_this, "trigger", *trigger_slot);
|
|
|
- }
|
|
|
-
|
|
|
- uint8_t create_input_slot(const std::string &name) {
|
|
|
- assert(input_total + 1 <= MAX_CHANNEL_CNT);
|
|
|
- auto slot_index = input_total++;
|
|
|
- auto update_slot = new update_slot_impl_type;
|
|
|
- update_slot->p_this = this;
|
|
|
- update_slot->channel_index = slot_index;
|
|
|
-// get_manager().register_slot<data_pointer_type>(q_this, name, *update_slot);
|
|
|
- return slot_index;
|
|
|
- }
|
|
|
-
|
|
|
- uint8_t create_output_signal(const std::string &name) {
|
|
|
- assert(output_total + 1 <= MAX_CHANNEL_CNT);
|
|
|
- auto signal_index = output_total++;
|
|
|
-// get_manager().register_signal(q_this, name, output_signal[signal_index]);
|
|
|
- return signal_index;
|
|
|
- }
|
|
|
-
|
|
|
- void load_start_config(const nlohmann::json &config) {
|
|
|
- assert(config.contains("trigger_mode"));
|
|
|
- assert(config["trigger_mode"].is_string());
|
|
|
- auto trigger_mode_str = config["trigger_mode"].get<std::string>();
|
|
|
- trigger_mode = trigger_mode_translator.translate(trigger_mode_str);
|
|
|
-
|
|
|
- switch (trigger_mode) {
|
|
|
- case trigger_mode_type::INPUT: {
|
|
|
- assert(config.contains("input_mask"));
|
|
|
- assert(config["input_mask"].is_number_integer());
|
|
|
- trigger_input_mask = config["input_mask"].get<uint64_t>();
|
|
|
- break;
|
|
|
- }
|
|
|
- case trigger_mode_type::PERIODIC: {
|
|
|
- assert(config.contains("exec_interval_ms"));
|
|
|
- assert(config["exec_interval_ms"].is_number_integer());
|
|
|
- trigger_interval = std::chrono::milliseconds(
|
|
|
- config["exec_interval_ms"].get<uint64_t>());
|
|
|
- break;
|
|
|
- }
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- if (config.contains("minimal_exec_interval_ms")) {
|
|
|
- assert(config["minimal_exec_interval_ms"].is_number_integer());
|
|
|
- minimal_exec_interval = std::chrono::milliseconds(
|
|
|
- config["minimal_exec_interval_ms"].get<uint64_t>());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- bool check_is_running() const {
|
|
|
- return q_this->get_state() == state_type::RUNNING;
|
|
|
- }
|
|
|
-
|
|
|
- bool check_input_trigger_condition() const {
|
|
|
- return (input_update_mask & trigger_input_mask) == trigger_input_mask;
|
|
|
- }
|
|
|
-
|
|
|
- void commit_output() {
|
|
|
- // https://lemire.me/blog/2018/02/21/iterating-over-set-bits-quickly/
|
|
|
- auto &mask = output_updated_mask;
|
|
|
- while (mask != 0) {
|
|
|
- auto tmp = mask & -mask;
|
|
|
- auto pos = std::countr_zero(tmp);
|
|
|
- assert(pos < output_total);
|
|
|
- output_signal[pos].emit(output_data[pos]);
|
|
|
- mask ^= tmp;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<bool> exec_impl(timestamp_type current_ts = current_timestamp()) {
|
|
|
- exec_state = exec_state_type::RUNNING;
|
|
|
- last_exec_ts = current_ts;
|
|
|
-
|
|
|
- if (enable_stimer) {
|
|
|
- trigger_stimer.start(current_ts);
|
|
|
- exec_stimer.start(current_ts);
|
|
|
- }
|
|
|
- auto ret_val = co_await q_this->exec();
|
|
|
- if (enable_stimer) {
|
|
|
- exec_stimer.stop();
|
|
|
- }
|
|
|
- last_exec_success = ret_val;
|
|
|
-
|
|
|
- input_update_mask = 0;
|
|
|
- commit_output();
|
|
|
- exec_state = exec_state_type::IDLE;
|
|
|
-
|
|
|
- co_return ret_val;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> periodic_run() {
|
|
|
- auto exec_cancel_token = exec_cancel_signal.new_token();
|
|
|
- for (;;) {
|
|
|
- co_await exec_impl();
|
|
|
- assert(exec_state == exec_state_type::IDLE);
|
|
|
-
|
|
|
- // check if cancelled
|
|
|
- if (exec_cancel_signal.try_wait(exec_cancel_token)) break;
|
|
|
-
|
|
|
- // schedule next run
|
|
|
- auto current_ts = current_timestamp();
|
|
|
- auto time_left = last_exec_ts + trigger_interval.count() - current_ts;
|
|
|
- if (time_left <= 0) {
|
|
|
- // TODO show warning, requested frequency too high
|
|
|
- } else {
|
|
|
- exec_state = exec_state_type::PENDING;
|
|
|
- trigger_timer.expires_at(get_time_from_timestamp(last_exec_ts) + trigger_interval);
|
|
|
- auto await_result = co_await (trigger_timer.async_wait(use_awaitable) ||
|
|
|
- exec_cancel_signal.coro_wait(exec_cancel_token));
|
|
|
- if (await_result.index() == 1) { // cancelled
|
|
|
- exec_state = exec_state_type::IDLE;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- run_finished_signal.try_notify_all();
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<bool> on_start(const nlohmann::json &config) {
|
|
|
- load_start_config(config);
|
|
|
- trigger_stimer.reset();
|
|
|
- exec_stimer.reset();
|
|
|
- exec_block = false;
|
|
|
- if (trigger_mode == trigger_mode_type::PERIODIC) {
|
|
|
- if (trigger_interval.count() < 0) {
|
|
|
- co_return false;
|
|
|
- } else if (trigger_interval.count() == 0) {
|
|
|
- // TODO show warning, running without interval
|
|
|
- }
|
|
|
- co_spawn(get_context(), periodic_run(), detached);
|
|
|
- }
|
|
|
- co_return true;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> on_stop() {
|
|
|
- exec_block = true;
|
|
|
- if (trigger_mode == trigger_mode_type::PERIODIC ||
|
|
|
- exec_state == exec_state_type::PENDING) {
|
|
|
- exec_cancel_signal.try_notify_all();
|
|
|
- }
|
|
|
- if (exec_state == exec_state_type::RUNNING) {
|
|
|
- co_await run_finished_signal.coro_wait();
|
|
|
- }
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- void update_input_data(uint8_t channel_index, data_pointer_type data) {
|
|
|
- assert(channel_index < input_total);
|
|
|
- input_data[channel_index] = std::move(data);
|
|
|
- set_bit(input_update_mask, channel_index);
|
|
|
- if (check_is_running() && trigger_mode == trigger_mode_type::INPUT) {
|
|
|
- if (check_input_trigger_condition()) {
|
|
|
- co_spawn(get_context(), trigger(), detached);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- boost::asio::awaitable<bool> trigger() {
|
|
|
- if (!check_is_running()) co_return false; // only trigger when running
|
|
|
- if (exec_block || exec_state != exec_state_type::IDLE) co_return false;
|
|
|
- assert(trigger_mode != trigger_mode_type::PERIODIC);
|
|
|
- auto current_ts = current_timestamp();
|
|
|
- auto time_left = last_exec_ts + minimal_exec_interval.count() - current_ts;
|
|
|
- if (time_left > 0) {
|
|
|
- exec_state = exec_state_type::PENDING;
|
|
|
- trigger_timer.expires_at(get_time_from_timestamp(last_exec_ts) + minimal_exec_interval);
|
|
|
- auto await_result = co_await (trigger_timer.async_wait(use_awaitable) ||
|
|
|
- exec_cancel_signal.coro_wait());
|
|
|
- if (await_result.index() == 1) { // cancelled
|
|
|
- exec_state = exec_state_type::IDLE;
|
|
|
- run_finished_signal.try_notify_all();
|
|
|
- co_return false;
|
|
|
- } else {
|
|
|
- current_ts = current_timestamp();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- auto ret_val = co_await exec_impl(current_ts);
|
|
|
- run_finished_signal.try_notify_all();
|
|
|
- co_return ret_val;
|
|
|
- }
|
|
|
-
|
|
|
-// bool cancel_pending() {
|
|
|
-// if (exec_state != exec_state_type::PENDING) return false;
|
|
|
-// if (trigger_mode == trigger_mode_type::PERIODIC) return false;
|
|
|
-// exec_cancel_signal.try_notify_all();
|
|
|
-// return true;
|
|
|
-// }
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
- awaitable<bool> datanode_base::on_start(const nlohmann::json &config) {
|
|
|
- return pimpl->on_start(config);
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> datanode_base::on_stop() {
|
|
|
- return pimpl->on_stop();
|
|
|
- }
|
|
|
-
|
|
|
- datanode_base::datanode_base()
|
|
|
- : pimpl(std::make_unique<impl>()) {
|
|
|
- pimpl->q_this = this;
|
|
|
- }
|
|
|
-
|
|
|
- void datanode_base::load_construct_config(const nlohmann::json &json) {
|
|
|
- pimpl->create_trigger_slot();
|
|
|
- }
|
|
|
-
|
|
|
- uint8_t datanode_base::register_input(const std::string &name) {
|
|
|
- return pimpl->create_input_slot(name);
|
|
|
- }
|
|
|
-
|
|
|
- uint8_t datanode_base::register_output(const std::string &name) {
|
|
|
- return pimpl->create_output_signal(name);
|
|
|
- }
|
|
|
-
|
|
|
-// bool datanode_base::cancel_pending() {
|
|
|
-// return pimpl->cancel_pending();
|
|
|
-// }
|
|
|
-
|
|
|
- datanode_base::data_pointer_type
|
|
|
- datanode_base::get_input_data(uint8_t channel_index) {
|
|
|
- assert(channel_index < pimpl->input_total);
|
|
|
- return pimpl->input_data[channel_index];
|
|
|
- }
|
|
|
-
|
|
|
- void datanode_base::set_output_data(uint8_t channel_index, data_pointer_type data) {
|
|
|
- assert(channel_index < pimpl->output_total);
|
|
|
- pimpl->output_data[channel_index] = std::move(data);
|
|
|
- set_bit(pimpl->output_updated_mask, channel_index);
|
|
|
- }
|
|
|
-
|
|
|
- datanode_base::exec_state_type datanode_base::get_exec_state() const {
|
|
|
- return pimpl->exec_state;
|
|
|
- }
|
|
|
-
|
|
|
- timestamp_type datanode_base::get_last_exec_ts() const {
|
|
|
- return pimpl->last_exec_ts;
|
|
|
- }
|
|
|
-
|
|
|
- bool datanode_base::is_last_exec_success() const {
|
|
|
- return pimpl->last_exec_success;
|
|
|
- }
|
|
|
-
|
|
|
-// template<typename FreqTag>
|
|
|
-// std::string datanode_base<FreqTag>::get_statistic_info_as_string() const {
|
|
|
-// return fmt::format("\nTrigger timer: {}\nExec timer: {}",
|
|
|
-// pimpl->trigger_stimer.to_string(), pimpl->exec_stimer.to_string());
|
|
|
-// }
|
|
|
-
|
|
|
- void datanode_base::enable_statistic_timer() {
|
|
|
- pimpl->enable_stimer = true;
|
|
|
- }
|
|
|
-
|
|
|
- void datanode_base::disable_statistic_timer() {
|
|
|
- pimpl->enable_stimer = false;
|
|
|
- }
|
|
|
-
|
|
|
- datanode_base::~datanode_base() = default;
|
|
|
-
|
|
|
-}
|