#include "config.h" #include "frame_sender.h" #include "third_party/scope_guard.hpp" extern "C" { #include "third_party/rs.h" } #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace boost::asio::experimental::awaitable_operators; using namespace boost::asio::ip; using namespace boost::posix_time; using boost::asio::awaitable; using boost::asio::buffer; using boost::asio::deadline_timer; using boost::asio::detached; using boost::asio::experimental::concurrent_channel; using boost::asio::io_context; using boost::asio::redirect_error; using boost::asio::use_awaitable; using boost::system::error_code; #define EXCEPTION_CHECK(api_call) \ try { \ api_call; \ } catch (std::exception &e) { \ SPDLOG_ERROR("Procedure {} failed at {}:{} with exception {}.", \ #api_call, __FILE__, __LINE__, e.what()); \ return false; \ } void(0) #define CORO_CHECK(api_call) { \ bool ok = co_await (api_call); \ if (!ok) { \ SPDLOG_ERROR("Coroutine {} failed at {}:{}.", \ #api_call, __FILE__, __LINE__); \ co_return false; \ } \ } void(0) struct frame_sender::impl { static constexpr auto buffer_size = 64 * 1024; // 64KB static constexpr auto rtt_probe_count = 30; static constexpr auto max_loss_rate = 0.2; // 20% packet loss rate static constexpr auto frag_header_size = 35; static constexpr auto channel_buffer_size = 16; struct frag_header { uint64_t frag_checksum; uint8_t frame_type; uint64_t frame_salt; uint32_t frame_id; uint32_t frame_length; uint32_t block_size; uint16_t block_count; uint16_t frame_decode_count; uint16_t block_id; }; struct sent_frame_info { uint64_t salt; uint32_t id; ptime time; }; uint16_t local_port = 5277; udp::endpoint remote_endpoint; uint64_t conn_rtt_us = 50; // connection round trip time (RTT) uint16_t conn_mtu = 1200; double parity_rate = 0.2; boost::scoped_ptr context; boost::scoped_ptr socket; using chan_type = concurrent_channel; boost::scoped_ptr chan; char *in_data = nullptr, *out_data = nullptr; enum status_type { IDLE, CONNECTING, CONNECTED } status = IDLE; uint32_t frame_count = 0; std::atomic_flag *idr_flag = nullptr; int frame_rate = default_camera_fps; time_duration frame_timeout, conn_timeout; ptime last_confirm_time; boost::scoped_ptr keepalive_timer; std::deque sent_list; // pending confirm list std::unique_ptr work_thread; impl() { in_data = (char *) malloc(buffer_size); out_data = (char *) malloc(buffer_size); context.reset(new io_context{}); chan.reset(new chan_type{*context, channel_buffer_size}); keepalive_timer.reset(new deadline_timer{*context}); auto error_handler = [](std::exception_ptr ep) { if (!ep) { SPDLOG_ERROR("Infinite loop exited with no error."); return; } try { std::rethrow_exception(ep); } catch (std::exception &e) { SPDLOG_ERROR("Infinite loop exited with error: {}", e.what()); } }; co_spawn(*context, main_loop(), error_handler); co_spawn(*context, keepalive_loop(), error_handler); } ~impl() { stop(); free(in_data); free(out_data); } static uint64_t generate_salt() { static std::random_device device; static std::default_random_engine engine{device()}; static std::uniform_int_distribution dist; return dist(engine); } template static char *write_binary_number(char *ptr, T val) { static constexpr auto need_swap = (boost::endian::order::native != boost::endian::order::big); auto real_ptr = (T *) ptr; if constexpr (need_swap) { *real_ptr = boost::endian::endian_reverse(val); } else { *real_ptr = val; } return ptr + sizeof(T); } template static char *read_binary_number(char *ptr, T *val) { static constexpr auto need_swap = (boost::endian::order::native != boost::endian::order::big); *val = *(T *) ptr; if constexpr (need_swap) { boost::endian::endian_reverse_inplace(*val); } return ptr + sizeof(T); } static char *write_frag_header(char *ptr, const frag_header *header) { #define WRITE(member) ptr = write_binary_number(ptr, header->member) WRITE(frag_checksum); WRITE(frame_type); WRITE(frame_salt); WRITE(frame_id); WRITE(frame_length); WRITE(block_size); WRITE(block_count); WRITE(frame_decode_count); WRITE(block_id); #undef WRITE return ptr; } // calculate and fill hash value for out buffer bool calc_out_hash(char *end_ptr) { assert(end_ptr - out_data > sizeof(uint64_t)); static auto hash_state = XXH64_createState(); auto out_ptr = out_data + sizeof(uint64_t); CALL_CHECK(XXH64_reset(hash_state, 0) != XXH_ERROR); CALL_CHECK(XXH64_update(hash_state, out_ptr, end_ptr - out_ptr) != XXH_ERROR); write_binary_number(out_data, XXH64_digest(hash_state)); return true; } bool check_rtt_reply(uint64_t salt, uint16_t out_len, uint16_t in_len) { static constexpr auto desired_length = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint16_t); if (in_len != desired_length) return false; // check frag type if (in_data[0] != 'R') return false; // check frame salt uint64_t in_salt; auto in_ptr = read_binary_number(in_data + sizeof(uint8_t), &in_salt); if (in_salt != salt) return false; // check returned length uint16_t in_frag_len; read_binary_number(in_ptr, &in_frag_len); if (in_frag_len != out_len) return false; return true; } template static T power2(T x) { return x * x; } static uint64_t calc_upper_rtt(const std::vector &v) { auto sum = std::accumulate(v.begin(), v.end(), 0.0); auto mean = sum / (double) v.size(); auto sum2 = std::accumulate(v.begin(), v.end(), 0.0, [=](double a, uint64_t b) { return a + power2((double) b - mean); }); auto std_var = std::sqrt(sum2 / (double) v.size()); return (uint64_t) (mean + 5 * std_var); } awaitable probe_rtt() { static const auto max_rtt = seconds(1); auto timer = deadline_timer{*context}; std::vector rtt_result; auto in_buf = buffer(in_data, buffer_size); for (int k = 0; k < rtt_probe_count; ++k) { auto salt = generate_salt(); // write probe frag data auto out_ptr = out_data; out_ptr = write_binary_number(out_ptr, (uint64_t) 0); // checksum placeholder out_ptr = write_binary_number(out_ptr, 'T'); out_ptr = write_binary_number(out_ptr, salt); // fill frag with random data auto limit_ptr = out_data + conn_mtu; auto content_len = 0; while (out_ptr + sizeof(uint64_t) < limit_ptr) { out_ptr = write_binary_number(out_ptr, generate_salt()); content_len += sizeof(uint64_t); } calc_out_hash(out_ptr); auto out_buf = buffer(out_data, out_ptr - out_data); socket->send_to(out_buf, remote_endpoint); // wait for reply or timeout auto start_time = microsec_clock::local_time(); timer.expires_from_now(max_rtt); for (;;) { udp::endpoint sender_endpoint; auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) || timer.async_wait(use_awaitable)); if (ret.index() == 0) { // received reply if (sender_endpoint != remote_endpoint) continue; if (check_rtt_reply(salt, content_len, std::get<0>(ret))) { auto end_time = microsec_clock::local_time(); auto rtt_us = (end_time - start_time).total_microseconds(); rtt_result.push_back(rtt_us); SPDLOG_TRACE("RTT probe {}: {}us.", k, rtt_us); break; } } else { // timeout assert(ret.index() == 1); SPDLOG_TRACE("RTT probe {}: failed.", k); break; } } } if (rtt_result.size() <= (int) (rtt_probe_count * max_loss_rate)) { SPDLOG_WARN("Packet loss rate too high, cannot probe RTT."); co_return false; } conn_rtt_us = calc_upper_rtt(rtt_result); SPDLOG_INFO("Connection MaxRTT: {}us.", conn_rtt_us); co_return true; } awaitable setup_connection() { // socket->connect(remote_endpoint); CORO_CHECK(probe_rtt()); // TODO: detect mtu // TODO: detect packet loss rate // reset timer frame_timeout = milliseconds(conn_rtt_us / 1000 + 3 * 1000 / frame_rate); conn_timeout = seconds(1); // TODO last_confirm_time = microsec_clock::local_time(); // keepalive_timer->expires_at(boost::posix_time::pos_infin); sent_list.clear(); idr_flag->test_and_set(); co_return true; } void handle_frame_confirm(size_t msg_len) { static constexpr auto desired_length = sizeof(uint8_t) + sizeof(uint64_t); if (msg_len != desired_length) return; // read salt uint64_t frame_salt; read_binary_number(in_data + 1, &frame_salt); static uint64_t last_frame_salt; if (frame_salt == last_frame_salt) return; // already confirmed // erase confirmed frame auto iter = sent_list.begin(); while (iter != sent_list.end() && iter->salt != frame_salt) ++iter; if (iter == sent_list.end()) return; SPDLOG_TRACE("Frame {} confirmed.", iter->id); sent_list.erase(sent_list.begin(), ++iter); // reset timer if (sent_list.empty()) { keepalive_timer->expires_at(pos_infin); } else { keepalive_timer->expires_at(sent_list.begin()->time + frame_timeout); } last_confirm_time = microsec_clock::local_time(); } void handle_upd_message(size_t msg_len, const udp::endpoint &sender) { assert(status != CONNECTING); if (status == IDLE) { if (msg_len == 1 && in_data[0] == 'R') { // reset connection if (status == CONNECTING) { SPDLOG_WARN("Only one connection is supported."); return; } status = CONNECTING; remote_endpoint = sender; SPDLOG_INFO("Reset connection with {}:{}.", sender.address().to_string(), sender.port()); co_spawn(*context, setup_connection(), [this](std::exception_ptr e, bool ok) { assert(!e); SPDLOG_INFO("Reset connection {}.", ok ? "succeeded" : "failed"); if (ok) { status = CONNECTED; } else { status = IDLE; remote_endpoint = {}; } }); } } else if (status == CONNECTED) { if (sender != remote_endpoint) return; if (msg_len == 1 && in_data[0] == 'E') { // client exit keepalive_timer->expires_at(pos_infin); status = IDLE; SPDLOG_INFO("Client left."); } else if (in_data[0] == 'C') { // confirmation handle_frame_confirm(msg_len); } } } void handle_frame(const frame_info &info) { ++frame_count; auto frame_deleter = sg::make_scope_guard([&]() { free(info.data); }); if (status != CONNECTED) { SPDLOG_TRACE("Frame {} received, but connection is not ready."); return; } // prepare buffer for frame auto block_size = (conn_mtu - frag_header_size) & 0xffffff00; // TODO: support for larger frame auto data_blocks = (info.length + block_size - 1) / block_size; auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate)); auto total_blocks = data_blocks + parity_blocks; auto block_data = (uint8_t *) malloc(total_blocks * block_size); auto block_ptr = (uint8_t **) malloc(total_blocks * sizeof(void *)); for (int i = 0; i < total_blocks; ++i) { block_ptr[i] = block_data + block_size * i; } auto rs = reed_solomon_new(data_blocks, parity_blocks); assert(rs != nullptr); auto closer = sg::make_scope_guard([&]() { free(block_data); free(block_ptr); reed_solomon_release(rs); }); // calc reed-solomon memcpy(block_data, info.data, info.length); auto ret = reed_solomon_encode2(rs, block_ptr, total_blocks, block_size); assert(ret == 0); // send encoded frames frag_header header; header.frame_type = info.is_idr_frame ? 'I' : 'P'; header.frame_salt = generate_salt(); header.frame_id = frame_count; header.frame_length = info.length; header.block_size = block_size; header.block_count = total_blocks; header.frame_decode_count = data_blocks; for (int i = 0; i < total_blocks; ++i) { header.block_id = i; auto out_ptr = write_frag_header(out_data, &header); assert(out_ptr - out_data == frag_header_size); memcpy(out_ptr, block_ptr[i], block_size); out_ptr += block_size; calc_out_hash(out_ptr); auto out_buf = buffer(out_data, out_ptr - out_data); socket->send_to(out_buf, remote_endpoint); } SPDLOG_TRACE("Frame {} is sent with {}+{} blocks.", header.frame_id, header.block_count, header.block_count - header.frame_decode_count); // config frame queue and timeout if (keepalive_timer->expires_at() == pos_infin) { keepalive_timer->expires_from_now(frame_timeout); SPDLOG_TRACE("Timer reset to {}.", to_simple_string(keepalive_timer->expires_at())); } sent_list.push_back({header.frame_salt, header.frame_id, microsec_clock::local_time()}); } awaitable main_loop() { auto in_buf = buffer(in_data, buffer_size); for (;;) { if (status == CONNECTING) { auto ret = co_await chan->async_receive(use_awaitable); handle_frame(ret); } else { // IDLE or CONNECTED udp::endpoint sender_endpoint; auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) || chan->async_receive(use_awaitable)); if (ret.index() == 0) { // udp message handle_upd_message(std::get<0>(ret), sender_endpoint); } else { // new frame assert(ret.index() == 1); handle_frame(std::get<1>(ret)); } } } } awaitable keepalive_loop() { for (;;) { error_code ec; co_await keepalive_timer->async_wait(redirect_error(use_awaitable, ec)); if (ec == boost::asio::error::operation_aborted) { SPDLOG_TRACE("Timer aborted."); continue; } SPDLOG_WARN("Connection timeout."); keepalive_timer->expires_at(pos_infin); auto now = microsec_clock::local_time(); if (now - last_confirm_time > conn_timeout) { status = IDLE; SPDLOG_WARN("Connection closed."); } else { idr_flag->test_and_set(); } sent_list.clear(); } } void start() { // clean channel if (chan != nullptr) { while (chan->try_receive([](error_code e, frame_info &&info) { free(info.data); })); } keepalive_timer->expires_at(pos_infin); auto local_endpoint = udp::endpoint{udp::v4(), local_port}; socket.reset(new udp::socket{*context, local_endpoint}); socket->set_option(udp::socket::send_buffer_size{10 * 1024 * 1024}); // 10MB send buffer assert(socket->is_open()); if (context->stopped()) { context->restart(); } // request idr frame idr_flag->test_and_set(); assert(work_thread == nullptr); work_thread = std::make_unique([this]() { try { context->run(); } catch (std::exception &e) { SPDLOG_ERROR("Frame sender error: {}", e.what()); } }); } void stop() { if (work_thread == nullptr) return; context->stop(); work_thread->join(); work_thread = nullptr; socket->close(); } }; frame_sender::frame_sender() : pimpl(std::make_unique()) { fec_init(); } frame_sender::~frame_sender() = default; bool frame_sender::start(uint16_t local_port, std::atomic_flag *idr_flag, int fps) { pimpl->local_port = local_port; pimpl->idr_flag = idr_flag; pimpl->frame_rate = fps; EXCEPTION_CHECK(pimpl->start()); return true; } void frame_sender::stop() { pimpl->stop(); } bool frame_sender::send_frame(const frame_sender::frame_info &info) { CALL_CHECK(pimpl->chan->try_send(error_code{}, info)); return true; } bool frame_sender::is_running() { return pimpl->work_thread != nullptr; }