FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
hdf5_output_sink.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-2.0-only
2//
3// Copyright (c) 2026-present FERS Contributors (see AUTHORS.md).
4//
5// See the GNU GPLv2 LICENSE file in the FERS project root for more information.
6
8
9#include <algorithm>
10#include <cmath>
11#include <filesystem>
12#include <format>
13#include <highfive/highfive.hpp>
14#include <iterator>
15#include <limits>
16#include <mutex>
17#include <optional>
18#include <stdexcept>
19#include <unordered_map>
20#include <utility>
21#include <vector>
22
24#include "core/parameters.h"
27#include "serial/hdf5_handler.h"
28
29namespace serial
30{
31 namespace
32 {
34 {
35 metadata.pulse_count = static_cast<std::uint64_t>(metadata.chunks.size());
36 metadata.total_samples = 0;
37 metadata.min_pulse_length_samples = metadata.chunks.empty() ? 0 : std::numeric_limits<std::uint64_t>::max();
38 metadata.max_pulse_length_samples = 0;
39 metadata.uniform_pulse_length = true;
40
41 for (const auto& chunk : metadata.chunks)
42 {
43 metadata.total_samples += chunk.sample_count;
44 metadata.min_pulse_length_samples = std::min(metadata.min_pulse_length_samples, chunk.sample_count);
45 metadata.max_pulse_length_samples = std::max(metadata.max_pulse_length_samples, chunk.sample_count);
46 }
47
48 if (!metadata.chunks.empty())
49 {
50 const auto expected = metadata.chunks.front().sample_count;
51 metadata.uniform_pulse_length = std::ranges::all_of(metadata.chunks, [expected](const auto& chunk)
52 { return chunk.sample_count == expected; });
53 }
54
55 metadata.sample_start = 0;
56 metadata.sample_end_exclusive = metadata.total_samples;
57 }
58 }
59
61 {
63 {
66 std::vector<ComplexType> streaming_buffer;
67 std::optional<core::OutputFileMetadata> streaming_metadata;
68 std::unique_ptr<HighFive::File> pulsed_file;
69 unsigned next_chunk_index = 0;
70 bool opened = false;
71 bool closed = false;
72 };
73
74 Impl(std::string out_dir, std::shared_ptr<core::OutputMetadataCollector> collector) :
76 {
77 }
78
79 void initializeRun(const core::OutputConfig& config, const std::string& /*simulation_name*/)
80 {
82 {
83 throw std::invalid_argument("Hdf5OutputSink requires HDF5 output mode");
84 }
85 std::scoped_lock const lock(mutex);
86 std::filesystem::create_directories(output_dir);
87 finalized = false;
88 }
89
91 {
92 std::scoped_lock const lock(mutex);
93 const auto key = streamKey(stream);
94 if (const auto found = stream_ids.find(key); found != stream_ids.end())
95 {
96 return found->second;
97 }
98
99 const auto stream_id = next_stream_id++;
101 state.descriptor = stream;
103 .receiver_name = stream.receiver_name,
104 .mode = stream.mode,
105 .path = outputPath(stream.receiver_name),
106 .sampling_rate = stream.sample_rate};
107 stream_ids.emplace(key, stream_id);
108 streams.emplace(stream_id, std::move(state));
109 return stream_id;
110 }
111
112 void openStream(const std::uint32_t stream_id, const RealType /*first_sample_time*/)
113 {
114 std::scoped_lock const lock(mutex);
115 auto& state = stateFor(stream_id);
116 if (state.opened)
117 {
118 return;
119 }
120 std::filesystem::create_directories(output_dir);
121 if (isPulsed(state))
122 {
123 std::scoped_lock const hdf5_lock(hdf5_global_mutex);
124 state.pulsed_file = std::make_unique<HighFive::File>(state.metadata.path, HighFive::File::Truncate);
125 }
126 else
127 {
128 const auto expected_samples = expectedStreamingSamples(state.descriptor.sample_rate);
129 if (expected_samples > 0u)
130 {
131 state.streaming_buffer.resize(expected_samples);
132 }
133 }
134 state.opened = true;
135 state.closed = false;
136 }
137
139 {
140 std::scoped_lock const lock(mutex);
141 const auto stream_id = registerStream(block.stream);
142 auto& state = stateFor(stream_id);
143 if (!state.opened)
144 {
145 openStream(stream_id, block.first_sample_time);
146 }
147 if (block.file_metadata)
148 {
149 state.streaming_metadata = *block.file_metadata;
150 }
151
152 if (isPulsed(state))
153 {
155 return;
156 }
158 }
159
160 void closeStream(const std::uint32_t stream_id)
161 {
162 std::scoped_lock const lock(mutex);
163 auto& state = stateFor(stream_id);
164 if (state.closed)
165 {
166 return;
167 }
168
169 if (isPulsed(state))
170 {
172 }
173 else
174 {
176 }
177 state.closed = true;
178 }
179
181 {
182 std::scoped_lock const lock(mutex);
183 if (!finalized)
184 {
185 std::vector<std::uint32_t> ids;
186 ids.reserve(streams.size());
187 for (const auto& [stream_id, state] : streams)
188 {
189 if (state.opened && !state.closed)
190 {
191 ids.push_back(stream_id);
192 }
193 }
194 for (const auto stream_id : ids)
195 {
196 closeStream(stream_id);
197 }
198 finalized = true;
199 }
200 return core::OutputStats{.mode = core::OutputMode::Hdf5, .streams = {}};
201 }
202
203 static bool isPulsed(const StreamState& state) { return state.descriptor.mode == "pulsed"; }
204
205 static std::size_t expectedStreamingSamples(const RealType sample_rate)
206 {
207 if (sample_rate <= 0.0)
208 {
209 return 0u;
210 }
211 const RealType duration = std::max<RealType>(0.0, params::endTime() - params::startTime());
212 return static_cast<std::size_t>(std::ceil(duration * sample_rate));
213 }
214
/**
 * @brief Builds the lookup key that identifies a receiver stream.
 * @param stream Descriptor supplying the receiver id, receiver name and mode.
 * @return The three fields joined as "id:name:mode".
 */
static std::string streamKey(const core::ReceiverStreamDescriptor& stream)
{
    const auto& id = stream.receiver_id;
    const auto& name = stream.receiver_name;
    const auto& mode = stream.mode;
    return std::format("{}:{}:{}", id, name, mode);
}
219
/**
 * @brief Computes the HDF5 file path for a receiver inside the output directory.
 * @param receiver_name Receiver name used as the file-name stem.
 * @return "<output_dir>/<receiver_name>_results.h5" as a string.
 */
[[nodiscard]] std::string outputPath(const std::string& receiver_name) const
{
    std::filesystem::path result(output_dir);
    result /= std::format("{}_results.h5", receiver_name);
    return result.string();
}
225
226 StreamState& stateFor(const std::uint32_t stream_id)
227 {
228 const auto found = streams.find(stream_id);
229 if (found == streams.end())
230 {
231 throw std::out_of_range("Unknown HDF5 output stream ID");
232 }
233 return found->second;
234 }
235
237 {
238 if (!state.pulsed_file)
239 {
240 throw std::logic_error("HDF5 pulsed stream is not open");
241 }
242
243 std::vector<ComplexType> chunk(block.samples.begin(), block.samples.end());
245 const auto chunk_index = state.next_chunk_index++;
246 const auto sample_start = state.metadata.total_samples;
247 core::PulseChunkMetadata chunk_metadata{.chunk_index = chunk_index,
248 .i_dataset = std::format("chunk_{:06}_I", chunk_index),
249 .q_dataset = std::format("chunk_{:06}_Q", chunk_index),
250 .start_time = block.first_sample_time,
251 .sample_count = static_cast<std::uint64_t>(chunk.size()),
252 .sample_start = sample_start,
253 .sample_end_exclusive =
254 sample_start + static_cast<std::uint64_t>(chunk.size())};
255 addChunkToFile(*state.pulsed_file, chunk, block.first_sample_time, fullscale, chunk_index, &chunk_metadata);
256 state.metadata.chunks.push_back(std::move(chunk_metadata));
257 state.metadata.total_samples = state.metadata.chunks.back().sample_end_exclusive;
258 }
259
261 {
262 const auto sample_start = static_cast<std::size_t>(block.sample_start);
263 const auto sample_end = sample_start + block.samples.size();
264 if (state.streaming_buffer.size() < sample_end)
265 {
266 state.streaming_buffer.resize(sample_end);
267 }
268 std::ranges::copy(block.samples,
269 std::next(state.streaming_buffer.begin(),
270 static_cast<std::vector<ComplexType>::difference_type>(sample_start)));
271 }
272
274 {
275 if (!state.pulsed_file)
276 {
277 return;
278 }
280 {
281 std::scoped_lock const hdf5_lock(hdf5_global_mutex);
282 writeOutputFileMetadataAttributes(*state.pulsed_file, state.metadata);
283 state.pulsed_file.reset();
284 }
286 {
287 metadata_collector->addFile(state.metadata);
288 }
289 }
290
292 {
293 if (state.streaming_buffer.empty())
294 {
295 return;
296 }
297
298 auto metadata = state.streaming_metadata.value_or(
299 core::OutputFileMetadata{.receiver_id = state.descriptor.receiver_id,
300 .receiver_name = state.descriptor.receiver_name,
301 .mode = state.descriptor.mode,
302 .path = state.metadata.path,
303 .sampling_rate = state.descriptor.sample_rate});
304 metadata.path = state.metadata.path;
305 metadata.sampling_rate = state.descriptor.sample_rate;
306 metadata.total_samples = static_cast<std::uint64_t>(state.streaming_buffer.size());
307 metadata.sample_start = 0;
308 metadata.sample_end_exclusive = static_cast<std::uint64_t>(state.streaming_buffer.size());
309
310 const RealType fullscale = processing::quantizeAndScaleWindow(state.streaming_buffer);
311 processing::pipeline::exportStreamingToHdf5(state.metadata.path, state.streaming_buffer, fullscale,
312 state.descriptor.reference_frequency, &metadata,
313 state.descriptor.sample_rate);
315 {
316 metadata_collector->addFile(std::move(metadata));
317 }
318 }
319
// Directory that receives the per-receiver "<name>_results.h5" files; it is created
// on demand by initializeRun() and openStream().
320 std::string output_dir;
// Collector that is handed each file's metadata (addFile) when its stream is closed.
// NOTE(review): the public constructor appears to default this to nullptr, so it is
// presumably optional — confirm the null checks at the call sites.
321 std::shared_ptr<core::OutputMetadataCollector> metadata_collector;
// Guards all state below. It is recursive because locked members call other locked
// members (submitBlock -> registerStream/openStream, finalize -> closeStream).
322 std::recursive_mutex mutex;
// Per-stream state, keyed by the stream id returned from registerStream().
323 std::unordered_map<std::uint32_t, StreamState> streams;
// Maps a streamKey() string to its stream id so that re-registering the same
// descriptor returns the existing id.
324 std::unordered_map<std::string, std::uint32_t> stream_ids;
// Next id to hand out; registerStream() post-increments it, so ids start at 1.
325 std::uint32_t next_stream_id = 1;
// Set by finalize() after it has closed the open streams; cleared by initializeRun().
326 bool finalized = false;
327 };
328
329 Hdf5OutputSink::Hdf5OutputSink(std::string output_dir,
330 std::shared_ptr<core::OutputMetadataCollector> metadata_collector) :
331 _impl(std::make_unique<Impl>(std::move(output_dir), std::move(metadata_collector)))
332 {
333 }
334
336 {
337 if (_impl)
338 {
339 try
340 {
341 (void)_impl->finalize();
342 }
343 catch (...)
344 {
345 }
346 }
347 }
348
349 void Hdf5OutputSink::initializeRun(const core::OutputConfig& config, std::string simulation_name)
350 {
351 _impl->initializeRun(config, simulation_name);
352 }
353
355 {
356 return _impl->registerStream(stream);
357 }
358
359 void Hdf5OutputSink::openStream(const std::uint32_t stream_id, const RealType first_sample_time)
360 {
361 _impl->openStream(stream_id, first_sample_time);
362 }
363
365
366 void Hdf5OutputSink::emitContextHeartbeat(const RealType /*simulation_time*/) {}
367
368 void Hdf5OutputSink::closeStream(const std::uint32_t stream_id) { _impl->closeStream(stream_id); }
369
370 core::OutputStats Hdf5OutputSink::finalize() { return _impl->finalize(); }
371
372 std::unique_ptr<core::ReceiverOutputSink>
373 makeHdf5OutputSink(std::string output_dir, std::shared_ptr<core::OutputMetadataCollector> metadata_collector)
374 {
375 return std::make_unique<Hdf5OutputSink>(std::move(output_dir), std::move(metadata_collector));
376 }
377}
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream) override
void emitContextHeartbeat(RealType simulation_time) override
Hdf5OutputSink(std::string output_dir, std::shared_ptr< core::OutputMetadataCollector > metadata_collector=nullptr)
void initializeRun(const core::OutputConfig &config, std::string simulation_name) override
void submitBlock(const core::ReceiverSampleBlock &block) override
core::OutputStats finalize() override
void openStream(std::uint32_t stream_id, RealType first_sample_time) override
void closeStream(std::uint32_t stream_id) override
double RealType
Type for real numbers.
Definition config.h:27
Declares focused, testable pipeline steps for receiver finalization.
Header file for HDF5 data export and import functions.
RealType endTime() noexcept
Get the end time for the simulation.
Definition parameters.h:109
RealType startTime() noexcept
Get the start time for the simulation.
Definition parameters.h:103
void exportStreamingToHdf5(const std::string &filename, const std::vector< ComplexType > &iq_buffer, const RealType fullscale, const RealType ref_freq, const core::OutputFileMetadata *metadata, const RealType sample_rate)
Exports a finalized streaming IQ buffer to an HDF5 file.
RealType quantizeAndScaleWindow(std::span< ComplexType > window)
Simulates ADC quantization and scales a window of complex I/Q samples.
std::mutex hdf5_global_mutex
Global mutex to protect all HDF5 C-library calls, which are not thread-safe.
void writeOutputFileMetadataAttributes(HighFive::File &file, const core::OutputFileMetadata &metadata)
Writes additive FERS output metadata attributes to an open HDF5 file.
void addChunkToFile(HighFive::File &file, const std::vector< ComplexType > &data, const RealType time, const RealType fullscale, const unsigned count, const core::PulseChunkMetadata *metadata)
Adds a chunk of data to an HDF5 file.
std::unique_ptr< core::ReceiverOutputSink > makeHdf5OutputSink(std::string output_dir, std::shared_ptr< core::OutputMetadataCollector > metadata_collector)
Defines the Parameters struct and provides methods for managing simulation parameters.
Header for receiver-side signal processing and rendering.
math::Vec3 max
Metadata for one receiver output file.
std::uint64_t max_pulse_length_samples
Maximum pulse length in samples.
std::uint64_t total_samples
Total sample count written to the file.
std::string path
Filesystem path to the generated output file.
RealType sampling_rate
Sample rate for this output file in hertz.
std::vector< PulseChunkMetadata > chunks
Pulsed output chunks written to the file.
SimId receiver_id
Receiver SimId that owns the output file.
std::uint64_t sample_start
Inclusive global sample index for the file start.
std::uint64_t sample_end_exclusive
Exclusive global sample index for the file end.
std::uint64_t pulse_count
Number of pulses represented in the file.
std::uint64_t min_pulse_length_samples
Minimum pulse length in samples.
bool uniform_pulse_length
True when every pulse has the same sample length.
Metadata for one pulsed output chunk written to HDF5.
std::optional< core::OutputFileMetadata > streaming_metadata
std::unique_ptr< HighFive::File > pulsed_file
core::ReceiverStreamDescriptor descriptor
static std::string streamKey(const core::ReceiverStreamDescriptor &stream)
void initializeRun(const core::OutputConfig &config, const std::string &)
void openStream(const std::uint32_t stream_id, const RealType)
void writePulsedBlock(StreamState &state, const core::ReceiverSampleBlock &block)
void appendStreamingBlock(StreamState &state, const core::ReceiverSampleBlock &block)
Impl(std::string out_dir, std::shared_ptr< core::OutputMetadataCollector > collector)
std::string outputPath(const std::string &receiver_name) const
void closeStreamingStream(StreamState &state)
StreamState & stateFor(const std::uint32_t stream_id)
std::shared_ptr< core::OutputMetadataCollector > metadata_collector
void closeStream(const std::uint32_t stream_id)
static std::size_t expectedStreamingSamples(const RealType sample_rate)
std::unordered_map< std::string, std::uint32_t > stream_ids
void submitBlock(const core::ReceiverSampleBlock &block)
static bool isPulsed(const StreamState &state)
void closePulsedStream(StreamState &state)
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)
std::unordered_map< std::uint32_t, StreamState > streams