mal-packet-weaver
C++20 packet serialization/deserialization library.
Loading...
Searching...
No Matches
packet-dispatcher.inl
Go to the documentation of this file.
1#pragma once
3namespace mal_packet_weaver
4{
6 {
7 spdlog::trace("Enqueuing packet for processing.");
8 push_packet(std::move(packet));
9 }
10 template <IsPacket DerivedPacket>
11 boost::asio::awaitable<std::unique_ptr<DerivedPacket>> PacketDispatcher::await_packet(float timeout)
12 {
13 auto packet_type = DerivedPacket::static_unique_id;
14 auto promise = std::make_shared<std::promise<BasePacketPtr>>();
15 enqueue_promise(packet_type, promise);
16 auto future = promise->get_future();
17 co_await boost::asio::this_coro::executor;
18
19 spdlog::debug("Waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
20
21 if (timeout <= 0)
22 {
23 auto base = co_await await_future(io_context_, future);
24 Assert(base->type == DerivedPacket::static_unique_id); // Sanity check
25 spdlog::trace("Received packet: {}, ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
26 co_return std::unique_ptr<DerivedPacket>(reinterpret_cast<DerivedPacket *>(base.release()));
27 }
28
29 try
30 {
31 auto base = co_await await_future(io_context_, future,
32 std::chrono::microseconds(static_cast<size_t>(timeout * 1e6f)));
33 Assert(base->type == DerivedPacket::static_unique_id); // Sanity check
34 spdlog::trace("Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
35 co_return std::unique_ptr<DerivedPacket>(reinterpret_cast<DerivedPacket *>(base.release()));
36 }
37 catch (std::runtime_error &err)
38 {
39 spdlog::warn("Timed out waiting for packet {} ({}): {}", DerivedPacket::static_unique_id,
40 DerivedPacket::static_packet_name, err.what());
41 co_return nullptr;
42 }
43 catch (std::exception &exception)
44 {
45 spdlog::error("An error occurred while waiting for packet {} ({}): {}", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name, exception.what());
46 co_return nullptr;
47 }
48 }
49
50 template <IsPacket DerivedPacket>
51 boost::asio::awaitable<std::unique_ptr<DerivedPacket>> PacketDispatcher::await_packet(
52 PacketFilterFunc<DerivedPacket> filter, float timeout)
53 {
54 auto packet_type = DerivedPacket::static_unique_id;
55 auto promise = std::make_shared<std::promise<BasePacketPtr>>();
56 enqueue_filter_promise(packet_type, { [passedFilter = filter](BasePacketPtr const &packet) {
57 return passedFilter(*reinterpret_cast<DerivedPacket *>(packet.get()));
58 },
59 promise });
60 auto future = promise->get_future();
61 co_await boost::asio::this_coro::executor;
62
63 spdlog::trace("Waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
64
65 if (timeout <= 0)
66 {
67 auto base = co_await await_future(io_context_, future);
68 Assert(base->type == DerivedPacket::static_unique_id); // Sanity check
69 spdlog::trace("Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
70 co_return std::unique_ptr<DerivedPacket>(reinterpret_cast<DerivedPacket *>(base.release()));
71 }
72
73 try
74 {
75 auto base = co_await await_future(io_context_, future, std::chrono::microseconds(static_cast<size_t>(timeout * 1e6f)));
76 Assert(base->type == DerivedPacket::static_unique_id); // Sanity check
77 spdlog::trace("Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
78 co_return std::unique_ptr<DerivedPacket>(reinterpret_cast<DerivedPacket *>(base.release()));
79 }
80 catch (std::runtime_error &err)
81 {
82 spdlog::warn("Timed out waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
83 co_return nullptr;
84 }
85 catch (std::exception &exception)
86 {
87 spdlog::error("An error occurred while waiting for packet {} ({}): {}", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name,
88 exception.what());
89 co_return nullptr;
90 }
91 }
92
93 template <IsPacket DerivedPacket>
95 PacketFilterFunc<DerivedPacket> filter, float delay)
96 {
97 spdlog::trace("Posting task to register default handler for packet {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
98 constexpr auto packet_id = DerivedPacket::static_unique_id;
99
100 handler_tuple tuple =
101 handler_tuple{ delay,
102 !bool(filter)
104 : ([moved_filter = std::move(filter)](Packet const &packet) -> bool
105 { return moved_filter(reinterpret_cast<DerivedPacket const &>(packet)); }),
106 [moved_handler = std::move(handler)](std::unique_ptr<Packet> &&packet)
107 {
108 auto ptr = reinterpret_cast<DerivedPacket *>(packet.release());
109 auto uptr = std::unique_ptr<DerivedPacket>(ptr);
110 moved_handler(std::move(uptr));
111 } };
112 default_handlers_input_.push(std::pair{packet_id, std::move(tuple)});
114 }
115
117 {
118 spdlog::trace("Posting task to enqueue promise for packet {}", packet_id);
119 promise_map_input_.push(std::pair{ packet_id, std::move(promise) });
121 }
122
124 {
125 spdlog::trace("Posting task to enqueue promise with filter for packet {}", packet_id);
126 promise_filter_map_input_.push(std::pair{packet_id, std::move(filtered_promise)});
128 }
129
131 {
132 // Fulfill first filtered promise in filter_promise_map
133 {
134 auto it = promise_filter_map_.find(packet_id);
135 if (it != promise_filter_map_.end())
136 {
137 for (auto &promise_filter : it->second)
138 {
139 if (!promise_filter.first || promise_filter.first(packet))
140 {
141 spdlog::trace("Fulfilled filtered promise for packet_id: {} ({})", packet_id, packet->packet_name());
142 promise_filter.second->set_value(std::move(packet));
143 return true;
144 }
145 }
146 }
147 }
148
149 // Fulfill the first promise in promise_map
150 {
151 auto it = promise_map_.find(packet_id);
152 if (it != promise_map_.end() && !it->second.empty())
153 {
154 spdlog::trace("Fulfilled promise for packet_id: {} ({})", packet_id, packet->packet_name());
155 it->second.front()->set_value(std::move(packet));
156 it->second.pop_front();
157 return true;
158 }
159 }
160
161 spdlog::trace("No promises to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
162 return false;
163 }
164
166 float &min_handler_timestamp, SteadyTimer &timer)
167 {
168 {
169 auto it = default_handlers_.find(packet_id);
170 if (it == default_handlers_.end())
171 {
172 goto fulfill_handlers_exit;
173 }
174
175 for (auto &[delay, filter, handler] : it->second)
176 {
177 if (delay > packet->get_packet_time_alive())
178 {
179 min_handler_timestamp = std::min<float>(min_handler_timestamp, timer.elapsed() + delay - packet->get_packet_time_alive());
180 spdlog::trace("Handler delay for packet_id {} ({}) is greater than packet time alive.", packet_id, packet->packet_name());
181 continue;
182 }
183
184 if (bool(filter) && !filter(*packet))
185 {
186 spdlog::trace("Filter condition not satisfied for packet_id: {} ({})", packet_id, packet->packet_name());
187 continue;
188 }
189
190 spdlog::trace("Fulfilled handler for packet_id: {} ({})", packet_id, packet->packet_name());
191 handler(std::move(packet));
192 return true;
193 }
194
195 spdlog::trace("No suitable default handler to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
196 }
197 fulfill_handlers_exit:
198 {
200 if (it == subsystem_handlers_.end())
201 {
202 spdlog::trace("No handlers to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
203 return false;
204 }
205 for (auto &[delay, filter, handler] : it->second)
206 {
207 if (delay > packet->get_packet_time_alive())
208 {
209 min_handler_timestamp =
210 std::min<float>(min_handler_timestamp, timer.elapsed() + delay - packet->get_packet_time_alive());
211 spdlog::trace("Handler delay for packet_id {} ({}) is greater than packet time alive.", packet_id, packet->packet_name());
212 continue;
213 }
214
215 if (bool(filter) && !filter(*packet))
216 {
217 spdlog::trace("Filter condition not satisfied for packet_id: {} ({})", packet_id, packet->packet_name());
218 continue;
219 }
220
221 spdlog::trace("Fulfilled handler for packet_id: {} ({})", packet_id, packet->packet_name());
222 handler(std::move(packet));
223 return true;
224 }
225 }
226 return false;
227 }
228
230 {
231 spdlog::trace("Pushed packet {} to unprocessed_packets_input_ queue.", packet->packet_name());
232 unprocessed_packets_input_.push(std::move(packet));
234 }
235
236} // namespace mal_packet_weaver
A templated class representing a derived packet from the base Packet class.
Definition packet.hpp:114
void push(Value &&value)
Pushes a value into the queue for asynchronous processing.
SynchronizationWrapper< BasePacketPtr > unprocessed_packets_input_
std::unordered_map< UniquePacketID, std::vector< handler_tuple > > default_handlers_
boost::asio::awaitable< std::unique_ptr< DerivedPacket > > await_packet(float timeout=-1.0f)
Wait until the packet is registered in the dispatch system and return as soon as possible.
bool fulfill_promises(UniquePacketID packet_id, BasePacketPtr &packet)
Fulfills promises associated with a packet ID.
void push_packet(BasePacketPtr &&packet)
Pushes an input packet to the unprocessed_packets_input_ queue.
shared_promise< BasePacketPtr > shared_packet_promise
Alias for a shared promise of a base packet pointer.
std::tuple< float, PacketFilterFunc< Packet >, PacketHandlerFunc< Packet > > handler_tuple
Alias for a tuple containing information for packet handling.
void enqueue_filter_promise(UniquePacketID packet_id, promise_filter filtered_promise)
Enqueues a promise with a filter associated with a packet.
std::unordered_map< PacketSubsystemID, std::vector< handler_tuple > > subsystem_handlers_
bool fulfill_handlers(UniquePacketID packet_id, BasePacketPtr &packet, float &min_handler_timestamp, SteadyTimer &timer)
Fulfills handlers associated with a packet ID and packet data.
std::unique_ptr< Packet > BasePacketPtr
Alias for a unique pointer to a base packet type.
std::unordered_map< UniquePacketID, std::vector< promise_filter > > promise_filter_map_
void register_default_handler(PacketHandlerFunc< DerivedPacket > handler, PacketFilterFunc< DerivedPacket > filter={}, float delay=0.0f)
Registers a default handler for the provided packet type.
void enqueue_packet(BasePacketPtr &&packet)
Enqueues a packet for processing.
SynchronizationWrapper< std::pair< UniquePacketID, promise_filter > > promise_filter_map_input_
std::pair< std::function< bool(BasePacketPtr const &)>, shared_packet_promise > promise_filter
Alias for a filter function paired with a shared packet promise.
SynchronizationWrapper< std::pair< UniquePacketID, handler_tuple > > default_handlers_input_
SynchronizationWrapper< std::pair< UniquePacketID, shared_packet_promise > > promise_map_input_
void enqueue_promise(UniquePacketID packet_id, shared_packet_promise promise)
Enqueues a promise associated with a packet.
std::unordered_map< UniquePacketID, std::deque< shared_packet_promise > > promise_map_
Base class for all packets.
Definition packet.hpp:47
constexpr float elapsed() noexcept
Calculates and returns the elapsed time since the timer was last reset.
Definition timer.hpp:43
This is the main namespace for the Mal Packet Weaver library.
Definition common.hpp:42
boost::asio::awaitable< T > await_future(Executor &executor, std::future< T > &fut)
Definition common.hpp:118
constexpr PacketSubsystemID UniquePacketIDToPacketSubsystemID(UniquePacketID subsystem_type) noexcept
Extract PacketSubsystemID from a UniquePacketID.
Definition packet.hpp:28
std::function< bool(Args..., DerivedPacket const &)> PacketFilterFunc
Predicate function type to filter packets.
std::function< void(Args..., std::unique_ptr< DerivedPacket >)> PacketHandlerFunc
Callback function type to handle packets asynchronously.
uint32_t UniquePacketID
Unique identifier for a packet, combining subsystem and packet IDs.
Definition packet.hpp:16
void Assert(bool value, std::string_view message="Assert failed", std::source_location location=std::source_location::current())
Asserts a condition with customizable behavior based on debug mode.