frag_basic.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #include "frag_basic.h"
  2. #include "core/utility.hpp"
  3. bool frag_basic::send(const data_type &data) {
  4. auto header_extra_size = frag_header_type::extra_size;
  5. auto frag_header_size = low->header_size() + header_extra_size;
  6. auto frag_max_size = low->max_length() - header_extra_size;
  7. frag_header_type header;
  8. header.msg_id = ++last_send_id;
  9. header.msg_size = data.size;
  10. header.bl_num = data.size / frag_max_size + ((data.size % frag_max_size) != 0);
  11. header.bl_id = 0;
  12. for (uint32_t offset = 0; offset < data.size; offset += frag_max_size) {
  13. auto frag_size = std::min(frag_max_size, data.size - offset);
  14. auto frag_data = data.sub_data(offset, frag_size).clone(frag_header_size);
  15. header.bl_offset = offset;
  16. bool ok = low->send(encode_header(data, header));
  17. if (!ok) return false;
  18. ++header.bl_id;
  19. }
  20. return true;
  21. }
  22. void frag_basic::append_data(const data_type &data) {
  23. frag_header_type header;
  24. auto dec_data = decode_header(data, &header);
  25. if (dec_data.empty()) return;
  26. // ignore if message is too old
  27. auto msg_id = header.msg_id;
  28. if (msg_id <= last_recv_id) return;
  29. if (!msg_q.empty()) {
  30. auto last_id = msg_q.back().msg_id;
  31. if ((int32_t) (last_id - msg_id) >= q_size) return;
  32. }
  33. // remove old incomplete messages
  34. while (!msg_q.empty()) {
  35. auto fnt = msg_q.begin(); // front
  36. if ((int32_t) (msg_id - fnt->msg_id) >= q_size) {
  37. if (fnt->is_complete()) {
  38. recv_cb_func(fnt->data);
  39. assert(fnt->msg_id > last_recv_id);
  40. last_recv_id = fnt->msg_id;
  41. } else {
  42. sig_msg_loss(); // emit message loss signal
  43. }
  44. msg_q.pop_front();
  45. }
  46. }
  47. // determine message location
  48. auto iter = queue_type::iterator();
  49. if (msg_q.empty() || msg_id < msg_q.front().msg_id) [[unlikely]] {
  50. msg_q.emplace_front(header);
  51. iter = msg_q.begin();
  52. } else {
  53. iter = msg_q.begin();
  54. for (;;) {
  55. if (iter->msg_id == msg_id) break;
  56. assert(msg_id > iter->msg_id);
  57. auto iter_next = iter++;
  58. if (iter_next == msg_q.end() || msg_id < iter_next->msg_id) {
  59. iter = msg_q.emplace(iter, header);
  60. break;
  61. }
  62. iter = iter_next;
  63. }
  64. }
  65. // check consistency
  66. if (!iter->verify(header)) { RET_ERROR_N; }
  67. // ignore duplicated block
  68. auto bl_id = header.bl_id;
  69. if (iter->bl_ok.test(bl_id)) return;
  70. // copy block data
  71. iter->merge(header, dec_data);
  72. // callback if message completes
  73. while (!msg_q.empty()) {
  74. if (auto fnt = msg_q.begin(); fnt->is_complete()) {
  75. recv_cb_func(fnt->data);
  76. assert(fnt->msg_id > last_recv_id);
  77. last_recv_id = fnt->msg_id;
  78. msg_q.pop_front();
  79. } else break;
  80. }
  81. }
  82. bool frag_basic::msg_info::verify(frag_header_type header) const {
  83. assert(msg_id == header.msg_id);
  84. if (bl_ok.size() != header.bl_num) { RET_ERROR_B; }
  85. if (data.size != header.msg_size) { RET_ERROR_B; }
  86. return true;
  87. }
  88. bool frag_basic::msg_info::is_complete() const {
  89. auto ok = bl_ok.all();
  90. if (ok) { assert(bytes_ok == data.size); }
  91. return ok;
  92. }
  93. void frag_basic::msg_info::merge(frag_header_type header, const data_type &sub_data) {
  94. auto offset = header.bl_offset;
  95. assert(offset + sub_data.size <= data.size);
  96. std::copy_n(sub_data.start_ptr(), sub_data.size,
  97. data.start_ptr() + offset);
  98. bl_ok.set(header.bl_id);
  99. bytes_ok += sub_data.size;
  100. }