Quellcode durchsuchen

Merged old network modules.

jcsyshc vor 1 Jahr
Ursprung
Commit
98b2475ec1

+ 25 - 7
CMakeLists.txt

@@ -8,15 +8,15 @@ add_executable(${PROJECT_NAME} src/main.cpp
         src/core/impl/memory_pool.cpp
         src/core/impl/object_manager.cpp
         src/module/impl/image_viewer.cpp
-        src/network_v3/sender_base.cpp
+        src/module/impl/image_streamer.cpp
         src/network_v3/sender_tcp.cpp
         src/network_v3/sender_udp_fec.cpp
-#        src/network/impl/crc_checker.cpp
+        #        src/network/impl/crc_checker.cpp
         src/network/impl/fragment/third_party/rs.c
-#        src/network/impl/fragment/frag_basic.cpp
-#        src/network/impl/fragment/frag_rs.cpp
-#        src/network/impl/multiplexer.cpp
-#        src/network/impl/network.cpp
+        #        src/network/impl/fragment/frag_basic.cpp
+        #        src/network/impl/fragment/frag_rs.cpp
+        #        src/network/impl/multiplexer.cpp
+        #        src/network/impl/network.cpp
         src/render/impl/render_texture.cpp
         src/render/impl/render_tools.cpp
         src/render/impl/render_utility.cpp)
@@ -100,4 +100,22 @@ set(CRYPTOPP_LIB_DIR ${CRYPTOPP_DIR}/lib)
 find_library(CRYPTOPP_LIB cryptopp HINTS ${CRYPTOPP_LIB_DIR})
 set(CRYPTOPP_INCLUDE_DIR ${CRYPTOPP_DIR}/include)
 target_include_directories(${PROJECT_NAME} PRIVATE ${CRYPTOPP_INCLUDE_DIR})
-target_link_libraries(${PROJECT_NAME} ${CRYPTOPP_LIB})
+target_link_libraries(${PROJECT_NAME} ${CRYPTOPP_LIB})
+
+# NvEnc config
+if (WIN32)
+    set(NVCODEC_DIR C:/BuildEssentials/CUDA/Video_Codec_SDK_12.0.16)
+    set(NVENC_LIB_DIR ${NVCODEC_DIR}/Lib/x64)
+    find_library(NVENC_LIB nvencodeapi HINTS ${NVENC_LIB_DIR})
+else ()
+    set(NVCODEC_DIR /home/tpx/src/Video_Codec_SDK_12.0.16)
+    find_library(NVENC_LIB nvidia-encode)
+endif ()
+set(NVCODEC_INCLUDE_DIR ${NVCODEC_DIR}/Interface)
+target_include_directories(${PROJECT_NAME} PRIVATE ${NVCODEC_INCLUDE_DIR})
+target_link_libraries(${PROJECT_NAME} ${NVENC_LIB})
+target_sources(${PROJECT_NAME} PRIVATE src/codec/encoder_nvenc.cpp)
+
+# nvJPEG config
+target_link_libraries(${PROJECT_NAME} CUDA::nvjpeg)
+#target_sources(${PROJECT_NAME} PRIVATE src/codec/encoder_jpeg.cpp)

+ 21 - 0
src/codec/codec_base.hpp

@@ -0,0 +1,21 @@
+#ifndef REMOTEAR3_ENCODER_BASE_HPP
+#define REMOTEAR3_ENCODER_BASE_HPP
+
+#include "network/binary_utility.hpp"
+
+enum encoder_type {
+    ENCODER_NVENC,
+    ENCODER_JPEG
+};
+
+struct frame_info {
+    data_type data;
+    bool idr = false;
+    uint64_t frame_id = 0;
+
+    uint8_t *start_ptr() const { return data.start_ptr(); }
+
+    size_t size() const { return data.size; }
+};
+
+#endif //REMOTEAR3_ENCODER_BASE_HPP

+ 169 - 0
src/codec/encoder_jpeg.cpp

