// receiver_udp_fec.cpp (11 KB)
#include "receiver_udp_fec.h"

#include <algorithm>
#include <cassert>
#include <cstring>
#include <utility>

#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/post.hpp>
#include <boost/container/static_vector.hpp>
#include <boost/crc.hpp>
#include <boost/endian.hpp>
#include <spdlog/spdlog.h>

#include "third_party/scope_guard.hpp"
extern "C" {
#include "network/impl/fragment/third_party/rs.h"
}

using namespace boost::asio::ip;
using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::post;
using boost::system::error_code;
  19. namespace receiver_udp_fec_impl {
  // Progress marker used by both chunks and frames in this file:
  //   NOT_INIT - nothing has been set up for the current round yet,
  //   WAITING  - set up and still collecting data,
  //   READY    - fully reconstructed / assembled.
  enum status_type { NOT_INIT = 0, WAITING = 1, READY = 2 };
  25. struct smart_chunk {
  26. static constexpr auto max_blocks = DATA_SHARDS_MAX;
  27. using block_ptrs_type = boost::container::static_vector<uint8_t *, max_blocks>;
  28. using block_miss_type = boost::container::static_vector<bool, max_blocks>;
  29. data_type block_data;
  30. block_ptrs_type block_ptrs;
  31. block_miss_type block_miss;
  32. uint8_t ready_blocks = 0;
  33. status_type status = NOT_INIT;
  34. ~smart_chunk() { deallocate(); }
  35. void reset() {
  36. ready_blocks = 0;
  37. status = NOT_INIT;
  38. }
  39. void create(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
  40. if (total_blocks != block_ptrs.size() ||
  41. parity_blocks != last_parity_blocks ||
  42. block_size != last_block_size) [[unlikely]] {
  43. deallocate();
  44. allocate(total_blocks, parity_blocks, block_size);
  45. }
  46. std::ranges::fill(block_miss, true);
  47. assert(status == NOT_INIT);
  48. status = WAITING;
  49. }
  50. bool reconstruct() {
  51. if (ready_blocks + last_parity_blocks < block_ptrs.size()) return false;
  52. auto ret = reed_solomon_reconstruct(rs, block_ptrs.data(), (uint8_t *) block_miss.data(),
  53. block_ptrs.size(), last_block_size);
  54. if (ret != 0) return false;
  55. assert(status == WAITING);
  56. status = READY;
  57. return true;
  58. }
  59. private:
  60. reed_solomon *rs = nullptr;
  61. uint8_t last_parity_blocks = 0;
  62. uint16_t last_block_size = 0;
  63. void deallocate() {
  64. if (rs == nullptr) return;
  65. reed_solomon_release(rs);
  66. rs = nullptr;
  67. }
  68. void allocate(uint8_t total_blocks, uint8_t parity_blocks, uint16_t block_size) {
  69. assert(rs == nullptr);
  70. auto data_blocks = total_blocks - parity_blocks;
  71. rs = reed_solomon_new(data_blocks, parity_blocks);
  72. block_data.reserve(total_blocks * block_size);
  73. block_ptrs.resize(total_blocks);
  74. block_miss.resize(total_blocks);
  75. for (int i = 0; i < total_blocks; ++i) {
  76. block_ptrs[i] = block_data.ptr + block_size * i;
  77. }
  78. last_parity_blocks = parity_blocks;
  79. last_block_size = block_size;
  80. }
  81. };
  82. }
  83. using namespace receiver_udp_fec_impl;
  84. struct receiver_udp_fec::impl {
  // Header that precedes the payload of every received fragment.
  // Fields are listed in the order read_frag_header() reads them.
  struct frag_header {
    // -- integrity --
    uint32_t frag_checksum; // compared against a CRC-32 of the packet bytes after this field

    // -- frame level --
    uint8_t frame_type;     // 'I' or 'P'
    uint32_t frame_id;
    uint32_t frame_length;  // length handed to frame_info::create() for the frame buffer

    // -- chunk level --
    uint8_t chunk_count;    // number of chunks the frame is split into
    uint8_t chunk_id;       // index of this fragment's chunk within the frame
    uint32_t chunk_offset;  // where the chunk's bytes go inside the frame buffer
    uint32_t chunk_length;  // number of bytes copied from the chunk into the frame

    // -- block level --
    uint16_t block_size;    // payload size carried by this fragment
    uint8_t block_count;    // total blocks in the chunk
    uint8_t chunk_decode_block_count; // block_count minus this is used as the parity count
    uint8_t block_id;       // index of this block within the chunk
  };
  // Control message sent to the server, serialised by write_request().
  // Every member has a default so that a request whose caller sets only some
  // fields never serialises indeterminate bytes.
  struct request_type {
    uint32_t request_checksum = 0; // placeholder; send_request() overwrites it with the CRC-32
    uint8_t request_type = 0;      // request kind: the request_* helpers use 'I', 'C' and 'X'
    uint32_t frame_id = 0;         // frame the request refers to
  };
  104. struct frame_info {
  105. using chunks_type = std::vector<smart_chunk>;
  106. chunks_type chunks;
  107. data_type data;
  108. uint32_t id = 0;
  109. uint8_t ready_chunks = 0;
  110. status_type status = NOT_INIT;
  111. void create(uint32_t frame_id, uint8_t chunk_count, size_t length) {
  112. chunks.resize(chunk_count);
  113. data.reserve(length);
  114. for (auto k = 0; k < chunk_count; ++k) {
  115. chunks[k].reset();
  116. }
  117. id = frame_id;
  118. ready_chunks = 0;
  119. status = WAITING;
  120. }
  121. };
  122. static constexpr auto frag_header_size = 28;
  123. static constexpr auto request_size = 9;
  124. static constexpr auto max_package_size = 64 * 1024; // 64KiB
  125. static constexpr auto udp_buffer_size = 10 * 1024 * 1024; // 10MiB
  126. receiver_udp_fec *q_this = nullptr;
  127. // parent config
  128. receiver_base::create_config par_conf;
  129. std::unique_ptr<udp::socket> socket;
  130. frame_info frame_cache;
  131. uint32_t last_frame_id = 0;
  132. udp::endpoint server_ep;
  133. data_type in_buf, out_buf;
  134. static uint8_t *read_frag_header(uint8_t *ptr, frag_header *header) {
  135. #define READ(member) ptr = read_binary_number(ptr, &header->member)
  136. READ(frag_checksum);
  137. READ(frame_type);
  138. READ(frame_id);
  139. READ(frame_length);
  140. READ(chunk_count);
  141. READ(chunk_id);
  142. READ(chunk_offset);
  143. READ(chunk_length);
  144. READ(block_size);
  145. READ(block_count);
  146. READ(chunk_decode_block_count);
  147. READ(block_id);
  148. #undef WRITE
  149. return ptr;
  150. }
  151. static uint8_t *write_request(uint8_t *ptr, const request_type &req) {
  152. #define WRITE(member) ptr = write_binary_number(ptr, req.member)
  153. WRITE(request_checksum);
  154. WRITE(request_type);
  155. WRITE(frame_id);
  156. #undef WRITE
  157. return ptr;
  158. }
  159. ~impl() {
  160. request_exit();
  161. }
  162. void refresh_frame(const frag_header &header) {
  163. frame_cache.create(header.frame_id, header.chunk_count, header.frame_length);
  164. }
  165. void send_request(const request_type &req) {
  166. out_buf.reserve(request_size);
  167. write_request(out_buf.ptr, req);
  168. // calculate crc32
  169. auto crc = boost::crc_32_type{};
  170. crc.process_bytes(out_buf.ptr + sizeof(uint32_t),
  171. request_size - sizeof(uint32_t));
  172. write_binary_number(out_buf.ptr, crc.checksum());
  173. // send packet
  174. assert(socket != nullptr);
  175. auto buf = buffer(out_buf.ptr, request_size);
  176. socket->send_to(buf, server_ep);
  177. }
  178. void request_idr_frame(uint32_t frame_id) {
  179. request_type req;
  180. req.request_type = 'I';
  181. req.frame_id = frame_id;
  182. send_request(req);
  183. SPDLOG_WARN("Receive frame {} error, request new IDR frame.", frame_id);
  184. }
  185. void request_frame_confirm(uint32_t frame_id) {
  186. request_type req;
  187. req.request_type = 'C';
  188. req.frame_id = frame_id;
  189. send_request(req);
  190. }
  191. void request_exit() {
  192. request_type req;
  193. req.request_type = 'X';
  194. send_request(req);
  195. }
  196. void async_handle_package() {
  197. in_buf.reserve(max_package_size);
  198. auto buf = buffer(in_buf.ptr, max_package_size);
  199. using namespace std::placeholders;
  200. socket->async_receive(buf, std::bind(&impl::handle_package, this, _1, _2));
  201. }
  202. void handle_package(const error_code &ec, size_t length) {
  203. // prepare for next request when this function exited.
  204. auto closer = sg::make_scope_guard([this] {
  205. async_handle_package();
  206. });
  207. // handle errors
  208. if (ec) {
  209. SPDLOG_ERROR("Error while receiving request: {}", ec.what());
  210. return;
  211. }
  212. // parse package
  213. frag_header header;
  214. read_frag_header(in_buf.ptr, &header);
  215. auto crc = boost::crc_32_type{};
  216. crc.process_bytes(in_buf.ptr + sizeof(uint32_t),
  217. length - sizeof(uint32_t));
  218. if (crc.checksum() != header.frag_checksum) { // checksum failed
  219. // TODO show log
  220. return;
  221. }
  222. assert(length == frag_header_size + header.block_size);
  223. if (header.frame_id < frame_cache.id) return; // old package
  224. bool is_idr_frame = header.frame_type == 'I';
  225. if (frame_cache.status == READY) { // last frame has already been decoded
  226. if (header.frame_id == frame_cache.id) return; // redundant package
  227. if (is_idr_frame || // new IDR frame or correct next P frame
  228. header.frame_id == last_frame_id + 1) {
  229. refresh_frame(header);
  230. } else {
  231. request_idr_frame(header.frame_id);
  232. return;
  233. }
  234. } else {
  235. if (header.frame_id > frame_cache.id) {
  236. if (is_idr_frame) {
  237. refresh_frame(header);
  238. } else {
  239. request_idr_frame(header.frame_id);
  240. return;
  241. }
  242. }
  243. }
  244. assert(frame_cache.id == header.frame_id);
  245. assert(frame_cache.status == WAITING);
  246. auto &chunk = frame_cache.chunks[header.chunk_id];
  247. if (chunk.status == NOT_INIT) {
  248. auto parity_blocks = header.block_count - header.chunk_decode_block_count;
  249. chunk.create(header.block_count, parity_blocks, header.block_size);
  250. } else if (chunk.status == READY) {
  251. return;
  252. }
  253. assert(chunk.status == WAITING);
  254. auto data_ptr = in_buf.ptr + frag_header_size;
  255. memcpy(chunk.block_ptrs[header.block_id], data_ptr, header.block_size);
  256. chunk.block_miss[header.block_id] = false;
  257. ++chunk.ready_blocks;
  258. if (!chunk.reconstruct()) [[likely]] return; // need more blocks
  259. assert(chunk.status == READY);
  260. assert(chunk.block_data.size >= header.chunk_length);
  261. memcpy(frame_cache.data.ptr + header.chunk_offset, chunk.block_data.ptr, header.chunk_length);
  262. ++frame_cache.ready_chunks;
  263. if (frame_cache.ready_chunks < frame_cache.chunks.size()) return; // need more chunks
  264. // decode frame
  265. frame_cache.status = READY;
  266. auto frame = ::frame_info(); // not frame_info in this impl
  267. frame.data = frame_cache.data;
  268. frame.idr = header.frame_type == 'I';
  269. frame.frame_id = header.frame_id;
  270. q_this->commit_frame(frame);
  271. SPDLOG_TRACE("Frame {} decoded.", frame_cache.id);
  272. last_frame_id = frame_cache.id;
  273. request_frame_confirm(frame_cache.id);
  274. }
  275. static impl *create(const create_config &conf) {
  276. auto ret = new impl();
  277. ret->par_conf = {.cb_func = conf.cb_func, .enable_log = conf.enable_log};
  278. ret->server_ep = udp::endpoint{address::from_string(conf.server_addr), conf.server_port};
  279. ret->socket = std::make_unique<udp::socket>(*conf.ctx);
  280. ret->socket->connect(ret->server_ep);
  281. ret->socket->set_option(udp::socket::receive_buffer_size{udp_buffer_size});
  282. ret->async_handle_package();
  283. ret->request_idr_frame(0); // TODO: post to event queue
  284. // initialize reed solomon
  285. fec_init();
  286. return ret;
  287. }
  288. };
  // Defaulted here, after impl is fully defined, so the pimpl member can be
  // destroyed with a complete type (presumably impl is only forward-declared
  // in the header - confirm).
  receiver_udp_fec::~receiver_udp_fec() = default;
  290. receiver_udp_fec::pointer
  291. receiver_udp_fec::create(const create_config &conf) {
  292. auto impl = impl::create(conf);
  293. return std::unique_ptr<receiver_udp_fec>(
  294. new receiver_udp_fec(impl));
  295. }
  296. receiver_udp_fec::receiver_udp_fec(impl *_pimpl)
  297. : receiver_base(_pimpl->par_conf),
  298. pimpl(std::unique_ptr<impl>(_pimpl)) {
  299. pimpl->q_this = this;
  300. }