91 CustomExecutor(
size_t num_threads = std::thread::hardware_concurrency())
93 threads_.reserve(num_threads);
94 for (
size_t i = 0; i < num_threads; ++i) {
95 threads_.emplace_back([
this] { worker_thread(); });
104 auto submit(F&& func) -> std::future<std::invoke_result_t<F>> {
105 using return_type = std::invoke_result_t<F>;
107 auto task = std::make_shared<std::packaged_task<return_type()>>(
108 std::forward<F>(func)
111 auto future = task->get_future();
114 std::lock_guard<std::mutex> lock(queue_mutex_);
116 throw std::runtime_error(
"Executor is shutting down");
119 task_queue_.emplace([task] { (*task)(); });
122 condition_.notify_one();
128 std::lock_guard<std::mutex> lock(queue_mutex_);
132 condition_.notify_all();
134 for (
auto& thread : threads_) {
135 if (thread.joinable()) {
143 void worker_thread() {
145 std::function<void()> task;
148 std::unique_lock<std::mutex> lock(queue_mutex_);
149 condition_.wait(lock, [
this] {
150 return stop_flag_ || !task_queue_.empty();
153 if (stop_flag_ && task_queue_.empty()) {
157 task = std::move(task_queue_.front());
165 std::vector<std::thread> threads_;
166 std::queue<std::function<void()>> task_queue_;
167 std::mutex queue_mutex_;
168 std::condition_variable condition_;
169 std::atomic<bool> stop_flag_;
186 using state_type =
typename base_type::state_type;
187 using time_type =
typename base_type::time_type;
188 using value_type =
typename base_type::value_type;
189 using system_function =
typename base_type::system_function;
204 bool enable_realtime_priority =
false;
205 communication::RealtimePriority::Priority priority = communication::RealtimePriority::Priority::Normal;
206 bool lock_memory =
false;
207 size_t signal_buffer_size = 1024;
208 std::chrono::microseconds signal_processing_interval{100};
209 std::chrono::milliseconds max_signal_latency{10};
210 bool enable_state_output =
true;
211 std::chrono::microseconds state_output_interval{1000};
217 ) : base_type(std::move(base_integrator->sys_))
218 , base_integrator_(std::move(base_integrator))
219 , config_(std::move(config))
220 , event_bus_(std::make_unique<communication::EventBus>())
223 , emergency_stop_(false) {
225 setup_signal_handlers();
227 if (config_.enable_realtime_priority) {
228 setup_realtime_priority();
232 ~RealtimeIntegrator() {
237 RealtimeIntegrator(
const RealtimeIntegrator&) =
delete;
238 RealtimeIntegrator& operator=(
const RealtimeIntegrator&) =
delete;
239 RealtimeIntegrator(RealtimeIntegrator&&) noexcept = default;
240 RealtimeIntegrator& operator=(RealtimeIntegrator&&) noexcept = default;
246 if (running_.exchange(
true)) {
251 signal_thread_ = std::thread([
this] { signal_processing_loop(); });
254 if (config_.enable_state_output) {
255 state_output_thread_ = std::thread([
this] { state_output_loop(); });
263 if (!running_.exchange(
false)) {
267 signal_condition_.notify_all();
268 state_output_condition_.notify_all();
270 if (signal_thread_.joinable()) {
271 signal_thread_.join();
274 if (state_output_thread_.joinable()) {
275 state_output_thread_.join();
279 executor_->shutdown();
286 void step(state_type& state, time_type dt)
override {
288 if (emergency_stop_.load()) {
289 throw std::runtime_error(
"Emergency stop activated");
293 process_pending_signals(state, dt);
296 base_integrator_->step(state, dt);
297 this->advance_time(dt);
301 std::lock_guard<std::mutex> lock(state_mutex_);
302 current_state_ = state;
303 last_step_time_ = std::chrono::steady_clock::now();
310 void integrate(state_type& state, time_type dt, time_type end_time)
override {
311 if (!running_.load()) {
315 while (this->current_time_ < end_time && !emergency_stop_.load()) {
316 time_type step_size = std::min(dt, end_time - this->current_time_);
317 step(state, step_size);
320 std::this_thread::sleep_for(std::chrono::microseconds(1));
327 template<
typename ControlData>
329 auto signal = std::make_shared<control_signal>(
331 std::forward<ControlData>(data),
335 enqueue_signal(signal);
341 template<
typename FinancialData>
343 auto signal = std::make_shared<financial_signal>(
344 SignalType::Financial,
345 std::forward<FinancialData>(data),
349 enqueue_signal(signal);
356 auto signal = std::make_shared<parameter_signal>(
357 SignalType::Parameter,
362 enqueue_signal(signal);
369 emergency_stop_.store(
true);
372 communication::RobotControlMessage emergency_msg;
373 emergency_msg.emergency_stop =
true;
374 emergency_msg.timestamp_sec = std::chrono::duration<double>(
375 std::chrono::steady_clock::now().time_since_epoch()
385 std::lock_guard<std::mutex> lock(state_mutex_);
386 return current_state_;
393 process_connector_ = std::make_unique<communication::ProcessConnector>(std::move(config));
396 auto connect_future = process_connector_->connect();
399 #if DIFFEQ_HAS_STD_EXECUTION
400 std::execution::execute(std::execution::par, [
this, fut = std::move(connect_future)]()
mutable {
403 start_external_communication();
405 }
catch (
const std::exception& e) {
407 std::cerr <<
"Process connector failed: " << e.what() << std::endl;
411 executor_->submit([
this, fut = std::move(connect_future)]()
mutable {
414 start_external_communication();
416 }
catch (
const std::exception& e) {
417 std::cerr <<
"Process connector failed: " << e.what() << std::endl;
426 template<
typename SignalT, SignalHandler<SignalT> Handler>
428 std::lock_guard<std::mutex> lock(handlers_mutex_);
429 signal_handlers_[
static_cast<uint8_t
>(type)] =
430 [h = std::forward<Handler>(handler)](
const std::any& signal) {
432 const auto& typed_signal = std::any_cast<const SignalT&>(signal);
434 }
catch (
const std::bad_any_cast& e) {
435 std::cerr <<
"Signal type mismatch: " << e.what() << std::endl;
441 std::unique_ptr<core::AbstractIntegrator<S, T>> base_integrator_;
442 RealtimeConfig config_;
443 std::unique_ptr<communication::EventBus> event_bus_;
444 std::unique_ptr<CustomExecutor> executor_;
445 std::unique_ptr<communication::ProcessConnector> process_connector_;
448 std::atomic<bool> running_;
449 std::atomic<bool> emergency_stop_;
450 std::thread signal_thread_;
451 std::thread state_output_thread_;
452 std::condition_variable signal_condition_;
453 std::condition_variable state_output_condition_;
456 mutable std::mutex state_mutex_;
457 state_type current_state_;
458 std::chrono::steady_clock::time_point last_step_time_;
461 std::mutex signal_queue_mutex_;
462 std::priority_queue<std::shared_ptr<std::any>,
463 std::vector<std::shared_ptr<std::any>>,
464 std::function<bool(
const std::shared_ptr<std::any>&,
465 const std::shared_ptr<std::any>&)>> signal_queue_;
467 std::mutex handlers_mutex_;
468 std::unordered_map<uint8_t, std::function<void(
const std::any&)>> signal_handlers_;
470 void setup_realtime_priority() {
471 auto result = communication::RealtimePriority::set_thread_priority(
472 communication::RealtimePriority::Policy::RealTimeFIFO,
477 std::cerr <<
"Failed to set real-time priority: " << result.message() << std::endl;
480 if (config_.lock_memory) {
481 result = communication::RealtimePriority::lock_memory();
483 std::cerr <<
"Failed to lock memory: " << result.message() << std::endl;
488 void setup_signal_handlers() {
490 auto comparator = [](
const std::shared_ptr<std::any>& a,
const std::shared_ptr<std::any>& b) ->
bool {
495 signal_queue_ = std::priority_queue<std::shared_ptr<std::any>,
496 std::vector<std::shared_ptr<std::any>>,
497 decltype(comparator)>(comparator);
500 template<
typename SignalT>
501 void enqueue_signal(std::shared_ptr<SignalT> signal) {
503 std::lock_guard<std::mutex> lock(signal_queue_mutex_);
504 signal_queue_.push(std::static_pointer_cast<std::any>(
505 std::make_shared<SignalT>(*signal)
508 signal_condition_.notify_one();
511 void signal_processing_loop() {
512 while (running_.load()) {
513 std::unique_lock<std::mutex> lock(signal_queue_mutex_);
514 signal_condition_.wait_for(
516 config_.signal_processing_interval,
517 [
this] { return !signal_queue_.empty() || !running_.load(); }
520 while (!signal_queue_.empty() && running_.load()) {
521 auto signal = signal_queue_.top();
526 #if DIFFEQ_HAS_STD_EXECUTION
527 std::execution::execute(std::execution::par, [
this, signal] {
528 process_signal(signal);
531 executor_->submit([
this, signal] {
532 process_signal(signal);
541 void process_signal(std::shared_ptr<std::any> signal) {
546 void process_pending_signals(state_type& state, time_type dt) {
551 void state_output_loop() {
552 while (running_.load()) {
553 std::unique_lock<std::mutex> lock(state_mutex_);
554 state_output_condition_.wait_for(
556 config_.state_output_interval,
557 [
this] { return !running_.load(); }
560 if (!running_.load())
break;
563 if (process_connector_) {
564 state_output_signal output_signal(
570 #if DIFFEQ_HAS_STD_EXECUTION
571 std::execution::execute(std::execution::par, [
this, output_signal] {
575 executor_->submit([
this, output_signal] {
583 void start_external_communication() {
585 if (!process_connector_)
return;
587 #if DIFFEQ_HAS_STD_EXECUTION
588 std::execution::execute(std::execution::par, [
this] {
589 receive_external_signals();
592 executor_->submit([
this] {
593 receive_external_signals();
598 void receive_external_signals() {
599 while (running_.load() && process_connector_) {
603 std::this_thread::sleep_for(std::chrono::milliseconds(1));
613template<system_state S, can_be_time T =
double>
614auto make_realtime_rk45(
615 typename core::AbstractIntegrator<S, T>::system_function sys,
617 T rtol =
static_cast<T
>(1e-6),
618 T atol =
static_cast<T
>(1e-9)
620 auto base = std::make_unique<RK45Integrator<S, T>>(std::move(sys), rtol, atol);
621 return std::make_unique<RealtimeIntegrator<S, T>>(std::move(base), std::move(config));
624template<system_state S, can_be_time T =
double>
625auto make_realtime_dop853(
626 typename core::AbstractIntegrator<S, T>::system_function sys,
628 T rtol =
static_cast<T
>(1e-10),
629 T atol =
static_cast<T
>(1e-15)
631 auto base = std::make_unique<DOP853Integrator<S, T>>(std::move(sys), rtol, atol);
632 return std::make_unique<RealtimeIntegrator<S, T>>(std::move(base), std::move(config));