#include "sender_tcp.h"
#include "sender_utility.hpp"
#include "simple_mq.h"
#include "variable_defs.h"

// NOTE(review): this include list is derived from the symbols used in this
// file; verify it against the project's build configuration.
#include <boost/asio/buffer.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/write.hpp>
#include <boost/system/error_code.hpp>
#include <spdlog/spdlog.h>

#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>

using namespace boost::asio::ip;
using boost::asio::buffer;
using boost::asio::io_context;
using boost::asio::post;
using boost::asio::write;
using boost::system::error_code;

using namespace sender_impl;
using namespace simple_mq_singleton;

/// Private implementation of sender_tcp: owns the listening acceptor and the
/// single client socket, and serializes frames onto that socket.
struct sender_tcp::impl {
    sender_tcp *q_this = nullptr;             ///< Back-pointer to the owning sender (not owned).
    std::unique_ptr<tcp::acceptor> acceptor;  ///< Listens for the client connection.
    std::unique_ptr<tcp::socket> socket;      ///< Peer socket; closed while no client is connected.
    smart_buffer out_buf;                     ///< Scratch buffer used to assemble outgoing packets.

    /// Drops the current client (if any) and publishes SENDER_CONNECTED = false.
    ///
    /// Does NOT start waiting for a new client; callers that want to keep
    /// serving must call async_waiting_client() afterwards.
    /// NOTE(review): the public sender_tcp::close_connection() never re-arms
    /// the accept — confirm that is intended.
    void close_connection() {
        if (socket == nullptr) return;
        // TODO: log client exit
        // if (socket->is_open()) {
        //     auto remote_ep = socket->remote_endpoint();
        //     SPDLOG_INFO("Client {}:{} left.",
        //                 remote_ep.address().to_string(), remote_ep.port());
        // }

        // Only an open socket needs replacing. A socket that is not open may
        // be the peer of a pending async_accept, which keeps a reference to
        // it; destroying it here would leave that operation dangling.
        if (socket->is_open()) {
            socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
        }
        mq().update_variable(SENDER_CONNECTED, false);
    }

    /// Sends one frame to the connected client (blocking write).
    ///
    /// Packet layout, in order: packet_length (uint64_t), frame_id, payload,
    /// where packet_length = payload length + sizeof(frame_id). Integer
    /// encoding is whatever write_binary_number produces.
    ///
    /// The frame is silently dropped when no client is connected. On a write
    /// error the connection is closed, a new accept is started, and the frame
    /// is not reported as sent.
    void send_frame(frame_ptr_type &&frame) {
        if (socket == nullptr || !socket->is_open()) return;

        uint64_t packet_length = frame->length + sizeof(frame->frame_id);
        auto out_length = packet_length + sizeof(packet_length);
        out_buf.create(out_length);

        // fill out buffer
        auto ptr = out_buf.ptr;
        ptr = write_binary_number(ptr, packet_length);
        ptr = write_binary_number(ptr, frame->frame_id);
        memcpy(ptr, frame->ptr, frame->length);

        error_code err;
        write(*socket, buffer(out_buf.ptr, out_buf.length), err); // TODO: change to async operation
        if (err) {
            SPDLOG_WARN("Error while sending frame: {}", err.to_string());
            close_connection();
            async_waiting_client();
            return; // the frame did not go out; do not account for it below
        }

        q_this->log_frame_sent(frame->frame_id);
        SPDLOG_TRACE("Frame {} sent, length = {}", frame->frame_id, frame->length);
    }

    /// Starts an asynchronous accept into `socket`.
    ///
    /// On success: requests an IDR frame from the owner, publishes
    /// SENDER_CONNECTED = true and logs the peer address. On failure: logs
    /// and retries, except when the operation was aborted.
    void async_waiting_client() {
        acceptor->async_accept(*socket, [this](error_code err) {
            // Aborted means the acceptor was cancelled or closed; this object
            // may be going away, so do not touch it and do not re-arm.
            if (err == boost::asio::error::operation_aborted) return;

            if (err) {
                SPDLOG_ERROR("Error while accepting client: {}", err.to_string());
                async_waiting_client();
                return; // no client was accepted; skip the success path
            }

            assert(socket->is_open());
            q_this->request_idr_frame();
            mq().update_variable(SENDER_CONNECTED, true);

            // Non-throwing overload: the peer may already have disconnected,
            // and the endpoint is only needed for logging.
            error_code ep_err;
            const auto remote_ep = socket->remote_endpoint(ep_err);
            if (!ep_err) {
                SPDLOG_INFO("New client from {}:{}.",
                            remote_ep.address().to_string(), remote_ep.port());
            }
        });
    }

    /// Builds an impl listening on `listen_port` (IPv4) and starts waiting
    /// for a client. The caller takes ownership of the returned pointer.
    /// Exceptions thrown while constructing the acceptor propagate to the
    /// caller; the partially built object is released in that case.
    static impl *create(uint16_t listen_port, sender_tcp *q_this) {
        auto ret = std::make_unique<impl>();
        ret->q_this = q_this;
        const tcp::endpoint listen_ep{tcp::v4(), listen_port};
        ret->acceptor = std::make_unique<tcp::acceptor>(*q_this->get_ctx(), listen_ep);
        ret->socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
        ret->async_waiting_client();
        return ret.release();
    }
};

sender_tcp::~sender_tcp() = default;

/// Creates a TCP sender listening on `listen_port`.
/// Returns an owning raw pointer, or nullptr if the implementation could not
/// be created.
sender_tcp *sender_tcp::create(uint16_t listen_port) {
    auto ret = std::make_unique<sender_tcp>();
    auto pimpl = impl::create(listen_port, ret.get());
    if (pimpl == nullptr) return nullptr;
    ret->pimpl.reset(pimpl);
    return ret.release();
}

/// Drops the current client connection, if any.
void sender_tcp::close_connection() {
    pimpl->close_connection();
}

/// Forwards an encoded frame to the connected client.
void sender_tcp::handle_frame(frame_ptr_type &&frame) {
    pimpl->send_frame(std::move(frame));
}