13#include <highfive/highfive.hpp>
19#include <unordered_map>
41 for (
const auto&
chunk : metadata.chunks)
48 if (!metadata.
chunks.empty())
52 { return chunk.sample_count == expected; });
83 throw std::invalid_argument(
"Hdf5OutputSink requires HDF5 output mode");
86 std::filesystem::create_directories(
output_dir);
101 state.descriptor = stream;
120 std::filesystem::create_directories(
output_dir);
124 state.pulsed_file = std::make_unique<HighFive::File>(
state.metadata.path, HighFive::File::Truncate);
135 state.closed =
false;
147 if (
block.file_metadata)
149 state.streaming_metadata = *
block.file_metadata;
185 std::vector<std::uint32_t>
ids;
191 ids.push_back(stream_id);
194 for (
const auto stream_id :
ids)
207 if (sample_rate <= 0.0)
212 return static_cast<std::size_t
>(std::ceil(duration * sample_rate));
223 return (
out_path / std::format(
"{}_results.h5", receiver_name)).string();
231 throw std::out_of_range(
"Unknown HDF5 output stream ID");
233 return found->second;
238 if (!
state.pulsed_file)
240 throw std::logic_error(
"HDF5 pulsed stream is not open");
243 std::vector<ComplexType>
chunk(
block.samples.begin(),
block.samples.end());
245 const auto chunk_index =
state.next_chunk_index++;
246 const auto sample_start =
state.metadata.total_samples;
248 .i_dataset = std::format(
"chunk_{:06}_I", chunk_index),
249 .q_dataset = std::format(
"chunk_{:06}_Q", chunk_index),
250 .start_time =
block.first_sample_time,
251 .sample_count =
static_cast<std::uint64_t
>(
chunk.size()),
252 .sample_start = sample_start,
253 .sample_end_exclusive =
254 sample_start +
static_cast<std::uint64_t
>(
chunk.size())};
257 state.metadata.total_samples =
state.metadata.chunks.back().sample_end_exclusive;
262 const auto sample_start =
static_cast<std::size_t
>(
block.sample_start);
268 std::ranges::copy(
block.samples,
269 std::next(
state.streaming_buffer.begin(),
270 static_cast<std::vector<ComplexType>::difference_type
>(sample_start)));
275 if (!
state.pulsed_file)
283 state.pulsed_file.reset();
293 if (
state.streaming_buffer.empty())
298 auto metadata =
state.streaming_metadata.value_or(
300 .receiver_name =
state.descriptor.receiver_name,
301 .mode =
state.descriptor.mode,
302 .path =
state.metadata.path,
303 .sampling_rate =
state.descriptor.sample_rate});
312 state.descriptor.reference_frequency, &metadata,
313 state.descriptor.sample_rate);
323 std::unordered_map<std::uint32_t, StreamState>
streams;
330 std::shared_ptr<core::OutputMetadataCollector> metadata_collector) :
341 (
void)_impl->finalize();
351 _impl->initializeRun(
config, simulation_name);
356 return _impl->registerStream(stream);
361 _impl->openStream(stream_id, first_sample_time);
372 std::unique_ptr<core::ReceiverOutputSink>
373 makeHdf5OutputSink(std::string output_dir, std::shared_ptr<core::OutputMetadataCollector> metadata_collector)
375 return std::make_unique<Hdf5OutputSink>(std::move(output_dir), std::move(metadata_collector));
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream) override
void emitContextHeartbeat(RealType simulation_time) override
Hdf5OutputSink(std::string output_dir, std::shared_ptr< core::OutputMetadataCollector > metadata_collector=nullptr)
void initializeRun(const core::OutputConfig &config, std::string simulation_name) override
void submitBlock(const core::ReceiverSampleBlock &block) override
~Hdf5OutputSink() override
core::OutputStats finalize() override
void openStream(std::uint32_t stream_id, RealType first_sample_time) override
void closeStream(std::uint32_t stream_id) override
double RealType
Type for real numbers.
Declares focused, testable pipeline steps for receiver finalization.
Header file for HDF5 data export and import functions.
RealType endTime() noexcept
Get the end time for the simulation.
RealType startTime() noexcept
Get the start time for the simulation.
void exportStreamingToHdf5(const std::string &filename, const std::vector< ComplexType > &iq_buffer, const RealType fullscale, const RealType ref_freq, const core::OutputFileMetadata *metadata, const RealType sample_rate)
Exports a finalized streaming IQ buffer to an HDF5 file.
RealType quantizeAndScaleWindow(std::span< ComplexType > window)
Simulates ADC quantization and scales a window of complex I/Q samples.
std::mutex hdf5_global_mutex
Global mutex to protect all HDF5 C-library calls, which are not thread-safe.
void writeOutputFileMetadataAttributes(HighFive::File &file, const core::OutputFileMetadata &metadata)
Writes additive FERS output metadata attributes to an open HDF5 file.
void addChunkToFile(HighFive::File &file, const std::vector< ComplexType > &data, const RealType time, const RealType fullscale, const unsigned count, const core::PulseChunkMetadata *metadata)
Adds a chunk of data to an HDF5 file.
std::unique_ptr< core::ReceiverOutputSink > makeHdf5OutputSink(std::string output_dir, std::shared_ptr< core::OutputMetadataCollector > metadata_collector)
Defines the Parameters struct and provides methods for managing simulation parameters.
Header for receiver-side signal processing and rendering.
std::string receiver_name
std::optional< core::OutputFileMetadata > streaming_metadata
std::unique_ptr< HighFive::File > pulsed_file
std::vector< ComplexType > streaming_buffer
unsigned next_chunk_index
core::ReceiverStreamDescriptor descriptor
core::OutputFileMetadata metadata
static std::string streamKey(const core::ReceiverStreamDescriptor &stream)
void initializeRun(const core::OutputConfig &config, const std::string &)
void openStream(const std::uint32_t stream_id, const RealType)
void writePulsedBlock(StreamState &state, const core::ReceiverSampleBlock &block)
void appendStreamingBlock(StreamState &state, const core::ReceiverSampleBlock &block)
Impl(std::string out_dir, std::shared_ptr< core::OutputMetadataCollector > collector)
std::string outputPath(const std::string &receiver_name) const
void closeStreamingStream(StreamState &state)
StreamState & stateFor(const std::uint32_t stream_id)
std::shared_ptr< core::OutputMetadataCollector > metadata_collector
void closeStream(const std::uint32_t stream_id)
core::OutputStats finalize()
static std::size_t expectedStreamingSamples(const RealType sample_rate)
std::recursive_mutex mutex
std::unordered_map< std::string, std::uint32_t > stream_ids
void submitBlock(const core::ReceiverSampleBlock &block)
static bool isPulsed(const StreamState &state)
void closePulsedStream(StreamState &state)
std::uint32_t registerStream(const core::ReceiverStreamDescriptor &stream)
std::uint32_t next_stream_id
std::unordered_map< std::uint32_t, StreamState > streams