FERS 0.1.0
The Flexible Extensible Radar Simulator
Loading...
Searching...
No Matches
serial::vita49::PacedSender Class Reference

#include "paced_sender.h"

Public Member Functions

 PacedSender (std::unique_ptr< DatagramSender > sender, std::size_t queue_depth)
 
 ~PacedSender ()
 
 PacedSender (const PacedSender &)=delete
 
PacedSenderoperator= (const PacedSender &)=delete
 
void open (const std::string &host, std::uint16_t port)
 
void start (RealType simulation_epoch_time=0.0)
 
EnqueueResult enqueue (SerializedPacket packet)
 
void flush ()
 
void stop ()
 
std::uint64_t latePacketCount (std::uint32_t stream_id) const
 
std::uint64_t sentPacketCount (std::uint32_t stream_id) const
 
std::uint64_t sendFailureCount (std::uint32_t stream_id) const
 
std::uint64_t droppedDataPacketCount (std::uint32_t stream_id) const
 
std::uint64_t droppedContextPacketCount (std::uint32_t stream_id) const
 
std::uint64_t droppedSampleCount (std::uint32_t stream_id) const
 
std::vector< DroppedDatagramconsumeDroppedDatagrams ()
 

Detailed Description

Definition at line 38 of file paced_sender.h.

Constructor & Destructor Documentation

◆ PacedSender() [1/2]

serial::vita49::PacedSender::PacedSender ( std::unique_ptr< DatagramSender sender,
std::size_t  queue_depth 
)

Definition at line 37 of file paced_sender.cpp.

37 :
38 _sender(std::move(sender)), _queue_depth(queue_depth)
39 {
40 if (!_sender)
41 {
42 throw std::invalid_argument("PacedSender requires a datagram sender");
43 }
44 if (queue_depth == 0)
45 {
46 throw std::invalid_argument("PacedSender queue depth must be positive");
47 }
48 }
math::Vec3 max

◆ ~PacedSender()

serial::vita49::PacedSender::~PacedSender ( )

Definition at line 50 of file paced_sender.cpp.

References stop().

+ Here is the call graph for this function:

◆ PacedSender() [2/2]

serial::vita49::PacedSender::PacedSender ( const PacedSender )
delete

Member Function Documentation

◆ consumeDroppedDatagrams()

std::vector< DroppedDatagram > serial::vita49::PacedSender::consumeDroppedDatagrams ( )

Definition at line 173 of file paced_sender.cpp.

174 {
175 std::scoped_lock const lock(_mutex);
176 auto result = std::move(_pending_dropped_datagrams);
177 _pending_dropped_datagrams.clear();
178 return result;
179 }

References max.

◆ droppedContextPacketCount()

std::uint64_t serial::vita49::PacedSender::droppedContextPacketCount ( std::uint32_t  stream_id) const

Definition at line 159 of file paced_sender.cpp.

160 {
161 std::scoped_lock const lock(_mutex);
162 const auto found = _dropped_context_packets.find(stream_id);
163 return found == _dropped_context_packets.end() ? 0 : found->second;
164 }

References max.

◆ droppedDataPacketCount()

std::uint64_t serial::vita49::PacedSender::droppedDataPacketCount ( std::uint32_t  stream_id) const

Definition at line 152 of file paced_sender.cpp.

153 {
154 std::scoped_lock const lock(_mutex);
155 const auto found = _dropped_data_packets.find(stream_id);
156 return found == _dropped_data_packets.end() ? 0 : found->second;
157 }

References max.

◆ droppedSampleCount()

std::uint64_t serial::vita49::PacedSender::droppedSampleCount ( std::uint32_t  stream_id) const

Definition at line 166 of file paced_sender.cpp.

167 {
168 std::scoped_lock const lock(_mutex);
169 const auto found = _dropped_samples.find(stream_id);
170 return found == _dropped_samples.end() ? 0 : found->second;
171 }

References max.

◆ enqueue()

