FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
serial::Hdf5OutputSink::Impl Struct Reference

Classes

struct  StreamState
 

Public Member Functions

 Impl (std::string out_dir, std::shared_ptr< core::OutputMetadataCollector > collector)
 
void initializeRun (const core::OutputConfig &config, const std::string &)
 
std::uint32_t registerStream (const core::ReceiverStreamDescriptor &stream)
 
void openStream (const std::uint32_t stream_id, const RealType)
 
void submitBlock (const core::ReceiverSampleBlock &block)
 
void closeStream (const std::uint32_t stream_id)
 
core::OutputStats finalize ()
 
std::string outputPath (const std::string &receiver_name) const
 
StreamState & stateFor (const std::uint32_t stream_id)
 
void writePulsedBlock (StreamState &state, const core::ReceiverSampleBlock &block)
 
void appendStreamingBlock (StreamState &state, const core::ReceiverSampleBlock &block)
 
void closePulsedStream (StreamState &state)
 
void closeStreamingStream (StreamState &state)
 

Static Public Member Functions

static bool isPulsed (const StreamState &state)
 
static std::size_t expectedStreamingSamples (const RealType sample_rate)
 
static std::string streamKey (const core::ReceiverStreamDescriptor &stream)
 

Public Attributes

std::string output_dir
 
std::shared_ptr< core::OutputMetadataCollector > metadata_collector
 
std::recursive_mutex mutex
 
std::unordered_map< std::uint32_t, StreamState > streams
 
std::unordered_map< std::string, std::uint32_t > stream_ids
 
std::uint32_t next_stream_id = 1
 
bool finalized = false
 

Detailed Description

Definition at line 60 of file hdf5_output_sink.cpp.

Constructor & Destructor Documentation

◆ Impl()

serial::Hdf5OutputSink::Impl::Impl ( std::string  out_dir,
std::shared_ptr< core::OutputMetadataCollector > collector 
)

Definition at line 74 of file hdf5_output_sink.cpp.

74 :
75 output_dir(std::move(out_dir)), metadata_collector(std::move(collector))
76 {
77 }
math::Vec3 max
std::shared_ptr< core::OutputMetadataCollector > metadata_collector

Member Function Documentation

◆ appendStreamingBlock()

void serial::Hdf5OutputSink::Impl::appendStreamingBlock ( StreamState & state,
const core::ReceiverSampleBlock & block 
)

Definition at line 260 of file hdf5_output_sink.cpp.

261 {
262 const auto sample_start = static_cast<std::size_t>(block.sample_start);
263 const auto sample_end = sample_start + block.samples.size();
264 if (state.streaming_buffer.size() < sample_end)
265 {
266 state.streaming_buffer.resize(sample_end);
267 }
268 std::ranges::copy(block.samples,
269 std::next(state.streaming_buffer.begin(),
270 static_cast<std::vector<ComplexType>::difference_type>(sample_start)));
271 }

References max.

Referenced by submitBlock().

+ Here is the caller graph for this function:

◆ closePulsedStream()

void serial::Hdf5OutputSink::Impl::closePulsedStream ( StreamState & state)

Definition at line 273 of file hdf5_output_sink.cpp.

274 {
275 if (!state.pulsed_file)
276 {
277 return;
278 }
280 {
281 std::scoped_lock const hdf5_lock(hdf5_global_mutex);
282 writeOutputFileMetadataAttributes(*state.pulsed_file, state.metadata);
283 state.pulsed_file.reset();
284 }
285 if (metadata_collector)
286 {
287 metadata_collector->addFile(state.metadata);
288 }
289 }
std::mutex hdf5_global_mutex
Global mutex to protect all HDF5 C-library calls, which are not thread-safe.
void writeOutputFileMetadataAttributes(HighFive::File &file, const core::OutputFileMetadata &metadata)
Writes additive FERS output metadata attributes to an open HDF5 file.

References serial::hdf5_global_mutex, max, metadata_collector, and serial::writeOutputFileMetadataAttributes().

Referenced by closeStream().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ closeStream()

void serial::Hdf5OutputSink::Impl::closeStream ( const std::uint32_t  stream_id)

Definition at line 160 of file hdf5_output_sink.cpp.

161 {
162 std::scoped_lock const lock(mutex);
163 auto& state = stateFor(stream_id);
164 if (state.closed)
165 {
166 return;
167 }
168
169 if (isPulsed(state))
170 {
171 closePulsedStream(state);
172 }
173 else
174 {
175 closeStreamingStream(state);
176 }
177 state.closed = true;
178 }
void closeStreamingStream(StreamState &state)
StreamState & stateFor(const std::uint32_t stream_id)
static bool isPulsed(const StreamState &state)
void closePulsedStream(StreamState &state)

References closePulsedStream(), closeStreamingStream(), isPulsed(), max, mutex, and stateFor().

Referenced by finalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ closeStreamingStream()

void serial::Hdf5OutputSink::Impl::closeStreamingStream ( StreamState & state)

Definition at line 291 of file hdf5_output_sink.cpp.

292 {
293 if (state.streaming_buffer.empty())
294 {
295 return;
296 }
297
298 auto metadata = state.streaming_metadata.value_or(
299 core::OutputFileMetadata{.receiver_id = state.descriptor.receiver_id,
300 .receiver_name = state.descriptor.receiver_name,
301 .mode = state.descriptor.mode,
302 .path = state.metadata.path,
303 .sampling_rate = state.descriptor.sample_rate});
304 metadata.path = state.metadata.path;
305 metadata.sampling_rate = state.descriptor.sample_rate;
306 metadata.total_samples = static_cast<std::uint64_t>(state.streaming_buffer.size());
307 metadata.sample_start = 0;
308 metadata.sample_end_exclusive = static_cast<std::uint64_t>(state.streaming_buffer.size());
309
310 const RealType fullscale = processing::quantizeAndScaleWindow(state.streaming_buffer);
311 processing::pipeline::exportStreamingToHdf5(state.metadata.path, state.streaming_buffer, fullscale,
312 state.descriptor.reference_frequency, &metadata,
313 state.descriptor.sample_rate);
314 if (metadata_collector)
315 {
316 metadata_collector->addFile(std::move(metadata));
317 }
318 }
double RealType
Type for real numbers.
Definition config.h:27
void exportStreamingToHdf5(const std::string &filename, const std::vector< ComplexType > &iq_buffer, const RealType fullscale, const RealType ref_freq, const core::OutputFileMetadata *metadata, const RealType sample_rate)
Exports a finalized streaming IQ buffer to an HDF5 file.
RealType quantizeAndScaleWindow(std::span< ComplexType > window)
Simulates ADC quantization and scales a window of complex I/Q samples.
Metadata for one receiver output file.
SimId receiver_id
Receiver SimId that owns the output file.

References processing::pipeline::exportStreamingToHdf5(), max, metadata_collector, core::OutputFileMetadata::path, processing::quantizeAndScaleWindow(), core::OutputFileMetadata::receiver_id, core::OutputFileMetadata::sample_end_exclusive, core::OutputFileMetadata::sample_start, core::OutputFileMetadata::sampling_rate, and core::OutputFileMetadata::total_samples.

Referenced by closeStream().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ expectedStreamingSamples()

static std::size_t serial::Hdf5OutputSink::Impl::expectedStreamingSamples ( const RealType  sample_rate)
static

Definition at line 205 of file hdf5_output_sink.cpp.

206 {
207 if (sample_rate <= 0.0)
208 {
209 return 0u;
210 }
211 const RealType duration = std::max<RealType>(0.0, params::endTime() - params::startTime());
212 return static_cast<std::size_t>(std::ceil(duration * sample_rate));
213 }
RealType endTime() noexcept
Get the end time for the simulation.
Definition parameters.h:109
RealType startTime() noexcept
Get the start time for the simulation.
Definition parameters.h:103

References params::endTime(), max, and params::startTime().

Referenced by openStream().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ finalize()

core::OutputStats serial::Hdf5OutputSink::Impl::finalize ( )

Definition at line 180 of file hdf5_output_sink.cpp.

181 {
182 std::scoped_lock const lock(mutex);
183 if (!finalized)
184 {
185 std::vector<std::uint32_t> ids;
186 ids.reserve(streams.size());
187 for (const auto& [stream_id, state] : streams)
188 {
189 if (state.opened && !state.closed)
190 {
191 ids.push_back(stream_id);
192 }
193 }
194 for (const auto stream_id : ids)
195 {
196 closeStream(stream_id);
197 }
198 finalized = true;
199 }
200 return core::OutputStats{.mode = core::OutputMode::Hdf5, .streams = {}};
201 }
void closeStream(const std::uint32_t stream_id)
std::unordered_map< std::uint32_t, StreamState > streams

References closeStream(), finalized, core::Hdf5, max, core::OutputStats::mode, mutex, and streams.

+ Here is the call graph for this function:

◆ initializeRun()

void serial::Hdf5OutputSink::Impl::initializeRun ( const core::OutputConfig & config,
const std::string &   
)

Definition at line 79 of file hdf5_output_sink.cpp.

