7 spdlog::trace(
"Enqueuing packet for processing.");
10 template <IsPacket DerivedPacket>
13 auto packet_type = DerivedPacket::static_unique_id;
14 auto promise = std::make_shared<std::promise<BasePacketPtr>>();
16 auto future = promise->get_future();
17 co_await boost::asio::this_coro::executor;
19 spdlog::debug(
"Waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
24 Assert(base->type == DerivedPacket::static_unique_id);
25 spdlog::trace(
"Received packet: {}, ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
26 co_return std::unique_ptr<DerivedPacket>(
reinterpret_cast<DerivedPacket *
>(base.release()));
32 std::chrono::microseconds(
static_cast<size_t>(timeout * 1e6f)));
33 Assert(base->type == DerivedPacket::static_unique_id);
34 spdlog::trace(
"Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
35 co_return std::unique_ptr<DerivedPacket>(
reinterpret_cast<DerivedPacket *
>(base.release()));
37 catch (std::runtime_error &err)
39 spdlog::warn(
"Timed out waiting for packet {} ({}): {}", DerivedPacket::static_unique_id,
40 DerivedPacket::static_packet_name, err.what());
43 catch (std::exception &exception)
45 spdlog::error(
"An error occurred while waiting for packet {} ({}): {}", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name, exception.what());
50 template <IsPacket DerivedPacket>
54 auto packet_type = DerivedPacket::static_unique_id;
55 auto promise = std::make_shared<std::promise<BasePacketPtr>>();
57 return passedFilter(*
reinterpret_cast<DerivedPacket *
>(packet.get()));
60 auto future = promise->get_future();
61 co_await boost::asio::this_coro::executor;
63 spdlog::trace(
"Waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
68 Assert(base->type == DerivedPacket::static_unique_id);
69 spdlog::trace(
"Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
70 co_return std::unique_ptr<DerivedPacket>(
reinterpret_cast<DerivedPacket *
>(base.release()));
75 auto base =
co_await await_future(
io_context_, future, std::chrono::microseconds(
static_cast<size_t>(timeout * 1e6f)));
76 Assert(base->type == DerivedPacket::static_unique_id);
77 spdlog::trace(
"Received packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
78 co_return std::unique_ptr<DerivedPacket>(
reinterpret_cast<DerivedPacket *
>(base.release()));
80 catch (std::runtime_error &err)
82 spdlog::warn(
"Timed out waiting for packet: {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
85 catch (std::exception &exception)
87 spdlog::error(
"An error occurred while waiting for packet {} ({}): {}", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name,
93 template <IsPacket DerivedPacket>
97 spdlog::trace(
"Posting task to register default handler for packet {} ({})", DerivedPacket::static_unique_id, DerivedPacket::static_packet_name);
98 constexpr auto packet_id = DerivedPacket::static_unique_id;
104 : ([moved_filter = std::move(filter)](
Packet const &packet) ->
bool
105 {
return moved_filter(
reinterpret_cast<DerivedPacket const &
>(packet)); }),
106 [moved_handler = std::move(handler)](std::unique_ptr<Packet> &&packet)
108 auto ptr =
reinterpret_cast<DerivedPacket *
>(packet.release());
109 auto uptr = std::unique_ptr<DerivedPacket>(ptr);
110 moved_handler(std::move(uptr));
118 spdlog::trace(
"Posting task to enqueue promise for packet {}", packet_id);
125 spdlog::trace(
"Posting task to enqueue promise with filter for packet {}", packet_id);
141 spdlog::trace(
"Fulfilled filtered promise for packet_id: {} ({})", packet_id, packet->packet_name());
154 spdlog::trace(
"Fulfilled promise for packet_id: {} ({})", packet_id, packet->packet_name());
155 it->second.front()->set_value(std::move(packet));
156 it->second.pop_front();
161 spdlog::trace(
"No promises to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
172 goto fulfill_handlers_exit;
175 for (
auto &[delay, filter, handler] : it->second)
177 if (delay > packet->get_packet_time_alive())
179 min_handler_timestamp = std::min<float>(min_handler_timestamp, timer.
elapsed() + delay - packet->get_packet_time_alive());
180 spdlog::trace(
"Handler delay for packet_id {} ({}) is greater than packet time alive.", packet_id, packet->packet_name());
184 if (
bool(filter) && !filter(*packet))
186 spdlog::trace(
"Filter condition not satisfied for packet_id: {} ({})", packet_id, packet->packet_name());
190 spdlog::trace(
"Fulfilled handler for packet_id: {} ({})", packet_id, packet->packet_name());
191 handler(std::move(packet));
195 spdlog::trace(
"No suitable default handler to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
197 fulfill_handlers_exit:
202 spdlog::trace(
"No handlers to fulfill for packet_id: {} ({})", packet_id, packet->packet_name());
205 for (
auto &[delay, filter, handler] : it->second)
207 if (delay > packet->get_packet_time_alive())
209 min_handler_timestamp =
210 std::min<float>(min_handler_timestamp, timer.
elapsed() + delay - packet->get_packet_time_alive());
211 spdlog::trace(
"Handler delay for packet_id {} ({}) is greater than packet time alive.", packet_id, packet->packet_name());
215 if (
bool(filter) && !filter(*packet))
217 spdlog::trace(
"Filter condition not satisfied for packet_id: {} ({})", packet_id, packet->packet_name());
221 spdlog::trace(
"Fulfilled handler for packet_id: {} ({})", packet_id, packet->packet_name());
222 handler(std::move(packet));
231 spdlog::trace(
"Pushed packet {} to unprocessed_packets_input_ queue.", packet->packet_name());
A templated class representing a derived packet from the base Packet class.
void push(Value &&value)
Pushes a value into the queue for asynchronous processing.
SynchronizationWrapper< BasePacketPtr > unprocessed_packets_input_
std::unordered_map< UniquePacketID, std::vector< handler_tuple > > default_handlers_
boost::asio::awaitable< std::unique_ptr< DerivedPacket > > await_packet(float timeout=-1.0f)
Wait until the packet is registered in the dispatch system and return as soon as possible.
bool fulfill_promises(UniquePacketID packet_id, BasePacketPtr &packet)
Fulfills promises associated with a packet ID.
void push_packet(BasePacketPtr &&packet)
Pushes an input packet to the unprocessed_packets_input_ queue.
shared_promise< BasePacketPtr > shared_packet_promise
Alias for a shared promise of a base packet pointer.
SignalHandler signal_handler_
std::tuple< float, PacketFilterFunc< Packet >, PacketHandlerFunc< Packet > > handler_tuple
Alias for a tuple containing information for packet handling.
void enqueue_filter_promise(UniquePacketID packet_id, promise_filter filtered_promise)
Enqueues a promise with a filter associated with a packet.
std::unordered_map< PacketSubsystemID, std::vector< handler_tuple > > subsystem_handlers_
bool fulfill_handlers(UniquePacketID packet_id, BasePacketPtr &packet, float &min_handler_timestamp, SteadyTimer &timer)
Fulfills handlers associated with a packet ID and packet data.
std::unique_ptr< Packet > BasePacketPtr
Alias for a unique pointer to a base packet type.
std::unordered_map< UniquePacketID, std::vector< promise_filter > > promise_filter_map_
void register_default_handler(PacketHandlerFunc< DerivedPacket > handler, PacketFilterFunc< DerivedPacket > filter={}, float delay=0.0f)
Registers a default handler for the provided packet type.
void enqueue_packet(BasePacketPtr &&packet)
Enqueues a packet for processing.
SynchronizationWrapper< std::pair< UniquePacketID, promise_filter > > promise_filter_map_input_
boost::asio::io_context & io_context_
std::pair< std::function< bool(BasePacketPtr const &)>, shared_packet_promise > promise_filter
Alias for a filter function paired with a shared packet promise.
SynchronizationWrapper< std::pair< UniquePacketID, handler_tuple > > default_handlers_input_
SynchronizationWrapper< std::pair< UniquePacketID, shared_packet_promise > > promise_map_input_
void enqueue_promise(UniquePacketID packet_id, shared_packet_promise promise)
Enqueues a promise associated with a packet.
std::unordered_map< UniquePacketID, std::deque< shared_packet_promise > > promise_map_
Base class for all packets.
This is the main namespace for the Mal Packet Weaver library.
boost::asio::awaitable< T > await_future(Executor &executor, std::future< T > &fut)
constexpr PacketSubsystemID UniquePacketIDToPacketSubsystemID(UniquePacketID subsystem_type) noexcept
Extract PacketSubsystemID from a UniquePacketID.
std::function< bool(Args..., DerivedPacket const &)> PacketFilterFunc
Predicate function type to filter packets.
std::function< void(Args..., std::unique_ptr< DerivedPacket >)> PacketHandlerFunc
Callback function type to handle packets asynchronously.
uint32_t UniquePacketID
Unique identifier for a packet, combining subsystem and packet IDs.