DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
signal_decorator.hpp
1#pragma once
2
3#include "integrator_decorator.hpp"
4#include <vector>
5#include <functional>
6#include <chrono>
7#include <queue>
8#include <mutex>
9#include <atomic>
10
11namespace diffeq::core::composable {
12
/**
 * @brief Selects when registered signals are handled relative to integration.
 */
enum class SignalProcessingMode {
    SYNCHRONOUS = 0,   ///< Process signals immediately during integration
    ASYNCHRONOUS = 1,  ///< Process signals in background thread
    BATCH = 2          ///< Accumulate signals and process in batches
};
21
/**
 * @brief Priority level attached to a registered signal.
 *
 * Enumerators are listed in ascending order; their underlying values are
 * 0 (LOW) through 3 (CRITICAL).
 */
enum class SignalPriority {
    LOW,       ///< Underlying value 0
    NORMAL,    ///< Underlying value 1
    HIGH,      ///< Underlying value 2
    CRITICAL   ///< Underlying value 3
};
31
35template<system_state S>
36struct SignalInfo {
37 std::function<void(S&, typename S::value_type)> handler;
38 SignalPriority priority{SignalPriority::NORMAL};
39 std::chrono::steady_clock::time_point timestamp;
40 bool processed{false};
41 std::string signal_id;
42};
43
48 SignalProcessingMode mode{SignalProcessingMode::SYNCHRONOUS};
49 bool enable_real_time_processing{true};
50 std::chrono::microseconds signal_check_interval{100};
51 size_t signal_buffer_size{100};
52 size_t max_batch_size{10};
53 bool enable_priority_queue{false};
54
55 // Validation settings
56 bool validate_intervals{true};
57 std::chrono::microseconds min_check_interval{1}; // Minimum 1μs
58 std::chrono::microseconds max_check_interval{std::chrono::seconds{1}}; // Maximum 1s
59
64 void validate() const {
65 if (validate_intervals) {
66 if (signal_check_interval < min_check_interval) {
67 throw std::invalid_argument("signal_check_interval below minimum " +
68 std::to_string(min_check_interval.count()) + "μs");
69 }
70 if (signal_check_interval > max_check_interval) {
71 throw std::invalid_argument("signal_check_interval exceeds maximum " +
72 std::to_string(max_check_interval.count()) + "μs");
73 }
74 }
75
76 if (signal_buffer_size == 0) {
77 throw std::invalid_argument("signal_buffer_size must be positive");
78 }
79
80 if (max_batch_size == 0) {
81 throw std::invalid_argument("max_batch_size must be positive");
82 }
83
84 if (max_batch_size > signal_buffer_size) {
85 throw std::invalid_argument("max_batch_size cannot exceed signal_buffer_size");
86 }
87 }
88};
89
94 size_t total_signals_received{0};
95 size_t total_signals_processed{0};
96 size_t signals_dropped{0};
97 size_t batch_processes{0};
98 std::chrono::milliseconds total_processing_time{0};
99 std::chrono::milliseconds max_processing_time{0};
100
101 double average_processing_time_ms() const {
102 return total_signals_processed > 0 ?
103 static_cast<double>(total_processing_time.count()) / total_signals_processed : 0.0;
104 }
105
106 double signal_drop_rate() const {
107 return total_signals_received > 0 ?
108 static_cast<double>(signals_dropped) / total_signals_received : 0.0;
109 }
110};
111
127template<system_state S>
129private:
130 SignalConfig config_;
131 std::vector<std::function<void(S&, typename IntegratorDecorator<S>::time_type)>> signal_handlers_;
132 std::queue<SignalInfo<S>> signal_queue_;
133 std::chrono::steady_clock::time_point last_signal_check_;
134 mutable std::mutex signal_mutex_;
135 SignalStats stats_;
136 std::atomic<bool> processing_active_{false};
137
138public:
145 explicit SignalDecorator(std::unique_ptr<AbstractIntegrator<S>> integrator,
146 SignalConfig config = {})
147 : IntegratorDecorator<S>(std::move(integrator)), config_(std::move(config))
148 , last_signal_check_(std::chrono::steady_clock::now()) {
149
150 config_.validate();
151 }
152
156 void step(typename IntegratorDecorator<S>::state_type& state, typename IntegratorDecorator<S>::time_type dt) override {
157 // Process signals before step
158 if (config_.enable_real_time_processing) {
159 process_signals(state, this->current_time());
160 }
161
162 this->wrapped_integrator_->step(state, dt);
163
164 // Process signals after step if real-time enabled
165 if (config_.enable_real_time_processing && config_.mode == SignalProcessingMode::SYNCHRONOUS) {
166 process_signals(state, this->current_time());
167 }
168 }
169
173 void integrate(typename IntegratorDecorator<S>::state_type& state, typename IntegratorDecorator<S>::time_type dt,
174 typename IntegratorDecorator<S>::time_type end_time) override {
175 processing_active_.store(true);
176 auto processing_guard = make_scope_guard([this] { processing_active_.store(false); });
177
178 if (config_.mode == SignalProcessingMode::BATCH) {
179 // Process in batch mode
180 this->wrapped_integrator_->integrate(state, dt, end_time);
181 process_signal_batch(state, this->current_time());
182 } else {
183 // Process with real-time signal handling
184 this->wrapped_integrator_->integrate(state, dt, end_time);
185 }
186 }
187
194 void register_signal_handler(std::function<void(S&, typename IntegratorDecorator<S>::time_type)> handler,
195 const std::string& signal_id = "",
196 SignalPriority priority = SignalPriority::NORMAL) {
197 std::lock_guard<std::mutex> lock(signal_mutex_);
198
199 if (config_.enable_priority_queue) {
200 // Add to priority queue
201 SignalInfo<S> signal_info;
202 signal_info.handler = std::move(handler);
203 signal_info.priority = priority;
204 signal_info.signal_id = signal_id;
205 signal_info.timestamp = std::chrono::steady_clock::now();
206
207 signal_queue_.push(std::move(signal_info));
208 } else {
209 // Add to simple vector
210 signal_handlers_.push_back(std::move(handler));
211 }
212
213 stats_.total_signals_received++;
214 }
215
220 void register_signal_handlers(const std::vector<std::function<void(S&, typename IntegratorDecorator<S>::time_type)>>& handlers) {
221 std::lock_guard<std::mutex> lock(signal_mutex_);
222
223 for (const auto& handler : handlers) {
224 signal_handlers_.push_back(handler);
225 stats_.total_signals_received++;
226 }
227 }
228
233 std::lock_guard<std::mutex> lock(signal_mutex_);
234 signal_handlers_.clear();
235
236 // Clear queue as well
237 while (!signal_queue_.empty()) {
238 signal_queue_.pop();
239 }
240 }
241
246 std::lock_guard<std::mutex> lock(signal_mutex_);
247 return signal_handlers_.size() + signal_queue_.size();
248 }
249
255 void process_signals_now(S& state, typename IntegratorDecorator<S>::time_type time) {
256 process_signals(state, time);
257 }
258
263 return stats_;
264 }
265
270 stats_ = SignalStats{};
271 }
272
276 bool is_processing_active() const {
277 return processing_active_.load();
278 }
279
283 SignalConfig& config() { return config_; }
284 const SignalConfig& config() const { return config_; }
285
291 void update_config(SignalConfig new_config) {
292 new_config.validate();
293 std::lock_guard<std::mutex> lock(signal_mutex_);
294 config_ = std::move(new_config);
295 }
296
297private:
301 void process_signals(S& state, typename IntegratorDecorator<S>::time_type time) {
302 auto now = std::chrono::steady_clock::now();
303
304 // Check if it's time for signal processing
305 if (now - last_signal_check_ < config_.signal_check_interval) {
306 return;
307 }
308
309 std::lock_guard<std::mutex> lock(signal_mutex_);
310 auto start_time = std::chrono::high_resolution_clock::now();
311
312 // Process regular signal handlers
313 for (auto& handler : signal_handlers_) {
314 if (handler) {
315 handler(state, time);
316 stats_.total_signals_processed++;
317 }
318 }
319
320 // Process priority queue if enabled
321 if (config_.enable_priority_queue) {
322 process_priority_signals(state, time);
323 }
324
325 auto end_time = std::chrono::high_resolution_clock::now();
326 auto processing_time = std::chrono::duration_cast<std::chrono::milliseconds>(
327 end_time - start_time);
328
329 stats_.total_processing_time += processing_time;
330 if (processing_time > stats_.max_processing_time) {
331 stats_.max_processing_time = processing_time;
332 }
333
334 last_signal_check_ = now;
335 }
336
340 void process_signal_batch(S& state, typename IntegratorDecorator<S>::time_type time) {
341 if (signal_handlers_.empty() && signal_queue_.empty()) {
342 return;
343 }
344
345 std::lock_guard<std::mutex> lock(signal_mutex_);
346 auto start_time = std::chrono::high_resolution_clock::now();
347
348 size_t processed_count = 0;
349
350 // Process up to max_batch_size signals
351 for (auto& handler : signal_handlers_) {
352 if (processed_count >= config_.max_batch_size) {
353 break;
354 }
355
356 if (handler) {
357 handler(state, time);
358 stats_.total_signals_processed++;
359 processed_count++;
360 }
361 }
362
363 auto end_time = std::chrono::high_resolution_clock::now();
364 auto processing_time = std::chrono::duration_cast<std::chrono::milliseconds>(
365 end_time - start_time);
366
367 stats_.total_processing_time += processing_time;
368 stats_.batch_processes++;
369 }
370
374 void process_priority_signals(S& state, typename IntegratorDecorator<S>::time_type time) {
375 // For simplicity, process all signals in queue order
376 // A real implementation might sort by priority first
377 while (!signal_queue_.empty()) {
378 auto& signal_info = signal_queue_.front();
379
380 if (signal_info.handler && !signal_info.processed) {
381 signal_info.handler(state, time);
382 signal_info.processed = true;
383 stats_.total_signals_processed++;
384 }
385
386 signal_queue_.pop();
387 }
388 }
389
/**
 * @brief Runs a callable when the guard goes out of scope.
 *
 * The guard is move-constructible only. Moving from a guard disarms the
 * source, so the callable runs once, from whichever guard ends up armed.
 *
 * @tparam F Callable type invoked with no arguments.
 */
