瀏覽代碼

Merged old network modules.

jcsyshc 1 年之前
父節點
當前提交
55874ad2ae

+ 4 - 1
CMakeLists.txt

@@ -8,8 +8,11 @@ add_executable(${PROJECT_NAME} src/main.cpp
         src/core/impl/memory_pool.cpp
         src/core/impl/object_manager.cpp
         src/module/impl/image_viewer.cpp
+        src/network_v3/sender_base.cpp
+        src/network_v3/sender_tcp.cpp
+        src/network_v3/sender_udp_fec.cpp
 #        src/network/impl/crc_checker.cpp
-#        src/network/impl/fragment/third_party/rs.c
+        src/network/impl/fragment/third_party/rs.c
 #        src/network/impl/fragment/frag_basic.cpp
 #        src/network/impl/fragment/frag_rs.cpp
 #        src/network/impl/multiplexer.cpp

+ 23 - 50
src/core/impl/object_manager.cpp

@@ -6,10 +6,6 @@
 using boost::asio::io_context;
 using boost::asio::post;
 
-object_manager main_ob;
-
-extern io_context main_ctx;
-
 object_manager::impl::~impl() {
     for (auto &item: obj_pool) {
         auto &obj_st = item.second;
@@ -18,24 +14,18 @@ object_manager::impl::~impl() {
 }
 
 object_manager::impl::obj_st_type *
-object_manager::impl::query_st(object_manager::name_type obj_name) {
+object_manager::impl::query_st(name_type obj_name) {
     auto iter = obj_pool.find(obj_name);
     assert(iter != obj_pool.end());
     return &iter->second;
 }
 
-void *object_manager::impl::query_placeholder(name_type obj_name, std::type_index obj_type) {
+std::optional<object_manager::obj_info>
+object_manager::impl::query_info(name_type obj_name) {
     auto iter = obj_pool.find(obj_name);
-    if (iter == obj_pool.end()) [[unlikely]] return nullptr;
-    auto &obj_st = iter->second;
-    assert(obj_st.type == obj_type);
-    return obj_st.ptr;
-}
-
-std::type_index object_manager::impl::query_type(obj_name_type obj_name) {
-    auto iter = obj_pool.find(obj_name);
-    assert(iter != obj_pool.end());
-    return iter->second.type;
+    if (iter == obj_pool.end()) [[unlikely]] return {};
+    auto &st = iter->second;
+    return obj_info{.type = st.type, .pl_ptr = st.ptr, .sig = &st.sig};
 }
 
 void object_manager::impl::create_placeholder(name_type obj_name, std::type_index obj_type,
@@ -49,57 +39,40 @@ void object_manager::impl::create_placeholder(name_type obj_name, std::type_inde
 void object_manager::impl::notify_signal(name_type obj_name) {
     auto obj_st = query_st(obj_name);
     if (!obj_st->is_pending) {
-        post(main_ctx, [=] {
-            obj_st->is_running = true;
-            for (const auto &ob_st: obj_st->ob_list) {
-                ob_st.func(obj_name);
-            }
+        post(*ctx, [=] {
+            obj_st->sig(obj_name);
             obj_st->is_pending = false;
-            obj_st->is_running = false;
         });
         obj_st->is_pending = true;
     }
 }
 
-object_manager::de_ob_func_type
-object_manager::impl::observe(name_type obj_name, const ob_type &cb_func, priority_type pri) {
-    auto obj_st = query_st(obj_name);
-    assert(!obj_st->is_running);
-    obj_st->ob_list.emplace_front(cb_func, pri);
-    auto ob_list = &obj_st->ob_list;
-    auto ob_iter = ob_list->begin();
-    ob_list->sort(); // sort to ensure priority order
-
-    auto exit_func = [=] {
-        assert(!obj_st->is_running);
-        obj_st->ob_list.erase(ob_iter);
-    };
-    return exit_func;
+io_context *object_manager::impl::switch_ctx() const {
+    if (std::this_thread::get_id() == tid) [[likely]] return nullptr;
+    return ctx;
 }
 
-object_manager::object_manager()
-        : pimpl(std::make_unique<impl>()) {}
+object_manager::object_manager(create_config conf)
+        : pimpl(std::make_unique<impl>()) {
+    pimpl->ctx = conf.ctx;
+}
 
 object_manager::~object_manager() = default;
 
-void *object_manager::query_placeholder(name_type obj_name, std::type_index obj_type) {
-    return pimpl->query_placeholder(obj_name, obj_type);
+void object_manager::create_placeholder(name_type obj_name, std::type_index obj_type,
+                                        void *ptr, del_func_type del_func) {
+    pimpl->create_placeholder(obj_name, obj_type, ptr, del_func);
 }
 
-std::type_index object_manager::query_type(name_type obj_name) {
-    return pimpl->query_type(obj_name);
+std::optional<object_manager::obj_info>
+object_manager::query_info(name_type obj_name) {
+    return pimpl->query_info(obj_name);
 }
 
-void object_manager::create_placeholder(name_type obj_name, std::type_index obj_type,
-                                        void *ptr, del_func_type del_func) {
-    pimpl->create_placeholder(obj_name, obj_type, ptr, del_func);
+io_context *object_manager::switch_ctx() {
+    return pimpl->switch_ctx();
 }
 
 void object_manager::notify_signal(name_type obj_name) {
     pimpl->notify_signal(obj_name);
-}
-
-object_manager::de_ob_func_type
-object_manager::observe(name_type obj_name, const ob_type &cb_func, priority_type pri) {
-    return pimpl->observe(obj_name, cb_func, pri);
 }

+ 8 - 18
src/core/impl/object_manager_impl.h

@@ -4,47 +4,37 @@
 #include "core/object_manager.h"
 
 #include <list>
+#include <thread>
 #include <unordered_map>
 
 struct object_manager::impl {
 
-    struct ob_st_type { // observer store type
-        ob_type func = nullptr;
-        priority_type pri = 0;
-
-        bool operator<(const ob_st_type &o) const {
-            return pri > o.pri;
-        }
-    };
-
     struct obj_st_type { // object store type
         void *ptr = nullptr;
         del_func_type del_func = nullptr;
         std::type_index type;
-        bool is_pending = false; // invoking function is pending or running
-        bool is_running = false; // observe functions is running
-
-        using ob_list_type = std::list<ob_st_type>;
-        ob_list_type ob_list;
+        bool is_pending = false; // whether signal is queued.
+        obj_sig_type sig;
     };
 
     using obj_pool_type = std::unordered_map<name_type, obj_st_type>;
     obj_pool_type obj_pool;
 
+    io_context *ctx = nullptr;
+    std::thread::id tid = std::this_thread::get_id();
+
     ~impl();
 
     obj_st_type *query_st(name_type obj_name);
 
-    void *query_placeholder(name_type obj_name, std::type_index obj_type);
-
-    std::type_index query_type(obj_name_type obj_name);
+    std::optional<obj_info> query_info(name_type obj_name);
 
     void create_placeholder(name_type obj_name, std::type_index obj_type,
                             void *ptr, del_func_type del_func);
 
     void notify_signal(name_type obj_name);
 
-    de_ob_func_type observe(name_type obj_name, const ob_type &cb_func, priority_type pri);
+    io_context *switch_ctx() const;
 
 };
 

+ 52 - 15
src/core/object_manager.h

@@ -1,10 +1,15 @@
 #ifndef DEPTHGUIDE_OBJECT_MANAGER_H
 #define DEPTHGUIDE_OBJECT_MANAGER_H
 
+#include <boost/asio/post.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/signals2.hpp>
+
 #include <cassert>
 #include <cstdint>
 #include <functional>
 #include <memory>
+#include <optional>
 #include <typeindex>
 #include <typeinfo>
 
@@ -12,16 +17,29 @@ class object_manager {
 public:
 
     using name_type = uint16_t;
+    using io_context = boost::asio::io_context;
+
+    struct create_config {
+        io_context *ctx = nullptr;
+    };
 
-    object_manager();
+    explicit object_manager(create_config conf);
 
     ~object_manager();
 
+    // thread-safe
     template<typename T>
-    void save(name_type obj_name, T &&ptr) {
+    void save(name_type obj_name, T &&val) {
         using RT = std::remove_cvref_t<T>;
+        if (auto ctx = switch_ctx(); ctx != nullptr) [[unlikely]] {
+            using boost::asio::post;
+            static_assert(std::is_copy_constructible_v<RT>);
+            post(*ctx, [=, rt_val = RT(val), this] { save(obj_name, rt_val); });
+            return;
+        }
+
         auto pl_ptr = query_placeholder(obj_name, typeid(RT));
-        if (pl_ptr == nullptr) {
+        if (pl_ptr == nullptr) [[unlikely]] {
             static_assert(std::is_default_constructible_v<RT>);
             create_placeholder(obj_name, typeid(RT),
                                new RT{}, [](void *ptr) { delete (RT *) ptr; });
@@ -30,7 +48,7 @@ public:
         assert(pl_ptr != nullptr);
 
         static_assert(std::is_copy_assignable_v<RT>);
-        *(RT *) pl_ptr = ptr;
+        *(RT *) pl_ptr = val;
         notify_signal(obj_name);
     }
 
@@ -43,25 +61,47 @@ public:
         return *(T *) pl_ptr;
     }
 
-    std::type_index query_type(name_type obj_name);
+    std::type_index query_type(name_type obj_name) {
+        auto info_o = query_info(obj_name);
+        assert(info_o.has_value());
+        return info_o->type;
+    }
 
-    using priority_type = int;
-    using ob_type = std::function<void(name_type)>;
-    using de_ob_func_type = std::function<void()>;
+    using obj_sig_type = boost::signals2::signal<void(name_type)>;
 
-    de_ob_func_type observe(name_type obj_name, const ob_type &cb_func, priority_type pri = 0);
+    obj_sig_type *query_signal(name_type obj_name) {
+        auto info_o = query_info(obj_name);
+        assert(info_o.has_value());
+        return info_o->sig;
+    }
 
 private:
 
     using del_func_type = void (*)(void *);
 
-    void *query_placeholder(name_type obj_name, std::type_index obj_type);
+    struct obj_info {
+        std::type_index type = typeid(void);
+        void *pl_ptr = nullptr;
+        obj_sig_type *sig = nullptr;
+    };
+
+    std::optional<obj_info> query_info(name_type obj_name);
+
+    void *query_placeholder(name_type obj_name, std::type_index obj_type) {
+        auto info_o = query_info(obj_name);
+        if (!info_o.has_value()) [[unlikely]] return nullptr;
+        assert(info_o->type == obj_type);
+        return info_o->pl_ptr;
+    }
 
     void create_placeholder(name_type obj_name, std::type_index obj_type,
                             void *ptr, del_func_type del_func);
 
     void notify_signal(name_type obj_name);
 
+    // return target context or nullptr
+    io_context *switch_ctx();
+
     struct impl;
     std::unique_ptr<impl> pimpl;
 };
@@ -81,10 +121,7 @@ extern object_manager main_ob;
 #define OBJ_SAVE(name, val) \
     main_ob.save(name, val)
 
-#define OBJ_OBSERVE(name, func) \
-    main_ob.observe(name, func)
-
-#define OBJ_OBSERVE_PRI(name, func, pri) \
-    main_ob.observe(name, func, pri)
+#define OBJ_SIG(name) \
+    main_ob.query_signal(name)
 
 #endif //DEPTHGUIDE_OBJECT_MANAGER_H

+ 14 - 0
src/core/utility.hpp

@@ -2,6 +2,20 @@
 #define DEPTHGUIDE_UTILITY_HPP
 
 #include <cassert>
+#include <chrono>
+
+inline auto current_timestamp() {
+    using namespace std::chrono;
+    auto time_point = high_resolution_clock::now();
+    return duration_cast<microseconds>(time_point.time_since_epoch()).count();
+}
+
+using timestamp_type = decltype(current_timestamp());
+
+inline auto now_since_epoch_in_seconds() {
+    using namespace std::chrono;
+    return time_point_cast<seconds, system_clock>(system_clock::now());
+}
 
 // https://en.cppreference.com/w/cpp/utility/unreachable
 [[noreturn]] inline void unreachable() {

+ 3 - 2
src/impl/main_impl.cpp

@@ -27,6 +27,7 @@ CUcontext cuda_ctx = nullptr;
 GLFWwindow *window = nullptr;
 smart_cuda_stream *default_cuda_stream = nullptr;
 io_context main_ctx;
+object_manager main_ob({&main_ctx});
 
 std::unique_ptr<steady_timer> ui_timer;
 std::chrono::milliseconds ui_interval;
@@ -112,9 +113,9 @@ void init_om() {
     OBJ_SAVE(img_depth, image_f32c1());
     OBJ_SAVE(img_bg, image_u8c3());
 
-    main_ob.observe(img_color, [=](obj_name_type _) {
+    OBJ_SIG(img_color)->connect(INT_MIN, [=](obj_name_type _) {
         OBJ_SAVE(img_bg, OBJ_QUERY(image_u8c3, img_color));
-    }, INT_MIN);
+    });
 }
 
 void init_modules() {

+ 52 - 0
src/network/binary_utility.hpp

@@ -46,6 +46,11 @@ struct data_type {
             std::make_shared<data_mem_type>(_size + pre_size), pre_size, _size) {
     }
 
+    data_type(size_t _size, void *data, size_t pre_size = 0)
+            : data_type(_size, pre_size) {
+        replace(0, _size, (uint8_t *) data);
+    }
+
     bool empty() const {
         return ptr == nullptr;
     }
@@ -88,6 +93,29 @@ struct data_type {
         return sub_data(0, size + _size);
     }
 
+    void extend_self(size_t _size) {
+        reserve(size + _size, true);
+    }
+
+    void reserve(size_t _size, bool keep_data = false) {
+        if (_size <= size) [[likely]] {
+            size = _size;
+            return;
+        }
+        assert(_size > size);
+        if (mem != nullptr &&
+            start_ptr() + _size <= mem->end_ptr()) {
+            size = _size;
+            return;
+        }
+        auto next = data_type(_size);
+        if (keep_data) {
+            assert(next.size >= size);
+            next.replace(0, size, start_ptr());
+        }
+        *this = next;
+    }
+
     uint8_t *start_ptr() const {
         return ptr;
     }
@@ -262,4 +290,28 @@ static constexpr auto network_order = boost::endian::order::big;
 using network_writer = versatile_writer<network_order>;
 using network_reader = versatile_reader<network_order>;
 
+template<typename T, boost::endian::order net_order = network_order>
+static uint8_t *write_binary_number(uint8_t *ptr, T val) {
+    static constexpr auto need_swap =
+            (boost::endian::order::native != net_order);
+    auto real_ptr = (T *) ptr;
+    if constexpr (need_swap) {
+        *real_ptr = boost::endian::endian_reverse(val);
+    } else {
+        *real_ptr = val;
+    }
+    return ptr + sizeof(T);
+}
+
+template<typename T, boost::endian::order net_order = network_order>
+static uint8_t *read_binary_number(uint8_t *ptr, T *val) {
+    static constexpr auto need_swap =
+            (boost::endian::order::native != net_order);
+    *val = *(T *) ptr;
+    if constexpr (need_swap) {
+        boost::endian::endian_reverse_inplace(*val);
+    }
+    return ptr + sizeof(T);
+}
+
 #endif //DEPTHGUIDE_BINARY_UTILITY_HPP

+ 127 - 0
src/network_v3/sender_base.cpp

@@ -0,0 +1,127 @@
+#include "sender_base.h"
+#include "core/utility.hpp"
+
+#include <boost/asio/post.hpp>
+
+#include <fmt/chrono.h>
+#include <spdlog/spdlog.h>
+
+#include <deque>
+#include <fstream>
+#include <optional>
+
+using boost::asio::io_context;
+using boost::asio::post;
+
+struct sender_base::impl {
+
+    using frame_list_type = std::deque<frame_info>;
+    frame_list_type frame_list;
+
+    sender_base *q_this = nullptr;
+    io_context *ctx = nullptr;
+    bool waiting_idr = false;
+    bool stopped = false;
+
+    std::ofstream log_file;
+    obj_name_type connect_obj = invalid_obj_name;
+
+    explicit impl(create_config conf) {
+        ctx = conf.ctx;
+        connect_obj = conf.connect_obj;
+
+        // create log file if requested
+        if (conf.enable_log) {
+            auto file_name = fmt::format("log_{:%Y_%m_%d_%H_%M_%S}.csv",
+                                         std::chrono::system_clock::now());
+            log_file.open(file_name);
+        }
+    }
+
+    void clear_frame_list() {
+        frame_list.clear();
+    }
+
+    void request_idr_frame() {
+        clear_frame_list();
+        waiting_idr = true;
+        q_this->sig_req_idr();
+    }
+
+    std::optional<frame_info> retrieve_one_frame() {
+        if (frame_list.empty()) return {};
+        auto frame = frame_list.front();
+        frame_list.pop_front();
+        if (waiting_idr) { // if idr frame is requested, only idr frame will be returned.
+            if (!frame.idr) return {};
+            waiting_idr = false;
+        }
+        return frame;
+    }
+
+    void handle_frames() {
+        for (;;) {
+            // test stop flag
+            if (stopped) {
+                q_this->close_connection();
+                return;
+            }
+
+            auto frame = retrieve_one_frame();
+            if (!frame.has_value()) return;
+            q_this->handle_frame(frame.value());
+        }
+    }
+
+    void push_frame_impl(const frame_info &frame) {
+        assert(!frame.data.empty());
+        if (frame.idr) {
+            frame_list.clear();
+        }
+        frame_list.push_back(frame);
+    }
+
+    void push_frame(const frame_info &frame) {
+        push_frame_impl(frame);
+        post(*ctx, [ptr = q_this->shared_from_this()] {
+            ptr->pimpl->handle_frames();
+        });
+    }
+
+    void log_frame_sent(uint64_t frame_id) {
+        if (!log_file.is_open()) return;
+        log_file << fmt::format("{},{}\n", frame_id, current_timestamp());
+    }
+
+};
+
+sender_base::sender_base(create_config conf)
+        : pimpl(std::make_unique<impl>(conf)) {
+    pimpl->q_this = this;
+}
+
+sender_base::~sender_base() = default;
+
+void sender_base::stop() {
+    pimpl->stopped = true;
+}
+
+void sender_base::send_frame(const frame_info &frame) {
+    pimpl->push_frame(frame);
+}
+
+void sender_base::request_idr_frame() {
+    pimpl->request_idr_frame();
+}
+
+void sender_base::log_frame_sent(uint64_t frame_id) {
+    pimpl->log_frame_sent(frame_id);
+}
+
+void sender_base::notify_connected() {
+    OBJ_SAVE(pimpl->connect_obj, true);
+}
+
+void sender_base::notify_disconnected() {
+    OBJ_SAVE(pimpl->connect_obj, false);
+}

+ 67 - 0
src/network_v3/sender_base.h

@@ -0,0 +1,67 @@
+#ifndef REMOTEAR3_SENDER_BASE_HPP
+#define REMOTEAR3_SENDER_BASE_HPP
+
+#include "core/object_manager.h"
+#include "network/binary_utility.hpp"
+
+#include <boost/asio/io_context.hpp>
+#include <boost/signals2.hpp>
+
+#include <memory>
+
+enum sender_type {
+    SENDER_TCP,
+    SENDER_UDP,
+    SENDER_UDP_FEC
+};
+
+struct frame_info {
+    data_type data;
+    bool idr = false;
+    uint64_t frame_id = 0;
+
+    uint8_t *start_ptr() const { return data.start_ptr(); }
+
+    size_t size() const { return data.size; }
+};
+
+class sender_base : public std::enable_shared_from_this<sender_base> {
+public:
+
+    virtual ~sender_base();
+
+    void send_frame(const frame_info &frame);
+
+    void stop();
+
+    using req_idr_sig_type = boost::signals2::signal<void()>;
+    req_idr_sig_type sig_req_idr;
+
+protected:
+
+    struct create_config {
+        boost::asio::io_context *ctx = nullptr;
+        bool enable_log = false;
+        obj_name_type connect_obj = invalid_obj_name; // bool
+    };
+
+    explicit sender_base(create_config conf);
+
+    void request_idr_frame();
+
+    void log_frame_sent(uint64_t frame_id);
+
+    virtual void handle_frame(const frame_info &frame) = 0;
+
+    virtual void close_connection() = 0;
+
+    void notify_connected();
+
+    void notify_disconnected();
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+#endif //REMOTEAR3_SENDER_BASE_HPP

+ 163 - 0
src/network_v3/sender_tcp.cpp

@@ -0,0 +1,163 @@
+#include "sender_tcp.h"
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/read.hpp>
+#include <boost/asio/write.hpp>
+
+#include <spdlog/spdlog.h>
+
+#include <deque>
+
+using namespace boost::asio::ip;
+using boost::asio::buffer;
+using boost::asio::io_context;
+using boost::asio::post;
+using boost::asio::read;
+using boost::asio::write;
+using boost::system::error_code;
+
+struct sender_tcp::impl {
+
+    sender_base::create_config par_conf;
+
+    sender_tcp *q_this = nullptr;
+    io_context *ctx = nullptr;
+
+    std::unique_ptr<tcp::acceptor> acceptor;
+    std::unique_ptr<tcp::socket> socket;
+    data_type out_buf;
+
+    void close_connection() {
+        if (socket == nullptr) return;
+        // TODO: log client exit
+//        if (socket->is_open()) {
+//            auto remote_ep = socket->remote_endpoint();
+//            SPDLOG_INFO("Client {}:{} left.",
+//                        remote_ep.address().to_string(), remote_ep.port());
+//        }
+        socket = std::make_unique<tcp::socket>(*ctx);
+        q_this->notify_disconnected();
+    }
+
+    void send_frame(const frame_info &frame) {
+        if (!socket->is_open()) return;
+
+        uint64_t packet_length = frame.size()
+                                 + sizeof(frame.frame_id);
+        auto out_length = packet_length
+                          + sizeof(packet_length);
+        out_buf.reserve(out_length);
+
+        // fill out buffer
+        auto ptr = out_buf.ptr;
+        ptr = write_binary_number(ptr, packet_length);
+        ptr = write_binary_number(ptr, frame.frame_id);
+        memcpy(ptr, frame.start_ptr(), frame.size());
+
+        error_code err;
+        write(*socket, buffer(out_buf.ptr, out_buf.size), err); // TODO: change to async operation
+        if (err) {
+            SPDLOG_WARN("Error while sending frame: {}", err.to_string());
+            close_connection();
+            async_waiting_client();
+        }
+
+        q_this->log_frame_sent(frame.frame_id);
+        SPDLOG_TRACE("Frame {} sent, length = {}",
+                     frame.frame_id, frame.size());
+    }
+
+//    void hole_punch(const tcp::endpoint &listen_ep) { // UDP hole punch
+//        static constexpr auto fake_stun_server = "38.59.254.192"; // TODO: setting in config file
+//        static constexpr auto fake_stun_port = 5281;
+//        auto fake_stun_ep = tcp::endpoint{
+//                address::from_string(fake_stun_server), fake_stun_port};
+//
+//        socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
+//        socket->open(tcp::v4());
+//        socket->set_option(boost::asio::socket_base::reuse_address{true});
+//        socket->bind(listen_ep);
+//        socket->connect(fake_stun_ep);
+//
+//        static constexpr auto hole_punch_request_len =
+//                sizeof(size_t) + 2 * sizeof(uint8_t);
+//        out_buf.create(hole_punch_request_len);
+//        auto ptr = write_binary_number(out_buf.ptr, (size_t) (2 * sizeof(uint8_t)));
+//        ptr = write_binary_number(ptr, 'R');
+//        ptr = write_binary_number(ptr, 'S');
+//        auto pkt_len = ptr - out_buf.ptr;
+//        assert(pkt_len == hole_punch_request_len);
+//        socket->send(buffer(out_buf.ptr, pkt_len));
+//
+//        smart_buffer<uint8_t> in_buf;
+//        size_t rep_len;
+//        in_buf.create(sizeof(rep_len));
+//        read(*socket, buffer(in_buf.ptr, in_buf.length));
+//        read_binary_number(in_buf.ptr, &rep_len);
+//        in_buf.create(rep_len);
+//        read(*socket, buffer(in_buf.ptr, in_buf.length));
+//        assert(rep_len == 2);
+//        assert(std::string_view((char *) in_buf.ptr, 2) == "OK");
+//        SPDLOG_INFO("TCP hole punch succeeded.");
+//
+//        socket.reset();
+//    }
+
+    void async_waiting_client() {
+        socket = std::make_unique<tcp::socket>(*ctx);
+        acceptor->async_accept(*socket, [this](error_code err) {
+            if (err) {
+                SPDLOG_ERROR("Error while accepting client: {}", err.what());
+                assert(false);
+                async_waiting_client();
+                return;
+            }
+            assert(socket->is_open());
+            q_this->request_idr_frame();
+            q_this->notify_connected();
+
+            auto remote_ep = socket->remote_endpoint();
+            SPDLOG_INFO("New client from {}:{}.",
+                        remote_ep.address().to_string(), remote_ep.port());
+        });
+    }
+
+    static impl *create(create_config conf) {
+        auto ret = new impl;
+        ret->par_conf = sender_base::create_config{
+                .ctx = conf.ctx, .enable_log = conf.enable_log, .connect_obj = conf.connect_obj};
+        auto listen_ep = tcp::endpoint{tcp::v4(), conf.listen_port};
+//        ret->hole_punch(listen_ep);
+        ret->acceptor = std::make_unique<tcp::acceptor>(*conf.ctx, listen_ep);
+//        ret->acceptor = std::make_unique<tcp::acceptor>(*q_this->get_ctx());
+//        ret->acceptor->open(tcp::v4());
+//        ret->acceptor->set_option(boost::asio::socket_base::reuse_address{true});
+//        ret->acceptor->bind(listen_ep);
+//        ret->acceptor->listen();
+        ret->async_waiting_client();
+        return ret;
+    }
+
+};
+
+sender_tcp::sender_tcp(sender_tcp::impl *_pimpl)
+        : sender_base(_pimpl->par_conf),
+          pimpl(std::unique_ptr<impl>(_pimpl)) {
+    pimpl->q_this = this;
+}
+
+sender_tcp::~sender_tcp() = default;
+
+std::shared_ptr<sender_tcp> sender_tcp::create(create_config conf) {
+    return std::make_shared<sender_tcp>(impl::create(conf));
+}
+
+void sender_tcp::close_connection() {
+    pimpl->close_connection();
+}
+
+void sender_tcp::handle_frame(const frame_info &frame) {
+    pimpl->send_frame(frame);
+}

+ 39 - 0
src/network_v3/sender_tcp.h

@@ -0,0 +1,39 @@
+#ifndef REMOTEAR3_SENDER_TCP_H
+#define REMOTEAR3_SENDER_TCP_H
+
+#include "sender_base.h"
+
+#include <memory>
+
+class sender_tcp : public sender_base {
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+
+public:
+
+    explicit sender_tcp(impl *pimpl);
+
+    ~sender_tcp() override;
+
+    struct create_config {
+        uint16_t listen_port;
+
+        // for parent
+        boost::asio::io_context *ctx;
+        bool enable_log;
+        obj_name_type connect_obj;
+    };
+
+    static std::shared_ptr<sender_tcp> create(create_config conf);
+
+protected:
+
+    void handle_frame(const frame_info &frame) override;
+
+    void close_connection() override;
+
+};
+
+
+#endif //REMOTEAR3_SENDER_TCP_H

+ 364 - 0
src/network_v3/sender_udp_fec.cpp

@@ -0,0 +1,364 @@
+#include "sender_udp_fec.h"
+#include "core/utility.hpp"
+#include "third_party/scope_guard.hpp"
+
+extern "C" {
+#include "network/impl/fragment/third_party/rs.h"
+}
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/udp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/container/static_vector.hpp>
+#include <boost/crc.hpp>
+
+#include <opencv2/core/types.hpp>
+
+#include <spdlog/spdlog.h>
+
+using namespace boost::asio::ip;
+using boost::asio::buffer;
+using boost::asio::io_context;
+using boost::asio::post;
+using boost::system::error_code;
+
+namespace sender_udp_fec_impl {
+
+    struct smart_reed_solomon {
+        static constexpr auto max_blocks = DATA_SHARDS_MAX;
+
+        reed_solomon *rs = nullptr;
+        using block_ptrs_type = boost::container::static_vector<uint8_t *, max_blocks>;
+        block_ptrs_type block_ptrs;
+
+        smart_reed_solomon() = default;
+
+        smart_reed_solomon(const smart_reed_solomon &other) = delete;
+
+        ~smart_reed_solomon() {
+            deallocate();
+        }
+
+        // assume parity_rate and block_size will not change in the lifetime
+        void process(uint8_t data_blocks, uint8_t parity_blocks,
+                     uint16_t block_size, uint8_t *data, uint32_t length) {
+            if (data_blocks != last_data_blocks) {
+                deallocate();
+                allocate(data_blocks, parity_blocks, block_size);
+            }
+            assert(data_blocks * block_size >= length);
+            memcpy(block_data.ptr, data, length);
+            reed_solomon_encode2(rs, block_ptrs.data(), block_count(), block_size);
+        }
+
+        uint8_t block_count() const {
+            return block_ptrs.size();
+        }
+
+    private:
+        uint8_t last_data_blocks = 0;
+        data_type block_data;
+
+        void deallocate() {
+            if (rs == nullptr) return;
+            reed_solomon_release(rs);
+            rs = nullptr;
+        }
+
+        void allocate(uint8_t data_blocks, uint8_t parity_blocks, uint16_t block_size) {
+            assert(rs == nullptr);
+            rs = reed_solomon_new(data_blocks, parity_blocks);
+            auto total_blocks = data_blocks + parity_blocks;
+            block_data.reserve(total_blocks * block_size);
+            block_ptrs.resize(total_blocks);
+            for (int i = 0; i < total_blocks; ++i) {
+                block_ptrs[i] = block_data.ptr + block_size * i;
+            }
+            last_data_blocks = data_blocks;
+        }
+    };
+
+}
+
+using namespace sender_udp_fec_impl;
+
+struct sender_udp_fec::impl {
+
+    struct frag_header {
+        uint32_t frag_checksum;
+        uint8_t frame_type; // 'I' or 'P'
+        uint32_t frame_id;
+        uint32_t frame_length;
+        uint8_t chunk_count;
+        uint8_t chunk_id;
+        uint32_t chunk_offset;
+        uint32_t chunk_length;
+        uint16_t block_size;
+        uint8_t block_count;
+        uint8_t chunk_decode_block_count;
+        uint8_t block_id;
+    };
+
+    struct request_type {
+        uint32_t request_checksum;
+        uint8_t request_type;
+        uint32_t frame_id;
+    };
+
+    static constexpr auto frag_header_size = 28;
+    static constexpr auto request_size = 9;
+    static constexpr auto max_block_count = smart_reed_solomon::max_blocks;
+    static constexpr auto max_package_size = 64 * 1024; // 64KiB
+    static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
+    static constexpr int confirm_timeout = 10 * 1e6; // 10s
+
+    // parent config
+    sender_base::create_config par_conf;
+
+    sender_udp_fec *q_this = nullptr;
+    std::unique_ptr<udp::socket> socket;
+
+    udp::endpoint request_ep;
+    std::unique_ptr<udp::endpoint> remote_ep;
+    float parity_rate;
+    uint8_t max_data_block_count; // max_block_count / (1 + parity_rate)
+    uint16_t block_size; // conn_mtu - header_size
+    uint32_t max_chunk_size; // max_data_block_count * block_size
+    smart_reed_solomon rs;
+    data_type in_buf, out_buf;
+    timestamp_type last_confirm_ts = 0;
+
+    static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
+#define WRITE(member) ptr = write_binary_number(ptr, header.member)
+        WRITE(frag_checksum);
+        WRITE(frame_type);
+        WRITE(frame_id);
+        WRITE(frame_length);
+        WRITE(chunk_count);
+        WRITE(chunk_id);
+        WRITE(chunk_offset);
+        WRITE(chunk_length);
+        WRITE(block_size);
+        WRITE(block_count);
+        WRITE(chunk_decode_block_count);
+        WRITE(block_id);
+#undef WRITE
+        return ptr;
+    }
+
+    static uint8_t *read_request(uint8_t *ptr, request_type *req) {
+#define READ(member) ptr = read_binary_number(ptr, &req->member)
+        READ(request_checksum);
+        READ(request_type);
+        READ(frame_id);
+#undef READ
+        return ptr;
+    }
+
+    void send_block(uint8_t *block_data, const frag_header &header) {
+        out_buf.reserve(max_package_size);
+        auto ptr = write_frag_header(out_buf.ptr, header);
+        assert(ptr - out_buf.ptr == frag_header_size);
+        memcpy(ptr, block_data, block_size);
+
+        // calculate crc32
+        auto crc = boost::crc_32_type{};
+        crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
+                          frag_header_size + block_size - sizeof(uint32_t));
+        write_binary_number(out_buf.ptr, crc.checksum());
+
+        // send packet
+        assert(socket != nullptr);
+        auto buf = buffer(out_buf.ptr, frag_header_size + block_size);
+        assert(remote_ep != nullptr);
+        socket->send_to(buf, *remote_ep);
+    }
+
+    void send_chunk(uint8_t *chunk_data, uint32_t chunk_length, frag_header *header) {
+        auto data_blocks = (chunk_length + block_size - 1) / block_size;
+        assert(data_blocks <= max_data_block_count);
+        auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate));
+        rs.process(data_blocks, parity_blocks, block_size, chunk_data, chunk_length);
+        header->block_size = block_size;
+        header->block_count = rs.block_count();
+        header->chunk_decode_block_count = data_blocks;
+        for (auto k = 0; k < rs.block_count(); ++k) {
+            header->block_id = k;
+            send_block(rs.block_ptrs[k], *header);
+        }
+    }
+
+    void send_frame(const frame_info &frame) {
+        if (!is_connected()) return;
+
+        frag_header header;
+        header.frame_type = frame.idr ? 'I' : 'P';
+        header.frame_id = frame.frame_id;
+        header.frame_length = frame.size();
+
+        auto chunk_count = (frame.size() + max_chunk_size - 1) / max_chunk_size;
+        header.chunk_count = chunk_count;
+        for (auto k = 0; k < chunk_count; ++k) {
+            header.chunk_offset = k * max_chunk_size;
+            header.chunk_id = k;
+            header.chunk_length = std::min((size_t) max_chunk_size,
+                                           frame.size() - k * max_chunk_size);
+            auto chunk_data = frame.start_ptr() + header.chunk_offset;
+            send_chunk(chunk_data, header.chunk_length, &header);
+        }
+
+        q_this->log_frame_sent(frame.frame_id);
+        SPDLOG_TRACE("Frame {} sent.", frame.frame_id);
+    }
+
+    void hole_punch() { // UDP hole punch
+        static constexpr auto fake_stun_server = "38.59.254.192"; // TODO: setting in config file
+        static constexpr auto fake_stun_port = 5281;
+        auto fake_stun_ep = udp::endpoint{
+                address::from_string(fake_stun_server), fake_stun_port};
+
+        out_buf.reserve(2 * sizeof(uint8_t));
+        auto ptr = write_binary_number(out_buf.ptr, 'R');
+        ptr = write_binary_number(ptr, 'S');
+        auto pkt_len = ptr - out_buf.ptr;
+        socket->send_to(buffer(out_buf.ptr, pkt_len), fake_stun_ep);
+
+        udp::endpoint reply_ep;
+        in_buf.reserve(max_package_size);
+        auto rep_len = socket->receive_from(buffer(in_buf.start_ptr(), in_buf.size), reply_ep);
+        assert(reply_ep == fake_stun_ep);
+        assert(rep_len == 2);
+        assert(std::string_view((char *) in_buf.ptr, 2) == "OK");
+        SPDLOG_INFO("UDP hole punch succeeded.");
+    }
+
+    void async_handle_request() {
+        in_buf.reserve(max_package_size);
+        auto buf = buffer(in_buf.ptr, max_package_size);
+        using namespace std::placeholders;
+        socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
+    }
+
+    void handle_request(const error_code &ec, size_t length) {
+        // prepare for next request when this function exited.
+        auto closer = sg::make_scope_guard([this] {
+            async_handle_request();
+        });
+
+        // handle errors
+        if (ec) {
+            SPDLOG_ERROR("Error while receiving request: {}", ec.what());
+            return;
+        }
+
+        // parse request
+//        if (length != request_size) return; // there may be extra data with I request
+        request_type req;
+        auto ptr = read_request(in_buf.ptr, &req);
+        auto crc = boost::crc_32_type{};
+        crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
+                          length - sizeof(uint32_t));
+        if (crc.checksum() != req.request_checksum) { // checksum failed
+            // TODO show log
+            return;
+        }
+
+        // handle request
+        switch (req.request_type) {
+            case 'X': {
+                SPDLOG_INFO("Client {}:{} left.",
+                            remote_ep->address().to_string(), remote_ep->port());
+                close_connection();
+                return;
+            }
+            case 'C': {
+                last_confirm_ts = current_timestamp();
+                SPDLOG_TRACE("Frame {} confirmed.", req.frame_id);
+                return;
+            }
+            case 'I': {
+                if (length == request_size + 2 * sizeof(uint16_t)) { // requested width and height
+                    uint16_t width, height;
+                    ptr = read_binary_number(ptr, &width);
+                    ptr = read_binary_number(ptr, &height);
+                    q_this->sig_size_changed(cv::Size(width, height));
+                    SPDLOG_INFO("Output size changed to {}x{}", width, height);
+                }
+
+                remote_ep = std::make_unique<udp::endpoint>(request_ep);
+                last_confirm_ts = current_timestamp();
+                q_this->request_idr_frame();
+                q_this->notify_connected();
+
+                static uint32_t last_frame_id = 0;
+                if (req.frame_id != last_frame_id) {
+                    SPDLOG_INFO("New client from {}:{}.",
+                                remote_ep->address().to_string(), remote_ep->port());
+                    last_frame_id = req.frame_id;
+                }
+                return;
+            }
+            default: {
+                // TODO show log
+                return;
+            }
+        }
+    }
+
+    void close_connection() {
+        remote_ep.reset();
+        q_this->notify_disconnected();
+    }
+
+    bool is_connected() {
+        if (remote_ep == nullptr) return false;
+        if (current_timestamp() - last_confirm_ts > confirm_timeout) [[unlikely]] {
+            SPDLOG_WARN("Client timeout.");
+            close_connection();
+            return false;
+        }
+        return true;
+    }
+
+    static impl *create(create_config conf) {
+        auto ret = new impl;
+        auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
+        ret->par_conf = sender_base::create_config{
+                .ctx = conf.ctx, .enable_log = conf.enable_log, .connect_obj = conf.connect_obj};
+        ret->socket = std::make_unique<udp::socket>(*conf.ctx, local_ep);
+        ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
+//        ret->hole_punch();
+        ret->async_handle_request();
+
+        // constant configs
+        ret->parity_rate = conf.parity_rate;
+        ret->max_data_block_count = max_block_count / (1 + conf.parity_rate);
+        ret->block_size = conf.conn_mtu - frag_header_size;
+        ret->max_chunk_size = ret->max_data_block_count * ret->block_size;
+
+        // initialize reed solomon
+        fec_init();
+
+        return ret;
+    }
+};
+
+sender_udp_fec::sender_udp_fec(impl *_pimpl)
+        : sender_base(_pimpl->par_conf),
+          pimpl(std::unique_ptr<impl>(_pimpl)) {
+    pimpl->q_this = this;
+}
+
+sender_udp_fec::~sender_udp_fec() = default;
+
+std::shared_ptr<sender_udp_fec> sender_udp_fec::create(create_config conf) {
+    return std::make_shared<sender_udp_fec>(impl::create(conf));
+}
+
+void sender_udp_fec::close_connection() {
+    pimpl->close_connection();
+}
+
+void sender_udp_fec::handle_frame(const frame_info &frame) {
+    pimpl->send_frame(frame);
+}

+ 46 - 0
src/network_v3/sender_udp_fec.h

@@ -0,0 +1,46 @@
+#ifndef REMOTEAR3_SENDER_UDP_FEC_H
+#define REMOTEAR3_SENDER_UDP_FEC_H
+
+#include "sender_base.h"
+
+#include <opencv2/core/types.hpp>
+
+#include <memory>
+
+class sender_udp_fec : public sender_base {
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+
+public:
+
+    explicit sender_udp_fec(impl *pimpl);
+
+    ~sender_udp_fec() override;
+
+    struct create_config {
+        uint16_t conn_mtu;
+        float parity_rate;
+        uint16_t listen_port;
+
+        // for parent
+        boost::asio::io_context *ctx;
+        bool enable_log;
+        obj_name_type connect_obj;
+    };
+
+    static std::shared_ptr<sender_udp_fec> create(create_config conf);
+
+    using size_change_sig_type = boost::signals2::signal<void(cv::Size)>;
+    size_change_sig_type sig_size_changed;
+
+protected:
+
+    void handle_frame(const frame_info &frame) override;
+
+    void close_connection() override;
+
+};
+
+
+#endif //REMOTEAR3_SENDER_UDP_FEC_H

+ 180 - 0
src/network_v3/third_party/scope_guard.hpp

@@ -0,0 +1,180 @@
+/*
+ *  Created on: 13/02/2018
+ *      Author: ricab
+ *
+ * See README.md for documentation of this header's public interface.
+ */
+
+#ifndef SCOPE_GUARD_HPP_
+#define SCOPE_GUARD_HPP_
+
+#include <type_traits>
+#include <utility>
+
+#if __cplusplus >= 201703L && defined(SG_REQUIRE_NOEXCEPT_IN_CPP17)
+#define SG_REQUIRE_NOEXCEPT
+#endif
+
+namespace sg {
+    namespace detail {
+        /* --- Some custom type traits --- */
+
+        // Type trait determining whether a type is callable with no arguments
+        template<typename T, typename = void>
+        struct is_noarg_callable_t
+                : public std::false_type {
+        }; // in general, false
+
+        template<typename T>
+        struct is_noarg_callable_t<T, decltype(std::declval<T &&>()())>
+                : public std::true_type {
+        }; // only true when call expression valid
+
+        // Type trait determining whether a no-argument callable returns void
+        template<typename T>
+        struct returns_void_t
+                : public std::is_same<void, decltype(std::declval<T &&>()())> {
+        };
+
+        /* Type trait determining whether a no-arg callable is nothrow invocable if
+        required. This is where SG_REQUIRE_NOEXCEPT logic is encapsulated. */
+        template<typename T>
+        struct is_nothrow_invocable_if_required_t
+                : public
+#ifdef SG_REQUIRE_NOEXCEPT
+                  std::is_nothrow_invocable<T> /* Note: _r variants not enough to
+                                          confirm void return: any return can be
+                                          discarded so all returns are
+                                          compatible with void */
+#else
+                  std::true_type
+#endif
+        {
+        };
+
+        // logic AND of two or more type traits
+        template<typename A, typename B, typename... C>
+        struct and_t : public and_t<A, and_t<B, C...>> {
+        }; // for more than two arguments
+
+        template<typename A, typename B>
+        struct and_t<A, B> : public std::conditional<A::value, B, A>::type {
+        }; // for two arguments
+
+        // Type trait determining whether a type is a proper scope_guard callback.
+        template<typename T>
+        struct is_proper_sg_callback_t
+                : public and_t<is_noarg_callable_t<T>,
+                        returns_void_t<T>,
+                        is_nothrow_invocable_if_required_t<T>,
+                        std::is_nothrow_destructible<T>> {
+        };
+
+
+        /* --- The actual scope_guard template --- */
+
+        template<typename Callback,
+                typename = typename std::enable_if<
+                        is_proper_sg_callback_t<Callback>::value>::type>
+        class scope_guard;
+
+
+        /* --- Now the friend maker --- */
+
+        template<typename Callback>
+        detail::scope_guard<Callback> make_scope_guard(Callback &&callback)
+        noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value); /*
+    we need this in the inner namespace due to MSVC bugs preventing
+    sg::detail::scope_guard from befriending a sg::make_scope_guard
+    template instance in the parent namespace (see https://is.gd/xFfFhE). */
+
+
+        /* --- The template specialization that actually defines the class --- */
+
+        template<typename Callback>
+        class scope_guard<Callback> final {
+        public:
+            typedef Callback callback_type;
+
+            scope_guard(scope_guard &&other)
+            noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value);
+
+            ~scope_guard() noexcept; // highlight noexcept dtor
+
+            void dismiss() noexcept;
+
+        public:
+            scope_guard() = delete;
+
+            scope_guard(const scope_guard &) = delete;
+
+            scope_guard &operator=(const scope_guard &) = delete;
+
+            scope_guard &operator=(scope_guard &&) = delete;
+
+        private:
+            explicit scope_guard(Callback &&callback)
+            noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value); /*
+                                                      meant for friends only */
+
+            friend scope_guard<Callback> make_scope_guard<Callback>(Callback &&)
+            noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value); /*
+      only make_scope_guard can create scope_guards from scratch (i.e. non-move)
+      */
+
+        private:
+            Callback m_callback;
+            bool m_active;
+
+        };
+
+    } // namespace detail
+
+
+    /* --- Now the single public maker function --- */
+
+    using detail::make_scope_guard; // see comment on declaration above
+
+} // namespace sg
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::scope_guard(Callback &&callback)
+noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value)
+        : m_callback(std::forward<Callback>(callback)) /* use () instead of {} because
+    of DR 1467 (https://is.gd/WHmWuo), which still impacts older compilers
+    (e.g. GCC 4.x and clang <=3.6, see https://godbolt.org/g/TE9tPJ and
+    https://is.gd/Tsmh8G) */
+        , m_active{true} {}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::~scope_guard() noexcept {
+    if (m_active)
+        m_callback();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+sg::detail::scope_guard<Callback>::scope_guard(scope_guard &&other)
+noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value)
+        : m_callback(std::forward<Callback>(other.m_callback)) // idem
+        , m_active{std::move(other.m_active)} {
+    other.m_active = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+inline void sg::detail::scope_guard<Callback>::dismiss() noexcept {
+    m_active = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+template<typename Callback>
+inline auto sg::detail::make_scope_guard(Callback &&callback)
+noexcept(std::is_nothrow_constructible<Callback, Callback &&>::value)
+-> detail::scope_guard <Callback> {
+    return detail::scope_guard<Callback>{std::forward<Callback>(callback)};
+}
+
+#endif /* SCOPE_GUARD_HPP_ */