// Configuration fields for an IPC channel, each with an in-class default.
// NOTE(review): the enclosing type's header is outside this excerpt; these
// appear to be the fields read elsewhere through `config_` — confirm.

// Transport selection, role of this endpoint, and waiting behaviour.
67 IPCMethod method{IPCMethod::SHARED_MEMORY};
68 IPCDirection direction{IPCDirection::PRODUCER};
69 IPCSyncMode sync_mode{IPCSyncMode::NON_BLOCKING};
// Name handed to the channel implementation when it is constructed.
71 std::string channel_name{
"diffeq_channel"};
// Defaults are 1024 * 1024 and 64 * 1024. The validation checks require
// buffer_size != 0 and max_message_size <= buffer_size.
72 size_t buffer_size{1024 * 1024};
73 size_t max_message_size{64 * 1024};
// Wait bound used by the TIMEOUT sync mode, and the sleep interval of the
// background receive loop. Both must be strictly positive.
75 std::chrono::milliseconds timeout{100};
76 std::chrono::microseconds polling_interval{100};
// Reliability switches.
// NOTE(review): none of the three enable_* flags below is read in the visible
// part of the file — confirm where (or whether) they take effect.
79 bool enable_acknowledgments{
false};
80 bool enable_sequence_numbers{
true};
81 bool enable_error_correction{
false};
// Upper bound on send attempts in the send-retry loop.
82 size_t max_retries{3};
// Compression / batching options.
// NOTE(review): not referenced in the visible code apart from the
// batch_size != 0 validation — TODO confirm usage.
85 bool enable_compression{
false};
86 bool enable_batching{
false};
87 size_t batch_size{10};
88 std::chrono::milliseconds batch_timeout{10};
// Network host for socket transports; defaults to the IPv4 loopback address.
// The socket transports throw "not yet implemented" in the visible code.
91 std::string host{
"127.0.0.1"};
// Sanity checks on the configuration fields. Each failed check throws
// std::invalid_argument with a message naming the offending field.
// NOTE(review): the enclosing function header and the closing braces of each
// `if` are not visible in this excerpt.

