FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
serial::vita49::Vita49OutputSink Class Reference (final)

#include "vita49_output_sink.h"

+ Inheritance diagram for serial::vita49::Vita49OutputSink:
+ Collaboration diagram for serial::vita49::Vita49OutputSink:

Public Member Functions

 Vita49OutputSink (std::unique_ptr< DatagramSender > sender=nullptr, core::ReceiverOutputTelemetryCallback telemetry_callback=nullptr)
 
 ~Vita49OutputSink () override
 
 Vita49OutputSink (const Vita49OutputSink &)=delete
 
Vita49OutputSink & operator= (const Vita49OutputSink &)=delete
 
 Vita49OutputSink (Vita49OutputSink &&)=delete
 
Vita49OutputSink & operator= (Vita49OutputSink &&)=delete
 
void initializeRun (const core::OutputConfig &config, std::string simulation_name) override
 
std::uint32_t registerStream (const core::ReceiverStreamDescriptor &stream) override
 
void openStream (std::uint32_t stream_id, RealType first_sample_time) override
 
void submitBlock (const core::ReceiverSampleBlock &block) override
 
void emitContextHeartbeat (RealType simulation_time) override
 
void closeStream (std::uint32_t stream_id) override
 
core::OutputStats finalize () override
 
core::OutputStats snapshotStats () const override
 

Detailed Description

Definition at line 25 of file vita49_output_sink.h.

Constructor & Destructor Documentation

◆ Vita49OutputSink() [1/3]

serial::vita49::Vita49OutputSink::Vita49OutputSink ( std::unique_ptr< DatagramSender > sender = nullptr,
core::ReceiverOutputTelemetryCallback  telemetry_callback = nullptr 
)
explicit

Definition at line 52 of file vita49_output_sink.cpp.

53 :
54 _telemetry_callback(std::move(telemetry_callback)), _provided_sender(std::move(sender))
55 {
56 }
math::Vec3 max

◆ ~Vita49OutputSink()

serial::vita49::Vita49OutputSink::~Vita49OutputSink ( )
override

Definition at line 58 of file vita49_output_sink.cpp.

59 {
60 if (!_finalized)
61 {
62 try
63 {
64 (void)finalize();
65 }
66 catch (...)
67 {
68 }
69 }
70 }
core::OutputStats finalize() override

References finalize(), and max.

+ Here is the call graph for this function:

◆ Vita49OutputSink() [2/3]

serial::vita49::Vita49OutputSink::Vita49OutputSink ( const Vita49OutputSink & )
delete

◆ Vita49OutputSink() [3/3]

serial::vita49::Vita49OutputSink::Vita49OutputSink ( Vita49OutputSink &&  )
delete

Member Function Documentation

◆ closeStream()

void serial::vita49::Vita49OutputSink::closeStream ( std::uint32_t  stream_id)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 197 of file vita49_output_sink.cpp.

198 {
199 std::scoped_lock const lock(_mutex);
200 emitTelemetry(consumeSenderDropsLocked(), false);
201 auto& state = stateFor(stream_id);
202 if (!state.closed)
203 {
204 emitContext(stream_id, state.last_context_time, false, true);
205 state.closed = true;
206 emitTelemetry({}, true);
207 }
208 }

References max.

◆ emitContextHeartbeat()

void serial::vita49::Vita49OutputSink::emitContextHeartbeat ( RealType  simulation_time)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 184 of file vita49_output_sink.cpp.

185 {
186 std::scoped_lock const lock(_mutex);
187 emitTelemetry(consumeSenderDropsLocked(), false);
188 for (auto& [stream_id, state] : _streams)
189 {
190 if (!state.closed && simulation_time - state.last_context_time >= 1.0)
191 {
192 emitContext(stream_id, simulation_time, false, false);
193 }
194 }
195 }

References max.

◆ finalize()

core::OutputStats serial::vita49::Vita49OutputSink::finalize ( )
override virtual

Implements core::ReceiverOutputSink.

Definition at line 210 of file vita49_output_sink.cpp.

