Forráskód Böngészése

Pause developing network module.

jcsyshc 1 éve
szülő
commit
e0b887cfef
39 módosított fájl, 2381 hozzáadás és 28 törlés
  1. 17 3
      CMakeLists.txt
  2. 0 0
      src/core/cuda_helper.hpp
  3. 0 0
      src/core/image_utility.hpp
  4. 0 0
      src/core/imgui_utility.hpp
  5. 2 2
      src/core/impl/memory_pool.cpp
  6. 1 1
      src/core/impl/memory_pool_impl.h
  7. 0 1
      src/core/impl/object_manager.cpp
  8. 2 0
      src/core/impl/object_manager_impl.h
  9. 6 1
      src/core/memory_pool.h
  10. 0 0
      src/core/object_manager.h
  11. 8 0
      src/core/utility.hpp
  12. 3 3
      src/device/impl/orb_camera.cpp
  13. 1 1
      src/device/impl/orb_camera_impl.h
  14. 2 2
      src/device/impl/orb_camera_ui.cpp
  15. 2 2
      src/device/orb_camera.h
  16. 2 2
      src/device/orb_camera_ui.h
  17. 4 2
      src/impl/main_impl.cpp
  18. 1 1
      src/impl/main_impl.h
  19. 1 1
      src/impl/object_names.h
  20. 1 1
      src/module/image_viewer.h
  21. 4 2
      src/module/impl/image_viewer.cpp
  22. 265 0
      src/network/binary_utility.hpp
  23. 28 0
      src/network/impl/crc_checker.cpp
  24. 32 0
      src/network/impl/crc_checker.h
  25. 35 0
      src/network/impl/fragment/frag_base.hpp
  26. 114 0
      src/network/impl/fragment/frag_basic.cpp
  27. 79 0
      src/network/impl/fragment/frag_basic.h
  28. 52 0
      src/network/impl/fragment/frag_rs.cpp
  29. 167 0
      src/network/impl/fragment/frag_rs.h
  30. 998 0
      src/network/impl/fragment/third_party/rs.c
  31. 88 0
      src/network/impl/fragment/third_party/rs.h
  32. 132 0
      src/network/impl/impl_utility.hpp
  33. 61 0
      src/network/impl/multiplexer.cpp
  34. 156 0
      src/network/impl/multiplexer_impl.h
  35. 0 0
      src/network/impl/network.cpp
  36. 20 0
      src/network/impl/network_impl.h
  37. 94 0
      src/network/network.h
  38. 1 1
      src/render/render_tools.h
  39. 2 2
      src/render/render_utility.h

+ 17 - 3
CMakeLists.txt

@@ -5,9 +5,15 @@ set(CMAKE_CXX_STANDARD 20)
 
 add_executable(${PROJECT_NAME} src/main.cpp
         src/impl/main_impl.cpp
-        src/impl/memory_pool.cpp
-        src/impl/object_manager.cpp
+        src/core/impl/memory_pool.cpp
+        src/core/impl/object_manager.cpp
         src/module/impl/image_viewer.cpp
+#        src/network/impl/crc_checker.cpp
+#        src/network/impl/fragment/third_party/rs.c
+#        src/network/impl/fragment/frag_basic.cpp
+#        src/network/impl/fragment/frag_rs.cpp
+#        src/network/impl/multiplexer.cpp
+#        src/network/impl/network.cpp
         src/render/impl/render_texture.cpp
         src/render/impl/render_tools.cpp
         src/render/impl/render_utility.cpp)
@@ -83,4 +89,12 @@ find_package(OrbbecSDK REQUIRED)
 target_link_libraries(${PROJECT_NAME} OrbbecSDK::OrbbecSDK)
 target_sources(${PROJECT_NAME} PRIVATE
         src/device/impl/orb_camera.cpp
-        src/device/impl/orb_camera_ui.cpp)
+        src/device/impl/orb_camera_ui.cpp)
+
+# Crypto++ config
+# CRYPTOPP_DIR is the Crypto++ install prefix (expects lib/ and include/ below it).
+# It is a cache entry so other machines can override it: -DCRYPTOPP_DIR=/path/to/prefix
+set(CRYPTOPP_DIR /home/tpx/usr CACHE PATH "Crypto++ install prefix")
+set(CRYPTOPP_LIB_DIR ${CRYPTOPP_DIR}/lib)
+find_library(CRYPTOPP_LIB cryptopp HINTS ${CRYPTOPP_LIB_DIR})
+# Fail with a readable message instead of linking against CRYPTOPP_LIB-NOTFOUND.
+if (NOT CRYPTOPP_LIB)
+    message(FATAL_ERROR "Crypto++ library not found; set CRYPTOPP_DIR to its install prefix")
+endif ()
+set(CRYPTOPP_INCLUDE_DIR ${CRYPTOPP_DIR}/include)
+target_include_directories(${PROJECT_NAME} PRIVATE ${CRYPTOPP_INCLUDE_DIR})
+target_link_libraries(${PROJECT_NAME} ${CRYPTOPP_LIB})

+ 0 - 0
src/cuda_helper.hpp → src/core/cuda_helper.hpp


+ 0 - 0
src/image_utility.hpp → src/core/image_utility.hpp


+ 0 - 0
src/imgui_utility.hpp → src/core/imgui_utility.hpp


+ 2 - 2
src/impl/memory_pool.cpp → src/core/impl/memory_pool.cpp

@@ -1,6 +1,6 @@
 #include "memory_pool_impl.h"
-#include "cuda_helper.hpp"
-#include "utility.hpp"
+#include "core/cuda_helper.hpp"
+#include "core/utility.hpp"
 
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/post.hpp>

+ 1 - 1
src/impl/memory_pool_impl.h → src/core/impl/memory_pool_impl.h

@@ -1,7 +1,7 @@
 #ifndef DEPTHGUIDE_MEMORY_POOL_IMPL_H
 #define DEPTHGUIDE_MEMORY_POOL_IMPL_H
 
-#include "memory_pool.h"
+#include "core/memory_pool.h"
 
 #include <boost/asio/io_context.hpp>
 

+ 0 - 1
src/impl/object_manager.cpp → src/core/impl/object_manager.cpp

@@ -1,4 +1,3 @@
-#include "object_manager.h"
 #include "object_manager_impl.h"
 
 #include <boost/asio/io_context.hpp>

+ 2 - 0
src/impl/object_manager_impl.h → src/core/impl/object_manager_impl.h

@@ -1,6 +1,8 @@
 #ifndef DEPTHGUIDE_OBJECT_MANAGER_IMPL_H
 #define DEPTHGUIDE_OBJECT_MANAGER_IMPL_H
 
+#include "core/object_manager.h"
+
 #include <list>
 #include <unordered_map>
 

+ 6 - 1
src/memory_pool.h → src/core/memory_pool.h

@@ -39,7 +39,6 @@ public:
         return as_shared(allocate_pitch<T>(cols, rows, mem_loc, pitch));
     }
 
-    // can be called from any thread.
     void deallocate(void *ptr);
 
     // free all unused memory
@@ -64,6 +63,12 @@ public:
 
 extern memory_pool global_mp;
 
+#define MEM_ALLOC(type, n, loc) \
+    global_mp.allocate<type>(n, loc)
+
+#define MEM_DEALLOC(ptr) \
+    global_mp.deallocate(ptr)
+
 #define ALLOC_SHARED(type, n, loc) \
     global_mp.allocate_shared<type>(n, loc)
 

+ 0 - 0
src/object_manager.h → src/core/object_manager.h


+ 8 - 0
src/utility.hpp → src/core/utility.hpp

@@ -20,10 +20,18 @@
     assert(false);\
     unreachable()
 
+#define RET_ERROR_N \
+    assert(false); \
+    return
+
 #define RET_ERROR_B \
     assert(false); \
     return false
 
+#define RET_ERROR_E \
+    assert(false); \
+    return {}
+
 #define RET_ERROR_P \
     assert(false); \
     return nullptr

+ 3 - 3
src/device/impl/orb_camera.cpp

@@ -1,7 +1,7 @@
 #include "orb_camera_impl.h"
-#include "image_utility.hpp"
-#include "object_manager.h"
-#include "utility.hpp"
+#include "core/image_utility.hpp"
+#include "core/object_manager.h"
+#include "core/utility.hpp"
 
 #include <boost/asio/post.hpp>
 

+ 1 - 1
src/device/impl/orb_camera_impl.h

@@ -2,7 +2,7 @@
 #define DEPTHGUIDE_ORB_CAMERA_IMPL_H
 
 #include "device/orb_camera.h"
-#include "image_utility.hpp"
+#include "core/image_utility.hpp"
 
 #include <boost/asio/io_context.hpp>
 

+ 2 - 2
src/device/impl/orb_camera_ui.cpp

@@ -1,6 +1,6 @@
 #include "orb_camera_ui_impl.h"
-#include "image_utility.hpp"
-#include "imgui_utility.hpp"
+#include "core/image_utility.hpp"
+#include "core/imgui_utility.hpp"
 
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/post.hpp>

+ 2 - 2
src/device/orb_camera.h

@@ -1,8 +1,8 @@
 #ifndef DEPTHGUIDE_ORB_CAMERA_H
 #define DEPTHGUIDE_ORB_CAMERA_H
 
-#include "cuda_helper.hpp"
-#include "object_manager.h"
+#include "core/cuda_helper.hpp"
+#include "core/object_manager.h"
 
 #include <boost/asio/io_context.hpp>
 

+ 2 - 2
src/device/orb_camera_ui.h

@@ -1,8 +1,8 @@
 #ifndef DEPTHGUIDE_ORB_CAMERA_UI_H
 #define DEPTHGUIDE_ORB_CAMERA_UI_H
 
-#include "cuda_helper.hpp"
-#include "object_manager.h"
+#include "core/cuda_helper.hpp"
+#include "core/object_manager.h"
 
 #include <memory>
 

+ 4 - 2
src/impl/main_impl.cpp

@@ -1,6 +1,6 @@
 #include "main_impl.h"
 #include "device/orb_camera_ui.h"
-#include "image_utility.hpp"
+#include "core/image_utility.hpp"
 #include "module/image_viewer.h"
 #include "object_names.h"
 
@@ -16,7 +16,7 @@
 #include <imgui_impl_opengl3.h>
 
 // make glad happy
-#include "imgui_utility.hpp"
+#include "core/imgui_utility.hpp"
 
 using boost::asio::io_context;
 using boost::asio::post;
@@ -185,6 +185,8 @@ void show_ui() {
                 ImGui::TreePop();
             }
         }
+
+        ImGui::PopItemWidth();
     }
     ImGui::End();
     ImGui::Render();

+ 1 - 1
src/impl/main_impl.h

@@ -1,7 +1,7 @@
 #ifndef DEPTHGUIDE_MAIN_IMPL_H
 #define DEPTHGUIDE_MAIN_IMPL_H
 
-#include "cuda_helper.hpp"
+#include "core/cuda_helper.hpp"
 
 #include <boost/asio/io_context.hpp>
 

+ 1 - 1
src/object_names.h → src/impl/object_names.h

@@ -1,7 +1,7 @@
 #ifndef DEPTHGUIDE_OBJECT_NAMES_H
 #define DEPTHGUIDE_OBJECT_NAMES_H
 
