FERS 0.1.0
The Flexible Extensible Radar Simulator
sim_threading.h
// SPDX-License-Identifier: GPL-2.0-only
//
// Copyright (c) 2006-2008 Marc Brooker and Michael Inggs
// Copyright (c) 2008-present FERS Contributors (see AUTHORS.md).
//
// See the GNU GPLv2 LICENSE file in the FERS project root for more information.

/**
 * @file sim_threading.h
 * @brief Header file for the main simulation runner.
 *
 * This file contains the declarations for the high-level function and engine that
 * orchestrates and manages the event-driven radar simulation.
 */

#pragma once

#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <span>
#include <string>
#include <thread>
#include <vector>

#include "core/config.h"
#include "core/output_config.h"
#include "core/parameters.h"
#include "core/sim_events.h"
#include "signal/dsp_filters.h"

namespace pool
{
    class ThreadPool;
}

namespace radar
{
    class Receiver;
    class Target;
    class Transmitter;
}

namespace serial
{
    class Response;
}

namespace core
{
    class World;
    class ReceiverOutputSink;

    /**
     * @class ProgressReporter
     * @brief A thread-safe wrapper for the simulation progress callback.
     *
     * Allows multiple worker threads to report progress concurrently without race conditions.
     */
    class ProgressReporter
    {
    public:
        /**
         * @typedef Callback
         * @brief Defines the signature for the progress reporting callback function.
         */
        using Callback = std::function<void(const std::string&, int, int)>;

        /**
         * @brief Constructs a ProgressReporter with the given callback.
         * @param cb The callback function to wrap.
         */
        explicit ProgressReporter(Callback cb) : _callback(std::move(cb)) {}

        /**
         * @brief Safely reports progress to the underlying callback.
         * @param msg The status message to report.
         * @param current The current progress value.
         * @param total The total progress value.
         */
        void report(const std::string& msg, int current, int total)
        {
            if (_callback)
            {
                // Serialize callback invocations across worker threads.
                std::scoped_lock const lock(_mutex);
                _callback(msg, current, total);
            }
        }

    private:
        std::mutex _mutex; ///< Mutex to ensure thread-safe access to the callback.
        Callback _callback; ///< The underlying callback function.
    };

    /**
     * @class SimulationEngine
     * @brief Encapsulates the state and logic of the event-driven simulation loop.
     *
     * Breaking the simulation loop into this class allows for easily testable,
     * focused functions with low cyclomatic complexity.
     */
    class SimulationEngine
    {
    public:
        /**
         * @brief Constructs the simulation engine.
         * @param world Pointer to the simulation world containing all entities.
         * @param pool Reference to the thread pool for asynchronous tasks.
         * @param reporter Shared pointer to the thread-safe progress reporter.
         * @param output_dir Output directory for the simulation files.
         * @param metadata_collector Optional collector for generated output metadata.
         * @param output_sink Optional receiver output sink.
         * @param cancel_callback Optional cooperative cancellation callback.
         * @param eager_context_stream_open True when the context heartbeat requires streams to be opened before data.
         */
        SimulationEngine(World* world, pool::ThreadPool& pool, std::shared_ptr<ProgressReporter> reporter,
                         std::string output_dir, std::shared_ptr<OutputMetadataCollector> metadata_collector = nullptr,
                         ReceiverOutputSink* output_sink = nullptr, std::function<bool()> cancel_callback = nullptr,
                         bool eager_context_stream_open = false);

        /**
         * @brief Starts and runs the main simulation loop until completion.
         */
        void run();

        /// Returns true after cooperative cancellation has been requested.
        [[nodiscard]] bool cancelled() const noexcept { return _cancelled; }

        /**
         * @brief Advances the time-stepped inner loop for active streaming systems.
         * @param t_event The timestamp of the next discrete event to process up to.
         */
        void processStreamingPhysics(RealType t_event);

        /**
         * @brief Dispatches a discrete simulation event to its specific handler.
         * @param event The event to process.
         */
        void processEvent(const Event& event);

        /**
         * @brief Handles the start of a pulsed transmission.
         * @param tx Pointer to the transmitting radar object.
         * @param t_event The timestamp of the transmission event.
         */
        void handleTxPulsedStart(radar::Transmitter* tx, RealType t_event);

        /**
         * @brief Handles the opening of a pulsed receiver's listening window.
         * @param rx Pointer to the receiving radar object.
         * @param t_event The timestamp of the window opening event.
         */
        void handleRxPulsedWindowStart(radar::Receiver* rx, RealType t_event);

        /**
         * @brief Handles the closing of a pulsed receiver's listening window, triggering finalization.
         * @param rx Pointer to the receiving radar object.
         * @param t_event The timestamp of the window closing event.
         */
        void handleRxPulsedWindowEnd(radar::Receiver* rx, RealType t_event);

        /**
         * @brief Handles a streaming transmitter turning on.
         * @param source Cached description of the active streaming transmitter segment.
         */
        void handleTxStreamingStart(const ActiveStreamingSource& source);

        /**
         * @brief Handles a streaming transmitter turning off.
         * @param tx Pointer to the transmitting radar object.
         */
        void handleTxStreamingEnd(radar::Transmitter* tx);

        /**
         * @brief Handles a streaming receiver starting to record.
         * @param rx Pointer to the receiving radar object.
         */
        void handleRxStreamingStart(radar::Receiver* rx);

        /**
         * @brief Handles a streaming receiver stopping recording.
         * @param rx Pointer to the receiving radar object.
         */
        void handleRxStreamingEnd(radar::Receiver* rx);

    private:
        /// Calculates one streaming I/Q sample for the receiver at the specified time step.
        [[nodiscard]] ComplexType calculateStreamingSample(radar::Receiver* rx, RealType t_step,
                                                           const std::vector<ActiveStreamingSource>& streaming_sources,

        /// Adds tracker storage for a newly active streaming source.
        void appendStreamingTrackerSource();

        /// Removes tracker storage for a streaming source that has ended.
        void eraseStreamingTrackerSource(std::size_t source_index);

        /// Removes ended streaming sources once no future receiver sample can observe their in-flight energy.
        void cleanupInactiveStreamingSources(RealType from_time);

        /// Returns the next time at which ended streaming sources can be cleaned up.
        [[nodiscard]] std::optional<RealType> nextStreamingCleanupDeadline(RealType from_time);

        /// Returns the next streaming chunk boundary before the target event time.
        [[nodiscard]] RealType streamingChunkEnd(RealType from_time, RealType event_time);

        /// Returns true when cooperative cancellation should stop the current streaming chunk.
        [[nodiscard]] bool shouldStopStreamingChunk(std::size_t sample_index, std::size_t chunk_start_index);

        /// Processes one simulation sample for all active streaming receivers.
        void processStreamingSample(std::size_t sample_index, std::size_t first_index, std::size_t final_index,

        /// Adds one sample to every active streaming receiver.
        void appendActiveReceiverStreamingSamples(std::size_t sample_index, RealType t_step);

        /// Adds one sample to a single active streaming receiver.
        void appendReceiverStreamingSample(std::size_t receiver_index, std::size_t sample_index, RealType t_step);

        /// Builds and attaches IF resampling sinks for configured FMCW receivers.
        void initializeFmcwIfResamplers();

        /// Builds and attaches one receiver IF resampler when configured.
        void initializeFmcwIfResampler(std::size_t receiver_index);

        /// Extends resolved dechirp source spans to cover IF resampler over-render.
        void extendDechirpSourcesForIfOverrender();

        /// Flushes all pending high-rate samples buffered for IF resamplers.
        void flushFmcwIfBlocks();

        /// Flushes one receiver's pending IF-resampling high-rate block.
        void flushFmcwIfBlock(std::size_t receiver_index);

        /// Adds one high-rate sample to a receiver's IF-resampling block buffer.
        void appendFmcwIfSample(std::size_t receiver_index, RealType t_step, ComplexType sample);

        /// Adds one raw streaming sample to the live output block buffer.
        void appendStreamingOutputSample(std::size_t receiver_index, std::size_t sample_index, RealType t_step,
                                         ComplexType sample);

        /// Flushes all pending live streaming output blocks.
        void flushStreamingOutputBlocks();

        /// Flushes one receiver's pending live streaming output block.
        void flushStreamingOutputBlock(std::size_t receiver_index, bool finish_downsampler = false);

        /// Emits an already processed streaming block to the selected output sink.
        void emitStreamingOutputBlock(std::size_t receiver_index, RealType first_sample_time, RealType sample_rate,
                                      std::span<const ComplexType> samples, std::uint64_t sample_start);

        /// Returns the sink-visible sample rate for one live streaming receiver.
        [[nodiscard]] RealType streamingOutputSampleRate(std::size_t receiver_index) const;

        /// Registers and opens one live streaming receiver before delayed sample flushes.
        void ensureStreamingOutputStreamOpen(std::size_t receiver_index, RealType first_sample_time,
                                             RealType sample_rate);

        /// Returns an initialized stateful downsampler for one streaming receiver segment.
        streamingDownsampler(std::size_t receiver_index, std::uint64_t input_start_index, RealType segment_start_time);

        /// Emits sink heartbeats on the continuous simulation clock up to the given time.
        void emitContextHeartbeatsThrough(RealType simulation_time);

        /// Returns the latest conservative receive time at which an ended source must still be retained.
        [[nodiscard]] std::optional<RealType> streamingSourceCleanupDeadline(const ActiveStreamingSource& source,
                                                                             RealType from_time) const;

        /// Returns the latest conservative receive time at which one receiver may still observe a source.
        [[nodiscard]] std::optional<RealType> receiverCleanupDeadline(const ActiveStreamingSource& source,
                                                                      const radar::Receiver* rx,
                                                                      RealType from_time) const;

        /// Creates the CW phase-noise lookup if any active timing source needs it.
        void ensureCwPhaseNoiseLookup();

        /// Returns the dechirp reference mixer for a receiver at one sample time.
        [[nodiscard]] std::optional<ComplexType> calculateDechirpMixer(radar::Receiver* rx, RealType t_step,

        /// Adds pulsed interference into a completed high-rate IF block before resampling.
        void applyPulsedInterferenceToFmcwIfBlock(std::size_t receiver_index, std::span<ComplexType> block,

        /// Adds pulsed interference into a live streaming block before final noise/scaling.
        void applyPulsedInterferenceToStreamingBlock(std::size_t receiver_index, std::span<ComplexType> block,
                                                     RealType block_start_time, RealType sample_rate, bool dechirp_mix);

        /// Adds one rendered pulsed-interference slice into a streaming block.
        void addPulsedInterferenceSamples(std::span<ComplexType> block, std::span<const ComplexType> rendered_pulse,
                                          long long dest_begin, long long dest_end, std::size_t crop_offset,

        /// Emits summary logs for streaming receiver configuration.
        void logStreamingSummaries() const;

        /**
         * @brief Starts dedicated finalizer threads for all pulsed receivers.
         */
        void initializeFinalizers();

        /**
         * @brief Routes a calculated radar response to the appropriate receiver inbox or log.
         * @param rx Pointer to the receiving radar object.
         * @param response The calculated response to route.
         */
        void routeResponse(radar::Receiver* rx, std::unique_ptr<serial::Response> response) const;

        /**
         * @brief Throttles and emits progress updates to the reporter.
         */
        void updateProgress();

        /**
         * @brief Throttles and emits progress updates at an explicit simulation time.
         */
        void reportSimulationProgress(RealType t_current);

        /// Polls the host cancellation callback and latches the result.
        [[nodiscard]] bool isCancellationRequested();

        /// Collects streaming sources active anywhere within the requested time window.
        [[nodiscard]] std::vector<ActiveStreamingSource> collectStreamingSourcesForWindow(RealType start_time,
                                                                                          RealType end_time) const;

        /**
         * @brief Initiates the shutdown phase, waiting for all asynchronous tasks to complete.
         */
        void shutdown();

        World* _world; ///< Pointer to the simulation world state.
        pool::ThreadPool& _pool; ///< Reference to the global thread pool.
        std::shared_ptr<ProgressReporter> _reporter; ///< Shared progress reporter instance.
        std::vector<std::jthread> _finalizer_threads; ///< Collection of dedicated pulsed finalizer threads.
        std::shared_ptr<OutputMetadataCollector> _metadata_collector; ///< Collector for generated output metadata.
        ReceiverOutputSink* _output_sink = nullptr; ///< Selected receiver output sink.
        std::function<bool()> _cancel_callback; ///< Optional cooperative cancellation callback.
        bool _eager_context_stream_open = false; ///< True when context heartbeat requires pre-data stream open.
        bool _cancelled = false; ///< Latched cancellation state.

        std::chrono::steady_clock::time_point _last_report_time; ///< Timestamp of the last progress report.
        int _last_reported_percent = -1; ///< The last reported percentage to prevent redundant updates.
        RealType _next_context_heartbeat_time = 0.0; ///< Next one-second sink heartbeat on simulation clock.

        std::string _output_dir; ///< Output directory for the simulation files.
        std::unique_ptr<simulation::CwPhaseNoiseLookup> _cw_phase_noise_lookup; ///< Cached CW phase-noise lookup.
        std::vector<ReceiverTrackerCache> _streaming_tracker_caches; ///< Per-receiver streaming tracker caches.
        std::vector<ReceiverTrackerCache> _if_pulse_tracker_caches; ///< Per-receiver dechirp trackers for pulse blocks.
        std::vector<std::vector<ComplexType>> _fmcw_if_block_buffers; ///< Pending high-rate IF blocks.
        std::vector<RealType> _fmcw_if_block_start_times; ///< Start time for each pending IF block.
        std::vector<std::vector<ComplexType>> _streaming_output_block_buffers; ///< Pending live CW/FMCW output blocks.
        std::vector<std::vector<ComplexType>> _streaming_output_processed_buffers; ///< Reused post-noise output blocks.
        std::vector<RealType> _streaming_output_block_start_times; ///< Start time for pending live output blocks.
        std::vector<std::uint64_t> _streaming_output_block_start_indices; ///< High-rate sample index for each block.
        std::vector<std::unique_ptr<fers_signal::DownsamplingSink>>
            _streaming_downsamplers; ///< Stateful downsamplers for non-dechirped streaming output.
        std::vector<std::uint64_t> _streaming_downsample_base_indices; ///< Output index at segment start.
        std::vector<RealType> _streaming_downsample_segment_start_times; ///< Segment start time at output sample 0.
        std::vector<std::uint64_t> _streaming_output_sample_cursors; ///< Output sample cursor per receiver.
        std::vector<std::uint32_t> _streaming_output_stream_ids; ///< Registered sink stream IDs for live receivers.
        std::vector<bool> _streaming_output_stream_open; ///< Live receiver stream lifecycle state.
        std::vector<std::shared_ptr<const OutputFileMetadata>>
            _streaming_output_file_metadata; ///< Per-receiver metadata attached to sink blocks.
        RealType _internal_stop_time = 0.0; ///< Physics stop time including IF over-render margin.
    };

    /**
     * @brief Runs the unified, event-driven radar simulation.
     *
     * This function is the core entry point of the simulator. It advances time by
     * processing events from a global priority queue. It handles both pulsed
     * and continuous-wave (CW) physics, dispatching finalization tasks to
     * worker threads for asynchronous processing.
     *
     * @param world A pointer to the simulation world containing all entities and state.
     * @param pool A reference to the thread pool for executing tasks.
     * @param progress_callback An optional callback function for reporting progress.
     * @param output_dir Output directory for the simulation files.
     */
    OutputMetadata runEventDrivenSim(World* world, pool::ThreadPool& pool,
                                     const std::function<void(const std::string&, int, int)>& progress_callback,
                                     const std::string& output_dir, const OutputConfig& output_config = OutputConfig{},
                                     std::function<bool()> cancel_callback = nullptr, bool* cancelled = nullptr,
}
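
The ProgressReporter documented above serializes callback invocations with a mutex so that worker threads can report concurrently. The following is a minimal sketch of that idea, not the FERS build itself: the class is copied locally so the example compiles without the project headers, and `countReports` is a hypothetical helper introduced only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Local copy of the wrapper so the sketch builds without the FERS headers.
class ProgressReporter
{
public:
    using Callback = std::function<void(const std::string&, int, int)>;

    explicit ProgressReporter(Callback cb) : _callback(std::move(cb)) {}

    void report(const std::string& msg, int current, int total)
    {
        if (_callback)
        {
            std::scoped_lock const lock(_mutex); // one callback invocation at a time
            _callback(msg, current, total);
        }
    }

private:
    std::mutex _mutex;
    Callback _callback;
};

// Hypothetical helper: starts `workers` threads that each report `steps` times
// and returns how many callback invocations were observed.
int countReports(int workers, int steps)
{
    int calls = 0; // plain int: only touched inside the reporter's lock, read after join
    ProgressReporter reporter([&calls](const std::string&, int, int) { ++calls; });

    std::vector<std::thread> threads;
    for (int w = 0; w < workers; ++w)
    {
        threads.emplace_back(
            [&reporter, steps]
            {
                for (int i = 1; i <= steps; ++i)
                {
                    reporter.report("working", i, steps);
                }
            });
    }
    for (auto& t : threads)
    {
        t.join(); // joining makes every increment visible to this thread
    }
    return calls;
}
```

Because the callback only ever runs while the lock is held, the counter needs no atomics of its own. A callback that blocks for a long time would stall every reporting thread, so callbacks used this way should stay short.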
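
The header describes cooperative cancellation as polling a host callback and latching the result, with `cancelled()` exposing the latched state. A sketch of one way such a latch could behave is shown below. `CancellationLatch` and `runUntilCancelled` are hypothetical names for this illustration, and the real engine may poll at different points or apply additional logic.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for the engine's cancellation state.
class CancellationLatch
{
public:
    explicit CancellationLatch(std::function<bool()> cb) : _cancel_callback(std::move(cb)) {}

    // Polls the host callback until it returns true once, then stays true.
    [[nodiscard]] bool isCancellationRequested()
    {
        if (!_cancelled && _cancel_callback && _cancel_callback())
        {
            _cancelled = true; // latch: the callback is not consulted again
        }
        return _cancelled;
    }

    [[nodiscard]] bool cancelled() const noexcept { return _cancelled; }

private:
    std::function<bool()> _cancel_callback; // may be empty, meaning "never cancel"
    bool _cancelled = false;
};

// Hypothetical loop: the host asks to cancel once it has been polled more than
// `cancel_after` times. Returns the number of steps completed before stopping.
int runUntilCancelled(int max_steps, int cancel_after)
{
    int polls = 0;
    CancellationLatch latch([&polls, cancel_after] { return ++polls > cancel_after; });

    int processed = 0;
    for (int i = 0; i < max_steps; ++i)
    {
        if (latch.isCancellationRequested())
        {
            break; // stop at a step boundary instead of interrupting work mid-step
        }
        ++processed;
    }
    return processed;
}
```

Latching means a host callback that briefly reports true and then false again still stops the loop, and later stages can consult `cancelled()` without calling back into the host.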