27 const auto now = std::chrono::system_clock::now().time_since_epoch();
28 return static_cast<std::uint64_t
>(std::chrono::duration_cast<std::chrono::nanoseconds>(
now).count());
34 .fractional_picoseconds = timestamp.fractional_picoseconds};
37 [[
nodiscard]] std::optional<core::Vita49Timestamp>
74 std::scoped_lock
const lock(_mutex);
77 throw std::invalid_argument(
"Vita49OutputSink requires VITA output mode");
79 if (
config.vita49.host.empty() ||
config.vita49.port == 0)
81 throw std::invalid_argument(
"VITA output requires destination host and port");
83 if (
config.vita49.queue_depth == 0)
85 throw std::invalid_argument(
"VITA output queue depth must be positive");
89 _simulation_name = std::move(simulation_name);
92 std::make_unique<Vita49Packetizer>(
epoch_ns,
config.vita49.adc_fullscale,
config.vita49.max_udp_payload);
93 _sender = std::make_unique<PacedSender>(
94 _provided_sender ? std::move(_provided_sender) : std::make_unique<UdpSender>(),
config.vita49.queue_depth);
97 _last_stats_emit = std::chrono::steady_clock::time_point::min();
98 _last_packet_trace_emit = std::chrono::steady_clock::now();
99 _pending_packet_traces.clear();
103 emitTelemetry({},
true);
108 std::scoped_lock
const lock(_mutex);
110 if (!_streams.contains(stream_id))
113 state.descriptor = stream;
116 state.stats.stream_id = stream_id;
117 state.stats.mode = stream.
mode.empty() ?
"unknown" : stream.
mode;
120 _streams.emplace(stream_id, std::move(
state));
121 emitTelemetry({},
true);
128 std::scoped_lock
const lock(_mutex);
129 emitContext(stream_id, first_sample_time,
true,
false);
130 stateFor(stream_id).opened =
true;
131 emitTelemetry({},
true);
136 std::scoped_lock
const lock(_mutex);
137 if (!_initialized || !_packetizer)
139 throw std::logic_error(
"VITA output sink has not been initialized");
141 emitTelemetry(consumeSenderDropsLocked(),
false);
143 auto&
state = stateFor(stream_id);
150 auto result = _packetizer->packetize(
block, stream_id,
state.packet_counts,
state.sample_loss_pending);
151 state.sample_loss_pending =
false;
163 if (!enqueuePacket(std::move(
packet)))
168 ++
state.stats.packets_emitted;
169 if (!
state.stats.first_sample_time.has_value())
178 state.over_range_pending =
true;
181 state.stats.over_range_count +=
result.over_range_count;
186 std::scoped_lock
const lock(_mutex);
187 emitTelemetry(consumeSenderDropsLocked(),
false);
188 for (
auto& [stream_id,
state] : _streams)
199 std::scoped_lock
const lock(_mutex);
200 emitTelemetry(consumeSenderDropsLocked(),
false);
201 auto&
state = stateFor(stream_id);
204 emitContext(stream_id,
state.last_context_time,
false,
true);
206 emitTelemetry({},
true);
212 std::scoped_lock
const lock(_mutex);
215 auto stats = snapshotStatsLocked();
216 emitTelemetry({},
true);
223 emitTelemetry(consumeSenderDropsLocked(),
false);
226 for (
auto& [stream_id,
state] : _streams)
230 emitContext(stream_id,
state.last_context_time,
false,
true);
238 emitTelemetry(consumeSenderDropsLocked(),
false);
242 .epoch_unix_nanoseconds = _packetizer
243 ? std::optional<std::uint64_t>(_packetizer->epochUnixNanoseconds())
246 for (
auto& [stream_id,
state] : _streams)
250 state.stats.late_packet_count = _sender->latePacketCount(stream_id);
252 stats.streams.push_back(
state.stats);
255 emitTelemetry({},
true);
261 std::scoped_lock
const lock(_mutex);
262 return snapshotStatsLocked();
265 Vita49OutputSink::StreamState& Vita49OutputSink::stateFor(
const std::uint32_t stream_id)
267 const auto found = _streams.find(stream_id);
268 if (
found == _streams.end())
270 throw std::out_of_range(
"Unknown VITA stream ID");
272 return found->second;
275 const Vita49OutputSink::StreamState& Vita49OutputSink::stateFor(
const std::uint32_t stream_id)
const
277 const auto found = _streams.find(stream_id);
278 if (
found == _streams.end())
280 throw std::out_of_range(
"Unknown VITA stream ID");
282 return found->second;
288 .epoch_unix_nanoseconds = _packetizer
289 ? std::optional<std::uint64_t>(_packetizer->epochUnixNanoseconds())
292 for (
const auto& [stream_id,
state] : _streams)
297 stream_stats.late_packet_count = _sender->latePacketCount(stream_id);
// Walks the datagrams returned by _sender->consumeDroppedDatagrams(), folds each one into the
// per-stream statistics through applyDropped(), and, when
// _config.vita49.packet_trace_enabled is set, appends a drop trace built by makeDropTrace().
// NOTE(review): this listing omits several original lines (307-311, 313, 316, 318 onward), so
// any guard ahead of the loop and the return statement are not visible here; presumably the
// accumulated `traces` vector is what gets returned — confirm against the full source.
304 std::vector<core::ReceiverOutputPacketTrace> Vita49OutputSink::consumeSenderDropsLocked()
// Drop traces gathered during this call.
306 std::vector<core::ReceiverOutputPacketTrace>
traces;
312 for (
const auto& dropped : _sender->consumeDroppedDatagrams())
// Statistics are updated for every drop, whether or not tracing is enabled.
314 applyDropped(dropped);
315 if (_config.vita49.packet_trace_enabled)
317 traces.push_back(makeDropTrace(dropped));
// Moves a serialized packet into _sender->enqueue() and forwards any collected packet traces
// to emitTelemetry() without forcing a statistics snapshot.
// Throws std::logic_error("VITA paced sender is unavailable"); the condition guarding that
// throw is on a line missing from this listing (presumably a null _sender check — confirm).
// NOTE(review): the statements that fill `sent_trace`/`traces` and the return statement are
// not visible here. Callers in this file test the bool result before incrementing their
// "emitted" counters, so it presumably reports whether the packet was accepted — confirm.
323 bool Vita49OutputSink::enqueuePacket(SerializedPacket&&
packet)
327 throw std::logic_error(
"VITA paced sender is unavailable");
// Traces handed to emitTelemetry() at the end of this function.
330 std::vector<core::ReceiverOutputPacketTrace>
traces;
331 std::optional<core::ReceiverOutputPacketTrace>
sent_trace;
332 if (_config.vita49.packet_trace_enabled)
// `packet` is moved from here; it must not be read after this call.
336 const auto result = _sender->enqueue(std::move(
packet));
337 if (_config.vita49.packet_trace_enabled &&
result.dropped)
// result.dropped is dereferenced, so it is an optional-like value describing a datagram
// dropped as a consequence of this enqueue.
343 applyDropped(*
result.dropped);
349 emitTelemetry(std::move(
traces),
false);
// Queues the supplied packet traces and decides whether a statistics snapshot and/or a batch
// of pending traces is due.
//   packets      traces to queue; each is stamped with the next _trace_sequence value and
//                moved into _pending_packet_traces.
//   force_stats  when true, a statistics snapshot is taken regardless of the other conditions.
// NOTE(review): this listing omits the body of the !_telemetry_callback check, the tail of
// both interval conditions, and the code that fills `packet_batch` and invokes the callback;
// the comments below cover only what is visible.
353 void Vita49OutputSink::emitTelemetry(std::vector<core::ReceiverOutputPacketTrace> packets,
const bool force_stats)
355 if (!_telemetry_callback)
360 std::optional<core::OutputStats> stats;
361 const auto now = std::chrono::steady_clock::now();
// A snapshot is taken when forced, when none has been emitted yet (_last_stats_emit still at
// time_point::min()), or under a further condition on a line not shown here.
362 if (
force_stats || _last_stats_emit == std::chrono::steady_clock::time_point::min() ||
365 stats = snapshotStatsLocked();
366 _last_stats_emit =
now;
369 for (
auto&
packet : packets)
// Sequence numbers are assigned in queueing order, from a pre-incremented counter.
371 packet.sequence = ++_trace_sequence;
372 _pending_packet_traces.push_back(std::move(
packet));
375 std::vector<core::ReceiverOutputPacketTrace>
packet_batch;
// The rest of this condition is on a line not shown here.
376 const bool trace_interval_elapsed = _last_packet_trace_emit == std::chrono::steady_clock::time_point::min() ||
378 if (!_pending_packet_traces.empty() &&
// The pending queue is emptied and the trace emit time is reset to `now`.
382 _pending_packet_traces.clear();
383 _last_packet_trace_emit =
now;
396 .event = std::move(event),
397 .stream_id =
packet.stream_id,
398 .byte_count =
packet.bytes.size(),
399 .sample_count =
packet.sample_count,
400 .first_sample_time =
packet.first_sample_time,
402 .data_packet =
packet.data_packet,
403 .context_packet =
packet.context_packet,
405 .over_range =
packet.over_range,
406 .sample_loss =
packet.sample_loss};
413 .stream_id = dropped.stream_id,
415 .sample_count = dropped.sample_count,
416 .first_sample_time = 0.0,
417 .timestamp = std::nullopt,
418 .data_packet = dropped.data_packet,
419 .context_packet = dropped.context_packet,
422 .sample_loss =
true};
426 const bool stream_open,
const bool stream_close)
430 throw std::logic_error(
"VITA packetizer is unavailable");
432 auto&
state = stateFor(stream_id);
435 const ContextBuildRequest
request{.stream =
state.descriptor,
436 .stream_id = stream_id,
437 .simulation_name = _simulation_name,
438 .adc_fullscale = _packetizer->adcFullscale(),
439 .timestamp = timestamp,
440 .packet_count =
state.packet_counts.next(),
442 .calibrated_time =
true,
443 .reference_lock =
true,
444 .over_range =
state.over_range_pending,
445 .sample_loss =
state.sample_loss_pending,
446 .stream_open = stream_open,
447 .stream_close = stream_close};
448 const auto context = Vita49ContextBuilder::build(
request);
449 auto packet = _packetizer->makeContextPacket(context);
451 if (enqueuePacket(std::move(
packet)))
453 ++
state.stats.context_packets;
456 state.sample_loss_pending =
false;
457 state.over_range_pending =
false;
460 void Vita49OutputSink::applyDropped(
const DroppedDatagram& dropped)
462 if (dropped.stream_id == 0 || !_streams.contains(dropped.stream_id))
466 auto&
state = stateFor(dropped.stream_id);
467 ++
state.stats.packets_dropped;
468 state.stats.samples_dropped += dropped.sample_count;
469 if (dropped.data_packet || (!dropped.context_packet && dropped.sample_count > 0))
471 state.stats.packets_emitted -= std::min<std::uint64_t>(
state.stats.packets_emitted, 1u);
472 state.stats.samples_emitted -= std::min(
state.stats.samples_emitted, dropped.sample_count);
474 else if (dropped.context_packet)
476 state.stats.context_packets -= std::min<std::uint64_t>(
state.stats.context_packets, 1u);
478 state.sample_loss_pending =
true;
481 std::unique_ptr<core::ReceiverOutputSink>
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)
void closeStream(std::uint32_t stream_id) override
core::OutputStats snapshotStats() const override
void emitContextHeartbeat(RealType simulation_time) override
core::OutputStats finalize() override
void openStream(std::uint32_t stream_id, RealType first_sample_time) override
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream) override
void submitBlock(const core::ReceiverSampleBlock &block) override
~Vita49OutputSink() override
void initializeRun(const core::OutputConfig &config, std::string simulation_name) override
Vita49OutputSink(std::unique_ptr< DatagramSender > sender=nullptr, core::ReceiverOutputTelemetryCallback telemetry_callback=nullptr)
double RealType
Type for real numbers.
std::function< void(const std::optional< OutputStats > &, std::span< const ReceiverOutputPacketTrace >)> ReceiverOutputTelemetryCallback
RealType startTime() noexcept
Get the start time for the simulation.
Timestamp timestampFromEpoch(const std::uint64_t epoch_unix_nanoseconds, const RealType sample_time_seconds)
std::unique_ptr< core::ReceiverOutputSink > makeVita49OutputSink(core::ReceiverOutputTelemetryCallback telemetry_callback)
Defines the Parameters struct and provides methods for managing simulation parameters.
std::string receiver_name
RealType reference_frequency
std::uint32_t integer_seconds