@@ -0,0 +1,169 @@
+#include "encoder_jpeg.h"
+#include "cuda_helper.hpp"
+#include "simple_mq.h"
+#include "third_party/scope_guard.hpp"
+#include "variable_defs.h"
+#include "utility.hpp"
+
+#include <nvjpeg.h>
+
+#include <fmt/chrono.h>
+
+#include <opencv2/cudaimgproc.hpp>
+
+bool check_nvjpeg_api_call(nvjpegStatus_t api_ret, unsigned int line_number,
+                           const char *file_name, const char *api_call_str) {
+    if (api_ret == NVJPEG_STATUS_SUCCESS) [[likely]] return true;
+    SPDLOG_ERROR("nvJPEG api call {} failed at {}:{} with error 0x{:x}.",
+                 api_call_str, file_name, line_number, (int) api_ret);
+    RET_ERROR_B;
+}
+
+#define API_CHECK(api_call) \
+    check_nvjpeg_api_call( \
+        api_call, __LINE__, __FILE__, #api_call)
+
+#define API_CHECK_P(api_call) \
+    if (!check_nvjpeg_api_call( \
+        api_call, __LINE__, __FILE__, #api_call)) [[unlikely]] \
+        return nullptr
+
+namespace encoder_jpeg_impl {
+    nvjpegHandle_t handle = nullptr;
+}
+
+using namespace encoder_jpeg_impl;
+using namespace simple_mq_singleton;
+
+struct encoder_jpeg::impl {
+
+    nvjpegEncoderState_t enc_state = nullptr;
+    nvjpegEncoderParams_t enc_params = nullptr;
+
+    std::unique_ptr<cv::cuda::Stream> cv_stream;
+    cudaStream_t stream; // content of cv_stream
+    cv::cuda::GpuMat img_bgr;
+
+    FILE *save_file = nullptr;
+    bool save_length;
+
+    cudaStream_t output_stream = nullptr;
+    cudaEvent_t output_event = nullptr;
+
+    ~impl() {
+        API_CHECK(nvjpegEncoderParamsDestroy(enc_params));
+        API_CHECK(nvjpegEncoderStateDestroy(enc_state));
+        CUDA_API_CHECK(cudaEventDestroy(output_event));
+
+        // close save file
+        if (save_file != nullptr) {
+            fclose(save_file);
+        }
+
+        SPDLOG_INFO("Video encoder stopped.");
+    }
+
+    static impl *create(nvjpeg_config conf) {
+        // binding cuda context
+        auto cuda_ctx = mq().query_variable<CUcontext>(CUDA_CONTEXT);
+        CUDA_API_CHECK(cuCtxSetCurrent(cuda_ctx));
+
+        // create encoder
+        auto ret = new impl;
+        auto closer = sg::make_scope_guard([&] {
+            if (ret->save_file != nullptr) {
+                fclose(ret->save_file);
+            }
+            delete ret;
+        });
+        if (handle == nullptr) [[unlikely]] {
+            API_CHECK_P(nvjpegCreateSimple(&handle));
+        }
+        ret->cv_stream = std::make_unique<cv::cuda::Stream>();
+        ret->stream = (cudaStream_t) ret->cv_stream->cudaPtr();
+        CUDA_API_CHECK(cudaEventCreateWithFlags(&ret->output_event, cudaEventDisableTiming));
+        API_CHECK_P(nvjpegEncoderStateCreate(handle, &ret->enc_state, ret->stream));
+        API_CHECK_P(nvjpegEncoderParamsCreate(handle, &ret->enc_params, ret->stream));
+        ret->output_stream = mq().query_variable<cudaStream_t>(CUDA_STREAM_OUTPUT);
+
+        // config parameters
+        API_CHECK_P(nvjpegEncoderParamsSetOptimizedHuffman(ret->enc_params, true, ret->stream));
+        API_CHECK_P(nvjpegEncoderParamsSetSamplingFactors(ret->enc_params, NVJPEG_CSS_420, ret->stream));
+        assert(conf.quality >= 1 && conf.quality <= 100);
+        API_CHECK_P(nvjpegEncoderParamsSetQuality(ret->enc_params, conf.quality, ret->stream));
+
+        // create save file
+        if (conf.save_file) {
+            auto file_name = fmt::format("record_{:%Y_%m_%d_%H_%M_%S}.{}",
+                                         now_since_epoch_in_seconds(),
+                                         conf.save_length ? "dat" : "mjpeg");
+            ret->save_file = fopen(file_name.c_str(), "wb");
+            ret->save_length = conf.save_length;
+        }
+
+        SPDLOG_INFO("Video encoder started.");
+        closer.dismiss();
+        return ret;
+    }
+
+    void encode(const cv::cuda::GpuMat &img_bgra, video_nal *out) {
+        // wait for output stream
+        CUDA_API_CHECK(cudaEventRecord(output_event, output_stream));
+        CUDA_API_CHECK(cudaStreamWaitEvent(stream, output_event));
+
+        // remove alpha channel
+        cv::cuda::cvtColor(img_bgra, img_bgr, cv::COLOR_BGRA2BGR, 3, *cv_stream);
+
+        // create image
+        nvjpegImage_t img_in{};
+        img_in.channel[0] = (uint8_t *) img_bgr.cudaPtr();
+        img_in.pitch[0] = img_bgr.step;
+
+        // encode frame
+        API_CHECK(nvjpegEncodeImage(handle, enc_state, enc_params, &img_in,
+                                    NVJPEG_INPUT_BGRI, img_bgr.cols, img_bgr.rows, stream));
+
+        // retrieve encoded image
+        size_t length;
+        API_CHECK(nvjpegEncodeRetrieveBitstream(handle, enc_state, nullptr, &length, stream));
+        out->create(nullptr, length, true);
+        API_CHECK(nvjpegEncodeRetrieveBitstream(handle, enc_state, out->ptr, &length, stream));
+
+        // sync stream
+        CUDA_API_CHECK(cudaStreamSynchronize(stream));
+
+        // save bitstream
+        if (save_file != nullptr) {
+            if (save_length) {
+                fwrite(&out->length, sizeof(size_t), 1, save_file);
+            }
+            fwrite(out->ptr, out->length, 1, save_file);
+        }
+    }
+
+    void change_config(const nvjpeg_config &conf) {
+        assert(conf.save_file == (save_file != nullptr));
+        assert(conf.quality >= 1 && conf.quality <= 100);
+        API_CHECK(nvjpegEncoderParamsSetQuality(enc_params, conf.quality, stream));
+    }
+};
+
+encoder_jpeg::~encoder_jpeg() = default;
+
+encoder_jpeg *encoder_jpeg::create(nvjpeg_config conf) {
+    auto pimpl = impl::create(conf);
+    if (pimpl == nullptr) return nullptr;
+    auto ret = new encoder_jpeg;
+    ret->pimpl.reset(pimpl);
+    return ret;
+}
+
+void encoder_jpeg::encode(const cv::cuda::GpuMat &img, video_nal *out, bool) {
+    mq().update_variable(ENCODER_BUSY, true);
+    pimpl->encode(img, out);
+    mq().update_variable(ENCODER_BUSY, false);
+}
+
+void encoder_jpeg::change_config(const nvjpeg_config &conf) {
+    pimpl->change_config(conf);
+}

+ 32 - 0
src/codec/encoder_jpeg.h

@@ -0,0 +1,32 @@
+#ifndef REMOTEAR3_ENCODER_JPEG_H
+#define REMOTEAR3_ENCODER_JPEG_H
+
+#include "codec_base.hpp"
+
+#include <memory>
+
+struct nvjpeg_config {
+    int quality;
+    bool save_file;
+    bool save_length;
+};
+
+class encoder_jpeg : public codec_base {
+public:
+
+    ~encoder_jpeg() override;
+
+    void encode(const cv::cuda::GpuMat &img, video_nal *out, bool) override;
+
+    // only quality can be changed
+    void change_config(const nvjpeg_config &conf);
+
+    static encoder_jpeg *create(nvjpeg_config conf);
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //REMOTEAR3_ENCODER_JPEG_H

+ 266 - 0
src/codec/encoder_nvenc.cpp

@@ -0,0 +1,266 @@
+#include "third_party/scope_guard.hpp"
+#include "encoder_nvenc.h"
+
+#include <nvEncodeAPI.h>
+
+#include <fmt/chrono.h>
+
+bool check_nvenc_api_call(NVENCSTATUS api_ret, unsigned int line_number,
+                          const char *file_name, const char *api_call_str) {
+    if (api_ret == NV_ENC_SUCCESS) [[likely]] return true;
+    SPDLOG_ERROR("NvEnc api call {} failed at {}:{} with error 0x{:x}.",
+                 api_call_str, file_name, line_number, (int) api_ret);
+    RET_ERROR_B;
+}
+
+#define API_CHECK(api_call) \
+    check_nvenc_api_call( \
+        api_call, __LINE__, __FILE__, #api_call)
+
+#define API_CHECK_P(api_call) \
+    if (!check_nvenc_api_call( \
+        api_call, __LINE__, __FILE__, #api_call)) [[unlikely]] \
+        return nullptr
+
+namespace video_encoder_impl {
+    constexpr auto frame_buffer_type = NV_ENC_BUFFER_FORMAT_ARGB;
+    static auto codec_guid = NV_ENC_CODEC_HEVC_GUID;
+    static auto preset_guid = NV_ENC_PRESET_P3_GUID;
+    constexpr auto tuning_info = NV_ENC_TUNING_INFO_ULTRA_LOW_LATENCY;
+    std::unique_ptr<NV_ENCODE_API_FUNCTION_LIST> api;
+}
+
+using namespace video_encoder_impl;
+
+struct encoder_nvenc::impl {
+
+    std::unique_ptr<NV_ENC_PRESET_CONFIG> preset_config;
+    std::unique_ptr<NV_ENC_INITIALIZE_PARAMS> init_params;
+    void *encoder;
+    NV_ENC_OUTPUT_PTR output_buf;
+
+    cv::Size frame_size;
+    FILE *save_file = nullptr;
+    bool save_length;
+
+    void *last_frame_ptr = nullptr;
+    NV_ENC_REGISTERED_PTR last_reg_ptr = nullptr;
+    uint64_t last_frame_id = 0;
+
+    smart_cuda_stream *stream = nullptr;
+
+    ~impl() {
+        // notify the end of stream
+        NV_ENC_PIC_PARAMS pic_params = {NV_ENC_PIC_PARAMS_VER};
+        pic_params.encodePicFlags = NV_ENC_PIC_FLAG_EOS;
+        API_CHECK(api->nvEncEncodePicture(encoder, &pic_params));
+
+        // releasing resources
+        unregister_frame_ptr();
+        API_CHECK(api->nvEncDestroyBitstreamBuffer(encoder, output_buf));
+
+        // close encoder
+        API_CHECK(api->nvEncDestroyEncoder(encoder));
+
+        // close save file
+        if (save_file != nullptr) {
+            fclose(save_file);
+        }
+
+        SPDLOG_INFO("Video encoder stopped.");
+    }
+
+    static impl *create(create_config conf) {
+        // initialize api
+        if (api == nullptr) [[unlikely]] {
+            api = std::make_unique<NV_ENCODE_API_FUNCTION_LIST>(
+                    NV_ENCODE_API_FUNCTION_LIST_VER);
+            API_CHECK_P(NvEncodeAPICreateInstance(api.get()));
+        }
+
+        // get cuda context
+        auto cuda_ctx = conf.ctx;
+
+        // create encoder
+        auto ret = new impl;
+        ret->stream = conf.stream;
+        ret->frame_size = conf.frame_size;
+        auto closer = sg::make_scope_guard([&] {
+            if (ret->save_file != nullptr) {
+                fclose(ret->save_file);
+            }
+            delete ret;
+        });
+        NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS session_params = {
+                NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER};
+        session_params.deviceType = NV_ENC_DEVICE_TYPE_CUDA;
+        session_params.device = *cuda_ctx;
+        session_params.apiVersion = NVENCAPI_VERSION;
+        API_CHECK_P(api->nvEncOpenEncodeSessionEx(&session_params, &ret->encoder));
+
+        // get preset config
+        ret->preset_config = std::make_unique<NV_ENC_PRESET_CONFIG>();
+        auto &preset_config = *ret->preset_config;
+        preset_config.version = NV_ENC_PRESET_CONFIG_VER;
+        preset_config.presetCfg.version = NV_ENC_CONFIG_VER;
+        API_CHECK_P(api->nvEncGetEncodePresetConfigEx(
+                ret->encoder, codec_guid, preset_guid, tuning_info, &preset_config));
+        auto &encode_config = preset_config.presetCfg;
+        encode_config.gopLength = NVENC_INFINITE_GOPLENGTH;
+        encode_config.frameIntervalP = 1;
+        auto &rc_params = encode_config.rcParams;
+        rc_params.rateControlMode = NV_ENC_PARAMS_RC_CBR;
+        rc_params.averageBitRate = conf.bitrate_mbps * 1e6;
+        rc_params.enableAQ = true;
+        rc_params.multiPass = NV_ENC_TWO_PASS_QUARTER_RESOLUTION;
+        // TODO; fine tune encoder config
+
+        // start_encode encoder
+        ret->init_params =
+                std::make_unique<NV_ENC_INITIALIZE_PARAMS>(NV_ENC_INITIALIZE_PARAMS_VER);
+        auto &init_params = *ret->init_params;
+        init_params.encodeGUID = codec_guid;
+        init_params.presetGUID = preset_guid;
+        init_params.encodeWidth = conf.frame_size.width;
+        init_params.encodeHeight = conf.frame_size.height;
+        init_params.darWidth = conf.frame_size.width; // TODO; learn more about this
+        init_params.darHeight = conf.frame_size.height; // TODO; learn more about this
+        init_params.frameRateNum = conf.frame_rate;
+        init_params.frameRateDen = 1;
+        init_params.enablePTD = 1;
+        init_params.encodeConfig = &preset_config.presetCfg;
+        init_params.maxEncodeWidth = conf.frame_size.width;
+        init_params.maxEncodeHeight = conf.frame_size.height;
+        init_params.tuningInfo = tuning_info;
+        init_params.bufferFormat = frame_buffer_type;
+        API_CHECK_P(api->nvEncInitializeEncoder(ret->encoder, &init_params));
+
+        // create output buffer
+        NV_ENC_CREATE_BITSTREAM_BUFFER buffer_config = {
+                NV_ENC_CREATE_BITSTREAM_BUFFER_VER};
+        API_CHECK_P(api->nvEncCreateBitstreamBuffer(ret->encoder, &buffer_config));
+        ret->output_buf = buffer_config.bitstreamBuffer;
+
+        // create save file
+        if (conf.save_file) {
+            auto file_name = fmt::format("record_{:%Y_%m_%d_%H_%M_%S}.{}",
+                                         now_since_epoch_in_seconds(),
+                                         conf.save_length ? "dat" : "hevc");
+            ret->save_file = fopen(file_name.c_str(), "wb");
+            ret->save_length = conf.save_length;
+        }
+
+        SPDLOG_INFO("Video encoder started.");
+        closer.dismiss();
+        return ret;
+    }
+
+    void unregister_frame_ptr() {
+        if (last_reg_ptr == nullptr) return;
+        API_CHECK(api->nvEncUnregisterResource(encoder, last_reg_ptr));
+        last_reg_ptr = nullptr;
+    }
+
+    void register_frame_ptr(const image_info_type<uchar4> &info) {
+        assert(info.loc == MEM_CUDA);
+        NV_ENC_REGISTER_RESOURCE reg_params = {NV_ENC_REGISTER_RESOURCE_VER};
+        reg_params.resourceType = NV_ENC_INPUT_RESOURCE_TYPE_CUDADEVICEPTR;
+        reg_params.width = info.size.width;
+        reg_params.height = info.size.height;
+        reg_params.pitch = info.pitch;
+        reg_params.resourceToRegister = info.start_ptr();
+        reg_params.bufferFormat = frame_buffer_type;
+        reg_params.bufferUsage = NV_ENC_INPUT_IMAGE;
+        API_CHECK(api->nvEncRegisterResource(encoder, &reg_params));
+        last_reg_ptr = reg_params.registeredResource;
+    }
+
+    frame_info encode(const image_u8c4 &img, bool force_idr = false) {
+        // register pointer if needed
+        auto img_info = img->as_cuda_info(stream);
+        // TODO: image pointer may change frequently
+        if (img_info.start_ptr() != last_frame_ptr) [[unlikely]] {
+            assert(img->size() == frame_size);
+            unregister_frame_ptr();
+            register_frame_ptr(img_info);
+        }
+
+        // map input resource
+        NV_ENC_MAP_INPUT_RESOURCE map_params = {
+                NV_ENC_MAP_INPUT_RESOURCE_VER};
+        map_params.registeredResource = last_reg_ptr;
+        API_CHECK(api->nvEncMapInputResource(encoder, &map_params));
+        assert(map_params.mappedBufferFmt == frame_buffer_type);
+
+        // encode frame
+        NV_ENC_PIC_PARAMS pic_params = {NV_ENC_PIC_PARAMS_VER};
+        pic_params.inputWidth = img_info.size.width;
+        pic_params.inputHeight = img_info.size.height;
+        pic_params.inputPitch = img_info.pitch;
+        if (force_idr) { // request for IDR frame
+            pic_params.encodePicFlags = NV_ENC_PIC_FLAG_FORCEIDR | NV_ENC_PIC_FLAG_OUTPUT_SPSPPS;
+        } else {
+            pic_params.encodePicFlags = 0;
+        }
+        pic_params.inputBuffer = map_params.mappedResource;
+        pic_params.outputBitstream = output_buf;
+        pic_params.bufferFmt = frame_buffer_type;
+        pic_params.pictureStruct = NV_ENC_PIC_STRUCT_FRAME; // TODO; learn more about this
+        API_CHECK(api->nvEncEncodePicture(encoder, &pic_params));
+
+        // get encoded bitstream
+        NV_ENC_LOCK_BITSTREAM lock_config = {NV_ENC_LOCK_BITSTREAM_VER};
+        lock_config.doNotWait = false; // block until encode completed.
+        lock_config.outputBitstream = output_buf;
+        API_CHECK(api->nvEncLockBitstream(encoder, &lock_config));
+
+        // copy bitstream
+        auto ret = frame_info();
+        ret.data = data_type(lock_config.bitstreamSizeInBytes, lock_config.bitstreamBufferPtr);
+        ret.idr = lock_config.pictureType == NV_ENC_PIC_TYPE_IDR;
+        ret.frame_id = ++last_frame_id;
+
+        // save bitstream
+        if (save_file != nullptr) {
+            auto ret_size = ret.size();
+            if (save_length) {
+                fwrite(&ret_size, sizeof(size_t), 1, save_file);
+            }
+            fwrite(ret.start_ptr(), ret_size, 1, save_file);
+        }
+
+        // cleanup
+        API_CHECK(api->nvEncUnlockBitstream(encoder, output_buf));
+        API_CHECK(api->nvEncUnmapInputResource(encoder, map_params.mappedResource));
+
+        return ret;
+    }
+
+    void change_config(modifiable_config conf) {
+        NV_ENC_RECONFIGURE_PARAMS params = {NV_ENC_RECONFIGURE_PARAMS_VER};
+        init_params->frameRateNum = conf.frame_rate;
+        init_params->encodeConfig->rcParams.averageBitRate = conf.bitrate_mbps * 1e6;
+        params.reInitEncodeParams = *init_params;
+        params.resetEncoder = true;
+        params.forceIDR = true;
+        API_CHECK(api->nvEncReconfigureEncoder(encoder, &params));
+    }
+};
+
+encoder_nvenc::~encoder_nvenc() = default;
+
+encoder_nvenc::pointer encoder_nvenc::create(create_config conf) {
+    auto pimpl = impl::create(conf);
+    if (pimpl == nullptr) return nullptr;
+    auto ret = std::make_unique<encoder_nvenc>();
+    ret->pimpl.reset(pimpl);
+    return ret;
+}
+
+frame_info encoder_nvenc::encode(const image_u8c4 &img, bool force_idr) {
+    return pimpl->encode(img, force_idr);
+}
+
+void encoder_nvenc::change_config(modifiable_config conf) {
+    pimpl->change_config(conf);
+}

+ 48 - 0
src/codec/encoder_nvenc.h

@@ -0,0 +1,48 @@
+#ifndef REMOTEAR3_ENCODER_NVENC_H
+#define REMOTEAR3_ENCODER_NVENC_H
+
+#include "codec_base.hpp"
+#include "core/cuda_helper.hpp"
+#include "core/image_utility.hpp"
+
+#include <opencv2/core/types.hpp>
+
+#include <memory>
+
+class encoder_nvenc {
+public:
+
+    ~encoder_nvenc();
+
+    struct create_config {
+        cv::Size frame_size;
+        int frame_rate;
+        float bitrate_mbps;
+        bool save_file;
+        bool save_length;
+
+        CUcontext *ctx = nullptr;
+        smart_cuda_stream *stream = nullptr;
+    };
+
+    using this_type = encoder_nvenc;
+    using pointer = std::unique_ptr<this_type>;
+
+    static pointer create(create_config conf);
+
+    struct modifiable_config {
+        int frame_rate;
+        float bitrate_mbps;
+    };
+
+    void change_config(modifiable_config conf);
+
+    frame_info encode(const image_u8c4 &img, bool force_idr = false);
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //REMOTEAR3_ENCODER_NVENC_H

+ 93 - 0
src/core/async_queue.hpp

@@ -0,0 +1,93 @@
+#ifndef DEPTHGUIDE_ASYNC_QUEUE_HPP
+#define DEPTHGUIDE_ASYNC_QUEUE_HPP
+
+#include "utility.hpp"
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/post.hpp>
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <thread>
+
+template<typename T>
+class async_queue : public std::enable_shared_from_this<async_queue<T>> {
+    struct _private {
+    };
+public:
+
+    using io_context = boost::asio::io_context;
+
+    struct create_config {
+        io_context *ctx = nullptr; // callback functions will be called here
+        std::thread::id tid; // thread id of the ctx
+        std::function<void(T)> cb_func; // callback function
+    };
+
+    async_queue(create_config conf, _private) {
+        ctx = conf.ctx;
+        tid = conf.tid;
+        cb_func = conf.cb_func;
+        assert(ctx != nullptr);
+    }
+
+    using this_type = async_queue<T>;
+    using pointer = std::shared_ptr<this_type>;
+
+    static pointer create(create_config conf) {
+        return std::make_shared<this_type>(conf, _private{});
+    }
+
+    void clear() {
+        assert(is_ctx_thread());
+        q = {};
+    }
+
+    void push(const T &item, bool refresh_start = false) {
+        using boost::asio::post;
+        if (!is_ctx_thread()) {
+            post(*ctx, [=, p_this = this->shared_from_this()] {
+                p_this->push(item, refresh_start);
+            });
+        } else {
+            if (refresh_start) [[unlikely]] {
+                clear();
+            }
+            q.push(item);
+            if (!is_cb_pending) {
+                post(*ctx, [p_this = this->shared_from_this()] {
+                    p_this->cb_wrapper();
+                    p_this->is_cb_pending = false;
+                });
+                is_cb_pending = true;
+            }
+        }
+    }
+
+private:
+    using q_type = std::queue<T>;
+    q_type q;
+
+    io_context *ctx = nullptr;
+    std::thread::id tid;
+    std::function<void(T)> cb_func;
+
+    bool is_cb_pending = false; // is callback function in the ctx queue
+
+    bool is_ctx_thread() {
+        return std::this_thread::get_id() == tid;
+    }
+
+    // callback wrapper
+    void cb_wrapper() {
+        assert(is_ctx_thread());
+        while (!q.empty()) {
+            auto &item = q.front();
+            cb_func(item);
+            q.pop();
+        }
+    }
+};
+
+#endif //DEPTHGUIDE_ASYNC_QUEUE_HPP

+ 7 - 0
src/core/image_utility.hpp

@@ -6,6 +6,11 @@
 
 #include <opencv2/core/types.hpp>
 
+enum image_pixel_type {
+    PIX_RGBA,
+    PIX_RGB
+};
+
 template<typename T>
 constexpr inline int get_cv_type() {
     // @formatter:off
@@ -38,6 +43,8 @@ struct image_info_type {
     cv::Size size = {};
     size_t pitch = 0;
 
+    void *start_ptr() const { return ptr.get(); }
+
     cv::Mat as_mat() const {
         assert(loc == MEM_HOST);
         return {size, get_cv_type<T>(), ptr.get(), pitch};

+ 5 - 5
src/core/object_manager.h

@@ -110,18 +110,18 @@ using obj_name_type = object_manager::name_type;
 
 static constexpr obj_name_type invalid_obj_name = -1;
 
-extern object_manager main_ob;
+extern object_manager *main_ob;
 
 #define OBJ_QUERY(type, name) \
-    main_ob.query<type>(name)
+    main_ob->query<type>(name)
 
 #define OBJ_TYPE(name) \
-    main_ob.query_type(name)
+    main_ob->query_type(name)
 
 #define OBJ_SAVE(name, val) \
-    main_ob.save(name, val)
+    main_ob->save(name, val)
 
 #define OBJ_SIG(name) \
-    main_ob.query_signal(name)
+    main_ob->query_signal(name)
 
 #endif //DEPTHGUIDE_OBJECT_MANAGER_H

+ 7 - 7
src/device/impl/orb_camera_ui.cpp

@@ -8,11 +8,11 @@
 using boost::asio::io_context;
 using boost::asio::post;
 
-extern io_context main_ctx;
+extern io_context *main_ctx;
 
 orb_camera_ui::impl::impl(create_config conf) {
     cam_c_conf.stream = conf.stream;
-    cam_c_conf.ctx = &main_ctx;
+    cam_c_conf.ctx = main_ctx;
     cam_s_conf.color.name = conf.cf_name;
     cam_s_conf.depth.name = conf.df_name;
 
@@ -78,7 +78,7 @@ void orb_camera_ui::impl::show_config() {
     }
     ImGui::SameLine();
     if (ImGui::Button("R")) {
-        post(main_ctx, [this] { refresh_dev_info_list(); });
+        post(*main_ctx, [this] { refresh_dev_info_list(); });
     }
 
     // select video config
@@ -122,21 +122,21 @@ void orb_camera_ui::impl::show() {
     if (cam == nullptr) {
         auto guard = imgui_disable_guard(dev_info_list.empty());
         if (ImGui::Button("Open")) {
-            post(main_ctx, [this] { open_camera(); });
+            post(*main_ctx, [this] { open_camera(); });
         }
     } else {
         assert(cam != nullptr);
         if (ImGui::Button("Close")) {
-            post(main_ctx, [this] { cam = nullptr; });
+            post(*main_ctx, [this] { cam = nullptr; });
         }
         ImGui::SameLine();
         if (!cam->is_capturing()) {
             if (ImGui::Button("Start")) {
-                post(main_ctx, [this] { start_camera(); });
+                post(*main_ctx, [this] { start_camera(); });
             }
         } else {
             if (ImGui::Button("Stop")) {
-                post(main_ctx, [this] { cam->stop(); });
+                post(*main_ctx, [this] { cam->stop(); });
             }
         }
     }

+ 36 - 5
src/impl/main_impl.cpp

@@ -1,7 +1,9 @@
 #include "main_impl.h"
 #include "device/orb_camera_ui.h"
 #include "core/image_utility.hpp"
+#include "module/image_streamer.h"
 #include "module/image_viewer.h"
+#include "module/viewport_downloader.hpp"
 #include "object_names.h"
 
 #include <boost/asio/io_context.hpp>
@@ -26,8 +28,8 @@ using boost::system::error_code;
 CUcontext cuda_ctx = nullptr;
 GLFWwindow *window = nullptr;
 smart_cuda_stream *default_cuda_stream = nullptr;
-io_context main_ctx;
-object_manager main_ob({&main_ctx});
+io_context *main_ctx;
+object_manager *main_ob;
 
 std::unique_ptr<steady_timer> ui_timer;
 std::chrono::milliseconds ui_interval;
@@ -35,6 +37,8 @@ std::chrono::milliseconds ui_interval;
 // modules
 std::unique_ptr<orb_camera_ui> orb_cam;
 std::unique_ptr<image_viewer> bg_viewer; // background viewer
+std::unique_ptr<viewport_downloader> out_downloader;
+std::unique_ptr<image_streamer> out_streamer; // output streamer
 
 void init_cuda() {
     cuInit(0);
@@ -109,9 +113,12 @@ void init_window() {
 }
 
 void init_om() {
+    main_ctx = new io_context();
+    main_ob = new object_manager({.ctx = main_ctx});
     OBJ_SAVE(img_color, image_u8c3());
     OBJ_SAVE(img_depth, image_f32c1());
     OBJ_SAVE(img_bg, image_u8c3());
+    OBJ_SAVE(img_out, image_u8c4());
 
     OBJ_SIG(img_color)->connect(INT_MIN, [=](obj_name_type _) {
         OBJ_SAVE(img_bg, OBJ_QUERY(image_u8c3, img_color));
@@ -133,6 +140,16 @@ void init_modules() {
     bg_extra_conf.c_name = img_color;
     bg_extra_conf.d_name = img_depth;
     bg_viewer = std::make_unique<image_viewer>(bg_viewer_conf);
+
+    auto out_down_conf = viewport_downloader::create_config{
+            .type = PIX_RGBA, .stream = default_cuda_stream
+    };
+    out_downloader = std::make_unique<viewport_downloader>(out_down_conf);
+
+    auto out_streamer_conf = image_streamer::create_config{
+            .img_name = img_out, .cuda_ctx = &cuda_ctx, .stream = default_cuda_stream
+    };
+    out_streamer = std::make_unique<image_streamer>(out_streamer_conf);
 }
 
 void ui_timer_func(error_code ec) {
@@ -150,7 +167,7 @@ void init_all() {
     init_modules();
 
     ui_interval = std::chrono::milliseconds(33); // TODO: select refresh rate
-    ui_timer = std::make_unique<steady_timer>(main_ctx, ui_interval);
+    ui_timer = std::make_unique<steady_timer>(*main_ctx, ui_interval);
     ui_timer->async_wait(ui_timer_func);
 }
 
@@ -162,7 +179,7 @@ void show_ui() {
 
     if (glfwWindowShouldClose(window)) {
         ui_timer->cancel();
-        main_ctx.stop();
+        main_ctx->stop();
         return;
     }
 
@@ -174,6 +191,11 @@ void show_ui() {
             orb_cam->show();
         }
 
+        if (ImGui::CollapsingHeader("Streamer")) {
+            auto id_guard = imgui_id_guard("streamer");
+            out_streamer->show();
+        }
+
         if (ImGui::CollapsingHeader("Debug")) {
             if (ImGui::TreeNode("Background")) {
                 bg_viewer->show();
@@ -181,7 +203,7 @@ void show_ui() {
             }
             if (ImGui::TreeNode("Memory Pool")) {
                 if (ImGui::Button("Purge")) {
-                    post(main_ctx, [] { global_mp.purge(); });
+                    post(*main_ctx, [] { global_mp.purge(); });
                 }
                 ImGui::TreePop();
             }
@@ -200,6 +222,10 @@ void show_ui() {
 
     bg_viewer->render();
 
+    // TODO: for test
+    auto bg_img = out_downloader->download_rgba();
+    OBJ_SAVE(img_out, bg_img);
+
     ImGui_ImplOpenGL3_RenderDrawData(ImGui::GetDrawData());
     glfwSwapBuffers(window);
 }
@@ -208,4 +234,9 @@ void cleanup() {
     ui_timer = nullptr;
     orb_cam = nullptr;
     bg_viewer = nullptr;
+    out_downloader = nullptr;
+    out_streamer = nullptr;
+
+    delete main_ob;
+    delete main_ctx;
 }

+ 1 - 1
src/impl/main_impl.h

@@ -5,7 +5,7 @@
 
 #include <boost/asio/io_context.hpp>
 
-extern boost::asio::io_context main_ctx;
+extern boost::asio::io_context *main_ctx;
 
 void init_cuda();
 

+ 3 - 0
src/impl/object_names.h

@@ -10,6 +10,9 @@ enum obj_names : object_manager::name_type {
 
     // background image
     img_bg,
+
+    // output image
+    img_out,
 };
 
 #endif //DEPTHGUIDE_OBJECT_NAMES_H

+ 1 - 1
src/main.cpp

@@ -9,7 +9,7 @@ using boost::asio::io_context;
 int main() {
 //    spdlog::set_level(spdlog::level::trace);
     init_all();
-    main_ctx.run();
+    main_ctx->run();
     cleanup();
     return 0;
 }

+ 35 - 0
src/module/image_streamer.h

@@ -0,0 +1,35 @@
+#ifndef DEPTHGUIDE_IMAGE_STREAMER_H
+#define DEPTHGUIDE_IMAGE_STREAMER_H
+
+#include "core/object_manager.h"
+#include "core/cuda_helper.hpp"
+
+#include <memory>
+
+class image_streamer {
+public:
+
+    struct create_config {
+        // image must be valid before start
+        obj_name_type img_name = invalid_obj_name;
+
+        // for encoder
+        CUcontext *cuda_ctx = nullptr;
+        smart_cuda_stream *stream = nullptr;
+    };
+
+    explicit image_streamer(create_config conf);
+
+    ~image_streamer();
+
+    void show();
+
+    using size_change_sig_type = boost::signals2::signal<void(cv::Size)>;
+    size_change_sig_type sig_size_changed;
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+#endif //DEPTHGUIDE_IMAGE_STREAMER_H

+ 257 - 0
src/module/impl/image_streamer.cpp

@@ -0,0 +1,257 @@
+#include "image_streamer_impl.h"
+#include "core/imgui_utility.hpp"
+
+void image_streamer::impl::create_encoder() {
+    switch (chose_encoder_type) {
+        case ENCODER_NVENC: {
+            auto img_info = OBJ_QUERY(image_u8c4, conf.img_name)->as_info();
+
+            auto enc_conf = encoder_nvenc::create_config();
+            enc_conf.frame_size = img_info.size;
+            enc_conf.frame_rate = 30; // TODO: determine in runtime
+            enc_conf.bitrate_mbps = enc_bitrate_mbps;
+            enc_conf.save_file = enc_save_file;
+            enc_conf.save_length = enc_save_length;
+            enc_conf.ctx = conf.cuda_ctx;
+            enc_conf.stream = conf.stream;
+
+            assert(enc_nvenc == nullptr);
+            enc_nvenc = encoder_nvenc::create(enc_conf);
+            assert(enc_nvenc != nullptr);
+            return;
+        }
+        default: {
+            RET_ERROR;
+        }
+    }
+}
+
+void image_streamer::impl::create_sender() {
+    if (enable_aux_thread) {
+        assert(aux_ctx != nullptr);
+        sender_ctx = aux_ctx.get();
+    } else {
+        sender_ctx = main_ctx;
+    }
+
+    assert(sender == nullptr);
+    switch (chose_sender_type) {
+        case SENDER_UDP_FEC: {
+            auto sender_conf = sender_udp_fec::create_config();
+            sender_conf.conn_mtu = sender_mtu;
+            sender_conf.parity_rate = sender_parity_rate;
+            sender_conf.listen_port = sender_listen_port;
+            assert(sender_ctx != nullptr);
+            sender_conf.ctx = sender_ctx;
+            sender_conf.enable_log = sender_enable_log;
+            auto sender_fec = sender_udp_fec::create(sender_conf);
+            sender_fec->sig_size_changed.connect([this](auto size) { // forward signal
+                q_this->sig_size_changed(size);
+            });
+            sender_fec->start();
+            sender = sender_fec;
+            break;
+        }
+        case SENDER_TCP: {
+            RET_ERROR;
+        }
+        default: {
+            RET_ERROR;
+        }
+    }
+    assert(sender != nullptr);
+
+    if (enable_aux_thread) {
+        sender->sig_req_idr.connect([this] {
+            post(*main_ctx, [this] {
+                enc_idr_requested = true;
+            });
+        });
+    } else {
+        sender->sig_req_idr.connect([this] {
+            enc_idr_requested = true;
+        });
+    }
+}
+
+void image_streamer::impl::start() {
+    if (enable_aux_thread) {
+        assert(aux_ctx == nullptr);
+        aux_ctx = std::make_unique<io_context>();
+    }
+
+    create_encoder();
+    create_sender();
+
+    if (enable_aux_thread) {
+        aux_thread = std::make_unique<std::thread>([this] { aux_thread_work(); });
+    }
+
+    img_cb_conn = OBJ_SIG(conf.img_name)->connect(
+            [this](auto name) { image_callback(name); });
+
+    assert(!is_running);
+    is_running = true;
+}
+
+void image_streamer::impl::stop() {
+    img_cb_conn.disconnect();
+
+    if (enable_aux_thread) {
+        aux_ctx->stop();
+        aux_thread->join();
+        aux_thread = nullptr;
+    }
+
+    enc_nvenc = nullptr;
+    sender = nullptr;
+    aux_ctx = nullptr;
+
+    assert(is_running);
+    is_running = false;
+}
+
+void image_streamer::impl::image_callback(obj_name_type name) {
+    assert(name == conf.img_name);
+
+    // early exit
+    if (!enc_save_file &&
+        !sender->is_connected()) // nobody cares the encoded frame
+        return;
+
+    auto frame = encode_image();
+
+    // send frame
+    if (enable_aux_thread) {
+        assert(frame_queue != nullptr);
+        frame_queue->push(frame, frame.idr);
+    } else {
+        sender->send_frame(frame);
+    }
+}
+
+frame_info image_streamer::impl::encode_image() {
+    switch (chose_encoder_type) {
+        case ENCODER_NVENC: {
+            auto img = OBJ_QUERY(image_u8c4, conf.img_name);
+            assert(enc_nvenc != nullptr);
+            auto frame = enc_nvenc->encode(img, enc_idr_requested);
+            enc_idr_requested = false;
+            return frame;
+        }
+        default: {
+            RET_ERROR_E;
+        }
+    }
+}
+
+void image_streamer::impl::aux_thread_work() {
+    auto queue_conf = frame_queue_type::create_config();
+    queue_conf.ctx = aux_ctx.get();
+    queue_conf.tid = std::this_thread::get_id();
+    queue_conf.cb_func = [this](auto frame) { sender->send_frame(frame); };
+    assert(frame_queue == nullptr);
+    frame_queue = frame_queue_type::create(queue_conf);
+
+    auto blocker = boost::asio::make_work_guard(*aux_ctx);
+    aux_ctx->run();
+
+    // clean up
+    frame_queue = nullptr;
+}
+
+void image_streamer::impl::show_config() {
+    auto guard = imgui_disable_guard(is_running);
+
+    ImGui::SeparatorText("Encoder Configs");
+    // chose encoding method
+    if (ImGui::RadioButton("NvEnc", chose_encoder_type == ENCODER_NVENC)) {
+        chose_encoder_type = ENCODER_NVENC;
+        if (chose_sender_type == SENDER_UDP) { // NvEnc cannot be used with UDP
+            chose_sender_type = SENDER_TCP;
+        }
+    }
+    ImGui::SameLine();
+    if (ImGui::RadioButton("nvJPEG", chose_encoder_type == ENCODER_JPEG)) {
+        chose_encoder_type = ENCODER_JPEG;
+    }
+    // encoder-specific configs
+    switch (chose_encoder_type) {
+        case ENCODER_NVENC: {
+            ImGui::DragFloat("Bitrate (Mbps)", &enc_bitrate_mbps, 0.1, 1, 20, "%.01f");
+            break;
+        }
+        case ENCODER_JPEG: {
+            RET_ERROR; // TODO
+//            if (ImGui::DragInt("Quality (%)", &main_nvjpeg_conf.quality, 1, 1, 100)) {
+//                simple_eq.emplace(upload_encoder_config);
+//            }
+//            break;
+        }
+        default: {
+            RET_ERROR;
+        }
+    }
+    // common configs
+    ImGui::Checkbox("Save Video", &enc_save_file);
+    if (enc_save_file) {
+        ImGui::SameLine();
+        ImGui::Checkbox("Save Length", &enc_save_length);
+    }
+
+    ImGui::SeparatorText("Sender Configs");
+    // chose sender method
+    if (ImGui::RadioButton("TCP", chose_sender_type == SENDER_TCP)) {
+        chose_sender_type = SENDER_TCP;
+    }
+    if (chose_encoder_type != ENCODER_NVENC) {
+        ImGui::SameLine();
+        if (ImGui::RadioButton("UDP", chose_sender_type == SENDER_UDP)) {
+            chose_sender_type = SENDER_UDP;
+        }
+    }
+    ImGui::SameLine();
+    if (ImGui::RadioButton("UDP (FEC)", chose_sender_type == SENDER_UDP_FEC)) {
+        chose_sender_type = SENDER_UDP_FEC;
+    }
+    // configs
+    ImGui::InputInt("Listen Port", &sender_listen_port);
+    if (chose_sender_type == SENDER_UDP_FEC) {
+        ImGui::DragFloat("Parity Rate", &sender_parity_rate, 0.01, 0, 2, "%.02f");
+    }
+    if (chose_sender_type) {
+        ImGui::Checkbox("Enable Log", &sender_enable_log);
+    }
+    ImGui::SameLine();
+    ImGui::Checkbox("Auxiliary Thread", &enable_aux_thread);
+}
+
+void image_streamer::impl::show() {
+    ImGui::SeparatorText("Actions");
+    if (!is_running) {
+        if (ImGui::Button("Start")) {
+            post(*main_ctx, [this] { start(); });
+        }
+    } else {
+        if (ImGui::Button("Close")) {
+            post(*main_ctx, [this] { stop(); });
+        }
+        ImGui::SameLine();
+        if (ImGui::Button("Request IDR")) {
+            enc_idr_requested = true;
+        }
+    }
+    show_config();
+}
+
+image_streamer::image_streamer(create_config conf)
+        : pimpl(std::make_unique<impl>()) {
+    pimpl->conf = conf;
+    pimpl->q_this = this;
+}
+
+image_streamer::~image_streamer() = default;
+
+void image_streamer::show() {
+    pimpl->show();
+}

+ 91 - 0
src/module/impl/image_streamer_impl.h

@@ -0,0 +1,91 @@
+#ifndef DEPTHGUIDE_IMAGE_STREAMER_IMPL_H
+#define DEPTHGUIDE_IMAGE_STREAMER_IMPL_H
+
+#include "module/image_streamer.h"
+#include "codec/encoder_nvenc.h"
+#include "core/async_queue.hpp"
+#include "network_v3/sender_tcp.h"
+#include "network_v3/sender_udp_fec.h"
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/post.hpp>
+
+using boost::asio::io_context;
+using boost::asio::post;
+
+extern io_context *main_ctx;
+
+struct image_streamer::impl {
+
+    create_config conf;
+    image_streamer *q_this = nullptr;
+    bool is_running = false;
+
+    encoder_type chose_encoder_type = ENCODER_NVENC;
+    std::unique_ptr<encoder_nvenc> enc_nvenc;
+
+    // for all encoder
+    bool enc_save_file = false;
+    bool enc_save_length = false;
+    bool enc_idr_requested = false;
+
+    // for NvEnc
+    float enc_bitrate_mbps = 5.0f; // 5Mbps
+
+    // for NvJpeg
+    // TODO
+
+    sender_type chose_sender_type = SENDER_UDP_FEC;
+    std::shared_ptr<sender_base> sender;
+
+    // for all sender
+    int sender_listen_port = 5279; // make ImGui happy
+    io_context *sender_ctx = nullptr;
+    bool sender_enable_log = false;
+
+    bool enable_aux_thread = true; // run sender in another thread
+    std::unique_ptr<io_context> aux_ctx;
+
+    // for UDP and UDP (FEC)
+    uint16_t sender_mtu = 1500;
+
+    // for UDP (FEC)
+    float sender_parity_rate = 0.2f;
+
+    // for auxiliary thread
+    using frame_queue_type = async_queue<frame_info>;
+    std::shared_ptr<frame_queue_type> frame_queue;
+    std::unique_ptr<std::thread> aux_thread;
+
+    using conn_type = boost::signals2::connection;
+    conn_type img_cb_conn;
+
+    ~impl() {
+        if (is_running) {
+            stop();
+        }
+    }
+
+    void show_config();
+
+    void show();
+
+    void start();
+
+    void stop();
+
+    void create_encoder();
+
+    void create_sender();
+
+    void image_callback(obj_name_type name);
+
+    frame_info encode_image();
+
+    /* sender may run in the auxiliary thread,
+     * because it may do complex processing in CPU */
+    void aux_thread_work();
+
+};
+
+#endif //DEPTHGUIDE_IMAGE_STREAMER_IMPL_H

+ 33 - 0
src/module/viewport_downloader.hpp

@@ -0,0 +1,33 @@
+#ifndef DEPTHGUIDE_VIEWPORT_DOWNLOADER_HPP
+#define DEPTHGUIDE_VIEWPORT_DOWNLOADER_HPP
+
+#include "render/render_utility.h"
+
+class viewport_downloader {
+public:
+
+    struct create_config {
+        image_pixel_type type = PIX_RGBA;
+        smart_cuda_stream *stream = nullptr;
+    };
+
+    explicit viewport_downloader(create_config conf) {
+        type = conf.type;
+        stream = conf.stream;
+    }
+
+    image_u8c4 download_rgba() {
+        assert(type == PIX_RGBA);
+        auto info = image_info_type<uchar4>();
+        info.loc = MEM_CUDA;
+        pbo.download_viewport(&info, GL_BGRA, GL_UNSIGNED_INT_8_8_8_8_REV, stream);
+        return create_image(info);
+    }
+
+private:
+    image_pixel_type type;
+    smart_cuda_stream *stream;
+    smart_pixel_buffer pbo;
+};
+
+#endif //DEPTHGUIDE_VIEWPORT_DOWNLOADER_HPP

+ 0 - 127
src/network_v3/sender_base.cpp

@@ -1,127 +0,0 @@
-#include "sender_base.h"
-#include "core/utility.hpp"
-
-#include <boost/asio/post.hpp>
-
-#include <fmt/chrono.h>
-#include <spdlog/spdlog.h>
-
-#include <deque>
-#include <fstream>
-#include <optional>
-
-using boost::asio::io_context;
-using boost::asio::post;
-
-struct sender_base::impl {
-
-    using frame_list_type = std::deque<frame_info>;
-    frame_list_type frame_list;
-
-    sender_base *q_this = nullptr;
-    io_context *ctx = nullptr;
-    bool waiting_idr = false;
-    bool stopped = false;
-
-    std::ofstream log_file;
-    obj_name_type connect_obj = invalid_obj_name;
-
-    explicit impl(create_config conf) {
-        ctx = conf.ctx;
-        connect_obj = conf.connect_obj;
-
-        // create log file if requested
-        if (conf.enable_log) {
-            auto file_name = fmt::format("log_{:%Y_%m_%d_%H_%M_%S}.csv",
-                                         std::chrono::system_clock::now());
-            log_file.open(file_name);
-        }
-    }
-
-    void clear_frame_list() {
-        frame_list.clear();
-    }
-
-    void request_idr_frame() {
-        clear_frame_list();
-        waiting_idr = true;
-        q_this->sig_req_idr();
-    }
-
-    std::optional<frame_info> retrieve_one_frame() {
-        if (frame_list.empty()) return {};
-        auto frame = frame_list.front();
-        frame_list.pop_front();
-        if (waiting_idr) { // if idr frame is requested, only idr frame will be returned.
-            if (!frame.idr) return {};
-            waiting_idr = false;
-        }
-        return frame;
-    }
-
-    void handle_frames() {
-        for (;;) {
-            // test stop flag
-            if (stopped) {
-                q_this->close_connection();
-                return;
-            }
-
-            auto frame = retrieve_one_frame();
-            if (!frame.has_value()) return;
-            q_this->handle_frame(frame.value());
-        }
-    }
-
-    void push_frame_impl(const frame_info &frame) {
-        assert(!frame.data.empty());
-        if (frame.idr) {
-            frame_list.clear();
-        }
-        frame_list.push_back(frame);
-    }
-
-    void push_frame(const frame_info &frame) {
-        push_frame_impl(frame);
-        post(*ctx, [ptr = q_this->shared_from_this()] {
-            ptr->pimpl->handle_frames();
-        });
-    }
-
-    void log_frame_sent(uint64_t frame_id) {
-        if (!log_file.is_open()) return;
-        log_file << fmt::format("{},{}\n", frame_id, current_timestamp());
-    }
-
-};
-
-sender_base::sender_base(create_config conf)
-        : pimpl(std::make_unique<impl>(conf)) {
-    pimpl->q_this = this;
-}
-
-sender_base::~sender_base() = default;
-
-void sender_base::stop() {
-    pimpl->stopped = true;
-}
-
-void sender_base::send_frame(const frame_info &frame) {
-    pimpl->push_frame(frame);
-}
-
-void sender_base::request_idr_frame() {
-    pimpl->request_idr_frame();
-}
-
-void sender_base::log_frame_sent(uint64_t frame_id) {
-    pimpl->log_frame_sent(frame_id);
-}
-
-void sender_base::notify_connected() {
-    OBJ_SAVE(pimpl->connect_obj, true);
-}
-
-void sender_base::notify_disconnected() {
-    OBJ_SAVE(pimpl->connect_obj, false);
-}

+ 0 - 67
src/network_v3/sender_base.h

@@ -1,67 +0,0 @@
-#ifndef REMOTEAR3_SENDER_BASE_HPP
-#define REMOTEAR3_SENDER_BASE_HPP
-
-#include "core/object_manager.h"
-#include "network/binary_utility.hpp"
-
-#include <boost/asio/io_context.hpp>
-#include <boost/signals2.hpp>
-
-#include <memory>
-
-enum sender_type {
-    SENDER_TCP,
-    SENDER_UDP,
-    SENDER_UDP_FEC
-};
-
-struct frame_info {
-    data_type data;
-    bool idr = false;
-    uint64_t frame_id = 0;
-
-    uint8_t *start_ptr() const { return data.start_ptr(); }
-
-    size_t size() const { return data.size; }
-};
-
-class sender_base : public std::enable_shared_from_this<sender_base> {
-public:
-
-    virtual ~sender_base();
-
-    void send_frame(const frame_info &frame);
-
-    void stop();
-
-    using req_idr_sig_type = boost::signals2::signal<void()>;
-    req_idr_sig_type sig_req_idr;
-
-protected:
-
-    struct create_config {
-        boost::asio::io_context *ctx = nullptr;
-        bool enable_log = false;
-        obj_name_type connect_obj = invalid_obj_name; // bool
-    };
-
-    explicit sender_base(create_config conf);
-
-    void request_idr_frame();
-
-    void log_frame_sent(uint64_t frame_id);
-
-    virtual void handle_frame(const frame_info &frame) = 0;
-
-    virtual void close_connection() = 0;
-
-    void notify_connected();
-
-    void notify_disconnected();
-
-private:
-    struct impl;
-    std::unique_ptr<impl> pimpl;
-};
-
-#endif //REMOTEAR3_SENDER_BASE_HPP

+ 84 - 0
src/network_v3/sender_base.hpp

@@ -0,0 +1,84 @@
+#ifndef REMOTEAR3_SENDER_BASE_HPP
+#define REMOTEAR3_SENDER_BASE_HPP
+
+#include "codec/codec_base.hpp"
+#include "core/utility.hpp"
+
+#include <boost/signals2.hpp>
+
+#include <fmt/chrono.h>
+#include <fmt/format.h>
+
+#include <atomic>
+#include <fstream>
+
+enum sender_type {
+    SENDER_TCP,
+    SENDER_UDP,
+    SENDER_UDP_FEC
+};
+
+class sender_base {
+public:
+
+    void send_frame(const frame_info &frame) {
+        if (waiting_idr) {
+            if (!frame.idr) return;
+            waiting_idr = false;
+        }
+        handle_frame(frame);
+    }
+
+    struct create_config {
+        bool enable_log = false;
+    };
+
+    explicit sender_base(create_config conf) {
+        waiting_idr = true;
+
+        // create log file if requested
+        if (conf.enable_log) {
+            auto file_name = fmt::format("log_{:%Y_%m_%d_%H_%M_%S}.csv",
+                                         std::chrono::system_clock::now());
+            log_file.open(file_name);
+        }
+    }
+
+    virtual ~sender_base() = default;
+
+    using req_idr_sig_type = boost::signals2::signal<void()>;
+    req_idr_sig_type sig_req_idr;
+
+    bool is_connected() const {
+        return connect_flag.test(std::memory_order_relaxed);
+    }
+
+protected:
+
+    void request_idr_frame() {
+        waiting_idr = true;
+        sig_req_idr();
+    }
+
+    void log_frame_sent(uint64_t frame_id) {
+        if (!log_file.is_open()) return;
+        log_file << fmt::format("{},{}\n", frame_id, current_timestamp());
+    }
+
+    virtual void handle_frame(const frame_info &frame) = 0;
+
+    void notify_connected() {
+        connect_flag.test_and_set(std::memory_order_relaxed);
+    }
+
+    void notify_disconnected() {
+        connect_flag.clear(std::memory_order_relaxed);
+    }
+
+private:
+    std::ofstream log_file;
+    bool waiting_idr = false;
+    std::atomic_flag connect_flag = ATOMIC_FLAG_INIT;
+};
+
+#endif //REMOTEAR3_SENDER_BASE_HPP

+ 3 - 8
src/network_v3/sender_tcp.cpp

@@ -126,8 +126,7 @@ struct sender_tcp::impl {
 
     static impl *create(create_config conf) {
         auto ret = new impl;
-        ret->par_conf = sender_base::create_config{
-                .ctx = conf.ctx, .enable_log = conf.enable_log, .connect_obj = conf.connect_obj};
+        ret->par_conf.enable_log = conf.enable_log;
         auto listen_ep = tcp::endpoint{tcp::v4(), conf.listen_port};
 //        ret->hole_punch(listen_ep);
         ret->acceptor = std::make_unique<tcp::acceptor>(*conf.ctx, listen_ep);
@@ -150,12 +149,8 @@ sender_tcp::sender_tcp(sender_tcp::impl *_pimpl)
 
 sender_tcp::~sender_tcp() = default;
 
-std::shared_ptr<sender_tcp> sender_tcp::create(create_config conf) {
-    return std::make_shared<sender_tcp>(impl::create(conf));
-}
-
-void sender_tcp::close_connection() {
-    pimpl->close_connection();
+sender_tcp::this_type *sender_tcp::create(create_config conf, _private) {
+    return new sender_tcp(impl::create(conf));
 }
 
 void sender_tcp::handle_frame(const frame_info &frame) {

+ 24 - 13
src/network_v3/sender_tcp.h

@@ -1,38 +1,49 @@
 #ifndef REMOTEAR3_SENDER_TCP_H
 #define REMOTEAR3_SENDER_TCP_H
 
-#include "sender_base.h"
+#include "sender_base.hpp"
 
-#include <memory>
+#include <boost/asio/io_context.hpp>
 
-class sender_tcp : public sender_base {
-private:
-    struct impl;
-    std::unique_ptr<impl> pimpl;
+#include <memory>
 
+class sender_tcp : public sender_base,
+                   public std::enable_shared_from_this<sender_tcp> {
+    struct _private {
+    };
 public:
 
-    explicit sender_tcp(impl *pimpl);
-
-    ~sender_tcp() override;
+    using io_contxt = boost::asio::io_context;
 
     struct create_config {
         uint16_t listen_port;
+        boost::asio::io_context *ctx;
 
         // for parent
-        boost::asio::io_context *ctx;
         bool enable_log;
-        obj_name_type connect_obj;
     };
 
-    static std::shared_ptr<sender_tcp> create(create_config conf);
+    using this_type = sender_tcp;
+
+    static this_type *create(create_config conf, _private);
+
+    using pointer = std::shared_ptr<sender_tcp>;
+
+    static pointer create(create_config conf) {
+        return std::shared_ptr<this_type>(create(conf, _private{}));
+    }
+
+    ~sender_tcp() override;
 
 protected:
 
     void handle_frame(const frame_info &frame) override;
 
-    void close_connection() override;
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
 
+    explicit sender_tcp(impl *pimpl);
 };
 
 

+ 17 - 13
src/network_v3/sender_udp_fec.cpp

@@ -236,7 +236,9 @@ struct sender_udp_fec::impl {
         in_buf.reserve(max_package_size);
         auto buf = buffer(in_buf.ptr, max_package_size);
         using namespace std::placeholders;
-        socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
+        socket->async_receive_from(buf, request_ep, [ptr = q_this->shared_from_this()](auto ec, auto length) {
+            ptr->pimpl->handle_request(ec, length);
+        });
     }
 
     void handle_request(const error_code &ec, size_t length) {
@@ -322,13 +324,11 @@ struct sender_udp_fec::impl {
 
     static impl *create(create_config conf) {
         auto ret = new impl;
+        ret->par_conf.enable_log = conf.enable_log;
+
         auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
-        ret->par_conf = sender_base::create_config{
-                .ctx = conf.ctx, .enable_log = conf.enable_log, .connect_obj = conf.connect_obj};
         ret->socket = std::make_unique<udp::socket>(*conf.ctx, local_ep);
         ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
-//        ret->hole_punch();
-        ret->async_handle_request();
 
         // constant configs
         ret->parity_rate = conf.parity_rate;
@@ -338,9 +338,13 @@ struct sender_udp_fec::impl {
 
         // initialize reed solomon
         fec_init();
-
         return ret;
     }
+
+    void start() {
+//        hole_punch();
+        async_handle_request();
+    }
 };
 
 sender_udp_fec::sender_udp_fec(impl *_pimpl)
@@ -349,16 +353,16 @@ sender_udp_fec::sender_udp_fec(impl *_pimpl)
     pimpl->q_this = this;
 }
 
-sender_udp_fec::~sender_udp_fec() = default;
-
-std::shared_ptr<sender_udp_fec> sender_udp_fec::create(create_config conf) {
-    return std::make_shared<sender_udp_fec>(impl::create(conf));
+sender_udp_fec *sender_udp_fec::create(create_config conf, _private) {
+    return new sender_udp_fec(impl::create(conf));
 }
 
-void sender_udp_fec::close_connection() {
-    pimpl->close_connection();
-}
+sender_udp_fec::~sender_udp_fec() = default;
 
 void sender_udp_fec::handle_frame(const frame_info &frame) {
     pimpl->send_frame(frame);
+}
+
+void sender_udp_fec::start() {
+    pimpl->start();
 }

+ 26 - 13
src/network_v3/sender_udp_fec.h

@@ -1,45 +1,58 @@
 #ifndef REMOTEAR3_SENDER_UDP_FEC_H
 #define REMOTEAR3_SENDER_UDP_FEC_H
 
-#include "sender_base.h"
+#include "sender_base.hpp"
 
 #include <opencv2/core/types.hpp>
 
-#include <memory>
+#include <boost/asio/io_context.hpp>
 
-class sender_udp_fec : public sender_base {
-private:
-    struct impl;
-    std::unique_ptr<impl> pimpl;
+#include <memory>
 
+class sender_udp_fec : public sender_base,
+                       public std::enable_shared_from_this<sender_udp_fec> {
+    struct _private {
+    };
 public:
 
-    explicit sender_udp_fec(impl *pimpl);
-
-    ~sender_udp_fec() override;
+    using io_contxt = boost::asio::io_context;
 
     struct create_config {
         uint16_t conn_mtu;
         float parity_rate;
         uint16_t listen_port;
+        io_contxt *ctx;
 
         // for parent
-        boost::asio::io_context *ctx;
         bool enable_log;
-        obj_name_type connect_obj;
     };
 
-    static std::shared_ptr<sender_udp_fec> create(create_config conf);
+    using this_type = sender_udp_fec;
+
+    static this_type *create(create_config conf, _private);
+
+    using pointer = std::shared_ptr<sender_udp_fec>;
+
+    static pointer create(create_config conf) {
+        return std::shared_ptr<this_type>(create(conf, _private{}));
+    }
+
+    ~sender_udp_fec() override;
 
     using size_change_sig_type = boost::signals2::signal<void(cv::Size)>;
     size_change_sig_type sig_size_changed;
 
+    void start();
+
 protected:
 
     void handle_frame(const frame_info &frame) override;
 
-    void close_connection() override;
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
 
+    explicit sender_udp_fec(impl *pimpl);
 };
 
 

+ 0 - 0
src/network_v3/third_party/scope_guard.hpp → src/third_party/scope_guard.hpp