|
@@ -1,4 +1,5 @@
|
|
|
-#include "frame_sender.h"
|
|
|
|
|
|
|
+#include "sender_udp_fec.h"
|
|
|
|
|
+#include "sender_utility.hpp"
|
|
|
#include "core/timestamp_helper.hpp"
|
|
#include "core/timestamp_helper.hpp"
|
|
|
#include "simple_mq.h"
|
|
#include "simple_mq.h"
|
|
|
#include "third_party/scope_guard.hpp"
|
|
#include "third_party/scope_guard.hpp"
|
|
@@ -13,47 +14,19 @@ extern "C" {
|
|
|
#include <boost/asio/ip/udp.hpp>
|
|
#include <boost/asio/ip/udp.hpp>
|
|
|
#include <boost/asio/post.hpp>
|
|
#include <boost/asio/post.hpp>
|
|
|
#include <boost/crc.hpp>
|
|
#include <boost/crc.hpp>
|
|
|
-#include <boost/endian.hpp>
|
|
|
|
|
|
|
|
|
|
#include <spdlog/spdlog.h>
|
|
#include <spdlog/spdlog.h>
|
|
|
|
|
|
|
|
-#include <deque>
|
|
|
|
|
-#include <mutex>
|
|
|
|
|
-
|
|
|
|
|
using namespace boost::asio::ip;
|
|
using namespace boost::asio::ip;
|
|
|
using boost::asio::buffer;
|
|
using boost::asio::buffer;
|
|
|
using boost::asio::io_context;
|
|
using boost::asio::io_context;
|
|
|
using boost::asio::post;
|
|
using boost::asio::post;
|
|
|
using boost::system::error_code;
|
|
using boost::system::error_code;
|
|
|
|
|
+using namespace sender_impl;
|
|
|
using namespace sophiar;
|
|
using namespace sophiar;
|
|
|
using namespace simple_mq_singleton;
|
|
using namespace simple_mq_singleton;
|
|
|
|
|
|
|
|
-namespace frame_sender_impl {
|
|
|
|
|
-
|
|
|
|
|
- template<typename T>
|
|
|
|
|
- struct smart_buffer {
|
|
|
|
|
- T *ptr = nullptr;
|
|
|
|
|
- size_t length = 0;
|
|
|
|
|
-
|
|
|
|
|
- ~smart_buffer() {
|
|
|
|
|
- free(ptr);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- void create(size_t req_length) {
|
|
|
|
|
- if (req_length > capacity) [[unlikely]] {
|
|
|
|
|
- auto ptr_next = new T[req_length];
|
|
|
|
|
- if (ptr != nullptr) {
|
|
|
|
|
- delete ptr;
|
|
|
|
|
- }
|
|
|
|
|
- ptr = ptr_next;
|
|
|
|
|
- capacity = req_length;
|
|
|
|
|
- }
|
|
|
|
|
- length = req_length;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- private:
|
|
|
|
|
- size_t capacity = 0;
|
|
|
|
|
- };
|
|
|
|
|
|
|
+namespace sender_udp_fec_impl {
|
|
|
|
|
|
|
|
struct smart_reed_solomon {
|
|
struct smart_reed_solomon {
|
|
|
reed_solomon *rs = nullptr;
|
|
reed_solomon *rs = nullptr;
|
|
@@ -104,9 +77,9 @@ namespace frame_sender_impl {
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-using namespace frame_sender_impl;
|
|
|
|
|
|
|
+using namespace sender_udp_fec_impl;
|
|
|
|
|
|
|
|
-struct frame_sender::impl {
|
|
|
|
|
|
|
+struct sender_udp_fec::impl {
|
|
|
|
|
|
|
|
struct frag_header {
|
|
struct frag_header {
|
|
|
uint32_t frag_checksum;
|
|
uint32_t frag_checksum;
|
|
@@ -137,14 +110,10 @@ struct frame_sender::impl {
|
|
|
static constexpr int confirm_timeout = 10 * 1e6; // 10s
|
|
static constexpr int confirm_timeout = 10 * 1e6; // 10s
|
|
|
|
|
|
|
|
using frame_ptr_type = std::unique_ptr<video_nal>;
|
|
using frame_ptr_type = std::unique_ptr<video_nal>;
|
|
|
- using frame_list_type = std::deque<frame_ptr_type>;
|
|
|
|
|
|
|
|
|
|
- std::unique_ptr<io_context> ctx;
|
|
|
|
|
|
|
+ sender_udp_fec *q_this = nullptr;
|
|
|
std::unique_ptr<udp::socket> socket;
|
|
std::unique_ptr<udp::socket> socket;
|
|
|
|
|
|
|
|
- frame_list_type frame_list;
|
|
|
|
|
- std::mutex frame_list_mu;
|
|
|
|
|
-
|
|
|
|
|
udp::endpoint request_ep;
|
|
udp::endpoint request_ep;
|
|
|
std::unique_ptr<udp::endpoint> remote_ep;
|
|
std::unique_ptr<udp::endpoint> remote_ep;
|
|
|
float parity_rate;
|
|
float parity_rate;
|
|
@@ -156,30 +125,6 @@ struct frame_sender::impl {
|
|
|
smart_buffer<uint8_t> in_buf, out_buf;
|
|
smart_buffer<uint8_t> in_buf, out_buf;
|
|
|
timestamp_type last_confirm_ts = 0;
|
|
timestamp_type last_confirm_ts = 0;
|
|
|
|
|
|
|
|
- template<typename T>
|
|
|
|
|
- static uint8_t *write_binary_number(uint8_t *ptr, T val) {
|
|
|
|
|
- static constexpr auto need_swap =
|
|
|
|
|
- (boost::endian::order::native != boost::endian::order::big);
|
|
|
|
|
- auto real_ptr = (T *) ptr;
|
|
|
|
|
- if constexpr (need_swap) {
|
|
|
|
|
- *real_ptr = boost::endian::endian_reverse(val);
|
|
|
|
|
- } else {
|
|
|
|
|
- *real_ptr = val;
|
|
|
|
|
- }
|
|
|
|
|
- return ptr + sizeof(T);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- template<typename T>
|
|
|
|
|
- static uint8_t *read_binary_number(uint8_t *ptr, T *val) {
|
|
|
|
|
- static constexpr auto need_swap =
|
|
|
|
|
- (boost::endian::order::native != boost::endian::order::big);
|
|
|
|
|
- *val = *(T *) ptr;
|
|
|
|
|
- if constexpr (need_swap) {
|
|
|
|
|
- boost::endian::endian_reverse_inplace(*val);
|
|
|
|
|
- }
|
|
|
|
|
- return ptr + sizeof(T);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
|
|
static uint8_t *write_frag_header(uint8_t *ptr, const frag_header &header) {
|
|
|
#define WRITE(member) ptr = write_binary_number(ptr, header.member)
|
|
#define WRITE(member) ptr = write_binary_number(ptr, header.member)
|
|
|
WRITE(frag_checksum);
|
|
WRITE(frag_checksum);
|
|
@@ -240,7 +185,9 @@ struct frame_sender::impl {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- void send_frame(const frame_ptr_type &frame) {
|
|
|
|
|
|
|
+ void send_frame(frame_ptr_type &&frame) {
|
|
|
|
|
+ if (remote_ep == nullptr) return;
|
|
|
|
|
+
|
|
|
frag_header header;
|
|
frag_header header;
|
|
|
header.frame_type = frame->idr ? 'I' : 'P';
|
|
header.frame_type = frame->idr ? 'I' : 'P';
|
|
|
header.frame_id = frame_id;
|
|
header.frame_id = frame_id;
|
|
@@ -267,11 +214,6 @@ struct frame_sender::impl {
|
|
|
socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
|
|
socket->async_receive_from(buf, request_ep, std::bind(&impl::handle_request, this, _1, _2));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- void clear_frame_list() {
|
|
|
|
|
- auto lock = std::lock_guard{frame_list_mu};
|
|
|
|
|
- frame_list.clear();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
void handle_request(const error_code &ec, size_t length) {
|
|
void handle_request(const error_code &ec, size_t length) {
|
|
|
// prepare for next request when this function exited.
|
|
// prepare for next request when this function exited.
|
|
|
auto closer = sg::make_scope_guard([this] {
|
|
auto closer = sg::make_scope_guard([this] {
|
|
@@ -312,8 +254,7 @@ struct frame_sender::impl {
|
|
|
case 'I': {
|
|
case 'I': {
|
|
|
remote_ep = std::make_unique<udp::endpoint>(request_ep);
|
|
remote_ep = std::make_unique<udp::endpoint>(request_ep);
|
|
|
last_confirm_ts = current_timestamp();
|
|
last_confirm_ts = current_timestamp();
|
|
|
- clear_frame_list();
|
|
|
|
|
- mq().update_variable(REQUEST_IDR, true);
|
|
|
|
|
|
|
+ q_this->request_idr_frame();
|
|
|
mq().update_variable(SENDER_CONNECTED, true);
|
|
mq().update_variable(SENDER_CONNECTED, true);
|
|
|
|
|
|
|
|
static uint32_t last_frame_id = 0;
|
|
static uint32_t last_frame_id = 0;
|
|
@@ -332,7 +273,6 @@ struct frame_sender::impl {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void close_connection() {
|
|
void close_connection() {
|
|
|
- clear_frame_list();
|
|
|
|
|
remote_ep.reset();
|
|
remote_ep.reset();
|
|
|
mq().update_variable(SENDER_CONNECTED, false);
|
|
mq().update_variable(SENDER_CONNECTED, false);
|
|
|
}
|
|
}
|
|
@@ -347,49 +287,11 @@ struct frame_sender::impl {
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // return true to continue
|
|
|
|
|
- bool handle_one_frame() {
|
|
|
|
|
- frame_ptr_type frame;
|
|
|
|
|
- {
|
|
|
|
|
- auto lock = std::lock_guard{frame_list_mu};
|
|
|
|
|
- if (frame_list.empty()) return false;
|
|
|
|
|
- frame = std::move(*frame_list.begin());
|
|
|
|
|
- frame_list.pop_front();
|
|
|
|
|
- }
|
|
|
|
|
- if (frame == nullptr) return false;
|
|
|
|
|
- send_frame(frame);
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- void handle_frames() {
|
|
|
|
|
- // test stop flag
|
|
|
|
|
- if (mq().query_variable<bool>(SENDER_SHOULD_STOP)) {
|
|
|
|
|
- close_connection();
|
|
|
|
|
- ctx->stop();
|
|
|
|
|
- SPDLOG_INFO("Frame sender stopped.");
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if (!is_connected()) return;
|
|
|
|
|
- while (handle_one_frame());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- void push_frame(frame_ptr_type &&frame) {
|
|
|
|
|
- {
|
|
|
|
|
- auto lock = std::lock_guard{frame_list_mu};
|
|
|
|
|
- if (frame != nullptr && frame->idr) { // maybe nullptr
|
|
|
|
|
- frame_list.clear();
|
|
|
|
|
- }
|
|
|
|
|
- frame_list.emplace_back(std::move(frame));
|
|
|
|
|
- }
|
|
|
|
|
- post(*ctx, std::bind(&impl::handle_frames, this));
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- static impl *create(const sender_config &conf) {
|
|
|
|
|
|
|
+ static impl *create(const config &conf, sender_udp_fec *q_this) {
|
|
|
auto ret = new impl;
|
|
auto ret = new impl;
|
|
|
- ret->ctx = std::make_unique<io_context>();
|
|
|
|
|
auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
|
|
auto local_ep = udp::endpoint{udp::v4(), conf.listen_port};
|
|
|
- ret->socket = std::make_unique<udp::socket>(*(ret->ctx), local_ep);
|
|
|
|
|
|
|
+ ret->q_this = q_this;
|
|
|
|
|
+ ret->socket = std::make_unique<udp::socket>(*q_this->get_ctx(), local_ep);
|
|
|
ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
|
|
ret->socket->set_option(udp::socket::send_buffer_size{udp_buffer_size});
|
|
|
ret->async_handle_request();
|
|
ret->async_handle_request();
|
|
|
|
|
|
|
@@ -406,21 +308,20 @@ struct frame_sender::impl {
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-frame_sender::~frame_sender() = default;
|
|
|
|
|
|
|
+sender_udp_fec::~sender_udp_fec() = default;
|
|
|
|
|
|
|
|
-void frame_sender::send_frame(std::unique_ptr<video_nal> &&frame) {
|
|
|
|
|
- pimpl->push_frame(std::move(frame));
|
|
|
|
|
|
|
+sender_udp_fec *sender_udp_fec::create(const config &conf) {
|
|
|
|
|
+ auto ret = std::make_unique<sender_udp_fec>();
|
|
|
|
|
+ auto pimpl = impl::create(conf, ret.get());
|
|
|
|
|
+ if (pimpl == nullptr) return nullptr;
|
|
|
|
|
+ ret->pimpl.reset(pimpl);
|
|
|
|
|
+ return ret.release();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-void frame_sender::run() {
|
|
|
|
|
- SPDLOG_INFO("Frame sender started.");
|
|
|
|
|
- pimpl->ctx->run();
|
|
|
|
|
|
|
+void sender_udp_fec::close_connection() {
|
|
|
|
|
+ pimpl->close_connection();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-frame_sender *frame_sender::create(const sender_config &conf) {
|
|
|
|
|
- auto pimpl = impl::create(conf);
|
|
|
|
|
- if (pimpl == nullptr) return nullptr;
|
|
|
|
|
- auto ret = new frame_sender;
|
|
|
|
|
- ret->pimpl.reset(pimpl);
|
|
|
|
|
- return ret;
|
|
|
|
|
|
|
+void sender_udp_fec::handle_frame(frame_ptr_type &&frame) {
|
|
|
|
|
+ pimpl->send_frame(std::move(frame));
|
|
|
}
|
|
}
|