|
|
@@ -1,16 +1,10 @@
|
|
|
#include "receiver_udp_fec.h"
|
|
|
+#include "fec_receive_helper.h"
|
|
|
#include "third_party/scope_guard.hpp"
|
|
|
|
|
|
-extern "C" {
|
|
|
-#include "network/impl/fragment/third_party/rs.h"
|
|
|
-}
|
|
|
-
|
|
|
#include <boost/asio/io_context.hpp>
|
|
|
#include <boost/asio/ip/udp.hpp>
|
|
|
#include <boost/asio/post.hpp>
|
|
|
-#include <boost/container/static_vector.hpp>
|
|
|
-#include <boost/crc.hpp>
|
|
|
-#include <boost/endian.hpp>
|
|
|
|
|
|
#include <spdlog/spdlog.h>
|
|
|
|
|
|
@@ -22,131 +16,7 @@ using boost::asio::io_context;
|
|
|
using boost::asio::post;
|
|
|
using boost::system::error_code;
|
|
|
|
|
|
-namespace receiver_udp_fec_impl {
|
|
|
-
|
|
|
- enum status_type {
|
|
|
- NOT_INIT,
|
|
|
- WAITING,
|
|
|
- READY
|
|
|
- };
|
|
|
-
|
|
|
- struct smart_chunk {
|
|
|
- static constexpr auto max_blocks = DATA_SHARDS_MAX;
|
|
|
-
|
|
|
- using block_ptrs_type = boost::container::static_vector<uint8_t *, max_blocks>;
|
|
|
- using block_miss_type = boost::container::static_vector<bool, max_blocks>;
|
|
|
-
|
|
|
- data_type block_data;
|
|
|
- block_ptrs_type block_ptrs;
|
|
|
- block_miss_type block_miss;
|
|
|
- uint8_t ready_blocks = 0;
|
|
|
- status_type status = NOT_INIT;
|
|
|
-
|
|
|
- ~smart_chunk() { deallocate(); }
|
|
|
-
|
|
|
- void reset() {
|
|
|
- ready_blocks = 0;
|
|
|
- status = NOT_INIT;
|
|
|
- }
|
|
|
-
|
|
|
- void create(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
|
|
|
- if (total_blocks != block_ptrs.size() ||
|
|
|
- parity_blocks != last_parity_blocks ||
|
|
|
- block_size != last_block_size) [[unlikely]] {
|
|
|
- deallocate();
|
|
|
- allocate(total_blocks, parity_blocks, block_size);
|
|
|
- }
|
|
|
- std::ranges::fill(block_miss, true);
|
|
|
- assert(status == NOT_INIT);
|
|
|
- status = WAITING;
|
|
|
- }
|
|
|
-
|
|
|
- bool reconstruct() {
|
|
|
- if (ready_blocks + last_parity_blocks < block_ptrs.size()) return false;
|
|
|
- auto ret = reed_solomon_reconstruct(rs, block_ptrs.data(), (uint8_t *) block_miss.data(),
|
|
|
- block_ptrs.size(), last_block_size);
|
|
|
- if (ret != 0) return false;
|
|
|
- assert(status == WAITING);
|
|
|
- status = READY;
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- private:
|
|
|
- reed_solomon *rs = nullptr;
|
|
|
- uint8_t last_parity_blocks = 0;
|
|
|
- uint16_t last_block_size = 0;
|
|
|
-
|
|
|
- void deallocate() {
|
|
|
- if (rs == nullptr) return;
|
|
|
- reed_solomon_release(rs);
|
|
|
- rs = nullptr;
|
|
|
- }
|
|
|
-
|
|
|
- void allocate(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
|
|
|
- assert(rs == nullptr);
|
|
|
- auto data_blocks = total_blocks - parity_blocks;
|
|
|
- rs = reed_solomon_new(data_blocks, parity_blocks);
|
|
|
- block_data.reserve(total_blocks * block_size);
|
|
|
- block_ptrs.resize(total_blocks);
|
|
|
- block_miss.resize(total_blocks);
|
|
|
- for (int i = 0; i < total_blocks; ++i) {
|
|
|
- block_ptrs[i] = block_data.ptr + block_size * i;
|
|
|
- }
|
|
|
- last_parity_blocks = parity_blocks;
|
|
|
- last_block_size = block_size;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-using namespace receiver_udp_fec_impl;
|
|
|
-
|
|
|
struct receiver_udp_fec::impl {
|
|
|
-
|
|
|
- struct frag_header {
|
|
|
- uint32_t frag_checksum;
|
|
|
- uint8_t frame_type; // 'I' or 'P'
|
|
|
- uint32_t frame_id;
|
|
|
- uint32_t frame_length;
|
|
|
- uint8_t chunk_count;
|
|
|
- uint8_t chunk_id;
|
|
|
- uint32_t chunk_offset;
|
|
|
- uint32_t chunk_length;
|
|
|
- uint16_t block_size;
|
|
|
- uint8_t block_count;
|
|
|
- uint8_t chunk_decode_block_count;
|
|
|
- uint8_t block_id;
|
|
|
- };
|
|
|
-
|
|
|
- struct request_type {
|
|
|
- uint32_t request_checksum;
|
|
|
- uint8_t request_type;
|
|
|
- uint32_t frame_id;
|
|
|
- };
|
|
|
-
|
|
|
- struct frame_info {
|
|
|
- using chunks_type = std::vector<smart_chunk>;
|
|
|
- chunks_type chunks;
|
|
|
-
|
|
|
- data_type data;
|
|
|
- uint32_t id = 0;
|
|
|
- uint8_t ready_chunks = 0;
|
|
|
- status_type status = NOT_INIT;
|
|
|
-
|
|
|
- void create(uint32_t frame_id, uint8_t chunk_count, size_t length) {
|
|
|
- chunks.resize(chunk_count);
|
|
|
- data.reserve(length);
|
|
|
- for (auto k = 0; k < chunk_count; ++k) {
|
|
|
- chunks[k].reset();
|
|
|
- }
|
|
|
- id = frame_id;
|
|
|
- ready_chunks = 0;
|
|
|
- status = WAITING;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- static constexpr auto frag_header_size = 28;
|
|
|
- static constexpr auto request_size = 9;
|
|
|
static constexpr auto max_package_size = 64 * 1024; // 64KiB
|
|
|
static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
|
|
|
|
|
|
@@ -155,89 +25,24 @@ struct receiver_udp_fec::impl {
|
|
|
// parent config
|
|
|
receiver_base::create_config par_conf;
|
|
|
|
|
|
- std::unique_ptr<udp::socket> socket;
|
|
|
-
|
|
|
- frame_info frame_cache;
|
|
|
- uint32_t last_frame_id = 0;
|
|
|
-
|
|
|
udp::endpoint server_ep;
|
|
|
- data_type in_buf, out_buf;
|
|
|
-
|
|
|
- static uint8_t *read_frag_header(uint8_t *ptr, frag_header *header) {
|
|
|
-#define READ(member) ptr = read_binary_number(ptr, &header->member)
|
|
|
- READ(frag_checksum);
|
|
|
- READ(frame_type);
|
|
|
- READ(frame_id);
|
|
|
- READ(frame_length);
|
|
|
- READ(chunk_count);
|
|
|
- READ(chunk_id);
|
|
|
- READ(chunk_offset);
|
|
|
- READ(chunk_length);
|
|
|
- READ(block_size);
|
|
|
- READ(block_count);
|
|
|
- READ(chunk_decode_block_count);
|
|
|
- READ(block_id);
|
|
|
-#undef WRITE
|
|
|
- return ptr;
|
|
|
- }
|
|
|
-
|
|
|
- static uint8_t *write_request(uint8_t *ptr, const request_type &req) {
|
|
|
-#define WRITE(member) ptr = write_binary_number(ptr, req.member)
|
|
|
- WRITE(request_checksum);
|
|
|
- WRITE(request_type);
|
|
|
- WRITE(frame_id);
|
|
|
-#undef WRITE
|
|
|
- return ptr;
|
|
|
- }
|
|
|
-
|
|
|
- ~impl() {
|
|
|
- request_exit();
|
|
|
- }
|
|
|
-
|
|
|
- void refresh_frame(const frag_header &header) {
|
|
|
- frame_cache.create(header.frame_id, header.chunk_count, header.frame_length);
|
|
|
- }
|
|
|
-
|
|
|
- void send_request(const request_type &req) {
|
|
|
- out_buf.reserve(request_size);
|
|
|
- write_request(out_buf.ptr, req);
|
|
|
+ std::unique_ptr<udp::socket> socket;
|
|
|
|
|
|
- // calculate crc32
|
|
|
- auto crc = boost::crc_32_type{};
|
|
|
- crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
|
|
|
- request_size - sizeof(uint32_t));
|
|
|
- write_binary_number(out_buf.ptr, crc.checksum());
|
|
|
+ fec_receive_helper helper;
|
|
|
+ simple_data::pointer in_buf;
|
|
|
|
|
|
+ void send_request() const {
|
|
|
// send packet
|
|
|
assert(socket != nullptr);
|
|
|
- auto buf = buffer(out_buf.ptr, request_size);
|
|
|
+ const auto out_buf = helper.get_output_packet();
|
|
|
+ if (out_buf == nullptr) [[likely]] return;
|
|
|
+ const auto buf = buffer(out_buf->data, out_buf->length);
|
|
|
socket->send_to(buf, server_ep);
|
|
|
}
|
|
|
|
|
|
- void request_idr_frame(uint32_t frame_id) {
|
|
|
- request_type req;
|
|
|
- req.request_type = 'I';
|
|
|
- req.frame_id = frame_id;
|
|
|
- send_request(req);
|
|
|
- SPDLOG_WARN("Receive frame {} error, request new IDR frame.", frame_id);
|
|
|
- }
|
|
|
-
|
|
|
- void request_frame_confirm(uint32_t frame_id) {
|
|
|
- request_type req;
|
|
|
- req.request_type = 'C';
|
|
|
- req.frame_id = frame_id;
|
|
|
- send_request(req);
|
|
|
- }
|
|
|
-
|
|
|
- void request_exit() {
|
|
|
- request_type req;
|
|
|
- req.request_type = 'X';
|
|
|
- send_request(req);
|
|
|
- }
|
|
|
-
|
|
|
void async_handle_package() {
|
|
|
- in_buf.reserve(max_package_size);
|
|
|
- auto buf = buffer(in_buf.ptr, max_package_size);
|
|
|
+ in_buf = simple_data::create(max_package_size);
|
|
|
+ const auto buf = buffer(in_buf->data, max_package_size);
|
|
|
using namespace std::placeholders;
|
|
|
socket->async_receive(buf, std::bind(&impl::handle_package, this, _1, _2));
|
|
|
}
|
|
|
@@ -254,88 +59,29 @@ struct receiver_udp_fec::impl {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // parse package
|
|
|
- frag_header header;
|
|
|
- read_frag_header(in_buf.ptr, &header);
|
|
|
- auto crc = boost::crc_32_type{};
|
|
|
- crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
|
|
|
- length - sizeof(uint32_t));
|
|
|
- if (crc.checksum() != header.frag_checksum) { // checksum failed
|
|
|
- // TODO show log
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- assert(length == frag_header_size + header.block_size);
|
|
|
- if (header.frame_id < frame_cache.id) return; // old package
|
|
|
- bool is_idr_frame = header.frame_type == 'I';
|
|
|
- if (frame_cache.status == READY) { // last frame has already been decoded
|
|
|
- if (header.frame_id == frame_cache.id) return; // redundant package
|
|
|
- if (is_idr_frame || // new IDR frame or correct next P frame
|
|
|
- header.frame_id == last_frame_id + 1) {
|
|
|
- refresh_frame(header);
|
|
|
- } else {
|
|
|
- request_idr_frame(header.frame_id);
|
|
|
- return;
|
|
|
- }
|
|
|
- } else {
|
|
|
- if (header.frame_id > frame_cache.id) {
|
|
|
- if (is_idr_frame) {
|
|
|
- refresh_frame(header);
|
|
|
- } else {
|
|
|
- request_idr_frame(header.frame_id);
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- assert(frame_cache.id == header.frame_id);
|
|
|
- assert(frame_cache.status == WAITING);
|
|
|
- auto &chunk = frame_cache.chunks[header.chunk_id];
|
|
|
- if (chunk.status == NOT_INIT) {
|
|
|
- auto parity_blocks = header.block_count - header.chunk_decode_block_count;
|
|
|
- chunk.create(header.block_count, parity_blocks, header.block_size);
|
|
|
- } else if (chunk.status == READY) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- assert(chunk.status == WAITING);
|
|
|
- auto data_ptr = in_buf.ptr + frag_header_size;
|
|
|
- memcpy(chunk.block_ptrs[header.block_id], data_ptr, header.block_size);
|
|
|
- chunk.block_miss[header.block_id] = false;
|
|
|
- ++chunk.ready_blocks;
|
|
|
- if (!chunk.reconstruct()) [[likely]] return; // need more blocks
|
|
|
-
|
|
|
- assert(chunk.status == READY);
|
|
|
- assert(chunk.block_data.size >= header.chunk_length);
|
|
|
- memcpy(frame_cache.data.ptr + header.chunk_offset, chunk.block_data.ptr, header.chunk_length);
|
|
|
- ++frame_cache.ready_chunks;
|
|
|
- if (frame_cache.ready_chunks < frame_cache.chunks.size()) return; // need more chunks
|
|
|
-
|
|
|
- // decode frame
|
|
|
- frame_cache.status = READY;
|
|
|
- auto frame = ::frame_info(); // not frame_info in this impl
|
|
|
- frame.data = frame_cache.data.clone(); // TODO: find out if deepcopy is really needed.
|
|
|
- frame.idr = header.frame_type == 'I';
|
|
|
- frame.frame_id = header.frame_id;
|
|
|
- q_this->commit_frame(frame);
|
|
|
- SPDLOG_TRACE("Frame {} decoded.", frame_cache.id);
|
|
|
- last_frame_id = frame_cache.id;
|
|
|
- request_frame_confirm(frame_cache.id);
|
|
|
+ auto valid_buf = simple_data::create(length, in_buf->data);
|
|
|
+ auto frame = helper.handle_input_packet(valid_buf);
|
|
|
+ if (frame.frame_id == 0) [[likely]] return;
|
|
|
+ auto ret_frame = frame_info();
|
|
|
+ assert(frame.data != nullptr);
|
|
|
+ ret_frame.data = data_type(frame.data->length, frame.data->data);
|
|
|
+ ret_frame.frame_id = frame.frame_id;
|
|
|
+ ret_frame.idr = frame.idr;
|
|
|
+ q_this->commit_frame(ret_frame);
|
|
|
}
|
|
|
|
|
|
static impl *create(const create_config &conf) {
|
|
|
- auto ret = new impl();
|
|
|
+ const auto ret = new impl();
|
|
|
ret->par_conf = {.cb_func = conf.cb_func, .enable_log = conf.enable_log};
|
|
|
ret->server_ep = udp::endpoint{address::from_string(conf.server_addr), conf.server_port};
|
|
|
ret->socket = std::make_unique<udp::socket>(*conf.ctx);
|
|
|
ret->socket->connect(ret->server_ep);
|
|
|
ret->socket->set_option(udp::socket::receive_buffer_size{udp_buffer_size});
|
|
|
ret->async_handle_package();
|
|
|
- ret->request_idr_frame(0); // TODO: post to event queue
|
|
|
-
|
|
|
- // initialize reed solomon
|
|
|
- fec_init();
|
|
|
|
|
|
+ // initialize
|
|
|
+ ret->helper.initialize({});
|
|
|
+ ret->send_request();
|
|
|
return ret;
|
|
|
}
|
|
|
};
|