template<typename F>
class ScopeGuard {
public:
    /// Take ownership of the callable; the new guard starts armed.
    explicit ScopeGuard(F f) : func_(std::move(f)) {}

    /// Transfer the callable and the armed state, then disarm the source.
    ScopeGuard(ScopeGuard&& other) noexcept
        : func_(std::move(other.func_)), active_(other.active_) {
        other.active_ = false;
    }

    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
    ScopeGuard& operator=(ScopeGuard&&) = delete;

    /// Invoke the callable unless this guard has been moved from.
    ~ScopeGuard() {
        if (!active_) {
            return;
        }
        func_();
    }

private:
    F func_;
    bool active_ = true;
};
408
409 template<typename F>
410 auto make_scope_guard(F&& func) {
411 return ScopeGuard<std::decay_t<F>>(std::forward<F>(func));
412 }
413};
414
415} // namespace diffeq::core::composable
Base decorator interface for integrator enhancements.
Signal decorator - adds signal processing to any integrator.
const SignalStats & get_statistics() const
Get signal processing statistics.
void process_signals_now(S &state, typename IntegratorDecorator< S >::time_type time)
Force immediate signal processing.
SignalConfig & config()
Access and modify signal configuration.
void reset_statistics()
Reset signal processing statistics.
void register_signal_handler(std::function< void(S &, typename IntegratorDecorator< S >::time_type)> handler, const std::string &signal_id="", SignalPriority priority=SignalPriority::NORMAL)
Register a signal handler function.
void step(typename IntegratorDecorator< S >::state_type &state, typename IntegratorDecorator< S >::time_type dt) override
Override step to add signal processing.
void clear_signal_handlers()
Clear all signal handlers.
void update_config(SignalConfig new_config)
Update signal configuration with validation.
bool is_processing_active() const
Check if signal processing is currently active.
void integrate(typename IntegratorDecorator< S >::state_type &state, typename IntegratorDecorator< S >::time_type dt, typename IntegratorDecorator< S >::time_type end_time) override
Override integrate to handle signal processing during integration.
size_t get_signal_handler_count() const
Get number of registered signal handlers.
void register_signal_handlers(const std::vector< std::function< void(S &, typename IntegratorDecorator< S >::time_type)> > &handlers)
Register multiple signal handlers at once.
SignalDecorator(std::unique_ptr< AbstractIntegrator< S > > integrator, SignalConfig config={})
Construct signal decorator.
Configuration for signal processing.
void validate() const
Validate configuration parameters.
Signal information structure.
Signal processing statistics.