mal-packet-weaver
C++20 packet serialization/deserialization library.
Loading...
Searching...
No Matches
session.cpp
Go to the documentation of this file.
1#include "session.hpp"
2#include <boost/algorithm/hex.hpp>
3namespace mal_packet_weaver
4{
5 Session::Session(boost::asio::io_context &io, boost::asio::ip::tcp::socket &&socket)
6 : socket_(std::move(socket)),
7 received_packets_{ 8192 }, // Initialize received_packets_ with a buffer size of 8192
8 packets_to_send_{ 8192 } // Initialize packets_to_send_ with a buffer size of 8192
9 {
10 spdlog::debug("Session: Creating a new session");
11
12 // Check if the socket is open and mark the session as alive if so
13 alive_ = socket_.is_open();
14 if (alive_)
15 {
16 spdlog::info("Session: Socket is open. Session created");
17 }
18 else
19 {
20 spdlog::warn("Something went wrong. Socket is closed.");
21 throw std::invalid_argument("Socket is closed");
22 }
23 // Start asynchronous tasks for receiving data, packet forging, and sending packets concurrently
24 co_spawn(socket_.get_executor(), std::bind(&Session::async_packet_forger, this, std::ref(io)), boost::asio::detached);
25 co_spawn(socket_.get_executor(), std::bind(&Session::send_all, this, std::ref(io)), boost::asio::detached);
26 for (size_t i = 0; i < 1; i++)
27 {
28 co_spawn(socket_.get_executor(), std::bind(&Session::async_packet_sender, this, std::ref(io)),
29 boost::asio::detached);
30 }
31 }
32
34 {
35 packets_to_send_.consume_all(
36 [](ByteArray *value)
37 {
38 if (value != nullptr)
39 delete value;
40 });
41 received_packets_.consume_all(
42 [](ByteArray *value)
43 {
44 if (value != nullptr)
45 delete value;
46 });
47 }
48
49 std::unique_ptr<Packet> Session::pop_packet_now()
50 {
51 if (const std::unique_ptr<ByteArray> packet_data = pop_packet_data(); packet_data)
52 {
53 spdlog::trace("Successfully retrieved packet data.");
54
55 if (!encryption_ && packet_data->at(0) != std::byte{ 0 })
56 {
57 // TODO: cache packets that are encrypted till encryption_ is initialized. Add timeouts for
58 // that packets. Right now we just skip them, and it might not be okay.
59 spdlog::error("Cannot decrypt packet without an instance of encryption_. Skipping.");
60 return nullptr;
61 }
62 if (encryption_ && packet_data->at(0) != std::byte{ 0 })
63 {
64 spdlog::trace("Decrypting packet data...");
65 const ByteArray plain = encryption_->decrypt(packet_data->view(1));
66 const uint32_t packet_type = bytes_to_uint32(plain.view(0, 4));
67 spdlog::trace("Decrypted packet type: {}", packet_type);
68 return PacketFactory::Deserialize(plain.view(4), packet_type);
69 }
70
71 const uint32_t packet_type = bytes_to_uint32(packet_data->view(1, 4));
72 spdlog::trace("Packet type: {}", packet_type);
73 return PacketFactory::Deserialize(packet_data->view(5), packet_type);
74 }
75 return nullptr;
76 }
77
78 boost::asio::awaitable<std::unique_ptr<Packet>> Session::pop_packet_async(boost::asio::io_context &io)
79 {
80 spdlog::trace("Async packet popping initiated.");
81
82 while (this->alive_)
83 {
84 std::unique_ptr<Packet> packet = pop_packet_now();
85
86 if (packet)
87 {
88 spdlog::trace("Successfully popped a packet asynchronously.");
89 co_return packet;
90 }
91 co_await boost::asio::this_coro::executor;
92 }
93
94 spdlog::error("Async packet popping stopped, session is not alive.");
95 co_return nullptr;
96 }
97
98 std::unique_ptr<ByteArray> Session::pop_packet_data() noexcept
99 {
100 ByteArray *packet = nullptr;
101 received_packets_.pop(packet);
102
103 if (packet)
104 {
105 spdlog::trace("Successfully popped packet data.");
106 return std::unique_ptr<ByteArray>(packet);
107 }
108
109 return nullptr;
110 }
111
112 boost::asio::awaitable<std::shared_ptr<Session>> Session::get_shared_ptr(boost::asio::io_context &io)
113 {
114 ExponentialBackoff backoff(std::chrono::microseconds(25), std::chrono::microseconds(100), 2, 32, 0.1);
115
116 int it = 0;
117 do
118 {
119 try
120 {
121 co_return shared_from_this();
122 }
123 catch (std::bad_weak_ptr &)
124 {
125 it++;
126 }
127 boost::asio::steady_timer timer(io, backoff.get_current_delay());
128 co_await timer.async_wait(boost::asio::use_awaitable);
129 backoff.increase_delay();
130 if (it >= 50 && it % 20 == 0)
131 {
132 spdlog::error("Failed to retrieve shared pointer, iteration: {}", it);
133 }
134 } while (it <= 200);
135 spdlog::error("Exceeded maximum attempts to retrieve shared pointer");
136 co_return nullptr;
137 }
138
139 boost::asio::awaitable<void> Session::send_all(boost::asio::io_context &io)
140 {
141 bool writing = false;
142 ByteArray data_to_send;
143
144 // TODO: make these configurable per session
145 // 64 Kb, no reason to allocate more per session
146 const uint32_t kDefaultDataToSendSize = 1024 * 64;
147 // If user somehow managed to send the packet of this size or bigger
148 // we shrink the size back to kDefaultDataToSendSize.
149 // If capacity of the vector is lower than this we will keep it's size.
150 // This is done solely so we don't consume a lot of memory per session if we send heavy
151 // packets from time to time.
152 const uint32_t kMaximumDataToSendSize = 1024 * 1024 * 1;
153 data_to_send.reserve(kDefaultDataToSendSize);
154
155 std::shared_ptr<Session> session_lock = co_await get_shared_ptr(io);
156 if (session_lock == nullptr)
157 {
158 spdlog::error(
159 "Couldn't retrieve shared pointer for session. Did you create the "
160 "session using std::make_shared?");
161 co_return;
162 }
163
164 try
165 {
166 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
167 while (alive_)
168 {
169 if (!packets_to_send_.empty() && !writing)
170 {
171 writing = true;
172 spdlog::trace("Starting data preparation and writing process...");
173
174 data_to_send.clear();
175 if (data_to_send.capacity() >= kMaximumDataToSendSize)
176 {
177 data_to_send.shrink_to_fit();
178 }
179
180 ByteArray *packet = nullptr;
181 for (int i = 0;
182 (i < 1000 && data_to_send.size() < kDefaultDataToSendSize) && packets_to_send_.pop(packet);
183 i++)
184 {
185 data_to_send.append(uint32_to_bytes(static_cast<uint32_t>(packet->size())));
186 data_to_send.append(*packet);
187 }
188 if (packet != nullptr)
189 {
190 delete packet;
191 }
192
193 spdlog::trace("Sending data...");
194 // show data as hex
195 {
196 if (spdlog::get_level() == spdlog::level::trace)
197 {
198 std::string sending_hex;
199 boost::algorithm::hex(data_to_send.as<uint8_t>(),
200 data_to_send.as<uint8_t>() + data_to_send.size(),
201 std::back_inserter(sending_hex));
202 spdlog::trace("Sending hex: {}", sending_hex);
203 }
204 }
205 async_write(socket_, boost::asio::buffer(data_to_send.as<char>(), data_to_send.size()),
206 [&](const boost::system::error_code ec, [[maybe_unused]] std::size_t length)
207 {
208 writing = false;
209 data_to_send.clear();
210 if (ec)
211 {
212 spdlog::warn("Error sending message: {}", ec.message());
213 }
214 else
215 {
216 spdlog::trace("Data sent successfully");
217 }
218 });
219
220 continue;
221 }
222 co_await timer.async_wait(boost::asio::use_awaitable);
223 }
224 }
225 catch (std::exception &e)
226 {
227 spdlog::error("Send loop terminated: {}", e.what());
228 }
229 catch (...)
230 {
231 spdlog::error("Send loop terminated: reason unknown.");
232 }
233
234 spdlog::debug("Send loop terminated");
235 }
236 boost::asio::awaitable<void> Session::async_packet_forger(boost::asio::io_context &io)
237 {
238 boost::asio::streambuf receiver_buffer;
239 spdlog::debug("Starting async_packet_forger...");
240
241 std::shared_ptr<Session> session_lock = co_await get_shared_ptr(io);
242 if (session_lock == nullptr)
243 {
244 spdlog::error(
245 "Couldn't retrieve shared pointer for session. Did you create the "
246 "session using std::make_shared?");
247 co_return;
248 }
249 const auto socket_receive = [&]() -> boost::asio::awaitable<size_t> {
250 try
251 {
252 boost::asio::streambuf::mutable_buffers_type bufs = receiver_buffer.prepare(512);
253 size_t n = co_await socket_.async_receive(bufs, boost::asio::use_awaitable);
254 receiver_buffer.commit(n);
255 co_return n;
256 }
257 catch (std::exception &e)
258 {
259 auto &v = socket_;
260 socket_.close();
261 spdlog::error("Error reading message, socket dead: {}", e.what());
262 alive_ = false;
263 packets_to_send_.consume_all(
264 [](ByteArray *value)
265 {
266 if (value != nullptr)
267 delete value;
268 });
269 co_return 0;
270 }
271 };
272
273 const auto read_bytes_to = [&](ByteArray &byte_array, const size_t amount)
274 {
275 const size_t current_size = byte_array.size();
276 byte_array.resize(current_size + amount);
277 receiver_buffer.sgetn(byte_array.as<char>() + current_size * sizeof(char), amount);
278 };
279
280
281 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
282 while (alive_)
283 {
284 if (receiver_buffer.size() < 4)
285 {
286 co_await socket_receive();
287 continue;
288 }
289 spdlog::trace("Buffer size is sufficient for a packet...");
290
291 ByteArray packet_header;
292 read_bytes_to(packet_header, 4);
293 const int64_t packet_size = bytes_to_uint32(packet_header);
294
295 spdlog::trace("Read packet size: {}", packet_size);
296
297 // TODO: add a system that ensures that packet data size is correct.
298 // TODO: handle exception, and if packet size is too big we need to do something
299 // about it.
300 AlwaysAssert(packet_size != 0 && packet_size < 1024ULL * 1024 * 1024 * 4,
301 "The amount of bytes to read is too big. 4GB? What are you "
302 "transfering? Anyways, it seems to be a bug.");
303
304 while (static_cast<int64_t>(receiver_buffer.size()) < packet_size && alive_)
305 {
306 co_await socket_receive();
307 spdlog::trace("Waiting for buffer to reach packet size...");
308 }
309
310 if (static_cast<int64_t>(receiver_buffer.size()) < packet_size)
311 // While loop waits until requirement is satisfied, so if it's false then alive_ is
312 // false and session is dead, so we won't get any data anymore
313 {
314 spdlog::error("Buffer still not sufficient, breaking out of loop...");
315 break;
316 }
317
318 ByteArray *packet_data = new ByteArray;
319 read_bytes_to(*packet_data, packet_size);
320 spdlog::trace("Read packet data with size: {}", packet_size);
321
322 while (!received_packets_.push(packet_data))
323 {
324 spdlog::trace("Waiting to push packet data to received_packets_...");
325 co_await timer.async_wait(boost::asio::use_awaitable);
326 }
327 }
328
329 spdlog::debug("Exiting async_packet_forger.");
330 }
331
332 boost::asio::awaitable<void> Session::async_packet_sender(boost::asio::io_context &io)
333 {
334 spdlog::debug("Starting async_packet_sender...");
335
336 std::shared_ptr<Session> session_lock = co_await get_shared_ptr(io);
337 if (session_lock == nullptr)
338 {
339 spdlog::error(
340 "Couldn't retrieve shared pointer for session. Did you create the "
341 "session using std::make_shared?");
342 co_return;
343 }
344 boost::asio::steady_timer timer(io, std::chrono::microseconds(5));
345
346 while (alive_)
347 {
348 ByteArray *packet_data = nullptr;
349 spdlog::trace("Waiting for packet data...");
350 while ((!bool(packet_receiver_) || !received_packets_.pop(packet_data)))
351 {
352 if (!alive_)
353 {
354 spdlog::warn("Session is no longer alive, exiting loop...");
355 break;
356 }
357 co_await timer.async_wait(boost::asio::use_awaitable);
358 }
359 spdlog::trace("Received packet data!");
360
361 if (packet_data)
362 {
363 if (!encryption_ && packet_data->at(0) != std::byte{ 0 })
364 {
365 // TODO: cache packets that are encrypted till encryption_ is initialized. Add timeouts
366 // for that packets. Right now we just skip them, and it might not be okay.
367 spdlog::error("Cannot decrypt packet without an instance of encryption_. Skipping.");
368 delete packet_data;
369 continue;
370 }
371
372 if (encryption_ && packet_data->at(0) != std::byte{ 0 })
373 {
374 const ByteArray plain = encryption_->decrypt(packet_data->view(1));
375 const uint32_t packet_type = bytes_to_uint32(plain.view(0, 4));
376
377 try
378 {
379 spdlog::trace("Decrypting and deserializing packet data...");
380 packet_receiver_(PacketFactory::Deserialize(plain.view(4), packet_type));
381 }
382 catch (const std::exception &e)
383 {
384 spdlog::warn("Packet receiver has thrown an exception: {}", e.what());
385 }
386 }
387 else
388 {
389 const uint32_t packet_type = bytes_to_uint32(packet_data->view(1, 4));
390
391 try
392 {
393 spdlog::trace("Deserializing packet data...");
394 packet_receiver_(PacketFactory::Deserialize(packet_data->view(5), packet_type));
395 }
396 catch (const std::exception &e)
397 {
398 spdlog::warn("Packet receiver has thrown an exception: {}", e.what());
399 }
400 }
401
402 delete packet_data;
403 }
404 }
405
406 spdlog::debug("Exiting async_packet_sender.");
407 }
408
409} // namespace mal_packet_weaver
static std::unique_ptr< Packet > Deserialize(const ByteView &bytearray, UniquePacketID packet_type)
Deserialize a byte view into a unique pointer of the specified packet type.
boost::lockfree::queue< ByteArray *, boost::lockfree::fixed_sized< true > > packets_to_send_
Lock-free queue to store packets that are waiting to be sent.
Definition session.hpp:185
std::shared_ptr< mal_packet_weaver::crypto::EncryptionInterface > encryption_
Holder for encryption using EncryptionInterface.
Definition session.hpp:200
std::unique_ptr< Packet > pop_packet_now()
Returns the earliest acquired packet. If packet queue is empty, returns nullptr.
Definition session.cpp:49
boost::asio::awaitable< std::shared_ptr< Session > > get_shared_ptr(boost::asio::io_context &io)
Retrieves a shared pointer to the current session.
Definition session.cpp:112
boost::lockfree::queue< ByteArray *, boost::lockfree::fixed_sized< true > > received_packets_
Lock-free queue to store received packets that are waiting to be processed.
Definition session.hpp:181
Session(boost::asio::io_context &io, boost::asio::ip::tcp::socket &&socket)
Constructor for the Session class.
Definition session.cpp:5
boost::asio::ip::tcp::tcp::socket socket_
The TCP socket for network communication.
Definition session.hpp:195
boost::asio::awaitable< void > send_all(boost::asio::io_context &io)
Asynchronously sends all packets in the queue through the network.
Definition session.cpp:139
boost::asio::awaitable< void > async_packet_sender(boost::asio::io_context &io)
Asynchronously receives and processes incoming packets from the network.
Definition session.cpp:332
boost::asio::awaitable< std::unique_ptr< Packet > > pop_packet_async(boost::asio::io_context &io)
Definition session.cpp:78
std::unique_ptr< ByteArray > pop_packet_data() noexcept
Pops the packet data from the received packets queue.
Definition session.cpp:98
boost::asio::awaitable< void > async_packet_forger(boost::asio::io_context &io)
Asynchronously forges new packets from the buffer.
Definition session.cpp:236
virtual ~Session()
Destructor for the Session class.
Definition session.cpp:33
bool alive_
Indicates whether the session is alive and operational.
Definition session.hpp:190
virtual ByteArray decrypt(const ByteView ciphertext) const =0
A utility class for implementing exponential backoff strategies.
Definition backoffs.hpp:17
constexpr ChronoType get_current_delay() noexcept
Get the current backoff delay.
Definition backoffs.hpp:47
constexpr void increase_delay() noexcept
Increase the backoff delay using exponential factor.
Definition backoffs.hpp:69
This is the main namespace for the Mal Packet Weaver library.
Definition common.hpp:42
ByteArray uint32_to_bytes(const uint32_t value)
Convert a uint32_t value to a ByteArray using little-endian byte order.
uint32_t bytes_to_uint32(const ByteView byte_view)
Convert a byte array to a uint32_t value using little-endian byte order.
void AlwaysAssert(bool value, std::string_view message="Assert failed", std::source_location location=std::source_location::current())
Always asserts a condition with customizable behavior.
STL namespace.
A dynamically resizable array of bytes.
constexpr ByteView view(size_t from=0) const
Create a ByteView of a subsequence of the byte array.
constexpr T * as()
Convert the ByteArray to a typed pointer.
void append(First &&first, Second &&second, Args &&...args)
Append multiple data sources to the byte array.