FERS 0.1.0
The Flexible Extensible Radar Simulator
core::SimulationEngine Class Reference

Encapsulates the state and logic of the event-driven simulation loop.

#include "sim_threading.h"

Public Member Functions

 SimulationEngine (World *world, pool::ThreadPool &pool, std::shared_ptr< ProgressReporter > reporter, std::string output_dir, std::shared_ptr< OutputMetadataCollector > metadata_collector=nullptr, ReceiverOutputSink *output_sink=nullptr, std::function< bool()> cancel_callback=nullptr, bool eager_context_stream_open=false)
 Constructs the simulation engine.
 
void run ()
 Starts and runs the main simulation loop until completion.
 
bool cancelled () const noexcept
 Returns true after cooperative cancellation has been requested.
 
void processStreamingPhysics (RealType t_event)
 Advances the time-stepped inner loop for active streaming systems.
 
void processEvent (const Event &event)
 Dispatches a discrete simulation event to its specific handler.
 
void handleTxPulsedStart (radar::Transmitter *tx, RealType t_event)
 Handles the start of a pulsed transmission.
 
void handleRxPulsedWindowStart (radar::Receiver *rx, RealType t_event)
 Handles the opening of a pulsed receiver's listening window.
 
void handleRxPulsedWindowEnd (radar::Receiver *rx, RealType t_event)
 Handles the closing of a pulsed receiver's listening window, triggering finalization.
 
void handleTxStreamingStart (const ActiveStreamingSource &source)
 Handles a streaming transmitter turning on.
 
void handleTxStreamingEnd (radar::Transmitter *tx)
 Handles a streaming transmitter turning off.
 
void handleRxStreamingStart (radar::Receiver *rx)
 Handles a streaming receiver starting to record.
 
void handleRxStreamingEnd (radar::Receiver *rx)
 Handles a streaming receiver stopping recording.
 

Detailed Description

Encapsulates the state and logic of the event-driven simulation loop.

Splitting the simulation loop into the member functions of this class yields small, focused functions that are easy to test and have low cyclomatic complexity.

Definition at line 109 of file sim_threading.h.

Constructor & Destructor Documentation

◆ SimulationEngine()

core::SimulationEngine::SimulationEngine ( World * world,
pool::ThreadPool & pool,
std::shared_ptr< ProgressReporter > reporter,
std::string output_dir,
std::shared_ptr< OutputMetadataCollector > metadata_collector = nullptr,
ReceiverOutputSink * output_sink = nullptr,
std::function< bool()> cancel_callback = nullptr,
bool eager_context_stream_open = false
)

Constructs the simulation engine.

Parameters
world    Pointer to the simulation world containing all entities.
pool    Reference to the thread pool for asynchronous tasks.
reporter    Shared pointer to the thread-safe progress reporter.
output_dir    Output directory for the simulation files.

Definition at line 754 of file sim_threading.cpp.

758 :
759 _world(world), _pool(pool), _reporter(std::move(reporter)), _metadata_collector(std::move(metadata_collector)),
760 _output_sink(output_sink), _cancel_callback(std::move(cancel_callback)),
761 _eager_context_stream_open(eager_context_stream_open), _last_report_time(std::chrono::steady_clock::now()),
762 _next_context_heartbeat_time(params::startTime() + 1.0), _output_dir(std::move(output_dir)),
763 _internal_stop_time(params::endTime())
764 {
765 _streaming_tracker_caches.resize(_world->getReceivers().size());
766 _if_pulse_tracker_caches.resize(_world->getReceivers().size());
767 _fmcw_if_block_buffers.resize(_world->getReceivers().size());
768 _fmcw_if_block_start_times.resize(_world->getReceivers().size(), params::startTime());
769 _streaming_output_block_buffers.resize(_world->getReceivers().size());
770 _streaming_output_processed_buffers.resize(_world->getReceivers().size());
771 _streaming_output_block_start_times.resize(_world->getReceivers().size(), params::startTime());
772 _streaming_output_block_start_indices.resize(_world->getReceivers().size(), 0);
773 _streaming_downsamplers.resize(_world->getReceivers().size());
774 _streaming_downsample_base_indices.resize(_world->getReceivers().size(), 0);
775 _streaming_downsample_segment_start_times.resize(_world->getReceivers().size(), params::startTime());
776 _streaming_output_sample_cursors.resize(_world->getReceivers().size(), 0);
777 _streaming_output_stream_ids.resize(_world->getReceivers().size(), 0);
778 _streaming_output_stream_open.resize(_world->getReceivers().size(), false);
779 _streaming_output_file_metadata.resize(_world->getReceivers().size());
780 for (auto& block : _streaming_output_block_buffers)
781 {
783 }
784 for (auto& block : _streaming_output_processed_buffers)
785 {
787 }
788 }
const std::vector< std::unique_ptr< radar::Receiver > > & getReceivers() const noexcept
Retrieves the list of radar receivers.
Definition world.h:236
RealType endTime() noexcept
Get the end time for the simulation.
Definition parameters.h:109
RealType startTime() noexcept
Get the start time for the simulation.
Definition parameters.h:103
math::Vec3 max

References core::World::getReceivers(), max, and params::startTime().


Member Function Documentation

◆ cancelled()

bool core::SimulationEngine::cancelled ( ) const noexcept

Returns true after cooperative cancellation has been requested.

Definition at line 130 of file sim_threading.h.

130 { return _cancelled; }
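
The constructor accepts a cancel_callback and cancelled() reports whether cancellation has been requested. The following is a minimal sketch of how cooperative cancellation of this kind is commonly structured; it is not the FERS implementation. The names CancellableLoop, runSteps() and poll() are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical illustration of cooperative cancellation: the loop polls a
// user-supplied callback between units of work and latches a positive answer.
class CancellableLoop
{
public:
    explicit CancellableLoop(std::function<bool()> cancel_callback = nullptr) :
        _cancel_callback(std::move(cancel_callback))
    {
    }

    // Returns true once the callback has reported cancellation at least once.
    bool cancelled() const noexcept { return _cancelled; }

    // Runs up to max_steps units of work and returns the number completed.
    int runSteps(int max_steps)
    {
        int done = 0;
        while (done < max_steps && !poll())
        {
            ++done; // stand-in for one unit of simulation work
        }
        return done;
    }

private:
    // Polls the callback (if any) and remembers a request to cancel.
    bool poll()
    {
        if (!_cancelled && _cancel_callback && _cancel_callback())
        {
            _cancelled = true;
        }
        return _cancelled;
    }

    std::function<bool()> _cancel_callback;
    bool _cancelled = false;
};
```

Latching the flag means the callback is not consulted again after it first returns true, so cancelled() stays stable for callers that inspect it after the loop has exited.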

◆ handleRxPulsedWindowEnd()

void core::SimulationEngine::handleRxPulsedWindowEnd ( radar::Receiver * rx,
RealType t_event
)

Handles the closing of a pulsed receiver's listening window, triggering finalization.

Parameters
rx    Pointer to the receiving radar object.
t_event    The timestamp of the window closing event.

Definition at line 1796 of file sim_threading.cpp.

1797 {
1798 rx->setActive(false);
1799 const auto active_streaming_sources =
1800 collectStreamingSourcesForWindow(t_event - rx->getWindowLength(), t_event);
1801
1802 RenderingJob job{.ideal_start_time = t_event - rx->getWindowLength(),
1803 .duration = rx->getWindowLength(),
1804 .responses = rx->drainInbox(),
1805 .active_streaming_sources = active_streaming_sources};
1806
1807 rx->enqueueFinalizerJob(std::move(job));
1808
1809 const RealType next_theoretical = t_event - rx->getWindowLength() + 1.0 / rx->getWindowPrf();
1810 if (const auto next_start = rx->getNextWindowTime(next_theoretical);
1812 {
1814 }
1815 }
std::priority_queue< Event, std::vector< Event >, EventComparator > & getEventQueue() noexcept
Gets a mutable reference to the global event queue.
Definition world.h:313
double RealType
Type for real numbers.
Definition config.h:27
@ RX_PULSED_WINDOW_START
A pulsed receiver opens its listening window.

References params::endTime(), core::World::getEventQueue(), max, and core::RX_PULSED_WINDOW_START.

Referenced by processEvent().
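
The listing derives the start of the closing window as t_event - getWindowLength() and the next theoretical start by adding one repetition interval, 1.0 / getWindowPrf(). The arithmetic can be sketched in isolation as follows; windowStart() and nextTheoreticalStart() are hypothetical helpers, not members of the engine.

```cpp
#include <cassert>
#include <cmath>

using RealType = double; // matches the RealType alias documented in config.h

// Hypothetical helper: start time of the window that closes at t_end.
RealType windowStart(RealType t_end, RealType window_length)
{
    return t_end - window_length;
}

// Hypothetical helper: theoretical start of the following window, one
// repetition interval (1 / PRF) after the current window's start.
RealType nextTheoreticalStart(RealType t_end, RealType window_length, RealType prf)
{
    assert(prf > 0.0);
    return windowStart(t_end, window_length) + 1.0 / prf;
}
```

For a 0.25 s window that closes at 1.25 s with a window PRF of 2 Hz, the window started at 1.0 s and the next theoretical start is 1.5 s.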


◆ handleRxPulsedWindowStart()

void core::SimulationEngine::handleRxPulsedWindowStart ( radar::Receiver * rx,
RealType t_event
)

Handles the opening of a pulsed receiver's listening window.

Parameters
rx    Pointer to the receiving radar object.
t_event    The timestamp of the window opening event.

Definition at line 1790 of file sim_threading.cpp.

1791 {
1792 rx->setActive(true);
1793 _world->getEventQueue().push({t_event + rx->getWindowLength(), EventType::RX_PULSED_WINDOW_END, rx});
1794 }
@ RX_PULSED_WINDOW_END
A pulsed receiver closes its listening window.

References core::World::getEventQueue(), max, and core::RX_PULSED_WINDOW_END.

Referenced by processEvent().
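
Opening a window pushes the matching RX_PULSED_WINDOW_END event onto the global queue, which world.h documents as a std::priority_queue with an EventComparator. A minimal sketch of that scheduling step follows. The Event layout and the comparator body shown here are assumptions made for illustration, and scheduleWindowEnd() is a hypothetical free function.

```cpp
#include <cassert>
#include <queue>
#include <vector>

enum class EventType { RX_PULSED_WINDOW_START, RX_PULSED_WINDOW_END };

// Simplified stand-in for the engine's event record (assumed layout).
struct Event
{
    double timestamp;
    EventType type;
};

// Assumed ordering: the event with the smallest timestamp is served first.
// std::priority_queue is a max-heap, so the comparison is inverted.
struct EventComparator
{
    bool operator()(const Event& a, const Event& b) const noexcept { return a.timestamp > b.timestamp; }
};

using EventQueue = std::priority_queue<Event, std::vector<Event>, EventComparator>;

// Mirrors the handler's scheduling step: opening a window at t_event
// queues the matching close event one window length later.
void scheduleWindowEnd(EventQueue& queue, double t_event, double window_length)
{
    queue.push({t_event + window_length, EventType::RX_PULSED_WINDOW_END});
}
```

Because the queue orders by timestamp, a close event scheduled for 1.5 s is served before an unrelated event already queued for 2.0 s.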


◆ handleRxStreamingEnd()

void core::SimulationEngine::handleRxStreamingEnd ( radar::Receiver * rx )

Handles a streaming receiver stopping recording.

Parameters
rx    Pointer to the receiving radar object.

Definition at line 1852 of file sim_threading.cpp.

1853 {
1854 const auto receiver_it = std::ranges::find_if(_world->getReceivers(), [rx](const auto& receiver_ptr)
1855 { return receiver_ptr.get() == rx; });
1856 if (receiver_it != _world->getReceivers().end())
1857 {
1858 const auto receiver_index = static_cast<std::size_t>(receiver_it - _world->getReceivers().begin());
1859 flushFmcwIfBlock(receiver_index);
1860 flushStreamingOutputBlock(receiver_index, true);
1861 }
1862 if (rx->hasFmcwIfResamplingSink() && _world->getSimulationState().t_current >= params::endTime() &&
1863 _world->getSimulationState().t_current < _internal_stop_time && activePastUserEnd(rx))
1864 {
1865 return;
1866 }
1867 if (rx->hasFmcwIfResamplingSink())
1868 {
1869 rx->endFmcwIfResamplingSegment();
1870 }
1871 if (_output_sink != nullptr && receiver_it != _world->getReceivers().end())
1872 {
1873 const auto receiver_index = static_cast<std::size_t>(receiver_it - _world->getReceivers().begin());
1874 if (_streaming_output_stream_open[receiver_index])
1875 {
1876 _output_sink->closeStream(_streaming_output_stream_ids[receiver_index]);
1877 _streaming_output_stream_open[receiver_index] = false;
1878 }
1879 }
1880 rx->setActive(false);
1881 }
virtual void closeStream(std::uint32_t stream_id)=0
SimulationState & getSimulationState() noexcept
Gets a mutable reference to the global simulation state.
Definition world.h:322
RealType t_current
The master simulation clock, advanced by the event loop.

References core::ReceiverOutputSink::closeStream(), params::endTime(), core::World::getReceivers(), core::World::getSimulationState(), max, and core::SimulationState::t_current.

Referenced by processEvent().
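
The handler maps the raw rx pointer back to its position in the receiver list so that it can address the per-receiver buffers by index. The lookup can be sketched as below, assuming a simplified Receiver type. The sketch uses std::find_if rather than the std::ranges::find_if of the listing so that it does not require C++20; receiverIndex() is a hypothetical helper.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct Receiver
{
    int id; // placeholder payload for the sketch
};

// Returns the position of rx in receivers, or receivers.size() when rx is
// not owned by the container (the equivalent of an end-iterator result).
std::size_t receiverIndex(const std::vector<std::unique_ptr<Receiver>>& receivers, const Receiver* rx)
{
    const auto it = std::find_if(receivers.begin(), receivers.end(),
                                 [rx](const std::unique_ptr<Receiver>& ptr) { return ptr.get() == rx; });
    return static_cast<std::size_t>(it - receivers.begin());
}
```

The search is linear in the number of receivers, which is usually acceptable for a lookup that runs once per start or end event rather than once per sample.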


◆ handleRxStreamingStart()

void core::SimulationEngine::handleRxStreamingStart ( radar::Receiver * rx )

Handles a streaming receiver starting to record.

Parameters
rx    Pointer to the receiving radar object.

Definition at line 1831 of file sim_threading.cpp.

1832 {
1833 rx->setActive(true);
1834 const auto receiver_it = std::ranges::find_if(_world->getReceivers(), [rx](const auto& receiver_ptr)
1835 { return receiver_ptr.get() == rx; });
1836 if (receiver_it != _world->getReceivers().end())
1837 {
1838 const auto receiver_index = static_cast<std::size_t>(receiver_it - _world->getReceivers().begin());
1839 _streaming_downsamplers[receiver_index].reset();
1840 if (_eager_context_stream_open)
1841 {
1842 ensureStreamingOutputStreamOpen(receiver_index, _world->getSimulationState().t_current,
1843 streamingOutputSampleRate(receiver_index));
1844 }
1845 }
1846 if (rx->hasFmcwIfResamplingSink())
1847 {
1848 rx->beginFmcwIfResamplingSegment(_world->getSimulationState().t_current);
1849 }
1850 }

References core::World::getReceivers(), core::World::getSimulationState(), max, and core::SimulationState::t_current.

Referenced by processEvent().
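
When _eager_context_stream_open is set, the handler calls ensureStreamingOutputStreamOpen() as soon as the receiver starts. The name suggests an idempotent open: the first call opens the stream and later calls leave it untouched. The following sketch shows that pattern under this assumption. StreamTable and its members are hypothetical and do not reflect the engine's actual bookkeeping.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-receiver stream bookkeeping, indexed by receiver position.
class StreamTable
{
public:
    explicit StreamTable(std::size_t receiver_count) : _open(receiver_count, false), _ids(receiver_count, 0) {}

    // Opens the stream for a receiver on first use; later calls are no-ops
    // that return the id assigned earlier.
    std::uint32_t ensureOpen(std::size_t index)
    {
        if (!_open.at(index))
        {
            _ids.at(index) = _next_id++;
            _open.at(index) = true;
        }
        return _ids.at(index);
    }

    // Marks the stream closed; a later ensureOpen() assigns a fresh id.
    void close(std::size_t index) { _open.at(index) = false; }

    bool isOpen(std::size_t index) const { return _open.at(index); }

private:
    std::vector<bool> _open;
    std::vector<std::uint32_t> _ids;
    std::uint32_t _next_id = 1;
};
```

With this shape, eager opening at receiver start and lazy opening at first write can share the same call, because repeated calls do not open a second stream.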


◆ handleTxPulsedStart()

void core::SimulationEngine::handleTxPulsedStart ( radar::Transmitter * tx,
RealType t_event
)

Handles the start of a pulsed transmission.

Parameters
tx    Pointer to the transmitting radar object.
t_event    The timestamp of the transmission event.

Definition at line 1766 of file sim_threading.cpp.

1767 {
1768 for (const auto& rx_ptr : _world->getReceivers())
1769 {
1770 if (!rx_ptr->checkFlag(Receiver::RecvFlag::FLAG_NODIRECT))
1771 {
1772 routeResponse(rx_ptr.get(), simulation::calculateResponse(tx, rx_ptr.get(), tx->getSignal(), t_event));
1773 }
1774 for (const auto& target_ptr : _world->getTargets())
1775 {
1776 routeResponse(
1777 rx_ptr.get(),
1778 simulation::calculateResponse(tx, rx_ptr.get(), tx->getSignal(), t_event, target_ptr.get()));
1779 }
1780 }
1781
1782 const RealType next_theoretical_time = t_event + 1.0 / tx->getPrf();
1783 if (const auto next_pulse_opt = tx->getNextPulseTime(next_theoretical_time);
1785 {
1787 }
1788 }
@ TX_PULSED_START
A pulsed transmitter begins emitting a pulse.
std::unique_ptr< serial::Response > calculateResponse(const Transmitter *trans, const Receiver *recv, const RadarSignal *signal, const RealType startTime, const Target *targ)
Creates a Response object by simulating a signal's interaction over its duration.

References simulation::calculateResponse(), params::endTime(), core::World::getEventQueue(), core::World::getReceivers(), core::World::getTargets(), max, and core::TX_PULSED_START.

Referenced by processEvent().
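
The nested loops in the listing route one direct-path response to every receiver that does not set FLAG_NODIRECT, plus one response per receiver and target pair, and then compute the next theoretical pulse time as t_event + 1.0 / getPrf(). The sketch below reproduces only that counting and timing logic, assuming every call to calculateResponse() yields a response. ReceiverInfo, responsesPerPulse() and nextPulseTime() are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical receiver description: only the direct-path flag matters here.
struct ReceiverInfo
{
    bool no_direct; // stands in for the FLAG_NODIRECT check
};

// Counts the responses generated by a single pulse under the loop structure
// of the listing: one direct-path response per receiver that allows it, plus
// one reflected response per (receiver, target) pair.
std::size_t responsesPerPulse(const std::vector<ReceiverInfo>& receivers, std::size_t target_count)
{
    std::size_t count = 0;
    for (const auto& rx : receivers)
    {
        if (!rx.no_direct)
        {
            ++count;
        }
        count += target_count;
    }
    return count;
}

// Theoretical time of the next pulse: one pulse repetition interval later.
double nextPulseTime(double t_event, double prf)
{
    assert(prf > 0.0);
    return t_event + 1.0 / prf;
}
```

Two receivers, one of which suppresses the direct path, and three targets give 1 + 3 + 3 = 7 responses per pulse, so the work per pulse grows with the product of receiver and target counts.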


◆ handleTxStreamingEnd()

void core::SimulationEngine::handleTxStreamingEnd ( radar::Transmitter * tx )

Handles a streaming transmitter turning off.

Parameters
tx    Pointer to the transmitting radar object.

Definition at line 1823 of file sim_threading.cpp.

1824 {
1825 (void)tx;
1826 // A transmitter stop is a transmit-time boundary, not an instantaneous receive-time cutoff.
1827 // Ended sources are removed only after all future receive-time samples fail the retarded-time gate.
1828 cleanupInactiveStreamingSources(_world->getSimulationState().t_current);
1829 }

References core::World::getSimulationState(), max, and core::SimulationState::t_current.

Referenced by processEvent().
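
The comments in the listing explain that a transmitter stop bounds transmit time, so signal emitted just before the stop still arrives at receivers after a propagation delay. One way to express such a retarded-time gate is sketched below. The half-open interval, the single fixed delay per path and all names (TransmitInterval, passesRetardedTimeGate(), canRemoveSource()) are assumptions for illustration, not the engine's code.

```cpp
#include <cassert>

// Hypothetical transmit interval of a streaming source, in seconds.
struct TransmitInterval
{
    double t_start;
    double t_end;
};

// Retarded-time gate: a sample received at t_rx was emitted at
// t_rx - delay, so it carries signal only if that emission time falls
// inside the transmit interval. Half-open bounds are an assumption.
bool passesRetardedTimeGate(const TransmitInterval& tx, double t_rx, double delay)
{
    const double t_emit = t_rx - delay;
    return t_emit >= tx.t_start && t_emit < tx.t_end;
}

// An ended source can be dropped once even the longest propagation path
// can no longer deliver signal emitted before t_end.
bool canRemoveSource(const TransmitInterval& tx, double t_rx, double max_delay)
{
    return t_rx - max_delay >= tx.t_end;
}
```

With a transmit interval of 0 s to 1 s and a 0.5 s delay, a sample received at 1.25 s still passes the gate, and the source becomes removable only from a receive time of 1.5 s onward.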


◆ handleTxStreamingStart()

void core::SimulationEngine::handleTxStreamingStart ( const ActiveStreamingSource & source )

Handles a streaming transmitter turning on.

Parameters
source    The active streaming source describing the transmitter that is turning on.

Definition at line 1817 of file sim_threading.cpp.

1818 {
1819 _world->getSimulationState().active_streaming_transmitters.push_back(source);
1820 appendStreamingTrackerSource();
1821 }
std::vector< ActiveStreamingSource > active_streaming_transmitters
A global list of all currently active streaming transmitters.

References core::SimulationState::active_streaming_transmitters, and core::World::getSimulationState().

Referenced by processEvent().


◆ processEvent()

void core::SimulationEngine::processEvent ( const Event & event )

Dispatches a discrete simulation event to its specific handler.

Parameters
event    The event to process.

Definition at line 1715 of file sim_threading.cpp.

1716 {
1717 // NOLINTBEGIN(cppcoreguidelines-pro-type-static-cast-downcast)
1718 switch (event.type)
1719 {
1721 handleTxPulsedStart(static_cast<Transmitter*>(event.source_object), event.timestamp);
1722 break;
1724 handleRxPulsedWindowStart(static_cast<Receiver*>(event.source_object), event.timestamp);
1725 break;
1727 handleRxPulsedWindowEnd(static_cast<Receiver*>(event.source_object), event.timestamp);
1728 break;
1730 if (const auto source = streamingSourceAtEvent(static_cast<Transmitter*>(event.source_object),
1731 event.timestamp, _internal_stop_time);
1732 source.has_value())
1733 {
1734 handleTxStreamingStart(*source);
1735 }
1736 break;
1738 handleTxStreamingEnd(static_cast<Transmitter*>(event.source_object));
1739 break;
1741 handleRxStreamingStart(static_cast<Receiver*>(event.source_object));
1742 break;
1744 handleRxStreamingEnd(static_cast<Receiver*>(event.source_object));
1745 break;
1746 }
1747 // NOLINTEND(cppcoreguidelines-pro-type-static-cast-downcast)
1748 }
Manages radar signal reception and response processing.
Definition receiver.h:47
Represents a radar transmitter system.
Definition transmitter.h:34
@ TX_STREAMING_END
A streaming transmitter stops transmitting.
@ RX_STREAMING_END
A streaming receiver stops listening.
@ TX_STREAMING_START
A streaming transmitter starts transmitting.
@ RX_STREAMING_START
A streaming receiver starts listening.

References handleRxPulsedWindowEnd(), handleRxPulsedWindowStart(), handleRxStreamingEnd(), handleRxStreamingStart(), handleTxPulsedStart(), handleTxStreamingEnd(), handleTxStreamingStart(), max, core::RX_PULSED_WINDOW_END, core::RX_PULSED_WINDOW_START, core::RX_STREAMING_END, core::RX_STREAMING_START, core::Event::source_object, core::Event::timestamp, core::TX_PULSED_START, core::TX_STREAMING_END, core::TX_STREAMING_START, and core::Event::type.

Referenced by run().
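
The case labels are missing from the listing above, but the order of the handler calls and the enumerators in the reference list suggest a one-to-one mapping from event type to handler. The sketch below spells out that presumed mapping as a plain switch. The enumerator order and the handlerNameFor() function are assumptions; only the enumerator and handler names come from this page.

```cpp
#include <cassert>
#include <string>

// Enumerator names as listed on this page; their order here is assumed.
enum class EventType
{
    TX_PULSED_START,
    RX_PULSED_WINDOW_START,
    RX_PULSED_WINDOW_END,
    TX_STREAMING_START,
    TX_STREAMING_END,
    RX_STREAMING_START,
    RX_STREAMING_END
};

// Returns the name of the handler presumed to serve each event type.
std::string handlerNameFor(EventType type)
{
    switch (type)
    {
    case EventType::TX_PULSED_START:
        return "handleTxPulsedStart";
    case EventType::RX_PULSED_WINDOW_START:
        return "handleRxPulsedWindowStart";
    case EventType::RX_PULSED_WINDOW_END:
        return "handleRxPulsedWindowEnd";
    case EventType::TX_STREAMING_START:
        return "handleTxStreamingStart";
    case EventType::TX_STREAMING_END:
        return "handleTxStreamingEnd";
    case EventType::RX_STREAMING_START:
        return "handleRxStreamingStart";
    case EventType::RX_STREAMING_END:
        return "handleRxStreamingEnd";
    }
    return "unknown"; // not reached for valid enumerators
}
```

A switch without a default branch lets compilers warn when a new event type is added but not handled, which may be one reason for dispatching this way.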


◆ processStreamingPhysics()

void core::SimulationEngine::processStreamingPhysics ( RealType t_event )

Advances the time-stepped inner loop for active streaming systems.

Parameters
t_event    The timestamp of the next discrete event to process up to.

Definition at line 998 of file sim_threading.cpp.

999 {
1000 auto& state = _world->getSimulationState();
1001 auto& t_current = state.t_current;
1002
1003 if (t_event <= t_current)
1004 {
1005 return;
1006 }
1007
1009 const auto first_index = streamingSampleIndexAtOrAfter(t_current, dt_sim);
1011 const auto sample_count = final_index - first_index;
1012 const auto progress_report_stride = std::max<std::size_t>(1, sample_count / 1000);
1013
1014 ensureCwPhaseNoiseLookup();
1015
1016 while (t_current < t_event && !isCancellationRequested())
1017 {
1018 cleanupInactiveStreamingSources(t_current);
1019
1020 const RealType chunk_end = streamingChunkEnd(t_current, t_event);
1021 if (chunk_end <= t_current)
1022 {
1023 break;
1024 }
1025
1026 const auto start_index = streamingSampleIndexAtOrAfter(t_current, dt_sim);
1029 {
1030 if (shouldStopStreamingChunk(sample_index, start_index))
1031 {
1032 break;
1033 }
1035 }
1036
1037 t_current = chunk_end;
1038 emitContextHeartbeatsThrough(t_current);
1039 }
1040 cleanupInactiveStreamingSources(t_current);
1041 }
RealType rate() noexcept
Get the rendering sample rate.
Definition parameters.h:121
unsigned oversampleRatio() noexcept
Get the oversampling ratio.
Definition parameters.h:151

References core::World::getSimulationState(), max, params::oversampleRatio(), and params::rate().

Referenced by run().
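
The listing converts times to sample indices with streamingSampleIndexAtOrAfter() and throttles progress reports with a stride of max(1, sample_count / 1000). The line that defines dt_sim is not shown; the sketch below assumes it is the reciprocal of params::rate() multiplied by params::oversampleRatio(), since the function references both. All function names in the sketch are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>

// Assumed definition: simulation time step from the rendering sample rate
// and the oversampling ratio (the defining line is not shown in the listing).
double simulationTimeStep(double rate, unsigned oversample_ratio)
{
    return 1.0 / (rate * static_cast<double>(oversample_ratio));
}

// Hypothetical stand-in for streamingSampleIndexAtOrAfter(): index of the
// first sample whose time (index * dt) is not earlier than t, for t >= 0.
std::size_t sampleIndexAtOrAfter(double t, double dt)
{
    return static_cast<std::size_t>(std::ceil(t / dt));
}

// Progress stride as in the listing: roughly 1000 reports per call, and
// never a stride of zero.
std::size_t progressStride(std::size_t sample_count)
{
    return std::max<std::size_t>(1, sample_count / 1000);
}
```

A plain ceil() is sensitive to floating-point rounding when t lies very close to a sample boundary, so a production version would likely need a tolerance; the sketch omits that for brevity.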


◆ run()

void core::SimulationEngine::run ( )

Starts and runs the main simulation loop until completion.

Definition at line 790 of file sim_threading.cpp.

791 {
792 if (_reporter)
793 {
794 _reporter->report("Initializing event-driven simulation...", 0, 100);
795 }
796
797 initializeFmcwIfResamplers();
798
800
801 initializeFinalizers();
802
803 LOG(Level::INFO, "Starting unified event-driven simulation loop.");
804 logStreamingSummaries();
805
806 auto& event_queue = _world->getEventQueue();
807 auto& state = _world->getSimulationState();
808 const RealType end_time = _internal_stop_time;
809
810 while (!event_queue.empty() && state.t_current <= end_time)
811 {
812 if (isCancellationRequested())
813 {
814 break;
815 }
816 const Event event = event_queue.top();
817 event_queue.pop();
818
819 processStreamingPhysics(event.timestamp);
820 if (isCancellationRequested())
821 {
822 break;
823 }
824 flushFmcwIfBlocks();
825 flushStreamingOutputBlocks();
826
827 state.t_current = event.timestamp;
828
829 processEvent(event);
830 updateProgress();
831 }
832
833 const bool has_active_if_overrender =
834 std::ranges::any_of(_world->getReceivers(), [](const auto& receiver)
835 { return receiver->isActive() && receiver->hasFmcwIfResamplingSink(); });
836 if (!isCancellationRequested() && has_active_if_overrender)
837 {
838 processStreamingPhysics(end_time);
839 }
840 flushFmcwIfBlocks();
841 flushStreamingOutputBlocks();
842
843 shutdown();
844 }
const Receiver & receiver
#define LOG(level,...)
Definition logging.h:19
void logSimulationMemoryProjection(const World &world)
Logs the projected simulation memory footprint for the provided world.

References core::World::getEventQueue(), core::World::getReceivers(), core::World::getSimulationState(), LOG, core::logSimulationMemoryProjection(), max, processEvent(), processStreamingPhysics(), and receiver.
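
The loop in run() is a hybrid of two schemes: discrete events are popped from a priority queue in time order, and before each one is handled the time-stepped streaming physics is advanced up to the event's timestamp. A reduced sketch of that control flow follows. Event, Later, LoopResult and runLoop() are hypothetical, and the inner loop merely counts steps where the engine would render samples.

```cpp
#include <cassert>
#include <queue>
#include <vector>

struct Event
{
    double timestamp;
    int id;
};

// Orders the queue so that the earliest event is on top.
struct Later
{
    bool operator()(const Event& a, const Event& b) const noexcept { return a.timestamp > b.timestamp; }
};

using EventQueue = std::priority_queue<Event, std::vector<Event>, Later>;

struct LoopResult
{
    std::vector<int> order; // ids of handled events, in handling order
    double t_final;         // clock value when the loop stopped
    int physics_steps;      // number of fixed time steps taken
};

// Hybrid loop: fixed-step physics between events, then the event itself.
LoopResult runLoop(EventQueue queue, double end_time, double dt)
{
    LoopResult result{{}, 0.0, 0};
    double t_current = 0.0;
    while (!queue.empty() && t_current <= end_time)
    {
        const Event event = queue.top();
        queue.pop();
        // Time-stepped phase: advance continuous physics up to the event.
        while (t_current + dt <= event.timestamp)
        {
            t_current += dt;
            ++result.physics_steps;
        }
        // Discrete phase: move the clock to the event and handle it.
        t_current = event.timestamp;
        result.order.push_back(event.id);
    }
    result.t_final = t_current;
    return result;
}
```

As in the listing, the loop condition tests the clock before the next event is popped, so the comparison against end_time applies to the time of the previously handled event.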


The documentation for this class was generated from the following files: