DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
event_decorator.hpp
1#pragma once
2
#include "integrator_decorator.hpp"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
16
17namespace diffeq::core::composable {
18
/// Categories of conditions that can raise an event.
enum class EventTrigger {
    TIME_BASED = 0,          ///< Trigger at specific time intervals
    STATE_BASED = 1,         ///< Trigger based on state conditions
    EXTERNAL_SIGNAL = 2,     ///< Trigger from external source
    SENSOR_DATA = 3,         ///< Trigger when sensor data arrives
    CONTROL_FEEDBACK = 4,    ///< Trigger for control loop feedback
    THRESHOLD_CROSSING = 5,  ///< Trigger when value crosses threshold
    DERIVATIVE_CHANGE = 6,   ///< Trigger based on derivative changes
    CUSTOM = 7               ///< Custom trigger condition
};
32
/// Event priority levels; a larger numeric value means a more urgent event.
/// The values are ordered so that priorities can be compared with < and >=.
enum class EventPriority : int {
    LOW       = 0,
    NORMAL    = 1,
    HIGH      = 2,
    CRITICAL  = 3,
    EMERGENCY = 4
};
43
/// Strategies for when a triggered event gets processed.
enum class EventProcessingMode {
    IMMEDIATE = 0,  ///< Process immediately when triggered
    DEFERRED = 1,   ///< Process at next integration step
    BATCHED = 2,    ///< Process in batches
    ASYNC = 3       ///< Process asynchronously
};
53
58 EventProcessingMode processing_mode{EventProcessingMode::IMMEDIATE};
59 bool enable_priority_queue{true};
60 bool enable_event_history{true};
61 size_t max_event_history{1000};
62
63 // Timing constraints
64 std::chrono::microseconds max_event_processing_time{1000}; // 1ms
65 std::chrono::microseconds event_timeout{10000}; // 10ms
66 bool strict_timing{false};
67
68 // Control loop settings
69 std::chrono::microseconds control_loop_period{1000}; // 1ms
70 bool enable_control_loop{false};
71 double control_tolerance{1e-6};
72
73 // Sensor settings
74 std::chrono::microseconds sensor_timeout{5000}; // 5ms
75 bool enable_sensor_validation{true};
76 double sensor_noise_threshold{1e-3};
77
78 // Threading
79 size_t event_thread_pool_size{2};
80 bool enable_async_processing{true};
81
86 void validate() const {
87 if (max_event_history == 0) {
88 throw std::invalid_argument("max_event_history must be positive");
89 }
90
91 if (max_event_processing_time <= std::chrono::microseconds{0}) {
92 throw std::invalid_argument("max_event_processing_time must be positive");
93 }
94
95 if (event_timeout <= std::chrono::microseconds{0}) {
96 throw std::invalid_argument("event_timeout must be positive");
97 }
98
99 if (control_loop_period <= std::chrono::microseconds{0}) {
100 throw std::invalid_argument("control_loop_period must be positive");
101 }
102
103 if (sensor_timeout <= std::chrono::microseconds{0}) {
104 throw std::invalid_argument("sensor_timeout must be positive");
105 }
106
107 if (control_tolerance <= 0) {
108 throw std::invalid_argument("control_tolerance must be positive");
109 }
110
111 if (sensor_noise_threshold <= 0) {
112 throw std::invalid_argument("sensor_noise_threshold must be positive");
113 }
114
115 if (event_thread_pool_size == 0) {
116 throw std::invalid_argument("event_thread_pool_size must be positive");
117 }
118 }
119};
120
124template<typename S, typename T>
125struct Event {
126 EventTrigger trigger;
127 EventPriority priority;
128 T timestamp;
129 std::string event_id;
130 std::function<void(S&, T)> handler;
131 std::vector<uint8_t> data;
132
133 // Metadata
134 std::chrono::steady_clock::time_point created_at;
135 std::chrono::steady_clock::time_point processed_at;
136 bool processed{false};
137 bool timed_out{false};
138
139 Event(EventTrigger t, EventPriority p, T time, std::string id, std::function<void(S&, T)> h)
140 : trigger(t), priority(p), timestamp(time), event_id(std::move(id)), handler(std::move(h))
141 , created_at(std::chrono::steady_clock::now()) {}
142
143 // Comparison for priority queue (higher priority first)
144 bool operator<(const Event& other) const {
145 if (priority != other.priority) {
146 return priority < other.priority;
147 }
148 return timestamp > other.timestamp; // Earlier timestamp first for same priority
149 }
150};
151
155template<typename T>
157 std::string sensor_id;
158 std::vector<double> values;
159 T timestamp;
160 double confidence{1.0};
161 bool valid{true};
162
163 SensorData(std::string id, std::vector<double> vals, T time)
164 : sensor_id(std::move(id)), values(std::move(vals)), timestamp(time) {}
165};
166
170template<typename S, typename T>
172 std::string control_id;
173 S target_state;
174 S current_state;
175 S error_state;
176 T timestamp;
177 double performance_metric{0.0};
178
179 ControlFeedback(std::string id, S target, S current, T time)
180 : control_id(std::move(id)), target_state(std::move(target)), current_state(std::move(current)), timestamp(time) {
181 // Calculate error state
182 error_state = target_state;
183 for (size_t i = 0; i < error_state.size(); ++i) {
184 error_state[i] = target_state[i] - current_state[i];
185 }
186 }
187};
188
193 size_t total_events{0};
194 size_t processed_events{0};
195 size_t timed_out_events{0};
196 size_t high_priority_events{0};
197 size_t control_feedback_events{0};
198 size_t sensor_events{0};
199 std::chrono::microseconds total_processing_time{0};
200 std::chrono::microseconds max_processing_time{0};
201 std::chrono::microseconds min_processing_time{std::chrono::microseconds::max()};
202
203 double average_processing_time_us() const {
204 return processed_events > 0 ?
205 static_cast<double>(total_processing_time.count()) / processed_events : 0.0;
206 }
207
208 double event_success_rate() const {
209 return total_events > 0 ?
210 static_cast<double>(processed_events) / total_events : 0.0;
211 }
212
213 double timeout_rate() const {
214 return total_events > 0 ?
215 static_cast<double>(timed_out_events) / total_events : 0.0;
216 }
217};
218
236template<system_state S, can_be_time T = double>
238private:
239 EventConfig config_;
240 std::priority_queue<Event<S, T>> event_queue_;
241 std::vector<Event<S, T>> event_history_;
242 std::map<std::string, SensorData<T>> sensor_data_;
243 std::map<std::string, ControlFeedback<S, T>> control_feedback_;
244 EventStats stats_;
245
246 // Threading for async processing
247 std::vector<std::thread> event_threads_;
248 std::atomic<bool> running_{false};
249 std::mutex event_queue_mutex_;
250 std::condition_variable event_queue_cv_;
251 std::mutex sensor_data_mutex_;
252 std::mutex control_feedback_mutex_;
253 std::mutex stats_mutex_;
254
255 // Control loop
256 std::thread control_loop_thread_;
257 std::atomic<bool> control_loop_running_{false};
258 std::chrono::steady_clock::time_point last_control_update_;
259
260 // Event callbacks
261 std::map<EventTrigger, std::vector<std::function<void(S&, T)>>> event_callbacks_;
262
263public:
270 explicit EventDecorator(std::unique_ptr<AbstractIntegrator<S, T>> integrator,
271 EventConfig config = {})
272 : IntegratorDecorator<S, T>(std::move(integrator)), config_(std::move(config))
273 , last_control_update_(std::chrono::steady_clock::now()) {
274
275 config_.validate();
276
277 if (config_.enable_async_processing) {
278 start_event_processing();
279 }
280
281 if (config_.enable_control_loop) {
282 start_control_loop();
283 }
284 }
285
290 stop_control_loop();
291 stop_event_processing();
292 }
293
297 void step(typename IntegratorDecorator<S, T>::state_type& state, T dt) override {
298 // Process pending events before step
299 process_events(state);
300
301 // Perform integration step
302 this->wrapped_integrator_->step(state, dt);
303
304 // Check for state-based events after step
305 check_state_events(state, this->current_time());
306
307 // Process any new events
308 process_events(state);
309 }
310
314 void integrate(typename IntegratorDecorator<S, T>::state_type& state, T dt, T end_time) override {
315 // Process initial events
316 process_events(state);
317
318 // Integrate with event handling
319 this->wrapped_integrator_->integrate(state, dt, end_time);
320
321 // Final event processing
322 process_events(state);
323 }
324
330 void register_event_handler(EventTrigger trigger, std::function<void(S&, T)> handler) {
331 event_callbacks_[trigger].push_back(std::move(handler));
332 }
333
341 void trigger_event(const std::string& event_id, EventPriority priority,
342 std::function<void(S&, T)> handler, std::vector<uint8_t> data = {}) {
343 std::lock_guard<std::mutex> lock(event_queue_mutex_);
344
345 Event<S, T> event(EventTrigger::CUSTOM, priority, this->current_time(), event_id, std::move(handler));
346 event.data = std::move(data);
347
348 event_queue_.push(event);
349 event_queue_cv_.notify_one();
350
351 stats_.total_events++;
352 if (priority >= EventPriority::HIGH) {
353 stats_.high_priority_events++;
354 }
355 }
356
363 void submit_sensor_data(const std::string& sensor_id, const std::vector<double>& values, double confidence = 1.0) {
364 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
365
366 SensorData<T> sensor_data(sensor_id, values, this->current_time());
367 sensor_data.confidence = confidence;
368 sensor_data.valid = validate_sensor_data(sensor_data);
369
370 sensor_data_[sensor_id] = sensor_data;
371
372 // Trigger sensor event
373 trigger_event("sensor_" + sensor_id, EventPriority::NORMAL,
374 [this, sensor_id](S& state, T time) {
375 handle_sensor_event(sensor_id, state, time);
376 });
377
378 stats_.sensor_events++;
379 }
380
387 void submit_control_feedback(const std::string& control_id, const S& target_state, const S& current_state) {
388 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
389
390 ControlFeedback<S, T> feedback(control_id, target_state, current_state, this->current_time());
391 control_feedback_[control_id] = feedback;
392
393 // Trigger control feedback event
394 trigger_event("control_" + control_id, EventPriority::HIGH,
395 [this, control_id](S& state, T time) {
396 handle_control_feedback_event(control_id, state, time);
397 });
398
399 stats_.control_feedback_events++;
400 }
401
408 void set_state_condition(std::function<bool(const S&, T)> condition,
409 std::function<void(S&, T)> handler,
410 EventPriority priority = EventPriority::NORMAL) {
411 register_event_handler(EventTrigger::STATE_BASED,
412 [condition, handler, priority, this](S& state, T time) {
413 if (condition(state, time)) {
414 handler(state, time);
415 }
416 });
417 }
418
426 void set_threshold_event(size_t state_index, double threshold, bool crossing_direction,
427 std::function<void(S&, T)> handler) {
428 static std::map<size_t, double> last_values;
429
430 register_event_handler(EventTrigger::THRESHOLD_CROSSING,
431 [state_index, threshold, crossing_direction, handler, this](S& state, T time) {
432 if (state_index >= state.size()) return;
433
434 double current_value = state[state_index];
435 double last_value = last_values[state_index];
436
437 bool crossed = crossing_direction ?
438 (last_value < threshold && current_value >= threshold) :
439 (last_value > threshold && current_value <= threshold);
440
441 if (crossed) {
442 handler(state, time);
443 }
444
445 last_values[state_index] = current_value;
446 });
447 }
448
452 const EventStats& get_statistics() const {
453 return stats_;
454 }
455
460 std::lock_guard<std::mutex> lock(stats_mutex_);
461 stats_ = EventStats{};
462 }
463
467 const std::vector<Event<S, T>>& get_event_history() const {
468 return event_history_;
469 }
470
475 event_history_.clear();
476 }
477
481 std::map<std::string, SensorData<T>> get_sensor_data() const {
482 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
483 return sensor_data_;
484 }
485
489 std::map<std::string, ControlFeedback<S, T>> get_control_feedback() const {
490 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
491 return control_feedback_;
492 }
493
497 EventConfig& config() { return config_; }
498 const EventConfig& config() const { return config_; }
499
500private:
504 void process_events(S& state) {
505 auto deadline = std::chrono::steady_clock::now() + config_.max_event_processing_time;
506
507 while (!event_queue_.empty() && std::chrono::steady_clock::now() < deadline) {
508 std::lock_guard<std::mutex> lock(event_queue_mutex_);
509
510 if (event_queue_.empty()) break;
511
512 Event<S, T> event = event_queue_.top();
513 event_queue_.pop();
514
515 process_single_event(event, state);
516 }
517 }
518
522 void process_single_event(Event<S, T>& event, S& state) {
523 auto start_time = std::chrono::steady_clock::now();
524
525 try {
526 if (event.handler) {
527 event.handler(state, event.timestamp);
528 }
529
530 event.processed = true;
531 event.processed_at = std::chrono::steady_clock::now();
532
533 auto processing_time = std::chrono::duration_cast<std::chrono::microseconds>(
534 event.processed_at - start_time);
535
536 // Update statistics
537 std::lock_guard<std::mutex> lock(stats_mutex_);
538 stats_.processed_events++;
539 stats_.total_processing_time += processing_time;
540 stats_.max_processing_time = std::max(stats_.max_processing_time, processing_time);
541 stats_.min_processing_time = std::min(stats_.min_processing_time, processing_time);
542
543 } catch (const std::exception& e) {
544 // Log error but don't stop processing
545 event.timed_out = true;
546 stats_.timed_out_events++;
547 }
548
549 // Add to history if enabled
550 if (config_.enable_event_history) {
551 event_history_.push_back(event);
552 if (event_history_.size() > config_.max_event_history) {
553 event_history_.erase(event_history_.begin());
554 }
555 }
556 }
557
561 void check_state_events(const S& state, T time) {
562 for (const auto& handler : event_callbacks_[EventTrigger::STATE_BASED]) {
563 // Create a copy of the state for the handler
564 S state_copy = state;
565 handler(state_copy, time);
566 }
567 }
568
572 void handle_sensor_event(const std::string& sensor_id, S& state, T time) {
573 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
574
575 auto it = sensor_data_.find(sensor_id);
576 if (it != sensor_data_.end() && it->second.valid) {
577 // Process sensor data - this is application-specific
578 // For example, update state based on sensor feedback
579
580 // Call registered sensor callbacks
581 for (const auto& handler : event_callbacks_[EventTrigger::SENSOR_DATA]) {
582 handler(state, time);
583 }
584 }
585 }
586
590 void handle_control_feedback_event(const std::string& control_id, S& state, T time) {
591 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
592
593 auto it = control_feedback_.find(control_id);
594 if (it != control_feedback_.end()) {
595 const auto& feedback = it->second;
596
597 // Apply control correction based on error
598 for (size_t i = 0; i < state.size() && i < feedback.error_state.size(); ++i) {
599 state[i] += feedback.error_state[i] * 0.1; // Simple proportional control
600 }
601
602 // Call registered control callbacks
603 for (const auto& handler : event_callbacks_[EventTrigger::CONTROL_FEEDBACK]) {
604 handler(state, time);
605 }
606 }
607 }
608
612 bool validate_sensor_data(const SensorData<T>& sensor_data) {
613 if (!config_.enable_sensor_validation) {
614 return true;
615 }
616
617 // Check for reasonable values
618 for (double value : sensor_data.values) {
619 if (std::isnan(value) || std::isinf(value)) {
620 return false;
621 }
622
623 // Check noise threshold
624 if (std::abs(value) < config_.sensor_noise_threshold) {
625 // Might be noise, but still valid
626 }
627 }
628
629 return sensor_data.confidence > 0.1; // Minimum confidence threshold
630 }
631
635 void start_event_processing() {
636 running_ = true;
637
638 for (size_t i = 0; i < config_.event_thread_pool_size; ++i) {
639 event_threads_.emplace_back([this]() {
640 while (running_) {
641 std::unique_lock<std::mutex> lock(event_queue_mutex_);
642
643 if (event_queue_cv_.wait_for(lock, std::chrono::milliseconds(100),
644 [this] { return !event_queue_.empty() || !running_; })) {
645
646 if (!running_) break;
647
648 if (!event_queue_.empty()) {
649 Event<S, T> event = event_queue_.top();
650 event_queue_.pop();
651 lock.unlock();
652
653 // Process event asynchronously
654 // Note: This would need access to current state, which is tricky
655 // In practice, async events might be limited to specific types
656 }
657 }
658 }
659 });
660 }
661 }
662
666 void stop_event_processing() {
667 running_ = false;
668 event_queue_cv_.notify_all();
669
670 for (auto& thread : event_threads_) {
671 if (thread.joinable()) {
672 thread.join();
673 }
674 }
675
676 event_threads_.clear();
677 }
678
682 void start_control_loop() {
683 control_loop_running_ = true;
684
685 control_loop_thread_ = std::thread([this]() {
686 while (control_loop_running_) {
687 auto now = std::chrono::steady_clock::now();
688
689 if (now - last_control_update_ >= config_.control_loop_period) {
690 // Trigger control loop events
691 for (const auto& handler : event_callbacks_[EventTrigger::CONTROL_FEEDBACK]) {
692 // Note: This needs access to current state
693 // Would need to be implemented differently in practice
694 }
695
696 last_control_update_ = now;
697 }
698
699 std::this_thread::sleep_for(config_.control_loop_period / 10);
700 }
701 });
702 }
703
707 void stop_control_loop() {
708 control_loop_running_ = false;
709
710 if (control_loop_thread_.joinable()) {
711 control_loop_thread_.join();
712 }
713 }
714};
715
716} // namespace diffeq::core::composable
Event decorator - adds event-driven feedback capabilities to any integrator.
void submit_sensor_data(const std::string &sensor_id, const std::vector< double > &values, double confidence=1.0)
Submit sensor data.
const std::vector< Event< S, T > > & get_event_history() const
Get event history.
void set_state_condition(std::function< bool(const S &, T)> condition, std::function< void(S &, T)> handler, EventPriority priority=EventPriority::NORMAL)
Set state-based event condition.
std::map< std::string, SensorData< T > > get_sensor_data() const
Get current sensor data.
EventDecorator(std::unique_ptr< AbstractIntegrator< S, T > > integrator, EventConfig config={})
Construct event decorator.
void trigger_event(const std::string &event_id, EventPriority priority, std::function< void(S &, T)> handler, std::vector< uint8_t > data={})
Trigger custom event.
const EventStats & get_statistics() const
Get event statistics.
EventConfig & config()
Access and modify event configuration.
void integrate(typename IntegratorDecorator< S, T >::state_type &state, T dt, T end_time) override
Override integrate to handle events during integration.
std::map< std::string, ControlFeedback< S, T > > get_control_feedback() const
Get current control feedback.
void reset_statistics()
Reset event statistics.
void clear_event_history()
Clear event history.
void set_threshold_event(size_t state_index, double threshold, bool crossing_direction, std::function< void(S &, T)> handler)
Set threshold crossing event.
void step(typename IntegratorDecorator< S, T >::state_type &state, T dt) override
Override step to handle events during integration.
void register_event_handler(EventTrigger trigger, std::function< void(S &, T)> handler)
Register event handler for specific trigger type.
void submit_control_feedback(const std::string &control_id, const S &target_state, const S &current_state)
Submit control feedback.
~EventDecorator()
Destructor ensures proper cleanup.
Base decorator interface for integrator enhancements.
void validate() const
Validate configuration parameters.