mal-packet-weaver
C++20 packet serialization/deserialization library.
Loading...
Searching...
No Matches
packet-dispatcher.hpp
Go to the documentation of this file.
1#pragma once
2#include "packet.hpp"
3
4namespace mal_packet_weaver
5{
16 template <typename DerivedPacket, typename... Args>
17 using PacketHandlerFunc = std::function<void(Args..., std::unique_ptr<DerivedPacket>)>;
18
29 template <typename DerivedPacket, typename... Args>
30 using PacketFilterFunc = std::function<bool(Args..., DerivedPacket const &)>;
31
45 public std::enable_shared_from_this<PacketDispatcher>
46 {
55 template <typename Value>
57 {
58 public:
65 SynchronizationWrapper(boost::asio::io_context &context, std::function<void(std::vector<Value> &)> fn)
66 : strand_{ context }, dequeue_function_(std::move(fn))
67 {
68 }
77 inline void push(Value &&value) requires(std::is_copy_constructible_v<Value> || std::is_copy_assignable_v<Value>)
78 {
79 strand_.post(
80 [this, copied_value = value]()
81 {
82 input_data_.emplace_back(std::move(copied_value));
83 synchronization_flag_.test_and_set(std::memory_order_release);
84 }
85 );
86 }
87 inline void push(Value &&value)
88 requires(std::is_same_v<Value, std::unique_ptr<typename Value::element_type>>)
89 {
90 strand_.post(
91 [this, released_ptr = value.release()]()
92 {
93 Value unique_ptr{ released_ptr };
94 input_data_.emplace_back(std::move(unique_ptr));
95 synchronization_flag_.test_and_set(std::memory_order_release);
96 });
97 }
98
105 inline bool has_data() { return synchronization_flag_.test(std::memory_order_acquire); }
106
116 inline std::future<bool> create_pop_task()
117 {
118 std::shared_ptr<std::promise<bool>> promise = std::make_shared<std::promise<bool>>();
119 std::future<bool> input_future = promise->get_future();
120 strand_.post(
121 [this, promise]()
122 {
123 if (input_data_.empty())
124 {
125 promise->set_value(false);
126 synchronization_flag_.clear(std::memory_order_release);
127 return;
128 }
130 input_data_.clear();
131 promise->set_value(true);
132 synchronization_flag_.clear(std::memory_order_release);
133 });
134 return input_future;
135 }
136
137 private:
138 boost::asio::io_context::strand strand_;
139 std::atomic_flag synchronization_flag_;
140 std::function<void(std::vector<Value> &)> dequeue_function_;
141 std::vector<Value> input_data_;
142 };
143
144 public:
148 using BasePacketPtr = std::unique_ptr<Packet>;
149
157 template <typename T>
158 using shared_promise = std::shared_ptr<std::promise<T>>;
159
164
172 using promise_filter = std::pair<std::function<bool(BasePacketPtr const &)>, shared_packet_promise>;
173
182 using handler_tuple = std::tuple<float, PacketFilterFunc<Packet>, PacketHandlerFunc<Packet>>;
183
189 PacketDispatcher(boost::asio::io_context &io_context);
190
199 inline void enqueue_packet(BasePacketPtr &&packet);
214 template <IsPacket DerivedPacket>
215 boost::asio::awaitable<std::unique_ptr<DerivedPacket>> await_packet(float timeout = -1.0f);
234 template <IsPacket DerivedPacket>
235 boost::asio::awaitable<std::unique_ptr<DerivedPacket>> await_packet(PacketFilterFunc<DerivedPacket> filter,
236 float timeout = -1.0f);
254 template <IsPacket DerivedPacket>
256 PacketFilterFunc<DerivedPacket> filter = {}, float delay = 0.0f);
257
275 PacketFilterFunc<Packet> filter = {}, float delay = 0.0f);
276
287 inline void enqueue_promise(UniquePacketID packet_id, shared_packet_promise promise);
299 inline void enqueue_filter_promise(UniquePacketID packet_id, promise_filter filtered_promise);
300
306 void Destroy() { alive_.store(false); }
307
308 private:
315 boost::asio::awaitable<std::shared_ptr<PacketDispatcher>> get_shared_ptr();
316
324 boost::asio::awaitable<void> Run();
325
340 inline bool fulfill_promises(UniquePacketID packet_id, BasePacketPtr &packet);
341
357 inline bool fulfill_handlers(UniquePacketID packet_id, BasePacketPtr &packet, float &min_handler_timestamp,
358 SteadyTimer &timer);
359
370 inline void push_packet(BasePacketPtr &&packet);
371
376 boost::asio::awaitable<bool> pop_inputs();
377
378 boost::asio::io_context &io_context_;
393 std::unordered_map<UniquePacketID, std::vector<BasePacketPtr>>
395 std::unordered_map<UniquePacketID, std::deque<shared_packet_promise>>
397 std::unordered_map<UniquePacketID, std::vector<promise_filter>>
399 std::unordered_map<UniquePacketID, std::vector<handler_tuple>>
401 std::unordered_map<PacketSubsystemID, std::vector<handler_tuple>>
405 std::atomic_bool alive_{ true };
406 };
407} // namespace mal_packet_weaver
408
409#include "packet-dispatcher.inl"
A templated class representing a derived packet from the base Packet class.
Definition packet.hpp:114
A wrapper class for synchronizing and processing data.
std::function< void(std::vector< Value > &)> dequeue_function_
bool has_data()
Checks if there is data available for processing.
void push(Value &&value)
Pushes a value into the queue for asynchronous processing.
std::future< bool > create_pop_task()
Creates a task to asynchronously process the enqueued data.
SynchronizationWrapper(boost::asio::io_context &context, std::function< void(std::vector< Value > &)> fn)
Constructor for the SynchronizationWrapper class.
The PacketDispatcher class is responsible for managing packet dispatching and handling.
SynchronizationWrapper< BasePacketPtr > unprocessed_packets_input_
std::unordered_map< UniquePacketID, std::vector< handler_tuple > > default_handlers_
boost::asio::awaitable< std::unique_ptr< DerivedPacket > > await_packet(float timeout=-1.0f)
Wait until the packet is registered in the dispatch system and return as soon as possible.
std::unordered_map< UniquePacketID, std::vector< BasePacketPtr > > unprocessed_packets_
boost::asio::awaitable< std::shared_ptr< PacketDispatcher > > get_shared_ptr()
Retrieves a shared pointer to the current dispatcher.
bool fulfill_promises(UniquePacketID packet_id, BasePacketPtr &packet)
Fulfills promises associated with a packet ID.
void push_packet(BasePacketPtr &&packet)
Pushes an input packet to the unprocessed_packets_input_ queue.
SynchronizationWrapper< std::pair< PacketSubsystemID, handler_tuple > > subsystem_handlers_input_
shared_promise< BasePacketPtr > shared_packet_promise
Alias for a shared promise of a base packet pointer.
std::tuple< float, PacketFilterFunc< Packet >, PacketHandlerFunc< Packet > > handler_tuple
Alias for a tuple containing information for packet handling.
void enqueue_filter_promise(UniquePacketID packet_id, promise_filter filtered_promise)
Enqueues a promise with a filter associated with a packet.
void register_subsystem_handler(PacketSubsystemID subsystem_id, PacketHandlerFunc< Packet > handler, PacketFilterFunc< Packet > filter={}, float delay=0.0f)
Registers a subsystem handler for the provided packet type.
std::unordered_map< PacketSubsystemID, std::vector< handler_tuple > > subsystem_handlers_
bool fulfill_handlers(UniquePacketID packet_id, BasePacketPtr &packet, float &min_handler_timestamp, SteadyTimer &timer)
Fulfills handlers associated with a packet ID and packet data.
std::unique_ptr< Packet > BasePacketPtr
Alias for a unique pointer to a base packet type.
boost::asio::awaitable< bool > pop_inputs()
Pops input packets from input queues to local maps for processing.
std::unordered_map< UniquePacketID, std::vector< promise_filter > > promise_filter_map_
void register_default_handler(PacketHandlerFunc< DerivedPacket > handler, PacketFilterFunc< DerivedPacket > filter={}, float delay=0.0f)
Registers a default handler for the provided packet type.
void enqueue_packet(BasePacketPtr &&packet)
Enqueues a packet for processing.
SynchronizationWrapper< std::pair< UniquePacketID, promise_filter > > promise_filter_map_input_
PacketDispatcher(boost::asio::io_context &io_context)
Constructs a PacketDispatcher instance associated with the given io_context.
std::pair< std::function< bool(BasePacketPtr const &)>, shared_packet_promise > promise_filter
Alias for a filter function paired with a shared packet promise.
void Destroy()
Coroutines use the shared pointer from this, so you need to explicitly call Destroy so alive_ is fals...
std::shared_ptr< std::promise< T > > shared_promise
Alias for a shared promise of type T.
boost::asio::awaitable< void > Run()
This function represents the main loop for running a task with exponential backoff and asynchronous I...
SynchronizationWrapper< std::pair< UniquePacketID, handler_tuple > > default_handlers_input_
SynchronizationWrapper< std::pair< UniquePacketID, shared_packet_promise > > promise_map_input_
void enqueue_promise(UniquePacketID packet_id, shared_packet_promise promise)
Enqueues a promise associated with a packet.
std::unordered_map< UniquePacketID, std::deque< shared_packet_promise > > promise_map_
A class that can't be copied neither moved.
This is the main namespace for the Mal Packet Weaver library.
Definition common.hpp:42
std::function< bool(Args..., DerivedPacket const &)> PacketFilterFunc
Predicate function type to filter packets.
uint16_t PacketSubsystemID
Type alias for packet subsystem IDs.
Definition packet.hpp:12
std::function< void(Args..., std::unique_ptr< DerivedPacket >)> PacketHandlerFunc
Callback function type to handle packets asynchronously.
uint32_t UniquePacketID
Unique identifier for a packet, combining subsystem and packet IDs.
Definition packet.hpp:16
STL namespace.