| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553 |
- #include "config.h"
- #include "frame_sender.h"
- #include "third_party/scope_guard.hpp"
- extern "C" {
- #include "third_party/rs.h"
- }
- #include <boost/asio/awaitable.hpp>
- #include <boost/asio/buffer.hpp>
- #include <boost/asio/deadline_timer.hpp>
- #include <boost/asio/detached.hpp>
- #include <boost/asio/experimental/awaitable_operators.hpp>
- #include <boost/asio/experimental/concurrent_channel.hpp>
- #include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/udp.hpp>
- #include <boost/asio/redirect_error.hpp>
- #include <boost/asio/use_awaitable.hpp>
- #include <boost/endian.hpp>
- #include <boost/date_time/posix_time/posix_time.hpp>
- #include <boost/smart_ptr.hpp>
- #include <xxhash.h>
- #include <spdlog/spdlog.h>
- #include <cstdint>
- #include <deque>
- #include <numeric>
- #include <random>
- #include <thread>
- #include <tuple>
- #include <vector>
- using namespace boost::asio::experimental::awaitable_operators;
- using namespace boost::asio::ip;
- using namespace boost::posix_time;
- using boost::asio::awaitable;
- using boost::asio::buffer;
- using boost::asio::deadline_timer;
- using boost::asio::detached;
- using boost::asio::experimental::concurrent_channel;
- using boost::asio::io_context;
- using boost::asio::redirect_error;
- using boost::asio::use_awaitable;
- using boost::system::error_code;
- #define EXCEPTION_CHECK(api_call) \
- try { \
- api_call; \
- } catch (std::exception &e) { \
- SPDLOG_ERROR("Procedure {} failed at {}:{} with exception {}.", \
- #api_call, __FILE__, __LINE__, e.what()); \
- return false; \
- } void(0)
- #define CORO_CHECK(api_call) { \
- bool ok = co_await (api_call); \
- if (!ok) { \
- SPDLOG_ERROR("Coroutine {} failed at {}:{}.", \
- #api_call, __FILE__, __LINE__); \
- co_return false; \
- } \
- } void(0)
- struct frame_sender::impl {
- static constexpr auto buffer_size = 64 * 1024; // 64KB
- static constexpr auto rtt_probe_count = 30;
- static constexpr auto max_loss_rate = 0.2; // 20% packet loss rate
- static constexpr auto frag_header_size = 35;
- static constexpr auto channel_buffer_size = 16;
- struct frag_header {
- uint64_t frag_checksum;
- uint8_t frame_type;
- uint64_t frame_salt;
- uint32_t frame_id;
- uint32_t frame_length;
- uint32_t block_size;
- uint16_t block_count;
- uint16_t frame_decode_count;
- uint16_t block_id;
- };
- struct sent_frame_info {
- uint64_t salt;
- uint32_t id;
- ptime time;
- };
- uint16_t local_port = 5277;
- udp::endpoint remote_endpoint;
- uint64_t conn_rtt_us = 50; // connection round trip time (RTT)
- uint16_t conn_mtu = 1200;
- double parity_rate = 0.2;
- boost::scoped_ptr<io_context> context;
- boost::scoped_ptr<udp::socket> socket;
- using chan_type = concurrent_channel<void(error_code, frame_info)>;
- boost::scoped_ptr<chan_type> chan;
- char *in_data = nullptr, *out_data = nullptr;
- enum status_type {
- IDLE,
- CONNECTING,
- CONNECTED
- } status = IDLE;
- uint32_t frame_count = 0;
- std::atomic_flag *idr_flag = nullptr;
- int frame_rate = default_camera_fps;
- time_duration frame_timeout, conn_timeout;
- ptime last_confirm_time;
- boost::scoped_ptr<deadline_timer> keepalive_timer;
- std::deque<sent_frame_info> sent_list; // pending confirm list
- std::unique_ptr<std::thread> work_thread;
- impl() {
- in_data = (char *) malloc(buffer_size);
- out_data = (char *) malloc(buffer_size);
- context.reset(new io_context{});
- chan.reset(new chan_type{*context, channel_buffer_size});
- keepalive_timer.reset(new deadline_timer{*context});
- auto error_handler = [](std::exception_ptr ep) {
- if (!ep) {
- SPDLOG_ERROR("Infinite loop exited with no error.");
- return;
- }
- try {
- std::rethrow_exception(ep);
- } catch (std::exception &e) {
- SPDLOG_ERROR("Infinite loop exited with error: {}", e.what());
- }
- };
- co_spawn(*context, main_loop(), error_handler);
- co_spawn(*context, keepalive_loop(), error_handler);
- }
- ~impl() {
- stop();
- free(in_data);
- free(out_data);
- }
- static uint64_t generate_salt() {
- static std::random_device device;
- static std::default_random_engine engine{device()};
- static std::uniform_int_distribution<uint64_t> dist;
- return dist(engine);
- }
- template<typename T>
- static char *write_binary_number(char *ptr, T val) {
- static constexpr auto need_swap =
- (boost::endian::order::native != boost::endian::order::big);
- auto real_ptr = (T *) ptr;
- if constexpr (need_swap) {
- *real_ptr = boost::endian::endian_reverse(val);
- } else {
- *real_ptr = val;
- }
- return ptr + sizeof(T);
- }
- template<typename T>
- static char *read_binary_number(char *ptr, T *val) {
- static constexpr auto need_swap =
- (boost::endian::order::native != boost::endian::order::big);
- *val = *(T *) ptr;
- if constexpr (need_swap) {
- boost::endian::endian_reverse_inplace(*val);
- }
- return ptr + sizeof(T);
- }
- static char *write_frag_header(char *ptr, const frag_header *header) {
- #define WRITE(member) ptr = write_binary_number(ptr, header->member)
- WRITE(frag_checksum);
- WRITE(frame_type);
- WRITE(frame_salt);
- WRITE(frame_id);
- WRITE(frame_length);
- WRITE(block_size);
- WRITE(block_count);
- WRITE(frame_decode_count);
- WRITE(block_id);
- #undef WRITE
- return ptr;
- }
- // calculate and fill hash value for out buffer
- bool calc_out_hash(char *end_ptr) {
- assert(end_ptr - out_data > sizeof(uint64_t));
- static auto hash_state = XXH64_createState();
- auto out_ptr = out_data + sizeof(uint64_t);
- CALL_CHECK(XXH64_reset(hash_state, 0) != XXH_ERROR);
- CALL_CHECK(XXH64_update(hash_state, out_ptr, end_ptr - out_ptr) != XXH_ERROR);
- write_binary_number(out_data, XXH64_digest(hash_state));
- return true;
- }
- bool check_rtt_reply(uint64_t salt, uint16_t out_len, uint16_t in_len) {
- static constexpr auto desired_length =
- sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint16_t);
- if (in_len != desired_length) return false;
- // check frag type
- if (in_data[0] != 'R') return false;
- // check frame salt
- uint64_t in_salt;
- auto in_ptr = read_binary_number(in_data + sizeof(uint8_t), &in_salt);
- if (in_salt != salt) return false;
- // check returned length
- uint16_t in_frag_len;
- read_binary_number(in_ptr, &in_frag_len);
- if (in_frag_len != out_len) return false;
- return true;
- }
- template<typename T>
- static T power2(T x) { return x * x; }
- static uint64_t calc_upper_rtt(const std::vector<uint64_t> &v) {
- auto sum = std::accumulate(v.begin(), v.end(), 0.0);
- auto mean = sum / (double) v.size();
- auto sum2 = std::accumulate(v.begin(), v.end(), 0.0,
- [=](double a, uint64_t b) { return a + power2((double) b - mean); });
- auto std_var = std::sqrt(sum2 / (double) v.size());
- return (uint64_t) (mean + 5 * std_var);
- }
- awaitable<bool> probe_rtt() {
- static const auto max_rtt = seconds(1);
- auto timer = deadline_timer{*context};
- std::vector<uint64_t> rtt_result;
- auto in_buf = buffer(in_data, buffer_size);
- for (int k = 0; k < rtt_probe_count; ++k) {
- auto salt = generate_salt();
- // write probe frag data
- auto out_ptr = out_data;
- out_ptr = write_binary_number(out_ptr, (uint64_t) 0); // checksum placeholder
- out_ptr = write_binary_number(out_ptr, 'T');
- out_ptr = write_binary_number(out_ptr, salt);
- // fill frag with random data
- auto limit_ptr = out_data + conn_mtu;
- auto content_len = 0;
- while (out_ptr + sizeof(uint64_t) < limit_ptr) {
- out_ptr = write_binary_number(out_ptr, generate_salt());
- content_len += sizeof(uint64_t);
- }
- calc_out_hash(out_ptr);
- auto out_buf = buffer(out_data, out_ptr - out_data);
- socket->send_to(out_buf, remote_endpoint);
- // wait for reply or timeout
- auto start_time = microsec_clock::local_time();
- timer.expires_from_now(max_rtt);
- for (;;) {
- udp::endpoint sender_endpoint;
- auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) ||
- timer.async_wait(use_awaitable));
- if (ret.index() == 0) { // received reply
- if (sender_endpoint != remote_endpoint) continue;
- if (check_rtt_reply(salt, content_len, std::get<0>(ret))) {
- auto end_time = microsec_clock::local_time();
- auto rtt_us = (end_time - start_time).total_microseconds();
- rtt_result.push_back(rtt_us);
- SPDLOG_TRACE("RTT probe {}: {}us.", k, rtt_us);
- break;
- }
- } else { // timeout
- assert(ret.index() == 1);
- SPDLOG_TRACE("RTT probe {}: failed.", k);
- break;
- }
- }
- }
- if (rtt_result.size() <= (int) (rtt_probe_count * max_loss_rate)) {
- SPDLOG_WARN("Packet loss rate too high, cannot probe RTT.");
- co_return false;
- }
- conn_rtt_us = calc_upper_rtt(rtt_result);
- SPDLOG_INFO("Connection MaxRTT: {}us.", conn_rtt_us);
- co_return true;
- }
- awaitable<bool> setup_connection() {
- // socket->connect(remote_endpoint);
- CORO_CHECK(probe_rtt());
- // TODO: detect mtu
- // TODO: detect packet loss rate
- // reset timer
- frame_timeout = milliseconds(conn_rtt_us / 1000 + 3 * 1000 / frame_rate);
- conn_timeout = seconds(1); // TODO
- last_confirm_time = microsec_clock::local_time();
- // keepalive_timer->expires_at(boost::posix_time::pos_infin);
- sent_list.clear();
- idr_flag->test_and_set();
- co_return true;
- }
- void handle_frame_confirm(size_t msg_len) {
- static constexpr auto desired_length =
- sizeof(uint8_t) + sizeof(uint64_t);
- if (msg_len != desired_length) return;
- // read salt
- uint64_t frame_salt;
- read_binary_number(in_data + 1, &frame_salt);
- static uint64_t last_frame_salt;
- if (frame_salt == last_frame_salt) return; // already confirmed
- // erase confirmed frame
- auto iter = sent_list.begin();
- while (iter != sent_list.end() && iter->salt != frame_salt) ++iter;
- if (iter == sent_list.end()) return;
- SPDLOG_TRACE("Frame {} confirmed.", iter->id);
- sent_list.erase(sent_list.begin(), ++iter);
- // reset timer
- if (sent_list.empty()) {
- keepalive_timer->expires_at(pos_infin);
- } else {
- keepalive_timer->expires_at(sent_list.begin()->time + frame_timeout);
- }
- last_confirm_time = microsec_clock::local_time();
- }
- void handle_upd_message(size_t msg_len, const udp::endpoint &sender) {
- assert(status != CONNECTING);
- if (status == IDLE) {
- if (msg_len == 1 && in_data[0] == 'R') { // reset connection
- if (status == CONNECTING) {
- SPDLOG_WARN("Only one connection is supported.");
- return;
- }
- status = CONNECTING;
- remote_endpoint = sender;
- SPDLOG_INFO("Reset connection with {}:{}.", sender.address().to_string(), sender.port());
- co_spawn(*context, setup_connection(), [this](std::exception_ptr e, bool ok) {
- assert(!e);
- SPDLOG_INFO("Reset connection {}.", ok ? "succeeded" : "failed");
- if (ok) {
- status = CONNECTED;
- } else {
- status = IDLE;
- remote_endpoint = {};
- }
- });
- }
- } else if (status == CONNECTED) {
- if (sender != remote_endpoint) return;
- if (msg_len == 1 && in_data[0] == 'E') { // client exit
- keepalive_timer->expires_at(pos_infin);
- status = IDLE;
- SPDLOG_INFO("Client left.");
- } else if (in_data[0] == 'C') { // confirmation
- handle_frame_confirm(msg_len);
- }
- }
- }
- void handle_frame(const frame_info &info) {
- ++frame_count;
- auto frame_deleter = sg::make_scope_guard([&]() {
- free(info.data);
- });
- if (status != CONNECTED) {
- SPDLOG_TRACE("Frame {} received, but connection is not ready.");
- return;
- }
- // prepare buffer for frame
- auto block_size = (conn_mtu - frag_header_size) & 0xffffff00; // TODO: support for larger frame
- auto data_blocks = (info.length + block_size - 1) / block_size;
- auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate));
- auto total_blocks = data_blocks + parity_blocks;
- auto block_data = (uint8_t *) malloc(total_blocks * block_size);
- auto block_ptr = (uint8_t **) malloc(total_blocks * sizeof(void *));
- for (int i = 0; i < total_blocks; ++i) {
- block_ptr[i] = block_data + block_size * i;
- }
- auto rs = reed_solomon_new(data_blocks, parity_blocks);
- assert(rs != nullptr);
- auto closer = sg::make_scope_guard([&]() {
- free(block_data);
- free(block_ptr);
- reed_solomon_release(rs);
- });
- // calc reed-solomon
- memcpy(block_data, info.data, info.length);
- auto ret = reed_solomon_encode2(rs, block_ptr, total_blocks, block_size);
- assert(ret == 0);
- // send encoded frames
- frag_header header;
- header.frame_type = info.is_idr_frame ? 'I' : 'P';
- header.frame_salt = generate_salt();
- header.frame_id = frame_count;
- header.frame_length = info.length;
- header.block_size = block_size;
- header.block_count = total_blocks;
- header.frame_decode_count = data_blocks;
- for (int i = 0; i < total_blocks; ++i) {
- header.block_id = i;
- auto out_ptr = write_frag_header(out_data, &header);
- assert(out_ptr - out_data == frag_header_size);
- memcpy(out_ptr, block_ptr[i], block_size);
- out_ptr += block_size;
- calc_out_hash(out_ptr);
- auto out_buf = buffer(out_data, out_ptr - out_data);
- socket->send_to(out_buf, remote_endpoint);
- }
- SPDLOG_TRACE("Frame {} is sent with {}+{} blocks.",
- header.frame_id, header.block_count, header.block_count - header.frame_decode_count);
- // config frame queue and timeout
- if (keepalive_timer->expires_at() == pos_infin) {
- keepalive_timer->expires_from_now(frame_timeout);
- SPDLOG_TRACE("Timer reset to {}.", to_simple_string(keepalive_timer->expires_at()));
- }
- sent_list.push_back({header.frame_salt, header.frame_id,
- microsec_clock::local_time()});
- }
- awaitable<void> main_loop() {
- auto in_buf = buffer(in_data, buffer_size);
- for (;;) {
- if (status == CONNECTING) {
- auto ret = co_await chan->async_receive(use_awaitable);
- handle_frame(ret);
- } else { // IDLE or CONNECTED
- udp::endpoint sender_endpoint;
- auto ret = co_await (socket->async_receive_from(in_buf, sender_endpoint, use_awaitable) ||
- chan->async_receive(use_awaitable));
- if (ret.index() == 0) { // udp message
- handle_upd_message(std::get<0>(ret), sender_endpoint);
- } else { // new frame
- assert(ret.index() == 1);
- handle_frame(std::get<1>(ret));
- }
- }
- }
- }
- awaitable<void> keepalive_loop() {
- for (;;) {
- error_code ec;
- co_await keepalive_timer->async_wait(redirect_error(use_awaitable, ec));
- if (ec == boost::asio::error::operation_aborted) {
- SPDLOG_TRACE("Timer aborted.");
- continue;
- }
- SPDLOG_WARN("Connection timeout.");
- keepalive_timer->expires_at(pos_infin);
- auto now = microsec_clock::local_time();
- if (now - last_confirm_time > conn_timeout) {
- status = IDLE;
- SPDLOG_WARN("Connection closed.");
- } else {
- idr_flag->test_and_set();
- }
- sent_list.clear();
- }
- }
- void start() {
- // clean channel
- if (chan != nullptr) {
- while (chan->try_receive([](error_code e, frame_info &&info) {
- free(info.data);
- }));
- }
- keepalive_timer->expires_at(pos_infin);
- auto local_endpoint = udp::endpoint{udp::v4(), local_port};
- socket.reset(new udp::socket{*context, local_endpoint});
- socket->set_option(udp::socket::send_buffer_size{10 * 1024 * 1024}); // 10MB send buffer
- assert(socket->is_open());
- if (context->stopped()) {
- context->restart();
- }
- // request idr frame
- idr_flag->test_and_set();
- assert(work_thread == nullptr);
- work_thread = std::make_unique<std::thread>([this]() {
- try {
- context->run();
- } catch (std::exception &e) {
- SPDLOG_ERROR("Frame sender error: {}", e.what());
- }
- });
- }
- void stop() {
- if (work_thread == nullptr) return;
- context->stop();
- work_thread->join();
- work_thread = nullptr;
- socket->close();
- }
- };
- frame_sender::frame_sender()
- : pimpl(std::make_unique<impl>()) {
- fec_init();
- }
- frame_sender::~frame_sender() = default;
- bool frame_sender::start(uint16_t local_port, std::atomic_flag *idr_flag, int fps) {
- pimpl->local_port = local_port;
- pimpl->idr_flag = idr_flag;
- pimpl->frame_rate = fps;
- EXCEPTION_CHECK(pimpl->start());
- return true;
- }
- void frame_sender::stop() {
- pimpl->stop();
- }
- bool frame_sender::send_frame(const frame_sender::frame_info &info) {
- CALL_CHECK(pimpl->chan->try_send(error_code{}, info));
- return true;
- }
- bool frame_sender::is_running() {
- return pimpl->work_thread != nullptr;
- }
|