10 : io_context_{ io_context }, signal_handler_{ io_context },
11 unprocessed_packets_input_{ io_context,
14 for (
auto &packet_ptr : packets)
16 if (packet_ptr ==
nullptr)
18 spdlog::warn(
"packet_ptr in unprocessed_input is nullptr");
21 auto packet_id = packet_ptr->type;
23 unprocessed_packets_queue_.emplace_back(std::move(packet_ptr));
28 [
this](std::vector<std::pair<UniquePacketID, shared_packet_promise>> &promise_input) ->
void
30 for (
auto &[packet_id, shared_promise] : promise_input)
32 auto &promise_queue = promise_map_[packet_id];
33 promise_queue.emplace_back(std::move(shared_promise));
38 promise_filter_map_input_{ io_context,
40 std::vector<std::pair<UniquePacketID, promise_filter>> &promise_input) ->
void
42 for (
auto &[packet_id, filter] : promise_input)
44 auto &filter_queue = promise_filter_map_[packet_id];
45 filter_queue.emplace_back(std::move(filter));
48 default_handlers_input_{ io_context,
49 [
this](std::vector<std::pair<UniquePacketID, handler_tuple>> &handlers) ->
void
51 for (
auto &[packet_id, handler] : handlers)
53 auto &handler_list = default_handlers_[packet_id];
56 handler_list, std::move(handler),
57 [](handler_tuple
const &left,
58 handler_tuple
const &right) ->
bool __mal_toolkit_lambda_force_inline
60 if (
bool(std::get<1>(left)))
64 else if (
bool(std::get<1>(right)))
72 subsystem_handlers_input_{ io_context,
73 [
this](std::vector<std::pair<PacketSubsystemID, handler_tuple>> &handlers) ->
void
75 for (
auto &[packet_id, handler] : handlers)
77 auto &handler_list = subsystem_handlers_[packet_id];
80 handler_list, std::move(handler),
81 [](handler_tuple
const &left,
82 handler_tuple
const &right) ->
bool __mal_toolkit_lambda_force_inline
84 if (
bool(std::get<1>(left)))
88 else if (
bool(std::get<1>(right)))
97 spdlog::debug(
"PacketDispatcher constructor called.");
98 co_spawn(io_context, std::bind(&PacketDispatcher::Run,
this), boost::asio::detached);
109 boost::asio::awaitable<std::shared_ptr<PacketDispatcher>> PacketDispatcher::get_shared_ptr()
111 ExponentialBackoff backoff(std::chrono::microseconds(1), std::chrono::microseconds(100), 2, 32, 0.1);
118 co_return shared_from_this();
120 catch (std::bad_weak_ptr &)
125 co_await timer.async_wait(boost::asio::use_awaitable);
127 if (it >= 50 && it % 20 == 0)
129 spdlog::error(
"Failed to retrieve shared pointer, iteration: {}", it);
132 spdlog::error(
"Exceeded maximum attempts to retrieve shared pointer");
135 boost::asio::awaitable<void> PacketDispatcher::Run()
138 float min_handler_timestamp = std::numeric_limits<float>::max();
140 std::shared_ptr<PacketDispatcher> dispatcher_lock =
co_await get_shared_ptr();
141 if (dispatcher_lock ==
nullptr)
144 "Couldn't retrieve shared pointer for session. Did you create the "
145 "session using std::make_shared?");
148 co_await boost::asio::this_coro::executor;
151 while (alive_.load())
153 bool updated =
co_await pop_inputs();
157 if (min_handler_timestamp < timer.
elapsed())
159 spdlog::trace(
"Updating handlers...");
160 min_handler_timestamp = std::numeric_limits<float>::max();
162 for (
auto &[packet_id, packet_vector] : unprocessed_packets_)
166 packet_vector, [
this, &packet_id, &min_handler_timestamp, &timer](
BasePacketPtr &packet)
167 __mal_toolkit_lambda_force_inline
168 {
return fulfill_handlers(packet_id, packet, min_handler_timestamp, timer); });
171 if (min_handler_timestamp == std::numeric_limits<float>::max())
173 co_await signal_handler_.wait();
177 co_await signal_handler_.wait(
178 std::chrono::microseconds(
static_cast<size_t>(min_handler_timestamp * 1.0e6)));
183 spdlog::trace(
"Input arrays were updated! Fetching...");
189 min_handler_timestamp = std::numeric_limits<float>::max();
190 for (
auto &[packet_id, packet_vector] : unprocessed_packets_)
193 std::erase_if(packet_vector,
194 [
this, &packet_id, &min_handler_timestamp, &timer](
BasePacketPtr &packet)
195 __mal_toolkit_lambda_force_inline
197 return fulfill_promises(packet_id, packet) ||
198 fulfill_handlers(packet_id, packet, min_handler_timestamp, timer) ||
204 std::erase_if(unprocessed_packets_,
205 [](
auto const &pair) __mal_toolkit_lambda_force_inline {
return pair.second.empty(); });
208 catch (std::exception &e)
210 spdlog::error(
"PacketDispatcher::Run cancelled with an error: {}", e.what());
212 spdlog::info(
"Exiting PacketDispatcher::Run");
215 boost::asio::awaitable<bool> PacketDispatcher::pop_inputs()
217 std::vector<std::future<bool>> futures{};
218 if (unprocessed_packets_input_.has_data())
220 futures.emplace_back(unprocessed_packets_input_.create_pop_task());
222 if (promise_map_input_.has_data())
224 futures.emplace_back(promise_map_input_.create_pop_task());
226 if (promise_filter_map_input_.has_data())
228 futures.emplace_back(promise_filter_map_input_.create_pop_task());
230 if (default_handlers_input_.has_data())
232 futures.emplace_back(default_handlers_input_.create_pop_task());
234 if (subsystem_handlers_input_.has_data())
236 futures.emplace_back(subsystem_handlers_input_.create_pop_task());
239 co_await boost::asio::this_coro::executor;
240 for (
auto &future : futures)
242 spdlog::trace(
"Waiting for futures to complete...");
243 rv |=
co_await await_future(io_context_, future);
244 spdlog::trace(
"Futures complete. Result for popping is: {}", rv);