Преглед на файлове

Implemented frame sender.

jcsyshc преди 2 години
родител
ревизия
fc535938bc
променени са 9 файла, в които са добавени 1754 реда и са изтрити 30 реда
  1. 12 3
      CMakeLists.txt
  2. 548 0
      src/frame_sender.cpp
  3. 34 0
      src/frame_sender.h
  4. 53 14
      src/main.cpp
  5. 1 0
      src/stereo_camera.hpp
  6. 991 0
      src/third_party/rs.c
  7. 88 0
      src/third_party/rs.h
  8. 22 10
      src/video_encoder.cpp
  9. 5 3
      src/video_encoder.h

+ 12 - 3
CMakeLists.txt

@@ -3,7 +3,10 @@ project(RemoteAR2 LANGUAGES C CXX)
 
 set(CMAKE_CXX_STANDARD 20)
 
-add_executable(RemoteAR2 src/main.cpp src/augment_renderer.cpp)
+add_executable(RemoteAR2 src/main.cpp
+        src/augment_renderer.cpp
+        src/frame_sender.cpp
+        src/third_party/rs.c)
 
 # OpenGL config
 find_package(OpenGL REQUIRED)
@@ -52,7 +55,7 @@ target_sources(${PROJECT_NAME} PRIVATE
 # spdlog config
 find_package(spdlog REQUIRED)
 target_link_libraries(${PROJECT_NAME} spdlog::spdlog)
-target_compile_definitions(${PROJECT_NAME} PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_TRACE)
+#target_compile_definitions(${PROJECT_NAME} PRIVATE SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_TRACE)
 
 # OpenCV config
 find_package(OpenCV REQUIRED COMPONENTS cudaimgproc imgcodecs)
@@ -117,4 +120,10 @@ endif ()
 set(NVCODEC_INCLUDE_DIR ${NVCODEC_DIR}/Interface)
 target_include_directories(${PROJECT_NAME} PRIVATE ${NVCODEC_INCLUDE_DIR})
 target_link_libraries(${PROJECT_NAME} ${NVENC_LIB})
-target_sources(${PROJECT_NAME} PRIVATE src/video_encoder.cpp)
+target_sources(${PROJECT_NAME} PRIVATE src/video_encoder.cpp)
+
+# xxHash config
+set(XXHASH_INCLUDE_DIR /usr/include)
+find_library(XXHASH_LIB xxhash)
+target_include_directories(${PROJECT_NAME} PRIVATE ${XXHASH_INCLUDE_DIR})
+target_link_libraries(${PROJECT_NAME} ${XXHASH_LIB})

+ 548 - 0
src/frame_sender.cpp

@@ -0,0 +1,548 @@
+#include "config.h"
+#include "frame_sender.h"
+#include "third_party/scope_guard.hpp"
+
+extern "C" {
+#include "third_party/rs.h"
+}
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/detached.hpp>
+#include <boost/asio/experimental/awaitable_operators.hpp>
+#include <boost/asio/experimental/concurrent_channel.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/ip/udp.hpp>
+#include <boost/asio/redirect_error.hpp>
+#include <boost/asio/use_awaitable.hpp>
+#include <boost/endian.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/smart_ptr.hpp>
+
+#include <xxhash.h>
+
+#include <spdlog/spdlog.h>
+
+#include <cstdint>
+#include <deque>
+#include <random>
+#include <thread>
+#include <tuple>
+#include <vector>
+
+using namespace boost::asio::experimental::awaitable_operators;
+using namespace boost::asio::ip;
+using namespace boost::posix_time;
+using boost::asio::awaitable;
+using boost::asio::buffer;
+using boost::asio::deadline_timer;
+using boost::asio::detached;
+using boost::asio::experimental::concurrent_channel;
+using boost::asio::io_context;
+using boost::asio::redirect_error;
+using boost::asio::use_awaitable;
+using boost::system::error_code;
+
+#define EXCEPTION_CHECK(api_call) \
+    try { \
+        api_call; \
+    } catch (std::exception &e) { \
+        SPDLOG_ERROR("Procedure {} failed at {}:{} with exception {}.", \
+                     #api_call, __FILE__, __LINE__, e.what()); \
+        return false; \
+    } void(0)
+
+#define CORO_CHECK(api_call) { \
+        bool ok = co_await (api_call); \
+        if (!ok) { \
+            SPDLOG_ERROR("Coroutine {} failed at {}:{}.", \
+                         #api_call, __FILE__, __LINE__); \
+            co_return false; \
+        } \
+    } void(0)
+
+struct frame_sender::impl {
+
+    static constexpr auto buffer_size = 64 * 1024; // 64KB
+    static constexpr auto rtt_probe_count = 30;
+    static constexpr auto max_loss_rate = 0.2; // 20% packet loss rate
+    static constexpr auto frag_header_size = 35;
+    static constexpr auto channel_buffer_size = 16;
+
+    struct frag_header {
+        uint64_t frag_checksum;
+        uint8_t frame_type;
+        uint64_t frame_salt;
+        uint32_t frame_id;
+        uint32_t frame_length;
+        uint32_t block_size;
+        uint16_t block_count;
+        uint16_t frame_decode_count;
+        uint16_t block_id;
+    };
+
+    struct sent_frame_info {
+        uint64_t salt;
+        uint32_t id;
+        ptime time;
+    };
+
+    uint16_t local_port = 5277;
+    udp::endpoint remote_endpoint;
+    uint64_t conn_rtt_us = 50; // connection round trip time (RTT)
+    uint16_t conn_mtu = 1200;
+    double parity_rate = 0.2;
+
+    boost::scoped_ptr<io_context> context;
+    boost::scoped_ptr<udp::socket> socket;
+
+    using chan_type = concurrent_channel<void(error_code, frame_info)>;
+    boost::scoped_ptr<chan_type> chan;
+
+    char *in_data = nullptr, *out_data = nullptr;
+
+    enum status_type {
+        IDLE,
+        CONNECTING,
+        CONNECTED
+    } status = IDLE;
+
+    uint32_t frame_count = 0;
+    std::atomic_flag *idr_flag = nullptr;
+    int frame_rate = default_camera_fps;
+    time_duration frame_timeout, conn_timeout;
+    ptime last_confirm_time;
+    boost::scoped_ptr<deadline_timer> keepalive_timer;
+    std::deque<sent_frame_info> sent_list; // pending confirm list
+
+    std::thread *work_thread = nullptr;
+
+    impl() {
+        in_data = (char *) malloc(buffer_size);
+        out_data = (char *) malloc(buffer_size);
+    }
+
+    ~impl() {
+        stop();
+        free(in_data);
+        free(out_data);
+    }
+
+    static uint64_t generate_salt() {
+        static std::random_device device;
+        static std::default_random_engine engine{device()};
+        static std::uniform_int_distribution<uint64_t> dist;
+        return dist(engine);
+    }
+
+    template<typename T>
+    static char *write_binary_number(char *ptr, T val) {
+        static constexpr auto need_swap =
+                (boost::endian::order::native != boost::endian::order::big);
+        auto real_ptr = (T *) ptr;
+        if constexpr (need_swap) {
+            *real_ptr = boost::endian::endian_reverse(val);
+        } else {
+            *real_ptr = val;
+        }
+        return ptr + sizeof(T);
+    }
+
+    template<typename T>
+    static char *read_binary_number(char *ptr, T *val) {
+        static constexpr auto need_swap =
+                (boost::endian::order::native != boost::endian::order::big);
+        *val = *(T *) ptr;
+        if constexpr (need_swap) {
+            boost::endian::endian_reverse_inplace(*val);
+        }
+        return ptr + sizeof(T);
+    }
+
+    static char *write_frag_header(char *ptr, const frag_header *header) {
+#define WRITE(member) ptr = write_binary_number(ptr, header->member)
+        WRITE(frag_checksum);
+        WRITE(frame_type);
+        WRITE(frame_salt);
+        WRITE(frame_id);
+        WRITE(frame_length);
+        WRITE(block_size);
+        WRITE(block_count);
+        WRITE(frame_decode_count);
+        WRITE(block_id);
+#undef WRITE
+        return ptr;
+    }
+
+    // calculate and fill hash value for out buffer
+    bool calc_out_hash(char *end_ptr) {
+        assert(end_ptr - out_data > sizeof(uint64_t));
+        static auto hash_state = XXH64_createState();
+        auto out_ptr = out_data + sizeof(uint64_t);
+        CALL_CHECK(XXH64_reset(hash_state, 0) != XXH_ERROR);
+        CALL_CHECK(XXH64_update(hash_state, out_ptr, end_ptr - out_ptr) != XXH_ERROR);
+        write_binary_number(out_data, XXH64_digest(hash_state));
+        return true;
+    }
+
+    bool check_rtt_reply(uint64_t salt, uint16_t out_len, uint16_t in_len) {
+        static constexpr auto desired_length =
+                sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint16_t);
+        if (in_len != desired_length) return false;
+
+        // check frag type
+        if (in_data[0] != 'R') return false;
+
+        // check frame salt
+        uint64_t in_salt;
+        auto in_ptr = read_binary_number(in_data + sizeof(uint8_t), &in_salt);
+        if (in_salt != salt) return false;
+
+        // check returned length
+        uint16_t in_frag_len;
+        read_binary_number(in_ptr, &in_frag_len);
+        if (in_frag_len != out_len) return false;
+
+        return true;
+    }
+
+    template<typename T>
+    static T power2(T x) { return x * x; }
+
+    static uint64_t calc_upper_rtt(const std::vector<uint64_t> &v) {
+        auto sum = std::accumulate(v.begin(), v.end(), 0.0);
+        auto mean = sum / (double) v.size();
+        auto sum2 = std::accumulate(v.begin(), v.end(), 0.0,
+                                    [=](double a, uint64_t b) { return a + power2((double) b - mean); });
+        auto std_var = std::sqrt(sum2 / (double) v.size());
+        return (uint64_t) (mean + 5 * std_var);
+    }
+
+    awaitable<bool> probe_rtt() {
+        static const auto max_rtt = seconds(1);
+
+        auto timer = deadline_timer{*context};
+        std::vector<uint64_t> rtt_result;
+        auto in_buf = buffer(in_data, buffer_size);
+
+        for (int k = 0; k < rtt_probe_count; ++k) {
+            auto salt = generate_salt();
+
+            // write probe frag data
+            auto out_ptr = out_data;
+            out_ptr = write_binary_number(out_ptr, (uint64_t) 0); // checksum placeholder
+            out_ptr = write_binary_number(out_ptr, 'T');
+            out_ptr = write_binary_number(out_ptr, salt);
+
+            // fill frag with random data
+            auto limit_ptr = out_data + conn_mtu;
+            auto content_len = 0;
+            while (out_ptr + sizeof(uint64_t) < limit_ptr) {
+                out_ptr = write_binary_number(out_ptr, generate_salt());
+                content_len += sizeof(uint64_t);
+            }
+
+            calc_out_hash(out_ptr);
+            auto out_buf = buffer(out_data, out_ptr - out_data);
+            socket->send_to(out_buf, remote_endpoint);
+
+            // wait for reply or timeout
+            auto start_time = microsec_clock::local_time();
+            timer.expires_from_now(max_rtt);
+            for (;;) {
+                udp::endpoint sender_endpoint;
+                auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) ||
+                                     timer.async_wait(use_awaitable));
+                if (ret.index() == 0) { // received reply
+                    if (sender_endpoint != remote_endpoint) continue;
+                    if (check_rtt_reply(salt, content_len, std::get<0>(ret))) {
+                        auto end_time = microsec_clock::local_time();
+                        auto rtt_us = (end_time - start_time).total_microseconds();
+                        rtt_result.push_back(rtt_us);
+                        SPDLOG_TRACE("RTT probe {}: {}us.", k, rtt_us);
+                        break;
+                    }
+                } else { // timeout
+                    assert(ret.index() == 1);
+                    SPDLOG_TRACE("RTT probe {}: failed.", k);
+                    break;
+                }
+            }
+        }
+
+        if (rtt_result.size() <= (int) (rtt_probe_count * max_loss_rate)) {
+            SPDLOG_WARN("Packet loss rate too high, cannot probe RTT.");
+            co_return false;
+        }
+        conn_rtt_us = calc_upper_rtt(rtt_result);
+        SPDLOG_INFO("Connection MaxRTT: {}us.", conn_rtt_us);
+        co_return true;
+    }
+
+    awaitable<bool> setup_connection() {
+//        socket->connect(remote_endpoint);
+        CORO_CHECK(probe_rtt());
+        // TODO: detect mtu
+        // TODO: detect packet loss rate
+
+        // reset timer
+        frame_timeout = milliseconds(conn_rtt_us / 1000 + 3 * 1000 / frame_rate);
+        conn_timeout = seconds(1); // TODO
+        last_confirm_time = microsec_clock::local_time();
+//        keepalive_timer->expires_at(boost::posix_time::pos_infin);
+        sent_list.clear();
+        idr_flag->test_and_set();
+
+        co_return true;
+    }
+
+    void handle_frame_confirm(size_t msg_len) {
+        static constexpr auto desired_length =
+                sizeof(uint8_t) + sizeof(uint64_t);
+        if (msg_len != desired_length) return;
+
+        // read salt
+        uint64_t frame_salt;
+        read_binary_number(in_data + 1, &frame_salt);
+        static uint64_t last_frame_salt;
+        if (frame_salt == last_frame_salt) return; // already confirmed
+
+        // erase confirmed frame
+        auto iter = sent_list.begin();
+        while (iter != sent_list.end() && iter->salt != frame_salt) ++iter;
+        if (iter == sent_list.end()) return;
+        SPDLOG_TRACE("Frame {} confirmed.", iter->id);
+        sent_list.erase(sent_list.begin(), ++iter);
+
+        // reset timer
+        if (sent_list.empty()) {
+            keepalive_timer->expires_at(pos_infin);
+        } else {
+            keepalive_timer->expires_at(sent_list.begin()->time + frame_timeout);
+        }
+        last_confirm_time = microsec_clock::local_time();
+    }
+
+    void handle_upd_message(size_t msg_len, const udp::endpoint &sender) {
+        assert(status != CONNECTING);
+        if (status == IDLE) {
+            if (msg_len == 1 && in_data[0] == 'R') { // reset connection
+                if (status == CONNECTING) {
+                    SPDLOG_WARN("Only one connection is supported.");
+                    return;
+                }
+                status = CONNECTING;
+                remote_endpoint = sender;
+                SPDLOG_INFO("Reset connection with {}:{}.", sender.address().to_string(), sender.port());
+                co_spawn(*context, setup_connection(), [this](std::exception_ptr e, bool ok) {
+                    assert(!e);
+                    SPDLOG_INFO("Reset connection {}.", ok ? "succeeded" : "failed");
+                    if (ok) {
+                        status = CONNECTED;
+                    } else {
+                        status = IDLE;
+                        remote_endpoint = {};
+                    }
+                });
+            }
+        } else if (status == CONNECTED) {
+            if (sender != remote_endpoint) return;
+            if (msg_len == 1 && in_data[0] == 'E') { // client exit
+                keepalive_timer->expires_at(pos_infin);
+                status = IDLE;
+                SPDLOG_INFO("Client left.");
+            } else if (in_data[0] == 'C') { // confirmation
+                handle_frame_confirm(msg_len);
+            }
+        }
+    }
+
+    void handle_frame(const frame_info &info) {
+        ++frame_count;
+        auto frame_deleter = sg::make_scope_guard([&]() {
+            free(info.data);
+        });
+        if (status != CONNECTED) {
+            SPDLOG_TRACE("Frame {} received, but connection is not ready.");
+            return;
+        }
+
+        // prepare buffer for frame
+        auto block_size = (conn_mtu - frag_header_size) & 0xffffff00; // TODO: support for larger frame
+        auto data_blocks = (info.length + block_size - 1) / block_size;
+        auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate));
+        auto total_blocks = data_blocks + parity_blocks;
+        auto block_data = (uint8_t *) malloc(total_blocks * block_size);
+        auto block_ptr = (uint8_t **) malloc(total_blocks * sizeof(void *));
+        for (int i = 0; i < total_blocks; ++i) {
+            block_ptr[i] = block_data + block_size * i;
+        }
+        auto rs = reed_solomon_new(data_blocks, parity_blocks);
+        assert(rs != nullptr);
+
+        auto closer = sg::make_scope_guard([&]() {
+            free(block_data);
+            free(block_ptr);
+            reed_solomon_release(rs);
+        });
+
+        // calc reed-solomon
+        memcpy(block_data, info.data, info.length);
+        auto ret = reed_solomon_encode2(rs, block_ptr, total_blocks, block_size);
+        assert(ret == 0);
+
+        // send encoded frames
+        frag_header header;
+        header.frame_type = info.is_idr_frame ? 'I' : 'P';
+        header.frame_salt = generate_salt();
+        header.frame_id = frame_count;
+        header.frame_length = info.length;
+        header.block_size = block_size;
+        header.block_count = total_blocks;
+        header.frame_decode_count = data_blocks;
+
+        for (int i = 0; i < total_blocks; ++i) {
+            header.block_id = i;
+            auto out_ptr = write_frag_header(out_data, &header);
+            assert(out_ptr - out_data == frag_header_size);
+            memcpy(out_ptr, block_ptr[i], block_size);
+            out_ptr += block_size;
+            calc_out_hash(out_ptr);
+            auto out_buf = buffer(out_data, out_ptr - out_data);
+            socket->send_to(out_buf, remote_endpoint);
+        }
+        SPDLOG_TRACE("Frame {} is sent with {}+{} blocks.",
+                     header.frame_id, header.block_count, header.block_count - header.frame_decode_count);
+
+        // config frame queue and timeout
+        if (keepalive_timer->expires_at() == pos_infin) {
+            keepalive_timer->expires_from_now(frame_timeout);
+            SPDLOG_TRACE("Timer reset to {}.", to_simple_string(keepalive_timer->expires_at()));
+        }
+        sent_list.push_back({header.frame_salt, header.frame_id,
+                             microsec_clock::local_time()});
+    }
+
+    awaitable<void> main_loop() {
+        auto in_buf = buffer(in_data, buffer_size);
+        for (;;) {
+            if (status == CONNECTING) {
+                auto ret = co_await chan->async_receive(use_awaitable);
+                handle_frame(ret);
+            } else { // IDLE or CONNECTED
+                udp::endpoint sender_endpoint;
+                auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) ||
+                                     chan->async_receive(use_awaitable));
+                if (ret.index() == 0) { // udp message
+                    handle_upd_message(std::get<0>(ret), sender_endpoint);
+                } else { // new frame
+                    assert(ret.index() == 1);
+                    handle_frame(std::get<1>(ret));
+                }
+            }
+        }
+    }
+
+    awaitable<void> keepalive_loop() {
+        for (;;) {
+            error_code ec;
+            co_await keepalive_timer->async_wait(redirect_error(use_awaitable, ec));
+            if (ec == boost::asio::error::operation_aborted) {
+                SPDLOG_TRACE("Timer aborted.");
+                continue;
+            }
+            SPDLOG_WARN("Connection timeout.");
+            keepalive_timer->expires_at(pos_infin);
+            auto now = microsec_clock::local_time();
+            if (now - last_confirm_time > conn_timeout) {
+                status = IDLE;
+                SPDLOG_WARN("Connection closed.");
+            } else {
+                idr_flag->test_and_set();
+            }
+            sent_list.clear();
+        }
+    }
+
+    void start() {
+        // clean channel
+        if (chan != nullptr) {
+            while (chan->try_receive([](error_code e, frame_info &&info) {
+                free(info.data);
+            }));
+        }
+
+        auto error_handler = [](std::exception_ptr ep) {
+            if (!ep) {
+                SPDLOG_ERROR("Infinite loop exited with no error.");
+                return;
+            }
+            try {
+                std::rethrow_exception(ep);
+            } catch (std::exception &e) {
+                SPDLOG_ERROR("Infinite loop exited with error: {}", e.what());
+            }
+        };
+
+        context.reset(new io_context{});
+        chan.reset(new chan_type{*context, channel_buffer_size});
+        keepalive_timer.reset(new deadline_timer{*context});
+        keepalive_timer->expires_at(pos_infin);
+        auto local_endpoint = udp::endpoint{udp::v4(), local_port};
+        socket.reset(new udp::socket{*context, local_endpoint});
+        socket->set_option(udp::socket::send_buffer_size{10 * 1024 * 1024}); // 10MB send buffer
+        assert(socket->is_open());
+        co_spawn(*context, main_loop(), error_handler);
+        co_spawn(*context, keepalive_loop(), error_handler);
+
+        // request idr frame
+        idr_flag->test_and_set();
+
+        assert(work_thread == nullptr);
+        work_thread = new std::thread{[this]() {
+            try {
+                context->run();
+            } catch (std::exception &e) {
+                SPDLOG_ERROR("Frame sender error: {}", e.what());
+            }
+        }};
+    }
+
+    void stop() {
+        if (work_thread == nullptr) return;
+        context->stop();
+        work_thread->join();
+        delete work_thread;
+        work_thread = nullptr;
+    }
+
+};
+
+frame_sender::frame_sender()
+        : pimpl(std::make_unique<impl>()) {
+    fec_init();
+}
+
+frame_sender::~frame_sender() = default;
+
+bool frame_sender::start(uint16_t local_port, std::atomic_flag *idr_flag, int fps) {
+    pimpl->local_port = local_port;
+    pimpl->idr_flag = idr_flag;
+    pimpl->frame_rate = fps;
+    EXCEPTION_CHECK(pimpl->start());
+    return true;
+}
+
+void frame_sender::stop() {
+    pimpl->stop();
+}
+
+bool frame_sender::send_frame(const frame_sender::frame_info &info) {
+    CALL_CHECK(pimpl->chan->try_send(error_code{}, info));
+    return true;
+}
+
+bool frame_sender::is_running() {
+    return pimpl->work_thread != nullptr;
+}

