mal-packet-weaver
C++20 packet serialization/deserialization library.
Loading...
Searching...
No Matches
packet-dispatcher.cpp
Go to the documentation of this file.
2
3#include <SDKDDKVer.h>
4
5#include "common.hpp"
6
7namespace mal_packet_weaver
8{
9 PacketDispatcher::PacketDispatcher(boost::asio::io_context &io_context)
10 : io_context_{ io_context }, signal_handler_{ io_context },
11 unprocessed_packets_input_{ io_context,
12 [this](std::vector<BasePacketPtr> &packets) -> void
13 {
14 for (auto &packet_ptr : packets)
15 {
16 if (packet_ptr == nullptr)
17 {
18 spdlog::warn("packet_ptr in unprocessed_input is nullptr");
19 continue;
20 }
21 auto packet_id = packet_ptr->type;
22 auto &unprocessed_packets_queue_ = unprocessed_packets_[packet_id];
23 unprocessed_packets_queue_.emplace_back(std::move(packet_ptr));
24 }
25 } },
26 promise_map_input_{
27 io_context,
28 [this](std::vector<std::pair<UniquePacketID, shared_packet_promise>> &promise_input) -> void
29 {
30 for (auto &[packet_id, shared_promise] : promise_input)
31 {
32 auto &promise_queue = promise_map_[packet_id];
33 promise_queue.emplace_back(std::move(shared_promise));
34 }
35 }
36
37 },
38 promise_filter_map_input_{ io_context,
39 [this](
40 std::vector<std::pair<UniquePacketID, promise_filter>> &promise_input) -> void
41 {
42 for (auto &[packet_id, filter] : promise_input)
43 {
44 auto &filter_queue = promise_filter_map_[packet_id];
45 filter_queue.emplace_back(std::move(filter));
46 }
47 } },
48 default_handlers_input_{ io_context,
49 [this](std::vector<std::pair<UniquePacketID, handler_tuple>> &handlers) -> void
50 {
51 for (auto &[packet_id, handler] : handlers)
52 {
53 auto &handler_list = default_handlers_[packet_id];
54 // Insert the handler such that filtered ones are first.
56 handler_list, std::move(handler),
57 [](handler_tuple const &left,
58 handler_tuple const &right) -> bool __mal_toolkit_lambda_force_inline
59 {
60 if (bool(std::get<1>(left)))
61 {
62 return true;
63 }
64 else if (bool(std::get<1>(right)))
65 {
66 return false;
67 }
68 return true;
69 });
70 }
71 } },
72 subsystem_handlers_input_{ io_context,
73 [this](std::vector<std::pair<PacketSubsystemID, handler_tuple>> &handlers) -> void
74 {
75 for (auto &[packet_id, handler] : handlers)
76 {
77 auto &handler_list = subsystem_handlers_[packet_id];
78 // Insert the handler such that filtered ones are first.
80 handler_list, std::move(handler),
81 [](handler_tuple const &left,
82 handler_tuple const &right) -> bool __mal_toolkit_lambda_force_inline
83 {
84 if (bool(std::get<1>(left)))
85 {
86 return true;
87 }
88 else if (bool(std::get<1>(right)))
89 {
90 return false;
91 }
92 return true;
93 });
94 }
95 } }
96 {
97 spdlog::debug("PacketDispatcher constructor called.");
98 co_spawn(io_context, std::bind(&PacketDispatcher::Run, this), boost::asio::detached);
99 }
100 void PacketDispatcher::register_subsystem_handler(PacketSubsystemID subsystem_id, PacketHandlerFunc<Packet> handler,
101 PacketFilterFunc<Packet> filter, float delay)
102 {
103 spdlog::trace("Posting task to register subsystem handler for {} subsystem", subsystem_id);
104 handler_tuple tuple{ delay, std::move(filter), std::move(handler) };
105 subsystem_handlers_input_.push(std::pair{ subsystem_id, std::move(tuple) });
106 signal_handler_.notify();
107 }
108
109 boost::asio::awaitable<std::shared_ptr<PacketDispatcher>> PacketDispatcher::get_shared_ptr()
110 {
111 ExponentialBackoff backoff(std::chrono::microseconds(1), std::chrono::microseconds(100), 2, 32, 0.1);
112
113 int it = 0;
114 do
115 {
116 try
117 {
118 co_return shared_from_this();
119 }
120 catch (std::bad_weak_ptr &)
121 {
122 it++;
123 }
124 boost::asio::steady_timer timer(io_context_, backoff.get_current_delay());
125 co_await timer.async_wait(boost::asio::use_awaitable);
126 backoff.increase_delay();
127 if (it >= 50 && it % 20 == 0)
128 {
129 spdlog::error("Failed to retrieve shared pointer, iteration: {}", it);
130 }
131 } while (it <= 200);
132 spdlog::error("Exceeded maximum attempts to retrieve shared pointer");
133 co_return nullptr;
134 }
135 boost::asio::awaitable<void> PacketDispatcher::Run()
136 {
137 SteadyTimer timer;
138 float min_handler_timestamp = std::numeric_limits<float>::max();
139
140 std::shared_ptr<PacketDispatcher> dispatcher_lock = co_await get_shared_ptr();
141 if (dispatcher_lock == nullptr)
142 {
143 spdlog::error(
144 "Couldn't retrieve shared pointer for session. Did you create the "
145 "session using std::make_shared?");
146 co_return;
147 }
148 co_await boost::asio::this_coro::executor;
149 try
150 {
151 while (alive_.load())
152 {
153 bool updated = co_await pop_inputs();
154
155 if (!updated)
156 {
157 if (min_handler_timestamp < timer.elapsed())
158 {
159 spdlog::trace("Updating handlers...");
160 min_handler_timestamp = std::numeric_limits<float>::max();
161
162 for (auto &[packet_id, packet_vector] : unprocessed_packets_)
163 {
164 // Remove packets that fulfill handlers and update min_handler_timestamp
165 std::erase_if(
166 packet_vector, [this, &packet_id, &min_handler_timestamp, &timer](BasePacketPtr &packet)
167 __mal_toolkit_lambda_force_inline
168 { return fulfill_handlers(packet_id, packet, min_handler_timestamp, timer); });
169 }
170 }
171 if (min_handler_timestamp == std::numeric_limits<float>::max())
172 {
173 co_await signal_handler_.wait();
174 }
175 else
176 {
177 co_await signal_handler_.wait(
178 std::chrono::microseconds(static_cast<size_t>(min_handler_timestamp * 1.0e6)));
179 }
180 continue;
181 }
182
183 spdlog::trace("Input arrays were updated! Fetching...");
184 if (!alive_.load())
185 {
186 break;
187 }
188
189 min_handler_timestamp = std::numeric_limits<float>::max();
190 for (auto &[packet_id, packet_vector] : unprocessed_packets_)
191 {
192 // Process packets: fulfill promises, fulfill handlers, and check for expiration
193 std::erase_if(packet_vector,
194 [this, &packet_id, &min_handler_timestamp, &timer](BasePacketPtr &packet)
195 __mal_toolkit_lambda_force_inline
196 {
197 return fulfill_promises(packet_id, packet) ||
198 fulfill_handlers(packet_id, packet, min_handler_timestamp, timer) ||
199 packet->expired();
200 });
201 }
202
203 // Remove empty entries from the unprocessed_packets_ map
204 std::erase_if(unprocessed_packets_,
205 [](auto const &pair) __mal_toolkit_lambda_force_inline { return pair.second.empty(); });
206 }
207 }
208 catch (std::exception &e)
209 {
210 spdlog::error("PacketDispatcher::Run cancelled with an error: {}", e.what());
211 }
212 spdlog::info("Exiting PacketDispatcher::Run");
213 }
214
215 boost::asio::awaitable<bool> PacketDispatcher::pop_inputs()
216 {
217 std::vector<std::future<bool>> futures{};
218 if (unprocessed_packets_input_.has_data())
219 {
220 futures.emplace_back(unprocessed_packets_input_.create_pop_task());
221 }
222 if (promise_map_input_.has_data())
223 {
224 futures.emplace_back(promise_map_input_.create_pop_task());
225 }
226 if (promise_filter_map_input_.has_data())
227 {
228 futures.emplace_back(promise_filter_map_input_.create_pop_task());
229 }
230 if (default_handlers_input_.has_data())
231 {
232 futures.emplace_back(default_handlers_input_.create_pop_task());
233 }
234 if (subsystem_handlers_input_.has_data())
235 {
236 futures.emplace_back(subsystem_handlers_input_.create_pop_task());
237 }
238 bool rv = false;
239 co_await boost::asio::this_coro::executor;
240 for (auto &future : futures)
241 {
242 spdlog::trace("Waiting for futures to complete...");
243 rv |= co_await await_future(io_context_, future);
244 spdlog::trace("Futures complete. Result for popping is: {}", rv);
245 }
246 co_return rv;
247 }
248} // namespace mal_packet_weaver
std::unordered_map< UniquePacketID, std::vector< BasePacketPtr > > unprocessed_packets_
std::tuple< float, PacketFilterFunc< Packet >, PacketHandlerFunc< Packet > > handler_tuple
Alias for a tuple containing information for packet handling.
std::unique_ptr< Packet > BasePacketPtr
Alias for a unique pointer to a base packet type.
PacketDispatcher(boost::asio::io_context &io_context)
Constructs a PacketDispatcher instance associated with the given io_context.
A utility class for implementing exponential backoff strategies.
Definition backoffs.hpp:17
constexpr ChronoType get_current_delay() noexcept
Get the current backoff delay.
Definition backoffs.hpp:47
constexpr void increase_delay() noexcept
Increase the backoff delay using exponential factor.
Definition backoffs.hpp:69
constexpr float elapsed() noexcept
Calculates and returns the elapsed time since the timer was last reset.
Definition timer.hpp:43
This is the main namespace for the Mal Packet Weaver library.
Definition common.hpp:42
std::function< bool(Args..., DerivedPacket const &)> PacketFilterFunc
Predicate function type to filter packets.
uint16_t PacketSubsystemID
Type alias for packet subsystem IDs.
Definition packet.hpp:12
std::function< void(Args..., std::unique_ptr< DerivedPacket >)> PacketHandlerFunc
Callback function type to handle packets asynchronously.
void SortedInsert(std::vector< T > &range, T &&value)
Inserts a value into a sorted vector while maintaining the sorted order.
STL namespace.