// The channel name is required.
99 if (channel_name.empty()) {
100 throw std::invalid_argument(
"channel_name cannot be empty");
// A zero-sized buffer is rejected.
103 if (buffer_size == 0) {
104 throw std::invalid_argument(
"buffer_size must be positive");
// A single message must fit into the buffer.
// NOTE(review): max_message_size == 0 passes this check — confirm that is
// intended.
107 if (max_message_size > buffer_size) {
108 throw std::invalid_argument(
"max_message_size cannot exceed buffer_size");
// Zero or negative durations are rejected for both timing parameters.
111 if (timeout <= std::chrono::milliseconds{0}) {
112 throw std::invalid_argument(
"timeout must be positive");
115 if (polling_interval <= std::chrono::microseconds{0}) {
116 throw std::invalid_argument(
"polling_interval must be positive");
119 if (batch_size == 0) {
120 throw std::invalid_argument(
"batch_size must be positive");
// NOTE(review): the condition guarding this throw is missing from the
// excerpt; presumably it tests a `port` field for socket transports — verify.
124 throw std::invalid_argument(
"port must be positive");
// Running IPC statistics; all counters start at zero.
// NOTE(review): plain (non-atomic) fields. The visible code updates the send
// counters in send_state() and the receive counters on the background
// communication thread — confirm that readers synchronise appropriately.

// Message and byte counts per direction.
162 size_t messages_sent{0};
163 size_t messages_received{0};
164 size_t bytes_sent{0};
165 size_t bytes_received{0};
// Operations that did not succeed.
166 size_t send_failures{0};
167 size_t receive_failures{0};
// Acknowledgment counters (not updated anywhere in the visible code).
168 size_t acknowledgments_sent{0};
169 size_t acknowledgments_received{0};
// Accumulated wall-clock time spent in send/receive, in whole milliseconds.
170 std::chrono::milliseconds total_send_time{0};
171 std::chrono::milliseconds total_receive_time{0};
// Returns the mean send duration in milliseconds: total_send_time divided by
// messages_sent. Returns 0.0 when no message has been sent, which avoids a
// division by zero.
173 double average_send_time_ms()
const {
174 return messages_sent > 0 ?
175 static_cast<double>(total_send_time.count()) / messages_sent : 0.0;
// Returns the mean receive duration in milliseconds: total_receive_time
// divided by messages_received. Returns 0.0 when nothing has been received.
// NOTE(review): total_receive_time is never incremented in the visible code,
// so this may always yield 0.0 — confirm.
178 double average_receive_time_ms()
const {
179 return messages_received > 0 ?
180 static_cast<double>(total_receive_time.count()) / messages_received : 0.0;
// Returns the fraction of send attempts that succeeded:
// messages_sent / (messages_sent + send_failures), a value in [0, 1].
// Returns 0.0 when there have been no attempts at all.
183 double send_success_rate()
const {
184 size_t total = messages_sent + send_failures;
185 return total > 0 ?
static_cast<double>(messages_sent) / total : 0.0;
// Returns the fraction of receive attempts that succeeded:
// messages_received / (messages_received + receive_failures), in [0, 1].
// Returns 0.0 when there have been no attempts at all.
188 double receive_success_rate()
const {
189 size_t total = messages_received + receive_failures;
190 return total > 0 ?
static_cast<double>(messages_received) / total : 0.0;
// State of the shared-memory channel.
// NOTE(review): further members used by this class (name_, buffer_size_, fd_)
// are declared on lines not visible in this excerpt.

// Base address of the mapped region; nullptr while unmapped.
220 void* memory_ptr_{
nullptr};
// Win32 file-mapping handle.
// NOTE(review): presumably compiled only on Windows; the platform guards are
// not visible here.
223 HANDLE file_mapping_{
nullptr};
// Point at synchronisation objects that initialize() constructs with
// placement new at the start of the mapped region (mutex first, then the
// condition variable directly behind it).
228 std::mutex* mutex_{
nullptr};
229 std::condition_variable* condition_{
nullptr};
// Guards initialize() and cleanup() against repeated execution.
230 bool initialized_{
false};
// Constructor initialiser list: stores the channel name and the region size.
// The constructor body is empty, so no OS resource is acquired here.
// NOTE(review): the constructor's signature line is not visible.
234 : name_(name), buffer_size_(buffer_size) {}
// Creates/opens the shared-memory region and places the synchronisation
// objects at its start. Returns true immediately if already initialised.
// NOTE(review): several lines are missing from this excerpt (remaining API
// arguments, failure returns, platform guards, closing braces, and the final
// `initialized_ = true`); the comments below cover only what is visible.
240 bool initialize()
override {
241 if (initialized_)
return true;
// Windows path: INVALID_HANDLE_VALUE as the file handle requests a mapping
// backed by the system paging file rather than by a file on disk.
244 file_mapping_ = CreateFileMapping(
245 INVALID_HANDLE_VALUE,
253 if (file_mapping_ ==
nullptr) {
257 memory_ptr_ = MapViewOfFile(
// If the view cannot be mapped, the mapping handle is released again.
265 if (memory_ptr_ ==
nullptr) {
266 CloseHandle(file_mapping_);
// POSIX path: create or open the named object (mode 0666), size it to
// buffer_size_, then map it shared for reading and writing.
270 fd_ = shm_open(name_.c_str(), O_CREAT | O_RDWR, 0666);
275 if (ftruncate(fd_, buffer_size_) == -1) {
280 memory_ptr_ = mmap(
nullptr, buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
281 if (memory_ptr_ == MAP_FAILED) {
// Construct the mutex at offset 0 and the condition variable right after it.
// NOTE(review): each call re-constructs these objects over the same bytes of
// the mapping, and std::mutex / std::condition_variable are not specified to
// work across processes — verify that cross-process use is actually safe on
// the target platforms. Also confirm that buffer_size_ is large enough for
// both objects and that offset sizeof(std::mutex) satisfies
// alignof(std::condition_variable).
288 mutex_ =
new(memory_ptr_) std::mutex();
289 condition_ =
new(
static_cast<char*
>(memory_ptr_) +
sizeof(std::mutex)) std::condition_variable();
// Releases the mapping and associated OS objects, then marks the channel as
// uninitialised. Does nothing if the channel was never initialised.
// NOTE(review): platform guards, closing braces and possibly a close(fd_)
// call are on lines missing from this excerpt.
295 void cleanup()
override {
296 if (!initialized_)
return;
// Windows path: unmap the view, then close the mapping handle.
300 UnmapViewOfFile(memory_ptr_);
301 memory_ptr_ =
nullptr;
304 CloseHandle(file_mapping_);
305 file_mapping_ =
nullptr;
// POSIX path: unmap unless the pointer holds the mmap failure sentinel.
308 if (memory_ptr_ != MAP_FAILED) {
309 munmap(memory_ptr_, buffer_size_);
310 memory_ptr_ =
nullptr;
// Removes the shared-memory name.
// NOTE(review): whichever endpoint runs cleanup() first unlinks the name for
// everyone; confirm that is the intended ownership model.
314 shm_unlink(name_.c_str());
// NOTE(review): mutex_ and condition_ are not reset in the visible lines and
// would dangle into the unmapped region — verify.
319 initialized_ =
false;
// Body of the shared-memory send path (the function header is not visible).
// Writes the message into the data area that follows the in-region mutex and
// condition variable, then wakes one waiter.
// Fails immediately when the channel is not initialised.
323 if (!initialized_)
return false;
// Serialise writers through the mutex stored at the start of the region.
325 std::lock_guard<std::mutex> lock(*mutex_);
// The data area starts directly behind the two synchronisation objects.
328 char* data_ptr =
static_cast<char*
>(memory_ptr_) +
sizeof(std::mutex) +
sizeof(std::condition_variable);
// Bytes needed: fixed-size message struct followed by the payload.
329 size_t message_size =
sizeof(
IPCMessage<T>) + message.data.size();
// Reject messages that do not fit into the remaining space.
// NOTE(review): if buffer_size_ is an unsigned type smaller than the two
// sizeof terms, this subtraction wraps around and the check passes — confirm
// that buffer_size_ is validated elsewhere.
331 if (message_size > buffer_size_ -
sizeof(std::mutex) -
sizeof(std::condition_variable)) {
// Copy the payload behind the header slot (the header copy itself is on a
// line missing from this excerpt).
336 std::memcpy(data_ptr +
sizeof(
IPCMessage<T>), message.data.data(), message.data.size());
// Signal a waiting receiver.
338 condition_->notify_one();
// Body of the shared-memory receive path (the function header is not
// visible). Waits for a notification, then copies the payload out of the
// data area into `message`.
// Fails immediately when the channel is not initialised.
343 if (!initialized_)
return false;
345 std::unique_lock<std::mutex> lock(*mutex_);
// Wait up to a fixed 100 ms for a sender's notification.
// NOTE(review): wait_for is called without a predicate, so a spurious wakeup
// is treated as "data available" and a notification sent before this wait
// began is missed. The 100 ms bound is hard-coded rather than taken from the
// configured timeout.
348 if (condition_->wait_for(lock, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
// Same layout as on the send side: data follows the two sync objects.
353 char* data_ptr =
static_cast<char*
>(memory_ptr_) +
sizeof(std::mutex) +
sizeof(std::condition_variable);
// Size the destination from the header's message_size and copy the payload.
// NOTE(review): the line(s) that load the header into `message` are missing
// from this excerpt; message_size is not range-checked against the buffer
// capacity in the visible code — verify to rule out an out-of-bounds read.
356 message.data.resize(message.message_size);
357 std::memcpy(message.data.data(), data_ptr +
sizeof(
IPCMessage<T>), message.message_size);
// Reports whether the channel is usable.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns initialized_, in line with get_status() — confirm.
362 bool is_connected()
const override {
// Returns a human-readable status string: "Connected" once initialize() has
// succeeded, "Disconnected" otherwise.
366 std::string get_status()
const override {
367 return initialized_ ?
"Connected" :
"Disconnected";
// State of the named-pipe channel.
// NOTE(review): name_ and pipe_fd_ are used by this class but declared on
// lines missing from this excerpt.

// Full platform-specific path of the pipe, built in the constructor.
378 std::string pipe_path_;
// Win32 pipe handle; INVALID_HANDLE_VALUE while closed.
381 HANDLE pipe_handle_{INVALID_HANDLE_VALUE};
// Guards initialize() and cleanup() against repeated execution.
386 bool initialized_{
false};
// True when this endpoint creates the pipe and waits for a client (see the
// Windows branch of initialize()).
387 bool is_server_{
false};
// Constructor: stores the name and role, then derives the pipe path.
// NOTE(review): the signature line and the platform selection between the
// two assignments (presumably preprocessor conditionals) are not visible.
391 : name_(name), is_server_(is_server) {
// Windows: pipes live in the \\.\pipe\ namespace.
394 pipe_path_ =
"\\\\.\\pipe\\" + name_;
// Other platforms: the FIFO is placed under /tmp.
396 pipe_path_ =
"/tmp/" + name_;
// Opens the pipe. Returns true immediately if already initialised.
// NOTE(review): remaining API arguments, failure returns, the server/client
// branch, platform guards and the final `initialized_ = true` are on lines
// missing from this excerpt.
404 bool initialize()
override {
405 if (initialized_)
return true;
// Windows, server side: create a blocking byte-mode pipe...
409 pipe_handle_ = CreateNamedPipe(
412 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
420 if (pipe_handle_ == INVALID_HANDLE_VALUE) {
// ...and wait for a client. ERROR_PIPE_CONNECTED means a client had already
// connected, which is not treated as a failure; any other error closes the
// handle.
424 if (!ConnectNamedPipe(pipe_handle_,
nullptr)) {
425 if (GetLastError() != ERROR_PIPE_CONNECTED) {
426 CloseHandle(pipe_handle_);
// Windows, client side: open the existing pipe for reading and writing.
431 pipe_handle_ = CreateFile(
433 GENERIC_READ | GENERIC_WRITE,
441 if (pipe_handle_ == INVALID_HANDLE_VALUE) {
// POSIX: create the FIFO (mode 0666); an already existing FIFO is accepted.
447 if (mkfifo(pipe_path_.c_str(), 0666) == -1 && errno != EEXIST) {
// Open read/write and non-blocking, so open() does not wait for a peer.
// NOTE(review): O_NONBLOCK also makes later read()/write() calls
// non-blocking; see the notes on the send/receive paths.
452 pipe_fd_ = open(pipe_path_.c_str(), O_RDWR | O_NONBLOCK);
453 if (pipe_fd_ == -1) {
// Closes the pipe and marks the channel as uninitialised. Does nothing if
// the channel was never initialised.
// NOTE(review): platform guards, closing braces and the POSIX close of
// pipe_fd_ are on lines missing from this excerpt.
462 void cleanup()
override {
463 if (!initialized_)
return;
// Windows: close the handle and reset it to the "closed" sentinel.
466 if (pipe_handle_ != INVALID_HANDLE_VALUE) {
467 CloseHandle(pipe_handle_);
468 pipe_handle_ = INVALID_HANDLE_VALUE;
// POSIX: act only on a valid descriptor.
471 if (pipe_fd_ != -1) {
// Remove the FIFO from the filesystem.
// NOTE(review): whether this runs for every endpoint or only the server
// cannot be determined from the visible lines — verify.
476 unlink(pipe_path_.c_str());
480 initialized_ =
false;
// Body of the named-pipe send path (the function header is not visible).
// Serialises the message into one contiguous buffer and writes it with a
// single call. Fails immediately when the channel is not initialised.
484 if (!initialized_)
return false;
// Wire format: raw bytes of the IPCMessage<T> struct, followed by the payload.
487 std::vector<uint8_t> serialized_data;
488 serialized_data.resize(
sizeof(
IPCMessage<T>) + message.data.size());
// NOTE(review): this copies the struct's object representation, including
// the `data` member itself. If `data` is a dynamic container, its internal
// pointers are sent over the pipe — confirm that the receiver never uses
// them (the receive path reads the header into a full IPCMessage<T>).
490 std::memcpy(serialized_data.data(), &message,
sizeof(
IPCMessage<T>));
491 std::memcpy(serialized_data.data() +
sizeof(
IPCMessage<T>), message.data.data(), message.data.size());
// Windows: succeed only if the whole buffer was written.
495 return WriteFile(pipe_handle_, serialized_data.data(), serialized_data.size(), &bytes_written,
nullptr) &&
496 bytes_written == serialized_data.size();
// POSIX: a short or failed write is reported as failure; nothing is retried
// at this level.
498 ssize_t bytes_written = write(pipe_fd_, serialized_data.data(), serialized_data.size());
499 return bytes_written ==
static_cast<ssize_t
>(serialized_data.size());
// Body of the named-pipe receive path (the function header is not visible).
// Reads the fixed-size header first, then a payload of header.message_size
// bytes. Fails immediately when the channel is not initialised.
// NOTE(review): the declaration of `header`, the failure returns and the
// copy of header fields into `message` are on lines missing from this excerpt.
504 if (!initialized_)
return false;
// Read the header (Windows, then POSIX variant).
511 if (!ReadFile(pipe_handle_, &header,
sizeof(
IPCMessage<T>), &bytes_read,
nullptr) ||
516 ssize_t bytes_read = read(pipe_fd_, &header,
sizeof(
IPCMessage<T>));
// Allocate the payload buffer from the size announced in the header.
// NOTE(review): message_size comes from the peer and is not bounded in the
// visible code; a corrupt header could trigger a very large allocation.
524 message.data.resize(header.message_size);
// Read the payload; anything other than exactly message_size bytes is
// treated as an error.
527 if (!ReadFile(pipe_handle_, message.data.data(), header.message_size, &bytes_read,
nullptr) ||
528 bytes_read != header.message_size) {
// NOTE(review): the descriptor is opened O_NONBLOCK, so a single read() may
// legitimately return fewer bytes (or -1 with EAGAIN) before the payload has
// fully arrived — verify that the caller tolerates the resulting desync.
532 bytes_read = read(pipe_fd_, message.data.data(), header.message_size);
533 if (bytes_read !=
static_cast<ssize_t
>(header.message_size)) {
// Reports whether the pipe channel is usable.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns initialized_, in line with get_status() — confirm.
541 bool is_connected()
const override {
// Returns a human-readable status string: "Connected" once initialize() has
// succeeded, "Disconnected" otherwise.
545 std::string get_status()
const override {
546 return initialized_ ?
"Connected" :
"Disconnected";
// State of the IPC integrator decorator.
// NOTE(review): config_ and stats_ are used by the methods below but are
// declared on lines missing from this excerpt.

// Owned transport, created by initialize_channel().
570 std::unique_ptr<IPCChannel<typename IntegratorDecorator<S>::time_type>> channel_;
// Sequence number stamped on outgoing messages; post-incremented atomically.
571 std::atomic<uint32_t> sequence_number_{0};
// Background receive thread and its run flag.
575 std::thread communication_thread_;
576 std::atomic<bool> running_{
false};
// Message queues and the mutex / condition variable protecting them.
// NOTE(review): outgoing_messages_ is not used in the visible code.
577 std::mutex message_queue_mutex_;
578 std::condition_variable message_queue_cv_;
579 std::queue<IPCMessage<typename IntegratorDecorator<S>::time_type>> outgoing_messages_;
580 std::queue<IPCMessage<typename IntegratorDecorator<S>::time_type>> incoming_messages_;
// Optional user hook invoked with each received state and its timestamp.
583 std::function<void(
const S&,
typename IntegratorDecorator<S>::time_type)> receive_callback_;
// Hand-off of received data to a consumer-side step(): the flag is set under
// sde_sync_mutex_ and waited on via sde_sync_cv_.
586 std::mutex sde_sync_mutex_;
587 std::condition_variable sde_sync_cv_;
588 std::atomic<bool> noise_data_available_{
false};
589 S pending_noise_data_;
// Fragment of the constructor body (signature not visible): open the channel
// (throws if that fails), then start the background receive thread when this
// endpoint consumes data.
603 initialize_channel();
605 if (config_.direction == IPCDirection::CONSUMER || config_.direction == IPCDirection::BIDIRECTIONAL) {
606 start_communication_thread();
// Fragment of what is presumably the destructor (signature not visible):
// stop and join the receive thread before members are destroyed.
614 stop_communication_thread();
// Performs one integration step of size `dt` on `state` through the wrapped
// integrator, with IPC work before and after depending on the configured
// direction.
// NOTE(review): the body of the producer branch and all closing braces are on
// lines missing from this excerpt.
623 void step(
typename IntegratorDecorator<S>::state_type& state,
typename IntegratorDecorator<S>::time_type dt)
override {
// Consumers may wait for externally supplied data before stepping.
// NOTE(review): in the visible code noise_data_available_ is set only by
// process_incoming_messages(), which runs after the wrapped step below; in
// BLOCKING mode the first call could therefore wait indefinitely — verify.
625 if (config_.direction == IPCDirection::CONSUMER) {
626 wait_for_noise_data();
// Producer-side work before the step (body not visible).
630 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
// Delegate the numerical step.
635 this->wrapped_integrator_->step(state, dt);
// Drain messages queued by the receive thread.
638 if (config_.direction == IPCDirection::CONSUMER || config_.direction == IPCDirection::BIDIRECTIONAL) {
639 process_incoming_messages(state);
// Integrates `state` up to `end_time` with step size `dt` through the
// wrapped integrator, with producer-side IPC work before and after.
// NOTE(review): the bodies of both producer branches are on lines missing
// from this excerpt. Unlike step(), no consumer-side processing is visible
// here.
646 void integrate(
typename IntegratorDecorator<S>::state_type& state,
typename IntegratorDecorator<S>::time_type dt,
typename IntegratorDecorator<S>::time_type end_time)
override {
// Producer-side work before integration (body not visible).
648 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
// Delegate the integration.
653 this->wrapped_integrator_->integrate(state, dt, end_time);
// Producer-side work after integration (body not visible).
656 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
// Installs the hook that process_incoming_messages() calls for every
// received state. The callable is taken by value and moved into the member.
// An empty std::function disables the hook.
// NOTE(review): the assignment is not synchronised with readers of
// receive_callback_ in the visible code; presumably it is meant to be set
// before stepping begins — confirm.
665 void set_receive_callback(std::function<
void(
const S&,
typename IntegratorDecorator<S>::time_type)> callback) {
666 receive_callback_ = std::move(callback);
// Sends `state` tagged with `time` over the channel, retrying on failure, and
// records the outcome in stats_.
// NOTE(review): the early return, the declarations of `message` and
// `retries`, the retry increment, the success/failure branching and the
// final return are on lines missing from this excerpt.
675 bool send_state(
const S& state,
typename IntegratorDecorator<S>::time_type time) {
// Nothing can be sent without a connected channel.
676 if (!channel_ || !channel_->is_connected()) {
// Time the whole operation, including serialisation and retries.
680 auto start_time = std::chrono::high_resolution_clock::now();
// Stamp the message; the post-increment on the atomic gives each message a
// distinct sequence number.
683 message.sequence_number = sequence_number_++;
684 message.timestamp = time;
685 message.serialize_state(state);
687 bool success =
false;
// Try until the send succeeds or config_.max_retries attempts were made.
// NOTE(review): with max_retries == 0 the loop body never runs, so no send
// is attempted at all.
690 while (!success && retries < config_.max_retries) {
691 success = channel_->send_message(message);
// Fixed 10 ms pause, presumably only after a failed attempt (the guarding
// condition is not visible).
694 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Elapsed time is truncated to whole milliseconds, so sub-millisecond sends
// add 0 to total_send_time.
698 auto end_time = std::chrono::high_resolution_clock::now();
699 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
// Success bookkeeping.
702 stats_.messages_sent++;
703 stats_.bytes_sent += message.message_size;
704 stats_.total_send_time += duration;
// Failure bookkeeping.
706 stats_.send_failures++;
// Fragment (function header not visible): true only when a channel object
// exists and reports itself connected.
730 return channel_ && channel_->is_connected();
// Fragment (function header not visible): forwards the channel's status
// string, or "Not initialized" when no channel object exists.
737 return channel_ ? channel_->get_status() :
"Not initialized";
// Creates the transport selected by config_.method and initialises it.
// Throws std::runtime_error for transports that are not implemented, for an
// unrecognised method, and when the channel's initialize() returns false.
// NOTE(review): the `break` statements, the `default:` label and closing
// braces are on lines missing from this excerpt.
750 void initialize_channel() {
751 switch (config_.method) {
// Shared memory: named region of config_.buffer_size bytes.
752 case IPCMethod::SHARED_MEMORY:
753 channel_ = std::make_unique<SharedMemoryChannel<typename IntegratorDecorator<S>::time_type>>(config_.channel_name, config_.buffer_size);
// Named pipe: the PRODUCER acts as pipe server; CONSUMER and BIDIRECTIONAL
// endpoints therefore act as clients.
755 case IPCMethod::NAMED_PIPES:
756 channel_ = std::make_unique<NamedPipeChannel<typename IntegratorDecorator<S>::time_type>>(config_.channel_name,
757 config_.direction == IPCDirection::PRODUCER);
// Remaining transports are declared in the enum but not implemented.
759 case IPCMethod::MEMORY_MAPPED_FILE:
761 throw std::runtime_error(
"Memory-mapped file IPC not yet implemented");
762 case IPCMethod::TCP_SOCKET:
764 throw std::runtime_error(
"TCP socket IPC not yet implemented");
765 case IPCMethod::UDP_SOCKET:
767 throw std::runtime_error(
"UDP socket IPC not yet implemented");
// Presumably the default branch for values outside the enum.
769 throw std::runtime_error(
"Unknown IPC method");
// A channel that cannot be opened is a hard error for the caller.
772 if (!channel_->initialize()) {
773 throw std::runtime_error(
"Failed to initialize IPC channel");
// Starts the background thread that pulls messages from the channel into
// incoming_messages_.
// NOTE(review): setting running_, the loop header and all closing braces are
// on lines missing from this excerpt.
780 void start_communication_thread() {
// The lambda captures `this`; stop_communication_thread() joins the thread.
782 communication_thread_ = std::thread([
this]() {
784 IPCMessage<typename IntegratorDecorator<S>::time_type> message;
// On a successful receive, queue a copy of the message under the queue mutex
// and wake a waiter on message_queue_cv_.
785 if (channel_->receive_message(message)) {
786 std::lock_guard<std::mutex> lock(message_queue_mutex_);
787 incoming_messages_.push(message);
788 message_queue_cv_.notify_one();
// Receive-side statistics.
// NOTE(review): stats_ is written here on the background thread and by
// send_state() on the caller's thread; no dedicated stats lock is visible —
// confirm that readers are synchronised.
790 stats_.messages_received++;
791 stats_.bytes_received += message.message_size;
// Pause between polls.
// NOTE(review): an exception escaping this lambda would call std::terminate;
// confirm that receive_message() cannot throw.
794 std::this_thread::sleep_for(config_.polling_interval);
// Stops the background receive thread and waits for it to finish. Safe to
// call when the thread was never started, because join() is guarded by
// joinable().
// NOTE(review): the line that clears running_ (presumably line 803) is
// missing from this excerpt.
802 void stop_communication_thread() {
804 if (communication_thread_.joinable()) {
805 communication_thread_.join();
// Drains incoming_messages_: each message is deserialised, passed to the
// user callback (if set) and, for consumers, published as pending data.
// NOTE(review): `state` is not used in the visible lines; closing braces are
// missing from this excerpt.
812 void process_incoming_messages(S& state) {
// The queue mutex is held for the whole drain, including the callback.
// NOTE(review): a slow callback therefore blocks the receive thread, and a
// callback that re-enters this object and takes message_queue_mutex_ would
// deadlock — verify whether that matters for callers.
813 std::lock_guard<std::mutex> lock(message_queue_mutex_);
815 while (!incoming_messages_.empty()) {
// Copy the front element out before popping it.
816 auto message = incoming_messages_.front();
817 incoming_messages_.pop();
820 auto received_state = message.deserialize_state();
// Notify the user hook, if one was installed.
823 if (receive_callback_) {
824 receive_callback_(received_state, message.timestamp);
// Consumers: publish the state and wake a step() waiting in
// wait_for_noise_data(). With several queued messages only the last one
// remains in pending_noise_data_.
828 if (config_.direction == IPCDirection::CONSUMER) {
829 std::lock_guard<std::mutex> sync_lock(sde_sync_mutex_);
830 pending_noise_data_ = received_state;
831 noise_data_available_ =
true;
832 sde_sync_cv_.notify_one();
// Waits for pending data according to config_.sync_mode, then clears the
// availability flag.
// NOTE(review): closing braces are missing from this excerpt.
840 void wait_for_noise_data() {
841 std::unique_lock<std::mutex> lock(sde_sync_mutex_);
// BLOCKING: wait without bound until data is flagged. The predicate guards
// against spurious wakeups.
843 if (config_.sync_mode == IPCSyncMode::BLOCKING) {
844 sde_sync_cv_.wait(lock, [
this] {
return noise_data_available_.load(); });
845 }
// TIMEOUT: wait at most config_.timeout.
// NOTE(review): the result of wait_for is discarded in the visible lines, so
// a timeout cannot be told apart from data arriving.
else if (config_.sync_mode == IPCSyncMode::TIMEOUT) {
846 sde_sync_cv_.wait_for(lock, config_.timeout, [
this] { return noise_data_available_.load(); });
// In every mode (including NON_BLOCKING, which does not wait) the flag is
// reset afterwards.
849 noise_data_available_ =
false;