#include "frag_basic.h"
#include "core/utility.hpp"

#include <algorithm>
#include <cassert>
#include <cstdint>

// Splits `data` into blocks and sends each one through the lower layer `low`,
// every block prefixed with a fragment header.
//
// Each call consumes a new message id (++last_send_id). The header carries the
// message id, the total message size, the number of blocks, and per block its
// index (bl_id) and byte offset (bl_offset) within the message.
//
// Returns true when every block was accepted by `low->send`; returns false as
// soon as one block is rejected (remaining blocks are not sent) or when the
// lower layer leaves no room for payload after the fragment header.
bool frag_basic::send(const data_type &data) {
    auto header_extra_size = frag_header_type::extra_size;

    // Without this guard a zero capacity would divide by zero below and the
    // loop would never advance. No message id is consumed in that case.
    if (low->max_length() <= header_extra_size) return false;

    auto frag_header_size = low->header_size() + header_extra_size;
    auto frag_max_size = low->max_length() - header_extra_size;

    frag_header_type header;
    header.msg_id = ++last_send_id;
    header.msg_size = data.size;
    // ceil(data.size / frag_max_size)
    header.bl_num = data.size / frag_max_size + ((data.size % frag_max_size) != 0);
    header.bl_id = 0;

    for (uint32_t offset = 0; offset < data.size; offset += frag_max_size) {
        auto frag_size = std::min(frag_max_size, data.size - offset);
        // NOTE(review): clone(frag_header_size) presumably reserves head room
        // for the headers that get prepended - confirm against data_type.
        auto frag_data = data.sub_data(offset, frag_size).clone(frag_header_size);
        header.bl_offset = offset;

        // Send the fragment, not the whole message: the original passed `data`
        // here and left `frag_data` unused.
        bool ok = low->send(encode_header(frag_data, header));
        if (!ok) return false;
        ++header.bl_id;
    }
    return true;
}

// Handles one received block: decodes its fragment header, files the payload
// under the matching entry of `msg_q` (kept sorted by ascending msg_id), and
// hands every completed message at the front of the queue to `recv_cb_func`.
//
// Blocks are silently dropped when the header cannot be decoded (empty decode
// result), when the message id is not newer than `last_recv_id`, when it lags
// the newest queued message by `q_size` or more, or when the block was already
// received. Inconsistent headers are reported through RET_ERROR_N.
void frag_basic::append_data(const data_type &data) {
    frag_header_type header;
    auto dec_data = decode_header(data, &header);
    if (dec_data.empty()) return;

    // ignore if message is too old
    auto msg_id = header.msg_id;
    if (msg_id <= last_recv_id) return;
    if (!msg_q.empty()) {
        auto last_id = msg_q.back().msg_id;
        // The id difference is reinterpreted as signed 32-bit before comparing.
        if (static_cast<int32_t>(last_id - msg_id) >= q_size) return;
    }

    // remove old messages that fell out of the window relative to msg_id
    while (!msg_q.empty()) {
        auto fnt = msg_q.begin(); // front
        // The queue is ordered, so once the front is inside the window there is
        // nothing more to evict. The original loop had no exit here and spun
        // forever on a non-empty queue.
        if (static_cast<int32_t>(msg_id - fnt->msg_id) < q_size) break;

        if (fnt->is_complete()) {
            recv_cb_func(fnt->data);
            assert(fnt->msg_id > last_recv_id);
            last_recv_id = fnt->msg_id;
        } else {
            sig_msg_loss(); // emit message loss signal
        }
        msg_q.pop_front();
    }

    // determine message location
    auto iter = queue_type::iterator();
    if (msg_q.empty() || msg_id < msg_q.front().msg_id) [[unlikely]] {
        msg_q.emplace_front(header);
        iter = msg_q.begin();
    } else {
        iter = msg_q.begin();
        for (;;) {
            if (iter->msg_id == msg_id) break; // entry already exists
            assert(msg_id > iter->msg_id);
            // Step to the successor first; emplace() inserts *before* the given
            // position, which is exactly where a new entry belongs when the
            // successor is end() or has a larger id. The original used `iter++`
            // and then rewound, so it never inserted and never terminated.
            ++iter;
            if (iter == msg_q.end() || msg_id < iter->msg_id) {
                iter = msg_q.emplace(iter, header);
                break;
            }
        }
    }

    // check consistency
    if (!iter->verify(header)) {
        RET_ERROR_N;
    }

    // The block index and offset come from the received header; reject values
    // that do not fit the entry before they are used as indices.
    auto bl_id = header.bl_id;
    if (bl_id >= iter->bl_ok.size()) {
        RET_ERROR_N;
    }
    if (header.bl_offset > iter->data.size ||
        dec_data.size > iter->data.size - header.bl_offset) {
        RET_ERROR_N;
    }

    // ignore duplicated block
    if (iter->bl_ok.test(bl_id)) return;

    // copy block data
    iter->merge(header, dec_data);

    // callback if message completes; deliver strictly from the front so that
    // messages reach recv_cb_func in ascending msg_id order
    while (!msg_q.empty()) {
        auto fnt = msg_q.begin();
        if (!fnt->is_complete()) break;

        recv_cb_func(fnt->data);
        assert(fnt->msg_id > last_recv_id);
        last_recv_id = fnt->msg_id;
        msg_q.pop_front();
    }
}

// Checks that a received header agrees with this entry: the block count must
// equal bl_ok.size() and the message size must equal data.size.
// Returns true when both match; otherwise leaves through RET_ERROR_B.
// The caller must pass a header with the same msg_id (asserted).
bool frag_basic::msg_info::verify(frag_header_type header) const {
    assert(msg_id == header.msg_id);
    if (bl_ok.size() != header.bl_num) {
        RET_ERROR_B;
    }
    if (data.size != header.msg_size) {
        RET_ERROR_B;
    }
    return true;
}

// Returns true once every block flag in bl_ok is set. When complete, the
// number of merged bytes is expected to equal the message size (asserted).
bool frag_basic::msg_info::is_complete() const {
    auto ok = bl_ok.all();
    if (ok) {
        assert(bytes_ok == data.size);
    }
    return ok;
}

// Copies the payload of one block into the message buffer at header.bl_offset,
// marks block header.bl_id as received and adds the payload size to bytes_ok.
// The block must fit inside the buffer (asserted); calling this twice for the
// same block would count its bytes twice, so callers check bl_ok first.
void frag_basic::msg_info::merge(frag_header_type header, const data_type &sub_data) {
    auto offset = header.bl_offset;
    assert(offset + sub_data.size <= data.size);
    std::copy_n(sub_data.start_ptr(), sub_data.size, data.start_ptr() + offset);
    bl_ok.set(header.bl_id);
    bytes_ok += sub_data.size;
}