// AsyncExecutor constructor. Reserves capacity in threads_ for `num_threads` entries and
// then starts that many threads, each of which runs worker_thread().
// `num_threads` defaults to std::thread::hardware_concurrency().
// NOTE(review): hardware_concurrency() may return 0 when the value is not computable; the
// lines visible here would then start no workers -- confirm whether that case is handled.
// NOTE(review): this listing skips some original lines (the embedded numbers jump
// 37 -> 40 -> 42), so the initializer list and closing braces are not shown.
37 explicit AsyncExecutor(
size_t num_threads = std::thread::hardware_concurrency())
40 threads_.reserve(num_threads);
41 for (
size_t i = 0; i < num_threads; ++i) {
// Each worker lambda captures `this`, so the executor object must outlive its threads.
42 threads_.emplace_back([
this] { worker_thread(); });
// Queues `func` for execution and returns a std::future for its result
// (std::invoke_result_t<F>, i.e. `func` is invoked with no arguments).
// Throws std::runtime_error("Executor is shutting down"); the condition guarding that
// throw is on an elided line (presumably a stop_flag_ check -- confirm).
51 auto submit(F&& func) -> std::future<std::invoke_result_t<F>> {
52 using return_type = std::invoke_result_t<F>;
// The packaged_task is held through a shared_ptr so the queued lambda below can capture it
// by copy (presumably because std::function needs a copyable callable -- confirm).
54 auto task = std::make_shared<std::packaged_task<return_type()>>(
// The future is obtained before the task is handed to the queue.
58 auto future = task->get_future();
// The queue is only modified while queue_mutex_ is held.
61 std::lock_guard<std::mutex> lock(queue_mutex_);
63 throw std::runtime_error(
"Executor is shutting down");
65 task_queue_.emplace([task] { (*task)(); });
// Wake one waiter on condition_ to pick up the new entry.
68 condition_.notify_one();
// Shutdown sequence (the enclosing function's signature is not visible in this excerpt).
// Visible steps: take queue_mutex_, wake every waiter on condition_, then walk threads_
// and test joinable() on each.
// NOTE(review): the statement executed under the lock (presumably setting stop_flag_) and
// the call made for joinable threads (presumably join()) are on elided lines -- confirm.
74 std::lock_guard<std::mutex> lock(queue_mutex_);
78 condition_.notify_all();
80 for (
auto& thread : threads_) {
81 if (thread.joinable()) {
// Function run by every pool thread (started from the constructor's emplace_back lambda).
// Visible steps: wait on condition_ under queue_mutex_ until stop_flag_ is set or the queue
// is non-empty, then move the front entry of task_queue_ into a local.
// NOTE(review): the enclosing loop, the pop, and the task invocation are on elided lines.
89 void worker_thread() {
91 std::function<void()> task;
// unique_lock (rather than lock_guard) because condition_.wait needs to unlock/relock it.
94 std::unique_lock<std::mutex> lock(queue_mutex_);
95 condition_.wait(lock, [
this] {
96 return stop_flag_ || !task_queue_.empty();
// Stop requested and nothing left to run; the action taken is elided (presumably return).
// Because the queue must also be empty, entries queued before a stop are still taken.
99 if (stop_flag_ && task_queue_.empty()) {
103 task = std::move(task_queue_.front());
// Threads started by the constructor; each runs worker_thread().
111 std::vector<std::thread> threads_;
// FIFO of type-erased pending tasks; filled by submit(), drained by worker_thread().
112 std::queue<std::function<void()>> task_queue_;
// Locked around task_queue_ accesses and used with condition_.
113 std::mutex queue_mutex_;
// Notified by submit() (notify_one) and by the shutdown sequence (notify_all).
114 std::condition_variable condition_;
// Read by the worker wait predicate; its initial value and the place where it is set are
// not visible in this excerpt.
115 std::atomic<bool> stop_flag_;
// Type aliases taken from the wrapped integrator type (base_integrator_type).
152 using state_type =
typename base_integrator_type::state_type;
153 using time_type =
typename base_integrator_type::time_type;
154 using value_type =
typename base_integrator_type::value_type;
155 using system_function =
typename base_integrator_type::system_function;
// Callback invoked with the state and the integrator's current time after a step.
158 using step_callback = std::function<void(
const state_type&, time_type)>;
// Callback invoked with a parameter name and a double value.
159 using parameter_callback = std::function<void(
const std::string&,
double)>;
// Argument-less callback invoked when the emergency stop flag is raised.
160 using emergency_callback = std::function<void()>;
// Configuration fields (the enclosing struct header is not visible in this excerpt).
// Read by integrate() together with running_; the action it gates is on elided lines.
166 bool enable_async_stepping =
false;
// When true, the start sequence launches the monitoring thread.
167 bool enable_state_monitoring =
false;
// Timeout passed to wait_for in monitoring_loop(); default is 1000 microseconds.
168 std::chrono::microseconds monitoring_interval{1000};
// Passed to executor_'s constructor in the AsyncIntegrator constructor.
169 size_t max_concurrent_operations = 4;
// AsyncIntegrator constructor fragment: takes ownership of `integrator`, sizes executor_
// from config.max_concurrent_operations, and starts with emergency_stop_ cleared.
// NOTE(review): the `config` parameter declaration and the initializers on the elided
// lines (presumably config_ and running_) are not visible here -- confirm.
173 std::unique_ptr<base_integrator_type> integrator,
175 ) : base_integrator_(std::move(integrator))
177 , executor_(config.max_concurrent_operations)
179 , emergency_stop_(false) {}
// Start sequence (the enclosing function's signature is not visible in this excerpt).
// exchange(true) sets running_ and yields the previous value, so the branch below is taken
// when it was already set; the action in that branch is elided (presumably an early return).
189 if (running_.exchange(
true)) {
// The monitoring thread is only launched when state monitoring is enabled in config_.
193 if (config_.enable_state_monitoring) {
194 monitoring_thread_ = std::thread([
this] { monitoring_loop(); });
// Stop sequence (the enclosing function's signature is not visible in this excerpt).
// exchange(false) clears running_ and yields the previous value, so the branch below is
// taken when it was already clear; the action there is elided (presumably an early return).
202 if (!running_.exchange(
false)) {
// Wake monitoring_loop() so it can observe running_ == false without waiting out its timeout.
206 monitoring_condition_.notify_all();
208 if (monitoring_thread_.joinable()) {
209 monitoring_thread_.join();
// The executor is shut down after the monitoring thread has been joined.
212 executor_.shutdown();
// Submits a single integration step of size `dt` to executor_ and returns its future.
// If emergency_stop_ is set at call time, nothing is submitted and the returned future
// already holds std::runtime_error("Emergency stop activated").
// NOTE(review): `state` is captured by reference in the submitted lambda, so the caller
// must keep it alive (and not touch it concurrently) until the future completes.
218 std::future<void>
step_async(state_type& state, time_type dt) {
219 if (emergency_stop_.load()) {
220 auto promise = std::promise<void>();
221 promise.set_exception(std::make_exception_ptr(
222 std::runtime_error(
"Emergency stop activated")));
223 return promise.get_future();
226 return executor_.submit([
this, &state, dt]() {
// The step itself runs while integration_mutex_ is held.
227 std::lock_guard<std::mutex> lock(integration_mutex_);
228 base_integrator_->step(state, dt);
// Whether the callback runs inside or outside the lock depends on elided lines.
231 if (step_callback_) {
232 step_callback_(state, base_integrator_->current_time());
// Submits a task that repeatedly steps the wrapped integrator until current_time() reaches
// `end_time` or emergency_stop_ becomes set, and returns the task's future.
// NOTE(review): `state` is captured by reference; the caller must keep it alive until the
// future completes.
// NOTE(review): unlike step_async, no emergency-stop check before submission is visible.
240 std::future<void>
integrate_async(state_type& state, time_type dt, time_type end_time) {
241 return executor_.submit([
this, &state, dt, end_time]() {
// NOTE(review): the loop condition and step_size read current_time() before the
// lock_guard below is taken -- looks unsynchronized with other steppers; confirm.
242 while (base_integrator_->current_time() < end_time && !emergency_stop_.load()) {
// Clamp the step to the remaining interval.
243 time_type step_size = std::min<time_type>(dt, end_time - base_integrator_->current_time());
246 std::lock_guard<std::mutex> lock(integration_mutex_);
247 base_integrator_->step(state, step_size);
251 if (step_callback_) {
252 step_callback_(state, base_integrator_->current_time());
// Yield between iterations.
256 std::this_thread::yield();
// Synchronous single step of size `dt` on the calling thread.
// Throws std::runtime_error("Emergency stop activated") if emergency_stop_ is set.
264 void step(state_type& state, time_type dt) {
265 if (emergency_stop_.load()) {
266 throw std::runtime_error(
"Emergency stop activated");
// Same mutex the async stepping tasks lock around their step call.
269 std::lock_guard<std::mutex> lock(integration_mutex_);
270 base_integrator_->step(state, dt);
// Invoke the step callback, if one is set, with the post-step state and time.
272 if (step_callback_) {
273 step_callback_(state, base_integrator_->current_time());
// Synchronous integration up to `end_time`, delegated to the wrapped integrator.
// NOTE(review): in the visible lines no integration_mutex_ lock and no emergency_stop_ check
// appear before the delegation, unlike step() -- confirm against the elided lines.
277 void integrate(state_type& state, time_type dt, time_type end_time) {
// Taken when async stepping is configured but running_ is false; the action is elided.
278 if (!running_.load() && config_.enable_async_stepping) {
282 base_integrator_->integrate(state, dt, end_time);
// Thin forwarders to the wrapped integrator; none of them takes integration_mutex_.
// Returns the wrapped integrator's current time.
286 time_type current_time()
const {
return base_integrator_->current_time(); }
// Sets the wrapped integrator's time to `t`.
287 void set_time(time_type t) { base_integrator_->set_time(t); }
// Replaces the wrapped integrator's system function; `sys` is moved into the call.
288 void set_system(system_function sys) { base_integrator_->set_system(std::move(sys)); }
// Callback setters. Each moves the given callable into the corresponding member; no lock is
// taken in the visible lines.
// Body of the step-callback setter (its signature is on an elided line).
294 step_callback_ = std::move(callback);
// Stores the callback that the async parameter-update task invokes.
297 void set_parameter_callback(parameter_callback callback) {
298 parameter_callback_ = std::move(callback);
// Stores the callback that is invoked when the emergency stop flag is raised.
301 void set_emergency_callback(emergency_callback callback) {
302 emergency_callback_ = std::move(callback);
// Asynchronous parameter update (the enclosing signature is not visible in this excerpt).
// Submits a task that captures `name` and `value` by copy and, if parameter_callback_ is
// set, calls it with them; the future from executor_.submit is returned.
309 return executor_.submit([
this, name, value]() {
310 if (parameter_callback_) {
311 parameter_callback_(name, value);
// Emergency stop (the enclosing signature is not visible in this excerpt).
// Raises emergency_stop_, which step(), step_async() and the integrate_async loop check,
// then invokes emergency_callback_ on the calling thread if one is set.
320 emergency_stop_.store(
true);
321 if (emergency_callback_) {
322 emergency_callback_();
// Clears the emergency stop flag (the enclosing signature is not visible in this excerpt).
330 emergency_stop_.store(
false);
// State getter (the enclosing signature is not visible in this excerpt).
// Returns current_state_ while state_mutex_ is held; whether that is a copy depends on the
// elided return type (presumably state_type by value -- confirm).
337 std::lock_guard<std::mutex> lock(state_mutex_);
338 return current_state_;
// Owned wrapped integrator; all stepping and time queries are forwarded to it.
342 std::unique_ptr<base_integrator_type> base_integrator_;
// Toggled with exchange() by the start/stop sequences; polled by monitoring_loop().
346 std::atomic<bool> running_;
// Initialised to false in the constructor; checked before and during stepping.
347 std::atomic<bool> emergency_stop_;
// Held around base_integrator_->step in step(), step_async() and integrate_async().
349 mutable std::mutex integration_mutex_;
// Held when current_state_ is read by the getter and in monitoring_loop().
350 mutable std::mutex state_mutex_;
// Snapshot assigned by monitoring_loop().
351 state_type current_state_;
// Started by the start sequence when monitoring is enabled; joined by the stop sequence.
353 std::thread monitoring_thread_;
// Waited on (with timeout) in monitoring_loop(); notified by the stop sequence.
354 std::condition_variable monitoring_condition_;
// User callbacks; each is tested for emptiness before being invoked.
357 step_callback step_callback_;
358 parameter_callback parameter_callback_;
359 emergency_callback emergency_callback_;
// Body of the monitoring thread: while running_ is set, waits up to
// config_.monitoring_interval (or until running_ is cleared and the condition is notified),
// then stores the result of get_integration_state() into current_state_.
361 void monitoring_loop() {
362 while (running_.load()) {
// state_mutex_ is taken at the top of each iteration and used for the timed wait.
363 std::unique_lock<std::mutex> lock(state_mutex_);
// The first wait_for argument is on an elided line (presumably `lock`).
364 monitoring_condition_.wait_for(
366 config_.monitoring_interval,
367 [
this] { return !running_.load(); }
// Re-check after waking so a stop request skips the snapshot below.
370 if (!running_.load())
break;
373 current_state_ = get_integration_state();
// Produces the value that monitoring_loop() stores in current_state_.
// Branches at compile time on whether state_type is default constructible; the statement in
// the true branch is elided (presumably it returns a default-constructed state -- confirm).
// The visible throw raises std::runtime_error (presumably the non-default-constructible
// branch, going by its message -- the `else` itself is on an elided line).
377 state_type get_integration_state()
const {
380 if constexpr (std::is_default_constructible_v<state_type>) {
383 throw std::runtime_error(
"Cannot get integration state - state type not default constructible");
// Factory: builds a diffeq::RK45Integrator<S> from `sys`, `rtol`, `atol` and wraps it in a
// heap-allocated AsyncIntegrator<S>, returned as std::unique_ptr.
// Default tolerances: rtol = 1e-6, atol = 1e-9 (cast to S::value_type).
// NOTE(review): the function name and the `config` parameter are on elided lines
// (presumably make_async_rk45, by analogy with make_async_dop853 -- confirm).
393template<system_state S>
395 typename core::AbstractIntegrator<S>::system_function sys,
397 typename S::value_type rtol =
static_cast<typename S::value_type
>(1e-6),
398 typename S::value_type atol =
static_cast<typename S::value_type
>(1e-9)
400 auto base = std::make_unique<diffeq::RK45Integrator<S>>(std::move(sys), rtol, atol);
401 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);
// Factory: builds a diffeq::DOP853Integrator<S> from `sys`, `rtol`, `atol` and wraps it in a
// heap-allocated AsyncIntegrator<S>, returned as std::unique_ptr.
// Default tolerances: rtol = 1e-10, atol = 1e-15 (cast to S::value_type).
// NOTE(review): the `config` parameter is declared on an elided line.
404template<system_state S>
405auto make_async_dop853(
406 typename core::AbstractIntegrator<S>::system_function sys,
408 typename S::value_type rtol =
static_cast<typename S::value_type
>(1e-10),
409 typename S::value_type atol =
static_cast<typename S::value_type
>(1e-15)
411 auto base = std::make_unique<diffeq::DOP853Integrator<S>>(std::move(sys), rtol, atol);
412 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);
// Factory: builds a diffeq::BDFIntegrator<S> from `sys`, `rtol`, `atol` and wraps it in a
// heap-allocated AsyncIntegrator<S>, returned as std::unique_ptr.
// Default tolerances: rtol = 1e-6, atol = 1e-9 (cast to S::value_type).
// NOTE(review): the function name and the `config` parameter are on elided lines
// (presumably make_async_bdf, by analogy with make_async_dop853 -- confirm).
415template<system_state S>
417 typename core::AbstractIntegrator<S>::system_function sys,
419 typename S::value_type rtol =
static_cast<typename S::value_type
>(1e-6),
420 typename S::value_type atol =
static_cast<typename S::value_type
>(1e-9)
422 auto base = std::make_unique<diffeq::BDFIntegrator<S>>(std::move(sys), rtol, atol);
423 return std::make_unique<AsyncIntegrator<S>>(std::move(base), config);