| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
#include "sender_tcp.h"
#include "sender_utility.hpp"
#include "simple_mq.h"
#include "variable_defs.h"

#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/write.hpp>
#include <spdlog/spdlog.h>

#include <deque>
- using namespace boost::asio::ip;
- using boost::asio::buffer;
- using boost::asio::io_context;
- using boost::asio::post;
- using boost::asio::write;
- using boost::system::error_code;
- using namespace sender_impl;
- using namespace simple_mq_singleton;
- struct sender_tcp::impl {
- sender_tcp *q_this = nullptr;
- std::unique_ptr<tcp::acceptor> acceptor;
- std::unique_ptr<tcp::socket> socket;
- smart_buffer<uint8_t> out_buf;
- void close_connection() {
- if (socket == nullptr) return;
- // TODO: log client exit
- // if (socket->is_open()) {
- // auto remote_ep = socket->remote_endpoint();
- // SPDLOG_INFO("Client {}:{} left.",
- // remote_ep.address().to_string(), remote_ep.port());
- // }
- socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
- mq().update_variable(SENDER_CONNECTED, false);
- }
- void send_frame(frame_ptr_type &&frame) {
- if (!socket->is_open()) return;
- uint64_t packet_length = frame->length
- + sizeof(frame->frame_id);
- auto out_length = packet_length
- + sizeof(packet_length);
- out_buf.create(out_length);
- // fill out buffer
- auto ptr = out_buf.ptr;
- ptr = write_binary_number(ptr, packet_length);
- ptr = write_binary_number(ptr, frame->frame_id);
- memcpy(ptr, frame->ptr, frame->length);
- error_code err;
- write(*socket, buffer(out_buf.ptr, out_buf.length), err); // TODO: change to async operation
- if (err) {
- SPDLOG_WARN("Error while sending frame: {}", err.to_string());
- close_connection();
- async_waiting_client();
- }
- q_this->log_frame_sent(frame->frame_id);
- SPDLOG_TRACE("Frame {} sent, length = {}",
- frame->frame_id, frame->length);
- }
- void async_waiting_client() {
- acceptor->async_accept(*socket, [this](error_code err) {
- if (err) {
- SPDLOG_ERROR("Error while accepting client: {}", err.to_string());
- async_waiting_client();
- }
- assert(socket->is_open());
- q_this->request_idr_frame();
- mq().update_variable(SENDER_CONNECTED, true);
- auto remote_ep = socket->remote_endpoint();
- SPDLOG_INFO("New client from {}:{}.",
- remote_ep.address().to_string(), remote_ep.port());
- });
- }
- static impl *create(uint16_t listen_port, sender_tcp *q_this) {
- auto ret = new impl;
- ret->q_this = q_this;
- auto listen_ep = tcp::endpoint{tcp::v4(), listen_port};
- ret->acceptor = std::make_unique<tcp::acceptor>(*q_this->get_ctx(), listen_ep);
- ret->socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
- ret->async_waiting_client();
- return ret;
- }
- };
- sender_tcp::~sender_tcp() = default;
- sender_tcp *sender_tcp::create(uint16_t listen_port) {
- auto ret = std::make_unique<sender_tcp>();
- auto pimpl = impl::create(listen_port, ret.get());
- if (pimpl == nullptr) return nullptr;
- ret->pimpl.reset(pimpl);
- return ret.release();
- }
- void sender_tcp::close_connection() {
- pimpl->close_connection();
- }
- void sender_tcp::handle_frame(frame_ptr_type &&frame) {
- pimpl->send_frame(std::move(frame));
- }
|