211 {
212 std::scoped_lock const lock(_mutex);
213 if (_finalized)
214 {
215 auto stats = snapshotStatsLocked();
216 emitTelemetry({}, true);
217 return stats;
218 }
219
220 if (_sender)
221 {
222 _sender->flush();
223 emitTelemetry(consumeSenderDropsLocked(), false);
224 }
225
226 for (auto& [stream_id, state] : _streams)
227 {
228 if (!state.closed)
229 {
230 emitContext(stream_id, state.last_context_time, false, true);
231 state.closed = true;
232 }
233 }
234
235 if (_sender)
236 {
237 _sender->stop();
238 emitTelemetry(consumeSenderDropsLocked(), false);
239 }
240
242 .epoch_unix_nanoseconds = _packetizer
243 ? std::optional<std::uint64_t>(_packetizer->epochUnixNanoseconds())
244 : std::nullopt,
245 .streams = {}};
246 for (auto& [stream_id, state] : _streams)
247 {
248 if (_sender)
249 {
250 state.stats.late_packet_count = _sender->latePacketCount(stream_id);
251 }
252 stats.streams.push_back(state.stats);
253 }
254 _finalized = true;
255 emitTelemetry({}, true);
256 return stats;
257 }

References max, core::OutputStats::mode, and core::Vita49Udp.

Referenced by ~Vita49OutputSink().

+ Here is the caller graph for this function:

◆ initializeRun()

void serial::vita49::Vita49OutputSink::initializeRun ( const core::OutputConfig & config,
std::string  simulation_name 
)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 72 of file vita49_output_sink.cpp.

73 {
74 std::scoped_lock const lock(_mutex);
76 {
77 throw std::invalid_argument("Vita49OutputSink requires VITA output mode");
78 }
79 if (config.vita49.host.empty() || config.vita49.port == 0)
80 {
81 throw std::invalid_argument("VITA output requires destination host and port");
82 }
83 if (config.vita49.queue_depth == 0)
84 {
85 throw std::invalid_argument("VITA output queue depth must be positive");
86 }
87
88 _config = config;
89 _simulation_name = std::move(simulation_name);
90 const auto epoch_ns = config.vita49.epoch_unix_nanoseconds.value_or(defaultEpochNanoseconds());
91 _packetizer =
92 std::make_unique<Vita49Packetizer>(epoch_ns, config.vita49.adc_fullscale, config.vita49.max_udp_payload);
93 _sender = std::make_unique<PacedSender>(
94 _provided_sender ? std::move(_provided_sender) : std::make_unique<UdpSender>(), config.vita49.queue_depth);
95 _sender->open(config.vita49.host, config.vita49.port);
96 _sender->start(params::startTime());
97 _last_stats_emit = std::chrono::steady_clock::time_point::min();
98 _last_packet_trace_emit = std::chrono::steady_clock::now();
99 _pending_packet_traces.clear();
100 _trace_sequence = 0;
101 _initialized = true;
102 _finalized = false;
103 emitTelemetry({}, true);
104 }
RealType startTime() noexcept
Get the start time for the simulation.
Definition parameters.h:103

References max, params::startTime(), and core::Vita49Udp.

+ Here is the call graph for this function:

◆ openStream()

void serial::vita49::Vita49OutputSink::openStream ( std::uint32_t  stream_id,
RealType  first_sample_time 
)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 126 of file vita49_output_sink.cpp.

127 {
128 std::scoped_lock const lock(_mutex);
129 emitContext(stream_id, first_sample_time, true, false);
130 stateFor(stream_id).opened = true;
131 emitTelemetry({}, true);
132 }

References max.

Referenced by submitBlock().

+ Here is the caller graph for this function:

◆ operator=() [1/2]

Vita49OutputSink & serial::vita49::Vita49OutputSink::operator= ( const Vita49OutputSink & )
delete

◆ operator=() [2/2]

Vita49OutputSink & serial::vita49::Vita49OutputSink::operator= ( Vita49OutputSink &&  )
delete

◆ registerStream()

