| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- #include "sender_udp_fec.h"
- #include "sender_utility.hpp"
- #include "core/timestamp_helper.hpp"
- #include "simple_mq.h"
- #include "third_party/scope_guard.hpp"
- #include "utility.hpp"
- #include "variable_defs.h"
- extern "C" {
- #include "third_party/rs.h"
- }
- #include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/udp.hpp>
- #include <boost/asio/post.hpp>
- #include <boost/crc.hpp>
- #include <spdlog/spdlog.h>
- using namespace boost::asio::ip;
- using boost::asio::buffer;
- using boost::asio::io_context;
- using boost::asio::post;
- using boost::system::error_code;
- using namespace sender_impl;
- using namespace sophiar;
- using namespace simple_mq_singleton;
- namespace sender_udp_fec_impl {
- struct smart_reed_solomon {
- reed_solomon *rs = nullptr;
- smart_buffer<uint8_t *> block_ptrs;
- ~smart_reed_solomon() {
- deallocate();
- }
- // assume parity_rate and block_size will not change in the lifetime
- void process(uint8_t data_blocks, uint8_t parity_blocks,
- uint16_t block_size, uint8_t *data, uint32_t length) {
- if (data_blocks != last_data_blocks) {
- deallocate();
- allocate(data_blocks, parity_blocks, block_size);
- }
- assert(data_blocks * block_size >= length);
- memcpy(block_data.ptr, data, length);
- reed_solomon_encode2(rs, block_ptrs.ptr, block_count(), block_size);
- }
- uint8_t block_count() const {
- return block_ptrs.length;
- }
- private:
- uint8_t last_data_blocks = 0;
- smart_buffer<uint8_t> block_data;
- void deallocate() {
- if (rs == nullptr) return;
- reed_solomon_release(rs);
- rs = nullptr;
- }
- void allocate(uint8_t data_blocks, uint8_t parity_blocks, uint16_t block_size) {
- assert(rs == nullptr);
- rs = reed_solomon_new(data_blocks, parity_blocks);
- auto total_blocks = data_blocks + parity_blocks;
- block_data.create(total_blocks * block_size);
- block_ptrs.create(total_blocks);
- for (int i = 0; i < total_blocks; ++i) {
- block_ptrs.ptr[i] = block_data.ptr + block_size * i;
- }
- last_data_blocks = data_blocks;
- }
- };
- }
- using namespace sender_udp_fec_impl;
- struct sender_udp_fec::impl {
// Header that precedes the payload of every block sent to the client.
// Fields are serialized one by one in declaration order by write_frag_header.
// All fields default to zero so that a default-constructed header never
// carries indeterminate values into the serializer.
struct frag_header {
    uint32_t frag_checksum = 0; // CRC-32 over everything that follows this field
    uint8_t frame_type = 0; // 'I' or 'P'
    uint32_t frame_id = 0;
    uint32_t frame_length = 0; // length of the whole frame in bytes
    uint8_t chunk_count = 0; // number of chunks the frame is split into
    uint8_t chunk_id = 0; // index of this chunk within the frame
    uint32_t chunk_offset = 0; // byte offset of this chunk within the frame
    uint32_t chunk_length = 0; // payload bytes carried by this chunk
    uint16_t block_size = 0; // size of every block of the chunk in bytes
    uint8_t block_count = 0; // data + parity blocks of the chunk
    uint8_t chunk_decode_block_count = 0; // number of data blocks of the chunk
    uint8_t block_id = 0; // index of this block within the chunk
};
// Request datagram received from the client, parsed field by field by
// read_request. Fields default to zero so that a default-constructed request
// never holds indeterminate values.
struct request_type {
    uint32_t request_checksum = 0; // CRC-32 over the bytes after this field
    uint8_t request_type = 0; // 'I', 'C' or 'X' are handled; anything else is ignored
    uint32_t frame_id = 0;
};
- static constexpr auto frag_header_size = 28;
- static constexpr auto request_size = 9;
- static constexpr auto max_block_count = DATA_SHARDS_MAX;
- static constexpr auto max_package_size = 64 * 1024; // 64KiB
- static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
- static constexpr int confirm_timeout = 10 * 1e6; // 10s
- using frame_ptr_type = std::unique_ptr<video_nal>;
- sender_udp_fec *q_this = nullptr;
- std::unique_ptr<udp::socket> socket;
- udp::endpoint request_ep;
- std::unique_ptr<udp::endpoint> remote_ep;
- float parity_rate;
- uint8_t max_data_block_count; // max_block_count / (1 + parity_rate)
- uint16_t block_size; // conn_mtu - header_size
- uint32_t max_chunk_size; // max_data_block_count * block_size
- smart_reed_solomon rs;
- smart_buffer<uint8_t> in_buf, out_buf;
- timestamp_type last_confirm_ts = 0;
- static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
- #define WRITE(member) ptr = write_binary_number(ptr, header.member)
- WRITE(frag_checksum);
- WRITE(frame_type);
- WRITE(frame_id);
- WRITE(frame_length);
- WRITE(chunk_count);
- WRITE(chunk_id);
- WRITE(chunk_offset);
- WRITE(chunk_length);
- WRITE(block_size);
- WRITE(block_count);
- WRITE(chunk_decode_block_count);
- WRITE(block_id);
- #undef WRITE
- return ptr;
- }
- static uint8_t *read_request(uint8_t *ptr, request_type *req) {
- #define READ(member) ptr = read_binary_number(ptr, &req->member)
- READ(request_checksum);
- READ(request_type);
- READ(frame_id);
- #undef READ
- return ptr;
- }
- void send_block(uint8_t *block_data, const frag_header &header) {
- out_buf.create(max_package_size);
- auto ptr = write_frag_header(out_buf.ptr, header);
- assert(ptr - out_buf.ptr == frag_header_size);
- memcpy(ptr, block_data, block_size);
- // calculate crc32
- auto crc = boost::crc_32_type{};
- crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
- frag_header_size + block_size - sizeof(uint32_t));
- write_binary_number(out_buf.ptr, crc.checksum());
- // send packet
- assert(socket != nullptr);
- auto buf = buffer(out_buf.ptr, frag_header_size + block_size);
- assert(remote_ep != nullptr);
- socket->send_to(buf, *remote_ep);
- }
- void send_chunk(uint8_t *chunk_data, uint32_t chunk_length, frag_header *header) {
- auto data_blocks = (chunk_length + block_size - 1) / block_size;
- assert(data_blocks <= max_data_block_count);
- auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate));
- rs.process(data_blocks, parity_blocks, block_size, chunk_data, chunk_length);
- header->block_size = block_size;
- header->block_count = rs.block_count();
- header->chunk_decode_block_count = data_blocks;
- for (auto k = 0; k < rs.block_count(); ++k) {
- header->block_id = k;
- send_block(rs.block_ptrs.ptr[k], *header);
- }
- }
- void send_frame(frame_ptr_type &&frame) {
- if (remote_ep == nullptr) return;
- frag_header header;
- header.frame_type = frame->idr ? 'I' : 'P';
- header.frame_id = frame->frame_id;
- header.frame_length = frame->length;
- auto chunk_count = (frame->length + max_chunk_size - 1) / max_chunk_size;
- header.chunk_count = chunk_count;
- for (auto k = 0; k < chunk_count; ++k) {
- header.chunk_offset = k * max_chunk_size;
- header.chunk_id = k;
- header.chunk_length = std::min((size_t) max_chunk_size,
- frame->length - k * max_chunk_size);
- auto chunk_data = frame->ptr + header.chunk_offset;
- send_chunk(chunk_data, header.chunk_length, &header);
- }
- q_this->log_frame_sent(frame->frame_id);
- SPDLOG_TRACE("Frame {} sent.", frame->frame_id);
- }
- void async_handle_request() {
- in_buf.create(max_package_size);
- auto buf = buffer(in_buf.ptr, max_package_size);
- using namespace std::placeholders;
- socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
- }
- void handle_request(const error_code &ec, size_t length) {
- // prepare for next request when this function exited.
- auto closer = sg::make_scope_guard([this] {
- async_handle_request();
- });
- // handle errors
- if (ec) {
- SPDLOG_ERROR("Error while receiving request: {}", ec.what());
- return;
- }
- // parse request
- if (length != request_size) return;
- request_type req;
- read_request(in_buf.ptr, &req);
- auto crc = boost::crc_32_type{};
- crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
- request_size - sizeof(uint32_t));
- if (crc.checksum() != req.request_checksum) { // checksum failed
- // TODO show log
- return;
- }
- // handle request
- switch (req.request_type) {
- case 'X': {
- SPDLOG_INFO("Client {}:{} left.",
- remote_ep->address().to_string(), remote_ep->port());
- close_connection();
- return;
- }
- case 'C': {
- last_confirm_ts = current_timestamp();
- SPDLOG_TRACE("Frame {} confirmed.", req.frame_id);
- return;
- }
- case 'I': {
- remote_ep = std::make_unique<udp::endpoint>(request_ep);
- last_confirm_ts = current_timestamp();
- q_this->request_idr_frame();
- mq().update_variable(SENDER_CONNECTED, true);
- static uint32_t last_frame_id = 0;
- if (req.frame_id != last_frame_id) {
- SPDLOG_INFO("New client from {}:{}.",
- remote_ep->address().to_string(), remote_ep->port());
- last_frame_id = req.frame_id;
- }
- return;
- }
- default: {
- // TODO show log
- return;
- }
- }
- }
- void close_connection() {
- remote_ep.reset();
- mq().update_variable(SENDER_CONNECTED, false);
- }
- bool is_connected() {
- if (remote_ep == nullptr) return false;
- if (current_timestamp() - last_confirm_ts > confirm_timeout) [[unlikely]] {
- SPDLOG_WARN("Client timeout.");
- close_connection();
- return false;
- }
- return true;
- }
- static impl *create(const config &conf, sender_udp_fec *q_this) {
- auto ret = new impl;
- auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
- ret->q_this = q_this;
- ret->socket = std::make_unique<udp::socket>(*q_this->get_ctx(), local_ep);
- ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
- ret->async_handle_request();
- // constant configs
- ret->parity_rate = conf.parity_rate;
- ret->max_data_block_count = max_block_count / (1 + conf.parity_rate);
- ret->block_size = conf.conn_mtu - frag_header_size;
- ret->max_chunk_size = ret->max_data_block_count * ret->block_size;
- // initialize reed solomon
- fec_init();
- return ret;
- }
- };
- sender_udp_fec::~sender_udp_fec() = default;
- sender_udp_fec *sender_udp_fec::create(const config &conf) {
- auto ret = std::make_unique<sender_udp_fec>();
- auto pimpl = impl::create(conf, ret.get());
- if (pimpl == nullptr) return nullptr;
- ret->pimpl.reset(pimpl);
- return ret.release();
- }
- void sender_udp_fec::close_connection() {
- pimpl->close_connection();
- }
- void sender_udp_fec::handle_frame(frame_ptr_type &&frame) {
- pimpl->send_frame(std::move(frame));
- }
|