| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- #include "frag_basic.h"
- #include "core/utility.hpp"
- bool frag_basic::send(const data_type &data) {
- auto header_extra_size = frag_header_type::extra_size;
- auto frag_header_size = low->header_size() + header_extra_size;
- auto frag_max_size = low->max_length() - header_extra_size;
- frag_header_type header;
- header.msg_id = ++last_send_id;
- header.msg_size = data.size;
- header.bl_num = data.size / frag_max_size + ((data.size % frag_max_size) != 0);
- header.bl_id = 0;
- for (uint32_t offset = 0; offset < data.size; offset += frag_max_size) {
- auto frag_size = std::min(frag_max_size, data.size - offset);
- auto frag_data = data.sub_data(offset, frag_size).clone(frag_header_size);
- header.bl_offset = offset;
- bool ok = low->send(encode_header(data, header));
- if (!ok) return false;
- ++header.bl_id;
- }
- return true;
- }
- void frag_basic::append_data(const data_type &data) {
- frag_header_type header;
- auto dec_data = decode_header(data, &header);
- if (dec_data.empty()) return;
- // ignore if message is too old
- auto msg_id = header.msg_id;
- if (msg_id <= last_recv_id) return;
- if (!msg_q.empty()) {
- auto last_id = msg_q.back().msg_id;
- if ((int32_t) (last_id - msg_id) >= q_size) return;
- }
- // remove old incomplete messages
- while (!msg_q.empty()) {
- auto fnt = msg_q.begin(); // front
- if ((int32_t) (msg_id - fnt->msg_id) >= q_size) {
- if (fnt->is_complete()) {
- recv_cb_func(fnt->data);
- assert(fnt->msg_id > last_recv_id);
- last_recv_id = fnt->msg_id;
- } else {
- sig_msg_loss(); // emit message loss signal
- }
- msg_q.pop_front();
- }
- }
- // determine message location
- auto iter = queue_type::iterator();
- if (msg_q.empty() || msg_id < msg_q.front().msg_id) [[unlikely]] {
- msg_q.emplace_front(header);
- iter = msg_q.begin();
- } else {
- iter = msg_q.begin();
- for (;;) {
- if (iter->msg_id == msg_id) break;
- assert(msg_id > iter->msg_id);
- auto iter_next = iter++;
- if (iter_next == msg_q.end() || msg_id < iter_next->msg_id) {
- iter = msg_q.emplace(iter, header);
- break;
- }
- iter = iter_next;
- }
- }
- // check consistency
- if (!iter->verify(header)) { RET_ERROR_N; }
- // ignore duplicated block
- auto bl_id = header.bl_id;
- if (iter->bl_ok.test(bl_id)) return;
- // copy block data
- iter->merge(header, dec_data);
- // callback if message completes
- while (!msg_q.empty()) {
- if (auto fnt = msg_q.begin(); fnt->is_complete()) {
- recv_cb_func(fnt->data);
- assert(fnt->msg_id > last_recv_id);
- last_recv_id = fnt->msg_id;
- msg_q.pop_front();
- } else break;
- }
- }
- bool frag_basic::msg_info::verify(frag_header_type header) const {
- assert(msg_id == header.msg_id);
- if (bl_ok.size() != header.bl_num) { RET_ERROR_B; }
- if (data.size != header.msg_size) { RET_ERROR_B; }
- return true;
- }
- bool frag_basic::msg_info::is_complete() const {
- auto ok = bl_ok.all();
- if (ok) { assert(bytes_ok == data.size); }
- return ok;
- }
- void frag_basic::msg_info::merge(frag_header_type header, const data_type &sub_data) {
- auto offset = header.bl_offset;
- assert(offset + sub_data.size <= data.size);
- std::copy_n(sub_data.start_ptr(), sub_data.size,
- data.start_ptr() + offset);
- bl_ok.set(header.bl_id);
- bytes_ok += sub_data.size;
- }
|