27#if defined(__i386__) || defined(__x86_64__)
29#elif defined(__aarch64__) && (defined(__GNUC__) || defined(__clang__))
32 std::atomic_signal_fence(std::memory_order_seq_cst);
38 _sender(std::
move(
sender)), _queue_depth(queue_depth)
42 throw std::invalid_argument(
"PacedSender requires a datagram sender");
46 throw std::invalid_argument(
"PacedSender queue depth must be positive");
52 void PacedSender::open(
const std::string& host,
const std::uint16_t port) { _sender->open(host, port); }
56 std::scoped_lock
const lock(_mutex);
62 _steady_epoch = std::chrono::steady_clock::now();
65 _thread = std::thread([
this] { run(); });
70 std::unique_lock
lock(_mutex);
73 throw std::logic_error(
"PacedSender must be started before enqueue");
79 .dropped = std::nullopt,
83 _cv.wait(
lock, [
this] {
return _stopping || queuedOrSendingCount() < _queue_depth; });
88 .dropped = std::nullopt,
92 _queue.push_back(std::move(
packet));
99 std::unique_lock
lock(_mutex);
100 _cv.wait(
lock, [
this] {
return _queue.empty() && !_send_in_progress; });
106 std::scoped_lock
const lock(_mutex);
107 if (!_started && !_thread.joinable())
116 if (_thread.joinable())
122 std::scoped_lock
const lock(_mutex);
125 _send_in_progress =
false;
133 std::scoped_lock
const lock(_mutex);
134 const auto found = _late_packets.find(stream_id);
135 return found == _late_packets.end() ? 0 :
found->second;
140 std::scoped_lock
const lock(_mutex);
141 const auto found = _sent_packets.find(stream_id);
142 return found == _sent_packets.end() ? 0 :
found->second;
147 std::scoped_lock
const lock(_mutex);
148 const auto found = _send_failures.find(stream_id);
149 return found == _send_failures.end() ? 0 :
found->second;
154 std::scoped_lock
const lock(_mutex);
155 const auto found = _dropped_data_packets.find(stream_id);
156 return found == _dropped_data_packets.end() ? 0 :
found->second;
161 std::scoped_lock
const lock(_mutex);
162 const auto found = _dropped_context_packets.find(stream_id);
163 return found == _dropped_context_packets.end() ? 0 :
found->second;
168 std::scoped_lock
const lock(_mutex);
169 const auto found = _dropped_samples.find(stream_id);
170 return found == _dropped_samples.end() ? 0 :
found->second;
175 std::scoped_lock
const lock(_mutex);
176 auto result = std::move(_pending_dropped_datagrams);
177 _pending_dropped_datagrams.clear();
181 void PacedSender::run()
183 std::unique_lock
lock(_mutex);
192 _cv.wait(
lock, [
this] {
return _stopping || !_queue.empty(); });
196 const auto due = dueTime(_queue.front());
199 auto packet = std::move(_queue.front());
201 _send_in_progress =
true;
202 const auto now = std::chrono::steady_clock::now();
206 _send_in_progress =
false;
211 void PacedSender::waitUntilDue(std::unique_lock<std::mutex>&
lock,
const std::chrono::steady_clock::time_point
due)
215 const auto now = std::chrono::steady_clock::now();
233 while (std::chrono::steady_clock::now() <
due)
241 void PacedSender::sendOneUnlocked(SerializedPacket
packet,
const std::chrono::steady_clock::time_point
now)
246 _sender->send(
packet.bytes);
250 std::scoped_lock
const lock(_mutex);
251 ++_send_failures[
packet.stream_id];
252 recordDroppedUnlocked(
packet);
253 _pending_dropped_datagrams.push_back(makeDroppedDatagram(
packet));
256 std::scoped_lock
const lock(_mutex);
257 ++_sent_packets[
packet.stream_id];
258 if (
now >
due + std::chrono::milliseconds(1))
260 ++_late_packets[
packet.stream_id];
264 void PacedSender::recordDroppedUnlocked(
const SerializedPacket&
packet)
268 ++_dropped_data_packets[
packet.stream_id];
269 _dropped_samples[
packet.stream_id] +=
packet.sample_count;
272 if (
packet.context_packet)
274 ++_dropped_context_packets[
packet.stream_id];
278 DroppedDatagram PacedSender::makeDroppedDatagram(
const SerializedPacket&
packet)
const noexcept
280 return DroppedDatagram{.stream_id =
packet.stream_id,
281 .sample_count =
packet.sample_count,
282 .data_packet =
packet.data_packet,
283 .context_packet =
packet.context_packet};
286 std::size_t PacedSender::queuedOrSendingCount()
const noexcept
288 return _queue.size() + (_send_in_progress ? 1u : 0
u);
291 std::chrono::steady_clock::time_point PacedSender::dueTime(
const SerializedPacket&
packet)
const
293 const auto seconds =
packet.first_sample_time - _simulation_epoch_time;
294 const auto nanos =
static_cast<std::int64_t
>(
seconds * 1'000'000'000.0);
295 return _steady_epoch + std::chrono::nanoseconds(
nanos);