sender_tcp.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #include "sender_tcp.h"
  2. #include "sender_utility.hpp"
  3. #include "simple_mq.h"
  4. #include "variable_defs.h"
  5. #include <boost/asio/io_context.hpp>
  6. #include <boost/asio/ip/tcp.hpp>
  7. #include <boost/asio/post.hpp>
  8. #include <boost/asio/write.hpp>
  9. #include <spdlog/spdlog.h>
  10. #include <deque>
  11. using namespace boost::asio::ip;
  12. using boost::asio::buffer;
  13. using boost::asio::io_context;
  14. using boost::asio::post;
  15. using boost::asio::write;
  16. using boost::system::error_code;
  17. using namespace sender_impl;
  18. using namespace simple_mq_singleton;
  19. struct sender_tcp::impl {
  20. sender_tcp *q_this = nullptr;
  21. std::unique_ptr<tcp::acceptor> acceptor;
  22. std::unique_ptr<tcp::socket> socket;
  23. smart_buffer<uint8_t> out_buf;
  24. void close_connection() {
  25. if (socket == nullptr) return;
  26. // TODO: log client exit
  27. // if (socket->is_open()) {
  28. // auto remote_ep = socket->remote_endpoint();
  29. // SPDLOG_INFO("Client {}:{} left.",
  30. // remote_ep.address().to_string(), remote_ep.port());
  31. // }
  32. socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
  33. mq().update_variable(SENDER_CONNECTED, false);
  34. }
  35. void send_frame(frame_ptr_type &&frame) {
  36. if (!socket->is_open()) return;
  37. uint64_t packet_length = frame->length
  38. + sizeof(frame->frame_id);
  39. auto out_length = packet_length
  40. + sizeof(packet_length);
  41. out_buf.create(out_length);
  42. // fill out buffer
  43. auto ptr = out_buf.ptr;
  44. ptr = write_binary_number(ptr, packet_length);
  45. ptr = write_binary_number(ptr, frame->frame_id);
  46. memcpy(ptr, frame->ptr, frame->length);
  47. error_code err;
  48. write(*socket, buffer(out_buf.ptr, out_buf.length), err); // TODO: change to async operation
  49. if (err) {
  50. SPDLOG_WARN("Error while sending frame: {}", err.to_string());
  51. close_connection();
  52. async_waiting_client();
  53. }
  54. q_this->log_frame_sent(frame->frame_id);
  55. SPDLOG_TRACE("Frame {} sent, length = {}",
  56. frame->frame_id, frame->length);
  57. }
  58. void async_waiting_client() {
  59. acceptor->async_accept(*socket, [this](error_code err) {
  60. if (err) {
  61. SPDLOG_ERROR("Error while accepting client: {}", err.to_string());
  62. async_waiting_client();
  63. }
  64. assert(socket->is_open());
  65. q_this->request_idr_frame();
  66. mq().update_variable(SENDER_CONNECTED, true);
  67. auto remote_ep = socket->remote_endpoint();
  68. SPDLOG_INFO("New client from {}:{}.",
  69. remote_ep.address().to_string(), remote_ep.port());
  70. });
  71. }
  72. static impl *create(uint16_t listen_port, sender_tcp *q_this) {
  73. auto ret = new impl;
  74. ret->q_this = q_this;
  75. auto listen_ep = tcp::endpoint{tcp::v4(), listen_port};
  76. ret->acceptor = std::make_unique<tcp::acceptor>(*q_this->get_ctx(), listen_ep);
  77. ret->socket = std::make_unique<tcp::socket>(*q_this->get_ctx());
  78. ret->async_waiting_client();
  79. return ret;
  80. }
  81. };
  82. sender_tcp::~sender_tcp() = default;
  83. sender_tcp *sender_tcp::create(uint16_t listen_port) {
  84. auto ret = std::make_unique<sender_tcp>();
  85. auto pimpl = impl::create(listen_port, ret.get());
  86. if (pimpl == nullptr) return nullptr;
  87. ret->pimpl.reset(pimpl);
  88. return ret.release();
  89. }
  90. void sender_tcp::close_connection() {
  91. pimpl->close_connection();
  92. }
  93. void sender_tcp::handle_frame(frame_ptr_type &&frame) {
  94. pimpl->send_frame(std::move(frame));
  95. }