|
|
@@ -17,6 +17,7 @@
|
|
|
#include <chrono>
|
|
|
#include <coroutine>
|
|
|
#include <cstring>
|
|
|
+#include <fstream>
|
|
|
#include <iostream>
|
|
|
|
|
|
using boost::asio::awaitable;
|
|
|
@@ -33,7 +34,8 @@ using boost::asio::use_awaitable;
|
|
|
if (!ok) co_return false; \
|
|
|
}
|
|
|
|
|
|
-inline awaitable<void> coro_sleep(std::chrono::milliseconds t) {
|
|
|
+template<typename DurationType>
|
|
|
+inline awaitable<void> coro_sleep(DurationType t) {
|
|
|
boost::asio::high_resolution_timer timer(co_await boost::asio::this_coro::executor);
|
|
|
timer.expires_from_now(t);
|
|
|
co_await timer.async_wait(use_awaitable);
|
|
|
@@ -70,6 +72,37 @@ private:
|
|
|
bool is_empty = true;
|
|
|
};
|
|
|
|
|
|
+class string_reader {
|
|
|
+public:
|
|
|
+
|
|
|
+ explicit string_reader(int _sep_length = 1)
|
|
|
+ : sep_length(_sep_length) {}
|
|
|
+
|
|
|
+ void set_string(std::string str) {
|
|
|
+ ss.clear();
|
|
|
+ ss.str(std::move(str));
|
|
|
+ ss.seekg(0, std::ios::beg);
|
|
|
+ }
|
|
|
+
|
|
|
+ template<typename T>
|
|
|
+ string_reader &operator>>(T &val) {
|
|
|
+ assert(!empty());
|
|
|
+ ss >> val;
|
|
|
+ if (!empty()) {
|
|
|
+ ss.ignore(sep_length);
|
|
|
+ }
|
|
|
+ return *this;
|
|
|
+ }
|
|
|
+
|
|
|
+ bool empty() const {
|
|
|
+ return ss.rdbuf()->in_avail() == 0;
|
|
|
+ }
|
|
|
+
|
|
|
+private:
|
|
|
+ std::stringstream ss;
|
|
|
+ int sep_length;
|
|
|
+};
|
|
|
+
|
|
|
namespace sophiar {
|
|
|
|
|
|
template<typename SmallObjType>
|
|
|
@@ -77,12 +110,16 @@ namespace sophiar {
|
|
|
assert(config.contains("obj_name"));
|
|
|
assert(config["obj_name"].is_string());
|
|
|
auto obj_name = config["obj_name"].get<std::string>();
|
|
|
- auto obj_index = global_sophiar_manager.register_global_obj<SmallObjType>(obj_name);
|
|
|
+ auto obj_index = REGISTER_GLOBAL_OBJ(SmallObjType, obj_name);
|
|
|
auto worker = make_infinite_coro_worker(global_context, [
|
|
|
obj_name, buffer = string_writer(),
|
|
|
obj_helper = GLOBAL_OBJ_AUTO_DELEGATE(SmallObjType, obj_index)]() mutable
|
|
|
-> awaitable<bool> {
|
|
|
co_await obj_helper.coro_wait_update();
|
|
|
+ if (obj_helper.empty()) {
|
|
|
+ SPDLOG_DEBUG("{} is empty.", obj_name);
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
obj_helper->write_to(buffer);
|
|
|
SPDLOG_DEBUG("{} = {}", obj_name, buffer.get_string_and_reset());
|
|
|
co_return true;
|
|
|
@@ -90,11 +127,99 @@ namespace sophiar {
|
|
|
return std::move(worker);
|
|
|
}
|
|
|
|
|
|
+ template<typename SmallObjType>
|
|
|
+ inline coro_worker::pointer global_obj_recorder_func(const nlohmann::json &config) {
|
|
|
+ // global obj_config
|
|
|
+ assert(config.contains("obj_name"));
|
|
|
+ assert(config["obj_name"].is_string());
|
|
|
+ auto obj_name = config["obj_name"].get<std::string>();
|
|
|
+ auto obj_index = REGISTER_GLOBAL_OBJ(SmallObjType, obj_name);
|
|
|
+ // output file config
|
|
|
+ assert(config.contains("save_file"));
|
|
|
+ assert(config["save_file"].is_string());
|
|
|
+ auto save_file_path = config["save_file"].get<std::string>();
|
|
|
+ auto ofs = std::ofstream(save_file_path, std::ofstream::out);
|
|
|
+ assert(ofs.is_open());
|
|
|
+ // create worker
|
|
|
+ auto worker = make_infinite_coro_worker(global_context, [
|
|
|
+ obj_index, buffer = string_writer(","), ofs = std::move(ofs),
|
|
|
+ obj_helper = GLOBAL_OBJ_AUTO_DELEGATE(SmallObjType, obj_index)]() mutable
|
|
|
+ -> awaitable<bool> {
|
|
|
+ co_await obj_helper.coro_wait_update();
|
|
|
+ auto ts = GLOBAL_OBJ_UPDATE_TS(obj_index);
|
|
|
+ buffer << (static_cast<double>(ts) / 1000.0); // us -> ms
|
|
|
+ if (!obj_helper.empty()) {
|
|
|
+ obj_helper->write_to(buffer);
|
|
|
+ }
|
|
|
+ ofs << buffer.get_string_and_reset() << std::endl;
|
|
|
+ co_return true;
|
|
|
+ });
|
|
|
+ return std::move(worker);
|
|
|
+ }
|
|
|
+
|
|
|
+ template<typename SmallObjType>
|
|
|
+ inline coro_worker::pointer global_obj_replayer_func(const nlohmann::json &config) {
|
|
|
+ // global obj_config
|
|
|
+ assert(config.contains("obj_name"));
|
|
|
+ assert(config["obj_name"].is_string());
|
|
|
+ auto obj_name = config["obj_name"].get<std::string>();
|
|
|
+ auto obj_index = REGISTER_GLOBAL_OBJ(SmallObjType, obj_name);
|
|
|
+ // output file config
|
|
|
+ assert(config.contains("record_file"));
|
|
|
+ assert(config["record_file"].is_string());
|
|
|
+ auto record_file_path = config["record_file"].get<std::string>();
|
|
|
+ auto ifs = std::ifstream(record_file_path, std::ifstream::in);
|
|
|
+ assert(ifs.is_open());
|
|
|
+ // create worker
|
|
|
+ auto worker = make_infinite_coro_worker(global_context, [
|
|
|
+ obj_index, buffer = string_reader(), ifs = std::move(ifs)]() mutable
|
|
|
+ -> awaitable<bool> {
|
|
|
+ std::string str_buf;
|
|
|
+ if (!std::getline(ifs, str_buf)) co_return false; // EOF
|
|
|
+ buffer.set_string(std::move(str_buf));
|
|
|
+ double next_ts_ms;
|
|
|
+ buffer >> next_ts_ms;
|
|
|
+ auto next_ts = static_cast<timestamp_type>(next_ts_ms * 1000); // ms -> us
|
|
|
+ timestamp_type cur_ts = current_timestamp();
|
|
|
+ if (next_ts > cur_ts) {
|
|
|
+ co_await coro_sleep(std::chrono::microseconds(next_ts - cur_ts));
|
|
|
+ } else if (next_ts != cur_ts) { // if time is missed, simply ignore
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+ if (buffer.empty()) { // empty obj
|
|
|
+ UPDATE_GLOBAL_OBJ(SmallObjType, obj_index, nullptr);
|
|
|
+ } else {
|
|
|
+ auto new_obj = SmallObjType::new_instance();
|
|
|
+ new_obj->fill_from(buffer);
|
|
|
+ assert(buffer.empty());
|
|
|
+ UPDATE_GLOBAL_OBJ(SmallObjType, obj_index, std::move(new_obj));
|
|
|
+ }
|
|
|
+ co_return true;
|
|
|
+ });
|
|
|
+ return std::move(worker);
|
|
|
+ }
|
|
|
+
|
|
|
template<typename SmallObjType>
|
|
|
using global_obj_watcher = simple_tristate_obj_wrapper<global_obj_watcher_func<SmallObjType>>;
|
|
|
|
|
|
+ template<typename SmallObjType>
|
|
|
+ using global_obj_recorder = simple_tristate_obj_wrapper<global_obj_recorder_func<SmallObjType>>;
|
|
|
+
|
|
|
+ template<typename SmallObjType>
|
|
|
+ using global_obj_replayer = simple_tristate_obj_wrapper<global_obj_replayer_func<SmallObjType>>;
|
|
|
+
|
|
|
+ using double_obj_watcher = global_obj_watcher<double_obj>;
|
|
|
+ using scalarxyz_obj_watcher = global_obj_watcher<scalarxyz_obj>;
|
|
|
using transform_obj_watcher = global_obj_watcher<transform_obj>;
|
|
|
|
|
|
+ using double_obj_recorder = global_obj_recorder<double_obj>;
|
|
|
+ using scalarxyz_obj_recorder = global_obj_recorder<scalarxyz_obj>;
|
|
|
+ using transform_obj_recorder = global_obj_recorder<transform_obj>;
|
|
|
+
|
|
|
+ using double_obj_replayer = global_obj_replayer<double_obj>;
|
|
|
+ using scalarxyz_obj_replayer = global_obj_replayer<scalarxyz_obj>;
|
|
|
+ using transform_obj_replayer = global_obj_replayer<transform_obj>;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
#endif //SOPHIAR2_DEBUG_UTILITY_HPP
|