+ 34 - 0
src/frame_sender.h

@@ -0,0 +1,34 @@
+#ifndef REMOTEAR2_FRAME_SENDER_H
+#define REMOTEAR2_FRAME_SENDER_H
+
+#include <atomic>
+#include <memory>
+
+class frame_sender {
+public:
+
+    frame_sender();
+
+    ~frame_sender();
+
+    bool start(uint16_t local_port, std::atomic_flag *idr_flag, int fps);
+
+    void stop();
+
+    struct frame_info {
+        bool is_idr_frame;
+        char *data;
+        size_t length;
+    };
+
+    bool send_frame(const frame_info &info);
+
+    bool is_running();
+
+private:
+    struct impl;
+    std::unique_ptr<impl> pimpl;
+};
+
+
+#endif //REMOTEAR2_FRAME_SENDER_H

+ 53 - 14
src/main.cpp

@@ -1,6 +1,7 @@
 #include "augment_renderer.h"
 #include "config.h"
 #include "frame_buffer_helper.hpp"
+#include "frame_sender.h"
 #include "stereo_camera.hpp"
 #include "texture_renderer.h"
 #include "video_encoder.h"
@@ -75,6 +76,8 @@ int main() {
     // working staffs
     stereo_camera camera;
     texture_renderer tex_renderer;
+    frame_sender sender;
+    std::atomic_flag idr_flag;
 
     frame_buffer_helper output_fbo;
     output_fbo.initialize(output_frame_width, output_frame_height);
@@ -92,6 +95,7 @@ int main() {
     float exposure_time_ms = default_camera_exposure_time_ms;
     float analog_gain = default_camera_analog_gain;
     float output_bitrate_mbps = default_video_stream_bitrate / 1e6f;
+    int sender_port = 5277;
 
     FILE *video_save_file = nullptr;
     auto video_save_file_closer = sg::make_scope_guard([&]() {
@@ -175,18 +179,18 @@ int main() {
                 if (!encoder.is_encoding()) {
                     if (ImGui::Button("Start")) {
                         // create save file
-                        assert(video_save_file == nullptr);
-                        auto file_name = fmt::format("record_{:%Y_%m_%d_%H_%M_%S}.hevc",
-                                                     std::chrono::system_clock::now());
-                        video_save_file = fopen(file_name.c_str(), "wb");
+//                        assert(video_save_file == nullptr); // TODO: move into encoder
+//                        auto file_name = fmt::format("record_{:%Y_%m_%d_%H_%M_%S}.hevc",
+//                                                     std::chrono::system_clock::now());
+//                        video_save_file = fopen(file_name.c_str(), "wb");
                         encoder.start_encode(output_fbo.tex_width, output_fbo.tex_height,
                                              camera_fps, (int) (output_bitrate_mbps * 1e6));
                     }
                 } else {
                     if (ImGui::Button("Close")) {
                         encoder.stop_encode();
-                        fclose(video_save_file);
-                        video_save_file = nullptr;
+//                        fclose(video_save_file);
+//                        video_save_file = nullptr;
                     }
                 }
 
@@ -205,6 +209,34 @@ int main() {
                 ImGui::PopID();
             }
 
+            if (ImGui::CollapsingHeader("Frame Sender")) {
+                ImGui::PushID("Sender");
+
+                ImGui::SeparatorText("Actions");
+                if (!sender.is_running()) {
+                    if (ImGui::Button("Start")) {
+                        sender.start(sender_port, &idr_flag, camera_fps);
+                    }
+                } else {
+                    if (ImGui::Button("Stop")) {
+                        sender.stop();
+                    }
+                }
+
+                ImGui::SeparatorText("Configs");
+                if (sender.is_running()) {
+                    ImGui::BeginDisabled();
+                }
+
+                ImGui::InputInt("Listen Port", &sender_port);
+
+                if (sender.is_running()) {
+                    ImGui::EndDisabled();
+                }
+
+                ImGui::PopID();
+            }
+
         }
         ImGui::End();
         ImGui::Render();
@@ -225,19 +257,26 @@ int main() {
 
             // encode frame
             output_fbo.download_pixels();
-            void *frame_ptr;
-            encoder.encode_frame(output_fbo.pbo_res, &frame_ptr);
-            auto frame_length = *(size_t *) ((char *) frame_ptr + 0);
-            auto frame_data = (char *) frame_ptr + sizeof(size_t);
-
+            frame_sender::frame_info info;
+            info.is_idr_frame = idr_flag.test();
+            if (info.is_idr_frame) {
+                SPDLOG_INFO("IDR frame requested.");
+                encoder.refresh();
+                idr_flag.clear();
+            }
+            encoder.encode_frame(output_fbo.pbo_res, (void **) &info.data, &info.length);
             SPDLOG_TRACE("Time used: {}ms, length = {}", std::chrono::duration_cast<std::chrono::milliseconds>(
-                    std::chrono::high_resolution_clock::now() - start_time).count(), frame_length);
+                    std::chrono::high_resolution_clock::now() - start_time).count(), info.length);
 
             // save encoded frame
-            fwrite(frame_data, frame_length, 1, video_save_file);
+//            fwrite(info.data, info.length, 1, video_save_file);
 
             // cleanup
-            free(frame_ptr);
+            if (sender.is_running()) {
+                sender.send_frame(info);
+            } else {
+                free(info.data);
+            }
         }
 
         int frame_width, frame_height;

+ 1 - 0
src/stereo_camera.hpp

@@ -54,6 +54,7 @@ struct stereo_camera {
             // let thread exit by itself
             should_stop.test_and_set();
             trigger_thread->join();
+            delete trigger_thread;
 
             // cleanup
             should_stop.clear();

+ 991 - 0
src/third_party/rs.c

@@ -0,0 +1,991 @@
+/*#define PROFILE*/
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980624
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ * (C) 2001 Alain Knaff (alain@knaff.lu)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * Reimplement by Jannson (20161018): compatible for golang version of https://github.com/klauspost/reedsolomon
+ */
+
+/*
+ * The following parameter defines how many bits are used for
+ * field elements. The code supports any value from 2 to 16
+ * but fastest operation is achieved with 8 bit elements
+ * This is the only parameter you may want to change.
+ */
+#define GF_BITS  8  /* code over GF(2**GF_BITS) - change to suit */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <assert.h>
+#include "rs.h"
+
+/*
+ * stuff used for testing purposes only
+ */
+
+#ifdef  TEST
+#define DEB(x)
+#define DDB(x) x
+#define DEBUG   0   /* minimal debugging */
+
+#include <sys/time.h>
+#define DIFF_T(a,b) \
+    (1+ 1000000*(a.tv_sec - b.tv_sec) + (a.tv_usec - b.tv_usec) )
+
+#define TICK(t) \
+    {struct timeval x ; \
+    gettimeofday(&x, NULL) ; \
+    t = x.tv_usec + 1000000* (x.tv_sec & 0xff ) ; \
+    }
+#define TOCK(t) \
+    { u_long t1 ; TICK(t1) ; \
+      if (t1 < t) t = 256000000 + t1 - t ; \
+      else t = t1 - t ; \
+      if (t == 0) t = 1 ;}
+
+u_long ticks[10];   /* vars for timekeeping */
+#else
+#define DEB(x)
+#define DDB(x)
+#define TICK(x)
+#define TOCK(x)
+#endif /* TEST */
+
+/*
+ * You should not need to change anything beyond this point.
+ * The first part of the file implements linear algebra in GF.
+ *
+ * gf is the type used to store an element of the Galois Field.
+ * Must constain at least GF_BITS bits.
+ *
+ * Note: unsigned char will work up to GF(256) but int seems to run
+ * faster on the Pentium. We use int whenever have to deal with an
+ * index, since they are generally faster.
+ */
+/*
+ * AK: Udpcast only uses GF_BITS=8. Remove other possibilities
+ */
+#if (GF_BITS != 8)
+#error "GF_BITS must be 8"
+#endif
+typedef unsigned char gf;
+
+#define GF_SIZE ((1 << GF_BITS) - 1)    /* powers of \alpha */
+
+/*
+ * Primitive polynomials - see Lin & Costello, Appendix A,
+ * and  Lee & Messerschmitt, p. 453.
+ */
+static char *allPp[] = {    /* GF_BITS  polynomial      */
+        NULL,           /*  0   no code         */
+        NULL,           /*  1   no code         */
+        "111",          /*  2   1+x+x^2         */
+        "1101",         /*  3   1+x+x^3         */
+        "11001",            /*  4   1+x+x^4         */
+        "101001",           /*  5   1+x^2+x^5       */
+        "1100001",          /*  6   1+x+x^6         */
+        "10010001",         /*  7   1 + x^3 + x^7       */
+        "101110001",        /*  8   1+x^2+x^3+x^4+x^8   */
+        "1000100001",       /*  9   1+x^4+x^9       */
+        "10010000001",      /* 10   1+x^3+x^10      */
+        "101000000001",     /* 11   1+x^2+x^11      */
+        "1100101000001",        /* 12   1+x+x^4+x^6+x^12    */
+        "11011000000001",       /* 13   1+x+x^3+x^4+x^13    */
+        "110000100010001",      /* 14   1+x+x^6+x^10+x^14   */
+        "1100000000000001",     /* 15   1+x+x^15        */
+        "11010000000010001"     /* 16   1+x+x^3+x^12+x^16   */
+};
+
+
+/*
+ * To speed up computations, we have tables for logarithm, exponent
+ * and inverse of a number. If GF_BITS <= 8, we use a table for
+ * multiplication as well (it takes 64K, no big deal even on a PDA,
+ * especially because it can be pre-initialized an put into a ROM!),
+ * otherwhise we use a table of logarithms.
+ * In any case the macro gf_mul(x,y) takes care of multiplications.
+ */
+
+static gf gf_exp[2*GF_SIZE];    /* index->poly form conversion table    */
+static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table    */
+static gf inverse[GF_SIZE+1];   /* inverse of field elem.       */
+/* inv[\alpha**i]=\alpha**(GF_SIZE-i-1) */
+
+/*
+ * modnn(x) computes x % GF_SIZE, where GF_SIZE is 2**GF_BITS - 1,
+ * without a slow divide.
+ */
+static inline gf
+modnn(int x)
+{
+    while (x >= GF_SIZE) {
+        x -= GF_SIZE;
+        x = (x >> GF_BITS) + (x & GF_SIZE);
+    }
+    return x;
+}
+
+#define SWAP(a,b,t) {t tmp; tmp=a; a=b; b=tmp;}
+
+/*
+ * gf_mul(x,y) multiplies two numbers. If GF_BITS<=8, it is much
+ * faster to use a multiplication table.
+ *
+ * USE_GF_MULC, GF_MULC0(c) and GF_ADDMULC(x) can be used when multiplying
+ * many numbers by the same constant. In this case the first
+ * call sets the constant, and others perform the multiplications.
+ * A value related to the multiplication is held in a local variable
+ * declared with USE_GF_MULC . See usage in addmul1().
+ */
+static gf gf_mul_table[(GF_SIZE + 1)*(GF_SIZE + 1)]
+#ifdef WINDOWS
+        __attribute__((aligned (16)))
+#else
+        __attribute__((aligned (256)))
+#endif
+;
+
+#define gf_mul(x,y) gf_mul_table[(x<<8)+y]
+
+#define USE_GF_MULC register gf * __gf_mulc_
+#define GF_MULC0(c) __gf_mulc_ = &gf_mul_table[(c)<<8]
+#define GF_ADDMULC(dst, x) dst ^= __gf_mulc_[x]
+#define GF_MULC(dst, x) dst = __gf_mulc_[x]
+
+static void
+init_mul_table(void)
+{
+    int i, j;
+    for (i=0; i< GF_SIZE+1; i++)
+        for (j=0; j< GF_SIZE+1; j++)
+            gf_mul_table[(i<<8)+j] = gf_exp[modnn(gf_log[i] + gf_log[j]) ] ;
+
+    for (j=0; j< GF_SIZE+1; j++)
+        gf_mul_table[j] = gf_mul_table[j<<8] = 0;
+}
+
+/*
+ * Generate GF(2**m) from the irreducible polynomial p(X) in p[0]..p[m]
+ * Lookup tables:
+ *     index->polynomial form       gf_exp[] contains j= \alpha^i;
+ *     polynomial form -> index form    gf_log[ j = \alpha^i ] = i
+ * \alpha=x is the primitive element of GF(2^m)
+ *
+ * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple
+ * multiplication of two numbers can be resolved without calling modnn
+ */
+
+
+
+/*
+ * initialize the data structures used for computations in GF.
+ */
+static void
+generate_gf(void)
+{
+    int i;
+    gf mask;
+    char *Pp =  allPp[GF_BITS] ;
+
+    mask = 1;   /* x ** 0 = 1 */
+    gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */
+    /*
+     * first, generate the (polynomial representation of) powers of \alpha,
+     * which are stored in gf_exp[i] = \alpha ** i .
+     * At the same time build gf_log[gf_exp[i]] = i .
+     * The first GF_BITS powers are simply bits shifted to the left.
+     */
+    for (i = 0; i < GF_BITS; i++, mask <<= 1 ) {
+        gf_exp[i] = mask;
+        gf_log[gf_exp[i]] = i;
+        /*
+         * If Pp[i] == 1 then \alpha ** i occurs in poly-repr
+         * gf_exp[GF_BITS] = \alpha ** GF_BITS
+         */
+        if ( Pp[i] == '1' )
+            gf_exp[GF_BITS] ^= mask;
+    }
+    /*
+     * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can als
+     * compute its inverse.
+     */
+    gf_log[gf_exp[GF_BITS]] = GF_BITS;
+    /*
+     * Poly-repr of \alpha ** (i+1) is given by poly-repr of
+     * \alpha ** i shifted left one-bit and accounting for any
+     * \alpha ** GF_BITS term that may occur when poly-repr of
+     * \alpha ** i is shifted.
+     */
+    mask = 1 << (GF_BITS - 1 ) ;
+    for (i = GF_BITS + 1; i < GF_SIZE; i++) {
+        if (gf_exp[i - 1] >= mask)
+            gf_exp[i] = gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1);
+        else
+            gf_exp[i] = gf_exp[i - 1] << 1;
+        gf_log[gf_exp[i]] = i;
+    }
+    /*
+     * log(0) is not defined, so use a special value
+     */
+    gf_log[0] = GF_SIZE ;
+    /* set the extended gf_exp values for fast multiply */
+    for (i = 0 ; i < GF_SIZE ; i++)
+        gf_exp[i + GF_SIZE] = gf_exp[i] ;
+
+    /*
+     * again special cases. 0 has no inverse. This used to
+     * be initialized to GF_SIZE, but it should make no difference
+     * since noone is supposed to read from here.
+     */
+    inverse[0] = 0 ;
+    inverse[1] = 1;
+    for (i=2; i<=GF_SIZE; i++)
+        inverse[i] = gf_exp[GF_SIZE-gf_log[i]];
+}
+
+/*
+ * Various linear algebra operations that i use often.
+ */
+
+/*
+ * addmul() computes dst[] = dst[] + c * src[]
+ * This is used often, so better optimize it! Currently the loop is
+ * unrolled 16 times, a good value for 486 and pentium-class machines.
+ * The case c=0 is also optimized, whereas c=1 is not. These
+ * calls are unfrequent in my typical apps so I did not bother.
+ *
+ * Note that gcc on
+ */
+#if 0
+#define addmul(dst, src, c, sz) \
+    if (c != 0) addmul1(dst, src, c, sz)
+#endif
+
+
+
+#define UNROLL 16 /* 1, 4, 8, 16 */
+static void
+slow_addmul1(gf *dst1, gf *src1, gf c, int sz)
+{
+    USE_GF_MULC ;
+    register gf *dst = dst1, *src = src1 ;
+    gf *lim = &dst[sz - UNROLL + 1] ;
+
+    GF_MULC0(c) ;
+
+#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */
+    for (; dst < lim ; dst += UNROLL, src += UNROLL ) {
+        GF_ADDMULC( dst[0] , src[0] );
+        GF_ADDMULC( dst[1] , src[1] );
+        GF_ADDMULC( dst[2] , src[2] );
+        GF_ADDMULC( dst[3] , src[3] );
+#if (UNROLL > 4)
+        GF_ADDMULC( dst[4] , src[4] );
+        GF_ADDMULC( dst[5] , src[5] );
+        GF_ADDMULC( dst[6] , src[6] );
+        GF_ADDMULC( dst[7] , src[7] );
+#endif
+#if (UNROLL > 8)
+        GF_ADDMULC( dst[8] , src[8] );
+        GF_ADDMULC( dst[9] , src[9] );
+        GF_ADDMULC( dst[10] , src[10] );
+        GF_ADDMULC( dst[11] , src[11] );
+        GF_ADDMULC( dst[12] , src[12] );
+        GF_ADDMULC( dst[13] , src[13] );
+        GF_ADDMULC( dst[14] , src[14] );
+        GF_ADDMULC( dst[15] , src[15] );
+#endif
+    }
+#endif
+    lim += UNROLL - 1 ;
+    for (; dst < lim; dst++, src++ )        /* final components */
+        GF_ADDMULC( *dst , *src );
+}
+
+# define addmul1 slow_addmul1
+
+static void addmul(gf *dst, gf *src, gf c, int sz) {
+    // fprintf(stderr, "Dst=%p Src=%p, gf=%02x sz=%d\n", dst, src, c, sz);
+    if (c != 0) addmul1(dst, src, c, sz);
+}
+
+/*
+ * mul() computes dst[] = c * src[]
+ * This is used often, so better optimize it! Currently the loop is
+ * unrolled 16 times, a good value for 486 and pentium-class machines.
+ * The case c=0 is also optimized, whereas c=1 is not. These
+ * calls are unfrequent in my typical apps so I did not bother.
+ *
+ * Note that gcc on
+ */
+#if 0
+#define mul(dst, src, c, sz) \
+    do { if (c != 0) mul1(dst, src, c, sz); else memset(dst, 0, c); } while(0)
+#endif
+
+#define UNROLL 16 /* 1, 4, 8, 16 */
+static void
+slow_mul1(gf *dst1, gf *src1, gf c, int sz)
+{
+    USE_GF_MULC ;
+    register gf *dst = dst1, *src = src1 ;
+    gf *lim = &dst[sz - UNROLL + 1] ;
+
+    GF_MULC0(c) ;
+
+#if (UNROLL > 1) /* unrolling by 8/16 is quite effective on the pentium */
+    for (; dst < lim ; dst += UNROLL, src += UNROLL ) {
+        GF_MULC( dst[0] , src[0] );
+        GF_MULC( dst[1] , src[1] );
+        GF_MULC( dst[2] , src[2] );
+        GF_MULC( dst[3] , src[3] );
+#if (UNROLL > 4)
+        GF_MULC( dst[4] , src[4] );
+        GF_MULC( dst[5] , src[5] );
+        GF_MULC( dst[6] , src[6] );
+        GF_MULC( dst[7] , src[7] );
+#endif
+#if (UNROLL > 8)
+        GF_MULC( dst[8] , src[8] );
+        GF_MULC( dst[9] , src[9] );
+        GF_MULC( dst[10] , src[10] );
+        GF_MULC( dst[11] , src[11] );
+        GF_MULC( dst[12] , src[12] );
+        GF_MULC( dst[13] , src[13] );
+        GF_MULC( dst[14] , src[14] );
+        GF_MULC( dst[15] , src[15] );
+#endif
+    }
+#endif
+    lim += UNROLL - 1 ;
+    for (; dst < lim; dst++, src++ )        /* final components */
+        GF_MULC( *dst , *src );
+}
+
+# define mul1 slow_mul1
+
+static inline void mul(gf *dst, gf *src, gf c, int sz) {
+    /*fprintf(stderr, "%p = %02x * %p\n", dst, c, src);*/
+    if (c != 0) mul1(dst, src, c, sz); else memset(dst, 0, c);
+}
+
+/*
+ * invert_mat() takes a matrix and produces its inverse
+ * k is the size of the matrix.
+ * (Gauss-Jordan, adapted from Numerical Recipes in C)
+ * Return non-zero if singular.
+ */
+DEB( int pivloops=0; int pivswaps=0 ; /* diagnostic */)
+static int
+invert_mat(gf *src, int k)
+{
+    gf c, *p ;
+    int irow, icol, row, col, i, ix ;
+
+    int error = 1 ;
+    int indxc[k];
+    int indxr[k];
+    int ipiv[k];
+    gf id_row[k];
+
+    memset(id_row, 0, k*sizeof(gf));
+    DEB( pivloops=0; pivswaps=0 ; /* diagnostic */ )
+    /*
+     * ipiv marks elements already used as pivots.
+     */
+    for (i = 0; i < k ; i++)
+        ipiv[i] = 0 ;
+
+    for (col = 0; col < k ; col++) {
+        gf *pivot_row ;
+        /*
+         * Zeroing column 'col', look for a non-zero element.
+         * First try on the diagonal, if it fails, look elsewhere.
+         */
+        irow = icol = -1 ;
+        if (ipiv[col] != 1 && src[col*k + col] != 0) {
+            irow = col ;
+            icol = col ;
+            goto found_piv ;
+        }
+        for (row = 0 ; row < k ; row++) {
+            if (ipiv[row] != 1) {
+                for (ix = 0 ; ix < k ; ix++) {
+                    DEB( pivloops++ ; )
+                    if (ipiv[ix] == 0) {
+                        if (src[row*k + ix] != 0) {
+                            irow = row ;
+                            icol = ix ;
+                            goto found_piv ;
+                        }
+                    } else if (ipiv[ix] > 1) {
+                        fprintf(stderr, "singular matrix\n");
+                        goto fail ;
+                    }
+                }
+            }
+        }
+        if (icol == -1) {
+            fprintf(stderr, "XXX pivot not found!\n");
+            goto fail ;
+        }
+        found_piv:
+        ++(ipiv[icol]) ;
+        /*
+         * swap rows irow and icol, so afterwards the diagonal
+         * element will be correct. Rarely done, not worth
+         * optimizing.
+         */
+        if (irow != icol) {
+            for (ix = 0 ; ix < k ; ix++ ) {
+                SWAP( src[irow*k + ix], src[icol*k + ix], gf) ;
+            }
+        }
+        indxr[col] = irow ;
+        indxc[col] = icol ;
+        pivot_row = &src[icol*k] ;
+        c = pivot_row[icol] ;
+        if (c == 0) {
+            fprintf(stderr, "singular matrix 2\n");
+            goto fail ;
+        }
+        if (c != 1 ) { /* otherwhise this is a NOP */
+            /*
+             * this is done often , but optimizing is not so
+             * fruitful, at least in the obvious ways (unrolling)
+             */
+            DEB( pivswaps++ ; )
+            c = inverse[ c ] ;
+            pivot_row[icol] = 1 ;
+            for (ix = 0 ; ix < k ; ix++ )
+                pivot_row[ix] = gf_mul(c, pivot_row[ix] );
+        }
+        /*
+         * from all rows, remove multiples of the selected row
+         * to zero the relevant entry (in fact, the entry is not zero
+         * because we know it must be zero).
+         * (Here, if we know that the pivot_row is the identity,
+         * we can optimize the addmul).
+         */
+        id_row[icol] = 1;
+        if (memcmp(pivot_row, id_row, k*sizeof(gf)) != 0) {
+            for (p = src, ix = 0 ; ix < k ; ix++, p += k ) {
+                if (ix != icol) {
+                    c = p[icol] ;
+                    p[icol] = 0 ;
+                    addmul(p, pivot_row, c, k );
+                }
+            }
+        }
+        id_row[icol] = 0;
+    } /* done all columns */
+    for (col = k-1 ; col >= 0 ; col-- ) {
+        if (indxr[col] <0 || indxr[col] >= k)
+            fprintf(stderr, "AARGH, indxr[col] %d\n", indxr[col]);
+        else if (indxc[col] <0 || indxc[col] >= k)
+            fprintf(stderr, "AARGH, indxc[col] %d\n", indxc[col]);
+        else
+        if (indxr[col] != indxc[col] ) {
+            for (row = 0 ; row < k ; row++ ) {
+                SWAP( src[row*k + indxr[col]], src[row*k + indxc[col]], gf) ;
+            }
+        }
+    }
+    error = 0 ;
+    fail:
+    return error ;
+}
+
+static int fec_initialized = 0 ;
+
+void fec_init(void)
+{
+    TICK(ticks[0]);
+    generate_gf();
+    TOCK(ticks[0]);
+    DDB(fprintf(stderr, "generate_gf took %ldus\n", ticks[0]);)
+    TICK(ticks[0]);
+    init_mul_table();
+    TOCK(ticks[0]);
+    DDB(fprintf(stderr, "init_mul_table took %ldus\n", ticks[0]);)
+    fec_initialized = 1 ;
+}
+
+
+#ifdef PROFILE
+#ifdef __x86_64__
+static long long rdtsc(void)
+{
+    unsigned long low, hi;
+    asm volatile ("rdtsc" : "=d" (hi), "=a" (low));
+    return ( (((long long)hi) << 32) | ((long long) low));
+}
+#elif __arm__
+static long long rdtsc(void)
+{
+    u64 val;
+    asm volatile("mrs %0, cntvct_el0" : "=r" (val));
+    return val;
+}
+#endif
+
+void print_matrix1(gf* matrix, int nrows, int ncols) {
+    int i, j;
+    printf("matrix (%d,%d):\n", nrows, ncols);
+    for(i = 0; i < nrows; i++) {
+        for(j = 0; j < ncols; j++) {
+            printf("%6d ", matrix[i*ncols + j]);
+        }
+        printf("\n");
+    }
+}
+
+void print_matrix2(gf** matrix, int nrows, int ncols) {
+    int i, j;
+    printf("matrix (%d,%d):\n", nrows, ncols);
+    for(i = 0; i < nrows; i++) {
+        for(j = 0; j < ncols; j++) {
+            printf("%6d ", matrix[i][j]);
+        }
+        printf("\n");
+    }
+}
+
+#endif
+
+/* y = a**n */
+static gf galExp(gf a, gf n) {
+    int logA;
+    int logResult;
+    if(0 == n) {
+        return 1;
+    }
+    if(0 == a) {
+        return 0;
+    }
+    logA = gf_log[a];
+    logResult = logA * n;
+    while(logResult >= 255) {
+        logResult -= 255;
+    }
+
+    return gf_exp[logResult];
+}
+
+static inline gf galMultiply(gf a, gf b) {
+    return gf_mul_table[ ((int)a << 8) + (int)b ];
+}
+
+static gf* vandermonde(int nrows, int ncols) {
+    int row, col, ptr;
+    gf* matrix = (gf*)RS_MALLOC(nrows * ncols);
+    if(NULL != matrix) {
+        ptr = 0;
+        for(row = 0; row < nrows; row++) {
+            for(col = 0; col < ncols; col++) {
+                matrix[ptr++] = galExp((gf)row, (gf)col);
+            }
+        }
+    }
+
+    return matrix;
+}
+
+/*
+ * Not check for input params
+ * */
+static gf* sub_matrix(gf* matrix, int rmin, int cmin, int rmax, int cmax,  int nrows, int ncols) {
+    int i, j, ptr = 0;
+    gf* new_m = (gf*)RS_MALLOC( (rmax-rmin) * (cmax-cmin) );
+    if(NULL != new_m) {
+        for(i = rmin; i < rmax; i++) {
+            for(j = cmin; j < cmax; j++) {
+                new_m[ptr++] = matrix[i*ncols + j];
+            }
+        }
+    }
+
+    return new_m;
+}
+
+/* y = a.dot(b) */
+static gf* multiply1(gf *a, int ar, int ac, gf *b, int br, int bc) {
+    gf *new_m, tg;
+    int r, c, i, ptr = 0;
+
+    assert(ac == br);
+    new_m = (gf*)RS_CALLOC(1, ar*bc);
+    if(NULL != new_m) {
+
+        /* this multiply is slow */
+        for(r = 0; r < ar; r++) {
+            for(c = 0; c < bc; c++) {
+                tg = 0;
+                for(i = 0; i < ac; i++) {
+                    /* tg ^= gf_mul_table[ ((int)a[r*ac+i] << 8) + (int)b[i*bc+c] ]; */
+                    tg ^= galMultiply(a[r*ac+i], b[i*bc+c]);
+                }
+
+                new_m[ptr++] = tg;
+            }
+        }
+
+    }
+
+    return new_m;
+}
+
+/* copy from golang rs version */
+static inline int code_some_shards(gf* matrixRows, gf** inputs, gf** outputs,
+                                   int dataShards, int outputCount, int byteCount) {
+    gf* in;
+    int iRow, c;
+    for(c = 0; c < dataShards; c++) {
+        in = inputs[c];
+        for(iRow = 0; iRow < outputCount; iRow++) {
+            if(0 == c) {
+                mul(outputs[iRow], in, matrixRows[iRow*dataShards+c], byteCount);
+            } else {
+                addmul(outputs[iRow], in, matrixRows[iRow*dataShards+c], byteCount);
+            }
+        }
+    }
+
+    return 0;
+}
+
+reed_solomon* reed_solomon_new(int data_shards, int parity_shards) {
+    gf* vm = NULL;
+    gf* top = NULL;
+    int err = 0;
+    reed_solomon* rs = NULL;
+
+    /* MUST use fec_init once time first */
+    assert(fec_initialized);
+
+    do {
+        rs = (reed_solomon*) RS_MALLOC(sizeof(reed_solomon));
+        if(NULL == rs) {
+            return NULL;
+        }
+        rs->data_shards = data_shards;
+        rs->parity_shards = parity_shards;
+        rs->shards = (data_shards + parity_shards);
+        rs->m = NULL;
+        rs->parity = NULL;
+
+        if(rs->shards > DATA_SHARDS_MAX || data_shards <= 0 || parity_shards <= 0) {
+            err = 1;
+            break;
+        }
+
+        vm = vandermonde(rs->shards, rs->data_shards);
+        if(NULL == vm) {
+            err = 2;
+            break;
+        }
+
+        top = sub_matrix(vm, 0, 0, data_shards, data_shards, rs->shards, data_shards);
+        if(NULL == top) {
+            err = 3;
+            break;
+        }
+
+        err = invert_mat(top, data_shards);
+        assert(0 == err);
+
+        rs->m = multiply1(vm, rs->shards, data_shards, top, data_shards, data_shards);
+        if(NULL == rs->m) {
+            err = 4;
+            break;
+        }
+
+        rs->parity = sub_matrix(rs->m, data_shards, 0, rs->shards, data_shards, rs->shards, data_shards);
+        if(NULL == rs->parity) {
+            err = 5;
+            break;
+        }
+
+        RS_FREE(vm);
+        RS_FREE(top);
+        vm = NULL;
+        top = NULL;
+        return rs;
+
+    } while(0);
+
+    fprintf(stderr, "err=%d\n", err);
+    if(NULL != vm) {
+        RS_FREE(vm);
+    }
+    if(NULL != top) {
+        RS_FREE(top);
+    }
+    if(NULL != rs) {
+        if(NULL != rs->m) {
+            RS_FREE(rs->m);
+        }
+        if(NULL != rs->parity) {
+            RS_FREE(rs->parity);
+        }
+        RS_FREE(rs);
+    }
+
+    return NULL;
+}
+
+void reed_solomon_release(reed_solomon* rs) {
+    if(NULL != rs) {
+        if(NULL != rs->m) {
+            RS_FREE(rs->m);
+        }
+        if(NULL != rs->parity) {
+            RS_FREE(rs->parity);
+        }
+        RS_FREE(rs);
+    }
+}
+
+/**
+ * encode one shard
+ * input:
+ * rs
+ * data_blocks[rs->data_shards][block_size]
+ * fec_blocks[rs->data_shards][block_size]
+ * */
+int reed_solomon_encode(reed_solomon* rs,
+                        unsigned char** data_blocks,
+                        unsigned char** fec_blocks,
+                        int block_size) {
+    assert(NULL != rs && NULL != rs->parity);
+
+    return code_some_shards(rs->parity, data_blocks, fec_blocks
+            , rs->data_shards, rs->parity_shards, block_size);
+}
+
+/**
+ * decode one shard
+ * input:
+ * rs
+ * original data_blocks[rs->data_shards][block_size]
+ * dec_fec_blocks[nr_fec_blocks][block_size]
+ * fec_block_nos: fec pos number in original fec_blocks
+ * erased_blocks: erased blocks in original data_blocks
+ * nr_fec_blocks: the number of erased blocks
+ * */
+int reed_solomon_decode(reed_solomon* rs,
+                        unsigned char **data_blocks,
+                        int block_size,
+                        unsigned char **dec_fec_blocks,
+                        unsigned int *fec_block_nos,
+                        unsigned int *erased_blocks,
+                        int nr_fec_blocks) {
+    /* use stack instead of malloc, define a small number of DATA_SHARDS_MAX to save memory */
+    gf dataDecodeMatrix[DATA_SHARDS_MAX*DATA_SHARDS_MAX];
+    unsigned char* subShards[DATA_SHARDS_MAX];
+    unsigned char* outputs[DATA_SHARDS_MAX];
+    gf* m = rs->m;
+    int i, j, c, swap, subMatrixRow, dataShards, nos, nshards;
+
+    /* the erased_blocks should always sorted
+     * if sorted, nr_fec_blocks times to check it
+     * if not, sort it here
+     * */
+    for(i = 0; i < nr_fec_blocks; i++) {
+        swap = 0;
+        for(j = i+1; j < nr_fec_blocks; j++) {
+            if(erased_blocks[i] > erased_blocks[j]) {
+                /* the prefix is bigger than the following, swap */
+                c = erased_blocks[i];
+                erased_blocks[i] = erased_blocks[j];
+                erased_blocks[j] = c;
+
+                swap = 1;
+            }
+        }
+        //printf("swap:%d\n", swap);
+        if(!swap) {
+            //already sorted or sorted ok
+            break;
+        }
+    }
+
+    j = 0;
+    subMatrixRow = 0;
+    nos = 0;
+    nshards = 0;
+    dataShards = rs->data_shards;
+    for(i = 0; i < dataShards; i++) {
+        if(j < nr_fec_blocks && i == erased_blocks[j]) {
+            //ignore the invalid block
+            j++;
+        } else {
+            /* this row is ok */
+            for(c = 0; c < dataShards; c++) {
+                dataDecodeMatrix[subMatrixRow*dataShards + c] = m[i*dataShards + c];
+            }
+            subShards[subMatrixRow] = data_blocks[i];
+            subMatrixRow++;
+        }
+    }
+
+    for(i = 0; i < nr_fec_blocks && subMatrixRow < dataShards; i++) {
+        subShards[subMatrixRow] = dec_fec_blocks[i];
+        j = dataShards + fec_block_nos[i];
+        for(c = 0; c < dataShards; c++) {
+            dataDecodeMatrix[subMatrixRow*dataShards + c] = m[j*dataShards + c]; //use spefic pos of original fec_blocks
+        }
+        subMatrixRow++;
+    }
+
+    if(subMatrixRow < dataShards) {
+        //cannot correct
+        return -1;
+    }
+
+    invert_mat(dataDecodeMatrix, dataShards);
+    //printf("invert:\n");
+    //print_matrix1(dataDecodeMatrix, dataShards, dataShards);
+    //printf("nShards:\n");
+    //print_matrix2(subShards, dataShards, block_size);
+
+    for(i = 0; i < nr_fec_blocks; i++) {
+        j = erased_blocks[i];
+        outputs[i] = data_blocks[j];
+        //data_blocks[j][0] = 0;
+        memmove(dataDecodeMatrix+i*dataShards, dataDecodeMatrix+j*dataShards, dataShards);
+    }
+    //printf("subMatrixRow:\n");
+    //print_matrix1(dataDecodeMatrix, nr_fec_blocks, dataShards);
+
+    //printf("outputs:\n");
+    //print_matrix2(outputs, nr_fec_blocks, block_size);
+
+    return code_some_shards(dataDecodeMatrix, subShards, outputs,
+                            dataShards, nr_fec_blocks, block_size);
+}
+
+/**
+ * encode a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->shards)
+ * shards[nr_shards][block_size]
+ * */
+int reed_solomon_encode2(reed_solomon* rs, unsigned char** shards, int nr_shards, int block_size) {
+    unsigned char** data_blocks;
+    unsigned char** fec_blocks;
+    int i, ds = rs->data_shards, ps = rs->parity_shards, ss = rs->shards;
+    i = nr_shards / ss;
+    data_blocks = shards;
+    fec_blocks = &shards[(i*ds)];
+
+    for(i = 0; i < nr_shards; i += ss) {
+        reed_solomon_encode(rs, data_blocks, fec_blocks, block_size);
+        data_blocks += ds;
+        fec_blocks += ps;
+    }
+    return 0;
+}
+
+/**
+ * reconstruct a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->data_shards)
+ * shards[nr_shards][block_size]
+ * marks[nr_shards] marks as errors
+ * */
+int reed_solomon_reconstruct(reed_solomon* rs,
+                             unsigned char** shards,
+                             unsigned char* marks,
+                             int nr_shards,
+                             int block_size) {
+    unsigned char *dec_fec_blocks[DATA_SHARDS_MAX];
+    unsigned int fec_block_nos[DATA_SHARDS_MAX];
+    unsigned int erased_blocks[DATA_SHARDS_MAX];
+    unsigned char* fec_marks;
+    unsigned char **data_blocks, **fec_blocks;
+    int i, j, dn, pn, n;
+    int ds = rs->data_shards;
+    int ps = rs->parity_shards;
+    int err = 0;
+
+    data_blocks = shards;
+    n = nr_shards / rs->shards;
+    fec_marks = marks + n*ds; //after all data, is't fec marks
+    fec_blocks = shards + n*ds;
+
+    for(j = 0; j < n; j++) {
+        dn = 0;
+        for(i = 0; i < ds; i++) {
+            if(marks[i]) {
+                //errors
+                erased_blocks[dn++] = i;
+            }
+        }
+        if(dn > 0) {
+            pn = 0;
+            for(i = 0; i < ps && pn < dn; i++) {
+                if(!fec_marks[i]) {
+                    //got valid fec row
+                    fec_block_nos[pn] = i;
+                    dec_fec_blocks[pn] = fec_blocks[i];
+                    pn++;
+                }
+            }
+
+            if(dn == pn) {
+                reed_solomon_decode(rs
+                        , data_blocks
+                        , block_size
+                        , dec_fec_blocks
+                        , fec_block_nos
+                        , erased_blocks
+                        , dn);
+            } else {
+                //error but we continue
+                err = -1;
+            }
+        }
+        data_blocks += ds;
+        marks += ds;
+        fec_blocks += ps;
+        fec_marks += ps;
+    }
+
+    return err;
+}
+

+ 88 - 0
src/third_party/rs.h

@@ -0,0 +1,88 @@
+#ifndef __RS_H_
+#define __RS_H_
+
+/* use small value to save memory */
+#ifndef DATA_SHARDS_MAX
+#define DATA_SHARDS_MAX (255)
+#endif
+
+/* use other memory allocator */
+#ifndef RS_MALLOC
+#define RS_MALLOC(x)    malloc(x)
+#endif
+
+#ifndef RS_FREE
+#define RS_FREE(x)      free(x)
+#endif
+
+#ifndef RS_CALLOC
+#define RS_CALLOC(n, x) calloc(n, x)
+#endif
+
+typedef struct _reed_solomon {
+    int data_shards;
+    int parity_shards;
+    int shards;
+    unsigned char* m;
+    unsigned char* parity;
+} reed_solomon;
+
+/**
+ * MUST initial one time
+ * */
+void fec_init(void);
+
+reed_solomon* reed_solomon_new(int data_shards, int parity_shards);
+void reed_solomon_release(reed_solomon* rs);
+
+/**
+ * encode one shard
+ * input:
+ * rs
+ * data_blocks[rs->data_shards][block_size]
+ * fec_blocks[rs->data_shards][block_size]
+ * */
+int reed_solomon_encode(reed_solomon* rs,
+                        unsigned char** data_blocks,
+                        unsigned char** fec_blocks,
+                        int block_size);
+
+
+/**
+ * decode one shard
+ * input:
+ * rs
+ * original data_blocks[rs->data_shards][block_size]
+ * dec_fec_blocks[nr_fec_blocks][block_size]
+ * fec_block_nos: fec pos number in original fec_blocks
+ * erased_blocks: erased blocks in original data_blocks
+ * nr_fec_blocks: the number of erased blocks
+ * */
+int reed_solomon_decode(reed_solomon* rs,
+                        unsigned char **data_blocks,
+                        int block_size,
+                        unsigned char **dec_fec_blocks,
+                        unsigned int *fec_block_nos,
+                        unsigned int *erased_blocks,
+                        int nr_fec_blocks);
+
+/**
+ * encode a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->data_shards)
+ * shards[nr_shards][block_size]
+ * */
+int reed_solomon_encode2(reed_solomon* rs, unsigned char** shards, int nr_shards, int block_size);
+
+/**
+ * reconstruct a big size of buffer
+ * input:
+ * rs
+ * nr_shards: assert(0 == nr_shards % rs->data_shards)
+ * shards[nr_shards][block_size]
+ * marks[nr_shards] marks as errors
+ * */
+int reed_solomon_reconstruct(reed_solomon* rs, unsigned char** shards, unsigned char* marks, int nr_shards, int block_size);
+#endif
+

+ 22 - 10
src/video_encoder.cpp

@@ -34,8 +34,11 @@ struct video_encoder::impl {
 
     // frame related
     void *frame_ptr = nullptr, **output_ptr = nullptr;
+    size_t *output_size = nullptr;
     NV_ENC_REGISTERED_PTR frame_reg_ptr = nullptr;
 
+    bool idr_flag = false;
+
     bool initialize() {
         NVENC_API_CHECK(NvEncodeAPICreateInstance(&api));
         CUDA_API_CHECK(cuCtxGetCurrent(&cuda_ctx));
@@ -46,7 +49,8 @@ struct video_encoder::impl {
         // constant params
         auto codec_guid = NV_ENC_CODEC_HEVC_GUID;
         auto preset_guid = NV_ENC_PRESET_P3_GUID;
-        auto tuning_info = NV_ENC_TUNING_INFO_LOW_LATENCY;
+//        auto tuning_info = NV_ENC_TUNING_INFO_LOW_LATENCY;
+        auto tuning_info = NV_ENC_TUNING_INFO_ULTRA_LOW_LATENCY;
 
         // create encoder
         NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS session_params = {NV_ENC_OPEN_ENCODE_SESSION_EX_PARAMS_VER};
@@ -151,7 +155,12 @@ struct video_encoder::impl {
         pic_params.inputWidth = frame_width;
         pic_params.inputHeight = frame_height;
         pic_params.inputPitch = frame_pitch;
-        pic_params.encodePicFlags = 0; // TODO; modify this value
+        if (idr_flag) { // request for IDR frame
+            pic_params.encodePicFlags = NV_ENC_PIC_FLAG_FORCEIDR | NV_ENC_PIC_FLAG_OUTPUT_SPSPPS;
+            idr_flag = false;
+        } else {
+            pic_params.encodePicFlags = 0;
+        }
         pic_params.inputBuffer = map_params.mappedResource;
         pic_params.outputBitstream = output_buf;
         pic_params.bufferFmt = frame_buffer_type;
@@ -167,11 +176,9 @@ struct video_encoder::impl {
         // copy bitstream
         assert(output_ptr != nullptr);
         assert(output_size != nullptr);
-        *output_ptr = malloc(lock_config.bitstreamSizeInBytes + sizeof(size_t));
-        auto &output_size = *(size_t *) ((char *) *output_ptr + 0);
-        auto output_data = (char *) *output_ptr + sizeof(size_t);
-        output_size = lock_config.bitstreamSizeInBytes;
-        memcpy(output_data, lock_config.bitstreamBufferPtr, lock_config.bitstreamSizeInBytes);
+        *output_ptr = malloc(lock_config.bitstreamSizeInBytes);
+        *output_size = lock_config.bitstreamSizeInBytes;
+        memcpy(*output_ptr, lock_config.bitstreamBufferPtr, lock_config.bitstreamSizeInBytes);
 
         // cleanup
         NVENC_API_CHECK(api.nvEncUnlockBitstream(encoder, output_buf));
@@ -206,7 +213,7 @@ void video_encoder::stop_encode() {
     pimpl->cleanup();
 }
 
-bool video_encoder::encode_frame(void *frame_ptr, void **output_ptr) {
+bool video_encoder::encode_frame(void *frame_ptr, void **output_ptr, size_t *output_size) {
     // register frame ptr
     if (pimpl->frame_ptr != frame_ptr) {
         pimpl->unregister_frame_ptr();
@@ -215,20 +222,25 @@ bool video_encoder::encode_frame(void *frame_ptr, void **output_ptr) {
     }
 
     pimpl->output_ptr = output_ptr;
+    pimpl->output_size = output_size;
     return pimpl->encode_frame();
 }
 
-bool video_encoder::encode_frame(cudaGraphicsResource *res, void **output_ptr) {
+bool video_encoder::encode_frame(cudaGraphicsResource *res, void **output_ptr, size_t *output_size) {
     void *pbo_ptr;
     size_t pbo_size;
     CUDA_API_CHECK(cudaGraphicsMapResources(1, &res));
     CUDA_API_CHECK(cudaGraphicsResourceGetMappedPointer(&pbo_ptr, &pbo_size, res));
     assert(pbo_size == pimpl->frame_pitch * pimpl->frame_height);
-    CALL_CHECK(encode_frame(pbo_ptr, output_ptr));
+    CALL_CHECK(encode_frame(pbo_ptr, output_ptr, output_size));
     CUDA_API_CHECK(cudaGraphicsUnmapResources(1, &res));
     return true;
 }
 
 bool video_encoder::is_encoding() {
     return pimpl->encoder != nullptr;
+}
+
+void video_encoder::refresh() {
+    pimpl->idr_flag = true;
 }

+ 5 - 3
src/video_encoder.h

@@ -18,13 +18,15 @@ public:
 
     void stop_encode();
 
-    // leading 4 bytes indicate length
-    bool encode_frame(void *frame_ptr, void **output_ptr);
+    bool encode_frame(void *frame_ptr, void **output_ptr, size_t *output_size);
 
-    bool encode_frame(cudaGraphicsResource *res, void **output_ptr);
+    bool encode_frame(cudaGraphicsResource *res, void **output_ptr, size_t *output_size);
 
     bool is_encoding();
 
+    // force next frame to be IDR frame
+    void refresh();
+
 private:
     struct impl;
     std::unique_ptr<impl> pimpl;