DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
async_integrator.hpp
1#pragma once
2
3#include <core/abstract_integrator.hpp>
4#include <integrators/ode/rk45.hpp>
5#include <integrators/ode/dop853.hpp>
6#include <integrators/ode/bdf.hpp>
7#include <thread>
8#include <mutex>
9#include <condition_variable>
10#include <atomic>
11#include <future>
12#include <functional>
13#include <memory>
14#include <queue>
15#include <chrono>
16#include <optional>
17#include <type_traits>
18
19// C++23 std::execution support with fallback
20#if __has_include(<execution>) && defined(__cpp_lib_execution)
21#include <execution>
22#define DIFFEQ_HAS_STD_EXECUTION 1
23#else
24#define DIFFEQ_HAS_STD_EXECUTION 0
25#endif
26
27namespace diffeq::async {
28
36public:
37 explicit AsyncExecutor(size_t num_threads = std::thread::hardware_concurrency())
38 : stop_flag_(false) {
39
40 threads_.reserve(num_threads);
41 for (size_t i = 0; i < num_threads; ++i) {
42 threads_.emplace_back([this] { worker_thread(); });
43 }
44 }
45
47 shutdown();
48 }
49
50 template<typename F>
51 auto submit(F&& func) -> std::future<std::invoke_result_t<F>> {
52 using return_type = std::invoke_result_t<F>;
53
54 auto task = std::make_shared<std::packaged_task<return_type()>>(
55 std::forward<F>(func)
56 );
57
58 auto future = task->get_future();
59
60 {
61 std::lock_guard<std::mutex> lock(queue_mutex_);
62 if (stop_flag_) {
63 throw std::runtime_error("Executor is shutting down");
64 }
65 task_queue_.emplace([task] { (*task)(); });
66 }
67
68 condition_.notify_one();
69 return future;
70 }
71
72 void shutdown() {
73 {
74 std::lock_guard<std::mutex> lock(queue_mutex_);
75 stop_flag_ = true;
76 }
77
78 condition_.notify_all();
79
80 for (auto& thread : threads_) {
81 if (thread.joinable()) {
82 thread.join();
83 }
84 }
85 threads_.clear();
86 }
87
88private:
89 void worker_thread() {
90 while (true) {
91 std::function<void()> task;
92
93 {
94 std::unique_lock<std::mutex> lock(queue_mutex_);
95 condition_.wait(lock, [this] {
96 return stop_flag_ || !task_queue_.empty();
97 });
98
99 if (stop_flag_ && task_queue_.empty()) {
100 break;
101 }
102
103 task = std::move(task_queue_.front());
104 task_queue_.pop();
105 }
106
107 task();
108 }
109 }
110
111 std::vector<std::thread> threads_;
112 std::queue<std::function<void()>> task_queue_;
113 std::mutex queue_mutex_;
114 std::condition_variable condition_;
115 std::atomic<bool> stop_flag_;
116};
117
121enum class IntegrationEvent {
122 StepCompleted,
123 StateChanged,
124 ParameterUpdated,
125 EmergencyStop
126};
127
131template<typename T>
132struct Event {
133 IntegrationEvent type;
134 T data;
135 std::chrono::steady_clock::time_point timestamp;
136
137 Event(IntegrationEvent t, T d)
138 : type(t), data(std::move(d)), timestamp(std::chrono::steady_clock::now()) {}
139};
140
148template<system_state S>
150public:
152 using state_type = typename base_integrator_type::state_type;
153 using time_type = typename base_integrator_type::time_type;
154 using value_type = typename base_integrator_type::value_type;
155 using system_function = typename base_integrator_type::system_function;
156
157 // Event callback types
158 using step_callback = std::function<void(const state_type&, time_type)>;
159 using parameter_callback = std::function<void(const std::string&, double)>;
160 using emergency_callback = std::function<void()>;
161
165 struct Config {
166 bool enable_async_stepping = false;
167 bool enable_state_monitoring = false;
168 std::chrono::microseconds monitoring_interval{1000};
169 size_t max_concurrent_operations = 4;
170 };
171
172 explicit AsyncIntegrator(
173 std::unique_ptr<base_integrator_type> integrator,
174 Config config = {}
175 ) : base_integrator_(std::move(integrator))
176 , config_(config)
177 , executor_(config.max_concurrent_operations)
178 , running_(false)
179 , emergency_stop_(false) {}
180
181 ~AsyncIntegrator() {
182 stop();
183 }
184
188 void start() {
189 if (running_.exchange(true)) {
190 return;
191 }
192
193 if (config_.enable_state_monitoring) {
194 monitoring_thread_ = std::thread([this] { monitoring_loop(); });
195 }
196 }
197
201 void stop() {
202 if (!running_.exchange(false)) {
203 return;
204 }
205
206 monitoring_condition_.notify_all();
207
208 if (monitoring_thread_.joinable()) {
209 monitoring_thread_.join();
210 }
211
212 executor_.shutdown();
213 }
214
218 std::future<void> step_async(state_type& state, time_type dt) {
219 if (emergency_stop_.load()) {
220 auto promise = std::promise<void>();
221 promise.set_exception(std::make_exception_ptr(
222 std::runtime_error("Emergency stop activated")));
223 return promise.get_future();
224 }
225
226 return executor_.submit([this, &state, dt]() {
227 std::lock_guard<std::mutex> lock(integration_mutex_);
228 base_integrator_->step(state, dt);
229
230 // Notify step completion
231 if (step_callback_) {
232 step_callback_(state, base_integrator_->current_time());
233 }
234 });
235 }
236
240 std::future<void> integrate_async(state_type& state, time_type dt, time_type end_time) {
241 return executor_.submit([this, &state, dt, end_time]() {
242 while (base_integrator_->current_time() < end_time && !emergency_stop_.load()) {
243 time_type step_size = std::min<time_type>(dt, end_time - base_integrator_->current_time());
244
245 {
246 std::lock_guard<std::mutex> lock(integration_mutex_);
247 base_integrator_->step(state, step_size);
248 }
249
250 // Notify step completion
251 if (step_callback_) {
252 step_callback_(state, base_integrator_->current_time());
253 }
254
255 // Allow other operations
256 std::this_thread::yield();
257 }
258 });
259 }
260
264 void step(state_type& state, time_type dt) {
265 if (emergency_stop_.load()) {
266 throw std::runtime_error("Emergency stop activated");
267 }
268
269 std::lock_guard<std::mutex> lock(integration_mutex_);
270 base_integrator_->step(state, dt);
271
272 if (step_callback_) {
273 step_callback_(state, base_integrator_->current_time());
274 }
275 }
276
277 void integrate(state_type& state, time_type dt, time_type end_time) {
278 if (!running_.load() && config_.enable_async_stepping) {
279 start();
280 }
281
282 base_integrator_->integrate(state, dt, end_time);
283 }
284
285 // Getters/Setters
286 time_type current_time() const { return base_integrator_->current_time(); }
287 void set_time(time_type t) { base_integrator_->set_time(t); }
288 void set_system(system_function sys) { base_integrator_->set_system(std::move(sys)); }
289
293 void set_step_callback(step_callback callback) {
294 step_callback_ = std::move(callback);
295 }
296
297 void set_parameter_callback(parameter_callback callback) {
298 parameter_callback_ = std::move(callback);
299 }
300
301 void set_emergency_callback(emergency_callback callback) {
302 emergency_callback_ = std::move(callback);
303 }
304
308 std::future<void> update_parameter_async(const std::string& name, double value) {
309 return executor_.submit([this, name, value]() {
310 if (parameter_callback_) {
311 parameter_callback_(name, value);
312 }
313 });
314 }
315
320 emergency_stop_.store(true);
321 if (emergency_callback_) {
322 emergency_callback_();
323 }
324 }
325
330 emergency_stop_.store(false);
331 }
332
336 state_type get_current_state() const {
337 std::lock_guard<std::mutex> lock(state_mutex_);
338 return current_state_;
339 }
340
341private:
342 std::unique_ptr<base_integrator_type> base_integrator_;
343 Config config_;
344 AsyncExecutor executor_;
345
346 std::atomic<bool> running_;
347 std::atomic<bool> emergency_stop_;
348
349 mutable std::mutex integration_mutex_;
350 mutable std::mutex state_mutex_;
351 state_type current_state_;
352
353 std::thread monitoring_thread_;
354 std::condition_variable monitoring_condition_;
355
356 // Callbacks
357 step_callback step_callback_;
358 parameter_callback parameter_callback_;
359 emergency_callback emergency_callback_;
360
361 void monitoring_loop() {
362 while (running_.load()) {
363 std::unique_lock<std::mutex> lock(state_mutex_);
364 monitoring_condition_.wait_for(
365 lock,
366 config_.monitoring_interval,
367 [this] { return !running_.load(); }
368 );
369
370 if (!running_.load()) break;
371
372 // Update monitored state
373 current_state_ = get_integration_state();
374 }
375 }
376
377 state_type get_integration_state() const {
378 // This would need to be implemented based on the specific integrator
379 // For now, return a default-constructed state
380 if constexpr (std::is_default_constructible_v<state_type>) {
381 return state_type{};
382 } else {
383 throw std::runtime_error("Cannot get integration state - state type not default constructible");
384 }
385 }
386};
387
391namespace factory {
392
393template<system_state S>
394auto make_async_rk45(
395 typename core::AbstractIntegrator<S>::system_function sys,
396 typename AsyncIntegrator<S>::Config config = {},
397 typename S::value_type rtol = static_cast<typename S::value_type>(1e-6),
398 typename S::value_type atol = static_cast<typename S::value_type>(1e-9)
399) {
400 auto base = std::make_unique<diffeq::RK45Integrator<S>>(std::move(sys), rtol, atol);
401 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);
402}
403
404template<system_state S>
405auto make_async_dop853(
406 typename core::AbstractIntegrator<S>::system_function sys,
407 typename AsyncIntegrator<S>::Config config = {},
408 typename S::value_type rtol = static_cast<typename S::value_type>(1e-10),
409 typename S::value_type atol = static_cast<typename S::value_type>(1e-15)
410) {
411 auto base = std::make_unique<diffeq::DOP853Integrator<S>>(std::move(sys), rtol, atol);
412 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);
413}
414
415template<system_state S>
416auto make_async_bdf(
417 typename core::AbstractIntegrator<S>::system_function sys,
418 typename AsyncIntegrator<S>::Config config = {},
419 typename S::value_type rtol = static_cast<typename S::value_type>(1e-6),
420 typename S::value_type atol = static_cast<typename S::value_type>(1e-9)
421) {
422 auto base = std::make_unique<diffeq::BDFIntegrator<S>>(std::move(sys), rtol, atol);
423 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);
424}
425
426} // namespace factory
427
431template<typename F, typename... Args>
432auto async_execute(F&& func, Args&&... args) {
433 static AsyncExecutor global_executor;
434 return global_executor.submit([f = std::forward<F>(func),
435 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable {
436 return std::apply(std::move(f), std::move(args_tuple));
437 });
438}
439
440#if DIFFEQ_HAS_STD_EXECUTION
444template<typename ExecutionPolicy, typename F, typename... Args>
445auto execute_std(ExecutionPolicy&& policy, F&& func, Args&&... args) {
446 // Note: std::execution::execute is not yet standardized
447 // This is a placeholder for future C++ standard versions
448 // For now, we use our own async executor
449 static AsyncExecutor global_executor;
450 return global_executor.submit([f = std::forward<F>(func),
451 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable {
452 return std::apply(std::move(f), std::move(args_tuple));
453 });
454}
455#endif
456
457} // namespace diffeq::async
Simple async executor using standard C++ facilities only.
Lightweight async integrator wrapper.
void step(state_type &state, time_type dt)
Synchronous delegation to base integrator.
void set_step_callback(step_callback callback)
Register callbacks for different events.
void reset_emergency_stop()
Reset emergency stop.
void start()
Start async operation.
state_type get_current_state() const
Get current state (thread-safe)
std::future< void > update_parameter_async(const std::string &name, double value)
Update parameter asynchronously.
std::future< void > integrate_async(state_type &state, time_type dt, time_type end_time)
Async integration over time interval.
void stop()
Stop async operation.
void emergency_stop()
Emergency stop.
std::future< void > step_async(state_type &state, time_type dt)
Async integration step.
Configuration for async operation.
Simple event data structure.