2#include <boost/algorithm/hex.hpp>
// Session constructor.
// NOTE(review): this is an excerpt of the original listing; braces and the
// conditions guarding the branches below are not visible here.
// Moves `socket` into socket_ and constructs both packet queues with 8192
// (presumably the fixed queue capacity -- confirm against the member declarations).
5 Session::Session(boost::asio::io_context &io, boost::asio::ip::tcp::socket &&socket)
6 : socket_(
std::move(socket)),
7 received_packets_{ 8192 },
8 packets_to_send_{ 8192 }
10 spdlog::debug(
"Session: Creating a new session");
// Presumably the success path: the message states that the socket is open.
16 spdlog::info(
"Session: Socket is open. Session created");
// Failure path: warn, then reject construction with std::invalid_argument.
20 spdlog::warn(
"Something went wrong. Socket is closed.");
21 throw std::invalid_argument(
"Socket is closed");
// The loop condition (i < 1) lets the body execute exactly once.
// The trailing boost::asio::detached is presumably the completion token of a
// spawn call whose beginning is not visible in this excerpt -- TODO confirm.
26 for (
size_t i = 0; i < 1; i++)
29 boost::asio::detached);
// Fragment, presumably the body of pop_packet_now(); the signature is not
// visible in this excerpt.
// Fetches raw packet data through pop_packet_data(); the branch is entered only
// when the returned pointer is non-null.
51 if (
const std::unique_ptr<ByteArray> packet_data =
pop_packet_data(); packet_data)
53 spdlog::trace(
"Successfully retrieved packet data.");
// First byte is non-zero but no encryption_ instance exists: an error is logged
// (the message says the packet is skipped).
55 if (!
encryption_ && packet_data->at(0) != std::byte{ 0 })
59 spdlog::error(
"Cannot decrypt packet without an instance of encryption_. Skipping.");
// First byte is non-zero and encryption_ is set: the decrypting path.
// NOTE(review): the first byte presumably flags an encrypted payload -- confirm.
62 if (
encryption_ && packet_data->at(0) != std::byte{ 0 })
64 spdlog::trace(
"Decrypting packet data...");
67 spdlog::trace(
"Decrypted packet type: {}", packet_type);
// Presumably the non-encrypted path; its guarding condition is not visible here.
72 spdlog::trace(
"Packet type: {}", packet_type);
// Fragment, presumably from pop_packet_async(); the signature and the loop
// structure are not visible in this excerpt.
80 spdlog::trace(
"Async packet popping initiated.");
88 spdlog::trace(
"Successfully popped a packet asynchronously.");
// Awaits the coroutine's executor; the result is discarded in the visible code.
// NOTE(review): presumably intended as a yield point between attempts -- confirm
// that awaiting this_coro::executor actually suspends as expected.
91 co_await boost::asio::this_coro::executor;
94 spdlog::error(
"Async packet popping stopped, session is not alive.");
// Fragment, presumably the tail of pop_packet_data(); the signature and the
// queue pop itself are not visible in this excerpt.
105 spdlog::trace(
"Successfully popped packet data.");
// Wraps `packet` in a std::unique_ptr, handing ownership to the caller.
106 return std::unique_ptr<ByteArray>(packet);
// Fragment, presumably from get_shared_ptr(); the signature, the retry loop and
// the timer setup are not visible in this excerpt.
// Backoff helper constructed with 25us, 100us, 2, 32 and 0.1.
// NOTE(review): the meaning of each ExponentialBackoff argument is defined elsewhere -- confirm.
114 ExponentialBackoff backoff(std::chrono::microseconds(25), std::chrono::microseconds(100), 2, 32, 0.1);
// Completes the coroutine with a shared_ptr to this session.
121 co_return shared_from_this();
// std::bad_weak_ptr is what shared_from_this() throws when the object is not
// (yet) owned by a std::shared_ptr.
123 catch (std::bad_weak_ptr &)
// Wait on the timer (presumably before the next attempt).
128 co_await timer.async_wait(boost::asio::use_awaitable);
// Log only when `it` is at least 50 and a multiple of 20 (60, 80, ...).
130 if (it >= 50 && it % 20 == 0)
132 spdlog::error(
"Failed to retrieve shared pointer, iteration: {}", it);
135 spdlog::error(
"Exceeded maximum attempts to retrieve shared pointer");
// Fragment, presumably the body of send_all(); the signature, the loop
// statements and several declarations are not visible in this excerpt.
141 bool writing =
false;
// 64 KiB: initial reservation for the send buffer, also used as the batch-size
// threshold in the gathering condition further down.
146 const uint32_t kDefaultDataToSendSize = 1024 * 64;
// 1 MiB: capacity at or above which the buffer is asked to shrink after clearing.
152 const uint32_t kMaximumDataToSendSize = 1024 * 1024 * 1;
153 data_to_send.reserve(kDefaultDataToSendSize);
// Obtain a shared_ptr to this session; a null result is treated as an error.
// NOTE(review): the name session_lock suggests it keeps the session alive for
// the duration of the coroutine -- confirm.
155 std::shared_ptr<Session> session_lock =
co_await get_shared_ptr(io);
156 if (session_lock ==
nullptr)
159 "Couldn't retrieve shared pointer for session. Did you create the "
160 "session using std::make_shared?");
// 5-microsecond timer, awaited further down.
166 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
172 spdlog::trace(
"Starting data preparation and writing process...");
// Reset the buffer; if its capacity has reached kMaximumDataToSendSize, request
// that the unused capacity be released.
174 data_to_send.clear();
175 if (data_to_send.capacity() >= kMaximumDataToSendSize)
177 data_to_send.shrink_to_fit();
// Gathering condition: a packet is popped from packets_to_send_ only while
// i < 1000 and the buffer is still below kDefaultDataToSendSize (the pop is the
// last operand of &&, so it is skipped once either limit is hit).
182 (i < 1000 && data_to_send.size() < kDefaultDataToSendSize) &&
packets_to_send_.pop(packet);
// Append the popped packet's bytes to the batch.
186 data_to_send.
append(*packet);
188 if (packet !=
nullptr)
193 spdlog::trace(
"Sending data...");
// The hex dump is built only when the logger level is exactly `trace`.
196 if (spdlog::get_level() == spdlog::level::trace)
198 std::string sending_hex;
199 boost::algorithm::hex(data_to_send.
as<uint8_t>(),
200 data_to_send.
as<uint8_t>() + data_to_send.size(),
201 std::back_inserter(sending_hex));
202 spdlog::trace(
"Sending hex: {}", sending_hex);
// Writes the whole batch to socket_. The completion handler captures by
// reference ([&]) and receives the error code and the number of bytes written.
205 async_write(
socket_, boost::asio::buffer(data_to_send.
as<
char>(), data_to_send.size()),
206 [&](
const boost::system::error_code ec, [[maybe_unused]] std::size_t length)
209 data_to_send.clear();
212 spdlog::warn(
"Error sending message: {}", ec.message());
216 spdlog::trace(
"Data sent successfully");
222 co_await timer.async_wait(boost::asio::use_awaitable);
// Any std::exception ends the send loop and is logged with its message.
225 catch (std::exception &e)
227 spdlog::error(
"Send loop terminated: {}", e.what());
// Presumably a catch-all handler; its catch clause is not visible here.
231 spdlog::error(
"Send loop terminated: reason unknown.");
234 spdlog::debug(
"Send loop terminated");
// Coroutine that reads bytes from socket_ into a streambuf, cuts them into
// packets (a 4-byte header is read first, then packet_size bytes) and pushes
// each packet onto received_packets_.
// NOTE(review): this is an excerpt of the original listing; braces, loop
// statements and some declarations (e.g. how packet_size is derived from the
// header) are not visible here.
236 boost::asio::awaitable<void> Session::async_packet_forger(boost::asio::io_context &io)
238 boost::asio::streambuf receiver_buffer;
239 spdlog::debug(
"Starting async_packet_forger...");
// Obtain a shared_ptr to this session; a null result is treated as an error.
241 std::shared_ptr<Session> session_lock =
co_await get_shared_ptr(io);
242 if (session_lock ==
nullptr)
245 "Couldn't retrieve shared pointer for session. Did you create the "
246 "session using std::make_shared?");
// Helper: receives up to 512 bytes into the streambuf's output area and commits
// the number of bytes actually read.
249 const auto socket_receive = [&]() -> boost::asio::awaitable<size_t> {
252 boost::asio::streambuf::mutable_buffers_type bufs = receiver_buffer.prepare(512);
253 size_t n =
co_await socket_.async_receive(bufs, boost::asio::use_awaitable);
254 receiver_buffer.commit(n);
// A receive failure is logged as a dead socket, after which packets_to_send_ is
// drained with consume_all.
// NOTE(review): the consume_all callback is only partly visible; it presumably
// frees each non-null queued pointer -- confirm.
257 catch (std::exception &e)
261 spdlog::error(
"Error reading message, socket dead: {}", e.what());
263 packets_to_send_.consume_all(
266 if (value !=
nullptr)
// Helper: grows byte_array by `amount` and copies that many bytes from
// receiver_buffer into the newly added tail.
273 const auto read_bytes_to = [&](
ByteArray &byte_array,
const size_t amount)
275 const size_t current_size = byte_array.size();
276 byte_array.resize(current_size + amount);
277 receiver_buffer.sgetn(byte_array.
as<
char>() + current_size *
sizeof(char), amount);
// 5-microsecond timer used when retrying the push onto received_packets_.
281 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
// Fewer than 4 bytes buffered is not enough for the header, so receive more.
284 if (receiver_buffer.size() < 4)
286 co_await socket_receive();
289 spdlog::trace(
"Buffer size is sufficient for a packet...");
292 read_bytes_to(packet_header, 4);
295 spdlog::trace(
"Read packet size: {}", packet_size);
// Rejects a zero packet size and sizes of 4 GiB or more.
300 AlwaysAssert(packet_size != 0 && packet_size < 1024ULL * 1024 * 1024 * 4,
301 "The amount of bytes to read is too big. 4GB? What are you "
302 "transfering? Anyways, it seems to be a bug.");
// Keep receiving until the whole packet body is buffered or alive_ turns false.
304 while (
static_cast<int64_t
>(receiver_buffer.size()) < packet_size && alive_)
306 co_await socket_receive();
307 spdlog::trace(
"Waiting for buffer to reach packet size...");
// Still short of packet_size after the wait loop, which implies the loop ended
// because alive_ became false.
310 if (
static_cast<int64_t
>(receiver_buffer.size()) < packet_size)
314 spdlog::error(
"Buffer still not sufficient, breaking out of loop...");
319 read_bytes_to(*packet_data, packet_size);
320 spdlog::trace(
"Read packet data with size: {}", packet_size);
// push() returned false (presumably the queue is full): wait on the timer and retry.
322 while (!received_packets_.push(packet_data))
324 spdlog::trace(
"Waiting to push packet data to received_packets_...");
325 co_await timer.async_wait(boost::asio::use_awaitable);
329 spdlog::debug(
"Exiting async_packet_forger.");
// Coroutine that takes packet data from received_packets_, decrypts it when the
// first byte is non-zero and encryption_ is set, deserializes it and passes the
// resulting packet to packet_receiver_. Exceptions caught around the receiver
// call are logged as warnings.
// NOTE(review): this is an excerpt of the original listing; braces, loop
// statements and some declarations (e.g. packet_type) are not visible here.
332 boost::asio::awaitable<void> Session::async_packet_sender(boost::asio::io_context &io)
334 spdlog::debug(
"Starting async_packet_sender...");
// Obtain a shared_ptr to this session; a null result is treated as an error.
336 std::shared_ptr<Session> session_lock =
co_await get_shared_ptr(io);
337 if (session_lock ==
nullptr)
340 "Couldn't retrieve shared pointer for session. Did you create the "
341 "session using std::make_shared?");
// 5-microsecond timer awaited between polling attempts.
344 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
349 spdlog::trace(
"Waiting for packet data...");
// Poll until a packet_receiver_ is set and a packet could be popped from
// received_packets_ (the pop is attempted only when packet_receiver_ is set).
350 while ((!
bool(packet_receiver_) || !received_packets_.pop(packet_data)))
354 spdlog::warn(
"Session is no longer alive, exiting loop...");
357 co_await timer.async_wait(boost::asio::use_awaitable);
359 spdlog::trace(
"Received packet data!");
// First byte is non-zero but no encryption_ instance exists: an error is logged
// (the message says the packet is skipped).
363 if (!encryption_ && packet_data->at(0) != std::byte{ 0 })
367 spdlog::error(
"Cannot decrypt packet without an instance of encryption_. Skipping.");
// Encrypted path: decrypts packet_data->view(1) (presumably everything after
// the first byte -- confirm ByteArray::view semantics).
372 if (encryption_ && packet_data->at(0) != std::byte{ 0 })
374 const ByteArray plain = encryption_->decrypt(packet_data->
view(1));
379 spdlog::trace(
"Decrypting and deserializing packet data...");
// plain.view(4) is handed to the deserializer (presumably skipping a 4-byte
// packet type at the front of the plaintext -- confirm).
380 packet_receiver_(PacketFactory::Deserialize(plain.
view(4), packet_type));
382 catch (
const std::exception &e)
384 spdlog::warn(
"Packet receiver has thrown an exception: {}", e.what());
393 spdlog::trace(
"Deserializing packet data...");
// Unencrypted path: the payload is taken from packet_data->view(5) (presumably
// 1 flag byte plus a 4-byte packet type are skipped -- confirm).
394 packet_receiver_(PacketFactory::Deserialize(packet_data->
view(5), packet_type));
396 catch (
const std::exception &e)
398 spdlog::warn(
"Packet receiver has thrown an exception: {}", e.what());
406 spdlog::debug(
"Exiting async_packet_sender.");
static std::unique_ptr< Packet > Deserialize(const ByteView &bytearray, UniquePacketID packet_type)
Deserialize a byte view into a unique pointer of the specified packet type.
boost::lockfree::queue< ByteArray *, boost::lockfree::fixed_sized< true > > packets_to_send_
Lock-free queue to store packets that are waiting to be sent.
std::shared_ptr< mal_packet_weaver::crypto::EncryptionInterface > encryption_
Holder for encryption using EncryptionInterface.
std::unique_ptr< Packet > pop_packet_now()
Returns the earliest acquired packet. If the packet queue is empty, returns nullptr.
boost::asio::awaitable< std::shared_ptr< Session > > get_shared_ptr(boost::asio::io_context &io)
Retrieves a shared pointer to the current session.
boost::lockfree::queue< ByteArray *, boost::lockfree::fixed_sized< true > > received_packets_
Lock-free queue to store received packets that are waiting to be processed.
Session(boost::asio::io_context &io, boost::asio::ip::tcp::socket &&socket)
Constructor for the Session class.
boost::asio::ip::tcp::tcp::socket socket_
The TCP socket for network communication.
boost::asio::awaitable< void > send_all(boost::asio::io_context &io)
Asynchronously sends all packets in the queue through the network.
boost::asio::awaitable< void > async_packet_sender(boost::asio::io_context &io)
Asynchronously takes received packet data from the queue, decrypts it when needed, and passes the deserialized packets to the packet receiver.
boost::asio::awaitable< std::unique_ptr< Packet > > pop_packet_async(boost::asio::io_context &io)
std::unique_ptr< ByteArray > pop_packet_data() noexcept
Pops the packet data from the received packets queue.
boost::asio::awaitable< void > async_packet_forger(boost::asio::io_context &io)
Asynchronously forges new packets from the buffer.
virtual ~Session()
Destructor for the Session class.
bool alive_
Indicates whether the session is alive and operational.
virtual ByteArray decrypt(const ByteView ciphertext) const =0
This is the main namespace for the Mal Packet Weaver library.