126 EventTrigger trigger;
127 EventPriority priority;
129 std::string event_id;
130 std::function<void(S&, T)> handler;
131 std::vector<uint8_t> data;
134 std::chrono::steady_clock::time_point created_at;
135 std::chrono::steady_clock::time_point processed_at;
136 bool processed{
false};
137 bool timed_out{
false};
139 Event(EventTrigger t, EventPriority p, T time, std::string
id, std::function<
void(S&, T)> h)
140 : trigger(t), priority(p), timestamp(time), event_id(std::move(
id)), handler(std::move(h))
141 , created_at(std::chrono::steady_clock::now()) {}
144 bool operator<(
const Event& other)
const {
145 if (priority != other.priority) {
146 return priority < other.priority;
148 return timestamp > other.timestamp;
240 std::priority_queue<Event<S, T>> event_queue_;
241 std::vector<Event<S, T>> event_history_;
242 std::map<std::string, SensorData<T>> sensor_data_;
243 std::map<std::string, ControlFeedback<S, T>> control_feedback_;
247 std::vector<std::thread> event_threads_;
248 std::atomic<bool> running_{
false};
249 std::mutex event_queue_mutex_;
250 std::condition_variable event_queue_cv_;
251 std::mutex sensor_data_mutex_;
252 std::mutex control_feedback_mutex_;
253 std::mutex stats_mutex_;
256 std::thread control_loop_thread_;
257 std::atomic<bool> control_loop_running_{
false};
258 std::chrono::steady_clock::time_point last_control_update_;
261 std::map<EventTrigger, std::vector<std::function<void(S&, T)>>> event_callbacks_;
273 , last_control_update_(std::chrono::steady_clock::now()) {
277 if (config_.enable_async_processing) {
278 start_event_processing();
281 if (config_.enable_control_loop) {
282 start_control_loop();
291 stop_event_processing();
297 void step(
typename IntegratorDecorator<S, T>::state_type& state, T dt)
override {
299 process_events(state);
302 this->wrapped_integrator_->step(state, dt);
305 check_state_events(state, this->current_time());
308 process_events(state);
314 void integrate(
typename IntegratorDecorator<S, T>::state_type& state, T dt, T end_time)
override {
316 process_events(state);
319 this->wrapped_integrator_->integrate(state, dt, end_time);
322 process_events(state);
331 event_callbacks_[trigger].push_back(std::move(handler));
342 std::function<
void(S&, T)> handler, std::vector<uint8_t> data = {}) {
343 std::lock_guard<std::mutex> lock(event_queue_mutex_);
345 Event<S, T> event(EventTrigger::CUSTOM, priority, this->current_time(), event_id, std::move(handler));
346 event.data = std::move(data);
348 event_queue_.push(event);
349 event_queue_cv_.notify_one();
351 stats_.total_events++;
352 if (priority >= EventPriority::HIGH) {
353 stats_.high_priority_events++;
363 void submit_sensor_data(
const std::string& sensor_id,
const std::vector<double>& values,
double confidence = 1.0) {
364 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
366 SensorData<T> sensor_data(sensor_id, values, this->current_time());
367 sensor_data.confidence = confidence;
368 sensor_data.valid = validate_sensor_data(sensor_data);
370 sensor_data_[sensor_id] = sensor_data;
374 [
this, sensor_id](S& state, T time) {
375 handle_sensor_event(sensor_id, state, time);
378 stats_.sensor_events++;
388 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
391 control_feedback_[control_id] = feedback;
395 [
this, control_id](S& state, T time) {
396 handle_control_feedback_event(control_id, state, time);
399 stats_.control_feedback_events++;
409 std::function<
void(S&, T)> handler,
410 EventPriority priority = EventPriority::NORMAL) {
412 [condition, handler, priority,
this](S& state, T time) {
413 if (condition(state, time)) {
414 handler(state, time);
427 std::function<
void(S&, T)> handler) {
428 static std::map<size_t, double> last_values;
431 [state_index, threshold, crossing_direction, handler,
this](S& state, T time) {
432 if (state_index >= state.size())
return;
434 double current_value = state[state_index];
435 double last_value = last_values[state_index];
437 bool crossed = crossing_direction ?
438 (last_value < threshold && current_value >= threshold) :
439 (last_value > threshold && current_value <= threshold);
442 handler(state, time);
445 last_values[state_index] = current_value;
460 std::lock_guard<std::mutex> lock(stats_mutex_);
468 return event_history_;
475 event_history_.clear();
482 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
490 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
491 return control_feedback_;
504 void process_events(S& state) {
505 auto deadline = std::chrono::steady_clock::now() + config_.max_event_processing_time;
507 while (!event_queue_.empty() && std::chrono::steady_clock::now() < deadline) {
508 std::lock_guard<std::mutex> lock(event_queue_mutex_);
510 if (event_queue_.empty())
break;
512 Event<S, T>
event = event_queue_.top();
515 process_single_event(event, state);
522 void process_single_event(Event<S, T>& event, S& state) {
523 auto start_time = std::chrono::steady_clock::now();
527 event.handler(state, event.timestamp);
530 event.processed =
true;
531 event.processed_at = std::chrono::steady_clock::now();
533 auto processing_time = std::chrono::duration_cast<std::chrono::microseconds>(
534 event.processed_at - start_time);
537 std::lock_guard<std::mutex> lock(stats_mutex_);
538 stats_.processed_events++;
539 stats_.total_processing_time += processing_time;
540 stats_.max_processing_time = std::max(stats_.max_processing_time, processing_time);
541 stats_.min_processing_time = std::min(stats_.min_processing_time, processing_time);
543 }
catch (
const std::exception& e) {
545 event.timed_out =
true;
546 stats_.timed_out_events++;
550 if (config_.enable_event_history) {
551 event_history_.push_back(event);
552 if (event_history_.size() > config_.max_event_history) {
553 event_history_.erase(event_history_.begin());
561 void check_state_events(
const S& state, T time) {
562 for (
const auto& handler : event_callbacks_[EventTrigger::STATE_BASED]) {
564 S state_copy = state;
565 handler(state_copy, time);
572 void handle_sensor_event(
const std::string& sensor_id, S& state, T time) {
573 std::lock_guard<std::mutex> lock(sensor_data_mutex_);
575 auto it = sensor_data_.find(sensor_id);
576 if (it != sensor_data_.end() && it->second.valid) {
581 for (
const auto& handler : event_callbacks_[EventTrigger::SENSOR_DATA]) {
582 handler(state, time);
590 void handle_control_feedback_event(
const std::string& control_id, S& state, T time) {
591 std::lock_guard<std::mutex> lock(control_feedback_mutex_);
593 auto it = control_feedback_.find(control_id);
594 if (it != control_feedback_.end()) {
595 const auto& feedback = it->second;
598 for (
size_t i = 0; i < state.size() && i < feedback.error_state.size(); ++i) {
599 state[i] += feedback.error_state[i] * 0.1;
603 for (
const auto& handler : event_callbacks_[EventTrigger::CONTROL_FEEDBACK]) {
604 handler(state, time);
612 bool validate_sensor_data(
const SensorData<T>& sensor_data) {
613 if (!config_.enable_sensor_validation) {
618 for (
double value : sensor_data.values) {
619 if (std::isnan(value) || std::isinf(value)) {
624 if (std::abs(value) < config_.sensor_noise_threshold) {
629 return sensor_data.confidence > 0.1;
635 void start_event_processing() {
638 for (
size_t i = 0; i < config_.event_thread_pool_size; ++i) {
639 event_threads_.emplace_back([
this]() {
641 std::unique_lock<std::mutex> lock(event_queue_mutex_);
643 if (event_queue_cv_.wait_for(lock, std::chrono::milliseconds(100),
644 [
this] { return !event_queue_.empty() || !running_; })) {
646 if (!running_)
break;
648 if (!event_queue_.empty()) {
649 Event<S, T>
event = event_queue_.top();
666 void stop_event_processing() {
668 event_queue_cv_.notify_all();
670 for (
auto& thread : event_threads_) {
671 if (thread.joinable()) {
676 event_threads_.clear();
682 void start_control_loop() {
683 control_loop_running_ =
true;
685 control_loop_thread_ = std::thread([
this]() {
686 while (control_loop_running_) {
687 auto now = std::chrono::steady_clock::now();
689 if (now - last_control_update_ >= config_.control_loop_period) {
691 for (
const auto& handler : event_callbacks_[EventTrigger::CONTROL_FEEDBACK]) {
696 last_control_update_ = now;
699 std::this_thread::sleep_for(config_.control_loop_period / 10);
707 void stop_control_loop() {
708 control_loop_running_ =
false;
710 if (control_loop_thread_.joinable()) {
711 control_loop_thread_.join();