#ifndef SOPHIAR2_VARIABLE_UTILITY_HPP #define SOPHIAR2_VARIABLE_UTILITY_HPP #include "core/basic_obj_types.hpp" #include "core/timestamp_helper.hpp" #include "utility/config_utility.hpp" #include "utility/simple_tristate_obj.hpp" #include "utility/variable_helper.hpp" #include "utility/versatile_buffer2.hpp" #include #include #include #include #include #include namespace sophiar { using boost::asio::awaitable; template inline std::string variable_to_string(const SmallObjType &var) { auto writer = string_writer{}; var.write_to(writer); return writer.get_string_and_reset(); } template inline coro_worker::pointer variable_debug_watcher_func(const nlohmann::json &config) { std::string var_name; auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name); auto worker = make_infinite_coro_worker( [=, buffer = string_writer(), var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable -> awaitable { co_await var_helper.coro_wait_update(); if (var_helper.empty()) { SPDLOG_DEBUG("{} is empty.", var_name); } else { var_helper->write_to(buffer); SPDLOG_DEBUG("{} = {}", var_name, buffer.get_string_and_reset()); } co_return true; }); return std::move(worker); } template inline coro_worker::pointer variable_validity_watcher_func(const nlohmann::json &config) { enum class status_type { UNKNOWN, VALID, INVALID }; std::string var_name; auto var_index = LOAD_VARIABLE_INDEX_WITH_NAME(SmallObjType, "variable_name", var_name); auto worker = make_infinite_coro_worker( [=, status = status_type{}, var_helper = VARIABLE_MANUAL_DELEGATE(SmallObjType, var_index)]() mutable -> awaitable { if (status != status_type::UNKNOWN) { co_await var_helper.coro_wait_update(); } else { var_helper.manual_sync(); } if (var_helper.empty()) { if (status == status_type::INVALID)[[likely]] co_return true; SPDLOG_DEBUG("{} becomes invalid.", var_name); status = status_type::INVALID; } else { if (status == status_type::VALID)[[likely]] co_return true; SPDLOG_DEBUG("{} becomes valid.", var_name); status = status_type::VALID; } co_return true; }); return std::move(worker); } template inline coro_worker::pointer variable_recorder_func(const nlohmann::json &config) { auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name"); auto save_file_path = LOAD_STRING_ITEM("save_file"); auto ofs = std::ofstream(save_file_path, std::ofstream::out); assert(ofs.is_open()); // create worker auto worker = make_infinite_coro_worker( [=, buffer = string_writer(","), ofs = std::move(ofs), var_helper = VARIABLE_AUTO_DELEGATE(SmallObjType, var_index)]() mutable -> awaitable { co_await var_helper.coro_wait_update(); auto ts = var_helper.get_last_update_ts(); buffer << (static_cast(ts) / 1000.0); // us -> ms if (!var_helper.empty()) { var_helper->write_to(buffer); } ofs << buffer.get_string_and_reset() << std::endl; co_return true; }); return std::move(worker); } template inline coro_worker::pointer variable_replayer_func(const nlohmann::json &config) { auto var_index = LOAD_VARIABLE_INDEX(SmallObjType, "variable_name"); auto record_file_path = LOAD_STRING_ITEM("record_file"); auto ifs = std::ifstream(record_file_path, std::ifstream::in); assert(ifs.is_open()); // create worker auto worker = make_infinite_coro_worker( [=, timer = boost::asio::high_resolution_timer(*global_context), buffer = string_reader(), ifs = std::move(ifs)]() mutable -> awaitable { std::string str_buf; if (!std::getline(ifs, str_buf)) co_return false; // EOF buffer.set_string(std::move(str_buf)); auto next_ts_ms = buffer.read_value(); auto next_ts = static_cast(next_ts_ms * 1000); // ms -> us auto cur_ts = current_timestamp(); if (next_ts > cur_ts) { timer.expires_at(get_time_from_timestamp(next_ts)); co_await timer.async_wait(boost::asio::use_awaitable); } else if (next_ts != cur_ts) { // if time is missed, simply ignore co_return true; } if (buffer.empty()) { // empty obj UPDATE_VARIABLE(SmallObjType, var_index, nullptr); } else { auto new_var = SmallObjType::new_instance(); new_var->fill_from(buffer); assert(buffer.empty()); UPDATE_VARIABLE(SmallObjType, var_index, std::move(new_var)); } co_return true; }); return std::move(worker); } template using variable_debug_watcher = simple_tristate_obj_wrapper>; template using variable_validity_watcher = simple_tristate_obj_wrapper>; template using variable_recorder = simple_tristate_obj_wrapper>; template using variable_replayer = simple_tristate_obj_wrapper>; } #endif //SOPHIAR2_VARIABLE_UTILITY_HPP