94 size_t total_signals_received{0};
95 size_t total_signals_processed{0};
96 size_t signals_dropped{0};
97 size_t batch_processes{0};
98 std::chrono::milliseconds total_processing_time{0};
99 std::chrono::milliseconds max_processing_time{0};
101 double average_processing_time_ms()
const {
102 return total_signals_processed > 0 ?
103 static_cast<double>(total_processing_time.count()) / total_signals_processed : 0.0;
106 double signal_drop_rate()
const {
107 return total_signals_received > 0 ?
108 static_cast<double>(signals_dropped) / total_signals_received : 0.0;
131 std::vector<std::function<void(S&,
typename IntegratorDecorator<S>::time_type)>> signal_handlers_;
132 std::queue<SignalInfo<S>> signal_queue_;
133 std::chrono::steady_clock::time_point last_signal_check_;
134 mutable std::mutex signal_mutex_;
136 std::atomic<bool> processing_active_{
false};
148 , last_signal_check_(std::chrono::steady_clock::now()) {
156 void step(
typename IntegratorDecorator<S>::state_type& state,
typename IntegratorDecorator<S>::time_type dt)
override {
158 if (config_.enable_real_time_processing) {
159 process_signals(state, this->current_time());
162 this->wrapped_integrator_->step(state, dt);
165 if (config_.enable_real_time_processing && config_.mode == SignalProcessingMode::SYNCHRONOUS) {
166 process_signals(state, this->current_time());
173 void integrate(
typename IntegratorDecorator<S>::state_type& state,
typename IntegratorDecorator<S>::time_type dt,
174 typename IntegratorDecorator<S>::time_type end_time)
override {
175 processing_active_.store(
true);
176 auto processing_guard = make_scope_guard([
this] { processing_active_.store(
false); });
178 if (config_.mode == SignalProcessingMode::BATCH) {
180 this->wrapped_integrator_->integrate(state, dt, end_time);
181 process_signal_batch(state, this->current_time());
184 this->wrapped_integrator_->integrate(state, dt, end_time);
195 const std::string& signal_id =
"",
196 SignalPriority priority = SignalPriority::NORMAL) {
197 std::lock_guard<std::mutex> lock(signal_mutex_);
199 if (config_.enable_priority_queue) {
202 signal_info.handler = std::move(handler);
203 signal_info.priority = priority;
204 signal_info.signal_id = signal_id;
205 signal_info.timestamp = std::chrono::steady_clock::now();
207 signal_queue_.push(std::move(signal_info));
210 signal_handlers_.push_back(std::move(handler));
213 stats_.total_signals_received++;
221 std::lock_guard<std::mutex> lock(signal_mutex_);
223 for (
const auto& handler : handlers) {
224 signal_handlers_.push_back(handler);
225 stats_.total_signals_received++;
233 std::lock_guard<std::mutex> lock(signal_mutex_);
234 signal_handlers_.clear();
237 while (!signal_queue_.empty()) {
246 std::lock_guard<std::mutex> lock(signal_mutex_);
247 return signal_handlers_.size() + signal_queue_.size();
256 process_signals(state, time);
277 return processing_active_.load();
293 std::lock_guard<std::mutex> lock(signal_mutex_);
294 config_ = std::move(new_config);
301 void process_signals(S& state,
typename IntegratorDecorator<S>::time_type time) {
302 auto now = std::chrono::steady_clock::now();
305 if (now - last_signal_check_ < config_.signal_check_interval) {
309 std::lock_guard<std::mutex> lock(signal_mutex_);
310 auto start_time = std::chrono::high_resolution_clock::now();
313 for (
auto& handler : signal_handlers_) {
315 handler(state, time);
316 stats_.total_signals_processed++;
321 if (config_.enable_priority_queue) {
322 process_priority_signals(state, time);
325 auto end_time = std::chrono::high_resolution_clock::now();
326 auto processing_time = std::chrono::duration_cast<std::chrono::milliseconds>(
327 end_time - start_time);
329 stats_.total_processing_time += processing_time;
330 if (processing_time > stats_.max_processing_time) {
331 stats_.max_processing_time = processing_time;
334 last_signal_check_ = now;
340 void process_signal_batch(S& state,
typename IntegratorDecorator<S>::time_type time) {
341 if (signal_handlers_.empty() && signal_queue_.empty()) {
345 std::lock_guard<std::mutex> lock(signal_mutex_);
346 auto start_time = std::chrono::high_resolution_clock::now();
348 size_t processed_count = 0;
351 for (
auto& handler : signal_handlers_) {
352 if (processed_count >= config_.max_batch_size) {
357 handler(state, time);
358 stats_.total_signals_processed++;
363 auto end_time = std::chrono::high_resolution_clock::now();
364 auto processing_time = std::chrono::duration_cast<std::chrono::milliseconds>(
365 end_time - start_time);
367 stats_.total_processing_time += processing_time;
368 stats_.batch_processes++;
374 void process_priority_signals(S& state,
typename IntegratorDecorator<S>::time_type time) {
377 while (!signal_queue_.empty()) {
378 auto& signal_info = signal_queue_.front();
380 if (signal_info.handler && !signal_info.processed) {
381 signal_info.handler(state, time);
382 signal_info.processed =
true;
383 stats_.total_signals_processed++;
398 explicit ScopeGuard(F f) : func_(std::move(f)), active_(true) {}
399 ~ScopeGuard() {
if (active_) func_(); }
400 ScopeGuard(ScopeGuard&& other) noexcept
401 : func_(std::move(other.func_)), active_(other.active_) {
402 other.active_ =
false;
404 ScopeGuard(
const ScopeGuard&) =
delete;
405 ScopeGuard& operator=(
const ScopeGuard&) =
delete;
406 ScopeGuard& operator=(ScopeGuard&&) =
delete;
410 auto make_scope_guard(F&& func) {
411 return ScopeGuard<std::decay_t<F>>(std::forward<F>(func));