|
|
@@ -1,9 +1,10 @@
|
|
|
#include "sophiar_manager.h"
|
|
|
#include "core/basic_obj_types.hpp"
|
|
|
+#include "core/global_defs.h"
|
|
|
#include "core/small_obj.hpp"
|
|
|
-#include "core/sophiar_obj.hpp"
|
|
|
#include "core/tristate_obj.h"
|
|
|
#include "third_party/scope_guard.hpp"
|
|
|
+#include "utility/config_utility.hpp"
|
|
|
#include "utility/coro_signal2.hpp"
|
|
|
#include "utility/coro_worker.hpp"
|
|
|
#include "utility/debug_utility.hpp"
|
|
|
@@ -14,11 +15,7 @@
|
|
|
#include <boost/asio/co_spawn.hpp>
|
|
|
#include <boost/asio/detached.hpp>
|
|
|
#include <boost/asio/experimental/channel.hpp>
|
|
|
-#include <boost/asio/ip/address.hpp>
|
|
|
-#include <boost/asio/ip/tcp.hpp>
|
|
|
-#include <boost/asio/redirect_error.hpp>
|
|
|
#include <boost/asio/use_awaitable.hpp>
|
|
|
-#include <boost/iterator/counting_iterator.hpp>
|
|
|
#include <boost/system/error_code.hpp>
|
|
|
|
|
|
#include <fmt/format.h>
|
|
|
@@ -40,12 +37,9 @@ namespace sophiar {
|
|
|
using boost::asio::co_spawn;
|
|
|
using boost::asio::detached;
|
|
|
using boost::asio::experimental::channel;
|
|
|
- using boost::asio::redirect_error;
|
|
|
using boost::asio::use_awaitable;
|
|
|
using boost::system::error_code;
|
|
|
|
|
|
- using namespace boost::asio::ip;
|
|
|
-
|
|
|
struct sophiar_manager::impl {
|
|
|
|
|
|
enum class manager_status_type {
|
|
|
@@ -54,15 +48,7 @@ namespace sophiar {
|
|
|
BUSY = 0x02
|
|
|
};
|
|
|
|
|
|
- static constexpr uint16_t default_listen_port = 5277;
|
|
|
- static constexpr size_t command_initial_length = 1024; // TODO 考虑怎么支持超长报文
|
|
|
- static constexpr size_t reply_max_length = 4096; // TODO 考虑支持更长的回复
|
|
|
-
|
|
|
- using command_reader_type = versatile_readable_buffer<boost::endian::order::big, extern_memory>;
|
|
|
- using reply_writer_type = versatile_writable_buffer<boost::endian::order::big, extern_memory>;
|
|
|
-
|
|
|
using obj_type_index_type = uint8_t;
|
|
|
- using obj_factory_func_pool_type = named_vector<obj_type_index_type, obj_factory_func_type>;
|
|
|
|
|
|
using obj_index_type = uint16_t;
|
|
|
struct obj_info {
|
|
|
@@ -73,9 +59,6 @@ namespace sophiar {
|
|
|
std::vector<obj_index_type> reverse_dependency_list; // 哪些节点需要这个节点
|
|
|
};
|
|
|
|
|
|
- using obj_pool_type = named_vector<obj_index_type, obj_info>;
|
|
|
- using obj_ptr_index_map_type = std::unordered_map<sophiar_obj *, obj_index_type>;
|
|
|
-
|
|
|
enum class manager_event_type {
|
|
|
START, STOP
|
|
|
};
|
|
|
@@ -84,53 +67,25 @@ namespace sophiar {
|
|
|
manager_event_type event;
|
|
|
obj_type_index_type obj_index;
|
|
|
std::unique_ptr<coro_signal2> callback;
|
|
|
-
|
|
|
- manager_event(manager_event_type _event, obj_type_index_type _index,
|
|
|
- std::unique_ptr<coro_signal2> _callback = nullptr) :
|
|
|
- event(_event), obj_index(_index), callback(std::move(_callback)) {}
|
|
|
};
|
|
|
|
|
|
- using manager_event_channel_type = channel<void(error_code, manager_event::pointer)>;
|
|
|
-
|
|
|
- using global_obj_writer_func_type = void (*)(reply_writer_type &, void *);
|
|
|
- using global_obj_type_index = uint16_t;
|
|
|
- using global_obj_writer_func_pool_type = std::vector<global_obj_writer_func_type>;
|
|
|
- using global_obj_type_to_index_pool_type = std::unordered_map<std::type_index, global_obj_type_index>;
|
|
|
-
|
|
|
- struct global_obj_info {
|
|
|
- void *placeholder;
|
|
|
- coro_signal2 *update_signal;
|
|
|
- timestamp_type last_update_ts;
|
|
|
- std::type_index obj_type = typeid(void);
|
|
|
- global_obj_type_index obj_type_index = ~static_cast<global_obj_type_index>(0);
|
|
|
- };
|
|
|
-
|
|
|
- using global_obj_pool_type = named_vector<global_obj_index_type, global_obj_info>;
|
|
|
-
|
|
|
sophiar_manager *q_this = nullptr;
|
|
|
manager_status_type manager_status = manager_status_type::INITIAL;
|
|
|
|
|
|
+ using obj_factory_func_pool_type = named_vector<obj_type_index_type, obj_factory_func_type>;
|
|
|
+ using obj_pool_type = named_vector<obj_index_type, obj_info>;
|
|
|
+ using obj_ptr_index_map_type = std::unordered_map<sophiar_obj *, obj_index_type>;
|
|
|
+
|
|
|
obj_factory_func_pool_type obj_factory_func_pool;
|
|
|
obj_pool_type obj_pool;
|
|
|
obj_ptr_index_map_type obj_ptr_index_map;
|
|
|
|
|
|
- global_obj_writer_func_pool_type global_obj_writer_func_pool;
|
|
|
- global_obj_type_to_index_pool_type global_obj_type_to_index_pool;
|
|
|
- global_obj_pool_type global_obj_pool;
|
|
|
-
|
|
|
+ using manager_event_channel_type = channel<void(error_code, manager_event::pointer)>;
|
|
|
manager_event_channel_type manager_event_channel;
|
|
|
coro_worker::pointer manager_worker;
|
|
|
|
|
|
- using client_worker_list_type = std::list<coro_worker::pointer>;
|
|
|
-
|
|
|
- uint16_t listen_port = default_listen_port;
|
|
|
- coro_worker::pointer listen_worker;
|
|
|
- client_worker_list_type client_worker_list;
|
|
|
-
|
|
|
impl() :
|
|
|
- manager_event_channel(global_context) {
|
|
|
- register_basic_global_obj_types();
|
|
|
- }
|
|
|
+ manager_event_channel(*global_context) {}
|
|
|
|
|
|
std::string get_obj_name_by_ptr(sophiar_obj *obj) const {
|
|
|
assert(obj_ptr_index_map.contains(obj));
|
|
|
@@ -138,40 +93,16 @@ namespace sophiar {
|
|
|
return obj_pool.to_name_by_index(obj_index);
|
|
|
}
|
|
|
|
|
|
- template<typename SmallObjType>
|
|
|
- void register_global_obj_type() {
|
|
|
- std::type_index obj_type = typeid(SmallObjType);
|
|
|
- static_assert(SmallObjType::binary_length() <= std::numeric_limits<uint8_t>::max());
|
|
|
- assert(!global_obj_type_to_index_pool.contains(obj_type));
|
|
|
- auto writer_func = &SmallObjType::template raw_pointer_write_to<reply_writer_type>;
|
|
|
- auto obj_type_index = global_obj_writer_func_pool.size();
|
|
|
- global_obj_type_to_index_pool[obj_type] = obj_type_index;
|
|
|
- global_obj_writer_func_pool.push_back(writer_func);
|
|
|
- }
|
|
|
-
|
|
|
- void register_basic_global_obj_types() {
|
|
|
- register_global_obj_type<bool_obj>();
|
|
|
- register_global_obj_type<u8int_obj>();
|
|
|
- register_global_obj_type<u16int_obj>();
|
|
|
- register_global_obj_type<u32int_obj>();
|
|
|
- register_global_obj_type<u64int_obj>();
|
|
|
- register_global_obj_type<i8int_obj>();
|
|
|
- register_global_obj_type<i16int_obj>();
|
|
|
- register_global_obj_type<i32int_obj>();
|
|
|
- register_global_obj_type<i64int_obj>();
|
|
|
- register_global_obj_type<float_obj>();
|
|
|
- register_global_obj_type<double_obj>();
|
|
|
- register_global_obj_type<scalarxyz_obj>();
|
|
|
- register_global_obj_type<transform_obj>();
|
|
|
- register_global_obj_type<array6_obj>();
|
|
|
- register_global_obj_type<array7_obj>();
|
|
|
- assert(global_obj_type_to_index_pool[typeid(array7_obj)] == 14);
|
|
|
- }
|
|
|
-
|
|
|
auto get_tristate_ptr(obj_index_type obj_index) const {
|
|
|
return dynamic_cast<tristate_obj *>(obj_pool[obj_index].ptr);
|
|
|
}
|
|
|
|
|
|
+ bool check_object_name_for_controller(std::string_view obj_name) {
|
|
|
+ if (obj_pool.contains(obj_name)) [[likely]] return true;
|
|
|
+ SPDLOG_ERROR("Object [name = {}] is not registered.", obj_name);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
// start obj and its dependencies recursively
|
|
|
awaitable<bool> start_obj(obj_index_type obj_index) {
|
|
|
assert(manager_status == manager_status_type::BUSY);
|
|
|
@@ -179,7 +110,7 @@ namespace sophiar {
|
|
|
// check loop
|
|
|
auto &info = obj_pool[obj_index];
|
|
|
if (info.in_queue) {
|
|
|
- assert(false);
|
|
|
+ assert(false); // TODO 先检测是否有环,这里就可以直接 assert 了
|
|
|
co_return false;
|
|
|
}
|
|
|
info.in_queue = true;
|
|
|
@@ -200,7 +131,7 @@ namespace sophiar {
|
|
|
co_return true;
|
|
|
}
|
|
|
|
|
|
- // stop obj and objs depending on it recursively
|
|
|
+ // stop obj and objects depending on it recursively
|
|
|
awaitable<void> stop_obj(obj_index_type obj_index) {
|
|
|
assert(manager_status == manager_status_type::BUSY);
|
|
|
|
|
|
@@ -229,7 +160,7 @@ namespace sophiar {
|
|
|
|
|
|
void start_manager_worker() { // 启动一个处理 start 和 stop 事件的协程
|
|
|
assert(manager_worker == nullptr);
|
|
|
- manager_worker = make_infinite_coro_worker(global_context, [this]() -> awaitable<bool> {
|
|
|
+ manager_worker = make_infinite_coro_worker([this]() -> awaitable<bool> {
|
|
|
auto event = co_await manager_event_channel.async_receive(use_awaitable);
|
|
|
assert(event != nullptr);
|
|
|
manager_status = manager_status_type::BUSY;
|
|
|
@@ -249,13 +180,19 @@ namespace sophiar {
|
|
|
}
|
|
|
|
|
|
void on_object_stopped(sophiar_obj *obj_ptr) {
|
|
|
+
|
|
|
+#ifdef SOPHIAR_TEST
|
|
|
+
|
|
|
+ if (manager_status == manager_status_type::INITIAL) return;
|
|
|
+
|
|
|
+#endif // SOPHIAR_TEST
|
|
|
+
|
|
|
+ auto event = manager_event::new_instance();
|
|
|
assert(obj_ptr_index_map.contains(obj_ptr));
|
|
|
- auto obj_index = obj_ptr_index_map.at(obj_ptr);
|
|
|
- auto event = manager_event::new_instance(manager_event_type::STOP, obj_index);
|
|
|
- co_spawn(global_context, [this, event = std::move(event)]() mutable -> awaitable<void> {
|
|
|
- co_await manager_event_channel.async_send(error_code{}, std::move(event), use_awaitable);
|
|
|
- co_return;
|
|
|
- }, detached);
|
|
|
+ event->obj_index = obj_ptr_index_map.at(obj_ptr);
|
|
|
+ event->event = manager_event_type::STOP;
|
|
|
+ event->callback = nullptr;
|
|
|
+ manager_event_channel.async_send(error_code{}, std::move(event), detached);
|
|
|
}
|
|
|
|
|
|
obj_index_type create_object(const std::string &type_name,
|
|
|
@@ -274,12 +211,8 @@ namespace sophiar {
|
|
|
}
|
|
|
|
|
|
void create_obj_and_load_config(const nlohmann::json &config) {
|
|
|
- assert(config.contains("type"));
|
|
|
- assert(config.contains("name"));
|
|
|
- assert(config["type"].is_string());
|
|
|
- assert(config["name"].is_string());
|
|
|
- auto type_name = config["type"].get<std::string>();
|
|
|
- auto obj_name = config["name"].get<std::string>();
|
|
|
+ auto type_name = LOAD_STRING_ITEM("type");
|
|
|
+ auto obj_name = LOAD_STRING_ITEM("name");
|
|
|
auto obj_index = create_object(type_name, obj_name);
|
|
|
auto &obj_info = obj_pool[obj_index];
|
|
|
|
|
|
@@ -299,11 +232,11 @@ namespace sophiar {
|
|
|
}
|
|
|
|
|
|
void load_obj_dependencies(const nlohmann::json &config) {
|
|
|
- auto obj_name = config["name"].get<std::string>();
|
|
|
+ auto obj_name = LOAD_STRING_ITEM("name");
|
|
|
auto obj_index = obj_pool.to_index_by_name(obj_name);
|
|
|
auto &obj_info = obj_pool[obj_index];
|
|
|
if (!config.contains("dependencies")) return;
|
|
|
- assert(config["dependencies"].is_array());
|
|
|
+ ENSURE_ARRAY("dependencies");
|
|
|
for (auto &dep_json: config["dependencies"]) {
|
|
|
assert(dep_json.is_string());
|
|
|
auto dep_name = dep_json.get<std::string>();
|
|
|
@@ -315,17 +248,8 @@ namespace sophiar {
|
|
|
}
|
|
|
|
|
|
void build_graph(const nlohmann::json &config) {
|
|
|
- // load listen port
|
|
|
- if (config.contains("listen_port")) {
|
|
|
- assert(config["listen_port"].is_number_unsigned());
|
|
|
- listen_port = config["listen_port"].get<std::uint64_t>();
|
|
|
- } else {
|
|
|
- listen_port = default_listen_port;
|
|
|
- // TODO show log, use default
|
|
|
- }
|
|
|
// create obj and config them
|
|
|
- assert(config.contains("object_list"));
|
|
|
- assert(config["object_list"].is_array());
|
|
|
+ ENSURE_ARRAY("object_list");
|
|
|
for (auto &obj_config: config["object_list"]) { // pass one
|
|
|
create_obj_and_load_config(obj_config);
|
|
|
}
|
|
|
@@ -336,286 +260,82 @@ namespace sophiar {
|
|
|
// TODO check for loop in the dependency map
|
|
|
}
|
|
|
|
|
|
- static awaitable<uint8_t> receive_command(tcp::socket &client_socket, dynamic_memory &buffer_memory) {
|
|
|
- using header_reader_type = versatile_readable_buffer<boost::endian::order::big, static_memory<3>>;
|
|
|
- header_reader_type header_reader;
|
|
|
- co_await header_reader.async_read_from(client_socket, 3);
|
|
|
- auto cmd_length = header_reader.read_value<uint16_t>() - 1;
|
|
|
- assert(std::is_signed_v<decltype(cmd_length)> && cmd_length >= 0);
|
|
|
- auto cmd_code = header_reader.read_value<uint8_t>();
|
|
|
- buffer_memory.ensure_length(cmd_length);
|
|
|
- auto cmd_buffer = buffer(buffer_memory.data(), cmd_length);
|
|
|
- co_await async_read(client_socket, cmd_buffer, use_awaitable);
|
|
|
- co_return cmd_code;
|
|
|
- }
|
|
|
-
|
|
|
- void handle_query_manager_status(reply_writer_type &writer) const {
|
|
|
- writer << (uint8_t) manager_status;
|
|
|
- }
|
|
|
-
|
|
|
- void handle_query_obj_status(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) const {
|
|
|
- auto query_number = reader.read_value<uint16_t>();
|
|
|
- while (query_number--) {
|
|
|
- auto obj_index = reader.read_value<obj_index_type>();
|
|
|
- if (!obj_pool.contains(obj_index) || get_tristate_ptr(obj_index) == nullptr) {
|
|
|
- // TODO show log
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- } else {
|
|
|
- writer << (uint8_t) get_tristate_ptr(obj_index)->get_state();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void handle_query_global_obj_value(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) const {
|
|
|
- auto query_number = reader.read_value<uint16_t>();
|
|
|
- while (query_number--) {
|
|
|
- auto obj_index = reader.read_value<global_obj_index_type>();
|
|
|
- if (!global_obj_pool.contains(obj_index)) {
|
|
|
- // TODO show log
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- } else {
|
|
|
- auto &obj_info = global_obj_pool[obj_index];
|
|
|
- auto obj_type_index = obj_info.obj_type_index;
|
|
|
- assert(is_valid_id(obj_type_index));
|
|
|
- writer << obj_type_index;
|
|
|
- auto length_ptr = (uint8_t *) writer.cur_data();
|
|
|
- writer.manual_offset(1); // skip length
|
|
|
- auto writer_func = global_obj_writer_func_pool[obj_type_index];
|
|
|
- writer_func(writer, obj_info.placeholder);
|
|
|
- auto obj_length = writer.cur_data() - (char *) length_ptr - 1;
|
|
|
- *length_ptr = obj_length;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void handle_query_obj_index(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) const {
|
|
|
- auto name_length = reader.read_value<uint16_t>();
|
|
|
- auto obj_name = reader.read_string(name_length);
|
|
|
- if (!obj_pool.contains(obj_name)) {
|
|
|
- writer << (obj_index_type) -1;
|
|
|
- } else {
|
|
|
- writer << obj_pool.to_index_by_name(obj_name);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- void handle_query_global_obj_index(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) const {
|
|
|
- auto name_length = reader.read_value<uint16_t>();
|
|
|
- auto global_obj_name = reader.read_string(name_length);
|
|
|
- if (!global_obj_pool.contains(global_obj_name)) {
|
|
|
- writer << (global_obj_index_type) -1;
|
|
|
- } else {
|
|
|
- writer << global_obj_pool.to_index_by_name(global_obj_name);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> handle_init_obj(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) {
|
|
|
- auto obj_index = reader.read_value<obj_index_type>();
|
|
|
- if (obj_pool.contains(obj_index)) {
|
|
|
- auto tri_obj_ptr = get_tristate_ptr(obj_index);
|
|
|
- if (tri_obj_ptr != nullptr) {
|
|
|
- auto &obj_info = obj_pool[obj_index];
|
|
|
- co_await tri_obj_ptr->init(obj_info.init_config);
|
|
|
- }
|
|
|
- }
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- co_return;
|
|
|
- }
|
|
|
+// void handle_query_global_obj_value(command_reader_type &reader,
|
|
|
+// reply_writer_type &writer) const {
|
|
|
+// auto query_number = reader.read_value<uint16_t>();
|
|
|
+// while (query_number--) {
|
|
|
+// auto obj_index = reader.read_value<global_obj_index_type>();
|
|
|
+// if (!global_obj_pool.contains(obj_index)) {
|
|
|
+// // TODO show log
|
|
|
+// writer << (uint8_t) 0xFF;
|
|
|
+// } else {
|
|
|
+// auto &obj_info = global_obj_pool[obj_index];
|
|
|
+// auto obj_type_index = obj_info.obj_type_index;
|
|
|
+// assert(is_valid_id(obj_type_index));
|
|
|
+// writer << obj_type_index;
|
|
|
+// auto length_ptr = (uint8_t *) writer.cur_data();
|
|
|
+// writer.manual_offset(1); // skip length
|
|
|
+// auto writer_func = global_obj_writer_func_pool[obj_type_index];
|
|
|
+// writer_func(writer, obj_info.placeholder);
|
|
|
+// auto obj_length = writer.cur_data() - (char *) length_ptr - 1;
|
|
|
+// *length_ptr = obj_length;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// void handle_query_obj_index(command_reader_type &reader,
|
|
|
+// reply_writer_type &writer) const {
|
|
|
+// auto name_length = reader.read_value<uint16_t>();
|
|
|
+// auto obj_name = reader.read_string(name_length);
|
|
|
+// if (!obj_pool.contains(obj_name)) {
|
|
|
+// writer << (obj_index_type) -1;
|
|
|
+// } else {
|
|
|
+// writer << obj_pool.to_index_by_name(obj_name);
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// void handle_query_global_obj_index(command_reader_type &reader,
|
|
|
+// reply_writer_type &writer) const {
|
|
|
+// auto name_length = reader.read_value<uint16_t>();
|
|
|
+// auto global_obj_name = reader.read_string(name_length);
|
|
|
+// if (!global_obj_pool.contains(global_obj_name)) {
|
|
|
+// writer << (global_obj_index_type) -1;
|
|
|
+// } else {
|
|
|
+// writer << global_obj_pool.to_index_by_name(global_obj_name);
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
awaitable<void> coro_start_obj(obj_index_type obj_index) {
|
|
|
assert(obj_pool.contains(obj_index));
|
|
|
- auto callback_signal = std::make_unique<coro_signal2>(global_context);
|
|
|
- auto callback_watcher = callback_signal->new_watcher(global_context);
|
|
|
- auto event = manager_event::new_instance(manager_event_type::START,
|
|
|
- obj_index,
|
|
|
- std::move(callback_signal));
|
|
|
- co_await manager_event_channel.async_send(error_code{}, std::move(event), use_awaitable);
|
|
|
+ auto callback_signal = std::make_unique<coro_signal2>();
|
|
|
+ auto callback_watcher = callback_signal->new_watcher();
|
|
|
+ auto event = manager_event::new_instance();
|
|
|
+ event->event = manager_event_type::START;
|
|
|
+ event->obj_index = obj_index;
|
|
|
+ event->callback = std::move(callback_signal);
|
|
|
+ manager_event_channel.async_send(error_code{}, std::move(event), detached);
|
|
|
co_await callback_watcher.coro_wait(false);
|
|
|
co_return;
|
|
|
}
|
|
|
|
|
|
- awaitable<void> coro_stop_obj(obj_index_type obj_index) {
|
|
|
+// awaitable<void> coro_stop_obj(obj_index_type obj_index) {
|
|
|
+// assert(obj_pool.contains(obj_index));
|
|
|
+// auto callback_signal = std::make_unique<coro_signal2>();
|
|
|
+// auto callback_watcher = callback_signal->new_watcher();
|
|
|
+// auto event = manager_event::new_instance();
|
|
|
+// event->event = manager_event_type::STOP;
|
|
|
+// event->obj_index = obj_index;
|
|
|
+// event->callback = std::move(callback_signal);
|
|
|
+// manager_event_channel.async_send(error_code{}, std::move(event), detached);
|
|
|
+// co_await callback_watcher.coro_wait(false);
|
|
|
+// co_return;
|
|
|
+// }
|
|
|
+
|
|
|
+ tristate_obj::state_type query_object_state(obj_index_type obj_index) {
|
|
|
assert(obj_pool.contains(obj_index));
|
|
|
- auto callback_signal = std::make_unique<coro_signal2>(global_context);
|
|
|
- auto callback_watcher = callback_signal->new_watcher(global_context);
|
|
|
- auto event = manager_event::new_instance(manager_event_type::STOP,
|
|
|
- obj_index,
|
|
|
- std::move(callback_signal));
|
|
|
- co_await manager_event_channel.async_send(error_code{}, std::move(event), use_awaitable);
|
|
|
- co_await callback_watcher.coro_wait(false);
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> handle_start_obj(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) {
|
|
|
- auto obj_index = reader.read_value<obj_index_type>();
|
|
|
- if (obj_pool.contains(obj_index)) {
|
|
|
- co_await coro_start_obj(obj_index);
|
|
|
- } else {
|
|
|
- // TODO show log
|
|
|
- }
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> handle_stop_obj(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) {
|
|
|
- auto obj_index = reader.read_value<obj_index_type>();
|
|
|
- if (obj_pool.contains(obj_index)) {
|
|
|
- co_await coro_stop_obj(obj_index);
|
|
|
- } else {
|
|
|
- // TODO show log
|
|
|
- }
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> handle_reset_obj(command_reader_type &reader,
|
|
|
- reply_writer_type &writer) {
|
|
|
- auto obj_index = reader.read_value<obj_index_type>();
|
|
|
- if (obj_pool.contains(obj_index)) {
|
|
|
- auto tri_obj_ptr = get_tristate_ptr(obj_index);
|
|
|
- if (tri_obj_ptr != nullptr) {
|
|
|
- // check if stop is needed
|
|
|
- if (tri_obj_ptr->get_state() > tristate_obj::state_type::PENDING) {
|
|
|
- co_await coro_stop_obj(obj_index); // TODO 这里可能会有重复,应该直接把对应的对象停止也可以
|
|
|
- }
|
|
|
- auto &obj_info = obj_pool[obj_index];
|
|
|
- co_await tri_obj_ptr->init(obj_info.init_config);
|
|
|
- }
|
|
|
- }
|
|
|
- writer << (uint8_t) 0xFF;
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- awaitable<void> handle_command(tcp::socket &client_socket,
|
|
|
- uint8_t cmd_code,
|
|
|
- dynamic_memory &cmd_content,
|
|
|
- dynamic_memory &reply_memory) {
|
|
|
- auto command_reader = command_reader_type(extern_memory(cmd_content));
|
|
|
- auto reply_content_memory = extern_memory(reply_memory.data() + 2, // save space for length
|
|
|
- reply_memory.max_length() - 2);
|
|
|
- auto reply_writer = reply_writer_type(std::move(reply_content_memory));
|
|
|
- switch (cmd_code) {
|
|
|
- case 0x00: {
|
|
|
- handle_query_manager_status(reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x01: {
|
|
|
- handle_query_obj_status(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x02: {
|
|
|
- handle_query_global_obj_value(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x0A: {
|
|
|
- handle_query_obj_index(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x0B: {
|
|
|
- handle_query_global_obj_index(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x10: {
|
|
|
- co_await handle_init_obj(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x11: {
|
|
|
- co_await handle_start_obj(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x12: {
|
|
|
- co_await handle_stop_obj(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- case 0x13: {
|
|
|
- co_await handle_reset_obj(command_reader, reply_writer);
|
|
|
- break;
|
|
|
- }
|
|
|
- default: // TODO 增加退出的功能
|
|
|
- // TODO show
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- // send back reply
|
|
|
- auto reply_length = reply_writer.get_cur_pos();
|
|
|
- assert(reply_length <= reply_max_length - 2);
|
|
|
- auto &rep_length_tmp = *reinterpret_cast<uint16_t *>(reply_memory.data() + 0);
|
|
|
- rep_length_tmp = reply_length;
|
|
|
- swap_net_loc_endian<boost::endian::order::big>(rep_length_tmp);
|
|
|
- auto reply_buffer = buffer(reply_memory.data(), reply_length + 2);
|
|
|
- co_await async_write(client_socket, reply_buffer, use_awaitable);
|
|
|
- co_return;
|
|
|
- }
|
|
|
-
|
|
|
- void create_client_worker(tcp::socket &&client_socket) {
|
|
|
- auto remote_endpoint = client_socket.remote_endpoint();
|
|
|
- auto worker_func = [
|
|
|
- this, // TODO 把这几个 memory 放进一个 struct 里面
|
|
|
- client_socket = std::move(client_socket),
|
|
|
- command_memory = dynamic_memory(command_initial_length),
|
|
|
- reply_memory = dynamic_memory(reply_max_length)]() mutable
|
|
|
- -> awaitable<bool> {
|
|
|
- auto cmd_code = co_await receive_command(client_socket, command_memory);
|
|
|
- co_await handle_command(client_socket, cmd_code, command_memory, reply_memory);
|
|
|
- co_return true;
|
|
|
- };
|
|
|
- auto noexcept_worker_func = make_noexcept_func(std::move(worker_func), [](std::exception &e) {
|
|
|
- SPDLOG_WARN("Client left."); // TODO better log
|
|
|
- });
|
|
|
- auto worker_iter_ptr = new client_worker_list_type::iterator;
|
|
|
- auto exit_func = [=]() {
|
|
|
- assert(worker_iter_ptr != nullptr);
|
|
|
- client_worker_list.erase(*worker_iter_ptr);
|
|
|
-// delete worker_iter_ptr; // TODO 这句话会报错,怀疑其他地方有内存访问越界
|
|
|
- };
|
|
|
- auto worker = make_infinite_coro_worker(global_context,
|
|
|
- std::move(noexcept_worker_func),
|
|
|
- std::move(exit_func));
|
|
|
- worker->run();
|
|
|
- SPDLOG_INFO("Working with client {}:{}",
|
|
|
- remote_endpoint.address().to_string(),
|
|
|
- remote_endpoint.port());
|
|
|
- client_worker_list.push_front(std::move(worker));
|
|
|
- *worker_iter_ptr = client_worker_list.begin();
|
|
|
- }
|
|
|
-
|
|
|
- bool start_manager() {
|
|
|
- assert(listen_worker == nullptr);
|
|
|
- auto listen_endpoint = tcp::endpoint(tcp::v4(), listen_port);
|
|
|
- try {
|
|
|
- auto listen_func = [
|
|
|
- this,
|
|
|
- acceptor = tcp::acceptor(global_context, listen_endpoint)
|
|
|
- ]() mutable -> awaitable<bool> {
|
|
|
- error_code ec;
|
|
|
- auto client = co_await acceptor.async_accept(redirect_error(use_awaitable, ec));
|
|
|
- if (ec) {
|
|
|
- // TODO show log
|
|
|
- } else {
|
|
|
- create_client_worker(std::move(client));
|
|
|
- }
|
|
|
- co_return true;
|
|
|
- };
|
|
|
- listen_worker = make_infinite_coro_worker(global_context, std::move(listen_func));
|
|
|
- listen_worker->run();
|
|
|
- SPDLOG_INFO("Sophiar manager is listen on {}:{}",
|
|
|
- listen_endpoint.address().to_string(),
|
|
|
- listen_endpoint.port());
|
|
|
- } catch (std::exception &e) {
|
|
|
- // TODO show log, failed to create coro_worker
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // start node event handler
|
|
|
- start_manager_worker();
|
|
|
-
|
|
|
- return true;
|
|
|
+ auto ptr = get_tristate_ptr(obj_index);
|
|
|
+ if (ptr == nullptr) return tristate_obj::state_type::UNKNOWN;
|
|
|
+ return ptr->get_state();
|
|
|
}
|
|
|
|
|
|
};
|
|
|
@@ -625,7 +345,7 @@ namespace sophiar {
|
|
|
pimpl->q_this = this;
|
|
|
}
|
|
|
|
|
|
- void sophiar_manager::register_object_type_impl(const std::string &type_name,
|
|
|
+ void sophiar_manager::register_object_type_impl(std::string_view type_name,
|
|
|
obj_factory_func_type func) {
|
|
|
auto index = pimpl->obj_factory_func_pool.new_elem(type_name);
|
|
|
pimpl->obj_factory_func_pool[index] = func;
|
|
|
@@ -635,11 +355,7 @@ namespace sophiar {
|
|
|
assert(pimpl->manager_status == impl::manager_status_type::INITIAL);
|
|
|
pimpl->build_graph(config);
|
|
|
pimpl->manager_status = impl::manager_status_type::NORMAL;
|
|
|
- bool ok = pimpl->start_manager();
|
|
|
- if (!ok) {
|
|
|
- // TODO show fatal error
|
|
|
- return false;
|
|
|
- }
|
|
|
+ pimpl->start_manager_worker();
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
@@ -658,63 +374,75 @@ namespace sophiar {
|
|
|
pimpl->on_object_stopped(obj);
|
|
|
}
|
|
|
|
|
|
- global_obj_index_type sophiar_manager::register_global_obj_impl(const std::string &obj_name,
|
|
|
- std::type_index obj_type,
|
|
|
- void *placeholder) {
|
|
|
- if (placeholder == nullptr) {
|
|
|
- if (!pimpl->global_obj_pool.contains(obj_name))
|
|
|
- return ~(global_obj_index_type) 0; // indicate the caller to create a placeholder
|
|
|
- assert(pimpl->global_obj_pool[obj_name].obj_type == obj_type);
|
|
|
- return pimpl->global_obj_pool.to_index_by_name(obj_name);
|
|
|
- }
|
|
|
+#ifdef SOPHIAR_TEST
|
|
|
|
|
|
- // create new one
|
|
|
- auto obj_index = pimpl->global_obj_pool.new_elem(obj_name);
|
|
|
- auto &obj_info = pimpl->global_obj_pool[obj_index];
|
|
|
- obj_info.obj_type = obj_type;
|
|
|
- obj_info.placeholder = placeholder;
|
|
|
- obj_info.update_signal = new coro_signal2{global_context};
|
|
|
- if (pimpl->global_obj_type_to_index_pool.contains(obj_type)) {
|
|
|
- obj_info.obj_type_index = pimpl->global_obj_type_to_index_pool[obj_type];
|
|
|
- } else {
|
|
|
- // TODO show log, not registered type
|
|
|
- }
|
|
|
- return obj_index;
|
|
|
+ sophiar_obj *sophiar_manager::get_object(std::string_view obj_name) const {
|
|
|
+ if (!pimpl->obj_pool.contains(obj_name)) return nullptr;
|
|
|
+ return pimpl->obj_pool[obj_name].ptr;
|
|
|
}
|
|
|
|
|
|
- void *sophiar_manager::get_global_obj_placeholder(global_obj_index_type obj_index,
|
|
|
- std::type_index obj_type) {
|
|
|
- auto &obj_info = pimpl->global_obj_pool[obj_index];
|
|
|
- assert(obj_info.obj_type == obj_type);
|
|
|
- return obj_info.placeholder;
|
|
|
- }
|
|
|
+#endif // SOPHIAR_TEST
|
|
|
|
|
|
- timestamp_type sophiar_manager::get_global_obj_update_timestamp(
|
|
|
- sophiar::global_obj_index_type obj_index) {
|
|
|
- auto &obj_info = pimpl->global_obj_pool[obj_index];
|
|
|
- return obj_info.last_update_ts;
|
|
|
+ boost::asio::awaitable<bool> sophiar_manager::init_object(std::string_view obj_name) noexcept {
|
|
|
+ if (!pimpl->check_object_name_for_controller(obj_name)) co_return false;
|
|
|
+ auto obj_index = pimpl->obj_pool.to_index_by_name(obj_name);
|
|
|
+ auto tri_obj_ptr = pimpl->get_tristate_ptr(obj_index);
|
|
|
+ if (tri_obj_ptr != nullptr) {
|
|
|
+ const auto &config = pimpl->obj_pool[obj_index].init_config;
|
|
|
+ co_await tri_obj_ptr->init(config);
|
|
|
+ co_return tri_obj_ptr->is_stable()
|
|
|
+ && tri_obj_ptr->get_state() >= tristate_obj::state_type::PENDING;
|
|
|
+ } else {
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+ assert(false);
|
|
|
+ co_return false;
|
|
|
}
|
|
|
|
|
|
- void sophiar_manager::update_global_obj_timestamp(global_obj_index_type obj_index,
|
|
|
- timestamp_type ts) {
|
|
|
- auto &obj_info = pimpl->global_obj_pool[obj_index];
|
|
|
- obj_info.last_update_ts = ts;
|
|
|
- obj_info.update_signal->try_notify_all();
|
|
|
+ boost::asio::awaitable<bool> sophiar_manager::start_object(std::string_view obj_name) noexcept {
|
|
|
+ if (!pimpl->check_object_name_for_controller(obj_name)) co_return false;
|
|
|
+ auto obj_index = pimpl->obj_pool.to_index_by_name(obj_name);
|
|
|
+ co_await pimpl->coro_start_obj(obj_index);
|
|
|
+ auto obj_state = pimpl->query_object_state(obj_index);
|
|
|
+ co_return obj_state == tristate_obj::state_type::UNKNOWN
|
|
|
+ || tristate_obj::is_state_stable(obj_state)
|
|
|
+ && obj_state >= tristate_obj::state_type::RUNNING;
|
|
|
}
|
|
|
|
|
|
- signal_watcher sophiar_manager::request_global_obj_update_watcher(global_obj_index_type obj_index) {
|
|
|
- return pimpl->global_obj_pool[obj_index]
|
|
|
- .update_signal->new_watcher(global_context);
|
|
|
+ boost::asio::awaitable<bool> sophiar_manager::stop_object(std::string_view obj_name) noexcept {
|
|
|
+ if (!pimpl->check_object_name_for_controller(obj_name)) co_return false;
|
|
|
+ auto obj_index = pimpl->obj_pool.to_index_by_name(obj_name);
|
|
|
+ auto tri_obj_ptr = pimpl->get_tristate_ptr(obj_index);
|
|
|
+ if (tri_obj_ptr != nullptr) {
|
|
|
+ co_await tri_obj_ptr->stop();
|
|
|
+ co_return tri_obj_ptr->is_stable()
|
|
|
+ && tri_obj_ptr->get_state() <= tristate_obj::state_type::PENDING;
|
|
|
+ } else {
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+ assert(false);
|
|
|
+ co_return false;
|
|
|
}
|
|
|
|
|
|
-#ifdef SOPHIAR_TEST
|
|
|
-
|
|
|
- sophiar_obj *sophiar_manager::get_object(const std::string &obj_name) const {
|
|
|
- if (!pimpl->obj_pool.contains(obj_name)) return nullptr;
|
|
|
- return pimpl->obj_pool[obj_name].ptr;
|
|
|
+ boost::asio::awaitable<bool> sophiar_manager::reset_object(std::string_view obj_name) noexcept {
|
|
|
+ if (!pimpl->check_object_name_for_controller(obj_name)) co_return false;
|
|
|
+ auto obj_index = pimpl->obj_pool.to_index_by_name(obj_name);
|
|
|
+ auto tri_obj_ptr = pimpl->get_tristate_ptr(obj_index);
|
|
|
+ if (tri_obj_ptr != nullptr) {
|
|
|
+ co_await tri_obj_ptr->reset();
|
|
|
+ co_return tri_obj_ptr->get_state() == tristate_obj::state_type::INITIAL;
|
|
|
+ } else {
|
|
|
+ co_return true;
|
|
|
+ }
|
|
|
+ assert(false);
|
|
|
+ co_return false;
|
|
|
}
|
|
|
|
|
|
-#endif // SOPHIAR_TEST
|
|
|
+ uint8_t sophiar_manager::query_object_state(std::string_view obj_name) {
|
|
|
+ if (!pimpl->check_object_name_for_controller(obj_name)) return -1; // 0xFF means unknown
|
|
|
+ auto obj_index = pimpl->obj_pool.to_index_by_name(obj_name);
|
|
|
+ return (uint8_t) pimpl->query_object_state(obj_index);
|
|
|
+ }
|
|
|
|
|
|
sophiar_manager::~sophiar_manager() = default;
|
|
|
|