80 {
81 if (config.mode != core::OutputMode::Hdf5)
82 {
83 throw std::invalid_argument("Hdf5OutputSink requires HDF5 output mode");
84 }
85 std::scoped_lock const lock(mutex);
86 std::filesystem::create_directories(output_dir);
87 finalized = false;
88 }

References finalized, core::Hdf5, max, mutex, and output_dir.

◆ isPulsed()

static bool serial::Hdf5OutputSink::Impl::isPulsed ( const StreamState & state)
static

Definition at line 203 of file hdf5_output_sink.cpp.

203{ return state.descriptor.mode == "pulsed"; }

References max.

Referenced by closeStream(), openStream(), and submitBlock().

+ Here is the caller graph for this function:

◆ openStream()

void serial::Hdf5OutputSink::Impl::openStream ( const std::uint32_t  stream_id,
const RealType   
)

Definition at line 112 of file hdf5_output_sink.cpp.

113 {
114 std::scoped_lock const lock(mutex);
115 auto& state = stateFor(stream_id);
116 if (state.opened)
117 {
118 return;
119 }
120 std::filesystem::create_directories(output_dir);
121 if (isPulsed(state))
122 {
123 std::scoped_lock const hdf5_lock(hdf5_global_mutex);
124 state.pulsed_file = std::make_unique<HighFive::File>(state.metadata.path, HighFive::File::Truncate);
125 }
126 else
127 {
128 const auto expected_samples = expectedStreamingSamples(state.descriptor.sample_rate);
129 if (expected_samples > 0u)
130 {
131 state.streaming_buffer.resize(expected_samples);
132 }
133 }
134 state.opened = true;
135 state.closed = false;
136 }
static std::size_t expectedStreamingSamples(const RealType sample_rate)

References expectedStreamingSamples(), serial::hdf5_global_mutex, isPulsed(), max, mutex, output_dir, and stateFor().

Referenced by submitBlock().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ outputPath()

std::string serial::Hdf5OutputSink::Impl::outputPath ( const std::string &  receiver_name) const

Definition at line 220 of file hdf5_output_sink.cpp.

221 {
222 const std::filesystem::path out_path(output_dir);
223 return (out_path / std::format("{}_results.h5", receiver_name)).string();
224 }

References max, and output_dir.

Referenced by registerStream().

+ Here is the caller graph for this function:

◆ registerStream()

std::uint32_t serial::Hdf5OutputSink::Impl::registerStream ( const core::ReceiverStreamDescriptor & stream)

Definition at line 90 of file hdf5_output_sink.cpp.

91 {
92 std::scoped_lock const lock(mutex);
93 const auto key = streamKey(stream);
94 if (const auto found = stream_ids.find(key); found != stream_ids.end())
95 {
96 return found->second;
97 }
98
99 const auto stream_id = next_stream_id++;
100 StreamState state;
101 state.descriptor = stream;
102 state.metadata = core::OutputFileMetadata{.receiver_id = stream.receiver_id,
103 .receiver_name = stream.receiver_name,
104 .mode = stream.mode,
105 .path = outputPath(stream.receiver_name),
106 .sampling_rate = stream.sample_rate};
107 stream_ids.emplace(key, stream_id);
108 streams.emplace(stream_id, std::move(state));
109 return stream_id;
110 }
static std::string streamKey(const core::ReceiverStreamDescriptor &stream)
std::string outputPath(const std::string &receiver_name) const
std::unordered_map< std::string, std::uint32_t > stream_ids

References max, core::ReceiverStreamDescriptor::mode, mutex, next_stream_id, outputPath(), core::OutputFileMetadata::receiver_id, core::ReceiverStreamDescriptor::receiver_id, core::ReceiverStreamDescriptor::receiver_name, core::ReceiverStreamDescriptor::sample_rate, stream_ids, streamKey(), and streams.

Referenced by submitBlock().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ stateFor()

StreamState & serial::Hdf5OutputSink::Impl::stateFor ( const std::uint32_t  stream_id)

Definition at line 226 of file hdf5_output_sink.cpp.

227 {
228 const auto found = streams.find(stream_id);
229 if (found == streams.end())
230 {
231 throw std::out_of_range("Unknown HDF5 output stream ID");
232 }
233 return found->second;
234 }

References max, and streams.

Referenced by closeStream(), openStream(), and submitBlock().

+ Here is the caller graph for this function:

◆ streamKey()

static std::string serial::Hdf5OutputSink::Impl::streamKey ( const core::ReceiverStreamDescriptor & stream)
static

Definition at line 215 of file hdf5_output_sink.cpp.

216 {
217 return std::format("{}:{}:{}", stream.receiver_id, stream.receiver_name, stream.mode);
218 }