-#include "object_manager.h"
+#include "core/object_manager.h"
 
 enum obj_names : object_manager::name_type {
 

+ 1 - 1
src/module/image_viewer.h

@@ -1,7 +1,7 @@
 #ifndef DEPTHGUIDE_IMAGE_VIEWER_H
 #define DEPTHGUIDE_IMAGE_VIEWER_H
 
-#include "object_manager.h"
+#include "core/object_manager.h"
 #include "render/render_utility.h"
 
 #include <memory>

+ 4 - 2
src/module/impl/image_viewer.cpp

@@ -1,5 +1,5 @@
 #include "image_viewer_impl.h"
-#include "imgui_utility.hpp"
+#include "core/imgui_utility.hpp"
 #include "render/render_texture.h"
 
 void image_viewer::impl::show_color_depth() {
@@ -9,11 +9,12 @@ void image_viewer::impl::show_color_depth() {
     ImGui::SameLine();
     ImGui::RadioButton("Both", &chose_index, 2);
 
+    ImGui::PushItemWidth(150);
     {
         auto guard = imgui_disable_guard(!depth_conf.manual_depth_range);
         static constexpr float dep_hard_min = 0.15; // TODO: config value
         static constexpr float dep_hard_max = 10.0;
-        ImGui::DragFloat2("Depth Range", (float *) &depth_conf.depth_range, 0.05f,
+        ImGui::DragFloat2("Depth Range (m)", (float *) &depth_conf.depth_range, 0.05f,
                           dep_hard_min, dep_hard_max, "%.2f");
     }
     ImGui::SameLine();
@@ -22,6 +23,7 @@ void image_viewer::impl::show_color_depth() {
     if (chose_index == 2) { // both
         ImGui::SliderFloat("Depth Alpha", &depth_overlay_alpha, 0.f, 1.f, "%.2f");
     }
+    ImGui::PopItemWidth();
 }
 
 void image_viewer::impl::show() {

+ 265 - 0
src/network/binary_utility.hpp

@@ -0,0 +1,265 @@
+#ifndef DEPTHGUIDE_BINARY_UTILITY_HPP
+#define DEPTHGUIDE_BINARY_UTILITY_HPP
+
+#include "core/memory_pool.h"
+
+#include <boost/core/noncopyable.hpp>
+#include <boost/endian.hpp>
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+// Non-copyable owner of a byte buffer obtained through MEM_ALLOC (MEM_HOST)
+// and handed back through MEM_DEALLOC on destruction.
+struct data_mem_type : private boost::noncopyable {
+
+    uint8_t *ptr = nullptr;
+    size_t size = 0;
+
+    // Allocate a buffer of _size bytes.
+    explicit data_mem_type(size_t _size)
+            : ptr(MEM_ALLOC(uint8_t, _size, MEM_HOST)), size(_size) {
+    }
+
+    // Return the buffer to the pool it came from.
+    ~data_mem_type() {
+        MEM_DEALLOC(ptr);
+    }
+
+    // First byte of the buffer.
+    uint8_t *start_ptr() const {
+        return ptr;
+    }
+
+    // One past the last byte of the buffer.
+    uint8_t *end_ptr() const {
+        return start_ptr() + size;
+    }
+};
+
+// A view [ptr, ptr + size) into a shared data_mem_type buffer.
+// Copies of a data_type share the same backing buffer through `mem`.
+// A default-constructed data_type has no buffer (ptr == nullptr, mem empty).
+struct data_type {
+    uint8_t *ptr = nullptr; // first byte of the view
+    size_t size = 0;        // length of the view in bytes
+
+    data_type() = default;
+
+    // Allocate a fresh buffer of (_size + pre_size) bytes and view its last
+    // _size bytes. pre_size: reserved size before ptr, reachable later through
+    // sub_data() with a negative offset.
+    explicit data_type(size_t _size, size_t pre_size = 0)
+            : data_type(
+            std::make_shared<data_mem_type>(_size + pre_size), pre_size, _size) {
+    }
+
+    // True when the view points at nothing.
+    bool empty() const {
+        return ptr == nullptr;
+    }
+
+    // Deep copy: allocate a new buffer (with pre_size reserved bytes in front)
+    // and copy this view's bytes into it.
+    auto clone(size_t pre_size = 0) const {
+        auto ret = data_type(size, pre_size);
+        ret.replace(0, *this);
+        return ret;
+    }
+
+    /* Return a view on the same backing buffer, starting `offset` bytes from
+     * ptr (offset may be negative) and spanning _size bytes.
+     * if _size == -1:
+     *   ret_size = -offset       (offset < 0)
+     *   ret_size keeps end_ptr() (offset >= 0)
+     * The result must lie inside the backing buffer (checked by assert only).
+     * NOTE(review): dereferences `mem`, so it must not be called on a
+     * default-constructed data_type. */
+    auto sub_data(ptrdiff_t offset, size_t _size = -1) const {
+        // determine ret_size
+        if (_size == -1) {
+            if (offset < 0) {
+                _size = -offset;
+            } else {
+                _size = size - offset;
+            }
+        }
+
+        auto ret_ptr = ptr + offset;
+        assert(ret_ptr >= mem->start_ptr());
+        assert(ret_ptr + _size <= mem->end_ptr());
+        return data_type(mem, ret_ptr - mem->ptr, _size);
+    }
+
+    // Copy _size bytes from `data` into this view at `offset`.
+    // The destination range must fit in the view (checked by assert only).
+    void replace(size_t offset, size_t _size, uint8_t *data) {
+        assert(offset + _size <= size);
+        std::copy_n(data, _size, start_ptr() + offset);
+    }
+
+    // Copy the whole content of `data` into this view at `offset`.
+    void replace(size_t offset, const data_type &data) {
+        replace(offset, data.size, data.start_ptr());
+    }
+
+    // View with the same start that is _size bytes longer; the extra bytes
+    // must exist in the backing buffer (asserted inside sub_data).
+    auto extend(size_t _size) const {
+        return sub_data(0, size + _size);
+    }
+
+    // First byte of the view.
+    uint8_t *start_ptr() const {
+        return ptr;
+    }
+
+    // One past the last byte of the view.
+    uint8_t *end_ptr() const {
+        return ptr + size;
+    }
+
+private:
+    std::shared_ptr<data_mem_type> mem; // shared owner of the backing buffer
+
+    // View _size bytes of _mem starting `offset` bytes into the buffer.
+    data_type(const std::shared_ptr<data_mem_type> &_mem,
+              size_t offset, size_t _size) {
+        mem = _mem;
+        ptr = mem->ptr + offset;
+        size = _size;
+        assert(offset + size <= mem->size);
+    }
+};
+
+// Convert `val` in place between native byte order and net_order.
+// Compiles to nothing when the two orders are the same.
+template<boost::endian::order net_order, typename T>
+std::enable_if_t<std::is_arithmetic_v<T>>
+inline swap_net_loc_endian(T &val) {
+    if constexpr (net_order != boost::endian::order::native) {
+        boost::endian::endian_reverse_inplace(val);
+    }
+}
+
+// Cursor state shared by versatile_reader and versatile_writer: holds a
+// data_type view and the current position inside it.
+class versatile_io : public boost::noncopyable {
+public:
+
+    // Take the view by value; the cursor starts at its first byte.
+    explicit versatile_io(data_type _data)
+            : data(std::move(_data)), cur_ptr(data.start_ptr()) {
+    }
+
+    // Move the cursor by `offset` bytes relative to the current position.
+    // The new position must stay inside the view (checked by assert only).
+    void manual_offset(ptrdiff_t offset) {
+        cur_ptr += offset;
+        assert(start_ptr() <= cur_ptr);
+        assert(end_ptr() >= cur_ptr);
+    }
+
+    // Distance in bytes from the start of the view to the cursor.
+    auto current_offset() const {
+        return cur_ptr - start_ptr();
+    }
+
+    // Distance in bytes from the cursor to the end of the view.
+    auto remaining_bytes() const {
+        return end_ptr() - cur_ptr;
+    }
+
+    // True once the cursor sits at the end of the view.
+    bool empty() const {
+        return end_ptr() == cur_ptr;
+    }
+
+protected:
+    data_type data;
+    uint8_t *cur_ptr = nullptr;
+
+    // First byte of the wrapped view.
+    uint8_t *start_ptr() const {
+        return data.start_ptr();
+    }
+
+    // One past the last byte of the wrapped view.
+    uint8_t *end_ptr() const {
+        return data.end_ptr();
+    }
+};
+
+// Reads data in several successive steps: every call consumes bytes at the
+// cursor and advances it. Arithmetic values are converted from net_order to
+// native byte order. Bounds are checked by assert only.
+template<boost::endian::order net_order>
+class versatile_reader : public versatile_io {
+public:
+
+    using versatile_io::versatile_io;
+
+    // Read one arithmetic value of type T (sizeof(T) bytes) and return it.
+    template<typename T>
+    std::enable_if_t<std::is_arithmetic_v<T>, T>
+    read_value() {
+        T result;
+        auto *raw = (uint8_t *) &result;
+        std::copy_n(cur_ptr, sizeof(T), raw);
+        swap_net_loc_endian<net_order>(result);
+        cur_ptr += sizeof(T);
+        assert(cur_ptr <= end_ptr());
+        return result;
+    }
+
+    // Read one arithmetic value into `val`.
+    template<typename T>
+    std::enable_if_t<std::is_arithmetic_v<T>>
+    read_value(T &val) {
+        val = read_value<T>();
+    }
+
+    // Read Length consecutive values into `arr`, element by element.
+    template<typename T, size_t Length>
+    void read_value(std::array<T, Length> &arr) {
+        for (auto &item: arr) {
+            read_value(item);
+        }
+    }
+
+    // Return a view of the next `size` bytes (no copy) and skip over them.
+    data_type read_data(size_t size) {
+        auto view = data.sub_data(current_offset(), size);
+        cur_ptr += size;
+        assert(cur_ptr <= end_ptr());
+        return view;
+    }
+
+    // Copy the next out.size bytes into the memory `out` refers to.
+    void read_data(const data_type &out) {
+        std::copy_n(cur_ptr, out.size, out.start_ptr());
+        cur_ptr += out.size;
+        assert(cur_ptr <= end_ptr());
+    }
+
+    // Return a view of everything from the cursor to the end (no copy) and
+    // move the cursor to the end.
+    data_type read_remain() {
+        auto view = data.sub_data(current_offset(), remaining_bytes());
+        cur_ptr = end_ptr();
+        return view;
+    }
+
+    // Stream-style shorthand for read_value(val).
+    template<typename T>
+    auto &operator>>(T &val) {
+        read_value(val);
+        return *this;
+    }
+};
+
+// Writes data in several successive steps: every call serializes at the cursor
+// and advances it. Arithmetic values are converted from native byte order to
+// net_order. Bounds are checked by assert only, before memory is touched.
+template<boost::endian::order net_order>
+class versatile_writer : public versatile_io {
+public:
+
+    using versatile_io::versatile_io;
+
+    // Write one arithmetic value (sizeof(T) bytes) in net_order.
+    template<typename T>
+    std::enable_if_t<std::is_arithmetic_v<T>>
+    write_value(T val) {
+        assert(cur_ptr + sizeof(T) <= end_ptr());
+        swap_net_loc_endian<net_order>(val);
+        std::copy_n((uint8_t *) &val, sizeof(T), cur_ptr);
+        cur_ptr += sizeof(T);
+    }
+
+    // Write every element of `arr` in order.
+    template<typename T, size_t Length>
+    void write_value(const std::array<T, Length> &arr) {
+        for (auto val: arr) {
+            write_value(val);
+        }
+    }
+
+    // Copy the bytes of `_data` to the cursor position.
+    void write_data(const data_type &_data) {
+        assert(cur_ptr + _data.size <= end_ptr());
+        // Source is _data, destination is the cursor. The previous argument
+        // order copied from the cursor into _data, clobbering the caller's buffer.
+        std::copy_n(_data.start_ptr(), _data.size, cur_ptr);
+        cur_ptr += _data.size;
+    }
+
+    // Overload so that operator<< also accepts a data_type.
+    void write_value(const data_type &_data) {
+        write_data(_data);
+    }
+
+    // Stream-style shorthand for write_value(val).
+    template<typename T>
+    auto &operator<<(const T &val) {
+        write_value(val);
+        return *this;
+    }
+};
+
+static constexpr auto network_order = boost::endian::order::big;
+using network_writer = versatile_writer<network_order>;
+using network_reader = versatile_reader<network_order>;
+
+#endif //DEPTHGUIDE_BINARY_UTILITY_HPP

+ 28 - 0
src/network/impl/crc_checker.cpp

@@ -0,0 +1,28 @@
+#include "crc_checker.h"
+#include "network/binary_utility.hpp"
+
+/* Prepend the CRC of `data` in the bytes reserved directly in front of it and
+ * return a view covering [crc][data], the layout decode_data() parses.
+ * `data` must have at least DigestSize() reserved bytes before its start
+ * (sub_data asserts this). */
+data_type checksum_channel::encode_data(const data_type &data) {
+    crc_val_type crc_val;
+    crc.Restart();
+    crc.CalculateDigest(crc_val.data(), data.start_ptr(), data.size);
+
+    // Negate as a signed value: DigestSize() is unsigned, so -DigestSize()
+    // would wrap to a large positive offset instead of stepping backwards.
+    const auto crc_len = static_cast<ptrdiff_t>(crc.DigestSize());
+    auto crc_slot = data.sub_data(-crc_len);
+    auto writer = network_writer(crc_slot);
+    writer << crc_val;
+    assert(writer.remaining_bytes() == 0);
+
+    // crc_slot only spans the checksum; grow it so the payload is included.
+    return crc_slot.extend(data.size);
+}
+
+/* Split `data` into its leading CRC and the payload behind it, then check the
+ * payload against the CRC.
+ * Returns the payload view on success and an empty data_type on mismatch. */
+data_type checksum_channel::decode_data(const data_type &data) {
+    network_reader reader(data);
+    crc_val_type expected;
+    reader >> expected;
+    auto payload = reader.read_remain();
+
+    crc.Restart();
+    const bool matches = crc.VerifyDigest(
+            expected.data(), payload.start_ptr(), payload.size);
+    if (!matches) [[unlikely]] {
+        return {};
+    }
+    return payload;
+}

+ 32 - 0
src/network/impl/crc_checker.h

@@ -0,0 +1,32 @@
+#ifndef DEPTHGUIDE_CRC_CHECKER_H
+#define DEPTHGUIDE_CRC_CHECKER_H
+
+#include "impl_utility.hpp"
+
+#include <cryptopp/crc.h>
+
+/* Filter channel that guards every packet with a CRC32 checksum.
+ * Packet layout:
+ * [4 bytes] crc32
+ * [n bytes] data */
+class checksum_channel : public filter_channel {
+public:
+
+    using filter_channel::filter_channel;
+
+protected:
+
+    // Compute the checksum of `data` and place it in front of the payload.
+    data_type encode_data(const data_type &data) override;
+
+    // Verify the leading checksum; returns the payload, or an empty data_type
+    // when the checksum does not match.
+    data_type decode_data(const data_type &data) override;
+
+    // Number of bytes this channel adds in front of the payload.
+    size_t extra_header_size() const override {
+        return crc_size;
+    }
+
+private:
+    using crc_type = CryptoPP::CRC32;
+    static constexpr auto crc_size = crc_type::DIGESTSIZE;
+    using crc_val_type = std::array<uint8_t, crc_size>; // raw digest bytes
+    crc_type crc; // reused for every packet; Restart() is called before each use
+};
+
+#endif //DEPTHGUIDE_CRC_CHECKER_H

+ 35 - 0
src/network/impl/fragment/frag_base.hpp

@@ -0,0 +1,35 @@
+#ifndef DEPTHGUIDE_FRAG_BASE_HPP
+#define DEPTHGUIDE_FRAG_BASE_HPP
+
+#include "../impl_utility.hpp"
+
+#include <boost/signals2.hpp>
+
+// Common base of the fragmentation channels: stores the lower channel and the
+// queue limit, and routes everything the lower channel receives to
+// append_data(), which subclasses implement.
+class frag_base : public data_channel {
+public:
+    struct create_config {
+        data_channel_type low_channel;
+        size_t queue_size = 1; // max incomplete messages in queue, >= 1
+    };
+
+    // Takes over conf.low_channel and registers this object as its receiver.
+    // The callback captures `this`, so the object must stay at a fixed address
+    // for as long as the lower channel may invoke it.
+    explicit frag_base(create_config conf)
+            : low(std::move(conf.low_channel)), q_size(conf.queue_size) {
+        low->set_recv_func([this](auto data) { append_data(data); });
+    }
+
+    size_t header_size() const override { return 0; }
+
+    // Emitted when an incomplete message is dropped.
+    using signal_type = boost::signals2::signal<void()>;
+    signal_type sig_msg_loss;
+
+protected:
+
+    data_channel_type low; // channel the fragments are sent through
+    size_t q_size = 0;     // copy of create_config::queue_size
+
+    // Called with every packet received from the lower channel.
+    virtual void append_data(const data_type &data) = 0;
+};
+
+#endif //DEPTHGUIDE_FRAG_BASE_HPP

+ 114 - 0
src/network/impl/fragment/frag_basic.cpp

@@ -0,0 +1,114 @@
+#include "frag_basic.h"
+#include "core/utility.hpp"
+
+/* Split `data` into blocks that fit the lower channel and send each block with
+ * a fragment header in front of it.
+ * Returns false as soon as the lower channel rejects a block (or when the
+ * message cannot be fragmented at all), true otherwise. An empty message sends
+ * nothing and returns true. */
+bool frag_basic::send(const data_type &data) {
+    auto header_extra_size = frag_header_type::extra_size;
+    // The lower channel must leave room for at least one payload byte;
+    // otherwise the block size below would be zero or underflow.
+    if (low->max_length() <= header_extra_size) { RET_ERROR_B; }
+    // Sizes and offsets travel as 32-bit header fields.
+    if (data.size > max_length()) { RET_ERROR_B; }
+
+    auto frag_header_size = low->header_size() + header_extra_size;
+    auto frag_max_size = low->max_length() - header_extra_size;
+
+    frag_header_type header;
+    header.msg_id = ++last_send_id;
+    header.msg_size = data.size;
+    header.bl_num = data.size / frag_max_size + ((data.size % frag_max_size) != 0);
+    header.bl_id = 0;
+
+    // size_t offset: a 32-bit counter could wrap on the final increment.
+    for (size_t offset = 0; offset < data.size; offset += frag_max_size) {
+        auto frag_size = std::min(frag_max_size, data.size - offset);
+        // Copy the block into its own buffer with room reserved for the headers.
+        auto frag_data = data.sub_data(offset, frag_size).clone(frag_header_size);
+        header.bl_offset = offset;
+        // Send the block itself; passing `data` here sent the whole message
+        // with every header.
+        bool ok = low->send(encode_header(frag_data, header));
+        if (!ok) return false;
+        ++header.bl_id;
+    }
+    return true;
+}
+
+/* Handle one block received from the lower channel.
+ * The queue holds partially received messages in ascending msg_id order. The
+ * block is merged into its message (created on demand); messages that fall out
+ * of the q_size window are dropped with sig_msg_loss, and completed messages at
+ * the front of the queue are passed to recv_cb_func in order. */
+void frag_basic::append_data(const data_type &data) {
+    frag_header_type header;
+    auto dec_data = decode_header(data, &header);
+    if (dec_data.empty()) return;
+
+    // Id distances are signed 32-bit values. Compare them against a signed
+    // window: comparing with the unsigned q_size converted every negative
+    // distance into a huge value and rejected all newer messages.
+    const auto window = static_cast<int64_t>(q_size);
+
+    // ignore if message is too old
+    auto msg_id = header.msg_id;
+    if (msg_id <= last_recv_id) return;
+    if (!msg_q.empty()) {
+        auto last_id = msg_q.back().msg_id;
+        if (static_cast<int32_t>(last_id - msg_id) >= window) return;
+    }
+
+    // remove old incomplete messages
+    while (!msg_q.empty()) {
+        auto fnt = msg_q.begin(); // front
+        // stop at the first message that is still inside the window
+        if (static_cast<int32_t>(msg_id - fnt->msg_id) < window) break;
+        if (fnt->is_complete()) {
+            recv_cb_func(fnt->data);
+            assert(fnt->msg_id > last_recv_id);
+            last_recv_id = fnt->msg_id;
+        } else {
+            sig_msg_loss(); // emit message loss signal
+        }
+        msg_q.pop_front();
+    }
+
+    // determine message location: first entry whose id is not below msg_id
+    auto iter = msg_q.begin();
+    while (iter != msg_q.end() && iter->msg_id < msg_id) {
+        ++iter;
+    }
+    if (iter == msg_q.end() || iter->msg_id != msg_id) {
+        // not queued yet: insert in front of the first larger id (or at the end)
+        iter = msg_q.emplace(iter, header);
+    }
+
+    // check consistency
+    if (!iter->verify(header)) { RET_ERROR_N; }
+
+    // reject block ids outside the message before indexing the bitset
+    auto bl_id = header.bl_id;
+    if (bl_id >= iter->bl_ok.size()) { RET_ERROR_N; }
+
+    // ignore duplicated block
+    if (iter->bl_ok.test(bl_id)) return;
+
+    // copy block data
+    iter->merge(header, dec_data);
+
+    // callback if message completes
+    while (!msg_q.empty()) {
+        auto fnt = msg_q.begin();
+        if (!fnt->is_complete()) break;
+        recv_cb_func(fnt->data);
+        assert(fnt->msg_id > last_recv_id);
+        last_recv_id = fnt->msg_id;
+        msg_q.pop_front();
+    }
+}
+
+// Check that `header` describes the same message layout (block count and total
+// size) this entry was created with. The ids must already match.
+bool frag_basic::msg_info::verify(frag_header_type header) const {
+    assert(msg_id == header.msg_id);
+    const bool same_layout = bl_ok.size() == header.bl_num
+                             && data.size == header.msg_size;
+    if (!same_layout) { RET_ERROR_B; }
+    return true;
+}
+
+// True when every block of the message has been merged.
+bool frag_basic::msg_info::is_complete() const {
+    if (!bl_ok.all()) return false;
+    assert(bytes_ok == data.size); // all blocks present implies all bytes present
+    return true;
+}
+
+// Copy one block into the message buffer at the offset named by its header and
+// mark the block as received. The caller filters out duplicate blocks.
+void frag_basic::msg_info::merge(frag_header_type header, const data_type &sub_data) {
+    // replace() asserts that the block fits inside the message buffer
+    data.replace(header.bl_offset, sub_data);
+    bl_ok.set(header.bl_id);
+    bytes_ok += sub_data.size;
+}

+ 79 - 0
src/network/impl/fragment/frag_basic.h

@@ -0,0 +1,79 @@
+#ifndef DEPTHGUIDE_FRAG_BASIC_H
+#define DEPTHGUIDE_FRAG_BASIC_H
+
+#include "frag_base.hpp"
+
+#include <boost/dynamic_bitset.hpp>
+
+#include <list>
+
+// Plain fragmentation channel: send() cuts a message into blocks that fit the
+// lower channel, append_data() reassembles received blocks into messages.
+class frag_basic : public frag_base {
+public:
+
+    using frag_base::frag_base;
+
+    // Fragment `data` and send every block through the lower channel.
+    bool send(const data_type &data) override;
+
+    // Message sizes are carried in a 32-bit header field.
+    size_t max_length() const override { return UINT32_MAX; }
+
+private:
+
+    // Header placed in front of every block; fields are serialized in
+    // declaration order.
+    struct frag_header_type {
+        uint32_t msg_id = 0;    // id of the message this block belongs to
+        uint32_t msg_size = 0;  // total message size in bytes
+        uint32_t bl_num = 0;    // number of blocks in the message
+        uint32_t bl_id = 0;     // index of this block
+        uint32_t bl_offset = 0; // byte offset of this block inside the message
+
+        using header_size_type = uint8_t;
+        // serialized size of the five fields above
+        static constexpr header_size_type header_size = 5 * sizeof(uint32_t);
+        // header_size plus one header_size_type value
+        // NOTE(review): presumably the length prefix written by encode_header;
+        // confirm in impl_utility.hpp
+        static constexpr size_t extra_size = header_size + sizeof(header_size);
+
+        // Read all fields from `reader`.
+        template<typename ReaderType>
+        void fill_from(ReaderType &reader) {
+            reader >> msg_id >> msg_size >> bl_num >> bl_id >> bl_offset;
+        }
+
+        // Write all fields to `writer`.
+        template<typename WriterType>
+        void write_to(WriterType &writer) {
+            writer << msg_id << msg_size << bl_num << bl_id << bl_offset;
+        }
+    };
+
+    // for sending message
+
+    uint32_t last_send_id = 0; // incremented before each send(), so ids start at 1
+
+    // for receiving message
+
+    using bitset_type = boost::dynamic_bitset<>;
+
+    // Reassembly state of one partially received message.
+    struct msg_info : boost::noncopyable {
+        uint32_t msg_id = 0;
+        bitset_type bl_ok;   // one bit per block, set once the block is merged
+        data_type data;      // buffer for the whole message
+        size_t bytes_ok = 0; // number of payload bytes merged so far
+
+        // Size the bitset and the buffer from the first header seen.
+        explicit msg_info(frag_header_type header)
+                : bl_ok(header.bl_num), data(header.msg_size) {
+            msg_id = header.msg_id;
+        }
+
+        // True when `header` agrees with this message's block count and size.
+        bool verify(frag_header_type header) const;
+
+        // True when all blocks have been merged.
+        bool is_complete() const;
+
+        // Copy one block into `data` and mark it received.
+        void merge(frag_header_type header, const data_type &data);
+    };
+
+    using queue_type = std::list<msg_info>;
+
+    uint32_t last_recv_id = 0; // id of last returned message
+    queue_type msg_q; // message queue
+
+    // Called with every packet received from the lower channel.
+    void append_data(const data_type &data) override;
+
+};
+
+
+#endif //DEPTHGUIDE_FRAG_BASIC_H

+ 52 - 0
src/network/impl/fragment/frag_rs.cpp

@@ -0,0 +1,52 @@
+#include "frag_rs.h"
+
+/* Allocate one contiguous buffer for (data_bls + _fec_bls) blocks of _bl_size
+ * bytes each, create the Reed-Solomon codec and record the start address of
+ * every block in bl_ptrs. */
+frag_rs::rs_helper::rs_helper(uint8_t data_bls, uint8_t _fec_bls, uint16_t _bl_size)
+        : data((data_bls + _fec_bls) * _bl_size) {
+    // Check the parameter: the member fec_bls is still 0 at this point, so the
+    // old check never caught an oversized block count.
+    assert(data_bls + _fec_bls <= max_bl_num);
+    bl_size = _bl_size;
+    fec_bls = _fec_bls;
+    rs = reed_solomon_new(data_bls, fec_bls); // TODO: create an per-object rs pool
+    assert(rs != nullptr);
+
+    auto all_bls = data_bls + fec_bls;
+    bl_ptrs.resize(all_bls);
+    for (auto k = 0; k < all_bls; ++k) {
+        bl_ptrs[k] = data.start_ptr() + bl_size * k;
+    }
+}
+
+// Check that `h` describes the same chunk layout (offset, block counts, block
+// size) this entry was created with. The chunk ids must already match.
+bool frag_rs::chunk_info::verify(frag_header_type h) const {
+    assert(ck_id == h.ck_id);
+    const bool consistent = ck_offset == h.ck_offset
+                            && rs.bl_num() == h.bl_num
+                            && rs.fec_bls == h.bl_fec
+                            && rs.bl_size == h.bl_size;
+    if (!consistent) { RET_ERROR_B; }
+    return true;
+}
+
+// True once at least as many blocks have arrived as the chunk has data blocks
+// (total blocks minus FEC blocks).
+bool frag_rs::chunk_info::is_complete() const {
+    const auto data_bls = rs.bl_num() - rs.fec_bls;
+    return bl_ok_cnt >= data_bls;
+}
+
+// Reconstruct the missing blocks and return a view of the data blocks only
+// (the FEC blocks behind them are left out). Requires is_complete().
+data_type frag_rs::chunk_info::decode() {
+    assert(is_complete());
+    auto ret = rs.decode(bl_miss.data());
+    assert(ret == true);
+    const auto data_bls = rs.bl_num() - rs.fec_bls;
+    return rs.data.sub_data(0, data_bls * rs.bl_size);
+}
+
+/* Create the reassembly state for the chunk described by `h`: remember its id
+ * and offset, size the codec from the block counts and mark every block as
+ * missing. */
+frag_rs::chunk_info::chunk_info(frag_header_type h)
+        : ck_id(h.ck_id), ck_offset(h.ck_offset),
+          rs(h.bl_num - h.bl_fec, h.bl_fec, h.bl_size) {
+    // ck_id / ck_offset used to stay 0, so verify() failed for every chunk
+    // with a non-zero id or offset.
+    assert(bl_miss.empty());
+    bl_miss.resize(h.bl_num, true);
+}
+
+// Store one received block in its slot of the codec buffer and count it.
+// A block that is already present is ignored.
+void frag_rs::chunk_info::merge(frag_header_type h, const data_type &data) {
+    const auto idx = h.bl_id;
+    if (bl_miss[idx]) {
+        auto *slot = rs.bl_ptrs[idx];
+        assert(rs.bl_size == data.size);
+        assert(slot + data.size <= rs.data.end_ptr());
+        std::copy_n(data.start_ptr(), data.size, slot);
+        bl_miss[idx] = false;
+        ++bl_ok_cnt;
+    }
+}

+ 167 - 0
src/network/impl/fragment/frag_rs.h

@@ -0,0 +1,167 @@
+#ifndef DEPTHGUIDE_FRAG_RS_H
+#define DEPTHGUIDE_FRAG_RS_H
+
+#include "frag_base.hpp"
+
+extern "C" {
+#include "third_party/rs.h"
+}
+
+#include <boost/container/static_vector.hpp>
+#include <boost/dynamic_bitset.hpp>
+
+#include <list>
+
+class frag_rs : public frag_base {
+public:
+
+    struct create_config {
+        // for parent
+        data_channel_type low_channel;
+        size_t queue_size = 1;
+        // for this
+        float parity_rate = 0.2f;
+    };
+
+    explicit frag_rs(create_config conf)
+            : frag_base(frag_base::create_config{
+            .low_channel=std::move(conf.low_channel),
+            .queue_size = conf.queue_size}) {
+        parity_rate = conf.parity_rate;
+    }
+
+    bool send(const data_type &data) override;
+
+    size_t max_length() const override;
+
+private:
+
+    // Header placed in front of every block; fields are serialized in
+    // declaration order by fill_from() / write_to().
+    struct frag_header_type {
+        uint32_t msg_id = 0;
+        uint32_t msg_size = 0;
+
+        uint8_t ck_num = 0; // number of chunks
+        uint8_t ck_id = 0;
+        uint32_t ck_offset = 0;
+
+        uint8_t bl_num = 0; // number of blocks
+        uint8_t bl_fec = 0; // number of fec blocks
+        uint8_t bl_id = 0;
+        uint16_t bl_size = 0;
+
+        using header_size_type = uint8_t;
+        // Serialized size of the fields above: three uint32_t (msg_id,
+        // msg_size, ck_offset), one uint16_t (bl_size) and five uint8_t.
+        // The previous value counted a fourth uint32_t that does not exist.
+        static constexpr header_size_type header_size = 3 * sizeof(uint32_t) +
+                                                        1 * sizeof(uint16_t) +
+                                                        5 * sizeof(uint8_t);
+        // header_size plus one header_size_type value
+        static constexpr size_t extra_size = header_size + sizeof(header_size);
+
+        // Read all fields from `reader`.
+        template<typename ReaderType>
+        void fill_from(ReaderType &reader) {
+            reader >> msg_id >> msg_size;
+            reader >> ck_num >> ck_id >> ck_offset;
+            reader >> bl_num >> bl_fec >> bl_id >> bl_size;
+        }
+
+        // Write all fields to `writer`.
+        template<typename WriterType>
+        void write_to(WriterType &writer) {
+            writer << msg_id << msg_size;
+            writer << ck_num << ck_id << ck_offset;
+            writer << bl_num << bl_fec << bl_id << bl_size;
+        }
+    };
+
+    // for sending message
+
+    uint32_t last_send_id = 0;
+    float parity_rate = 0.2;
+
+    // for receiving message
+
+    // Wrapper around one reed_solomon codec instance and the block buffer it
+    // works on. The codec is released in the destructor.
+    class rs_helper : private boost::noncopyable {
+    public:
+        static constexpr auto max_bl_num = DATA_SHARDS_MAX;
+        data_type data; // contiguous storage for all blocks (data + fec)
+
+        using bl_ptrs_types = boost::container::static_vector<uint8_t *, max_bl_num>;
+        bl_ptrs_types bl_ptrs; // start address of every block inside `data`
+
+        // for verify
+        uint8_t fec_bls = 0;
+        uint16_t bl_size = 0;
+
+        // data blocks, fec blocks
+        rs_helper(uint8_t data_bls, uint8_t fec_bls, uint16_t bl_size);
+
+        ~rs_helper() { reed_solomon_release(rs); }
+
+        // Total number of blocks (data + fec).
+        uint8_t bl_num() const { return bl_ptrs.size(); }
+
+        // Run the encoder over all blocks.
+        void encode() {
+            reed_solomon_encode2(rs, bl_ptrs.data(), bl_ptrs.size(), bl_size);
+        }
+
+        // Reconstruct blocks; bl_miss holds one flag per block (true = missing)
+        // and is passed to the C API as a byte array.
+        // Returns true when reed_solomon_reconstruct reports 0.
+        bool decode(bool *bl_miss) {
+            auto ret = reed_solomon_reconstruct(
+                    rs, bl_ptrs.data(), (uint8_t *) bl_miss, bl_ptrs.size(), bl_size);
+            return ret == 0;
+        }
+
+    private:
+        reed_solomon *rs = nullptr; // codec handle from reed_solomon_new
+    };
+
+    using bitset_type = boost::dynamic_bitset<>;
+
+    // Reassembly state of one chunk: the codec buffer plus bookkeeping of
+    // which blocks have arrived.
+    struct chunk_info : private boost::noncopyable {
+        static constexpr auto max_bl_num = rs_helper::max_bl_num;
+
+        uint8_t ck_id = 0;
+        uint32_t ck_offset = 0;
+        rs_helper rs;
+
+        using bl_miss_type = boost::container::static_vector<bool, max_bl_num>;
+        bl_miss_type bl_miss; // one flag per block, true while the block is missing
+        uint8_t bl_ok_cnt = 0; // number of received blocks
+
+        // Size the codec and the missing-flags from the header.
+        explicit chunk_info(frag_header_type header);
+
+        // True when `header` agrees with this chunk's offset and block layout.
+        bool verify(frag_header_type header) const;
+
+        // True once enough blocks have arrived to decode the chunk.
+        bool is_complete() const;
+
+        // Reconstruct missing blocks and return a view of the data blocks.
+        data_type decode();
+
+        // Store one received block; duplicates are ignored.
+        void merge(frag_header_type header, const data_type &data);
+    };
+
+    // Reassembly state of one partially received message, made of chunks.
+    struct msg_info : private boost::noncopyable {
+        uint32_t msg_id = 0;
+        data_type data; // TODO: handle extra bytes for block padding
+        size_t bytes_ok = 0;
+        bitset_type ck_ok; // one bit per chunk
+
+        using ck_list_type = std::list<chunk_info>;
+        ck_list_type ck_list;
+
+        // Size the buffer and the per-chunk bitset from the first header seen.
+        // The initializers name the members this struct declares, in
+        // declaration order; the previous list referred to `bl_ok`, which does
+        // not exist here.
+        explicit msg_info(frag_header_type header)
+                : data(header.msg_size), ck_ok(header.ck_num) {
+            msg_id = header.msg_id;
+        }
+
+        // True when `header` agrees with this message's layout.
+        bool verify(frag_header_type header) const;
+
+        // True when the whole message has been received.
+        bool is_complete() const;
+
+        // Merge one received block into the message.
+        void merge(frag_header_type header, const data_type &data);
+    };
+
+    using queue_type = std::list<msg_info>;
+
+    uint32_t last_recv_id = 0; // id of last returned message
+    queue_type msg_q; // message queue
+
+    void append_data(const data_type &data) override;
+};
+
+
+#endif //DEPTHGUIDE_FRAG_RS_H

+ 998 - 0
src/network/impl/fragment/third_party/rs.c

@@ -0,0 +1,998 @@
+/*#define PROFILE*/
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980624
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ * (C) 2001 Alain Knaff (alain@knaff.lu)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * Reimplement by Jannson (20161018): compatible for golang version of https://github.com/klauspost/reedsolomon
+ */
+
+/*
+ * The following parameter defines how many bits are used for
+ * field elements. The code supports any value from 2 to 16
+ * but fastest operation is achieved with 8 bit elements
+ * This is the only parameter you may want to change.
+ */
+#define GF_BITS  8  /* code over GF(2**GF_BITS) - change to suit */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <assert.h>
+#include "rs.h"
+
+/*
+ * stuff used for testing purposes only
+ */
+
+#ifdef  TEST
+#define DEB(x)
+#define DDB(x) x
+#define DEBUG   0   /* minimal debugging */
+
+#include <sys/time.h>
+#define DIFF_T(a,b) \
+    (1+ 1000000*(a.tv_sec - b.tv_sec) + (a.tv_usec - b.tv_usec) )
+
+#define TICK(t) \
+    {struct timeval x ; \
+    gettimeofday(&x, NULL) ; \
+    t = x.tv_usec + 1000000* (x.tv_sec & 0xff ) ; \
+    }
+#define TOCK(t) \
+    { u_long t1 ; TICK(t1) ; \
+      if (t1 < t) t = 256000000 + t1 - t ; \
+      else t = t1 - t ; \
+      if (t == 0) t = 1 ;}
+
+u_long ticks[10];   /* vars for timekeeping */
+#else
+#define DEB(x)
+#define DDB(x)
+#define TICK(x)
+#define TOCK(x)
+#endif /* TEST */
+
+/*
+ * You should not need to change anything beyond this point.
+ * The first part of the file implements linear algebra in GF.
+ *
+ * gf is the type used to store an element of the Galois Field.
+ * Must constain at least GF_BITS bits.
+ *
+ * Note: unsigned char will work up to GF(256) but int seems to run
+ * faster on the Pentium. We use int whenever have to deal with an
+ * index, since they are generally faster.
+ */
+/*
+ * AK: Udpcast only uses GF_BITS=8. Remove other possibilities
+ */
+#if (GF_BITS != 8)
+#error "GF_BITS must be 8"
+#endif
+typedef unsigned char gf;
+
+#define GF_SIZE ((1 << GF_BITS) - 1)    /* powers of \alpha */
+
+/*
+ * Primitive polynomials - see Lin & Costello, Appendix A,
+ * and  Lee & Messerschmitt, p. 453.
+ */
+/*
+ * Primitive polynomials as '0'/'1' strings, indexed by GF_BITS.
+ * Character i of a string is the coefficient of x^i (lowest power first).
+ * generate_gf() reads allPp[GF_BITS].
+ */
+static char *allPp[] = {    /* GF_BITS  polynomial      */
+        NULL,           /*  0   no code         */
+        NULL,           /*  1   no code         */
+        "111",          /*  2   1+x+x^2         */
+        "1101",         /*  3   1+x+x^3         */
+        "11001",            /*  4   1+x+x^4         */
+        "101001",           /*  5   1+x^2+x^5       */
+        "1100001",          /*  6   1+x+x^6         */
+        "10010001",         /*  7   1 + x^3 + x^7       */
+        "101110001",        /*  8   1+x^2+x^3+x^4+x^8   */
+        "1000100001",       /*  9   1+x^4+x^9       */
+        "10010000001",      /* 10   1+x^3+x^10      */
+        "101000000001",     /* 11   1+x^2+x^11      */
+        "1100101000001",        /* 12   1+x+x^4+x^6+x^12    */
+        "11011000000001",       /* 13   1+x+x^3+x^4+x^13    */
+        "110000100010001",      /* 14   1+x+x^6+x^10+x^14   */
+        "1100000000000001",     /* 15   1+x+x^15        */
+        "11010000000010001"     /* 16   1+x+x^3+x^12+x^16   */
+};
+
+
+/*
+ * To speed up computations, we have tables for logarithm, exponent
+ * and inverse of a number. If GF_BITS <= 8, we use a table for
+ * multiplication as well (it takes 64K, no big deal even on a PDA,
+ * especially because it can be pre-initialized an put into a ROM!),
+ * otherwhise we use a table of logarithms.
+ * In any case the macro gf_mul(x,y) takes care of multiplications.
+ */
+
+static gf gf_exp[2*GF_SIZE];    /* index->poly form conversion table    */
+static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table    */
+static gf inverse[GF_SIZE+1];   /* inverse of field elem.       */
+/* inverse[\alpha**i] = \alpha**(GF_SIZE-i), as filled in by generate_gf() */
+
+/*
+ * modnn(x) folds x down until it is below GF_SIZE (2**GF_BITS - 1)
+ * using only subtraction, shift and mask, i.e. without a divide.
+ * Values already below GF_SIZE are returned unchanged (narrowed to gf).
+ */
+static inline gf
+modnn(int x)
+{
+    for (;;) {
+        if (x < GF_SIZE)
+            break;
+        x -= GF_SIZE;
+        x = (x & GF_SIZE) + (x >> GF_BITS);
+    }
+    return x;
+}
+
+#define SWAP(a,b,t) {t tmp; tmp=a; a=b; b=tmp;}
+
+/*
+ * gf_mul(x,y) multiplies two numbers. If GF_BITS<=8, it is much
+ * faster to use a multiplication table.
+ *
+ * USE_GF_MULC, GF_MULC0(c) and GF_ADDMULC(x) can be used when multiplying
+ * many numbers by the same constant. In this case the first
+ * call sets the constant, and others perform the multiplications.
+ * A value related to the multiplication is held in a local variable
+ * declared with USE_GF_MULC . See usage in addmul1().
+ */
+
+#ifdef _MSC_VER
+__declspec(align(16))
+#else
+_Alignas(16)
+#endif
+static gf gf_mul_table[(GF_SIZE + 1) * (GF_SIZE + 1)];
+
+#define gf_mul(x, y) gf_mul_table[(x<<8)+y]
+
+#define USE_GF_MULC register gf * __gf_mulc_
+#define GF_MULC0(c) __gf_mulc_ = &gf_mul_table[(c)<<8]
+#define GF_ADDMULC(dst, x) dst ^= __gf_mulc_[x]
+#define GF_MULC(dst, x) dst = __gf_mulc_[x]
+
+/*
+ * Fill gf_mul_table so that gf_mul_table[(a<<8)+b] holds a*b in the field.
+ * Products are built from the log/exp tables, so generate_gf() must have
+ * run first. Row 0 and column 0 are then forced to zero because the log
+ * of zero is only a placeholder value.
+ */
+static void
+init_mul_table(void)
+{
+    int a, b;
+
+    for (a = 0; a <= GF_SIZE; a++) {
+        gf *row = &gf_mul_table[a << 8];
+        int log_a = gf_log[a];
+
+        for (b = 0; b <= GF_SIZE; b++)
+            row[b] = gf_exp[modnn(log_a + gf_log[b])];
+    }
+
+    for (b = 0; b <= GF_SIZE; b++) {
+        gf_mul_table[b << 8] = 0;
+        gf_mul_table[b] = 0;
+    }
+}
+
+/*
+ * Generate GF(2**m) from the irreducible polynomial p(X) in p[0]..p[m]
+ * Lookup tables:
+ *     index->polynomial form       gf_exp[] contains j= \alpha^i;
+ *     polynomial form -> index form    gf_log[ j = \alpha^i ] = i
+ * \alpha=x is the primitive element of GF(2^m)
+ *
+ * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple
+ * multiplication of two numbers can be resolved without calling modnn
+ */
+
+
+
+/*
+ * initialize the data structures used for computations in GF.
+ */
+/*
+ * Build the gf_exp[], gf_log[] and inverse[] lookup tables for
+ * GF(2**GF_BITS) from the primitive polynomial allPp[GF_BITS].
+ * Takes no arguments and returns nothing; it only writes those
+ * three file-level tables.
+ */
+static void
+generate_gf(void)
+{
+    int i;
+    gf mask;
+    char *Pp =  allPp[GF_BITS] ;
+
+    mask = 1;   /* x ** 0 = 1 */
+    gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */
+    /*
+     * first, generate the (polynomial representation of) powers of \alpha,
+     * which are stored in gf_exp[i] = \alpha ** i .
+     * At the same time build gf_log[gf_exp[i]] = i .
+     * The first GF_BITS powers are simply bits shifted to the left.
+     */
+    for (i = 0; i < GF_BITS; i++, mask <<= 1 ) {
+        gf_exp[i] = mask;
+        gf_log[gf_exp[i]] = i;
+        /*
+         * If Pp[i] == 1 then \alpha ** i occurs in poly-repr
+         * gf_exp[GF_BITS] = \alpha ** GF_BITS
+         */
+        if ( Pp[i] == '1' )
+            gf_exp[GF_BITS] ^= mask;
+    }
+    /*
+     * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so we can also
+     * record its logarithm.
+     */
+    gf_log[gf_exp[GF_BITS]] = GF_BITS;
+    /*
+     * Poly-repr of \alpha ** (i+1) is given by poly-repr of
+     * \alpha ** i shifted left one-bit and accounting for any
+     * \alpha ** GF_BITS term that may occur when poly-repr of
+     * \alpha ** i is shifted.
+     */
+    mask = 1 << (GF_BITS - 1 ) ;
+    for (i = GF_BITS + 1; i < GF_SIZE; i++) {
+        if (gf_exp[i - 1] >= mask)
+            gf_exp[i] = gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1);
+        else
+            gf_exp[i] = gf_exp[i - 1] << 1;
+        gf_log[gf_exp[i]] = i;
+    }
+    /*
+     * log(0) is not defined, so use a special value
+     */
+    gf_log[0] = GF_SIZE ;
+    /* set the extended gf_exp values for fast multiply */
+    for (i = 0 ; i < GF_SIZE ; i++)
+        gf_exp[i + GF_SIZE] = gf_exp[i] ;
+
+    /*
+     * again special cases. 0 has no inverse. This used to
+     * be initialized to GF_SIZE, but it should make no difference
+     * since no one is supposed to read from here.
+     */
+    inverse[0] = 0 ;
+    inverse[1] = 1;
+    /* inverse of \alpha ** e is \alpha ** (GF_SIZE - e) */
+    for (i=2; i<=GF_SIZE; i++)
+        inverse[i] = gf_exp[GF_SIZE-gf_log[i]];
+}
+
+/*
+ * Various linear algebra operations that i use often.
+ */
+
+/*
+ * addmul() computes dst[] = dst[] + c * src[]
+ * This is used often, so better optimize it! Currently the loop is
+ * unrolled 16 times, a good value for 486 and pentium-class machines.
+ * The case c=0 is also optimized, whereas c=1 is not. These
+ * calls are unfrequent in my typical apps so I did not bother.
+ *
+ * Note that gcc on
+ */
+#if 0
+#define addmul(dst, src, c, sz) \
+    if (c != 0) addmul1(dst, src, c, sz)
+#endif
+
+
+
+#define UNROLL 16 /* 1, 4, 8, 16 */
+/*
+ * slow_addmul1() computes dst1[i] ^= c * src1[i] for i in [0, sz),
+ * using the row of gf_mul_table selected by c.
+ * The main loop handles UNROLL elements per iteration; the trailing
+ * loop handles whatever is left over.
+ */
+static void
+slow_addmul1(gf *dst1, gf *src1, gf c, int sz)
+{
+    USE_GF_MULC ;
+    register gf *dst = dst1, *src = src1 ;
+    /* last position from which a full group of UNROLL elements still fits */
+    gf *lim = &dst[sz - UNROLL + 1] ;
+
+    GF_MULC0(c) ;
+
+#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */
+    for (; dst < lim ; dst += UNROLL, src += UNROLL ) {
+        GF_ADDMULC( dst[0] , src[0] );
+        GF_ADDMULC( dst[1] , src[1] );
+        GF_ADDMULC( dst[2] , src[2] );
+        GF_ADDMULC( dst[3] , src[3] );
+#if (UNROLL > 4)
+        GF_ADDMULC( dst[4] , src[4] );
+        GF_ADDMULC( dst[5] , src[5] );
+        GF_ADDMULC( dst[6] , src[6] );
+        GF_ADDMULC( dst[7] , src[7] );
+#endif
+#if (UNROLL > 8)
+        GF_ADDMULC( dst[8] , src[8] );
+        GF_ADDMULC( dst[9] , src[9] );
+        GF_ADDMULC( dst[10] , src[10] );
+        GF_ADDMULC( dst[11] , src[11] );
+        GF_ADDMULC( dst[12] , src[12] );
+        GF_ADDMULC( dst[13] , src[13] );
+        GF_ADDMULC( dst[14] , src[14] );
+        GF_ADDMULC( dst[15] , src[15] );
+#endif
+    }
+#endif
+    lim += UNROLL - 1 ;     /* lim now equals dst1 + sz */
+    for (; dst < lim; dst++, src++ )        /* final components */
+        GF_ADDMULC( *dst , *src );
+}
+
+# define addmul1 slow_addmul1
+
+/*
+ * addmul() computes dst[i] ^= c * src[i] for i in [0, sz).
+ * A zero coefficient adds nothing, so that case returns immediately.
+ */
+static void addmul(gf *dst, gf *src, gf c, int sz) {
+    if (c == 0)
+        return;
+    addmul1(dst, src, c, sz);
+}
+
+/*
+ * mul() computes dst[] = c * src[]
+ * This is used often, so better optimize it! Currently the loop is
+ * unrolled 16 times, a good value for 486 and pentium-class machines.
+ * The case c=0 is also optimized, whereas c=1 is not. These
+ * calls are unfrequent in my typical apps so I did not bother.
+ *
+ * Note that gcc on
+ */
+#if 0
+#define mul(dst, src, c, sz) \
+    do { if (c != 0) mul1(dst, src, c, sz); else memset(dst, 0, c); } while(0)
+#endif
+
+#define UNROLL 16 /* 1, 4, 8, 16 */
+/*
+ * slow_mul1() computes dst1[i] = c * src1[i] for i in [0, sz),
+ * using the row of gf_mul_table selected by c.
+ * The main loop handles UNROLL elements per iteration; the trailing
+ * loop handles whatever is left over.
+ */
+static void
+slow_mul1(gf *dst1, gf *src1, gf c, int sz)
+{
+    USE_GF_MULC ;
+    register gf *dst = dst1, *src = src1 ;
+    /* last position from which a full group of UNROLL elements still fits */
+    gf *lim = &dst[sz - UNROLL + 1] ;
+
+    GF_MULC0(c) ;
+
+#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */
+    for (; dst < lim ; dst += UNROLL, src += UNROLL ) {
+        GF_MULC( dst[0] , src[0] );
+        GF_MULC( dst[1] , src[1] );
+        GF_MULC( dst[2] , src[2] );
+        GF_MULC( dst[3] , src[3] );
+#if (UNROLL > 4)
+        GF_MULC( dst[4] , src[4] );
+        GF_MULC( dst[5] , src[5] );
+        GF_MULC( dst[6] , src[6] );
+        GF_MULC( dst[7] , src[7] );
+#endif
+#if (UNROLL > 8)
+        GF_MULC( dst[8] , src[8] );
+        GF_MULC( dst[9] , src[9] );
+        GF_MULC( dst[10] , src[10] );
+        GF_MULC( dst[11] , src[11] );
+        GF_MULC( dst[12] , src[12] );
+        GF_MULC( dst[13] , src[13] );
+        GF_MULC( dst[14] , src[14] );
+        GF_MULC( dst[15] , src[15] );
+#endif
+    }
+#endif
+    lim += UNROLL - 1 ;     /* lim now equals dst1 + sz */
+    for (; dst < lim; dst++, src++ )        /* final components */
+        GF_MULC( *dst , *src );
+}
+
+# define mul1 slow_mul1
+
+/*
+ * mul() computes dst[i] = c * src[i] for i in [0, sz).
+ * Unlike addmul(), a zero coefficient must still overwrite dst, so the
+ * whole destination (sz bytes) is cleared in that case. Callers such as
+ * code_some_shards() rely on this because they accumulate into dst with
+ * addmul() afterwards.
+ */
+static inline void mul(gf *dst, gf *src, gf c, int sz) {
+    if (c != 0) {
+        mul1(dst, src, c, sz);
+    } else if (sz > 0) {
+        memset(dst, 0, (size_t) sz);
+    }
+}
+
+/*
+ * invert_mat() inverts, in place, the k x k matrix stored row-major in src.
+ * (Gauss-Jordan, adapted from Numerical Recipes in C)
+ *
+ * src: matrix to invert; overwritten with its inverse on success and left
+ *      in a partially reduced state on failure.
+ * k:   number of rows and columns.
+ * Returns 0 on success, non-zero if the matrix is singular or the scratch
+ * buffers cannot be allocated.
+ */
+DEB( int pivloops=0; int pivswaps=0 ; /* diagnostic */)
+static int
+invert_mat(gf *src, int k)
+{
+    gf c, *p ;
+    int irow, icol, row, col, i, ix ;
+
+    int error = 1 ;
+    int *indxc = malloc(k*sizeof(int));
+    int *indxr = malloc(k*sizeof(int));
+    int *ipiv = malloc(k*sizeof(int));
+    gf *id_row = malloc(k*sizeof(gf));
+
+    /*
+     * Every scratch buffer is dereferenced below, so bail out before
+     * touching them if any allocation failed. free(NULL) is harmless,
+     * which lets the common exit path release whatever was obtained.
+     */
+    if (k > 0 && (indxc == NULL || indxr == NULL || ipiv == NULL || id_row == NULL)) {
+        fprintf(stderr, "invert_mat: out of memory\n");
+        goto fail ;
+    }
+
+    memset(id_row, 0, k*sizeof(gf));
+    DEB( pivloops=0; pivswaps=0 ; /* diagnostic */ )
+    /*
+     * ipiv marks elements already used as pivots.
+     */
+    for (i = 0; i < k ; i++)
+        ipiv[i] = 0 ;
+
+    for (col = 0; col < k ; col++) {
+        gf *pivot_row ;
+        /*
+         * Zeroing column 'col', look for a non-zero element.
+         * First try on the diagonal, if it fails, look elsewhere.
+         */
+        irow = icol = -1 ;
+        if (ipiv[col] != 1 && src[col*k + col] != 0) {
+            irow = col ;
+            icol = col ;
+            goto found_piv ;
+        }
+        for (row = 0 ; row < k ; row++) {
+            if (ipiv[row] != 1) {
+                for (ix = 0 ; ix < k ; ix++) {
+                    DEB( pivloops++ ; )
+                    if (ipiv[ix] == 0) {
+                        if (src[row*k + ix] != 0) {
+                            irow = row ;
+                            icol = ix ;
+                            goto found_piv ;
+                        }
+                    } else if (ipiv[ix] > 1) {
+                        fprintf(stderr, "singular matrix\n");
+                        goto fail ;
+                    }
+                }
+            }
+        }
+        if (icol == -1) {
+            fprintf(stderr, "XXX pivot not found!\n");
+            goto fail ;
+        }
+        found_piv:
+        ++(ipiv[icol]) ;
+        /*
+         * swap rows irow and icol, so afterwards the diagonal
+         * element will be correct. Rarely done, not worth
+         * optimizing.
+         */
+        if (irow != icol) {
+            for (ix = 0 ; ix < k ; ix++ ) {
+                SWAP( src[irow*k + ix], src[icol*k + ix], gf) ;
+            }
+        }
+        indxr[col] = irow ;
+        indxc[col] = icol ;
+        pivot_row = &src[icol*k] ;
+        c = pivot_row[icol] ;
+        if (c == 0) {
+            fprintf(stderr, "singular matrix 2\n");
+            goto fail ;
+        }
+        if (c != 1 ) { /* otherwise this is a NOP */
+            /*
+             * this is done often , but optimizing is not so
+             * fruitful, at least in the obvious ways (unrolling)
+             */
+            DEB( pivswaps++ ; )
+            c = inverse[ c ] ;
+            pivot_row[icol] = 1 ;
+            for (ix = 0 ; ix < k ; ix++ )
+                pivot_row[ix] = gf_mul(c, pivot_row[ix] );
+        }
+        /*
+         * from all rows, remove multiples of the selected row
+         * to zero the relevant entry (the entry itself is not
+         * computed: we know it must become zero, so it is simply
+         * stored as 0 before the row update).
+         * (Here, if we know that the pivot_row is the identity,
+         * we can skip the addmul altogether).
+         */
+        id_row[icol] = 1;
+        if (memcmp(pivot_row, id_row, k*sizeof(gf)) != 0) {
+            for (p = src, ix = 0 ; ix < k ; ix++, p += k ) {
+                if (ix != icol) {
+                    c = p[icol] ;
+                    p[icol] = 0 ;
+                    addmul(p, pivot_row, c, k );
+                }
+            }
+        }
+        id_row[icol] = 0;
+    } /* done all columns */
+    /* undo the row swaps by swapping the matching columns, last swap first */
+    for (col = k-1 ; col >= 0 ; col-- ) {
+        if (indxr[col] <0 || indxr[col] >= k)
+            fprintf(stderr, "AARGH, indxr[col] %d\n", indxr[col]);
+        else if (indxc[col] <0 || indxc[col] >= k)
+            fprintf(stderr, "AARGH, indxc[col] %d\n", indxc[col]);
+        else
+        if (indxr[col] != indxc[col] ) {
+            for (row = 0 ; row < k ; row++ ) {
+                SWAP( src[row*k + indxr[col]], src[row*k + indxc[col]], gf) ;
+            }
+        }
+    }
+    error = 0 ;
+    fail:
+    free(indxc);
+    free(indxr);
+    free(ipiv);
+    free(id_row);
+    return error ;
+}
+
+static int fec_initialized = 0 ;
+
+/*
+ * Build the field tables (generate_gf) and then the multiplication table
+ * (init_mul_table), and record that this has happened in fec_initialized.
+ * TICK/TOCK/DDB expand to nothing unless TEST is defined, in which case
+ * they time the two steps and print the result to stderr.
+ */
+void fec_init(void)
+{
+    TICK(ticks[0]);
+    generate_gf();
+    TOCK(ticks[0]);
+    DDB(fprintf(stderr, "generate_gf took %ldus\n", ticks[0]);)
+    TICK(ticks[0]);
+    init_mul_table();     /* needs gf_exp/gf_log, so it must follow generate_gf() */
+    TOCK(ticks[0]);
+    DDB(fprintf(stderr, "init_mul_table took %ldus\n", ticks[0]);)
+    fec_initialized = 1 ;
+}
+
+
+#ifdef PROFILE
+/* Cycle/tick counter helper, only compiled when PROFILE is defined. */
+#ifdef __x86_64__
+/* Reads the counter with the rdtsc instruction and joins the EDX:EAX halves. */
+static long long rdtsc(void)
+{
+    unsigned long low, hi;
+    asm volatile ("rdtsc" : "=d" (hi), "=a" (low));
+    return ( (((long long)hi) << 32) | ((long long) low));
+}
+#elif __arm__
+/*
+ * NOTE(review): `u64` is not declared anywhere in the visible part of this
+ * file, and cntvct_el0 looks like an AArch64 register name while the guard
+ * tests __arm__ -- confirm this branch builds before enabling PROFILE on ARM.
+ */
+static long long rdtsc(void)
+{
+    u64 val;
+    asm volatile("mrs %0, cntvct_el0" : "=r" (val));
+    return val;
+}
+#endif
+
+/* Debug helper: print a row-major nrows x ncols matrix held in one flat buffer. */
+void print_matrix1(gf* matrix, int nrows, int ncols) {
+    int r, c;
+    gf* row = matrix;
+
+    printf("matrix (%d,%d):\n", nrows, ncols);
+    for(r = 0; r < nrows; r++, row += ncols) {
+        for(c = 0; c < ncols; c++) {
+            printf("%6d ", row[c]);
+        }
+        printf("\n");
+    }
+}
+
+/* Debug helper: print a matrix given as an array of nrows row pointers. */
+void print_matrix2(gf** matrix, int nrows, int ncols) {
+    int r = 0;
+
+    printf("matrix (%d,%d):\n", nrows, ncols);
+    while(r < nrows) {
+        const gf* row = matrix[r];
+        int c = 0;
+        while(c < ncols) {
+            printf("%6d ", row[c]);
+            c++;
+        }
+        printf("\n");
+        r++;
+    }
+}
+
+#endif
+
+/*
+ * galExp(a, n) returns a raised to the n-th power in the field.
+ * Anything to the power 0 is 1 (including 0), and 0 to any other power is 0.
+ * Otherwise the exponent is applied in the log domain and reduced modulo 255.
+ */
+static gf galExp(gf a, gf n) {
+    if(0 == n) {
+        return 1;
+    }
+    if(0 == a) {
+        return 0;
+    }
+    /* gf_log[a] and n are both non-negative, so % gives the same result
+     * as subtracting 255 repeatedly */
+    return gf_exp[(gf_log[a] * n) % 255];
+}
+
+/* Field product of a and b, looked up in gf_mul_table (row a, column b). */
+static inline gf galMultiply(gf a, gf b) {
+    const int row_start = (int)a << 8;
+    return gf_mul_table[row_start + (int)b];
+}
+
+/*
+ * Allocate an nrows x ncols row-major matrix whose entry (r, c) is
+ * galExp(r, c). Returns NULL if the allocation fails; the caller owns
+ * the buffer and releases it with RS_FREE.
+ */
+static gf* vandermonde(int nrows, int ncols) {
+    int r, c;
+    gf* out;
+    gf* matrix = (gf*)RS_MALLOC(nrows * ncols);
+
+    if(NULL == matrix) {
+        return NULL;
+    }
+
+    out = matrix;
+    for(r = 0; r < nrows; r++) {
+        for(c = 0; c < ncols; c++) {
+            *out++ = galExp((gf)r, (gf)c);
+        }
+    }
+
+    return matrix;
+}
+
+/*
+ * Copy rows [rmin, rmax) and columns [cmin, cmax) of a row-major matrix
+ * that is ncols wide into a freshly allocated buffer.
+ * The arguments are not validated. nrows is accepted but not used.
+ * Returns NULL if the allocation fails.
+ * */
+static gf* sub_matrix(gf* matrix, int rmin, int cmin, int rmax, int cmax,  int nrows, int ncols) {
+    int r, c;
+    gf* dst;
+    gf* new_m = (gf*)RS_MALLOC( (rmax-rmin) * (cmax-cmin) );
+
+    if(NULL == new_m) {
+        return NULL;
+    }
+
+    dst = new_m;
+    for(r = rmin; r < rmax; r++) {
+        const gf* src_row = matrix + r*ncols;
+        for(c = cmin; c < cmax; c++) {
+            *dst++ = src_row[c];
+        }
+    }
+
+    return new_m;
+}
+
+/*
+ * Matrix product over the field: returns a newly allocated (ar x bc)
+ * row-major matrix equal to a (ar x ac) times b (br x bc).
+ * Asserts ac == br. Returns NULL if the allocation fails.
+ */
+static gf* multiply1(gf *a, int ar, int ac, gf *b, int br, int bc) {
+    gf *product, *out;
+    int r, c, i;
+
+    assert(ac == br);
+    product = (gf*)RS_CALLOC(1, ar*bc);
+    if(NULL == product) {
+        return NULL;
+    }
+
+    /* plain triple loop; this multiply is slow */
+    out = product;
+    for(r = 0; r < ar; r++) {
+        const gf *a_row = a + r*ac;
+        for(c = 0; c < bc; c++) {
+            gf acc = 0;
+            for(i = 0; i < ac; i++) {
+                acc ^= galMultiply(a_row[i], b[i*bc+c]);
+            }
+            *out++ = acc;
+        }
+    }
+
+    return product;
+}
+
+/*
+ * (ported from the golang rs version)
+ * For every output row r: outputs[r] = sum over s of
+ * matrixRows[r*dataShards + s] * inputs[s], byteCount bytes each.
+ * Inputs are visited one at a time; the first input overwrites each
+ * output via mul(), later inputs are accumulated via addmul().
+ * Always returns 0.
+ */
+static inline int code_some_shards(gf* matrixRows, gf** inputs, gf** outputs,
+                                   int dataShards, int outputCount, int byteCount) {
+    int shard, row;
+
+    for(shard = 0; shard < dataShards; shard++) {
+        gf* src = inputs[shard];
+        for(row = 0; row < outputCount; row++) {
+            const gf coeff = matrixRows[row*dataShards + shard];
+            if(shard != 0) {
+                addmul(outputs[row], src, coeff, byteCount);
+            } else {
+                mul(outputs[row], src, coeff, byteCount);
+            }
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Create a codec for data_shards data blocks plus parity_shards parity blocks.
+ *
+ * fec_init() must have been called first (asserted).
+ * Returns a handle to release with reed_solomon_release(), or NULL when the
+ * parameters are out of range (both counts must be > 0 and their sum at most
+ * DATA_SHARDS_MAX), an allocation fails, or the Vandermonde sub-matrix cannot
+ * be inverted. The failing step is printed to stderr as "err=N".
+ */
+reed_solomon* reed_solomon_new(int data_shards, int parity_shards) {
+    gf* vm = NULL;
+    gf* top = NULL;
+    int err = 0;
+    reed_solomon* rs = NULL;
+
+    /* MUST use fec_init once time first */
+    assert(fec_initialized);
+
+    do {
+        rs = (reed_solomon*) RS_MALLOC(sizeof(reed_solomon));
+        if(NULL == rs) {
+            return NULL;
+        }
+        rs->data_shards = data_shards;
+        rs->parity_shards = parity_shards;
+        rs->shards = (data_shards + parity_shards);
+        rs->m = NULL;
+        rs->parity = NULL;
+
+        if(rs->shards > DATA_SHARDS_MAX || data_shards <= 0 || parity_shards <= 0) {
+            err = 1;
+            break;
+        }
+
+        vm = vandermonde(rs->shards, rs->data_shards);
+        if(NULL == vm) {
+            err = 2;
+            break;
+        }
+
+        top = sub_matrix(vm, 0, 0, data_shards, data_shards, rs->shards, data_shards);
+        if(NULL == top) {
+            err = 3;
+            break;
+        }
+
+        /*
+         * Handle a failed inversion at run time: an assert() would vanish
+         * under NDEBUG and leave a codec built from a half-reduced matrix.
+         */
+        if(0 != invert_mat(top, data_shards)) {
+            err = 6;
+            break;
+        }
+
+        /* m = vm * top^-1, so its first data_shards rows form the identity */
+        rs->m = multiply1(vm, rs->shards, data_shards, top, data_shards, data_shards);
+        if(NULL == rs->m) {
+            err = 4;
+            break;
+        }
+
+        /* the remaining rows of m generate the parity shards */
+        rs->parity = sub_matrix(rs->m, data_shards, 0, rs->shards, data_shards, rs->shards, data_shards);
+        if(NULL == rs->parity) {
+            err = 5;
+            break;
+        }
+
+        RS_FREE(vm);
+        RS_FREE(top);
+        vm = NULL;
+        top = NULL;
+        return rs;
+
+    } while(0);
+
+    fprintf(stderr, "err=%d\n", err);
+    if(NULL != vm) {
+        RS_FREE(vm);
+    }
+    if(NULL != top) {
+        RS_FREE(top);
+    }
+    if(NULL != rs) {
+        if(NULL != rs->m) {
+            RS_FREE(rs->m);
+        }
+        if(NULL != rs->parity) {
+            RS_FREE(rs->parity);
+        }
+        RS_FREE(rs);
+    }
+
+    return NULL;
+}
+
+/* Free a codec created by reed_solomon_new(); passing NULL is a no-op. */
+void reed_solomon_release(reed_solomon* rs) {
+    if(NULL == rs) {
+        return;
+    }
+
+    if(NULL != rs->m) {
+        RS_FREE(rs->m);
+    }
+    if(NULL != rs->parity) {
+        RS_FREE(rs->parity);
+    }
+    RS_FREE(rs);
+}
+
+/**
+ * encode one group of shards
+ * input:
+ * rs
+ * data_blocks[rs->data_shards][block_size]    (read)
+ * fec_blocks[rs->parity_shards][block_size]   (written)
+ * returns the result of code_some_shards()
+ * */
+int reed_solomon_encode(reed_solomon* rs,
+                        unsigned char** data_blocks,
+                        unsigned char** fec_blocks,
+                        int block_size) {
+    int rc;
+
+    assert(NULL != rs && NULL != rs->parity);
+
+    rc = code_some_shards(rs->parity, data_blocks, fec_blocks,
+                          rs->data_shards, rs->parity_shards, block_size);
+    return rc;
+}
+
+/**
+ * decode one group of shards: rebuild the erased data blocks in place
+ * input:
+ * rs
+ * original data_blocks[rs->data_shards][block_size]
+ * dec_fec_blocks[nr_fec_blocks][block_size]
+ * fec_block_nos: fec pos number in original fec_blocks
+ * erased_blocks: erased blocks in original data_blocks (sorted in place here)
+ * nr_fec_blocks: the number of erased blocks
+ * returns 0 on success, -1 when the erased indices are out of range, there
+ * are too few surviving rows, or the decode matrix cannot be inverted
+ * */
+int reed_solomon_decode(reed_solomon* rs,
+                        unsigned char **data_blocks,
+                        int block_size,
+                        unsigned char **dec_fec_blocks,
+                        unsigned int *fec_block_nos,
+                        unsigned int *erased_blocks,
+                        int nr_fec_blocks) {
+    /* use stack instead of malloc, define a small number of DATA_SHARDS_MAX to save memory */
+    gf dataDecodeMatrix[DATA_SHARDS_MAX*DATA_SHARDS_MAX];
+    unsigned char* subShards[DATA_SHARDS_MAX];
+    unsigned char* outputs[DATA_SHARDS_MAX];
+    gf* m = rs->m;
+    int i, j, c, subMatrixRow;
+    int dataShards = rs->data_shards;
+    unsigned int key;
+
+    /* more erasures than data blocks cannot be valid and would overrun outputs[] */
+    if(nr_fec_blocks > dataShards) {
+        return -1;
+    }
+
+    /*
+     * erased_blocks must be sorted ascending for the merge below.
+     * Insertion sort: a single pass when the caller already sorted it,
+     * and correct for any input order.
+     */
+    for(i = 1; i < nr_fec_blocks; i++) {
+        key = erased_blocks[i];
+        for(j = i; j > 0 && erased_blocks[j-1] > key; j--) {
+            erased_blocks[j] = erased_blocks[j-1];
+        }
+        erased_blocks[j] = key;
+    }
+
+    /* the largest index is now last; reject it before it is used as a subscript */
+    if(nr_fec_blocks > 0 && erased_blocks[nr_fec_blocks-1] >= (unsigned int)dataShards) {
+        return -1;
+    }
+
+    /* collect the rows of m (and the blocks) that belong to surviving data */
+    j = 0;
+    subMatrixRow = 0;
+    for(i = 0; i < dataShards; i++) {
+        if(j < nr_fec_blocks && (unsigned int)i == erased_blocks[j]) {
+            //ignore the invalid block
+            j++;
+        } else {
+            /* this row is ok */
+            for(c = 0; c < dataShards; c++) {
+                dataDecodeMatrix[subMatrixRow*dataShards + c] = m[i*dataShards + c];
+            }
+            subShards[subMatrixRow] = data_blocks[i];
+            subMatrixRow++;
+        }
+    }
+
+    /* top up with parity rows until the matrix is square */
+    for(i = 0; i < nr_fec_blocks && subMatrixRow < dataShards; i++) {
+        subShards[subMatrixRow] = dec_fec_blocks[i];
+        j = dataShards + fec_block_nos[i];
+        for(c = 0; c < dataShards; c++) {
+            dataDecodeMatrix[subMatrixRow*dataShards + c] = m[j*dataShards + c]; //use specific pos of original fec_blocks
+        }
+        subMatrixRow++;
+    }
+
+    if(subMatrixRow < dataShards) {
+        //cannot correct
+        return -1;
+    }
+
+    /* a singular matrix means the blocks cannot be recovered */
+    if(0 != invert_mat(dataDecodeMatrix, dataShards)) {
+        return -1;
+    }
+
+    /*
+     * Keep only the inverse rows that produce erased blocks, packed to the
+     * front. erased_blocks is ascending, so row j is never above row i and
+     * each source row is still intact when it is copied.
+     */
+    for(i = 0; i < nr_fec_blocks; i++) {
+        j = erased_blocks[i];
+        outputs[i] = data_blocks[j];
+        memmove(dataDecodeMatrix+i*dataShards, dataDecodeMatrix+j*dataShards, dataShards);
+    }
+
+    return code_some_shards(dataDecodeMatrix, subShards, outputs,
+                            dataShards, nr_fec_blocks, block_size);
+}
+
+/**
+ * encode a big size of buffer, one group of rs->shards at a time
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->shards)
+ * shards[nr_shards][block_size]: all data shards first, then all parity shards
+ * always returns 0
+ * */
+int reed_solomon_encode2(reed_solomon* rs, unsigned char** shards, int nr_shards, int block_size) {
+    const int per_group = rs->shards;
+    const int groups = nr_shards / per_group;
+    unsigned char** data_blocks = shards;
+    /* parity pointers start right after the data pointers of every group */
+    unsigned char** fec_blocks = &shards[groups * rs->data_shards];
+    int done;
+
+    for(done = 0; done < nr_shards; done += per_group) {
+        reed_solomon_encode(rs, data_blocks, fec_blocks, block_size);
+        data_blocks += rs->data_shards;
+        fec_blocks += rs->parity_shards;
+    }
+    return 0;
+}
+
+/**
+ * reconstruct a big size of buffer, one group of rs->shards at a time
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->shards)
+ * shards[nr_shards][block_size]: all data shards first, then all parity shards
+ * marks[nr_shards]: non-zero marks the shard at the same index as missing
+ * returns 0 when every group with missing data was rebuilt, -1 when at least
+ * one group could not be rebuilt (the remaining groups are still processed)
+ * */
+int reed_solomon_reconstruct(reed_solomon* rs,
+                             unsigned char** shards,
+                             unsigned char* marks,
+                             int nr_shards,
+                             int block_size) {
+    unsigned char *dec_fec_blocks[DATA_SHARDS_MAX];
+    unsigned int fec_block_nos[DATA_SHARDS_MAX];
+    unsigned int erased_blocks[DATA_SHARDS_MAX];
+    unsigned char* fec_marks;
+    unsigned char **data_blocks, **fec_blocks;
+    int i, j, dn, pn, n;
+    int ds = rs->data_shards;
+    int ps = rs->parity_shards;
+    int err = 0;
+
+    data_blocks = shards;
+    n = nr_shards / rs->shards;
+    fec_marks = marks + n*ds; //the fec marks follow all the data marks
+    fec_blocks = shards + n*ds;
+
+    for(j = 0; j < n; j++) {
+        /* collect the missing data blocks of this group (ascending order) */
+        dn = 0;
+        for(i = 0; i < ds; i++) {
+            if(marks[i]) {
+                //errors
+                erased_blocks[dn++] = i;
+            }
+        }
+        if(dn > 0) {
+            /* pick one surviving parity block per missing data block */
+            pn = 0;
+            for(i = 0; i < ps && pn < dn; i++) {
+                if(!fec_marks[i]) {
+                    //got valid fec row
+                    fec_block_nos[pn] = i;
+                    dec_fec_blocks[pn] = fec_blocks[i];
+                    pn++;
+                }
+            }
+
+            if(dn == pn) {
+                /* a failed decode leaves the group unrecovered: report it */
+                if(0 != reed_solomon_decode(rs
+                        , data_blocks
+                        , block_size
+                        , dec_fec_blocks
+                        , fec_block_nos
+                        , erased_blocks
+                        , dn)) {
+                    err = -1;
+                }
+            } else {
+                //error but we continue
+                err = -1;
+            }
+        }
+        data_blocks += ds;
+        marks += ds;
+        fec_blocks += ps;
+        fec_marks += ps;
+    }
+
+    return err;
+}

+ 88 - 0
src/network/impl/fragment/third_party/rs.h

@@ -0,0 +1,88 @@
+#ifndef __RS_H_
+#define __RS_H_
+
+/* use small value to save memory */
+#ifndef DATA_SHARDS_MAX
+#define DATA_SHARDS_MAX (255)
+#endif
+
+/* use other memory allocator */
+#ifndef RS_MALLOC
+#define RS_MALLOC(x)    malloc(x)
+#endif
+
+#ifndef RS_FREE
+#define RS_FREE(x)      free(x)
+#endif
+
+#ifndef RS_CALLOC
+#define RS_CALLOC(n, x) calloc(n, x)
+#endif
+
+/* Codec state created by reed_solomon_new() and freed by reed_solomon_release(). */
+typedef struct _reed_solomon {
+    int data_shards;        /* data blocks per group */
+    int parity_shards;      /* parity (fec) blocks per group */
+    int shards;             /* data_shards + parity_shards */
+    unsigned char* m;       /* coding matrix, shards rows x data_shards columns */
+    unsigned char* parity;  /* separately allocated copy of the parity rows of m */
+} reed_solomon;
+
+/**
+ * MUST be called once before any other function declared here
+ * (reed_solomon_new() asserts that it has run).
+ * */
+void fec_init(void);
+
+reed_solomon* reed_solomon_new(int data_shards, int parity_shards);
+void reed_solomon_release(reed_solomon* rs);
+
+/**
+ * encode one group of shards
+ * input:
+ * rs
+ * data_blocks[rs->data_shards][block_size]
+ * fec_blocks[rs->parity_shards][block_size]
+ * */
+int reed_solomon_encode(reed_solomon* rs,
+                        unsigned char** data_blocks,
+                        unsigned char** fec_blocks,
+                        int block_size);
+
+
+/**
+ * decode one shard
+ * input:
+ * rs
+ * original data_blocks[rs->data_shards][block_size]
+ * dec_fec_blocks[nr_fec_blocks][block_size]
+ * fec_block_nos: fec pos number in original fec_blocks
+ * erased_blocks: erased blocks in original data_blocks
+ * nr_fec_blocks: the number of erased blocks
+ * */
+int reed_solomon_decode(reed_solomon* rs,
+                        unsigned char **data_blocks,
+                        int block_size,
+                        unsigned char **dec_fec_blocks,
+                        unsigned int *fec_block_nos,
+                        unsigned int *erased_blocks,
+                        int nr_fec_blocks);
+
+/**
+ * encode a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->shards)
+ * shards[nr_shards][block_size]: all data shards first, then all parity shards
+ * */
+int reed_solomon_encode2(reed_solomon* rs, unsigned char** shards, int nr_shards, int block_size);
+
+/**
+ * reconstruct a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->shards)
+ * shards[nr_shards][block_size]: all data shards first, then all parity shards
+ * marks[nr_shards]: non-zero marks the shard at the same index as missing
+ * */
+int reed_solomon_reconstruct(reed_solomon* rs, unsigned char** shards, unsigned char* marks, int nr_shards, int block_size);
+#endif
+

+ 132 - 0
src/network/impl/impl_utility.hpp

@@ -0,0 +1,132 @@
+#ifndef DEPTHGUIDE_IMPL_UTILITY_HPP
+#define DEPTHGUIDE_IMPL_UTILITY_HPP
+
+#include "network/network.h"
+#include "core/utility.hpp"
+
+#include <boost/container/flat_map.hpp>
+#include <boost/container/static_vector.hpp>
+
+template<typename T>
+void set_min(T &x, const T &y) { if (y < x) x = y; } // lowers x to y when y < x; otherwise x is left untouched
+
+template<typename T>
+void set_max(T &x, const T &y) { if (y > x) x = y; } // raises x to y when y > x; otherwise x is left untouched
+
+template<typename HeaderType>
+data_type encode_header(const data_type &data, HeaderType header) { // writes the size prefix plus header for data and returns the combined buffer
+    auto header_data = data.sub_data(-HeaderType::extra_size); // presumably the extra_size bytes located just before data — confirm sub_data semantics
+    auto writer = network_writer(header_data);
+    writer << HeaderType::header_size; // size prefix; decode_header compares it against HeaderType::header_size
+    header.write_to(writer);
+    assert(writer.empty()); // presumably: size prefix + header filled header_data exactly — confirm writer.empty() semantics
+    return header_data.extend(data.size); // presumably header followed by the payload as one buffer — confirm extend semantics
+}
+
+template<typename HeaderType>
+data_type decode_header(const data_type &data, HeaderType *header) { // reads the size prefix and header from data into *header, returns the rest
+    auto reader = network_reader(data);
+    auto header_size = reader.read_value<typename HeaderType::header_size_type>();
+    if (header_size != HeaderType::header_size) [[unlikely]] { RET_ERROR_E; } // size prefix mismatch; RET_ERROR_E presumably returns an empty data_type — confirm in core/utility.hpp
+    header->fill_from(reader);
+    return reader.read_remain(); // the bytes left unread after the header
+}
+
+template<typename K, typename T, size_t N>
+using static_flat_map = boost::container::flat_map<K, T, std::less<K>,
+        boost::container::static_vector<std::pair<K, T>, N>>;
+
+/**
+ * Base class for channels that transform data on its way to and from a
+ * lower-level channel.
+ *
+ * Outgoing data goes through encode_data() before it is sent on the lower
+ * channel; incoming data goes through decode_data() before it reaches the
+ * receive callback. An empty result from either hook means there is nothing
+ * to forward.
+ *
+ * NOTE: the callback installed on the lower channel captures `this`, so an
+ * instance must not be moved or destroyed while the lower channel can still
+ * invoke it.
+ */
+class filter_channel : public data_channel {
+public:
+    struct create_config {
+        data_channel_type low_channel; // channel the encoded data is sent on
+    };
+
+    explicit filter_channel(create_config conf) {
+        low = std::move(conf.low_channel);
+        low->set_recv_func([this](const data_type &data) {
+            auto dec_data = decode_data(data);
+            if (!dec_data.empty()) [[likely]] {
+                // deliver the decoded payload, not the raw input
+                recv_cb_func(dec_data);
+            }
+        });
+    }
+
+    // Encodes data and sends the result on the lower channel.
+    // When encode_data() yields nothing, no send happens and true is returned.
+    bool send(const data_type &data) override {
+        auto enc_data = encode_data(data);
+        if (!enc_data.empty()) [[likely]] {
+            return low->send(enc_data);
+        }
+        return true;
+    }
+
+    // Header size of the lower channel plus this channel's own header,
+    // computed on first use and cached.
+    size_t header_size() const override {
+        if (header_size_cache == header_size_unset) [[unlikely]] {
+            header_size_cache = low->header_size() + extra_header_size();
+        }
+        return header_size_cache;
+    }
+
+    // Largest payload accepted by one send call: the lower channel's limit
+    // minus this channel's own header. SIZE_MAX means "unlimited" and is
+    // passed through unchanged. Computed on first use and cached.
+    size_t max_length() const override {
+        if (max_length_cache == 0) [[unlikely]] {
+            if (low->max_length() == SIZE_MAX) { // unlimited
+                max_length_cache = SIZE_MAX;
+            } else {
+                max_length_cache = low->max_length() - extra_header_size();
+            }
+        }
+        return max_length_cache;
+    }
+
+protected:
+
+    // transform outgoing data; an empty result suppresses the send
+    virtual data_type encode_data(const data_type &data) = 0;
+
+    // transform incoming data; an empty result suppresses the callback
+    virtual data_type decode_data(const data_type &data) = 0;
+
+    // header size of this data channel
+    virtual size_t extra_header_size() const = 0;
+
+private:
+    // sentinel for "header size not computed yet" (same value the previous -1 converted to)
+    static constexpr size_t header_size_unset = SIZE_MAX;
+
+    data_channel_type low;
+
+    mutable size_t max_length_cache = 0; // 0 means "not computed yet"
+    mutable size_t header_size_cache = header_size_unset;
+};
+
+class bypass_channel : public filter_channel { // filter whose encode/decode hooks return the data unchanged
+public:
+
+    using filter_channel::filter_channel;
+
+protected:
+
+    data_type encode_data(const data_type &data) override { return data; }
+
+    data_type decode_data(const data_type &data) override { return data; }
+
+    size_t extra_header_size() const override { return 0; } // adds no header of its own
+};
+
+class black_hole_channel : public data_channel { // channel that discards everything sent to it
+public:
+    bool send(const data_type &data) override { return true; } // data is ignored; always reports success
+
+    size_t max_length() const override { return SIZE_MAX; }; // SIZE_MAX is what filter_channel::max_length treats as "unlimited"
+
+    size_t header_size() const override { return 0; }
+};
+
+class loop_channel : public data_channel { // loopback: sent data is handed straight to this channel's own receive callback
+public:
+    bool send(const data_type &data) override {
+        recv_cb_func(data); // invoked synchronously, inside send
+        return true;
+    }
+
+    size_t max_length() const override { return SIZE_MAX; } // SIZE_MAX is what filter_channel::max_length treats as "unlimited"
+
+    size_t header_size() const override { return 0; }
+};
+
+#endif //DEPTHGUIDE_IMPL_UTILITY_HPP

+ 61 - 0
src/network/impl/multiplexer.cpp

@@ -0,0 +1,61 @@
+#include "multiplexer_impl.h"
+#include "network/binary_utility.hpp"
+#include "core/utility.hpp"
+
+void multiplexer::demux_data(const data_type &data) { // dispatches an incoming packet to the receive callback registered for its vc_id
+    vc_header_type header; // NOTE(review): header.version is filled by decode_header but never validated below
+    auto dec_data = decode_header(data, &header);
+    if (dec_data.empty()) [[unlikely]] return; // nothing left after the header (or header rejected): drop silently
+
+    auto iter = vc_pool_recv.find(header.vc_id);
+    if (iter == vc_pool_recv.end()) [[unlikely]] { RET_ERROR_N; } // no receiver registered for this vc_id
+    auto &vc_info = iter->second;
+    assert(vc_info.vc_id == header.vc_id);
+    if (vc_info.option != header.option) { RET_ERROR_N; } // option byte must match the one given at registration
+    vc_info.recv_func(dec_data);
+}
+
+bool multiplexer::send(uint8_t vc_id, const data_type &data) { // adds the header registered for vc_id and sends on the lower channel
+    auto iter = vc_pool_send.find(vc_id);
+    assert(iter != vc_pool_send.end()); // vc_id must be registered in vc_pool_send; NOTE(review): only checked when asserts are enabled
+    auto header = iter->second.to_header();
+    return low->send(encode_header(data, header));
+}
+
+void multiplexer::reg_vc_send(const vc_info &info) { // registers info.vc_id as a sending virtual channel
+    assert(!vc_pool_send.contains(info.vc_id)); // a vc_id may be registered for sending only once
+    vc_pool_send.emplace(info.vc_id, info);
+}
+
+void multiplexer::reg_vc_recv(const vc_info &info) { // registers info.vc_id as a receiving virtual channel; demux_data calls info.recv_func
+    assert(!vc_pool_recv.contains(info.vc_id)); // a vc_id may be registered for receiving only once
+    vc_pool_recv.emplace(info.vc_id, info);
+}
+
+void multiplexer::un_reg_vc_send(uint8_t vc_id) { // removes the sending registration for vc_id
+    assert(vc_pool_send.contains(vc_id)); // must have been registered via reg_vc_send
+    vc_pool_send.erase(vc_id);
+}
+
+void multiplexer::un_reg_vc_recv(uint8_t vc_id) { // removes the receiving registration for vc_id
+    assert(vc_pool_recv.contains(vc_id)); // must have been registered via reg_vc_recv
+    vc_pool_recv.erase(vc_id);
+}
+
+size_t multiplexer::max_length() const { // payload limit per send: lower channel's limit minus vc_header_type::extra_size, cached
+    if (max_length_cache == 0) [[unlikely]] { // 0 marks "not computed yet"
+        if (low->max_length() == SIZE_MAX) { // unlimited
+            max_length_cache = SIZE_MAX;
+        } else {
+            max_length_cache = low->max_length() - vc_header_type::extra_size;
+        }
+    }
+    return max_length_cache;
+}
+
+size_t multiplexer::header_size() const { // lower channel's header size plus vc_header_type::extra_size, cached
+    if (header_size_cache == -1) [[unlikely]] { // -1 converts to the largest size_t, used as "not computed yet"
+        header_size_cache = low->header_size() + vc_header_type::extra_size;
+    }
+    return header_size_cache;
+}

+ 156 - 0
src/network/impl/multiplexer_impl.h

@@ -0,0 +1,156 @@
+#ifndef DEPTHGUIDE_MULTIPLEXER_IMPL_H
+#define DEPTHGUIDE_MULTIPLEXER_IMPL_H
+
+#include "impl_utility.hpp"
+
+class mux_channel;
+
+/* Shares one lower-level channel between several virtual channels (vc),
+ * each identified by a one-byte id. All operations are private and reached
+ * through the friend class mux_channel.
+ *
+ * Packet layout:
+ * [1 byte ] header size (=3)
+ * [1 byte ] version     (=1)
+ * [1 byte ] channel id
+ * [1 byte ] option
+ * [n bytes] data */
+class multiplexer {
+public:
+
+    static constexpr auto VERSION = 1;
+
+    struct create_config {
+        data_channel_type low_channel;
+    };
+
+    explicit multiplexer(create_config conf) {
+        low = std::move(conf.low_channel);
+        low->set_recv_func([this](auto data) { demux_data(data); }); // captures this: the multiplexer must not be moved while low can call back
+    }
+
+private:
+
+    struct vc_header_type {
+        uint8_t version = VERSION;
+        uint8_t vc_id = 0;
+        uint8_t option = 0;
+
+        using header_size_type = uint8_t;
+        static constexpr header_size_type header_size = 3 * sizeof(uint8_t); // version + vc_id + option
+        static constexpr size_t extra_size = header_size + sizeof(header_size); // header plus its one-byte size prefix
+
+        template<typename ReaderType>
+        void fill_from(ReaderType &reader) {
+            reader >> version >> vc_id >> option;
+        }
+
+        template<typename WriterType>
+        void write_to(WriterType &writer) {
+            writer << version << vc_id << option;
+        }
+    };
+
+    using recv_func_type = data_channel::recv_func_type;
+
+    struct vc_info {
+        uint8_t vc_id = 0;
+        uint8_t option = 0;
+        recv_func_type recv_func; // only called by demux_data, i.e. for entries in vc_pool_recv
+
+        vc_header_type to_header() const {
+            return {.vc_id = vc_id, .option=option}; // version keeps its default (VERSION)
+        }
+    };
+
+    static constexpr auto max_vc_count = 16; // TODO: select in compile option
+    using vc_pool_type = static_flat_map<uint8_t, vc_info, max_vc_count>;
+    vc_pool_type vc_pool_send; // virtual channels registered for sending, keyed by vc_id
+    vc_pool_type vc_pool_recv; // virtual channels registered for receiving, keyed by vc_id
+
+    data_channel_type low;
+
+    // de-multiplex data
+    void demux_data(const data_type &data);
+
+    // for mux_channel
+
+    bool send(uint8_t vc_id, const data_type &data);
+
+    void reg_vc_send(const vc_info &info);
+
+    void reg_vc_recv(const vc_info &info);
+
+    void un_reg_vc_send(uint8_t vc_id);
+
+    void un_reg_vc_recv(uint8_t vc_id);
+
+    size_t max_length() const;
+
+    size_t header_size() const;
+
+    friend class mux_channel;
+
+    mutable size_t max_length_cache = 0; // 0 marks "not computed yet"
+    mutable size_t header_size_cache = -1; // -1 (largest size_t) marks "not computed yet"
+};
+
+using multiplexer_type = std::shared_ptr<multiplexer>;
+
+enum mux_mode { // bit flags; mux_channel tests them with operator&
+    SEND_ONLY = 1,
+    RECV_ONLY = 2,
+    MUX_BOTH = SEND_ONLY | RECV_ONLY
+};
+
+class mux_channel : public data_channel { // one virtual channel of a multiplexer, exposed as a data_channel
+public:
+
+    struct create_config {
+        multiplexer_type mux;
+        uint8_t vc_id; // virtual channel id
+        uint8_t option;
+        mux_mode mode;
+    };
+
+    explicit mux_channel(const create_config &conf) { // registers vc_id with the multiplexer for each direction enabled in mode
+        mux = conf.mux;
+        vc_id = conf.vc_id;
+        mode = conf.mode;
+
+        if (mode & SEND_ONLY) {
+            mux->reg_vc_send({vc_id, conf.option}); // recv_func stays empty on the send side
+        }
+        if (mode & RECV_ONLY) {
+            auto recv_func = [this](auto data) { recv_cb_func(data); }; // captures this: the channel must not be moved while registered
+            mux->reg_vc_recv({vc_id, conf.option, recv_func});
+        }
+    }
+
+    ~mux_channel() override { // undoes the registrations made by the constructor
+        if (mode & SEND_ONLY) {
+            mux->un_reg_vc_send(vc_id);
+        }
+        if (mode & RECV_ONLY) {
+            mux->un_reg_vc_recv(vc_id);
+        }
+    }
+
+    bool send(const data_type &data) override {
+        if (mode & SEND_ONLY) {
+            return mux->send(vc_id, data);
+        }
+        assert(false); // sending on a channel without SEND_ONLY set is a programming error
+        return true; // reached only when asserts are disabled: the data is dropped
+    }
+
+    size_t max_length() const override {
+        return mux->max_length();
+    }
+
+    size_t header_size() const override {
+        return mux->header_size();
+    }
+
+private:
+    multiplexer_type mux;
+    uint8_t vc_id;
+    mux_mode mode;
+};
+
+#endif //DEPTHGUIDE_MULTIPLEXER_IMPL_H

+ 0 - 0
src/network/impl/network.cpp


+ 20 - 0
src/network/impl/network_impl.h

@@ -0,0 +1,20 @@
+#ifndef DEPTHGUIDE_NETWORK_IMPL_H
+#define DEPTHGUIDE_NETWORK_IMPL_H
+
+#include "network/network.h"
+#include "multiplexer_impl.h"
+
+struct virtual_channel_manager::impl { // private state behind virtual_channel_manager::pimpl
+
+    multiplexer_type mux; // created in the virtual_channel_manager constructor
+
+};
+
+virtual_channel_manager::virtual_channel_manager(create_config conf) // NOTE(review): non-inline definition in a header — include this file from one translation unit only
+        : pimpl(std::make_unique<impl>()) {
+    pimpl->mux = std::make_shared<multiplexer>(
+            multiplexer::create_config{
+                    .low_channel = std::move(conf.low_channel)}); // the multiplexer takes ownership of the low-level channel
+}
+
+#endif //DEPTHGUIDE_NETWORK_IMPL_H

+ 94 - 0
src/network/network.h

@@ -0,0 +1,94 @@
+#ifndef DEPTHGUIDE_NETWORK_H
+#define DEPTHGUIDE_NETWORK_H
+
+#include "binary_utility.hpp"
+
+#include <boost/asio/io_context.hpp>
+
+#include <functional>
+#include <memory>
+
+class data_channel { // abstract channel: send() pushes data out, received data arrives through the callback set by set_recv_func
+public:
+
+    virtual ~data_channel() = default;
+
+    // return false to indicate channel close
+    virtual bool send(const data_type &data) = 0;
+
+    using recv_func_type = std::function<void(data_type)>;
+
+    void set_recv_func(const recv_func_type &func) { // replaces the current receive callback
+        recv_cb_func = func;
+    }
+
+    // max length of one send call
+    virtual size_t max_length() const = 0;
+
+    virtual size_t header_size() const = 0;
+
+protected:
+    recv_func_type recv_cb_func = [](auto) {}; // default callback ignores incoming data
+};
+
+using data_channel_type = std::unique_ptr<data_channel>;
+
+enum channel_encrypt_method {
+    ENCRYPT_NONE = 0,
+};
+
+enum channel_fragment_method {
+    FRAG_NONE = 0,
+    FRAG_BASIC = 1,
+    FRAG_RS_V1 = 2,
+};
+
+class virtual_channel_manager { // owns a low-level channel and hands out virtual channels on top of it
+public:
+
+    struct create_config {
+        data_channel_type low_channel;
+    };
+
+    explicit virtual_channel_manager(create_config conf);
+
+    ~virtual_channel_manager(); // declared here, defined elsewhere: impl is an incomplete type in this header
+
+    struct virtual_channel_create_config {
+        uint8_t channel_id = 0;
+        channel_encrypt_method enc_method = ENCRYPT_NONE;
+        channel_fragment_method frag_method = FRAG_NONE;
+    };
+
+    data_channel_type create_virtual_channel(virtual_channel_create_config conf);
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+enum low_level_channel_type {
+    CHAN_TCP,
+    CHAN_UDP
+};
+
+enum low_level_create_method {
+    CREATE_ACTIVE,
+    CREATE_PASSIVE
+};
+
+struct low_level_channel_create_config { // parameters for create_low_level_channel
+    low_level_channel_type type = CHAN_TCP;
+    low_level_create_method method = CREATE_ACTIVE;
+    boost::asio::io_context *ctx = nullptr; // non-owning; presumably must be set by the caller — confirm in the implementation
+    const char *address = nullptr; // for CREATE_ACTIVE
+    uint16_t port = 0;
+};
+
+using data_channel_create_cb_type
+        = std::function<void(data_channel_type)>;
+
+void create_low_level_channel(low_level_channel_create_config conf,
+                              data_channel_create_cb_type cb);
+
+#endif //DEPTHGUIDE_NETWORK_H

+ 1 - 1
src/render/render_tools.h

@@ -1,7 +1,7 @@
 #ifndef DEPTHGUIDE_RENDER_TOOLS_H
 #define DEPTHGUIDE_RENDER_TOOLS_H
 
-#include "object_manager.h"
+#include "core/object_manager.h"
 #include "render_utility.h"
 
 simple_rect calc_render_range(cv::Size img_size);

+ 2 - 2
src/render/render_utility.h

@@ -1,8 +1,8 @@
 #ifndef DEPTHGUIDE_RENDER_UTILITY_H
 #define DEPTHGUIDE_RENDER_UTILITY_H
 
-#include "cuda_helper.hpp"
-#include "image_utility.hpp"
+#include "core/cuda_helper.hpp"
+#include "core/image_utility.hpp"
 
 #include <glad/gl.h>
 #include <glm/glm.hpp>