#include "sender_udp_fec.h"
#include "sender_utility.hpp"
#include "core/timestamp_helper.hpp"
#include "simple_mq.h"
#include "third_party/scope_guard.hpp"
#include "utility.hpp"
#include "variable_defs.h"

extern "C" {
#include "third_party/rs.h"
}

// NOTE(review): the angle-bracket include targets of this file were lost; the
// list below is reconstructed from the names the file actually uses. Confirm
// against the build before merging.
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/post.hpp>
#include <boost/crc.hpp>

#include <spdlog/spdlog.h>

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>

using namespace boost::asio::ip;
using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::post;
using boost::system::error_code;

using namespace sender_impl;
using namespace sophiar;
using namespace simple_mq_singleton;

namespace sender_udp_fec_impl {

    /// Owns a `reed_solomon` codec together with the block storage it encodes.
    ///
    /// `block_ptrs` exposes one pointer per block (data blocks first, parity
    /// blocks after them) into a single contiguous buffer.
    struct smart_reed_solomon {

        reed_solomon *rs = nullptr;
        smart_buffer<uint8_t *> block_ptrs;

        smart_reed_solomon() = default;

        // The codec handle is released in the destructor, so a copy would
        // release it twice.
        smart_reed_solomon(const smart_reed_solomon &) = delete;

        smart_reed_solomon &operator=(const smart_reed_solomon &) = delete;

        ~smart_reed_solomon() {
            deallocate();
        }

        /// Copies `length` bytes of `data` into the data blocks and generates
        /// the parity blocks.
        ///
        /// The codec and buffers are rebuilt only when `data_blocks` differs
        /// from the previous call.
        /// assume parity_rate and block_size will not change in the lifetime
        ///
        /// @param data_blocks   number of data blocks
        /// @param parity_blocks number of parity blocks
        /// @param block_size    size of every block in bytes
        /// @param data          payload to encode
        /// @param length        payload size; must fit in the data blocks
        void process(uint8_t data_blocks, uint8_t parity_blocks, uint16_t block_size,
                     uint8_t *data, uint32_t length) {
            if (data_blocks != last_data_blocks) {
                deallocate();
                allocate(data_blocks, parity_blocks, block_size);
            }
            const auto capacity = static_cast<size_t>(data_blocks) * block_size;
            assert(capacity >= length);
            memcpy(block_data.ptr, data, length);
            // Clear the padding after the payload so that bytes left over from
            // a previous chunk are neither encoded nor sent.
            if (length < capacity) {
                memset(block_data.ptr + length, 0, capacity - length);
            }
            reed_solomon_encode2(rs, block_ptrs.ptr, block_count(), block_size);
        }

        /// Total number of blocks (data + parity) currently allocated.
        uint8_t block_count() const {
            return block_ptrs.length;
        }

    private:
        uint8_t last_data_blocks = 0;
        smart_buffer<uint8_t> block_data;

        // Releases the codec handle, if any.
        void deallocate() {
            if (rs == nullptr) return;
            reed_solomon_release(rs);
            rs = nullptr;
        }

        // Creates the codec and lays the block pointers out over one buffer.
        void allocate(uint8_t data_blocks, uint8_t parity_blocks, uint16_t block_size) {
            assert(rs == nullptr);
            rs = reed_solomon_new(data_blocks, parity_blocks);
            auto total_blocks = data_blocks + parity_blocks;
            block_data.create(total_blocks * block_size);
            block_ptrs.create(total_blocks);
            for (int i = 0; i < total_blocks; ++i) {
                block_ptrs.ptr[i] = block_data.ptr + block_size * i;
            }
            last_data_blocks = data_blocks;
        }
    };

}

using namespace sender_udp_fec_impl;

/// Private implementation: splits frames into chunks, protects each chunk with
/// Reed-Solomon parity blocks and sends every block as one UDP datagram.
struct sender_udp_fec::impl {

    /// Header placed in front of every block; serialized field by field.
    struct frag_header {
        uint32_t frag_checksum;
        uint8_t frame_type; // 'I' or 'P'
        uint32_t frame_id;
        uint32_t frame_length;
        uint8_t chunk_count;
        uint8_t chunk_id;
        uint32_t chunk_offset;
        uint32_t chunk_length;
        uint16_t block_size;
        uint8_t block_count;
        uint8_t chunk_decode_block_count;
        uint8_t block_id;
    };

    /// Request datagram received from the peer.
    struct request_type {
        uint32_t request_checksum;
        uint8_t request_type;
        uint32_t frame_id;
    };

    static constexpr auto frag_header_size = 28; // sum of the frag_header field sizes
    static constexpr auto request_size = 9;      // sum of the request_type field sizes
    static constexpr auto max_block_count = DATA_SHARDS_MAX;
    static constexpr auto max_package_size = 64 * 1024; // 64KiB
    static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
    static constexpr int confirm_timeout = 10 * 1e6; // 10s