EnqueueResult serial::vita49::PacedSender::enqueue ( SerializedPacket  packet)

Definition at line 68 of file paced_sender.cpp.

69 {
70 std::unique_lock lock(_mutex);
71 if (!_started)
72 {
73 throw std::logic_error("PacedSender must be started before enqueue");
74 }
75 if (_stopping)
76 {
77 return EnqueueResult{
78 .enqueued = false,
79 .dropped = std::nullopt,
80 };
81 }
82
83 _cv.wait(lock, [this] { return _stopping || queuedOrSendingCount() < _queue_depth; });
84 if (_stopping)
85 {
86 return EnqueueResult{
87 .enqueued = false,
88 .dropped = std::nullopt,
89 };
90 }
91
92 _queue.push_back(std::move(packet));
93 _cv.notify_one();
94 return EnqueueResult{.enqueued = true, .dropped = std::nullopt};
95 }

References serial::vita49::EnqueueResult::enqueued, and max.

◆ flush()

void serial::vita49::PacedSender::flush ( )

Definition at line 97 of file paced_sender.cpp.

98 {
99 std::unique_lock lock(_mutex);
100 _cv.wait(lock, [this] { return _queue.empty() && !_send_in_progress; });
101 }

References max.

◆ latePacketCount()

std::uint64_t serial::vita49::PacedSender::latePacketCount ( std::uint32_t  stream_id) const

Definition at line 131 of file paced_sender.cpp.

132 {
133 std::scoped_lock const lock(_mutex);
134 const auto found = _late_packets.find(stream_id);
135 return found == _late_packets.end() ? 0 : found->second;
136 }

References max.

◆ open()

void serial::vita49::PacedSender::open ( const std::string &  host,
std::uint16_t  port 
)

Definition at line 52 of file paced_sender.cpp.

52{ _sender->open(host, port); }

◆ operator=()

PacedSender & serial::vita49::PacedSender::operator= ( const PacedSender )
delete

◆ sendFailureCount()

std::uint64_t serial::vita49::PacedSender::sendFailureCount ( std::uint32_t  stream_id) const

Definition at line 145 of file paced_sender.cpp.

146 {
147 std::scoped_lock const lock(_mutex);
148 const auto found = _send_failures.find(stream_id);
149 return found == _send_failures.end() ? 0 : found->second;
150 }

References max.

◆ sentPacketCount()

std::uint64_t serial::vita49::PacedSender::sentPacketCount ( std::uint32_t  stream_id) const

Definition at line 138 of file paced_sender.cpp.

139 {
140 std::scoped_lock const lock(_mutex);
141 const auto found = _sent_packets.find(stream_id);
142 return found == _sent_packets.end() ? 0 : found->second;
143 }

References max.

◆ start()

void serial::vita49::PacedSender::start ( RealType  simulation_epoch_time = 0.0)

Definition at line 54 of file paced_sender.cpp.

55 {
56 std::scoped_lock const lock(_mutex);
57 if (_started)
58 {
59 return;
60 }
61 _simulation_epoch_time = simulation_epoch_time;
62 _steady_epoch = std::chrono::steady_clock::now();
63 _stopping = false;
64 _started = true;
65 _thread = std::thread([this] { run(); });
66 }

References max.

◆ stop()

void serial::vita49::PacedSender::stop ( )

Definition at line 103 of file paced_sender.cpp.

104 {
105 {
106 std::scoped_lock const lock(_mutex);
107 if (!_started && !_thread.joinable())
108 {
109 _sender->close();
110 return;
111 }
112 _stopping = true;
113 _cv.notify_all();
114 }
115
116 if (_thread.joinable())
117 {
118 _thread.join();
119 }
120
121 {
122 std::scoped_lock const lock(_mutex);
123 _started = false;
124 _stopping = false;
125 _send_in_progress = false;
126 _cv.notify_all();
127 }
128 _sender->close();
129 }

References max.

Referenced by ~PacedSender().

+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: