Przeglądaj źródła

基本实现了 sophiar_manager

jcsyshc 3 lat temu
rodzic
commit
a16ee3418d

+ 24 - 8
src/core/sophiar_manager.cpp

@@ -243,14 +243,17 @@ namespace sophiar {
                 }
                 co_return true;
             });
+            manager_worker->run();
         }
 
         void on_object_stopped(sophiar_obj *obj_ptr) {
             assert(obj_ptr_index_map.contains(obj_ptr));
             auto obj_index = obj_ptr_index_map.at(obj_ptr);
             auto event = manager_event::new_instance(manager_event_type::STOP, obj_index);
-            auto ok = manager_event_channel.try_send(error_code{}, std::move(event));
-            assert(ok);
+            co_spawn(global_context, [this, event = std::move(event)]() mutable -> awaitable<void> {
+                co_await manager_event_channel.async_send(error_code{}, std::move(event), use_awaitable);
+                co_return;
+            }, detached);
         }
 
         obj_index_type create_object(const std::string &type_name,
@@ -282,14 +285,14 @@ namespace sophiar {
                 obj_info.init_config = config["init_config"];
             } else {
                 obj_info.init_config = {};
-                // TODO show log, use empty config
+                SPDLOG_WARN("Use empty init config for node {}", obj_name);
             }
 
-            if (config.contains("start_configs")) {
-                obj_info.start_config = config["start_configs"];
+            if (config.contains("start_config")) {
+                obj_info.start_config = config["start_config"];
             } else {
                 obj_info.start_config = {};
-                // TODO show, use empty config
+                SPDLOG_WARN("Use empty start config for node {}", obj_name);
             }
         }
 
@@ -543,12 +546,13 @@ namespace sophiar {
             auto &rep_length_tmp = *reinterpret_cast<uint16_t *>(reply_memory.data() + 0);
             rep_length_tmp = reply_length;
             swap_net_loc_endian<boost::endian::order::big>(rep_length_tmp);
-            auto reply_buffer = buffer(reply_memory.data(), reply_length);
+            auto reply_buffer = buffer(reply_memory.data(), reply_length + 2);
             co_await async_write(client_socket, reply_buffer, use_awaitable);
             co_return;
         }
 
         void create_client_worker(tcp::socket &&client_socket) {
+            auto remote_endpoint = client_socket.remote_endpoint();
             auto worker_func = [
                     this,
                     client_socket = std::move(client_socket),
@@ -566,11 +570,15 @@ namespace sophiar {
             auto exit_func = [=]() {
                 assert(worker_iter_ptr != nullptr);
                 client_worker_list.erase(*worker_iter_ptr);
-                delete worker_iter_ptr;
+//                delete worker_iter_ptr; // TODO 这句话会报错,怀疑其他地方有内存访问越界
             };
             auto worker = make_infinite_coro_worker(global_context,
                                                     std::move(noexcept_worker_func),
                                                     std::move(exit_func));
+            worker->run();
+            SPDLOG_INFO("Working with client {}:{}",
+                        remote_endpoint.address().to_string(),
+                        remote_endpoint.port());
             client_worker_list.push_front(std::move(worker));
             *worker_iter_ptr = client_worker_list.begin();
         }
@@ -593,10 +601,18 @@ namespace sophiar {
                     co_return true;
                 };
                 listen_worker = make_infinite_coro_worker(global_context, std::move(listen_func));
+                listen_worker->run();
+                SPDLOG_INFO("Sophiar manager is listen on {}:{}",
+                            listen_endpoint.address().to_string(),
+                            listen_endpoint.port());
             } catch (std::exception &e) {
                 // TODO show log, failed to create coro_worker
                 return false;
             }
+
+            // start node event handler
+            start_manager_worker();
+
             return true;
         }
 

+ 3 - 0
src/utility/coro_signal2.hpp

@@ -106,6 +106,9 @@ namespace sophiar {
     class coro_signal2 : private boost::noncopyable {
     public:
 
+        // if coro signal is moved, signal watcher will not work
+        coro_signal2(coro_signal2 &&other) noexcept = delete;
+
         template<typename Executor>
         explicit coro_signal2(Executor &executor)
 

+ 5 - 2
src/utility/coro_signal_group.hpp

@@ -6,6 +6,7 @@
 
 #include <boost/dynamic_bitset.hpp>
 
+#include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
@@ -16,9 +17,11 @@ namespace sophiar {
     class coro_signal_group {
     public:
 
+        using pointer = std::unique_ptr<coro_signal_group<CondFunc>>;
+
         template<typename Executor>
-        coro_signal_group(Executor &executor)
-                :final_signal(executor) {}
+        explicit coro_signal_group(Executor &executor)
+                : final_signal(executor) {}
 
         ~coro_signal_group() {
             assert(!is_running);

+ 1 - 6
src/utility/global_obj_helper.hpp

@@ -24,15 +24,10 @@ namespace sophiar {
                 : global_obj_delegate(_manager,
                                       _manager.register_global_obj<SmallObjType>(obj_name)) {}
 
-        global_obj_delegate(global_obj_delegate &other) noexcept
-                : manager(other.manager),
-                  watcher(std::move(other.watcher)),
-                  obj_index(other.obj_index),
-                  obj_ptr(std::move(other.obj_ptr)) {}
+        global_obj_delegate(global_obj_delegate &&other) noexcept = default;
 
         void set_value(const pointer_type &ptr,
                        timestamp_type ts = current_timestamp()) {
-            assert(ptr.get() != nullptr);
             manager.update_global_obj<SmallObjType>(obj_index, ptr, ts);
         }
 

+ 3 - 3
src/utility/versatile_buffer2.hpp

@@ -129,7 +129,7 @@ namespace sophiar {
                                            size_t initial_pos = 0)
                 : buffer(std::move(other_buffer)) {
             cur_pos = initial_pos;
-            cur_length = initial_length;
+            cur_length = initial_length != 0 ? initial_length : buffer.max_length();
         }
 
         template<typename T>
@@ -280,8 +280,8 @@ namespace sophiar {
 
         // 从当前位置开始调整 offset
         void manual_offset(ptrdiff_t offset) {
-            assert(offset < 0 && cur_pos >= -offset);
-            assert(offset > 0 && cur_pos + offset <= buffer.max_length());
+            assert(offset > 0 || cur_pos >= -offset);
+            assert(offset < 0 || cur_pos + offset <= buffer.max_length());
             cur_pos += offset;
         }
 

+ 2 - 2
tests/CMakeLists.txt

@@ -3,8 +3,8 @@ find_package (Boost REQUIRED COMPONENTS unit_test_framework)
 include_directories (${Boost_INCLUDE_DIRS})
 
 add_executable(test_core
-        core/datanode_base.cpp
-        core/geometry_types.cpp
+#        core/datanode_base.cpp
+#        core/geometry_types.cpp
         core/small_obj.cpp
         core/sophiar_manager.cpp
         #        core/transform_tree.cpp

+ 94 - 65
tests/core/sophiar_manager.cpp

@@ -3,6 +3,8 @@
 #include "core/sophiar_manager.h"
 #include "core/tristate_obj.h"
 #include "core/basic_obj_types.hpp"
+#include "utility/coro_signal_group.hpp"
+#include "utility/coro_worker.hpp"
 #include "utility/debug_utility.hpp"
 #include "utility/global_obj_helper.hpp"
 
@@ -28,100 +30,135 @@ using namespace std::chrono_literals;
 struct source_node_type : public tristate_obj {
     DEFAULT_NEW_INSTANCE(source_node_type)
 
-    global_obj_auto_sync_delegate<double_obj> out_obj;
+    global_obj_index_type output_obj_index;
+    coro_worker::pointer source_worker;
 
     boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
-        init_value = config["output_obj_name"].get<int64_t>();
-        co_return true; // TODO
+        output_obj_index = get_manager().
+                register_global_obj<u64int_obj>(
+                config["output_obj_name"].get<std::string>());
+        co_return true;
     }
 
     boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
-        assert(config.contains("start_value"));
-        assert(config["start_value"].is_number());
-        start_value = config["start_value"].get<int64_t>();
-        co_spawn(get_context(), [=]() -> awaitable<void> {
-            co_await coro_sleep(50ms);
-            this->output_signal.emit(init_value * start_value);
-        }, detached);
+        source_worker = make_interval_coro_worker(
+                get_context(), std::chrono::seconds(1), [
+                        output_obj = global_obj_auto_sync_delegate<u64int_obj>(
+                                get_manager(), output_obj_index),
+                        start_value = config["start_value"].get<std::uint64_t>()]() mutable
+                        -> boost::asio::awaitable<bool> {
+                    auto new_out = u64int_obj::new_instance(start_value);
+                    output_obj.set_value(new_out);
+                    SPDLOG_WARN("New value from source {}", new_out->value);
+                    ++start_value;
+                    co_return true;
+                });
+        source_worker->run();
         co_return true;
     }
 
-    using output_signal_type = tiny_signal<int64_t>;
-    output_signal_type output_signal;
-    int64_t init_value = -1, start_value = -1;
+    boost::asio::awaitable<void> on_stop() override {
+        source_worker->cancel();
+        co_await source_worker->coro_wait_stop();
+        source_worker.reset();
+        co_return;
+    }
 
 };
 
 struct proxy_node_type : public tristate_obj {
     DEFAULT_NEW_INSTANCE(proxy_node_type);
 
-    using signal_type = tiny_signal<int64_t>;
-    using slot_type = signal_type::slot_type;
-
-    struct slot_impl_type : public slot_type {
-        proxy_node_type *p_this = nullptr;
-
-        void on_signal_received(int64_t args) override {
-            assert(p_this != nullptr);
-            p_this->output_signal.emit(args * p_this->init_value);
-        }
-    };
-
-    void load_construct_config(const nlohmann::json &config) override {
-        BOOST_TEST(config.empty());
-        auto input_slot = new slot_impl_type();
-        input_slot->p_this = this;
-        get_manager().register_signal(this, "output", output_signal);
-        get_manager().register_slot<int64_t>(this, "input", *input_slot);
-    };
+    global_obj_index_type input_obj_index;
+    global_obj_index_type output_obj_index;
+    coro_worker::pointer proxy_worker;
 
     boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
-        assert(config.contains("init_value"));
-        assert(config["init_value"].is_number());
-        init_value = config["init_value"].get<int64_t>();
+        input_obj_index = get_manager().
+                register_global_obj<u64int_obj>(
+                config["input_obj_name"].get<std::string>());
+        output_obj_index = get_manager().
+                register_global_obj<u64int_obj>(
+                config["output_obj_name"].get<std::string>());
         co_return true;
     }
 
     boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
-        BOOST_TEST(config.empty());
-        co_spawn(get_context(), [=]() -> awaitable<void> {
-            co_await coro_sleep(100ms);
-            co_await this->stop();
-        }, detached);
+        proxy_worker = make_infinite_coro_worker(
+                get_context(), [
+                        input_obj = global_obj_auto_sync_delegate<u64int_obj>(
+                                get_manager(), input_obj_index),
+                        output_obj = global_obj_auto_sync_delegate<u64int_obj>(
+                                get_manager(), output_obj_index),
+                        start_value = config["start_value"].get<std::uint64_t>()]() mutable
+                        -> boost::asio::awaitable<bool> {
+                    co_await input_obj.coro_wait_update();
+                    auto new_out = u64int_obj::new_instance(input_obj->value + start_value);
+                    output_obj.set_value(new_out);
+                    SPDLOG_WARN("New value from proxy {}", new_out->value);
+                    ++start_value;
+                    co_return true;
+                });
+        proxy_worker->run();
         co_return true;
     }
 
-    signal_type output_signal;
-    int64_t init_value = -1;
+    boost::asio::awaitable<void> on_stop() override {
+        proxy_worker->cancel();
+        co_await proxy_worker->coro_wait_stop();
+        proxy_worker.reset();
+        co_return;
+    }
+
 };
 
 struct target_node_type : public tristate_obj {
     DEFAULT_NEW_INSTANCE(target_node_type);
 
-    using signal_type = tiny_signal<int64_t>;
-    using slot_type = signal_type::slot_type;
-
-    struct slot_impl_type : public slot_type {
-        void on_signal_received(int64_t args) override {
-            std::cout << args << std::endl;
-        }
-    };
-
-    void load_construct_config(const nlohmann::json &config) override {
-        BOOST_TEST(config.empty());
-        get_manager().register_slot<int64_t>(this, "input", *(new slot_impl_type));
-    };
+    global_obj_index_type source_obj_index;
+    global_obj_index_type proxy_obj_index;
+    coro_worker::pointer target_worker;
+    coro_signal_any_group::pointer watch_group;
 
     boost::asio::awaitable<bool> on_init(const nlohmann::json &config) override {
-        BOOST_TEST(config.empty());
+        source_obj_index = get_manager().
+                register_global_obj<u64int_obj>(
+                config["source_obj_name"].get<std::string>());
+        proxy_obj_index = get_manager().
+                register_global_obj<u64int_obj>(
+                config["proxy_obj_name"].get<std::string>());
         co_return true;
     }
 
     boost::asio::awaitable<bool> on_start(const nlohmann::json &config) override {
-        BOOST_TEST(config.empty());
+        watch_group = std::make_unique<coro_signal_any_group>(get_manager());
+        watch_group->add_watcher(get_manager().request_global_obj_update_watcher(source_obj_index));
+        watch_group->add_watcher(get_manager().request_global_obj_update_watcher(proxy_obj_index));
+        watch_group->start(get_context());
+        target_worker = make_infinite_coro_worker(
+                get_context(), [
+                        watcher = watch_group->new_watcher(get_context()),
+                        source_obj = global_obj_auto_sync_delegate<u64int_obj>(
+                                get_manager(), source_obj_index),
+                        proxy_obj = global_obj_auto_sync_delegate<u64int_obj>(
+                                get_manager(), proxy_obj_index)]() mutable
+                        -> boost::asio::awaitable<bool> {
+                    co_await watcher.coro_wait();
+                    SPDLOG_ERROR("source: {}, proxy: {}", source_obj->value, proxy_obj->value);
+                    co_return true;
+                });
+        target_worker->run();
         co_return true;
     }
 
+    boost::asio::awaitable<void> on_stop() override {
+        co_await watch_group->stop();
+        target_worker->cancel();
+        co_await target_worker->coro_wait_stop();
+        target_worker.reset();
+        co_return;
+    }
+
 };
 
 BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
@@ -137,14 +174,6 @@ BOOST_AUTO_TEST_CASE(test_sophiar_manager) {
 
     global_sophiar_manager.load_config_and_start(nlohmann::json::parse(config_file));
 
-    co_spawn(global_context, []() -> awaitable<void> {
-        co_await global_sophiar_manager.switch_mode("mode_a");
-        co_await coro_sleep(100ms);
-        co_await global_sophiar_manager.switch_mode("mode_b");
-//        co_await coro_sleep(100ms);
-//        co_await global_sophiar_manager.switch_mode("mode_a");
-    }, detached);
-
     global_context.run();
 
 }

+ 7 - 6
tests/data/sophiar_manager_config.json

@@ -7,18 +7,18 @@
       "init_config": {
         "output_obj_name": "out_obj"
       },
-      "start_configs": {
+      "start_config": {
         "start_value": 234
       }
     },
     {
       "type": "proxy_node_type",
       "name": "proxy",
-      "init_configs": {
+      "init_config": {
         "input_obj_name": "out_obj",
         "output_obj_name": "proxy_obj"
       },
-      "start_configs": {
+      "start_config": {
         "start_value": 10000
       },
       "dependencies": [
@@ -28,10 +28,11 @@
     {
       "type": "target_node_type",
       "name": "target",
-      "init_configs": {
-        "listen_obj_name": "out_obj"
+      "init_config": {
+        "source_obj_name": "out_obj",
+        "proxy_obj_name": "proxy_obj"
       },
-      "start_configs": {}
+      "start_config": {}
     }
   ]
 }