    // `frame_ptr_type` is not re-declared here: unqualified lookup finds the
    // same alias that `sender_udp_fec::handle_frame` uses in its signature.

    sender_udp_fec *q_this = nullptr;

    std::unique_ptr<udp::socket> socket;
    udp::endpoint request_ep;                // sender of the request being received
    std::unique_ptr<udp::endpoint> remote_ep; // current client, nullptr when none

    float parity_rate;
    uint8_t max_data_block_count; // max_block_count / (1 + parity_rate)
    uint16_t block_size; // conn_mtu - header_size
    uint32_t max_chunk_size; // max_data_block_count * block_size

    smart_reed_solomon rs;
    smart_buffer<uint8_t> in_buf, out_buf;

    timestamp_type last_confirm_ts = 0;

    /// Serializes `header` at `ptr`; returns the position after the header.
    static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
#define WRITE(member) ptr = write_binary_number(ptr, header.member)
        WRITE(frag_checksum);
        WRITE(frame_type);
        WRITE(frame_id);
        WRITE(frame_length);
        WRITE(chunk_count);
        WRITE(chunk_id);
        WRITE(chunk_offset);
        WRITE(chunk_length);
        WRITE(block_size);
        WRITE(block_count);
        WRITE(chunk_decode_block_count);
        WRITE(block_id);
#undef WRITE
        return ptr;
    }

    /// Parses a request from `ptr` into `req`; returns the position after it.
    static uint8_t *read_request(uint8_t *ptr, request_type *req) {
#define READ(member) ptr = read_binary_number(ptr, &req->member)
        READ(request_checksum);
        READ(request_type);
        READ(frame_id);
#undef READ
        return ptr;
    }

    /// Sends one block: header + `block_size` payload bytes, with a CRC32 over
    /// everything after the checksum field stored in the checksum field.
    void send_block(uint8_t *block_data, const frag_header &header) {
        out_buf.create(max_package_size);
        auto ptr = write_frag_header(out_buf.ptr, header);
        assert(ptr - out_buf.ptr == frag_header_size);
        memcpy(ptr, block_data, block_size);

        // calculate crc32
        auto crc = boost::crc_32_type{};
        crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
                          frag_header_size + block_size - sizeof(uint32_t));
        write_binary_number(out_buf.ptr, crc.checksum());

        // send packet
        assert(socket != nullptr);
        auto buf = buffer(out_buf.ptr, frag_header_size + block_size);
        assert(remote_ep != nullptr);
        socket->send_to(buf, *remote_ep);
    }

    /// Encodes one chunk and sends all of its data and parity blocks.
    /// Fills the block-related fields of `header` before sending.
    void send_chunk(uint8_t *chunk_data, uint32_t chunk_length, frag_header *header) {
        auto data_blocks = (chunk_length + block_size - 1) / block_size;
        assert(data_blocks <= max_data_block_count);
        // at least one parity block per chunk
        auto parity_blocks = std::max(1, static_cast<int>(data_blocks * parity_rate));
        rs.process(data_blocks, parity_blocks, block_size, chunk_data, chunk_length);

        header->block_size = block_size;
        header->block_count = rs.block_count();
        header->chunk_decode_block_count = data_blocks;
        for (auto k = 0; k < rs.block_count(); ++k) {
            header->block_id = k;
            send_block(rs.block_ptrs.ptr[k], *header);
        }
    }

    /// Splits `frame` into chunks of at most `max_chunk_size` bytes and sends
    /// them. Does nothing when no client is registered.
    ///
    /// NOTE(review): only `remote_ep` is checked here; `is_connected()` (and
    /// therefore the confirm timeout) is never called in this file — confirm
    /// whether that is intended.
    void send_frame(frame_ptr_type &&frame) {
        if (remote_ep == nullptr) return;

        // Value-initialized so that no indeterminate field is ever serialized.
        frag_header header{};
        header.frame_type = frame->idr ? 'I' : 'P';
        header.frame_id = frame->frame_id;
        header.frame_length = frame->length;

        auto chunk_count = (frame->length + max_chunk_size - 1) / max_chunk_size;
        // chunk_count and chunk_id travel as single bytes in the header
        assert(chunk_count <= 0xFF);
        header.chunk_count = chunk_count;
        for (uint32_t k = 0; k < chunk_count; ++k) {
            header.chunk_offset = k * max_chunk_size;
            header.chunk_id = k;
            header.chunk_length = std::min<size_t>(max_chunk_size,
                                                   frame->length - header.chunk_offset);
            auto chunk_data = frame->ptr + header.chunk_offset;
            send_chunk(chunk_data, header.chunk_length, &header);
        }

        q_this->log_frame_sent(frame->frame_id);
        SPDLOG_TRACE("Frame {} sent.", frame->frame_id);
    }

    /// Starts one asynchronous receive; `handle_request` runs on completion.
    void async_handle_request() {
        in_buf.create(max_package_size);
        auto buf = buffer(in_buf.ptr, max_package_size);
        using namespace std::placeholders;
        socket->async_receive_from(buf, request_ep,
                                   std::bind(&impl::handle_request, this, _1, _2));
    }

    /// Completion handler for `async_handle_request`: validates and dispatches
    /// one request datagram, then re-arms the receive.
    void handle_request(const error_code &ec, size_t length) {
        // A cancelled receive means the socket was closed or destroyed. Do not
        // re-arm and do not touch any member: they may no longer exist.
        if (ec == boost::asio::error::operation_aborted) return;

        // prepare for next request when this function exited.
        auto closer = sg::make_scope_guard([this] {
            async_handle_request();
        });

        // handle errors
        if (ec) {
            SPDLOG_ERROR("Error while receiving request: {}", ec.what());
            return;
        }

        // parse request
        if (length != request_size) return;
        request_type req;
        read_request(in_buf.ptr, &req);
        auto crc = boost::crc_32_type{};
        crc.process_bytes(in_buf.ptr + sizeof(uint32_t), request_size - sizeof(uint32_t));
        if (crc.checksum() != req.request_checksum) { // checksum failed
            // TODO show log
            return;
        }

        // handle request
        switch (req.request_type) {
            case 'X': {
                // A leave request may arrive while nobody is connected;
                // without this guard the log line dereferences a null pointer.
                if (remote_ep == nullptr) return;
                SPDLOG_INFO("Client {}:{} left.",
                            remote_ep->address().to_string(), remote_ep->port());
                close_connection();
                return;
            }
            case 'C': {
                last_confirm_ts = current_timestamp();
                SPDLOG_TRACE("Frame {} confirmed.", req.frame_id);
                return;
            }
            case 'I': {
                remote_ep = std::make_unique<udp::endpoint>(request_ep);
                last_confirm_ts = current_timestamp();
                q_this->request_idr_frame();
                mq().update_variable(SENDER_CONNECTED, true);

                // log only once per distinct request id
                static uint32_t last_frame_id = 0;
                if (req.frame_id != last_frame_id) {
                    SPDLOG_INFO("New client from {}:{}.",
                                remote_ep->address().to_string(), remote_ep->port());
                    last_frame_id = req.frame_id;
                }
                return;
            }
            default: {
                // TODO show log
                return;
            }
        }
    }

    /// Forgets the current client and publishes the disconnected state.
    void close_connection() {
        remote_ep.reset();
        mq().update_variable(SENDER_CONNECTED, false);
    }

    /// Returns true while a client is registered and its last confirmation is
    /// not older than `confirm_timeout`; closes the connection on timeout.
    bool is_connected() {
        if (remote_ep == nullptr) return false;
        if (current_timestamp() - last_confirm_ts > confirm_timeout) [[unlikely]] {
            SPDLOG_WARN("Client timeout.");
            close_connection();
            return false;
        }
        return true;
    }

    /// Builds an `impl` bound to `conf.listen_port` and starts listening for
    /// requests. The caller takes ownership of the returned pointer.
    static impl *create(const config &conf, sender_udp_fec *q_this) {
        // Held by unique_ptr until fully set up, so nothing leaks if opening
        // the socket or setting its option throws.
        auto ret = std::make_unique<impl>();
        auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
        ret->q_this = q_this;
        ret->socket = std::make_unique<udp::socket>(*q_this->get_ctx(), local_ep);
        ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});

        // constant configs
        ret->parity_rate = conf.parity_rate;
        ret->max_data_block_count = max_block_count / (1 + conf.parity_rate);
        ret->block_size = conf.conn_mtu - frag_header_size;
        ret->max_chunk_size = ret->max_data_block_count * ret->block_size;

        // initialize reed solomon
        fec_init();

        // start listening once everything else is in place
        ret->async_handle_request();

        return ret.release();
    }

};

sender_udp_fec::~sender_udp_fec() = default;

/// Creates a sender together with its implementation; the caller owns the
/// returned object.
sender_udp_fec *sender_udp_fec::create(const config &conf) {
    auto ret = std::make_unique<sender_udp_fec>();
    auto pimpl = impl::create(conf, ret.get());
    if (pimpl == nullptr) return nullptr;
    ret->pimpl.reset(pimpl);
    return ret.release();
}

void sender_udp_fec::close_connection() {
    pimpl->close_connection();
}

void sender_udp_fec::handle_frame(frame_ptr_type &&frame) {
    pimpl->send_frame(std::move(frame));
}