sender_udp_fec.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
#include "sender_udp_fec.h"
#include "sender_utility.hpp"
#include "core/timestamp_helper.hpp"
#include "simple_mq.h"
#include "third_party/scope_guard.hpp"
#include "utility.hpp"
#include "variable_defs.h"
extern "C" {
#include "third_party/rs.h"
}
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/post.hpp>
#include <boost/crc.hpp>
#include <spdlog/spdlog.h>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
  16. using namespace boost::asio::ip;
  17. using boost::asio::buffer;
  18. using boost::asio::io_context;
  19. using boost::asio::post;
  20. using boost::system::error_code;
  21. using namespace sender_impl;
  22. using namespace sophiar;
  23. using namespace simple_mq_singleton;
  24. namespace sender_udp_fec_impl {
  25. struct smart_reed_solomon {
  26. reed_solomon *rs = nullptr;
  27. smart_buffer<uint8_t *> block_ptrs;
  28. ~smart_reed_solomon() {
  29. deallocate();
  30. }
  31. // assume parity_rate and block_size will not change in the lifetime
  32. void process(uint8_t data_blocks, uint8_t parity_blocks,
  33. uint16_t block_size, uint8_t *data, uint32_t length) {
  34. if (data_blocks != last_data_blocks) {
  35. deallocate();
  36. allocate(data_blocks, parity_blocks, block_size);
  37. }
  38. assert(data_blocks * block_size >= length);
  39. memcpy(block_data.ptr, data, length);
  40. reed_solomon_encode2(rs, block_ptrs.ptr, block_count(), block_size);
  41. }
  42. uint8_t block_count() const {
  43. return block_ptrs.length;
  44. }
  45. private:
  46. uint8_t last_data_blocks = 0;
  47. smart_buffer<uint8_t> block_data;
  48. void deallocate() {
  49. if (rs == nullptr) return;
  50. reed_solomon_release(rs);
  51. rs = nullptr;
  52. }
  53. void allocate(uint8_t data_blocks, uint8_t parity_blocks, uint16_t block_size) {
  54. assert(rs == nullptr);
  55. rs = reed_solomon_new(data_blocks, parity_blocks);
  56. auto total_blocks = data_blocks + parity_blocks;
  57. block_data.create(total_blocks * block_size);
  58. block_ptrs.create(total_blocks);
  59. for (int i = 0; i < total_blocks; ++i) {
  60. block_ptrs.ptr[i] = block_data.ptr + block_size * i;
  61. }
  62. last_data_blocks = data_blocks;
  63. }
  64. };
  65. }
  66. using namespace sender_udp_fec_impl;
  67. struct sender_udp_fec::impl {
  68. struct frag_header {
  69. uint32_t frag_checksum;
  70. uint8_t frame_type; // 'I' or 'P'
  71. uint32_t frame_id;
  72. uint32_t frame_length;
  73. uint8_t chunk_count;
  74. uint8_t chunk_id;
  75. uint32_t chunk_offset;
  76. uint32_t chunk_length;
  77. uint16_t block_size;
  78. uint8_t block_count;
  79. uint8_t chunk_decode_block_count;
  80. uint8_t block_id;
  81. };
  82. struct request_type {
  83. uint32_t request_checksum;
  84. uint8_t request_type;
  85. uint32_t frame_id;
  86. };
  87. static constexpr auto frag_header_size = 28;
  88. static constexpr auto request_size = 9;
  89. static constexpr auto max_block_count = DATA_SHARDS_MAX;
  90. static constexpr auto max_package_size = 64 * 1024; // 64KiB
  91. static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
  92. static constexpr int confirm_timeout = 10 * 1e6; // 10s
  93. using frame_ptr_type = std::unique_ptr<video_nal>;
  94. sender_udp_fec *q_this = nullptr;
  95. std::unique_ptr<udp::socket> socket;
  96. udp::endpoint request_ep;
  97. std::unique_ptr<udp::endpoint> remote_ep;
  98. float parity_rate;
  99. uint8_t max_data_block_count; // max_block_count / (1 + parity_rate)
  100. uint16_t block_size; // conn_mtu - header_size
  101. uint32_t max_chunk_size; // max_data_block_count * block_size
  102. smart_reed_solomon rs;
  103. smart_buffer<uint8_t> in_buf, out_buf;
  104. timestamp_type last_confirm_ts = 0;
  105. static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
  106. #define WRITE(member) ptr = write_binary_number(ptr, header.member)
  107. WRITE(frag_checksum);
  108. WRITE(frame_type);
  109. WRITE(frame_id);
  110. WRITE(frame_length);
  111. WRITE(chunk_count);
  112. WRITE(chunk_id);
  113. WRITE(chunk_offset);
  114. WRITE(chunk_length);
  115. WRITE(block_size);
  116. WRITE(block_count);
  117. WRITE(chunk_decode_block_count);
  118. WRITE(block_id);
  119. #undef WRITE
  120. return ptr;
  121. }
  122. static uint8_t *read_request(uint8_t *ptr, request_type *req) {
  123. #define READ(member) ptr = read_binary_number(ptr, &req->member)
  124. READ(request_checksum);
  125. READ(request_type);
  126. READ(frame_id);
  127. #undef READ
  128. return ptr;
  129. }
  130. void send_block(uint8_t *block_data, const frag_header &header) {
  131. out_buf.create(max_package_size);
  132. auto ptr = write_frag_header(out_buf.ptr, header);
  133. assert(ptr - out_buf.ptr == frag_header_size);
  134. memcpy(ptr, block_data, block_size);
  135. // calculate crc32
  136. auto crc = boost::crc_32_type{};
  137. crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
  138. frag_header_size + block_size - sizeof(uint32_t));
  139. write_binary_number(out_buf.ptr, crc.checksum());
  140. // send packet
  141. assert(socket != nullptr);
  142. auto buf = buffer(out_buf.ptr, frag_header_size + block_size);
  143. assert(remote_ep != nullptr);
  144. socket->send_to(buf, *remote_ep);
  145. }
  146. void send_chunk(uint8_t *chunk_data, uint32_t chunk_length, frag_header *header) {
  147. auto data_blocks = (chunk_length + block_size - 1) / block_size;
  148. assert(data_blocks <= max_data_block_count);
  149. auto parity_blocks = std::max(1, (int) (data_blocks * parity_rate));
  150. rs.process(data_blocks, parity_blocks, block_size, chunk_data, chunk_length);
  151. header->block_size = block_size;
  152. header->block_count = rs.block_count();
  153. header->chunk_decode_block_count = data_blocks;
  154. for (auto k = 0; k < rs.block_count(); ++k) {
  155. header->block_id = k;
  156. send_block(rs.block_ptrs.ptr[k], *header);
  157. }
  158. }
  159. void send_frame(frame_ptr_type &&frame) {
  160. if (remote_ep == nullptr) return;
  161. frag_header header;
  162. header.frame_type = frame->idr ? 'I' : 'P';
  163. header.frame_id = frame->frame_id;
  164. header.frame_length = frame->length;
  165. auto chunk_count = (frame->length + max_chunk_size - 1) / max_chunk_size;
  166. header.chunk_count = chunk_count;
  167. for (auto k = 0; k < chunk_count; ++k) {
  168. header.chunk_offset = k * max_chunk_size;
  169. header.chunk_id = k;
  170. header.chunk_length = std::min((size_t) max_chunk_size,
  171. frame->length - k * max_chunk_size);
  172. auto chunk_data = frame->ptr + header.chunk_offset;
  173. send_chunk(chunk_data, header.chunk_length, &header);
  174. }
  175. q_this->log_frame_sent(frame->frame_id);
  176. SPDLOG_TRACE("Frame {} sent.", frame->frame_id);
  177. }
  178. void async_handle_request() {
  179. in_buf.create(max_package_size);
  180. auto buf = buffer(in_buf.ptr, max_package_size);
  181. using namespace std::placeholders;
  182. socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
  183. }
  184. void handle_request(const error_code &ec, size_t length) {
  185. // prepare for next request when this function exited.
  186. auto closer = sg::make_scope_guard([this] {
  187. async_handle_request();
  188. });
  189. // handle errors
  190. if (ec) {
  191. SPDLOG_ERROR("Error while receiving request: {}", ec.what());
  192. return;
  193. }
  194. // parse request
  195. if (length != request_size) return;
  196. request_type req;
  197. read_request(in_buf.ptr, &req);
  198. auto crc = boost::crc_32_type{};
  199. crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
  200. request_size - sizeof(uint32_t));
  201. if (crc.checksum() != req.request_checksum) { // checksum failed
  202. // TODO show log
  203. return;
  204. }
  205. // handle request
  206. switch (req.request_type) {
  207. case 'X': {
  208. SPDLOG_INFO("Client {}:{} left.",
  209. remote_ep->address().to_string(), remote_ep->port());
  210. close_connection();
  211. return;
  212. }
  213. case 'C': {
  214. last_confirm_ts = current_timestamp();
  215. SPDLOG_TRACE("Frame {} confirmed.", req.frame_id);
  216. return;
  217. }
  218. case 'I': {
  219. remote_ep = std::make_unique<udp::endpoint>(request_ep);
  220. last_confirm_ts = current_timestamp();
  221. q_this->request_idr_frame();
  222. mq().update_variable(SENDER_CONNECTED, true);
  223. static uint32_t last_frame_id = 0;
  224. if (req.frame_id != last_frame_id) {
  225. SPDLOG_INFO("New client from {}:{}.",
  226. remote_ep->address().to_string(), remote_ep->port());
  227. last_frame_id = req.frame_id;
  228. }
  229. return;
  230. }
  231. default: {
  232. // TODO show log
  233. return;
  234. }
  235. }
  236. }
  237. void close_connection() {
  238. remote_ep.reset();
  239. mq().update_variable(SENDER_CONNECTED, false);
  240. }
  241. bool is_connected() {
  242. if (remote_ep == nullptr) return false;
  243. if (current_timestamp() - last_confirm_ts > confirm_timeout) [[unlikely]] {
  244. SPDLOG_WARN("Client timeout.");
  245. close_connection();
  246. return false;
  247. }
  248. return true;
  249. }
  250. static impl *create(const config &conf, sender_udp_fec *q_this) {
  251. auto ret = new impl;
  252. auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
  253. ret->q_this = q_this;
  254. ret->socket = std::make_unique<udp::socket>(*q_this->get_ctx(), local_ep);
  255. ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
  256. ret->async_handle_request();
  257. // constant configs
  258. ret->parity_rate = conf.parity_rate;
  259. ret->max_data_block_count = max_block_count / (1 + conf.parity_rate);
  260. ret->block_size = conf.conn_mtu - frag_header_size;
  261. ret->max_chunk_size = ret->max_data_block_count * ret->block_size;
  262. // initialize reed solomon
  263. fec_init();
  264. return ret;
  265. }
  266. };
// Defaulted here, after the definition of impl, so that the destructor is
// generated where impl is a complete type (pimpl is presumably a smart
// pointer to impl - see the header).
sender_udp_fec::~sender_udp_fec() = default;
  268. sender_udp_fec *sender_udp_fec::create(const config &conf) {
  269. auto ret = std::make_unique<sender_udp_fec>();
  270. auto pimpl = impl::create(conf, ret.get());
  271. if (pimpl == nullptr) return nullptr;
  272. ret->pimpl.reset(pimpl);
  273. return ret.release();
  274. }
// Drops the current client; forwards to impl::close_connection().
void sender_udp_fec::close_connection() {
    pimpl->close_connection();
}
// Sends one encoded frame to the client; forwards to impl::send_frame().
void sender_udp_fec::handle_frame(frame_ptr_type &&frame) {
    pimpl->send_frame(std::move(frame));
}