|
|
@@ -0,0 +1,387 @@
|
|
|
+#include "frame_receiver2.h"
|
|
|
+#include "third_party/scope_guard.hpp"
|
|
|
+#include "video_decoder.h"
|
|
|
+
|
|
|
+extern "C" {
|
|
|
+#include "third_party/rs.h"
|
|
|
+}
|
|
|
+
|
|
|
+#include <boost/asio/io_context.hpp>
|
|
|
+#include <boost/asio/ip/udp.hpp>
|
|
|
+#include <boost/crc.hpp>
|
|
|
+#include <boost/endian.hpp>
|
|
|
+
|
|
|
+#include <spdlog/spdlog.h>
|
|
|
+
|
|
|
+#include <list>
|
|
|
+
|
|
|
+using namespace boost::asio::ip;
|
|
|
+using boost::asio::buffer;
|
|
|
+using boost::asio::io_context;
|
|
|
+using boost::system::error_code;
|
|
|
+
|
|
|
+namespace frame_receiver2_impl {
|
|
|
+
|
|
|
+ template<typename T>
|
|
|
+ struct smart_buffer {
|
|
|
+ T *ptr = nullptr;
|
|
|
+ size_t length = 0;
|
|
|
+
|
|
|
+ ~smart_buffer() {
|
|
|
+ free(ptr);
|
|
|
+ }
|
|
|
+
|
|
|
+ void create(size_t req_length) {
|
|
|
+ if (req_length > capacity) [[unlikely]] {
|
|
|
+ auto ptr_next = new T[req_length];
|
|
|
+ if (ptr != nullptr) {
|
|
|
+ delete ptr;
|
|
|
+ }
|
|
|
+ ptr = ptr_next;
|
|
|
+ capacity = req_length;
|
|
|
+ }
|
|
|
+ length = req_length;
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+ size_t capacity = 0;
|
|
|
+ };
|
|
|
+
|
|
|
+ enum status_type {
|
|
|
+ NOT_INIT,
|
|
|
+ WAITING,
|
|
|
+ READY
|
|
|
+ };
|
|
|
+
|
|
|
+ struct smart_chunk {
|
|
|
+ smart_buffer<uint8_t> block_data;
|
|
|
+ smart_buffer<uint8_t *> block_ptrs;
|
|
|
+ smart_buffer<bool> block_miss;
|
|
|
+ uint8_t ready_blocks = 0;
|
|
|
+ status_type status = NOT_INIT;
|
|
|
+
|
|
|
+ void reset() {
|
|
|
+ ready_blocks = 0;
|
|
|
+ status = NOT_INIT;
|
|
|
+ }
|
|
|
+
|
|
|
+ void create(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
|
|
|
+ if (total_blocks != block_ptrs.length ||
|
|
|
+ parity_blocks != last_parity_blocks ||
|
|
|
+ block_size != last_block_size) [[unlikely]] {
|
|
|
+ deallocate();
|
|
|
+ allocate(total_blocks, parity_blocks, block_size);
|
|
|
+ }
|
|
|
+ memset(block_miss.ptr, true, block_miss.length * sizeof(bool));
|
|
|
+ assert(status == NOT_INIT);
|
|
|
+ status = WAITING;
|
|
|
+ }
|
|
|
+
|
|
|
+ bool reconstruct() {
|
|
|
+ if (ready_blocks + last_parity_blocks < block_ptrs.length) return false;
|
|
|
+ auto ret = reed_solomon_reconstruct(rs, block_ptrs.ptr, (uint8_t *) block_miss.ptr,
|
|
|
+ block_ptrs.length, last_block_size);
|
|
|
+ if (ret != 0) return false;
|
|
|
+ assert(status == WAITING);
|
|
|
+ status = READY;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private:
|
|
|
+ reed_solomon *rs = nullptr;
|
|
|
+ uint8_t last_parity_blocks = 0;
|
|
|
+ uint16_t last_block_size = 0;
|
|
|
+
|
|
|
+ void deallocate() {
|
|
|
+ if (rs == nullptr) return;
|
|
|
+ reed_solomon_release(rs);
|
|
|
+ rs = nullptr;
|
|
|
+ }
|
|
|
+
|
|
|
+ void allocate(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
|
|
|
+ assert(rs == nullptr);
|
|
|
+ auto data_blocks = total_blocks - parity_blocks;
|
|
|
+ rs = reed_solomon_new(data_blocks, parity_blocks);
|
|
|
+ block_data.create(total_blocks * block_size);
|
|
|
+ block_ptrs.create(total_blocks);
|
|
|
+ block_miss.create(total_blocks);
|
|
|
+ for (int i = 0; i < total_blocks; ++i) {
|
|
|
+ block_ptrs.ptr[i] = block_data.ptr + block_size * i;
|
|
|
+ }
|
|
|
+ last_parity_blocks = parity_blocks;
|
|
|
+ last_block_size = block_size;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+using namespace frame_receiver2_impl;
|
|
|
+
|
|
|
+struct frame_receiver2::impl {
|
|
|
+
|
|
|
+ struct frag_header {
|
|
|
+ uint32_t frag_checksum;
|
|
|
+ uint8_t frame_type; // 'I' or 'P'
|
|
|
+ uint32_t frame_id;
|
|
|
+ uint32_t frame_length;
|
|
|
+ uint8_t chunk_count;
|
|
|
+ uint8_t chunk_id;
|
|
|
+ uint32_t chunk_offset;
|
|
|
+ uint32_t chunk_length;
|
|
|
+ uint16_t block_size;
|
|
|
+ uint8_t block_count;
|
|
|
+ uint8_t chunk_decode_block_count;
|
|
|
+ uint8_t block_id;
|
|
|
+ };
|
|
|
+
|
|
|
+ struct request_type {
|
|
|
+ uint32_t request_checksum;
|
|
|
+ uint8_t request_type;
|
|
|
+ uint32_t frame_id;
|
|
|
+ };
|
|
|
+
|
|
|
+ struct frame_info {
|
|
|
+ smart_buffer<smart_chunk> chunks;
|
|
|
+ smart_buffer<uint8_t> data;
|
|
|
+ uint32_t id = 0;
|
|
|
+ uint8_t ready_chunks = 0;
|
|
|
+ status_type status = NOT_INIT;
|
|
|
+
|
|
|
+ void create(uint32_t frame_id, uint8_t chunk_count, size_t length) {
|
|
|
+ chunks.create(chunk_count);
|
|
|
+ data.create(length);
|
|
|
+ for (auto k = 0; k < chunk_count; ++k) {
|
|
|
+ chunks.ptr[k].reset();
|
|
|
+ }
|
|
|
+ id = frame_id;
|
|
|
+ ready_chunks = 0;
|
|
|
+ status = WAITING;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ static constexpr auto frag_header_size = 28;
|
|
|
+ static constexpr auto request_size = 9;
|
|
|
+ static constexpr auto max_package_size = 64 * 1024; // 64KiB
|
|
|
+ static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
|
|
|
+
|
|
|
+ std::unique_ptr<io_context> ctx;
|
|
|
+ std::unique_ptr<udp::socket> socket;
|
|
|
+ video_decoder *decoder;
|
|
|
+
|
|
|
+ frame_info frame_cache;
|
|
|
+ uint32_t last_frame_id = 0;
|
|
|
+
|
|
|
+ udp::endpoint server_ep;
|
|
|
+ smart_buffer<uint8_t> in_buf, out_buf;
|
|
|
+
|
|
|
+ template<typename T>
|
|
|
+ static uint8_t *write_binary_number(uint8_t *ptr, T val) {
|
|
|
+ static constexpr auto need_swap =
|
|
|
+ (boost::endian::order::native != boost::endian::order::big);
|
|
|
+ auto real_ptr = (T *) ptr;
|
|
|
+ if constexpr (need_swap) {
|
|
|
+ *real_ptr = boost::endian::endian_reverse(val);
|
|
|
+ } else {
|
|
|
+ *real_ptr = val;
|
|
|
+ }
|
|
|
+ return ptr + sizeof(T);
|
|
|
+ }
|
|
|
+
|
|
|
+ template<typename T>
|
|
|
+ static uint8_t *read_binary_number(uint8_t *ptr, T *val) {
|
|
|
+ static constexpr auto need_swap =
|
|
|
+ (boost::endian::order::native != boost::endian::order::big);
|
|
|
+ *val = *(T *) ptr;
|
|
|
+ if constexpr (need_swap) {
|
|
|
+ boost::endian::endian_reverse_inplace(*val);
|
|
|
+ }
|
|
|
+ return ptr + sizeof(T);
|
|
|
+ }
|
|
|
+
|
|
|
+ static uint8_t *read_frag_header(uint8_t *ptr, frag_header *header) {
|
|
|
+#define READ(member) ptr = read_binary_number(ptr, &header->member)
|
|
|
+ READ(frag_checksum);
|
|
|
+ READ(frame_type);
|
|
|
+ READ(frame_id);
|
|
|
+ READ(frame_length);
|
|
|
+ READ(chunk_count);
|
|
|
+ READ(chunk_id);
|
|
|
+ READ(chunk_offset);
|
|
|
+ READ(chunk_length);
|
|
|
+ READ(block_size);
|
|
|
+ READ(block_count);
|
|
|
+ READ(chunk_decode_block_count);
|
|
|
+ READ(block_id);
|
|
|
+#undef WRITE
|
|
|
+ return ptr;
|
|
|
+ }
|
|
|
+
|
|
|
+ static uint8_t *write_request(uint8_t *ptr, const request_type &req) {
|
|
|
+#define WRITE(member) ptr = write_binary_number(ptr, req.member)
|
|
|
+ WRITE(request_checksum);
|
|
|
+ WRITE(request_type);
|
|
|
+ WRITE(frame_id);
|
|
|
+#undef WRITE
|
|
|
+ return ptr;
|
|
|
+ }
|
|
|
+
|
|
|
+ ~impl() {
|
|
|
+ request_exit();
|
|
|
+ }
|
|
|
+
|
|
|
+ void refresh_frame(const frag_header &header) {
|
|
|
+ frame_cache.create(header.frame_id, header.chunk_count, header.frame_length);
|
|
|
+ }
|
|
|
+
|
|
|
+ void send_request(const request_type &req) {
|
|
|
+ out_buf.create(request_size);
|
|
|
+ write_request(out_buf.ptr, req);
|
|
|
+
|
|
|
+ // calculate crc32
|
|
|
+ auto crc = boost::crc_32_type{};
|
|
|
+ crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
|
|
|
+ request_size - sizeof(uint32_t));
|
|
|
+ write_binary_number(out_buf.ptr, crc.checksum());
|
|
|
+
|
|
|
+ // send packet
|
|
|
+ assert(socket != nullptr);
|
|
|
+ auto buf = buffer(out_buf.ptr, request_size);
|
|
|
+ socket->send_to(buf, server_ep);
|
|
|
+ }
|
|
|
+
|
|
|
+ void request_idr_frame(uint32_t frame_id) {
|
|
|
+ request_type req;
|
|
|
+ req.request_type = 'I';
|
|
|
+ req.frame_id = frame_id;
|
|
|
+ send_request(req);
|
|
|
+ SPDLOG_WARN("Receive frame {} error, request new IDR frame.", frame_id);
|
|
|
+ }
|
|
|
+
|
|
|
+ void request_frame_confirm(uint32_t frame_id) {
|
|
|
+ request_type req;
|
|
|
+ req.request_type = 'C';
|
|
|
+ req.frame_id = frame_id;
|
|
|
+ send_request(req);
|
|
|
+ }
|
|
|
+
|
|
|
+ void request_exit() {
|
|
|
+ request_type req;
|
|
|
+ req.request_type = 'X';
|
|
|
+ send_request(req);
|
|
|
+ }
|
|
|
+
|
|
|
+ void async_handle_package() {
|
|
|
+ in_buf.create(max_package_size);
|
|
|
+ auto buf = buffer(in_buf.ptr, max_package_size);
|
|
|
+ using namespace std::placeholders;
|
|
|
+ socket->async_receive(buf, std::bind(&impl::handle_package, this, _1, _2));
|
|
|
+ }
|
|
|
+
|
|
|
+ void handle_package(const error_code &ec, size_t length) {
|
|
|
+ // prepare for next request when this function exited.
|
|
|
+ auto closer = sg::make_scope_guard([this] {
|
|
|
+ async_handle_package();
|
|
|
+ });
|
|
|
+
|
|
|
+ // handle errors
|
|
|
+ if (ec) {
|
|
|
+ SPDLOG_ERROR("Error while receiving request: {}", ec.what());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // parse package
|
|
|
+ frag_header header;
|
|
|
+ read_frag_header(in_buf.ptr, &header);
|
|
|
+ auto crc = boost::crc_32_type{};
|
|
|
+ crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
|
|
|
+ length - sizeof(uint32_t));
|
|
|
+ if (crc.checksum() != header.frag_checksum) { // checksum failed
|
|
|
+ // TODO show log
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ assert(length == frag_header_size + header.block_size);
|
|
|
+ if (header.frame_id < frame_cache.id) return; // old package
|
|
|
+ if (frame_cache.status == READY) { // last frame has already been decoded
|
|
|
+ if (header.frame_id == frame_cache.id) return; // redundant package
|
|
|
+ if (header.frame_type == 'I' || // new IDR frame or correct next P frame
|
|
|
+ header.frame_id == last_frame_id + 1) {
|
|
|
+ refresh_frame(header);
|
|
|
+ } else {
|
|
|
+ request_idr_frame(header.frame_id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (header.frame_id > frame_cache.id) {
|
|
|
+ if (header.frame_type == 'I') {
|
|
|
+ refresh_frame(header);
|
|
|
+ } else {
|
|
|
+ request_idr_frame(header.frame_id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ assert(frame_cache.id == header.frame_id);
|
|
|
+ assert(frame_cache.status == WAITING);
|
|
|
+ auto &chunk = frame_cache.chunks.ptr[header.chunk_id];
|
|
|
+ if (chunk.status == NOT_INIT) {
|
|
|
+ auto parity_blocks = header.block_count - header.chunk_decode_block_count;
|
|
|
+ chunk.create(header.block_count, parity_blocks, header.block_size);
|
|
|
+ } else if (chunk.status == READY) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ assert(chunk.status == WAITING);
|
|
|
+ auto data_ptr = in_buf.ptr + frag_header_size;
|
|
|
+ memcpy(chunk.block_ptrs.ptr[header.block_id], data_ptr, header.block_size);
|
|
|
+ chunk.block_miss.ptr[header.block_id] = false;
|
|
|
+ ++chunk.ready_blocks;
|
|
|
+ if (!chunk.reconstruct()) [[likely]] return; // need more blocks
|
|
|
+
|
|
|
+ assert(chunk.status == READY);
|
|
|
+ assert(chunk.block_data.length >= header.chunk_length);
|
|
|
+ memcpy(frame_cache.data.ptr + header.chunk_offset, chunk.block_data.ptr, header.chunk_length);
|
|
|
+ ++frame_cache.ready_chunks;
|
|
|
+ if (frame_cache.ready_chunks < frame_cache.chunks.length) return; // need more chunks
|
|
|
+
|
|
|
+ // decode frame
|
|
|
+ frame_cache.status = READY;
|
|
|
+ decoder->decode_frame(frame_cache.data.ptr, frame_cache.data.length);
|
|
|
+ SPDLOG_TRACE("Frame {} decoded.", frame_cache.id);
|
|
|
+ last_frame_id = frame_cache.id;
|
|
|
+ request_frame_confirm(frame_cache.id);
|
|
|
+ }
|
|
|
+
|
|
|
+ static impl *create(const receiver_config &conf) {
|
|
|
+ auto ret = new impl;
|
|
|
+ assert(conf.decoder != nullptr);
|
|
|
+ ret->decoder = conf.decoder;
|
|
|
+ ret->ctx = std::make_unique<io_context>();
|
|
|
+ ret->server_ep = udp::endpoint{address::from_string(conf.server_addr), conf.server_port};
|
|
|
+ ret->socket = std::make_unique<udp::socket>(*(ret->ctx));
|
|
|
+ ret->socket->connect(ret->server_ep);
|
|
|
+ ret->socket->set_option(udp::socket::receive_buffer_size{udp_buffer_size});
|
|
|
+ ret->async_handle_package();
|
|
|
+
|
|
|
+ // initialize reed solomon
|
|
|
+ fec_init();
|
|
|
+
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+frame_receiver2 *frame_receiver2::create(const receiver_config &conf) {
|
|
|
+ auto pimpl = impl::create(conf);
|
|
|
+ if (pimpl == nullptr) return nullptr;
|
|
|
+ auto ret = new frame_receiver2;
|
|
|
+ ret->pimpl.reset(pimpl);
|
|
|
+ return ret;
|
|
|
+}
|
|
|
+
|
|
|
+void frame_receiver2::run() {
|
|
|
+ pimpl->request_idr_frame(0);
|
|
|
+ pimpl->ctx->run();
|
|
|
+}
|
|
|
+
|
|
|
+frame_receiver2::~frame_receiver2() = default;
|