| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- #ifndef SOPHIAR2_VARIABLE_UTILITY_HPP
- #define SOPHIAR2_VARIABLE_UTILITY_HPP
- #include "core/basic_obj_types.hpp"
- #include "core/timestamp_helper.hpp"
- #include "utility/config_utility.hpp"
- #include "utility/simple_tristate_obj.hpp"
- #include "utility/variable_helper.hpp"
- #include "utility/versatile_buffer2.hpp"
- #include <boost/asio/awaitable.hpp>
- #include <spdlog/spdlog.h>
- #include <chrono>
- #include <coroutine>
- #include <cstring>
- #include <fstream>
- namespace sophiar {
- using boost::asio::awaitable;
- template<typename SmallObjType>
- inline std::string variable_to_string(const SmallObjType &var) {
- auto writer = string_writer{};
- var.write_to(writer);
- return writer.get_string_and_reset();
- }
- template<typename SmallObjType>
- inline coro_worker::pointer variable_debug_watcher_func(const nlohmann::json &config) {
- std::string var_name;
- auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name);
- auto worker = make_infinite_coro_worker(
- [=,
- buffer = string_writer(),
- var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable
- -> awaitable<bool> {
- co_await var_helper.coro_wait_update();
- if (var_helper.empty()) {
- SPDLOG_DEBUG("{} is empty.", var_name);
- } else {
- var_helper->write_to(buffer);
- SPDLOG_DEBUG("{} = {}", var_name, buffer.get_string_and_reset());
- }
- co_return true;
- });
- return std::move(worker);
- }
- template<typename SmallObjType>
- inline coro_worker::pointer variable_validity_watcher_func(const nlohmann::json &config) {
- enum class status_type {
- UNKNOWN,
- VALID,
- INVALID
- };
- std::string var_name;
- auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name);
- auto worker = make_infinite_coro_worker(
- [=,
- status = status_type{},
- var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable
- -> awaitable<bool> {
- if (status != status_type::UNKNOWN) {
- co_await var_helper.coro_wait_update();
- } else {
- var_helper.manual_sync();
- }
- if (var_helper.empty()) {
- if (status == status_type::INVALID)[[likely]] co_return true;
- SPDLOG_DEBUG("{} becomes invalid.", var_name);
- status = status_type::INVALID;
- } else {
- if (status == status_type::VALID)[[likely]] co_return true;
- SPDLOG_DEBUG("{} becomes valid.", var_name);
- status = status_type::VALID;
- }
- co_return true;
- });
- return std::move(worker);
- }
- template<typename SmallObjType>
- inline coro_worker::pointer variable_recorder_func(const nlohmann::json &config) {
- auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name");
- auto save_file_path = LOAD_STRING_ITEM("save_file");
- auto ofs = std::ofstream(save_file_path, std::ofstream::out);
- assert(ofs.is_open());
- // create worker
- auto worker = make_infinite_coro_worker(
- [=,
- buffer = string_writer(","), ofs = std::move(ofs),
- var_helper = VARIABLE_AUTO_DELEGATE(SmallObjType, var_index)]() mutable
- -> awaitable<bool> {
- co_await var_helper.coro_wait_update();
- auto ts = var_helper.get_last_update_ts();
- buffer << (static_cast<double>(ts) / 1000.0); // us -> ms
- if (!var_helper.empty()) {
- var_helper->write_to(buffer);
- }
- ofs << buffer.get_string_and_reset() << std::endl;
- co_return true;
- });
- return std::move(worker);
- }
- template<typename SmallObjType>
- inline coro_worker::pointer variable_replayer_func(const nlohmann::json &config) {
- auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name");
- auto record_file_path = LOAD_STRING_ITEM("record_file");
- auto ifs = std::ifstream(record_file_path, std::ifstream::in);
- assert(ifs.is_open());
- // create worker
- auto worker = make_infinite_coro_worker(
- [=,
- timer = boost::asio::high_resolution_timer(*global_context),
- buffer = string_reader(), ifs = std::move(ifs)]() mutable
- -> awaitable<bool> {
- std::string str_buf;
- if (!std::getline(ifs, str_buf)) co_return false; // EOF
- buffer.set_string(std::move(str_buf));
- auto next_ts_ms = buffer.read_value<double>();
- auto next_ts = static_cast<timestamp_type>(next_ts_ms * 1000); // ms -> us
- auto cur_ts = current_timestamp();
- if (next_ts > cur_ts) {
- timer.expires_at(get_time_from_timestamp(next_ts));
- co_await timer.async_wait(boost::asio::use_awaitable);
- } else if (next_ts != cur_ts) { // if time is missed, simply ignore
- co_return true;
- }
- if (buffer.empty()) { // empty obj
- UPDATE_VARIABLE(SmallObjType, var_index, nullptr);
- } else {
- auto new_var = SmallObjType::new_instance();
- new_var->fill_from(buffer);
- assert(buffer.empty());
- UPDATE_VARIABLE(SmallObjType, var_index, std::move(new_var));
- }
- co_return true;
- });
- return std::move(worker);
- }
- template<typename SmallObjType>
- using variable_debug_watcher = simple_tristate_obj_wrapper<variable_debug_watcher_func<SmallObjType>>;
- template<typename SmallObjType>
- using variable_validity_watcher = simple_tristate_obj_wrapper<variable_validity_watcher_func<SmallObjType>>;
- template<typename SmallObjType>
- using variable_recorder = simple_tristate_obj_wrapper<variable_recorder_func<SmallObjType>>;
- template<typename SmallObjType>
- using variable_replayer = simple_tristate_obj_wrapper<variable_replayer_func<SmallObjType>>;
- }
- #endif //SOPHIAR2_VARIABLE_UTILITY_HPP
|