References core::ReceiverStreamDescriptor::mode, core::ReceiverStreamDescriptor::receiver_id, and core::ReceiverStreamDescriptor::receiver_name.

Referenced by registerStream().

+ Here is the caller graph for this function:

◆ submitBlock()

void serial::Hdf5OutputSink::Impl::submitBlock ( const core::ReceiverSampleBlock & block)

Definition at line 138 of file hdf5_output_sink.cpp.

139 {
140 std::scoped_lock const lock(mutex);
141 const auto stream_id = registerStream(block.stream);
142 auto& state = stateFor(stream_id);
143 if (!state.opened)
144 {
145 openStream(stream_id, block.first_sample_time);
146 }
147 if (block.file_metadata)
148 {
149 state.streaming_metadata = *block.file_metadata;
150 }
151
152 if (isPulsed(state))
153 {
154 writePulsedBlock(state, block);
155 return;
156 }
157 appendStreamingBlock(state, block);
158 }
void openStream(const std::uint32_t stream_id, const RealType)
void writePulsedBlock(StreamState &state, const core::ReceiverSampleBlock &block)
void appendStreamingBlock(StreamState &state, const core::ReceiverSampleBlock &block)
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)

References appendStreamingBlock(), isPulsed(), max, mutex, openStream(), registerStream(), stateFor(), and writePulsedBlock().

+ Here is the call graph for this function:

◆ writePulsedBlock()

void serial::Hdf5OutputSink::Impl::writePulsedBlock ( StreamState & state,
const core::ReceiverSampleBlock & block 
)

Definition at line 236 of file hdf5_output_sink.cpp.

237 {
238 if (!state.pulsed_file)
239 {
240 throw std::logic_error("HDF5 pulsed stream is not open");
241 }
242
243 std::vector<ComplexType> chunk(block.samples.begin(), block.samples.end());
244 const RealType fullscale = processing::quantizeAndScaleWindow(chunk);
245 const auto chunk_index = state.next_chunk_index++;
246 const auto sample_start = state.metadata.total_samples;
247 core::PulseChunkMetadata chunk_metadata{.chunk_index = chunk_index,
248 .i_dataset = std::format("chunk_{:06}_I", chunk_index),
249 .q_dataset = std::format("chunk_{:06}_Q", chunk_index),
250 .start_time = block.first_sample_time,
251 .sample_count = static_cast<std::uint64_t>(chunk.size()),
252 .sample_start = sample_start,
253 .sample_end_exclusive =
254 sample_start + static_cast<std::uint64_t>(chunk.size())};
255 addChunkToFile(*state.pulsed_file, chunk, block.first_sample_time, fullscale, chunk_index, &chunk_metadata);
256 state.metadata.chunks.push_back(std::move(chunk_metadata));
257 state.metadata.total_samples = state.metadata.chunks.back().sample_end_exclusive;
258 }
void addChunkToFile(HighFive::File &file, const std::vector< ComplexType > &data, const RealType time, const RealType fullscale, const unsigned count, const core::PulseChunkMetadata *metadata)
Adds a chunk of data to an HDF5 file.
Metadata for one pulsed output chunk written to HDF5.

References serial::addChunkToFile(), max, and processing::quantizeAndScaleWindow().

Referenced by submitBlock().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ finalized

bool serial::Hdf5OutputSink::Impl::finalized = false

Definition at line 326 of file hdf5_output_sink.cpp.

Referenced by finalize(), and initializeRun().

◆ metadata_collector

std::shared_ptr<core::OutputMetadataCollector> serial::Hdf5OutputSink::Impl::metadata_collector

Definition at line 321 of file hdf5_output_sink.cpp.

Referenced by closePulsedStream(), and closeStreamingStream().

◆ mutex

std::recursive_mutex serial::Hdf5OutputSink::Impl::mutex

◆ next_stream_id

std::uint32_t serial::Hdf5OutputSink::Impl::next_stream_id = 1

Definition at line 325 of file hdf5_output_sink.cpp.

Referenced by registerStream().

◆ output_dir

std::string serial::Hdf5OutputSink::Impl::output_dir

Definition at line 320 of file hdf5_output_sink.cpp.

Referenced by initializeRun(), openStream(), and outputPath().

◆ stream_ids

std::unordered_map<std::string, std::uint32_t> serial::Hdf5OutputSink::Impl::stream_ids

Definition at line 324 of file hdf5_output_sink.cpp.

Referenced by registerStream().

◆ streams

std::unordered_map<std::uint32_t, StreamState> serial::Hdf5OutputSink::Impl::streams

Definition at line 323 of file hdf5_output_sink.cpp.

Referenced by finalize(), registerStream(), and stateFor().


The documentation for this struct was generated from the following file: