Browse Source

New variable utility.

jcsyshc 2 năm trước cách đây
mục cha
commit
6ea070709b

+ 2 - 1
src/core/CMakeLists.txt

@@ -1,6 +1,7 @@
 file(GLOB_RECURSE CORE_SRC_FILES ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
+file(GLOB_RECURSE UTILITY_SRC_FILES ${CMAKE_CURRENT_LIST_DIR}/../utility/*.cpp)
 
-add_library(sophiar_core STATIC ${CORE_SRC_FILES})
+add_library(sophiar_core STATIC ${CORE_SRC_FILES} ${UTILITY_SRC_FILES})
 
 target_link_libraries(sophiar_core ${BASIC_LIBS})
 

+ 7 - 7
src/core/external_variable_io.cpp

@@ -126,10 +126,10 @@ namespace sophiar {
                 : s(std::move(_s)),
                   buf(dynamic_memory::new_instance()) {
             for (auto &info: infos) {
-                assert(!info_pool.contains(info.var_index));
-                info_pool.emplace(info.var_index,
+                assert(!info_pool.contains(info.index));
+                info_pool.emplace(info.index,
                                   variable_io_info{
-                                          .var_type_index = info.var_type_index,
+                                          .var_type_index = info.type_index,
                                           .placeholder = info.placeholder,
                                   });
             }
@@ -191,11 +191,11 @@ namespace sophiar {
                   watcher(signal_group->new_watcher()) {
             s.set_option(tcp::no_delay{true});
             for (auto &info: infos) {
-                signal_group->add_watcher(REQUIRE_VARIABLE_WATCHER(info.var_index));
+                signal_group->add_watcher(REQUIRE_VARIABLE_WATCHER(info.index));
                 info_pool.push_back(
                         {
-                                .var_type_index = info.var_type_index,
-                                .var_index = info.var_index,
+                                .var_type_index = info.type_index,
+                                .var_index = info.index,
                                 .placeholder = info.placeholder,
                                 .last_update_ts = 0
                         });
@@ -264,7 +264,7 @@ namespace sophiar {
         variable_info_list infos;
         for (auto var_name: params) {
             auto info = query_variable_information(var_name);
-            writer << info.var_index;
+            writer << info.index;
             infos.push_back(info);
         }
         auto buf_str = writer.get_string_and_reset();

+ 3 - 0
src/core/global_defs.cpp

@@ -72,6 +72,8 @@ namespace sophiar {
 #undef REGISTER_VARIABLE_TYPE
     }
 
+    void register_variable_utility();
+
     // prevent this file from being too big.
     void register_algorithms();
 
@@ -91,6 +93,7 @@ namespace sophiar {
 
 #if !SOPHIAR_TEST || SOPHIAR_TEST_ALGORITHM
 
+        register_variable_utility();
         register_algorithms();
 
 #endif

+ 23 - 0
src/core/global_defs.h

@@ -11,7 +11,9 @@
 namespace sophiar {
 
     class dynamic_pool;
+
     class sophiar_manager;
+
     class sophiar_pool;
 
     extern dynamic_pool *global_dynamic_pool;
@@ -57,6 +59,27 @@ namespace sophiar {
 #define UPDATE_VARIABLE_VAL_WITH_TS(var_type, var_index, val, ts) \
     global_sophiar_pool->update_variable<var_type>(var_index, var_type::new_instance(val), ts)
 
+#define REGISTER_CALLBACK(func) \
+    global_sophiar_pool->register_callback(func)
+
+#define REGISTER_CALLBACK2(func, exit_func) \
+    global_sophiar_pool->register_callback(func, exit_func)
+
+#define REGISTER_CORO_CALLBACK(func) \
+    global_sophiar_pool->register_coro_callback(func)
+
+#define REGISTER_CORO_CALLBACK2(func, exit_func) \
+    global_sophiar_pool->register_coro_callback(func, exit_func)
+
+#define UNREGISTER_CALLBACK(token) \
+    global_sophiar_pool->unregister_callback(token)
+
+#define ATTACH_CALLBACK(var_index, token) \
+    global_sophiar_pool->attach_callback(var_index, token)
+
+#define DETACH_CALLBACK(token) \
+    global_sophiar_pool->detach_callback(token)
+
 #ifdef SOPHIAR_TEST
 
 #define REGISTER_VARIABLE(var_name, var_type) \

+ 4 - 1
src/core/sophiar_manager.h

@@ -67,7 +67,7 @@ namespace sophiar {
 
 #else
 
-    private:
+        private:
 
 #endif // SOPHIAR_TEST
 
@@ -89,6 +89,9 @@ namespace sophiar {
 #define REGISTER_TYPE(DerivedT) \
     global_sophiar_manager->register_object_type<DerivedT>(#DerivedT)
 
+#define REGISTER_TYPE2(type, name) \
+    global_sophiar_manager->register_object_type<type>(name)
+
 }
 
 #endif //SOPHIAR2_SOPHIAR_MANAGER_H

+ 31 - 9
src/core/sophiar_pool.cpp

@@ -45,13 +45,18 @@ namespace sophiar {
 
             using callback_store_type = std::variant<function_callback_type, coroutine_callback_type>;
             callback_store_type func_store;
+            function_callback_type exit_func;
             attach_list_type attach_list;
             callback_queue_iter_type queue_iter;
+
+            bool exit_flag = false;
+            void *pool_iter = nullptr; // callback_iter_type
         };
 
         using callback_pool_type = std::list<callback_info>;
         using callback_iter_type = callback_pool_type::iterator;
         static_assert(sizeof(callback_iter_type) == sizeof(callback_token_type));
+        static_assert(sizeof(callback_iter_type) == sizeof(void *));
 
         using callback_list_type = std::list<callback_info *>;
         using callback_list_iter_type = callback_list_type::iterator;
@@ -157,19 +162,27 @@ namespace sophiar {
             info.last_update_ts = 0;
         }
 
-        callback_token_type register_callback(callback_info::callback_store_type &&callback) {
+        callback_token_type register_callback(callback_info::callback_store_type &&callback,
+                                              function_callback_type &&exit_func) {
             auto &item = callback_pool.emplace_front();
             item.func_store = std::move(callback);
-            return std::bit_cast<callback_token_type>(callback_pool.begin());
+            item.exit_func = std::move(exit_func);
+            item.pool_iter = std::bit_cast<void *>(callback_pool.begin());
+            return std::bit_cast<callback_token_type>(item.pool_iter);
         }
 
         void unregister_callback(callback_iter_type callback) {
             while (!callback->attach_list.empty()) {
                 detach_callback(callback->attach_list.begin());
             }
+            if (callback->status == callback_info::RUNNING) {
+                callback->exit_flag = true;
+                return;
+            }
             if (callback->status == callback_info::PENDING) {
                 callback_queue.erase(callback->queue_iter);
             }
+            callback->exit_func();
             callback_pool.erase(callback);
         }
 
@@ -203,9 +216,14 @@ namespace sophiar {
                     callback->status = callback_info::IDLE;
                 } else {
                     assert(callback->func_store.index() == 1);
-                    co_spawn(*global_context, std::get<1>(callback->func_store)(), [=](std::exception_ptr eptr) {
+                    co_spawn(*global_context, std::get<1>(callback->func_store)(), [=, this](std::exception_ptr eptr) {
                         assert(eptr == nullptr);
                         callback->status = callback_info::IDLE;
+                        if (callback->exit_flag) {
+                            callback->exit_func();
+                            auto real_iter = std::bit_cast<callback_iter_type>(callback->pool_iter);
+                            callback_pool.erase(real_iter);
+                        }
                     });
                 }
             }
@@ -271,8 +289,8 @@ namespace sophiar {
         assert(pimpl->variable_pool.contains(var_name));
         const auto &info = pimpl->variable_pool[var_name];
         variable_info ret{
-                .var_type_index = info.type_info->type_index,
-                .var_index = pimpl->variable_pool.to_index_by_name(var_name),
+                .type_index = info.type_info->type_index,
+                .index = pimpl->variable_pool.to_index_by_name(var_name),
                 .placeholder = info.placeholder
         };
         return ret;
@@ -312,12 +330,16 @@ namespace sophiar {
         pimpl->load_config(config);
     }
 
-    sophiar_pool::callback_token_type sophiar_pool::register_callback(function_callback_type &&callback) {
-        return pimpl->register_callback(std::move(callback));
+    sophiar_pool::callback_token_type
+    sophiar_pool::register_callback(function_callback_type &&callback,
+                                    function_callback_type &&exit_func) {
+        return pimpl->register_callback(std::move(callback), std::move(exit_func));
     }
 
-    sophiar_pool::callback_token_type sophiar_pool::register_coro_callback(coroutine_callback_type &&callback) {
-        return pimpl->register_callback(std::move(callback));
+    sophiar_pool::callback_token_type
+    sophiar_pool::register_coro_callback(coroutine_callback_type &&callback,
+                                         function_callback_type &&exit_func) {
+        return pimpl->register_callback(std::move(callback), std::move(exit_func));
     }
 
     void sophiar_pool::unregister_callback(callback_token_type token) {

+ 15 - 11
src/core/sophiar_pool.h

@@ -68,9 +68,13 @@ namespace sophiar {
         using function_callback_type = std::function<void()>;
         using coroutine_callback_type = std::function<boost::asio::awaitable<void>()>;
 
-        callback_token_type register_callback(function_callback_type &&callback);
+        static constexpr auto empty_callback = []() {};
 
-        callback_token_type register_coro_callback(coroutine_callback_type &&callback);
+        callback_token_type register_callback(function_callback_type &&callback,
+                                              function_callback_type &&exit_func = empty_callback);
+
+        callback_token_type register_coro_callback(coroutine_callback_type &&callback,
+                                                   function_callback_type &&exit_func = empty_callback);
 
         void unregister_callback(callback_token_type token);
 
@@ -88,6 +92,15 @@ namespace sophiar {
 
 #endif // SOPHIAR_TEST
 
+        // for internal classes
+        struct variable_info {
+            variable_type_index_type type_index;
+            variable_index_type index;
+            void *placeholder;
+        };
+
+        variable_info query_variable_information(std::string_view var_name);
+
     private:
 
         struct impl;
@@ -103,15 +116,6 @@ namespace sophiar {
         void update_variable_timestamp_impl(variable_index_type var_index,
                                             timestamp_type ts);
 
-        // for friend classes
-        struct variable_info {
-            variable_type_index_type var_type_index;
-            variable_index_type var_index;
-            void *placeholder;
-        };
-
-        variable_info query_variable_information(std::string_view var_name);
-
         friend class external_variable_io;
 
     };

+ 124 - 0
src/utility/variable_utility2.cpp

@@ -0,0 +1,124 @@
+#include "core/basic_obj_types.hpp"
+#include "core/global_defs.h"
+#include "core/sophiar_pool.h"
+#include "core/timestamp_helper.hpp"
+#include "core/tristate_obj.h"
+#include "utility/config_utility.hpp"
+#include "utility/versatile_buffer2.hpp"
+
+#include <spdlog/spdlog.h>
+
+#include <vector>
+
+using boost::asio::awaitable;
+
+namespace sophiar {
+
+    class multi_variable_utility_base : public tristate_obj {
+    public:
+        awaitable<void> on_stop() noexcept override {
+            for (auto cb: cb_pool) {
+                UNREGISTER_CALLBACK(cb);
+            }
+            cb_pool.clear();
+            co_return;
+        }
+
+    protected:
+        void register_variables(const nlohmann::json &config) {
+            if (config.contains("variable_name_list")) {
+                ENSURE_ARRAY("variable_name_list");
+                for (auto &item: config["variable_name_list"]) {
+                    assert(item.is_string());
+                    auto var_name = item.get<std::string>();
+                    cb_pool.push_back(create_callback(var_name));
+                }
+            } else {
+                auto var_name = LOAD_STRING_ITEM("variable_name");
+                cb_pool.push_back(create_callback(var_name));
+            }
+        }
+
+        virtual sophiar_pool::callback_token_type create_callback(const std::string &var_name) = 0;
+
+    private:
+        std::vector<sophiar_pool::callback_token_type> cb_pool;
+    };
+
+    class variable_debug_watcher : public multi_variable_utility_base {
+    public:
+        DEFAULT_NEW_INSTANCE(variable_debug_watcher)
+
+    protected:
+        awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
+            min_interval = TRY_LOAD_FLOAT_ITEM("minimum_interval_ms", 0) * 1000; // ms -> us
+            register_variables(config);
+            co_return true;
+        }
+
+    private:
+        timestamp_type min_interval = 0;
+
+        struct callback_info {
+            string_writer buffer;
+            timestamp_type last_print_ts = 0;
+        };
+
+        sophiar_pool::callback_token_type create_callback(const std::string &var_name) override {
+            auto var_info = global_sophiar_pool->query_variable_information(var_name);
+            auto cb_info = new callback_info;
+            auto cb_func = [=, this]() {
+                auto ts = QUERY_VARIABLE_TS(var_info.index);
+                if (ts - cb_info->last_print_ts < min_interval) return;
+                cb_info->last_print_ts = ts;
+                if (!raw_pointer_is_valid(var_info.placeholder, var_info.type_index)) {
+                    SPDLOG_DEBUG("{} is empty.", var_name);
+                } else {
+                    raw_pointer_write_to(cb_info->buffer, var_info.placeholder, var_info.type_index);
+                    SPDLOG_DEBUG("{} = {}", var_name, cb_info->buffer.get_string_and_reset());
+                }
+            };
+            auto exit_func = [=]() { delete cb_info; };
+            auto token = REGISTER_CALLBACK2(cb_func, exit_func);
+            ATTACH_CALLBACK(var_info.index, token);
+            return token;
+        }
+    };
+
+    class variable_validity_watcher : public multi_variable_utility_base {
+    public:
+        DEFAULT_NEW_INSTANCE(variable_validity_watcher)
+
+    protected:
+        awaitable<bool> on_start(const nlohmann::json &config) noexcept override {
+            create_callback(config);
+            co_return true;
+        }
+
+    private:
+        sophiar_pool::callback_token_type create_callback(const std::string &var_name) override {
+            auto var_info = global_sophiar_pool->query_variable_information(var_name);
+            bool is_valid = raw_pointer_is_valid(var_info.placeholder, var_info.type_index);
+            auto cb_func = [=]() mutable {
+                if (raw_pointer_is_valid(var_info.placeholder, var_info.type_index)) {
+                    if (is_valid)[[likely]] return;
+                    SPDLOG_DEBUG("{} becomes invalid.", var_name);
+                    is_valid = false;
+                } else {
+                    if (!is_valid)[[likely]] return;
+                    SPDLOG_DEBUG("{} becomes valid.", var_name);
+                    is_valid = true;
+                }
+            };
+            auto token = REGISTER_CALLBACK(cb_func);
+            ATTACH_CALLBACK(var_info.index, token);
+            return token;
+        }
+    };
+
+    void register_variable_utility() {
+        REGISTER_TYPE2(variable_debug_watcher, "object_watcher");
+        REGISTER_TYPE2(variable_validity_watcher, "object_validity_watcher");
+    }
+
+}

+ 1 - 1
tests/CMakeLists.txt

@@ -13,7 +13,7 @@ target_compile_definitions(test_utility PUBLIC SOPHIAR_TEST)
 target_link_libraries(test_utility ${Boost_LIBRARIES} ${BASIC_LIBS})
 
 file(GLOB_RECURSE TEST_ALGORITHM_SRC_FILES ./algorithm/*.cpp)
-add_executable(test_algorithm ${TEST_ALGORITHM_SRC_FILES} ${CORE_SRC_FILES})
+add_executable(test_algorithm ${TEST_ALGORITHM_SRC_FILES} ${CORE_SRC_FILES} ${UTILITY_SRC_FILES})
 target_compile_definitions(test_algorithm PUBLIC SOPHIAR_TEST SOPHIAR_TEST_ALGORITHM)
 target_link_libraries(test_algorithm ${Boost_LIBRARIES} ${BASIC_LIBS} sophiar_algorithm)
 

+ 20 - 20
tests/core/sophiar_pool.cpp

@@ -23,42 +23,42 @@ using namespace std::chrono_literals;
 
 awaitable<void> test_callback() {
     int test_flag = 0;
-    auto callback_token = global_sophiar_pool->register_callback([&]() { ++test_flag; });
+    auto callback_token = REGISTER_CALLBACK([&]() { ++test_flag; });
     auto var_index = REQUIRE_VARIABLE(u64int_obj, "var_num");
 
-    auto attach_token = global_sophiar_pool->attach_callback(var_index, callback_token);
+    auto attach_token = ATTACH_CALLBACK(var_index, callback_token);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 1);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 1);
 
-    global_sophiar_pool->detach_callback(attach_token);
+    DETACH_CALLBACK(attach_token);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 1);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 1);
 
-    global_sophiar_pool->attach_callback(var_index, callback_token);
+    ATTACH_CALLBACK(var_index, callback_token);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 2);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 2);
 
-    global_sophiar_pool->unregister_callback(callback_token);
+    UNREGISTER_CALLBACK(callback_token);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 2);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 2);
 
     int test_flag_2 = 0;
     auto var_index_2 = REQUIRE_VARIABLE(double_obj, "var_float");
-    auto callback_token_1 = global_sophiar_pool->register_callback([&]() {
+    auto callback_token_1 = REGISTER_CALLBACK([&]() {
         SPDLOG_DEBUG("callback 1");
         ++test_flag;
         UPDATE_VARIABLE_VAL(double_obj, var_index_2, 1.0);
     });
-    auto callback_token_2 = global_sophiar_pool->register_callback([&]() {
+    auto callback_token_2 = REGISTER_CALLBACK([&]() {
         SPDLOG_DEBUG("callback 2");
         ++test_flag_2;
     });
-    global_sophiar_pool->attach_callback(var_index, callback_token_1);
-    global_sophiar_pool->attach_callback(var_index_2, callback_token_2);
+    ATTACH_CALLBACK(var_index, callback_token_1);
+    ATTACH_CALLBACK(var_index_2, callback_token_2);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 3);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 3);
@@ -69,21 +69,21 @@ awaitable<void> test_callback() {
     BOOST_TEST(test_flag == 4);
     BOOST_TEST(test_flag_2 == 2);
 
-    auto coro_token_1 = global_sophiar_pool->register_coro_callback([&]() -> awaitable<void> {
+    auto coro_token_1 = REGISTER_CORO_CALLBACK([&]() -> awaitable<void> {
         co_await coro_sleep(100ms);
         SPDLOG_DEBUG("coro callback 1");
         ++test_flag;
         UPDATE_VARIABLE_VAL(double_obj, var_index_2, 1.0);
         co_return;
     });
-    auto coro_token_2 = global_sophiar_pool->register_coro_callback([&]() -> awaitable<void> {
+    auto coro_token_2 = REGISTER_CORO_CALLBACK([&]() -> awaitable<void> {
         co_await coro_sleep(100ms);
         SPDLOG_DEBUG("coro callback 2");
         ++test_flag_2;
         co_return;
     });
-    global_sophiar_pool->attach_callback(var_index, coro_token_1);
-    global_sophiar_pool->attach_callback(var_index_2, coro_token_2);
+    ATTACH_CALLBACK(var_index, coro_token_1);
+    ATTACH_CALLBACK(var_index_2, coro_token_2);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 5);
     co_await coro_sleep(50ms);
     BOOST_TEST(test_flag == 5);
@@ -93,15 +93,15 @@ awaitable<void> test_callback() {
     BOOST_TEST(test_flag == 6);
     BOOST_TEST(test_flag_2 == 5);
 
-    global_sophiar_pool->unregister_callback(callback_token_1);
-    global_sophiar_pool->unregister_callback(callback_token_2);
-    global_sophiar_pool->unregister_callback(coro_token_1);
-    global_sophiar_pool->unregister_callback(coro_token_2);
+    UNREGISTER_CALLBACK(callback_token_1);
+    UNREGISTER_CALLBACK(callback_token_2);
+    UNREGISTER_CALLBACK(coro_token_1);
+    UNREGISTER_CALLBACK(coro_token_2);
 
     test_flag = 0;
-    callback_token = global_sophiar_pool->register_callback([&]() { ++test_flag; });
-    global_sophiar_pool->attach_callback(var_index, callback_token);
-    global_sophiar_pool->attach_callback(var_index_2, callback_token);
+    callback_token = REGISTER_CALLBACK([&]() { ++test_flag; });
+    ATTACH_CALLBACK(var_index, callback_token);
+    ATTACH_CALLBACK(var_index_2, callback_token);
     UPDATE_VARIABLE_VAL(u64int_obj, var_index, 1);
     UPDATE_VARIABLE_VAL(double_obj, var_index_2, 1.0);
     co_await coro_sleep(50ms);

+ 5 - 2
tests/data/transform_tree_config.json

@@ -59,10 +59,13 @@
       }
     },
     {
-      "type": "transform_obj_watcher",
+      "type": "object_watcher",
       "name": "transform_watcher",
       "start_config": {
-        "variable_name": "C_in_D"
+        "variable_name_list": [
+          "C_in_D",
+          "D_in_Root"
+        ]
       }
     }
   ]