std::uint32_t serial::vita49::Vita49OutputSink::registerStream ( const core::ReceiverStreamDescriptor & stream)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 106 of file vita49_output_sink.cpp.

107 {
108 std::scoped_lock const lock(_mutex);
109 const auto stream_id = _registry.registerStream(stream);
110 if (!_streams.contains(stream_id))
111 {
112 StreamState state;
113 state.descriptor = stream;
114 state.stats.receiver_id = stream.receiver_id;
115 state.stats.receiver_name = stream.receiver_name;
116 state.stats.stream_id = stream_id;
117 state.stats.mode = stream.mode.empty() ? "unknown" : stream.mode;
118 state.stats.sample_rate = stream.sample_rate;
119 state.stats.reference_frequency = stream.reference_frequency;
120 _streams.emplace(stream_id, std::move(state));
121 emitTelemetry({}, true);
122 }
123 return stream_id;
124 }
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)

References max, core::ReceiverStreamDescriptor::mode, core::ReceiverStreamDescriptor::receiver_id, core::ReceiverStreamDescriptor::receiver_name, core::ReceiverStreamDescriptor::reference_frequency, serial::vita49::StreamRegistry::registerStream(), and core::ReceiverStreamDescriptor::sample_rate.

Referenced by submitBlock().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ snapshotStats()

core::OutputStats serial::vita49::Vita49OutputSink::snapshotStats ( ) const
override virtual

Reimplemented from core::ReceiverOutputSink.

Definition at line 259 of file vita49_output_sink.cpp.

260 {
261 std::scoped_lock const lock(_mutex);
262 return snapshotStatsLocked();
263 }

References max.

◆ submitBlock()

void serial::vita49::Vita49OutputSink::submitBlock ( const core::ReceiverSampleBlock & block)
override virtual

Implements core::ReceiverOutputSink.

Definition at line 134 of file vita49_output_sink.cpp.

135 {
136 std::scoped_lock const lock(_mutex);
137 if (!_initialized || !_packetizer)
138 {
139 throw std::logic_error("VITA output sink has not been initialized");
140 }
141 emitTelemetry(consumeSenderDropsLocked(), false);
142 const auto stream_id = registerStream(block.stream);
143 auto& state = stateFor(stream_id);
144 if (!state.opened)
145 {
146 openStream(stream_id, block.first_sample_time);
147 }
148
149 const RealType sample_rate = block.sample_rate > 0.0 ? block.sample_rate : state.stats.sample_rate;
150 auto result = _packetizer->packetize(block, stream_id, state.packet_counts, state.sample_loss_pending);
151 state.sample_loss_pending = false;
152 for (auto& packet : result.packets)
153 {
154 const auto packet_sample_count = packet.sample_count;
155 const auto packet_first_sample_time = packet.first_sample_time;
156 const auto packet_end_sample_time = sample_rate > 0.0
157 ? packet_first_sample_time + static_cast<RealType>(packet_sample_count) / sample_rate
159 const auto packet_over_range = packet.over_range;
160 const auto packet_timestamp = toCoreTimestamp(packet.timestamp);
161 const auto packet_end_timestamp =
162 tryCoreTimestampFromEpoch(_packetizer->epochUnixNanoseconds(), packet_end_sample_time);
163 if (!enqueuePacket(std::move(packet)))
164 {
165 continue;
166 }
167 state.stats.samples_emitted += packet_sample_count;
168 ++state.stats.packets_emitted;
169 if (!state.stats.first_sample_time.has_value())
170 {
171 state.stats.first_sample_time = packet_first_sample_time;
172 state.stats.first_timestamp = packet_timestamp;
173 }
174 state.stats.end_sample_time = packet_end_sample_time;
175 state.stats.end_timestamp = packet_end_timestamp;
177 {
178 state.over_range_pending = true;
179 }
180 }
181 state.stats.over_range_count += result.over_range_count;
182 }
void openStream(std::uint32_t stream_id, RealType first_sample_time) override
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream) override
double RealType
Type for real numbers.
Definition config.h:27

References max, openStream(), and registerStream().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: