FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
paced_sender.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-2.0-only
2//
3// Copyright (c) 2026-present FERS Contributors (see AUTHORS.md).
4//
5// See the GNU GPLv2 LICENSE file in the FERS project root for more information.
6
8
9#include <algorithm>
10#include <atomic>
11#include <chrono>
12#include <stdexcept>
13
14#if defined(__i386__) || defined(__x86_64__)
15#include <immintrin.h>
16#endif
17
19{
20 namespace
21 {
22 constexpr auto kCoarsePacingSleep = std::chrono::milliseconds(1);
23 constexpr auto kFinePacingSpin = std::chrono::microseconds(200);
24
25 void cpuPause() noexcept
26 {
27#if defined(__i386__) || defined(__x86_64__)
28 _mm_pause();
29#elif defined(__aarch64__) && (defined(__GNUC__) || defined(__clang__))
30 __asm__ __volatile__("yield" ::: "memory");
31#else
32 std::atomic_signal_fence(std::memory_order_seq_cst);
33#endif
34 }
35 }
36
37 PacedSender::PacedSender(std::unique_ptr<DatagramSender> sender, const std::size_t queue_depth) :
38 _sender(std::move(sender)), _queue_depth(queue_depth)
39 {
40 if (!_sender)
41 {
42 throw std::invalid_argument("PacedSender requires a datagram sender");
43 }
44 if (queue_depth == 0)
45 {
46 throw std::invalid_argument("PacedSender queue depth must be positive");
47 }
48 }
49
51
52 void PacedSender::open(const std::string& host, const std::uint16_t port) { _sender->open(host, port); }
53
55 {
56 std::scoped_lock const lock(_mutex);
57 if (_started)
58 {
59 return;
60 }
61 _simulation_epoch_time = simulation_epoch_time;
62 _steady_epoch = std::chrono::steady_clock::now();
63 _stopping = false;
64 _started = true;
65 _thread = std::thread([this] { run(); });
66 }
67
69 {
70 std::unique_lock lock(_mutex);
71 if (!_started)
72 {
73 throw std::logic_error("PacedSender must be started before enqueue");
74 }
75 if (_stopping)
76 {
77 return EnqueueResult{
78 .enqueued = false,
79 .dropped = std::nullopt,
80 };
81 }
82
83 _cv.wait(lock, [this] { return _stopping || queuedOrSendingCount() < _queue_depth; });
84 if (_stopping)
85 {
86 return EnqueueResult{
87 .enqueued = false,
88 .dropped = std::nullopt,
89 };
90 }
91
92 _queue.push_back(std::move(packet));
93 _cv.notify_one();
94 return EnqueueResult{.enqueued = true, .dropped = std::nullopt};
95 }
96
98 {
99 std::unique_lock lock(_mutex);
100 _cv.wait(lock, [this] { return _queue.empty() && !_send_in_progress; });
101 }
102
104 {
105 {
106 std::scoped_lock const lock(_mutex);
107 if (!_started && !_thread.joinable())
108 {
109 _sender->close();
110 return;
111 }
112 _stopping = true;
113 _cv.notify_all();
114 }
115
116 if (_thread.joinable())
117 {
118 _thread.join();
119 }
120
121 {
122 std::scoped_lock const lock(_mutex);
123 _started = false;
124 _stopping = false;
125 _send_in_progress = false;
126 _cv.notify_all();
127 }
128 _sender->close();
129 }
130
131 std::uint64_t PacedSender::latePacketCount(const std::uint32_t stream_id) const
132 {
133 std::scoped_lock const lock(_mutex);
134 const auto found = _late_packets.find(stream_id);
135 return found == _late_packets.end() ? 0 : found->second;
136 }
137
138 std::uint64_t PacedSender::sentPacketCount(const std::uint32_t stream_id) const
139 {
140 std::scoped_lock const lock(_mutex);
141 const auto found = _sent_packets.find(stream_id);
142 return found == _sent_packets.end() ? 0 : found->second;
143 }
144
145 std::uint64_t PacedSender::sendFailureCount(const std::uint32_t stream_id) const
146 {
147 std::scoped_lock const lock(_mutex);
148 const auto found = _send_failures.find(stream_id);
149 return found == _send_failures.end() ? 0 : found->second;
150 }
151
152 std::uint64_t PacedSender::droppedDataPacketCount(const std::uint32_t stream_id) const
153 {
154 std::scoped_lock const lock(_mutex);
155 const auto found = _dropped_data_packets.find(stream_id);
156 return found == _dropped_data_packets.end() ? 0 : found->second;
157 }
158
159 std::uint64_t PacedSender::droppedContextPacketCount(const std::uint32_t stream_id) const
160 {
161 std::scoped_lock const lock(_mutex);
162 const auto found = _dropped_context_packets.find(stream_id);
163 return found == _dropped_context_packets.end() ? 0 : found->second;
164 }
165
166 std::uint64_t PacedSender::droppedSampleCount(const std::uint32_t stream_id) const
167 {
168 std::scoped_lock const lock(_mutex);
169 const auto found = _dropped_samples.find(stream_id);
170 return found == _dropped_samples.end() ? 0 : found->second;
171 }
172
173 std::vector<DroppedDatagram> PacedSender::consumeDroppedDatagrams()
174 {
175 std::scoped_lock const lock(_mutex);
176 auto result = std::move(_pending_dropped_datagrams);
177 _pending_dropped_datagrams.clear();
178 return result;
179 }
180
181 void PacedSender::run()
182 {
183 std::unique_lock lock(_mutex);
184 while (true)
185 {
186 if (_queue.empty())
187 {
188 if (_stopping)
189 {
190 return;
191 }
192 _cv.wait(lock, [this] { return _stopping || !_queue.empty(); });
193 continue;
194 }
195
196 const auto due = dueTime(_queue.front());
197 waitUntilDue(lock, due);
198
199 auto packet = std::move(_queue.front());
200 _queue.pop_front();
201 _send_in_progress = true;
202 const auto now = std::chrono::steady_clock::now();
203 lock.unlock();
204 sendOneUnlocked(std::move(packet), now);
205 lock.lock();
206 _send_in_progress = false;
207 _cv.notify_all();
208 }
209 }
210
211 void PacedSender::waitUntilDue(std::unique_lock<std::mutex>& lock, const std::chrono::steady_clock::time_point due)
212 {
213 while (true)
214 {
215 const auto now = std::chrono::steady_clock::now();
216 if (now >= due)
217 {
218 return;
219 }
220
221 const auto remaining = due - now;
223 {
224 const auto coarse_sleep =
225 std::chrono::duration_cast<std::chrono::steady_clock::duration>(kCoarsePacingSleep);
226 const auto fine_spin = std::chrono::duration_cast<std::chrono::steady_clock::duration>(kFinePacingSpin);
227 const auto wait_duration = std::min(remaining - fine_spin, coarse_sleep);
228 _cv.wait_for(lock, wait_duration);
229 continue;
230 }
231
232 lock.unlock();
233 while (std::chrono::steady_clock::now() < due)
234 {
235 cpuPause();
236 }
237 lock.lock();
238 }
239 }
240
241 void PacedSender::sendOneUnlocked(SerializedPacket packet, const std::chrono::steady_clock::time_point now)
242 {
243 const auto due = dueTime(packet);
244 try
245 {
246 _sender->send(packet.bytes);
247 }
248 catch (...)
249 {
250 std::scoped_lock const lock(_mutex);
251 ++_send_failures[packet.stream_id];
252 recordDroppedUnlocked(packet);
253 _pending_dropped_datagrams.push_back(makeDroppedDatagram(packet));
254 return;
255 }
256 std::scoped_lock const lock(_mutex);
257 ++_sent_packets[packet.stream_id];
258 if (now > due + std::chrono::milliseconds(1))
259 {
260 ++_late_packets[packet.stream_id];
261 }
262 }
263
264 void PacedSender::recordDroppedUnlocked(const SerializedPacket& packet)
265 {
266 if (packet.data_packet || (!packet.context_packet && packet.sample_count > 0))
267 {
268 ++_dropped_data_packets[packet.stream_id];
269 _dropped_samples[packet.stream_id] += packet.sample_count;
270 return;
271 }
272 if (packet.context_packet)
273 {
274 ++_dropped_context_packets[packet.stream_id];
275 }
276 }
277
278 DroppedDatagram PacedSender::makeDroppedDatagram(const SerializedPacket& packet) const noexcept
279 {
280 return DroppedDatagram{.stream_id = packet.stream_id,
281 .sample_count = packet.sample_count,
282 .data_packet = packet.data_packet,
283 .context_packet = packet.context_packet};
284 }
285
286 std::size_t PacedSender::queuedOrSendingCount() const noexcept
287 {
288 return _queue.size() + (_send_in_progress ? 1u : 0u);
289 }
290
291 std::chrono::steady_clock::time_point PacedSender::dueTime(const SerializedPacket& packet) const
292 {
293 const auto seconds = packet.first_sample_time - _simulation_epoch_time;
294 const auto nanos = static_cast<std::int64_t>(seconds * 1'000'000'000.0);
295 return _steady_epoch + std::chrono::nanoseconds(nanos);
296 }
297
298}
std::vector< DroppedDatagram > consumeDroppedDatagrams()
PacedSender(std::unique_ptr< DatagramSender > sender, std::size_t queue_depth)
std::uint64_t droppedContextPacketCount(std::uint32_t stream_id) const
std::uint64_t droppedSampleCount(std::uint32_t stream_id) const
std::uint64_t sentPacketCount(std::uint32_t stream_id) const
std::uint64_t droppedDataPacketCount(std::uint32_t stream_id) const
void start(RealType simulation_epoch_time=0.0)
void open(const std::string &host, std::uint16_t port)
EnqueueResult enqueue(SerializedPacket packet)
std::uint64_t latePacketCount(std::uint32_t stream_id) const
std::uint64_t sendFailureCount(std::uint32_t stream_id) const
double RealType
Type for real numbers.
Definition config.h:27
math::Vec3 max