FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
vita49_output_sink.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-2.0-only
2//
3// Copyright (c) 2026-present FERS Contributors (see AUTHORS.md).
4//
5// See the GNU GPLv2 LICENSE file in the FERS project root for more information.
6
8
9#include <algorithm>
10#include <chrono>
11#include <optional>
12#include <stdexcept>
13
14#include "core/parameters.h"
16
17namespace serial::vita49
18{
19 namespace
20 {
21 constexpr auto kStatsEmitInterval = std::chrono::milliseconds(250);
22 constexpr auto kPacketTraceEmitInterval = std::chrono::milliseconds(250);
23 constexpr std::size_t kPacketTraceBatchSize = 64;
24
/**
 * @brief Current wall-clock time expressed as nanoseconds since the system_clock epoch.
 *
 * Serves as the fallback epoch when the output configuration carries no explicit
 * epoch_unix_nanoseconds value.
 *
 * @return Nanosecond count of system_clock::now().time_since_epoch(), cast to unsigned.
 */
[[nodiscard]] std::uint64_t defaultEpochNanoseconds()
{
    using std::chrono::nanoseconds;
    const nanoseconds since_epoch =
        std::chrono::duration_cast<nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
    return static_cast<std::uint64_t>(since_epoch.count());
}
30
31 [[nodiscard]] core::Vita49Timestamp toCoreTimestamp(const Timestamp& timestamp) noexcept
32 {
33 return core::Vita49Timestamp{.integer_seconds = timestamp.integer_seconds,
34 .fractional_picoseconds = timestamp.fractional_picoseconds};
35 }
36
37 [[nodiscard]] std::optional<core::Vita49Timestamp>
38 tryCoreTimestampFromEpoch(const std::uint64_t epoch_unix_nanoseconds,
39 const RealType sample_time_seconds) noexcept
40 {
41 try
42 {
43 return toCoreTimestamp(timestampFromEpoch(epoch_unix_nanoseconds, sample_time_seconds));
44 }
45 catch (...)
46 {
47 return std::nullopt;
48 }
49 }
50 }
51
52 Vita49OutputSink::Vita49OutputSink(std::unique_ptr<DatagramSender> sender,
54 _telemetry_callback(std::move(telemetry_callback)), _provided_sender(std::move(sender))
55 {
56 }
57
59 {
60 if (!_finalized)
61 {
62 try
63 {
64 (void)finalize();
65 }
66 catch (...)
67 {
68 }
69 }
70 }
71
72 void Vita49OutputSink::initializeRun(const core::OutputConfig& config, std::string simulation_name)
73 {
74 std::scoped_lock const lock(_mutex);
76 {
77 throw std::invalid_argument("Vita49OutputSink requires VITA output mode");
78 }
79 if (config.vita49.host.empty() || config.vita49.port == 0)
80 {
81 throw std::invalid_argument("VITA output requires destination host and port");
82 }
83 if (config.vita49.queue_depth == 0)
84 {
85 throw std::invalid_argument("VITA output queue depth must be positive");
86 }
87
88 _config = config;
89 _simulation_name = std::move(simulation_name);
90 const auto epoch_ns = config.vita49.epoch_unix_nanoseconds.value_or(defaultEpochNanoseconds());
91 _packetizer =
92 std::make_unique<Vita49Packetizer>(epoch_ns, config.vita49.adc_fullscale, config.vita49.max_udp_payload);
93 _sender = std::make_unique<PacedSender>(
94 _provided_sender ? std::move(_provided_sender) : std::make_unique<UdpSender>(), config.vita49.queue_depth);
95 _sender->open(config.vita49.host, config.vita49.port);
96 _sender->start(params::startTime());
97 _last_stats_emit = std::chrono::steady_clock::time_point::min();
98 _last_packet_trace_emit = std::chrono::steady_clock::now();
99 _pending_packet_traces.clear();
100 _trace_sequence = 0;
101 _initialized = true;
102 _finalized = false;
103 emitTelemetry({}, true);
104 }
105
107 {
108 std::scoped_lock const lock(_mutex);
109 const auto stream_id = _registry.registerStream(stream);
110 if (!_streams.contains(stream_id))
111 {
112 StreamState state;
113 state.descriptor = stream;
114 state.stats.receiver_id = stream.receiver_id;
115 state.stats.receiver_name = stream.receiver_name;
116 state.stats.stream_id = stream_id;
117 state.stats.mode = stream.mode.empty() ? "unknown" : stream.mode;
118 state.stats.sample_rate = stream.sample_rate;
119 state.stats.reference_frequency = stream.reference_frequency;
120 _streams.emplace(stream_id, std::move(state));
121 emitTelemetry({}, true);
122 }
123 return stream_id;
124 }
125
126 void Vita49OutputSink::openStream(const std::uint32_t stream_id, const RealType first_sample_time)
127 {
128 std::scoped_lock const lock(_mutex);
129 emitContext(stream_id, first_sample_time, true, false);
130 stateFor(stream_id).opened = true;
131 emitTelemetry({}, true);
132 }
133
135 {
136 std::scoped_lock const lock(_mutex);
137 if (!_initialized || !_packetizer)
138 {
139 throw std::logic_error("VITA output sink has not been initialized");
140 }
141 emitTelemetry(consumeSenderDropsLocked(), false);
142 const auto stream_id = registerStream(block.stream);
143 auto& state = stateFor(stream_id);
144 if (!state.opened)
145 {
146 openStream(stream_id, block.first_sample_time);
147 }
148
149 const RealType sample_rate = block.sample_rate > 0.0 ? block.sample_rate : state.stats.sample_rate;
150 auto result = _packetizer->packetize(block, stream_id, state.packet_counts, state.sample_loss_pending);
151 state.sample_loss_pending = false;
152 for (auto& packet : result.packets)
153 {
154 const auto packet_sample_count = packet.sample_count;
155 const auto packet_first_sample_time = packet.first_sample_time;
156 const auto packet_end_sample_time = sample_rate > 0.0
157 ? packet_first_sample_time + static_cast<RealType>(packet_sample_count) / sample_rate
159 const auto packet_over_range = packet.over_range;
160 const auto packet_timestamp = toCoreTimestamp(packet.timestamp);
161 const auto packet_end_timestamp =
162 tryCoreTimestampFromEpoch(_packetizer->epochUnixNanoseconds(), packet_end_sample_time);
163 if (!enqueuePacket(std::move(packet)))
164 {
165 continue;
166 }
167 state.stats.samples_emitted += packet_sample_count;
168 ++state.stats.packets_emitted;
169 if (!state.stats.first_sample_time.has_value())
170 {
171 state.stats.first_sample_time = packet_first_sample_time;
172 state.stats.first_timestamp = packet_timestamp;
173 }
174 state.stats.end_sample_time = packet_end_sample_time;
175 state.stats.end_timestamp = packet_end_timestamp;
177 {
178 state.over_range_pending = true;
179 }
180 }
181 state.stats.over_range_count += result.over_range_count;
182 }
183
185 {
186 std::scoped_lock const lock(_mutex);
187 emitTelemetry(consumeSenderDropsLocked(), false);
188 for (auto& [stream_id, state] : _streams)
189 {
190 if (!state.closed && simulation_time - state.last_context_time >= 1.0)
191 {
192 emitContext(stream_id, simulation_time, false, false);
193 }
194 }
195 }
196
197 void Vita49OutputSink::closeStream(const std::uint32_t stream_id)
198 {
199 std::scoped_lock const lock(_mutex);
200 emitTelemetry(consumeSenderDropsLocked(), false);
201 auto& state = stateFor(stream_id);
202 if (!state.closed)
203 {
204 emitContext(stream_id, state.last_context_time, false, true);
205 state.closed = true;
206 emitTelemetry({}, true);
207 }
208 }
209
211 {
212 std::scoped_lock const lock(_mutex);
213 if (_finalized)
214 {
215 auto stats = snapshotStatsLocked();
216 emitTelemetry({}, true);
217 return stats;
218 }
219
220 if (_sender)
221 {
222 _sender->flush();
223 emitTelemetry(consumeSenderDropsLocked(), false);
224 }
225
226 for (auto& [stream_id, state] : _streams)
227 {
228 if (!state.closed)
229 {
230 emitContext(stream_id, state.last_context_time, false, true);
231 state.closed = true;
232 }
233 }
234
235 if (_sender)
236 {
237 _sender->stop();
238 emitTelemetry(consumeSenderDropsLocked(), false);
239 }
240
242 .epoch_unix_nanoseconds = _packetizer
243 ? std::optional<std::uint64_t>(_packetizer->epochUnixNanoseconds())
244 : std::nullopt,
245 .streams = {}};
246 for (auto& [stream_id, state] : _streams)
247 {
248 if (_sender)
249 {
250 state.stats.late_packet_count = _sender->latePacketCount(stream_id);
251 }
252 stats.streams.push_back(state.stats);
253 }
254 _finalized = true;
255 emitTelemetry({}, true);
256 return stats;
257 }
258
260 {
261 std::scoped_lock const lock(_mutex);
262 return snapshotStatsLocked();
263 }
264
265 Vita49OutputSink::StreamState& Vita49OutputSink::stateFor(const std::uint32_t stream_id)
266 {
267 const auto found = _streams.find(stream_id);
268 if (found == _streams.end())
269 {
270 throw std::out_of_range("Unknown VITA stream ID");
271 }
272 return found->second;
273 }
274
275 const Vita49OutputSink::StreamState& Vita49OutputSink::stateFor(const std::uint32_t stream_id) const
276 {
277 const auto found = _streams.find(stream_id);
278 if (found == _streams.end())
279 {
280 throw std::out_of_range("Unknown VITA stream ID");
281 }
282 return found->second;
283 }
284
285 core::OutputStats Vita49OutputSink::snapshotStatsLocked() const
286 {
288 .epoch_unix_nanoseconds = _packetizer
289 ? std::optional<std::uint64_t>(_packetizer->epochUnixNanoseconds())
290 : std::nullopt,
291 .streams = {}};
292 for (const auto& [stream_id, state] : _streams)
293 {
294 auto stream_stats = state.stats;
295 if (_sender)
296 {
297 stream_stats.late_packet_count = _sender->latePacketCount(stream_id);
298 }
299 stats.streams.push_back(std::move(stream_stats));
300 }
301 return stats;
302 }
303
304 std::vector<core::ReceiverOutputPacketTrace> Vita49OutputSink::consumeSenderDropsLocked()
305 {
306 std::vector<core::ReceiverOutputPacketTrace> traces;
307 if (!_sender)
308 {
309 return traces;
310 }
311
312 for (const auto& dropped : _sender->consumeDroppedDatagrams())
313 {
314 applyDropped(dropped);
315 if (_config.vita49.packet_trace_enabled)
316 {
317 traces.push_back(makeDropTrace(dropped));
318 }
319 }
320 return traces;
321 }
322
323 bool Vita49OutputSink::enqueuePacket(SerializedPacket&& packet)
324 {
325 if (!_sender)
326 {
327 throw std::logic_error("VITA paced sender is unavailable");
328 }
329
330 std::vector<core::ReceiverOutputPacketTrace> traces;
331 std::optional<core::ReceiverOutputPacketTrace> sent_trace;
332 if (_config.vita49.packet_trace_enabled)
333 {
334 sent_trace = makeTrace(packet, packet.context_packet ? "context" : "data");
335 }
336 const auto result = _sender->enqueue(std::move(packet));
337 if (_config.vita49.packet_trace_enabled && result.dropped)
338 {
339 traces.push_back(makeDropTrace(*result.dropped));
340 }
341 if (result.dropped)
342 {
343 applyDropped(*result.dropped);
344 }
345 if (result.enqueued && sent_trace.has_value())
346 {
347 traces.push_back(std::move(*sent_trace));
348 }
349 emitTelemetry(std::move(traces), false);
350 return result.enqueued;
351 }
352
353 void Vita49OutputSink::emitTelemetry(std::vector<core::ReceiverOutputPacketTrace> packets, const bool force_stats)
354 {
355 if (!_telemetry_callback)
356 {
357 return;
358 }
359
360 std::optional<core::OutputStats> stats;
361 const auto now = std::chrono::steady_clock::now();
362 if (force_stats || _last_stats_emit == std::chrono::steady_clock::time_point::min() ||
363 now - _last_stats_emit >= kStatsEmitInterval)
364 {
365 stats = snapshotStatsLocked();
366 _last_stats_emit = now;
367 }
368
369 for (auto& packet : packets)
370 {
371 packet.sequence = ++_trace_sequence;
372 _pending_packet_traces.push_back(std::move(packet));
373 }
374
375 std::vector<core::ReceiverOutputPacketTrace> packet_batch;
376 const bool trace_interval_elapsed = _last_packet_trace_emit == std::chrono::steady_clock::time_point::min() ||
377 now - _last_packet_trace_emit >= kPacketTraceEmitInterval;
378 if (!_pending_packet_traces.empty() &&
379 (force_stats || _pending_packet_traces.size() >= kPacketTraceBatchSize || trace_interval_elapsed))
380 {
381 packet_batch = std::move(_pending_packet_traces);
382 _pending_packet_traces.clear();
383 _last_packet_trace_emit = now;
384 }
385
386 if (!stats.has_value() && packet_batch.empty())
387 {
388 return;
389 }
390 _telemetry_callback(stats, packet_batch);
391 }
392
393 core::ReceiverOutputPacketTrace Vita49OutputSink::makeTrace(const SerializedPacket& packet, std::string event) const
394 {
396 .event = std::move(event),
397 .stream_id = packet.stream_id,
398 .byte_count = packet.bytes.size(),
399 .sample_count = packet.sample_count,
400 .first_sample_time = packet.first_sample_time,
401 .timestamp = toCoreTimestamp(packet.timestamp),
402 .data_packet = packet.data_packet,
403 .context_packet = packet.context_packet,
404 .dropped = false,
405 .over_range = packet.over_range,
406 .sample_loss = packet.sample_loss};
407 }
408
409 core::ReceiverOutputPacketTrace Vita49OutputSink::makeDropTrace(const DroppedDatagram& dropped) const
410 {
412 .event = "drop",
413 .stream_id = dropped.stream_id,
414 .byte_count = 0,
415 .sample_count = dropped.sample_count,
416 .first_sample_time = 0.0,
417 .timestamp = std::nullopt,
418 .data_packet = dropped.data_packet,
419 .context_packet = dropped.context_packet,
420 .dropped = true,
421 .over_range = false,
422 .sample_loss = true};
423 }
424
425 void Vita49OutputSink::emitContext(const std::uint32_t stream_id, const RealType simulation_time,
426 const bool stream_open, const bool stream_close)
427 {
428 if (!_packetizer)
429 {
430 throw std::logic_error("VITA packetizer is unavailable");
431 }
432 auto& state = stateFor(stream_id);
433 const RealType context_time = simulation_time <= -1.0e200 ? 0.0 : simulation_time;
434 const auto timestamp = timestampFromEpoch(_packetizer->epochUnixNanoseconds(), context_time);
435 const ContextBuildRequest request{.stream = state.descriptor,
436 .stream_id = stream_id,
437 .simulation_name = _simulation_name,
438 .adc_fullscale = _packetizer->adcFullscale(),
439 .timestamp = timestamp,
440 .packet_count = state.packet_counts.next(),
441 .valid_data = true,
442 .calibrated_time = true,
443 .reference_lock = true,
444 .over_range = state.over_range_pending,
445 .sample_loss = state.sample_loss_pending,
446 .stream_open = stream_open,
447 .stream_close = stream_close};
448 const auto context = Vita49ContextBuilder::build(request);
449 auto packet = _packetizer->makeContextPacket(context);
450 packet.first_sample_time = context_time;
451 if (enqueuePacket(std::move(packet)))
452 {
453 ++state.stats.context_packets;
454 }
455 state.last_context_time = context_time;
456 state.sample_loss_pending = false;
457 state.over_range_pending = false;
458 }
459
460 void Vita49OutputSink::applyDropped(const DroppedDatagram& dropped)
461 {
462 if (dropped.stream_id == 0 || !_streams.contains(dropped.stream_id))
463 {
464 return;
465 }
466 auto& state = stateFor(dropped.stream_id);
467 ++state.stats.packets_dropped;
468 state.stats.samples_dropped += dropped.sample_count;
469 if (dropped.data_packet || (!dropped.context_packet && dropped.sample_count > 0))
470 {
471 state.stats.packets_emitted -= std::min<std::uint64_t>(state.stats.packets_emitted, 1u);
472 state.stats.samples_emitted -= std::min(state.stats.samples_emitted, dropped.sample_count);
473 }
474 else if (dropped.context_packet)
475 {
476 state.stats.context_packets -= std::min<std::uint64_t>(state.stats.context_packets, 1u);
477 }
478 state.sample_loss_pending = true;
479 }
480
481 std::unique_ptr<core::ReceiverOutputSink>
483 {
484 return std::make_unique<Vita49OutputSink>(nullptr, std::move(telemetry_callback));
485 }
486}
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)
void closeStream(std::uint32_t stream_id) override
core::OutputStats snapshotStats() const override
void emitContextHeartbeat(RealType simulation_time) override
core::OutputStats finalize() override
void openStream(std::uint32_t stream_id, RealType first_sample_time) override
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream) override
void submitBlock(const core::ReceiverSampleBlock &block) override
void initializeRun(const core::OutputConfig &config, std::string simulation_name) override
Vita49OutputSink(std::unique_ptr< DatagramSender > sender=nullptr, core::ReceiverOutputTelemetryCallback telemetry_callback=nullptr)
double RealType
Type for real numbers.
Definition config.h:27
std::function< void(const std::optional< OutputStats > &, std::span< const ReceiverOutputPacketTrace >)> ReceiverOutputTelemetryCallback
RealType startTime() noexcept
Get the start time for the simulation.
Definition parameters.h:103
Timestamp timestampFromEpoch(const std::uint64_t epoch_unix_nanoseconds, const RealType sample_time_seconds)
std::unique_ptr< core::ReceiverOutputSink > makeVita49OutputSink(core::ReceiverOutputTelemetryCallback telemetry_callback)
Defines the Parameters struct and provides methods for managing simulation parameters.
math::Vec3 max
std::uint32_t integer_seconds