#include "receiver_udp_fec.h" #include "third_party/scope_guard.hpp" extern "C" { #include "network/impl/fragment/third_party/rs.h" } #include #include #include #include #include #include #include #include using namespace boost::asio::ip; using boost::asio::buffer; using boost::asio::io_context; using boost::asio::post; using boost::system::error_code; namespace receiver_udp_fec_impl { enum status_type { NOT_INIT, WAITING, READY }; struct smart_chunk { static constexpr auto max_blocks = DATA_SHARDS_MAX; using block_ptrs_type = boost::container::static_vector; using block_miss_type = boost::container::static_vector; data_type block_data; block_ptrs_type block_ptrs; block_miss_type block_miss; uint8_t ready_blocks = 0; status_type status = NOT_INIT; ~smart_chunk() { deallocate(); } void reset() { ready_blocks = 0; status = NOT_INIT; } void create(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) { if (total_blocks != block_ptrs.size() || parity_blocks != last_parity_blocks || block_size != last_block_size) [[unlikely]] { deallocate(); allocate(total_blocks, parity_blocks, block_size); } std::ranges::fill(block_miss, true); assert(status == NOT_INIT); status = WAITING; } bool reconstruct() { if (ready_blocks + last_parity_blocks < block_ptrs.size()) return false; auto ret = reed_solomon_reconstruct(rs, block_ptrs.data(), (uint8_t *) block_miss.data(), block_ptrs.size(), last_block_size); if (ret != 0) return false; assert(status == WAITING); status = READY; return true; } private: reed_solomon *rs = nullptr; uint8_t last_parity_blocks = 0; uint16_t last_block_size = 0; void deallocate() { if (rs == nullptr) return; reed_solomon_release(rs); rs = nullptr; } void allocate(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) { assert(rs == nullptr); auto data_blocks = total_blocks - parity_blocks; rs = reed_solomon_new(data_blocks, parity_blocks); block_data.reserve(total_blocks * block_size); block_ptrs.resize(total_blocks); block_miss.resize(total_blocks); for (int i = 0; i < total_blocks; ++i) { block_ptrs[i] = block_data.ptr + block_size * i; } last_parity_blocks = parity_blocks; last_block_size = block_size; } }; } using namespace receiver_udp_fec_impl; struct receiver_udp_fec::impl { struct frag_header { uint32_t frag_checksum; uint8_t frame_type; // 'I' or 'P' uint32_t frame_id; uint32_t frame_length; uint8_t chunk_count; uint8_t chunk_id; uint32_t chunk_offset; uint32_t chunk_length; uint16_t block_size; uint8_t block_count; uint8_t chunk_decode_block_count; uint8_t block_id; }; struct request_type { uint32_t request_checksum; uint8_t request_type; uint32_t frame_id; }; struct frame_info { using chunks_type = std::vector; chunks_type chunks; data_type data; uint32_t id = 0; uint8_t ready_chunks = 0; status_type status = NOT_INIT; void create(uint32_t frame_id, uint8_t chunk_count, size_t length) { chunks.resize(chunk_count); data.reserve(length); for (auto k = 0; k < chunk_count; ++k) { chunks[k].reset(); } id = frame_id; ready_chunks = 0; status = WAITING; } }; static constexpr auto frag_header_size = 28; static constexpr auto request_size = 9; static constexpr auto max_package_size = 64 * 1024; // 64KiB static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB receiver_udp_fec *q_this = nullptr; // parent config receiver_base::create_config par_conf; std::unique_ptr socket; frame_info frame_cache; uint32_t last_frame_id = 0; udp::endpoint server_ep; data_type in_buf, out_buf; static uint8_t *read_frag_header(uint8_t *ptr, frag_header *header) { #define READ(member) ptr = read_binary_number(ptr, &header->member) READ(frag_checksum); READ(frame_type); READ(frame_id); READ(frame_length); READ(chunk_count); READ(chunk_id); READ(chunk_offset); READ(chunk_length); READ(block_size); READ(block_count); READ(chunk_decode_block_count); READ(block_id); #undef WRITE return ptr; } static uint8_t *write_request(uint8_t *ptr, const request_type &req) { #define WRITE(member) ptr = write_binary_number(ptr, req.member) WRITE(request_checksum); WRITE(request_type); WRITE(frame_id); #undef WRITE return ptr; } ~impl() { request_exit(); } void refresh_frame(const frag_header &header) { frame_cache.create(header.frame_id, header.chunk_count, header.frame_length); } void send_request(const request_type &req) { out_buf.reserve(request_size); write_request(out_buf.ptr, req); // calculate crc32 auto crc = boost::crc_32_type{}; crc.process_bytes(out_buf.ptr + sizeof(uint32_t), request_size - sizeof(uint32_t)); write_binary_number(out_buf.ptr, crc.checksum()); // send packet assert(socket != nullptr); auto buf = buffer(out_buf.ptr, request_size); socket->send_to(buf, server_ep); } void request_idr_frame(uint32_t frame_id) { request_type req; req.request_type = 'I'; req.frame_id = frame_id; send_request(req); SPDLOG_WARN("Receive frame {} error, request new IDR frame.", frame_id); } void request_frame_confirm(uint32_t frame_id) { request_type req; req.request_type = 'C'; req.frame_id = frame_id; send_request(req); } void request_exit() { request_type req; req.request_type = 'X'; send_request(req); } void async_handle_package() { in_buf.reserve(max_package_size); auto buf = buffer(in_buf.ptr, max_package_size); using namespace std::placeholders; socket->async_receive(buf, std::bind(&impl::handle_package, this, _1, _2)); } void handle_package(const error_code &ec, size_t length) { // prepare for next request when this function exited. auto closer = sg::make_scope_guard([this] { async_handle_package(); }); // handle errors if (ec) { SPDLOG_ERROR("Error while receiving request: {}", ec.what()); return; } // parse package frag_header header; read_frag_header(in_buf.ptr, &header); auto crc = boost::crc_32_type{}; crc.process_bytes(in_buf.ptr + sizeof(uint32_t), length - sizeof(uint32_t)); if (crc.checksum() != header.frag_checksum) { // checksum failed // TODO show log return; } assert(length == frag_header_size + header.block_size); if (header.frame_id < frame_cache.id) return; // old package bool is_idr_frame = header.frame_type == 'I'; if (frame_cache.status == READY) { // last frame has already been decoded if (header.frame_id == frame_cache.id) return; // redundant package if (is_idr_frame || // new IDR frame or correct next P frame header.frame_id == last_frame_id + 1) { refresh_frame(header); } else { request_idr_frame(header.frame_id); return; } } else { if (header.frame_id > frame_cache.id) { if (is_idr_frame) { refresh_frame(header); } else { request_idr_frame(header.frame_id); return; } } } assert(frame_cache.id == header.frame_id); assert(frame_cache.status == WAITING); auto &chunk = frame_cache.chunks[header.chunk_id]; if (chunk.status == NOT_INIT) { auto parity_blocks = header.block_count - header.chunk_decode_block_count; chunk.create(header.block_count, parity_blocks, header.block_size); } else if (chunk.status == READY) { return; } assert(chunk.status == WAITING); auto data_ptr = in_buf.ptr + frag_header_size; memcpy(chunk.block_ptrs[header.block_id], data_ptr, header.block_size); chunk.block_miss[header.block_id] = false; ++chunk.ready_blocks; if (!chunk.reconstruct()) [[likely]] return; // need more blocks assert(chunk.status == READY); assert(chunk.block_data.size >= header.chunk_length); memcpy(frame_cache.data.ptr + header.chunk_offset, chunk.block_data.ptr, header.chunk_length); ++frame_cache.ready_chunks; if (frame_cache.ready_chunks < frame_cache.chunks.size()) return; // need more chunks // decode frame frame_cache.status = READY; auto frame = ::frame_info(); // not frame_info in this impl frame.data = frame_cache.data; frame.idr = header.frame_type == 'I'; frame.frame_id = header.frame_id; q_this->commit_frame(frame); SPDLOG_TRACE("Frame {} decoded.", frame_cache.id); last_frame_id = frame_cache.id; request_frame_confirm(frame_cache.id); } static impl *create(const create_config &conf) { auto ret = new impl(); ret->par_conf = {.cb_func = conf.cb_func, .enable_log = conf.enable_log}; ret->server_ep = udp::endpoint{address::from_string(conf.server_addr), conf.server_port}; ret->socket = std::make_unique(*conf.ctx); ret->socket->connect(ret->server_ep); ret->socket->set_option(udp::socket::receive_buffer_size{udp_buffer_size}); ret->async_handle_package(); ret->request_idr_frame(0); // TODO: post to event queue // initialize reed solomon fec_init(); return ret; } }; receiver_udp_fec::~receiver_udp_fec() = default; receiver_udp_fec::pointer receiver_udp_fec::create(const create_config &conf) { auto impl = impl::create(conf); return std::unique_ptr( new receiver_udp_fec(impl)); } receiver_udp_fec::receiver_udp_fec(impl *_pimpl) : receiver_base(_pimpl->par_conf), pimpl(std::unique_ptr(_pimpl)) { pimpl->q_this = this; }