Explorar o código

Merged NvDec and UDP (FEC) receiver.

jcsyshc hai 1 ano
pai
achega
83b81a01c1

+ 17 - 1
CMakeLists.txt

@@ -16,6 +16,8 @@ add_executable(${PROJECT_NAME} src/main.cpp
         src/module/impl/image_streamer.cpp
         src/network_v3/sender_tcp.cpp
         src/network_v3/sender_udp_fec.cpp
+#        src/network_v3/receiver_tcp.cpp
+        src/network_v3/receiver_udp_fec.cpp
         #        src/network/impl/crc_checker.cpp
         src/network/impl/fragment/third_party/rs.c
         #        src/network/impl/fragment/frag_basic.cpp
@@ -162,6 +164,20 @@ target_include_directories(${PROJECT_NAME} PRIVATE ${NVCODEC_INCLUDE_DIR})
 target_link_libraries(${PROJECT_NAME} ${NVENC_LIB})
 target_sources(${PROJECT_NAME} PRIVATE src/codec/encoder_nvenc.cpp)
 
+# NvDec config
+if (WIN32)
+    set(NVCODEC_DIR C:/BuildEssentials/CUDA/Video_Codec_SDK_12.0.16)
+    find_library(NVDEC_LIB nvcuvid HINTS ${NVCODEC_DIR}/Lib/x64)
+else ()
+    set(NVCODEC_DIR /home/tpx/src/Video_Codec_SDK_12.0.16)
+    find_library(NVDEC_LIB nvcuvid)
+endif ()
+set(NVCODEC_INCLUDE_DIR ${NVCODEC_DIR}/Interface)
+target_include_directories(${PROJECT_NAME} PRIVATE ${NVCODEC_INCLUDE_DIR})
+target_link_libraries(${PROJECT_NAME} ${NVDEC_LIB})
+target_sources(${PROJECT_NAME} PRIVATE src/codec/decoder_nvdec.cpp)
+
 # nvJPEG config
 target_link_libraries(${PROJECT_NAME} CUDA::nvjpeg)
-#target_sources(${PROJECT_NAME} PRIVATE src/codec/encoder_jpeg.cpp)
+#target_sources(${PROJECT_NAME} PRIVATE src/codec/encoder_jpeg.cpp)
+#target_sources(${PROJECT_NAME} PRIVATE src/codec/decoder_nvjpeg.cpp)

+ 5 - 0
src/codec/codec_base.hpp

@@ -8,6 +8,11 @@ enum encoder_type {
     ENCODER_JPEG
 };
 
+enum decoder_type {
+    DECODER_NVDEC,
+    DECODER_JPEG
+};
+
 struct frame_info {
     data_type data;
     bool idr = false;

+ 153 - 0
src/codec/decoder_nvdec.cpp

@@ -0,0 +1,153 @@
+#include "decoder_nvdec.h"
+#include "core/cuda_helper.hpp"
+#include "core/image_utility.hpp"
+
+#include <nvcuvid.h>
+
+struct decoder_nvdec::impl {
+
+    create_config conf;
+
+    CUvideoparser parser = nullptr;
+    CUvideodecoder decoder = nullptr;
+
+    uint8_t decode_surface;
+    cv::Size frame_size;
+
+    impl(create_config _conf) {
+        conf = _conf;
+
+        // query decoder capability
+        CUVIDDECODECAPS caps = {};
+        caps.eCodecType = cudaVideoCodec_HEVC;
+        caps.eChromaFormat = cudaVideoChromaFormat_420;
+        caps.nBitDepthMinus8 = 0; // 8-bit
+        CUDA_API_CHECK(cuvidGetDecoderCaps(&caps));
+
+        // check decoder capability
+        CALL_CHECK(caps.bIsSupported == 1);
+        CALL_CHECK(caps.nOutputFormatMask & (1 << cudaVideoSurfaceFormat_NV12));
+
+        // create parser
+        CUVIDPARSERPARAMS params = {};
+        params.CodecType = cudaVideoCodec_HEVC;
+        params.ulMaxNumDecodeSurfaces = 1; // dummy value according to document
+        params.ulMaxDisplayDelay = 0; // no delay
+        params.pUserData = this;
+        params.pfnSequenceCallback = sequence_callback;
+        params.pfnDecodePicture = ready_decode;
+        assert(parser == nullptr);
+        CUDA_API_CHECK(cuvidCreateVideoParser(&parser, &params));
+    }
+
+    void create_decoder() {
+        CUVIDDECODECREATEINFO decoder_info = {};
+        decoder_info.ulWidth = frame_size.width;
+        decoder_info.ulHeight = frame_size.height;
+        decoder_info.ulNumDecodeSurfaces = decode_surface;
+        decoder_info.CodecType = cudaVideoCodec_HEVC;
+        decoder_info.ChromaFormat = cudaVideoChromaFormat_420;
+        decoder_info.ulCreationFlags = cudaVideoCreate_PreferCUVID;
+        decoder_info.bitDepthMinus8 = 0; // 8-bit
+        decoder_info.OutputFormat = cudaVideoSurfaceFormat_NV12;
+        decoder_info.DeinterlaceMode = cudaVideoDeinterlaceMode_Weave;
+        decoder_info.ulTargetWidth = frame_size.width;
+        decoder_info.ulTargetHeight = frame_size.height;
+        decoder_info.ulNumOutputSurfaces = 2; // TODO; learn more about this
+
+        CUDA_API_CHECK(cuvidCreateDecoder(&decoder, &decoder_info));
+        assert(decoder != nullptr);
+    }
+
+    int sequence_callback_impl(CUVIDEOFORMAT *format) {
+        // ensure consistency
+        assert(format->codec == cudaVideoCodec_HEVC);
+        assert(format->progressive_sequence == 1); // progressive
+        assert(format->bit_depth_luma_minus8 == 0); // 8-bit
+        assert(format->bit_depth_chroma_minus8 == 0); // 8-bit
+        assert(format->chroma_format == cudaVideoChromaFormat_420);
+
+        if (decoder == nullptr) {
+            frame_size.width = format->coded_width;
+            frame_size.height = format->coded_height;
+            decode_surface = format->min_num_decode_surfaces + 4;
+            create_decoder();
+        } else {
+            assert(format->coded_width == frame_size.width);
+            assert(format->coded_height == frame_size.height);
+        }
+
+        return decode_surface;
+    }
+
+    static int sequence_callback(void *ptr, CUVIDEOFORMAT *format) {
+        assert(ptr != nullptr);
+        return ((impl *) ptr)->sequence_callback_impl(format);
+    }
+
+    int ready_decode_impl(CUVIDPICPARAMS *pic) {
+        // decode image
+        assert(decoder != nullptr);
+        CUDA_API_CHECK(cuvidDecodePicture(decoder, pic));
+
+        // map frame
+        CUdeviceptr ptr_in;
+        unsigned int pitch_in;
+        CUVIDPROCPARAMS proc_params = {};
+        proc_params.progressive_frame = 1; // progressive frame
+        proc_params.second_field = 1;
+        proc_params.output_stream = conf.stream->cuda;
+        assert(decoder != nullptr);
+        CUDA_API_CHECK(cuvidMapVideoFrame(decoder, pic->CurrPicIdx, &ptr_in, &pitch_in, &proc_params));
+        assert(ptr_in != 0);
+
+        // check decode status
+        CUVIDGETDECODESTATUS status = {};
+        CUDA_API_CHECK(cuvidGetDecodeStatus(decoder, pic->CurrPicIdx, &status));
+        CALL_CHECK(status.decodeStatus == cuvidDecodeStatus_Success);
+
+        auto img_size = img_size_to_nv12(frame_size);
+        auto img_info = create_image_info<uchar1>(img_size, MEM_CUDA);
+
+        // copy frame
+        auto luma_in = (void *) ptr_in;
+        auto luma_out = img_info.start_ptr();
+        CUDA_API_CHECK(cudaMemcpy2DAsync(luma_out, img_info.pitch, luma_in, pitch_in,
+                                         frame_size.width, frame_size.height, cudaMemcpyDeviceToDevice));
+        auto chroma_in = (char *) ptr_in + pitch_in * ((frame_size.height + 1) & ~1);
+        auto chroma_out = img_info.start_ptr(frame_size.height);
+        CUDA_API_CHECK(cudaMemcpy2D(chroma_out, img_info.pitch, chroma_in, pitch_in,
+                                    frame_size.width, frame_size.height >> 1, cudaMemcpyDeviceToDevice));
+
+        // unmap frame
+        CUDA_API_CHECK(cuvidUnmapVideoFrame(decoder, ptr_in));
+
+        // commit frame
+        OBJ_SAVE(conf.img_name, create_image(img_info));
+
+        return 1; // success
+    }
+
+    static int ready_decode(void *ptr, CUVIDPICPARAMS *pic) {
+        assert(ptr != nullptr);
+        return ((impl *) ptr)->ready_decode_impl(pic);
+    }
+
+    void decode(const frame_info &frame) {
+        CUVIDSOURCEDATAPACKET packet = {};
+        packet.flags = CUVID_PKT_ENDOFPICTURE;
+        packet.payload_size = frame.size();
+        packet.payload = frame.start_ptr();
+        assert(parser != nullptr);
+        CUDA_API_CHECK(cuvidParseVideoData(parser, &packet));
+    }
+};
+
+decoder_nvdec::decoder_nvdec(create_config conf)
+        : pimpl(std::make_unique<impl>(conf)) {}
+
+decoder_nvdec::~decoder_nvdec() = default;
+
+void decoder_nvdec::decode(const frame_info &frame) {
+    pimpl->decode(frame);
+}

+ 30 - 0
src/codec/decoder_nvdec.h

@@ -0,0 +1,30 @@
+#ifndef TINYPLAYER3_DECODER_NVDEC_H
+#define TINYPLAYER3_DECODER_NVDEC_H
+
+#include "codec_base.hpp"
+#include "core/cuda_helper.hpp"
+#include "core/object_manager.h"
+
+#include <memory>
+
+class decoder_nvdec {
+public:
+
+    struct create_config {
+        obj_name_type img_name = invalid_obj_name; // image_u8c1 (nv12)
+        smart_cuda_stream *stream = nullptr;
+    };
+
+    explicit decoder_nvdec(create_config conf);
+
+    ~decoder_nvdec();
+
+    void decode(const frame_info &frame);
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //TINYPLAYER3_DECODER_NVDEC_H

+ 78 - 0
src/codec/decoder_nvjpeg.cpp

@@ -0,0 +1,78 @@
+#include "decoder_nvjpeg.h"
+#include "cuda_helper.hpp"
+#include "simple_mq.h"
+#include "variable_defs.h"
+
+#include <nvjpeg.h>
+
+bool check_nvjpeg_api_call(nvjpegStatus_t api_ret, unsigned int line_number,
+                           const char *file_name, const char *api_call_str) {
+    if (api_ret == NVJPEG_STATUS_SUCCESS) [[likely]] return true;
+    SPDLOG_ERROR("nvJPEG api call {} failed at {}:{} with error 0x{:x}.",
+                 api_call_str, file_name, line_number, (int) api_ret);
+    RET_ERROR_B;
+}
+
+#define API_CHECK(api_call) \
+    check_nvjpeg_api_call( \
+        api_call, __LINE__, __FILE__, #api_call)
+
+namespace decoder_nvjpeg_impl {
+    nvjpegHandle_t handle = nullptr;
+}
+
+using namespace decoder_nvjpeg_impl;
+using namespace simple_mq_singleton;
+
+struct decoder_nvjpeg::impl {
+
+    nvjpegJpegState_t dec_state = nullptr;
+    cv::cuda::GpuMat img_rgb;
+
+    impl() {
+        if (handle == nullptr) [[unlikely]] {
+            API_CHECK(nvjpegCreateSimple(&handle));
+        }
+        API_CHECK(nvjpegJpegStateCreate(handle, &dec_state));
+    }
+
+    ~impl() {
+        API_CHECK(nvjpegJpegStateDestroy(dec_state));
+    }
+
+    void decode(frame_ptr_type &&frame) {
+        // decode image info
+        int channels;
+        nvjpegChromaSubsampling_t subsampling;
+        int width[NVJPEG_MAX_COMPONENT];
+        int height[NVJPEG_MAX_COMPONENT];
+        API_CHECK(nvjpegGetImageInfo(handle, frame->ptr, frame->length,
+                                     &channels, &subsampling, width, height));
+
+        // avoid frame overwritten
+        wait_render_idle();
+
+        // prepare buffer
+        assert(channels == 3);
+        img_rgb.create(height[0], width[0], CV_8UC3);
+        nvjpegImage_t nv_image{};
+        nv_image.channel[0] = (uint8_t *) img_rgb.cudaPtr();
+        nv_image.pitch[0] = img_rgb.step;
+
+        // decode image
+        API_CHECK(nvjpegDecode(handle, dec_state, frame->ptr, frame->length,
+                               NVJPEG_OUTPUT_RGBI, &nv_image, nullptr));
+
+        // commit frame
+        commit_frame(img_rgb);
+    }
+};
+
+decoder_nvjpeg::decoder_nvjpeg()
+        : pimpl(std::make_unique<impl>()) {}
+
+decoder_nvjpeg::~decoder_nvjpeg() = default;
+
+void decoder_nvjpeg::decode(decoder_base::frame_ptr_type &&frame) {
+    pimpl->decode(std::move(frame));
+}

+ 20 - 0
src/codec/decoder_nvjpeg.h

@@ -0,0 +1,20 @@
+#ifndef TINYPLAYER3_DECODER_NVJPEG_H
+#define TINYPLAYER3_DECODER_NVJPEG_H
+
+#include "decoder_base.h"
+
+class decoder_nvjpeg : public decoder_base {
+public:
+    decoder_nvjpeg();
+
+    ~decoder_nvjpeg() override;
+
+    void decode(decoder_base::frame_ptr_type &&frame) override;
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //TINYPLAYER3_DECODER_NVJPEG_H

+ 27 - 1
src/core/image_utility.hpp

@@ -55,6 +55,29 @@ inline cudaMemcpyKind get_copy_kind(memory_location src,
     }
 }
 
+// calculate height of a nv12 image
+inline auto img_height_to_nv12(auto h) {
+    assert(h % 2 == 0);
+    return h / 2 * 3;
+}
+
+inline auto img_size_to_nv12(cv::Size size) {
+    return cv::Size(size.width,
+                    img_height_to_nv12(size.height));
+}
+
+// calculate image height from a nv12 image
+// inverse of img_height_to_nv12()
+inline auto nv12_height_to_img(auto h) {
+    assert(h % 3 == 0);
+    return h / 3 * 2;
+}
+
+inline auto nv12_size_to_img(cv::Size size) {
+    return cv::Size(size.width,
+                    nv12_height_to_img(size.height));
+}
+
 #define ALLOC_IMG(type, size, loc, pitch) \
     ALLOC_PITCH_SHARED(type, size.width, size.height, loc, pitch)
 
@@ -77,7 +100,10 @@ struct image_info_type {
     cv::Size size = {};
     size_t pitch = 0;
 
-    void *start_ptr() const { return ptr.get(); }
+    // start pointer of specific row
+    void *start_ptr(int row = 0) const {
+        return ptr.get() + row * pitch;
+    }
 
     size_t size_in_bytes() const { return sizeof(T) * size.area(); }
 

+ 7 - 5
src/core/impl/object_manager.cpp

@@ -6,11 +6,9 @@
 using boost::asio::io_context;
 using boost::asio::post;
 
-object_manager::impl::~impl() {
-    for (auto &item: obj_pool) {
-        auto &obj_st = item.second;
-        obj_st.del_func(obj_st.ptr);
-    }
+object_manager::impl::obj_st_type::~obj_st_type() {
+    assert(del_func != nullptr);
+    del_func(ptr);
 }
 
 object_manager::impl::obj_st_type *
@@ -45,6 +43,10 @@ object_manager::impl::query_obj_stats(name_type obj_name) {
 void object_manager::impl::create_placeholder(name_type obj_name, std::type_index obj_type,
                                               void *ptr, del_func_type del_func) {
     assert(switch_ctx() == nullptr);
+    if (auto iter = obj_pool.find(obj_name);
+            iter != obj_pool.end()) {
+        obj_pool.erase(iter);
+    }
     assert(!obj_pool.contains(obj_name));
     obj_pool.emplace(std::piecewise_construct,
                      std::forward_as_tuple(obj_name),

+ 2 - 2
src/core/impl/object_manager_impl.h

@@ -19,6 +19,8 @@ struct object_manager::impl {
 
         // statistical information
         event_timer stats_timer;
+
+        ~obj_st_type();
     };
 
     using obj_pool_type = std::unordered_map<name_type, obj_st_type>;
@@ -27,8 +29,6 @@ struct object_manager::impl {
     io_context *ctx = nullptr;
     std::thread::id tid = std::this_thread::get_id();
 
-    ~impl();
-
     obj_st_type *query_st(name_type obj_name);
 
     obj_stats query_obj_stats(name_type obj_name);

+ 1 - 1
src/core/object_manager.h

@@ -106,7 +106,7 @@ private:
     void *query_placeholder(name_type obj_name, std::type_index obj_type) {
         auto info_o = query_info(obj_name);
         if (!info_o.has_value()) [[unlikely]] return nullptr;
-        assert(info_o->type == obj_type);
+        if (info_o->type != obj_type) return nullptr;
         return info_o->pl_ptr;
     }
 

+ 23 - 0
src/core/utility.hpp

@@ -1,6 +1,8 @@
 #ifndef DEPTHGUIDE_UTILITY_HPP
 #define DEPTHGUIDE_UTILITY_HPP
 
+#include <spdlog/spdlog.h>
+
 #include <cassert>
 #include <chrono>
 
@@ -50,4 +52,25 @@ inline auto now_since_epoch_in_seconds() {
     assert(false); \
     return nullptr
 
+inline bool check_function_call(bool function_ret, unsigned int line_number,
+                                const char *file_name, const char *function_call_str) {
+    if (function_ret) [[likely]] return true;
+    SPDLOG_ERROR("Function call {} failed at {}:{}.",
+                 function_call_str, file_name, line_number);
+    RET_ERROR_B;
+}
+
+#define CALL_CHECK(function_call) \
+    check_function_call( \
+        function_call, __LINE__, __FILE__, #function_call)
+
+#define EXCEPTION_CHECK_P(function_call) \
+    try { \
+        function_call; \
+    } catch (std::exception &e) { \
+        SPDLOG_ERROR("Function call {} failed at {}:{}, {}.", \
+                     #function_call, __FILE__, __LINE__, e.what()); \
+        RET_ERROR_P; \
+    } (void) 0
+
 #endif //DEPTHGUIDE_UTILITY_HPP

+ 56 - 0
src/network_v3/receiver_base.hpp

@@ -0,0 +1,56 @@
+#ifndef TINYPLAYER3_RECEIVER_BASE_H
+#define TINYPLAYER3_RECEIVER_BASE_H
+
+#include "codec/codec_base.hpp"
+#include "core/utility.hpp"
+
+#include <fmt/chrono.h>
+#include <fmt/format.h>
+
+#include <functional>
+
+enum receiver_type {
+    RECEIVER_TCP,
+    RECEIVER_UDP,
+    RECEIVER_UDP_FEC
+};
+
+class receiver_base {
+public:
+
+    using cb_func_type = std::function<void(frame_info)>;
+
+    struct create_config {
+        cb_func_type cb_func = nullptr;
+        bool enable_log = false;
+    };
+
+    explicit receiver_base(const create_config &conf) {
+        cb_func = conf.cb_func;
+
+        // create log file if requested
+        if (conf.enable_log) {
+            auto file_name = fmt::format("log_{:%Y_%m_%d_%H_%M_%S}.csv",
+                                         std::chrono::system_clock::now());
+            log_file.open(file_name);
+        }
+    }
+
+protected:
+
+    void commit_frame(const frame_info &frame) {
+        log_frame_received(frame.frame_id);
+        cb_func(frame);
+    }
+
+private:
+    cb_func_type cb_func;
+    std::ofstream log_file;
+
+    void log_frame_received(uint64_t frame_id) {
+        if (!log_file.is_open()) return;
+        log_file << fmt::format("{},{}\n", frame_id, current_timestamp());
+    }
+};
+
+#endif //TINYPLAYER3_RECEIVER_BASE_H

+ 96 - 0
src/network_v3/receiver_tcp.cpp

@@ -0,0 +1,96 @@
+#include "receiver_tcp.h"
+#include "simple_mq.h"
+#include "receiver_utility.hpp"
+#include "utility.hpp"
+#include "variable_defs.h"
+
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/read.hpp>
+#include <boost/asio/post.hpp>
+
+#include <spdlog/spdlog.h>
+
+using namespace boost::asio::ip;
+using boost::asio::buffer;
+using boost::asio::io_context;
+using boost::asio::post;
+using boost::asio::read;
+using namespace simple_mq_singleton;
+
+struct receiver_tcp::impl {
+
+    receiver_tcp *q_this = nullptr;
+    std::unique_ptr<tcp::socket> socket;
+    decoder_base *decoder;
+
+    smart_buffer<uint8_t> in_buf;
+
+    void receive_one_frame() {
+        // read packet length
+        uint64_t packet_length;
+        in_buf.create(sizeof(packet_length));
+        read(*socket, buffer(in_buf.ptr, in_buf.length));
+        read_binary_number(in_buf.ptr, &packet_length);
+
+        // read full packet
+        in_buf.create(packet_length);
+        read(*socket, buffer(in_buf.ptr, in_buf.length));
+
+        // read frame id
+        uint64_t frame_id;
+        auto ptr = read_binary_number(in_buf.ptr, &frame_id);
+        auto frame_length = packet_length
+                            - sizeof(frame_id);
+
+        // read frame content
+        auto frame = std::make_unique<video_nal>();
+        frame->create(ptr, frame_length);
+        q_this->log_frame_received(frame_id);
+        decoder->decode(std::move(frame));
+        SPDLOG_TRACE("Frame {} decoded.", frame_id);
+    }
+
+    void receive_frames() {
+        try {
+            for (;;) {
+                // check if stop requested
+                if (mq().query_variable<bool>(RECEIVER_SHOULD_STOP)) {
+                    return;
+                }
+
+                receive_one_frame();
+            }
+        } catch (std::exception &e) {
+            SPDLOG_INFO("Server closed.");
+        }
+    }
+
+    void run() {
+        post(*q_this->get_ctx(), [this] {
+            receive_frames();
+            q_this->get_ctx()->stop();
+        });
+    }
+
+    static impl *create(const receiver_config &conf, receiver_tcp *q_this) {
+        auto ret = std::make_unique<impl>();
+        assert(conf.decoder != nullptr);
+        ret->q_this = q_this;
+        ret->decoder = conf.decoder;
+        auto server_ep = tcp::endpoint{address::from_string(conf.server_addr), conf.server_port};
+        ret->socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
+        EXCEPTION_CHECK_P(ret->socket->connect(server_ep));
+        ret->run();
+        return ret.release();
+    }
+};
+
+receiver_tcp::~receiver_tcp() = default;
+
+receiver_tcp *receiver_tcp::create(const receiver_config &conf) {
+    auto ret = std::make_unique<receiver_tcp>();
+    auto pimpl = impl::create(conf, ret.get());
+    if (pimpl == nullptr) return nullptr;
+    ret->pimpl.reset(pimpl);
+    return ret.release();
+}

+ 19 - 0
src/network_v3/receiver_tcp.h

@@ -0,0 +1,19 @@
+#ifndef TINYPLAYER3_RECEIVER_TCP_H
+#define TINYPLAYER3_RECEIVER_TCP_H
+
+#include "receiver_base.hpp"
+
+class receiver_tcp : public receiver_base {
+public:
+
+    ~receiver_tcp() override;
+
+    static receiver_tcp *create(const receiver_config &conf);
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //TINYPLAYER3_RECEIVER_TCP_H

+ 356 - 0
src/network_v3/receiver_udp_fec.cpp

@@ -0,0 +1,356 @@
+#include "receiver_udp_fec.h"
+#include "third_party/scope_guard.hpp"
+
+extern "C" {
+#include "network/impl/fragment/third_party/rs.h"
+}
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/udp.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/container/static_vector.hpp>
+#include <boost/crc.hpp>
+#include <boost/endian.hpp>
+
+#include <spdlog/spdlog.h>
+
+#include <algorithm>
+
+using namespace boost::asio::ip;
+using boost::asio::buffer;
+using boost::asio::io_context;
+using boost::asio::post;
+using boost::system::error_code;
+
+namespace receiver_udp_fec_impl {
+
+    enum status_type {
+        NOT_INIT,
+        WAITING,
+        READY
+    };
+
+    struct smart_chunk {
+        static constexpr auto max_blocks = DATA_SHARDS_MAX;
+
+        using block_ptrs_type = boost::container::static_vector<uint8_t *, max_blocks>;
+        using block_miss_type = boost::container::static_vector<bool, max_blocks>;
+
+        data_type block_data;
+        block_ptrs_type block_ptrs;
+        block_miss_type block_miss;
+        uint8_t ready_blocks = 0;
+        status_type status = NOT_INIT;
+
+        ~smart_chunk() { deallocate(); }
+
+        void reset() {
+            ready_blocks = 0;
+            status = NOT_INIT;
+        }
+
+        void create(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
+            if (total_blocks != block_ptrs.size() ||
+                parity_blocks != last_parity_blocks ||
+                block_size != last_block_size) [[unlikely]] {
+                deallocate();
+                allocate(total_blocks, parity_blocks, block_size);
+            }
+            std::ranges::fill(block_miss, true);
+            assert(status == NOT_INIT);
+            status = WAITING;
+        }
+
+        bool reconstruct() {
+            if (ready_blocks + last_parity_blocks < block_ptrs.size()) return false;
+            auto ret = reed_solomon_reconstruct(rs, block_ptrs.data(), (uint8_t *) block_miss.data(),
+                                                block_ptrs.size(), last_block_size);
+            if (ret != 0) return false;
+            assert(status == WAITING);
+            status = READY;
+            return true;
+        }
+
+    private:
+        reed_solomon *rs = nullptr;
+        uint8_t last_parity_blocks = 0;
+        uint16_t last_block_size = 0;
+
+        void deallocate() {
+            if (rs == nullptr) return;
+            reed_solomon_release(rs);
+            rs = nullptr;
+        }
+
+        void allocate(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
+            assert(rs == nullptr);
+            auto data_blocks = total_blocks - parity_blocks;
+            rs = reed_solomon_new(data_blocks, parity_blocks);
+            block_data.reserve(total_blocks * block_size);
+            block_ptrs.resize(total_blocks);
+            block_miss.resize(total_blocks);
+            for (int i = 0; i < total_blocks; ++i) {
+                block_ptrs[i] = block_data.ptr + block_size * i;
+            }
+            last_parity_blocks = parity_blocks;
+            last_block_size = block_size;
+        }
+    };
+
+}
+
+using namespace receiver_udp_fec_impl;
+
+struct receiver_udp_fec::impl {
+
+    struct frag_header {
+        uint32_t frag_checksum;
+        uint8_t frame_type; // 'I' or 'P'
+        uint32_t frame_id;
+        uint32_t frame_length;
+        uint8_t chunk_count;
+        uint8_t chunk_id;
+        uint32_t chunk_offset;
+        uint32_t chunk_length;
+        uint16_t block_size;
+        uint8_t block_count;
+        uint8_t chunk_decode_block_count;
+        uint8_t block_id;
+    };
+
+    struct request_type {
+        uint32_t request_checksum;
+        uint8_t request_type;
+        uint32_t frame_id;
+    };
+
+    struct frame_info {
+        using chunks_type = std::vector<smart_chunk>;
+        chunks_type chunks;
+
+        data_type data;
+        uint32_t id = 0;
+        uint8_t ready_chunks = 0;
+        status_type status = NOT_INIT;
+
+        void create(uint32_t frame_id, uint8_t chunk_count, size_t length) {
+            chunks.resize(chunk_count);
+            data.reserve(length);
+            for (auto k = 0; k < chunk_count; ++k) {
+                chunks[k].reset();
+            }
+            id = frame_id;
+            ready_chunks = 0;
+            status = WAITING;
+        }
+    };
+
+    static constexpr auto frag_header_size = 28;
+    static constexpr auto request_size = 9;
+    static constexpr auto max_package_size = 64 * 1024; // 64KiB
+    static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
+
+    receiver_udp_fec *q_this = nullptr;
+
+    // parent config
+    receiver_base::create_config par_conf;
+
+    std::unique_ptr<udp::socket> socket;
+
+    frame_info frame_cache;
+    uint32_t last_frame_id = 0;
+
+    udp::endpoint server_ep;
+    data_type in_buf, out_buf;
+
+    static uint8_t *read_frag_header(uint8_t *ptr, frag_header *header) {
+#define READ(member) ptr = read_binary_number(ptr, &header->member)
+        READ(frag_checksum);
+        READ(frame_type);
+        READ(frame_id);
+        READ(frame_length);
+        READ(chunk_count);
+        READ(chunk_id);
+        READ(chunk_offset);
+        READ(chunk_length);
+        READ(block_size);
+        READ(block_count);
+        READ(chunk_decode_block_count);
+        READ(block_id);
+#undef WRITE
+        return ptr;
+    }
+
+    static uint8_t *write_request(uint8_t *ptr, const request_type &req) {
+#define WRITE(member) ptr = write_binary_number(ptr, req.member)
+        WRITE(request_checksum);
+        WRITE(request_type);
+        WRITE(frame_id);
+#undef WRITE
+        return ptr;
+    }
+
+    ~impl() {
+        request_exit();
+    }
+
+    void refresh_frame(const frag_header &header) {
+        frame_cache.create(header.frame_id, header.chunk_count, header.frame_length);
+    }
+
+    void send_request(const request_type &req) {
+        out_buf.reserve(request_size);
+        write_request(out_buf.ptr, req);
+
+        // calculate crc32
+        auto crc = boost::crc_32_type{};
+        crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
+                          request_size - sizeof(uint32_t));
+        write_binary_number(out_buf.ptr, crc.checksum());
+
+        // send packet
+        assert(socket != nullptr);
+        auto buf = buffer(out_buf.ptr, request_size);
+        socket->send_to(buf, server_ep);
+    }
+
+    void request_idr_frame(uint32_t frame_id) {
+        request_type req;
+        req.request_type = 'I';
+        req.frame_id = frame_id;
+        send_request(req);
+        SPDLOG_WARN("Receive frame {} error, request new IDR frame.", frame_id);
+    }
+
+    void request_frame_confirm(uint32_t frame_id) {
+        request_type req;
+        req.request_type = 'C';
+        req.frame_id = frame_id;
+        send_request(req);
+    }
+
+    void request_exit() {
+        request_type req;
+        req.request_type = 'X';
+        send_request(req);
+    }
+
+    void async_handle_package() {
+        in_buf.reserve(max_package_size);
+        auto buf = buffer(in_buf.ptr, max_package_size);
+        using namespace std::placeholders;
+        socket->async_receive(buf, std::bind(&impl::handle_package, this, _1, _2));
+    }
+
+    void handle_package(const error_code &ec, size_t length) {
+        // prepare for next request when this function exited.
+        auto closer = sg::make_scope_guard([this] {
+            async_handle_package();
+        });
+
+        // handle errors
+        if (ec) {
+            SPDLOG_ERROR("Error while receiving request: {}", ec.what());
+            return;
+        }
+
+        // parse package
+        frag_header header;
+        read_frag_header(in_buf.ptr, &header);
+        auto crc = boost::crc_32_type{};
+        crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
+                          length - sizeof(uint32_t));
+        if (crc.checksum() != header.frag_checksum) { // checksum failed
+            // TODO show log
+            return;
+        }
+
+        assert(length == frag_header_size + header.block_size);
+        if (header.frame_id < frame_cache.id) return; // old package
+        bool is_idr_frame = header.frame_type == 'I';
+        if (frame_cache.status == READY) { // last frame has already been decoded
+            if (header.frame_id == frame_cache.id) return; // redundant package
+            if (is_idr_frame || // new IDR frame or correct next P frame
+                header.frame_id == last_frame_id + 1) {
+                refresh_frame(header);
+            } else {
+                request_idr_frame(header.frame_id);
+                return;
+            }
+        } else {
+            if (header.frame_id > frame_cache.id) {
+                if (is_idr_frame) {
+                    refresh_frame(header);
+                } else {
+                    request_idr_frame(header.frame_id);
+                    return;
+                }
+            }
+        }
+
+        assert(frame_cache.id == header.frame_id);
+        assert(frame_cache.status == WAITING);
+        auto &chunk = frame_cache.chunks[header.chunk_id];
+        if (chunk.status == NOT_INIT) {
+            auto parity_blocks = header.block_count - header.chunk_decode_block_count;
+            chunk.create(header.block_count, parity_blocks, header.block_size);
+        } else if (chunk.status == READY) {
+            return;
+        }
+
+        assert(chunk.status == WAITING);
+        auto data_ptr = in_buf.ptr + frag_header_size;
+        memcpy(chunk.block_ptrs[header.block_id], data_ptr, header.block_size);
+        chunk.block_miss[header.block_id] = false;
+        ++chunk.ready_blocks;
+        if (!chunk.reconstruct()) [[likely]] return; // need more blocks
+
+        assert(chunk.status == READY);
+        assert(chunk.block_data.size >= header.chunk_length);
+        memcpy(frame_cache.data.ptr + header.chunk_offset, chunk.block_data.ptr, header.chunk_length);
+        ++frame_cache.ready_chunks;
+        if (frame_cache.ready_chunks < frame_cache.chunks.size()) return; // need more chunks
+
+        // decode frame
+        frame_cache.status = READY;
+        auto frame = ::frame_info(); // not frame_info in this impl
+        frame.data = frame_cache.data;
+        frame.idr = header.frame_type == 'I';
+        frame.frame_id = header.frame_id;
+        q_this->commit_frame(frame);
+        SPDLOG_TRACE("Frame {} decoded.", frame_cache.id);
+        last_frame_id = frame_cache.id;
+        request_frame_confirm(frame_cache.id);
+    }
+
+    static impl *create(const create_config &conf) {
+        auto ret = new impl();
+        ret->par_conf = {.cb_func = conf.cb_func, .enable_log = conf.enable_log};
+        ret->server_ep = udp::endpoint{address::from_string(conf.server_addr), conf.server_port};
+        ret->socket = std::make_unique<udp::socket>(*conf.ctx);
+        ret->socket->connect(ret->server_ep);
+        ret->socket->set_option(udp::socket::receive_buffer_size{udp_buffer_size});
+        ret->async_handle_package();
+        ret->request_idr_frame(0);
+
+        // initialize reed solomon
+        fec_init();
+
+        return ret;
+    }
+};
+
+receiver_udp_fec::~receiver_udp_fec() = default;
+
+receiver_udp_fec::pointer
+receiver_udp_fec::create(const create_config &conf) {
+    auto impl = impl::create(conf);
+    return std::unique_ptr<receiver_udp_fec>(
+            new receiver_udp_fec(impl));
+}
+
+receiver_udp_fec::receiver_udp_fec(impl *_pimpl)
+        : receiver_base(_pimpl->par_conf),
+          pimpl(std::unique_ptr<impl>(_pimpl)) {
+    pimpl->q_this = this;
+}

+ 40 - 0
src/network_v3/receiver_udp_fec.h

@@ -0,0 +1,40 @@
+#ifndef TINYPLAYER2_FRAME_RECEIVER2_H
+#define TINYPLAYER2_FRAME_RECEIVER2_H
+
+#include "receiver_base.hpp"
+
+#include <boost/asio/io_context.hpp>
+
+#include <memory>
+
+class receiver_udp_fec : public receiver_base {
+public:
+
+    using io_context = boost::asio::io_context;
+
+    struct create_config {
+        std::string server_addr;
+        uint16_t server_port;
+        io_context *ctx = nullptr;
+
+        // for parent
+        cb_func_type cb_func = nullptr;
+        bool enable_log = false;
+    };
+
+    using this_type = receiver_udp_fec;
+    using pointer = std::unique_ptr<this_type>;
+
+    static pointer create(const create_config &conf);
+
+    ~receiver_udp_fec();
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+
+    explicit receiver_udp_fec(impl *pimpl);
+};
+
+
+#endif //TINYPLAYER2_FRAME_RECEIVER2_H