DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
realtime_integrator.hpp
1#pragma once
2
3#include <core/abstract_integrator.hpp>
4#include <communication/event_bus.hpp>
5#include <communication/process_connector.hpp>
6#include <communication/realtime_priority.hpp>
7
8#include <atomic>
9#include <thread>
10#include <mutex>
11#include <condition_variable>
12#include <future>
13#include <optional>
14#include <chrono>
15#include <functional>
16#include <memory>
17#include <queue>
18#include <span>
19
20// C++23 std::execution support with fallback
21#if __has_include(<execution>) && defined(__cpp_lib_execution)
22#include <execution>
23#define DIFFEQ_HAS_STD_EXECUTION 1
24#else
25#define DIFFEQ_HAS_STD_EXECUTION 0
26#endif
27
28namespace diffeq::realtime {
29
33enum class SignalType : uint8_t {
34 Control = 0, // Robot control signals
35 Financial = 1, // Trading/market signals
36 Safety = 2, // Emergency stop/safety signals
37 Parameter = 3, // Parameter update signals
38 State = 4, // State output signals
39 Diagnostic = 5 // System diagnostics
40};
41
45template<typename T>
47 SignalType signal_type;
48 T data;
49 std::chrono::steady_clock::time_point timestamp;
50 uint64_t sequence_id;
51 double priority{1.0};
52 std::optional<std::chrono::milliseconds> timeout;
53
54 template<typename U>
55 RealtimeSignal(SignalType type, U&& signal_data, double prio = 1.0)
56 : signal_type(type)
57 , data(std::forward<U>(signal_data))
58 , timestamp(std::chrono::steady_clock::now())
59 , sequence_id(next_sequence_id())
60 , priority(prio) {}
61
62private:
63 static std::atomic<uint64_t> sequence_counter_;
64 static uint64_t next_sequence_id() { return sequence_counter_.fetch_add(1); }
65};
66
67template<typename T>
68std::atomic<uint64_t> RealtimeSignal<T>::sequence_counter_{0};
69
73template<typename Handler, typename Signal>
74concept SignalHandler = requires(Handler h, const Signal& s) {
75 { h(s) } -> std::same_as<void>;
76};
77
81template<typename Handler, typename Signal>
82concept AsyncSignalHandler = requires(Handler h, const Signal& s) {
83 { h(s) } -> std::same_as<std::future<void>>;
84};
85
90public:
91 CustomExecutor(size_t num_threads = std::thread::hardware_concurrency())
92 : stop_flag_(false) {
93 threads_.reserve(num_threads);
94 for (size_t i = 0; i < num_threads; ++i) {
95 threads_.emplace_back([this] { worker_thread(); });
96 }
97 }
98
100 shutdown();
101 }
102
103 template<typename F>
104 auto submit(F&& func) -> std::future<std::invoke_result_t<F>> {
105 using return_type = std::invoke_result_t<F>;
106
107 auto task = std::make_shared<std::packaged_task<return_type()>>(
108 std::forward<F>(func)
109 );
110
111 auto future = task->get_future();
112
113 {
114 std::lock_guard<std::mutex> lock(queue_mutex_);
115 if (stop_flag_) {
116 throw std::runtime_error("Executor is shutting down");
117 }
118
119 task_queue_.emplace([task] { (*task)(); });
120 }
121
122 condition_.notify_one();
123 return future;
124 }
125
126 void shutdown() {
127 {
128 std::lock_guard<std::mutex> lock(queue_mutex_);
129 stop_flag_ = true;
130 }
131
132 condition_.notify_all();
133
134 for (auto& thread : threads_) {
135 if (thread.joinable()) {
136 thread.join();
137 }
138 }
139 threads_.clear();
140 }
141
142private:
143 void worker_thread() {
144 while (true) {
145 std::function<void()> task;
146
147 {
148 std::unique_lock<std::mutex> lock(queue_mutex_);
149 condition_.wait(lock, [this] {
150 return stop_flag_ || !task_queue_.empty();
151 });
152
153 if (stop_flag_ && task_queue_.empty()) {
154 break;
155 }
156
157 task = std::move(task_queue_.front());
158 task_queue_.pop();
159 }
160
161 task();
162 }
163 }
164
165 std::vector<std::thread> threads_;
166 std::queue<std::function<void()>> task_queue_;
167 std::mutex queue_mutex_;
168 std::condition_variable condition_;
169 std::atomic<bool> stop_flag_;
170};
171
182template<system_state S, can_be_time T = double>
184public:
186 using state_type = typename base_type::state_type;
187 using time_type = typename base_type::time_type;
188 using value_type = typename base_type::value_type;
189 using system_function = typename base_type::system_function;
190
191 // Signal type aliases
192 template<typename U>
194
199
204 bool enable_realtime_priority = false;
205 communication::RealtimePriority::Priority priority = communication::RealtimePriority::Priority::Normal;
206 bool lock_memory = false;
207 size_t signal_buffer_size = 1024;
208 std::chrono::microseconds signal_processing_interval{100};
209 std::chrono::milliseconds max_signal_latency{10};
210 bool enable_state_output = true;
211 std::chrono::microseconds state_output_interval{1000};
212 };
213
214 explicit RealtimeIntegrator(
215 std::unique_ptr<core::AbstractIntegrator<S, T>> base_integrator,
216 RealtimeConfig config = {}
217 ) : base_type(std::move(base_integrator->sys_))
218 , base_integrator_(std::move(base_integrator))
219 , config_(std::move(config))
220 , event_bus_(std::make_unique<communication::EventBus>())
221 , executor_(std::make_unique<CustomExecutor>())
222 , running_(false)
223 , emergency_stop_(false) {
224
225 setup_signal_handlers();
226
227 if (config_.enable_realtime_priority) {
228 setup_realtime_priority();
229 }
230 }
231
232 ~RealtimeIntegrator() {
233 shutdown();
234 }
235
236 // Non-copyable, movable
237 RealtimeIntegrator(const RealtimeIntegrator&) = delete;
238 RealtimeIntegrator& operator=(const RealtimeIntegrator&) = delete;
239 RealtimeIntegrator(RealtimeIntegrator&&) noexcept = default;
240 RealtimeIntegrator& operator=(RealtimeIntegrator&&) noexcept = default;
241
246 if (running_.exchange(true)) {
247 return; // Already running
248 }
249
250 // Start signal processing thread
251 signal_thread_ = std::thread([this] { signal_processing_loop(); });
252
253 // Start state output thread if enabled
254 if (config_.enable_state_output) {
255 state_output_thread_ = std::thread([this] { state_output_loop(); });
256 }
257 }
258
262 void shutdown() {
263 if (!running_.exchange(false)) {
264 return; // Already stopped
265 }
266
267 signal_condition_.notify_all();
268 state_output_condition_.notify_all();
269
270 if (signal_thread_.joinable()) {
271 signal_thread_.join();
272 }
273
274 if (state_output_thread_.joinable()) {
275 state_output_thread_.join();
276 }
277
278 if (executor_) {
279 executor_->shutdown();
280 }
281 }
282
286 void step(state_type& state, time_type dt) override {
287 // Check for emergency stop
288 if (emergency_stop_.load()) {
289 throw std::runtime_error("Emergency stop activated");
290 }
291
292 // Process any pending signals before stepping
293 process_pending_signals(state, dt);
294
295 // Delegate to the base integrator
296 base_integrator_->step(state, dt);
297 this->advance_time(dt);
298
299 // Update current state for output
300 {
301 std::lock_guard<std::mutex> lock(state_mutex_);
302 current_state_ = state;
303 last_step_time_ = std::chrono::steady_clock::now();
304 }
305 }
306
310 void integrate(state_type& state, time_type dt, time_type end_time) override {
311 if (!running_.load()) {
313 }
314
315 while (this->current_time_ < end_time && !emergency_stop_.load()) {
316 time_type step_size = std::min(dt, end_time - this->current_time_);
317 step(state, step_size);
318
319 // Allow signal processing between steps
320 std::this_thread::sleep_for(std::chrono::microseconds(1));
321 }
322 }
323
327 template<typename ControlData>
328 void send_control_signal(ControlData&& data, double priority = 1.0) {
329 auto signal = std::make_shared<control_signal>(
330 SignalType::Control,
331 std::forward<ControlData>(data),
332 priority
333 );
334
335 enqueue_signal(signal);
336 }
337
341 template<typename FinancialData>
342 void send_financial_signal(FinancialData&& data, double priority = 1.0) {
343 auto signal = std::make_shared<financial_signal>(
344 SignalType::Financial,
345 std::forward<FinancialData>(data),
346 priority
347 );
348
349 enqueue_signal(signal);
350 }
351
355 void update_parameters(const std::unordered_map<std::string, double>& params) {
356 auto signal = std::make_shared<parameter_signal>(
357 SignalType::Parameter,
358 params,
359 10.0 // High priority for parameter updates
360 );
361
362 enqueue_signal(signal);
363 }
364
369 emergency_stop_.store(true);
370
371 // Send emergency signal
372 communication::RobotControlMessage emergency_msg;
373 emergency_msg.emergency_stop = true;
374 emergency_msg.timestamp_sec = std::chrono::duration<double>(
375 std::chrono::steady_clock::now().time_since_epoch()
376 ).count();
377
378 send_control_signal(emergency_msg, 100.0); // Maximum priority
379 }
380
384 state_type get_current_state() const {
385 std::lock_guard<std::mutex> lock(state_mutex_);
386 return current_state_;
387 }
388
392 void setup_process_connector(communication::ProcessConnector::ConnectionConfig config) {
393 process_connector_ = std::make_unique<communication::ProcessConnector>(std::move(config));
394
395 // Start async connection
396 auto connect_future = process_connector_->connect();
397
398 // Handle connection in background
399 #if DIFFEQ_HAS_STD_EXECUTION
400 std::execution::execute(std::execution::par, [this, fut = std::move(connect_future)]() mutable {
401 try {
402 if (fut.get()) {
403 start_external_communication();
404 }
405 } catch (const std::exception& e) {
406 // Log connection error
407 std::cerr << "Process connector failed: " << e.what() << std::endl;
408 }
409 });
410 #else
411 executor_->submit([this, fut = std::move(connect_future)]() mutable {
412 try {
413 if (fut.get()) {
414 start_external_communication();
415 }
416 } catch (const std::exception& e) {
417 std::cerr << "Process connector failed: " << e.what() << std::endl;
418 }
419 });
420 #endif
421 }
422
426 template<typename SignalT, SignalHandler<SignalT> Handler>
427 void register_signal_handler(SignalType type, Handler&& handler) {
428 std::lock_guard<std::mutex> lock(handlers_mutex_);
429 signal_handlers_[static_cast<uint8_t>(type)] =
430 [h = std::forward<Handler>(handler)](const std::any& signal) {
431 try {
432 const auto& typed_signal = std::any_cast<const SignalT&>(signal);
433 h(typed_signal);
434 } catch (const std::bad_any_cast& e) {
435 std::cerr << "Signal type mismatch: " << e.what() << std::endl;
436 }
437 };
438 }
439
440private:
441 std::unique_ptr<core::AbstractIntegrator<S, T>> base_integrator_;
442 RealtimeConfig config_;
443 std::unique_ptr<communication::EventBus> event_bus_;
444 std::unique_ptr<CustomExecutor> executor_;
445 std::unique_ptr<communication::ProcessConnector> process_connector_;
446
447 // Threading and synchronization
448 std::atomic<bool> running_;
449 std::atomic<bool> emergency_stop_;
450 std::thread signal_thread_;
451 std::thread state_output_thread_;
452 std::condition_variable signal_condition_;
453 std::condition_variable state_output_condition_;
454
455 // State management
456 mutable std::mutex state_mutex_;
457 state_type current_state_;
458 std::chrono::steady_clock::time_point last_step_time_;
459
460 // Signal handling
461 std::mutex signal_queue_mutex_;
462 std::priority_queue<std::shared_ptr<std::any>,
463 std::vector<std::shared_ptr<std::any>>,
464 std::function<bool(const std::shared_ptr<std::any>&,
465 const std::shared_ptr<std::any>&)>> signal_queue_;
466
467 std::mutex handlers_mutex_;
468 std::unordered_map<uint8_t, std::function<void(const std::any&)>> signal_handlers_;
469
470 void setup_realtime_priority() {
471 auto result = communication::RealtimePriority::set_thread_priority(
472 communication::RealtimePriority::Policy::RealTimeFIFO,
473 config_.priority
474 );
475
476 if (result) {
477 std::cerr << "Failed to set real-time priority: " << result.message() << std::endl;
478 }
479
480 if (config_.lock_memory) {
481 result = communication::RealtimePriority::lock_memory();
482 if (result) {
483 std::cerr << "Failed to lock memory: " << result.message() << std::endl;
484 }
485 }
486 }
487
488 void setup_signal_handlers() {
489 // Initialize priority queue with custom comparator
490 auto comparator = [](const std::shared_ptr<std::any>& a, const std::shared_ptr<std::any>& b) -> bool {
491 // Default comparison - in real implementation, extract priority from signal
492 return false; // For now, FIFO ordering
493 };
494
495 signal_queue_ = std::priority_queue<std::shared_ptr<std::any>,
496 std::vector<std::shared_ptr<std::any>>,
497 decltype(comparator)>(comparator);
498 }
499
500 template<typename SignalT>
501 void enqueue_signal(std::shared_ptr<SignalT> signal) {
502 {
503 std::lock_guard<std::mutex> lock(signal_queue_mutex_);
504 signal_queue_.push(std::static_pointer_cast<std::any>(
505 std::make_shared<SignalT>(*signal)
506 ));
507 }
508 signal_condition_.notify_one();
509 }
510
511 void signal_processing_loop() {
512 while (running_.load()) {
513 std::unique_lock<std::mutex> lock(signal_queue_mutex_);
514 signal_condition_.wait_for(
515 lock,
516 config_.signal_processing_interval,
517 [this] { return !signal_queue_.empty() || !running_.load(); }
518 );
519
520 while (!signal_queue_.empty() && running_.load()) {
521 auto signal = signal_queue_.top();
522 signal_queue_.pop();
523 lock.unlock();
524
525 // Process signal asynchronously
526 #if DIFFEQ_HAS_STD_EXECUTION
527 std::execution::execute(std::execution::par, [this, signal] {
528 process_signal(signal);
529 });
530 #else
531 executor_->submit([this, signal] {
532 process_signal(signal);
533 });
534 #endif
535
536 lock.lock();
537 }
538 }
539 }
540
541 void process_signal(std::shared_ptr<std::any> signal) {
542 // In a real implementation, we'd extract signal type and dispatch appropriately
543 // For now, this is a placeholder
544 }
545
546 void process_pending_signals(state_type& state, time_type dt) {
547 // Quick processing of high-priority signals before integration step
548 // This ensures minimal latency for critical signals
549 }
550
551 void state_output_loop() {
552 while (running_.load()) {
553 std::unique_lock<std::mutex> lock(state_mutex_);
554 state_output_condition_.wait_for(
555 lock,
556 config_.state_output_interval,
557 [this] { return !running_.load(); }
558 );
559
560 if (!running_.load()) break;
561
562 // Output current state
563 if (process_connector_) {
564 state_output_signal output_signal(
565 SignalType::State,
566 current_state_
567 );
568
569 // Send asynchronously
570 #if DIFFEQ_HAS_STD_EXECUTION
571 std::execution::execute(std::execution::par, [this, output_signal] {
572 // process_connector_->send(output_signal);
573 });
574 #else
575 executor_->submit([this, output_signal] {
576 // process_connector_->send(output_signal);
577 });
578 #endif
579 }
580 }
581 }
582
583 void start_external_communication() {
584 // Start receiving signals from external processes
585 if (!process_connector_) return;
586
587 #if DIFFEQ_HAS_STD_EXECUTION
588 std::execution::execute(std::execution::par, [this] {
589 receive_external_signals();
590 });
591 #else
592 executor_->submit([this] {
593 receive_external_signals();
594 });
595 #endif
596 }
597
598 void receive_external_signals() {
599 while (running_.load() && process_connector_) {
600 // Receive signals from external processes
601 // This would use the process_connector_ to receive different signal types
602
603 std::this_thread::sleep_for(std::chrono::milliseconds(1));
604 }
605 }
606};
607
611namespace factory {
612
613template<system_state S, can_be_time T = double>
614auto make_realtime_rk45(
615 typename core::AbstractIntegrator<S, T>::system_function sys,
616 typename RealtimeIntegrator<S, T>::RealtimeConfig config = {},
617 T rtol = static_cast<T>(1e-6),
618 T atol = static_cast<T>(1e-9)
619) {
620 auto base = std::make_unique<RK45Integrator<S, T>>(std::move(sys), rtol, atol);
621 return std::make_unique<RealtimeIntegrator<S, T>>(std::move(base), std::move(config));
622}
623
624template<system_state S, can_be_time T = double>
625auto make_realtime_dop853(
626 typename core::AbstractIntegrator<S, T>::system_function sys,
627 typename RealtimeIntegrator<S, T>::RealtimeConfig config = {},
628 T rtol = static_cast<T>(1e-10),
629 T atol = static_cast<T>(1e-15)
630) {
631 auto base = std::make_unique<DOP853Integrator<S, T>>(std::move(sys), rtol, atol);
632 return std::make_unique<RealtimeIntegrator<S, T>>(std::move(base), std::move(config));
633}
634
635} // namespace factory
636
637} // namespace diffeq::realtime
Custom executor for async operations when std::execution is not available.
Real-time integrator with signal processing capabilities.
void integrate(state_type &state, time_type dt, time_type end_time) override
Enhanced integrate function with real-time capabilities.
void send_control_signal(ControlData &&data, double priority=1.0)
Send control signal (for robotics applications)
void start_realtime()
Start real-time operation.
void setup_process_connector(communication::ProcessConnector::ConnectionConfig config)
Set up process connector for inter-process communication.
void emergency_stop()
Emergency stop - immediately halt integration.
void step(state_type &state, time_type dt) override
Enhanced step function with signal processing.
void update_parameters(const std::unordered_map< std::string, double > &params)
Update system parameters dynamically.
void register_signal_handler(SignalType type, Handler &&handler)
Register signal handler for specific signal type.
state_type get_current_state() const
Get current state (thread-safe)
void send_financial_signal(FinancialData &&data, double priority=1.0)
Send financial signal (for quantitative trading)
void shutdown()
Stop real-time operation.
Signal handler concept for processing real-time signals.
Real-time signal structure for inter-process communication.