DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
interprocess_decorator.hpp
1#pragma once
2
#include "integrator_decorator.hpp"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <new>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
17
18#ifdef _WIN32
19#include <windows.h>
20#include <io.h>
21#include <fcntl.h>
22#else
23#include <sys/mman.h>
24#include <sys/stat.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/types.h>
28#include <errno.h>
29#endif
30
31namespace diffeq::core::composable {
32
/// Transport mechanism used to exchange data between processes.
enum class IPCMethod {
    SHARED_MEMORY,       ///< Shared memory (fastest)
    NAMED_PIPES,         ///< Named pipes (cross-platform)
    MEMORY_MAPPED_FILE,  ///< Memory-mapped files
    TCP_SOCKET,          ///< TCP sockets (network-capable)
    UDP_SOCKET           ///< UDP sockets (low-latency)
};
43
/// Role this process plays on the channel.
enum class IPCDirection {
    PRODUCER,      ///< This process sends data
    CONSUMER,      ///< This process receives data
    BIDIRECTIONAL  ///< Both send and receive
};
52
/// How a receiver behaves when no data is available yet.
enum class IPCSyncMode {
    BLOCKING,      ///< Block until data available
    NON_BLOCKING,  ///< Return immediately if no data
    TIMEOUT,       ///< Block with timeout
    POLLING        ///< Actively poll for data
};
62
67 IPCMethod method{IPCMethod::SHARED_MEMORY};
68 IPCDirection direction{IPCDirection::PRODUCER};
69 IPCSyncMode sync_mode{IPCSyncMode::NON_BLOCKING};
70
71 std::string channel_name{"diffeq_channel"};
72 size_t buffer_size{1024 * 1024}; // 1MB default
73 size_t max_message_size{64 * 1024}; // 64KB default
74
75 std::chrono::milliseconds timeout{100};
76 std::chrono::microseconds polling_interval{100}; // 100μs
77
78 // Reliability settings
79 bool enable_acknowledgments{false};
80 bool enable_sequence_numbers{true};
81 bool enable_error_correction{false};
82 size_t max_retries{3};
83
84 // Performance settings
85 bool enable_compression{false};
86 bool enable_batching{false};
87 size_t batch_size{10};
88 std::chrono::milliseconds batch_timeout{10};
89
90 // Network settings (for TCP/UDP)
91 std::string host{"127.0.0.1"};
92 uint16_t port{8080};
93
98 void validate() const {
99 if (channel_name.empty()) {
100 throw std::invalid_argument("channel_name cannot be empty");
101 }
102
103 if (buffer_size == 0) {
104 throw std::invalid_argument("buffer_size must be positive");
105 }
106
107 if (max_message_size > buffer_size) {
108 throw std::invalid_argument("max_message_size cannot exceed buffer_size");
109 }
110
111 if (timeout <= std::chrono::milliseconds{0}) {
112 throw std::invalid_argument("timeout must be positive");
113 }
114
115 if (polling_interval <= std::chrono::microseconds{0}) {
116 throw std::invalid_argument("polling_interval must be positive");
117 }
118
119 if (batch_size == 0) {
120 throw std::invalid_argument("batch_size must be positive");
121 }
122
123 if (port == 0) {
124 throw std::invalid_argument("port must be positive");
125 }
126 }
127};
128
/**
 * @brief Envelope for one state sample exchanged over an IPC channel.
 *
 * The payload in @ref data is a raw byte copy of the state's elements, so it
 * is only meaningful between processes that share element type, size and
 * byte order.
 *
 * @tparam T Timestamp type.
 */
template<typename T>
struct IPCMessage {
private:
    /// Detects types exposing data() and size(), e.g. std::vector or std::array.
    template <typename U, typename = void>
    struct has_data_and_size : std::false_type {};

    template <typename U>
    struct has_data_and_size<U, std::void_t<decltype(std::declval<const U&>().data()),
                                             decltype(std::declval<const U&>().size())>>
        : std::true_type {};

    /// Replaces the payload with @p byte_count bytes read from @p source.
    void store_bytes(const void* source, std::size_t byte_count) {
        // message_size is a 32-bit field; refuse to truncate silently.
        if (byte_count > std::numeric_limits<uint32_t>::max()) {
            throw std::length_error("State is too large for an IPC message");
        }
        data.resize(byte_count);
        if (byte_count != 0) {
            std::memcpy(data.data(), source, byte_count);
        }
        message_size = static_cast<uint32_t>(byte_count);
    }

public:
    uint32_t sequence_number{0};
    uint32_t message_size{0};
    T timestamp{};
    std::vector<uint8_t> data;

    /**
     * @brief Copy a state into the payload and update message_size.
     *
     * If the state exposes data()/size(), its elements are copied. Copying
     * the container object itself would only capture its internal pointers,
     * which are meaningless in another process. Any other state type is
     * copied as a single trivially copyable object.
     *
     * @param state State to serialize.
     * @throws std::length_error if the payload would exceed 2^32 - 1 bytes.
     */
    template <typename State>
    void serialize_state(const State& state) {
        if constexpr (has_data_and_size<State>::value) {
            using Element = std::remove_cv_t<std::remove_pointer_t<decltype(state.data())>>;
            static_assert(std::is_trivially_copyable_v<Element>,
                          "state elements must be trivially copyable to be sent as raw bytes");
            store_bytes(state.data(), static_cast<std::size_t>(state.size()) * sizeof(Element));
        } else {
            static_assert(std::is_trivially_copyable_v<State>,
                          "state must be trivially copyable to be sent as raw bytes");
            store_bytes(&state, sizeof(State));
        }
    }

    /**
     * @brief Rebuild the state from the payload as a sequence of doubles.
     *
     * @return The payload reinterpreted as doubles; empty for an empty payload.
     * @throws std::runtime_error if the payload is not a whole number of doubles.
     */
    std::vector<double> deserialize_state() const {
        if (data.size() % sizeof(double) != 0) {
            throw std::runtime_error("Invalid message size for deserialization");
        }

        std::vector<double> state(data.size() / sizeof(double));
        if (!data.empty()) {
            std::memcpy(state.data(), data.data(), data.size());
        }
        return state;
    }
};
157
/**
 * @brief Running counters and accumulated timings for IPC traffic.
 *
 * The derived ratios return 0.0 when there is nothing to divide by.
 */
struct IPCStats {
    size_t messages_sent{0};
    size_t messages_received{0};
    size_t bytes_sent{0};
    size_t bytes_received{0};
    size_t send_failures{0};
    size_t receive_failures{0};
    size_t acknowledgments_sent{0};
    size_t acknowledgments_received{0};
    std::chrono::milliseconds total_send_time{0};
    std::chrono::milliseconds total_receive_time{0};

    /// Mean time per sent message in milliseconds.
    double average_send_time_ms() const {
        if (messages_sent == 0) {
            return 0.0;
        }
        const double elapsed_ms = static_cast<double>(total_send_time.count());
        return elapsed_ms / messages_sent;
    }

    /// Mean time per received message in milliseconds.
    double average_receive_time_ms() const {
        if (messages_received == 0) {
            return 0.0;
        }
        const double elapsed_ms = static_cast<double>(total_receive_time.count());
        return elapsed_ms / messages_received;
    }

    /// Fraction of send attempts (successes plus failures) that succeeded.
    double send_success_rate() const {
        const size_t attempts = messages_sent + send_failures;
        if (attempts == 0) {
            return 0.0;
        }
        return static_cast<double>(messages_sent) / attempts;
    }

    /// Fraction of receive attempts (successes plus failures) that succeeded.
    double receive_success_rate() const {
        const size_t attempts = messages_received + receive_failures;
        if (attempts == 0) {
            return 0.0;
        }
        return static_cast<double>(messages_received) / attempts;
    }
};
193
197template<typename T>
199public:
200 virtual ~IPCChannel() = default;
201
202 virtual bool initialize() = 0;
203 virtual void cleanup() = 0;
204
205 virtual bool send_message(const IPCMessage<T>& message) = 0;
206 virtual bool receive_message(IPCMessage<T>& message) = 0;
207
208 virtual bool is_connected() const = 0;
209 virtual std::string get_status() const = 0;
210};
211
215template<typename T>
217private:
218 std::string name_;
219 size_t buffer_size_;
220 void* memory_ptr_{nullptr};
221
222#ifdef _WIN32
223 HANDLE file_mapping_{nullptr};
224#else
225 int fd_{-1};
226#endif
227
228 std::mutex* mutex_{nullptr};
229 std::condition_variable* condition_{nullptr};
230 bool initialized_{false};
231
232public:
233 explicit SharedMemoryChannel(const std::string& name, size_t buffer_size)
234 : name_(name), buffer_size_(buffer_size) {}
235
237 cleanup();
238 }
239
240 bool initialize() override {
241 if (initialized_) return true;
242
243#ifdef _WIN32
244 file_mapping_ = CreateFileMapping(
245 INVALID_HANDLE_VALUE,
246 nullptr,
247 PAGE_READWRITE,
248 0,
249 buffer_size_,
250 name_.c_str()
251 );
252
253 if (file_mapping_ == nullptr) {
254 return false;
255 }
256
257 memory_ptr_ = MapViewOfFile(
258 file_mapping_,
259 FILE_MAP_ALL_ACCESS,
260 0,
261 0,
262 buffer_size_
263 );
264
265 if (memory_ptr_ == nullptr) {
266 CloseHandle(file_mapping_);
267 return false;
268 }
269#else
270 fd_ = shm_open(name_.c_str(), O_CREAT | O_RDWR, 0666);
271 if (fd_ == -1) {
272 return false;
273 }
274
275 if (ftruncate(fd_, buffer_size_) == -1) {
276 close(fd_);
277 return false;
278 }
279
280 memory_ptr_ = mmap(nullptr, buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
281 if (memory_ptr_ == MAP_FAILED) {
282 close(fd_);
283 return false;
284 }
285#endif
286
287 // Initialize synchronization primitives in shared memory
288 mutex_ = new(memory_ptr_) std::mutex();
289 condition_ = new(static_cast<char*>(memory_ptr_) + sizeof(std::mutex)) std::condition_variable();
290
291 initialized_ = true;
292 return true;
293 }
294
295 void cleanup() override {
296 if (!initialized_) return;
297
298#ifdef _WIN32
299 if (memory_ptr_) {
300 UnmapViewOfFile(memory_ptr_);
301 memory_ptr_ = nullptr;
302 }
303 if (file_mapping_) {
304 CloseHandle(file_mapping_);
305 file_mapping_ = nullptr;
306 }
307#else
308 if (memory_ptr_ != MAP_FAILED) {
309 munmap(memory_ptr_, buffer_size_);
310 memory_ptr_ = nullptr;
311 }
312 if (fd_ != -1) {
313 close(fd_);
314 shm_unlink(name_.c_str());
315 fd_ = -1;
316 }
317#endif
318
319 initialized_ = false;
320 }
321
322 bool send_message(const IPCMessage<T>& message) override {
323 if (!initialized_) return false;
324
325 std::lock_guard<std::mutex> lock(*mutex_);
326
327 // Write message to shared memory
328 char* data_ptr = static_cast<char*>(memory_ptr_) + sizeof(std::mutex) + sizeof(std::condition_variable);
329 size_t message_size = sizeof(IPCMessage<T>) + message.data.size();
330
331 if (message_size > buffer_size_ - sizeof(std::mutex) - sizeof(std::condition_variable)) {
332 return false;
333 }
334
335 std::memcpy(data_ptr, &message, sizeof(IPCMessage<T>));
336 std::memcpy(data_ptr + sizeof(IPCMessage<T>), message.data.data(), message.data.size());
337
338 condition_->notify_one();
339 return true;
340 }
341
342 bool receive_message(IPCMessage<T>& message) override {
343 if (!initialized_) return false;
344
345 std::unique_lock<std::mutex> lock(*mutex_);
346
347 // For simplicity, we'll use a timeout-based approach
348 if (condition_->wait_for(lock, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
349 return false;
350 }
351
352 // Read message from shared memory
353 char* data_ptr = static_cast<char*>(memory_ptr_) + sizeof(std::mutex) + sizeof(std::condition_variable);
354
355 std::memcpy(&message, data_ptr, sizeof(IPCMessage<T>));
356 message.data.resize(message.message_size);
357 std::memcpy(message.data.data(), data_ptr + sizeof(IPCMessage<T>), message.message_size);
358
359 return true;
360 }
361
362 bool is_connected() const override {
363 return initialized_;
364 }
365
366 std::string get_status() const override {
367 return initialized_ ? "Connected" : "Disconnected";
368 }
369};
370
374template<typename T>
375class NamedPipeChannel : public IPCChannel<T> {
376private:
377 std::string name_;
378 std::string pipe_path_;
379
380#ifdef _WIN32
381 HANDLE pipe_handle_{INVALID_HANDLE_VALUE};
382#else
383 int pipe_fd_{-1};
384#endif
385
386 bool initialized_{false};
387 bool is_server_{false};
388
389public:
390 explicit NamedPipeChannel(const std::string& name, bool is_server = false)
391 : name_(name), is_server_(is_server) {
392
393#ifdef _WIN32
394 pipe_path_ = "\\\\.\\pipe\\" + name_;
395#else
396 pipe_path_ = "/tmp/" + name_;
397#endif
398 }
399
401 cleanup();
402 }
403
404 bool initialize() override {
405 if (initialized_) return true;
406
407#ifdef _WIN32
408 if (is_server_) {
409 pipe_handle_ = CreateNamedPipe(
410 pipe_path_.c_str(),
411 PIPE_ACCESS_DUPLEX,
412 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
413 1,
414 4096,
415 4096,
416 0,
417 nullptr
418 );
419
420 if (pipe_handle_ == INVALID_HANDLE_VALUE) {
421 return false;
422 }
423
424 if (!ConnectNamedPipe(pipe_handle_, nullptr)) {
425 if (GetLastError() != ERROR_PIPE_CONNECTED) {
426 CloseHandle(pipe_handle_);
427 return false;
428 }
429 }
430 } else {
431 pipe_handle_ = CreateFile(
432 pipe_path_.c_str(),
433 GENERIC_READ | GENERIC_WRITE,
434 0,
435 nullptr,
436 OPEN_EXISTING,
437 0,
438 nullptr
439 );
440
441 if (pipe_handle_ == INVALID_HANDLE_VALUE) {
442 return false;
443 }
444 }
445#else
446 if (is_server_) {
447 if (mkfifo(pipe_path_.c_str(), 0666) == -1 && errno != EEXIST) {
448 return false;
449 }
450 }
451
452 pipe_fd_ = open(pipe_path_.c_str(), O_RDWR | O_NONBLOCK);
453 if (pipe_fd_ == -1) {
454 return false;
455 }
456#endif
457
458 initialized_ = true;
459 return true;
460 }
461
462 void cleanup() override {
463 if (!initialized_) return;
464
465#ifdef _WIN32
466 if (pipe_handle_ != INVALID_HANDLE_VALUE) {
467 CloseHandle(pipe_handle_);
468 pipe_handle_ = INVALID_HANDLE_VALUE;
469 }
470#else
471 if (pipe_fd_ != -1) {
472 close(pipe_fd_);
473 pipe_fd_ = -1;
474 }
475 if (is_server_) {
476 unlink(pipe_path_.c_str());
477 }
478#endif
479
480 initialized_ = false;
481 }
482
483 bool send_message(const IPCMessage<T>& message) override {
484 if (!initialized_) return false;
485
486 // Serialize message
487 std::vector<uint8_t> serialized_data;
488 serialized_data.resize(sizeof(IPCMessage<T>) + message.data.size());
489
490 std::memcpy(serialized_data.data(), &message, sizeof(IPCMessage<T>));
491 std::memcpy(serialized_data.data() + sizeof(IPCMessage<T>), message.data.data(), message.data.size());
492
493#ifdef _WIN32
494 DWORD bytes_written;
495 return WriteFile(pipe_handle_, serialized_data.data(), serialized_data.size(), &bytes_written, nullptr) &&
496 bytes_written == serialized_data.size();
497#else
498 ssize_t bytes_written = write(pipe_fd_, serialized_data.data(), serialized_data.size());
499 return bytes_written == static_cast<ssize_t>(serialized_data.size());
500#endif
501 }
502
503 bool receive_message(IPCMessage<T>& message) override {
504 if (!initialized_) return false;
505
506 // First, read the message header
507 IPCMessage<T> header;
508
509#ifdef _WIN32
510 DWORD bytes_read;
511 if (!ReadFile(pipe_handle_, &header, sizeof(IPCMessage<T>), &bytes_read, nullptr) ||
512 bytes_read != sizeof(IPCMessage<T>)) {
513 return false;
514 }
515#else
516 ssize_t bytes_read = read(pipe_fd_, &header, sizeof(IPCMessage<T>));
517 if (bytes_read != sizeof(IPCMessage<T>)) {
518 return false;
519 }
520#endif
521
522 // Then read the data
523 message = header;
524 message.data.resize(header.message_size);
525
526#ifdef _WIN32
527 if (!ReadFile(pipe_handle_, message.data.data(), header.message_size, &bytes_read, nullptr) ||
528 bytes_read != header.message_size) {
529 return false;
530 }
531#else
532 bytes_read = read(pipe_fd_, message.data.data(), header.message_size);
533 if (bytes_read != static_cast<ssize_t>(header.message_size)) {
534 return false;
535 }
536#endif
537
538 return true;
539 }
540
541 bool is_connected() const override {
542 return initialized_;
543 }
544
545 std::string get_status() const override {
546 return initialized_ ? "Connected" : "Disconnected";
547 }
548};
549
566template<system_state S>
568private:
569 InterprocessConfig config_;
570 std::unique_ptr<IPCChannel<typename IntegratorDecorator<S>::time_type>> channel_;
571 std::atomic<uint32_t> sequence_number_{0};
572 IPCStats stats_;
573
574 // Threading for async operations
575 std::thread communication_thread_;
576 std::atomic<bool> running_{false};
577 std::mutex message_queue_mutex_;
578 std::condition_variable message_queue_cv_;
579 std::queue<IPCMessage<typename IntegratorDecorator<S>::time_type>> outgoing_messages_;
580 std::queue<IPCMessage<typename IntegratorDecorator<S>::time_type>> incoming_messages_;
581
582 // Callbacks for received data
583 std::function<void(const S&, typename IntegratorDecorator<S>::time_type)> receive_callback_;
584
585 // SDE synchronization
586 std::mutex sde_sync_mutex_;
587 std::condition_variable sde_sync_cv_;
588 std::atomic<bool> noise_data_available_{false};
589 S pending_noise_data_;
590
591public:
598 explicit InterprocessDecorator(std::unique_ptr<AbstractIntegrator<S>> integrator,
600 : IntegratorDecorator<S>(std::move(integrator)), config_(std::move(config)) {
601
602 config_.validate();
603 initialize_channel();
604
605 if (config_.direction == IPCDirection::CONSUMER || config_.direction == IPCDirection::BIDIRECTIONAL) {
606 start_communication_thread();
607 }
608 }
609
614 stop_communication_thread();
615 if (channel_) {
616 channel_->cleanup();
617 }
618 }
619
623 void step(typename IntegratorDecorator<S>::state_type& state, typename IntegratorDecorator<S>::time_type dt) override {
624 // Handle SDE synchronization if needed
625 if (config_.direction == IPCDirection::CONSUMER) {
626 wait_for_noise_data();
627 }
628
629 // Send current state if producer
630 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
631 send_state(state, this->current_time());
632 }
633
634 // Perform integration step
635 this->wrapped_integrator_->step(state, dt);
636
637 // Handle received data if consumer
638 if (config_.direction == IPCDirection::CONSUMER || config_.direction == IPCDirection::BIDIRECTIONAL) {
639 process_incoming_messages(state);
640 }
641 }
642
646 void integrate(typename IntegratorDecorator<S>::state_type& state, typename IntegratorDecorator<S>::time_type dt, typename IntegratorDecorator<S>::time_type end_time) override {
647 // Send initial state
648 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
649 send_state(state, this->current_time());
650 }
651
652 // Integrate with IPC handling
653 this->wrapped_integrator_->integrate(state, dt, end_time);
654
655 // Send final state
656 if (config_.direction == IPCDirection::PRODUCER || config_.direction == IPCDirection::BIDIRECTIONAL) {
657 send_state(state, this->current_time());
658 }
659 }
660
665 void set_receive_callback(std::function<void(const S&, typename IntegratorDecorator<S>::time_type)> callback) {
666 receive_callback_ = std::move(callback);
667 }
668
675 bool send_state(const S& state, typename IntegratorDecorator<S>::time_type time) {
676 if (!channel_ || !channel_->is_connected()) {
677 return false;
678 }
679
680 auto start_time = std::chrono::high_resolution_clock::now();
681
683 message.sequence_number = sequence_number_++;
684 message.timestamp = time;
685 message.serialize_state(state);
686
687 bool success = false;
688 size_t retries = 0;
689
690 while (!success && retries < config_.max_retries) {
691 success = channel_->send_message(message);
692 if (!success) {
693 retries++;
694 std::this_thread::sleep_for(std::chrono::milliseconds(10));
695 }
696 }
697
698 auto end_time = std::chrono::high_resolution_clock::now();
699 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
700
701 if (success) {
702 stats_.messages_sent++;
703 stats_.bytes_sent += message.message_size;
704 stats_.total_send_time += duration;
705 } else {
706 stats_.send_failures++;
707 }
708
709 return success;
710 }
711
715 const IPCStats& get_statistics() const {
716 return stats_;
717 }
718
723 stats_ = IPCStats{};
724 }
725
729 bool is_connected() const {
730 return channel_ && channel_->is_connected();
731 }
732
736 std::string get_status() const {
737 return channel_ ? channel_->get_status() : "Not initialized";
738 }
739
743 InterprocessConfig& config() { return config_; }
744 const InterprocessConfig& config() const { return config_; }
745
746private:
750 void initialize_channel() {
751 switch (config_.method) {
752 case IPCMethod::SHARED_MEMORY:
753 channel_ = std::make_unique<SharedMemoryChannel<typename IntegratorDecorator<S>::time_type>>(config_.channel_name, config_.buffer_size);
754 break;
755 case IPCMethod::NAMED_PIPES:
756 channel_ = std::make_unique<NamedPipeChannel<typename IntegratorDecorator<S>::time_type>>(config_.channel_name,
757 config_.direction == IPCDirection::PRODUCER);
758 break;
759 case IPCMethod::MEMORY_MAPPED_FILE:
760 // TODO: Implement memory-mapped file channel
761 throw std::runtime_error("Memory-mapped file IPC not yet implemented");
762 case IPCMethod::TCP_SOCKET:
763 // TODO: Implement TCP socket channel
764 throw std::runtime_error("TCP socket IPC not yet implemented");
765 case IPCMethod::UDP_SOCKET:
766 // TODO: Implement UDP socket channel
767 throw std::runtime_error("UDP socket IPC not yet implemented");
768 default:
769 throw std::runtime_error("Unknown IPC method");
770 }
771
772 if (!channel_->initialize()) {
773 throw std::runtime_error("Failed to initialize IPC channel");
774 }
775 }
776
780 void start_communication_thread() {
781 running_ = true;
782 communication_thread_ = std::thread([this]() {
783 while (running_) {
784 IPCMessage<typename IntegratorDecorator<S>::time_type> message;
785 if (channel_->receive_message(message)) {
786 std::lock_guard<std::mutex> lock(message_queue_mutex_);
787 incoming_messages_.push(message);
788 message_queue_cv_.notify_one();
789
790 stats_.messages_received++;
791 stats_.bytes_received += message.message_size;
792 }
793
794 std::this_thread::sleep_for(config_.polling_interval);
795 }
796 });
797 }
798
802 void stop_communication_thread() {
803 running_ = false;
804 if (communication_thread_.joinable()) {
805 communication_thread_.join();
806 }
807 }
808
812 void process_incoming_messages(S& state) {
813 std::lock_guard<std::mutex> lock(message_queue_mutex_);
814
815 while (!incoming_messages_.empty()) {
816 auto message = incoming_messages_.front();
817 incoming_messages_.pop();
818
819 // Deserialize state data
820 auto received_state = message.deserialize_state();
821
822 // Call receive callback if set
823 if (receive_callback_) {
824 receive_callback_(received_state, message.timestamp);
825 }
826
827 // For SDE synchronization, signal that noise data is available
828 if (config_.direction == IPCDirection::CONSUMER) {
829 std::lock_guard<std::mutex> sync_lock(sde_sync_mutex_);
830 pending_noise_data_ = received_state;
831 noise_data_available_ = true;
832 sde_sync_cv_.notify_one();
833 }
834 }
835 }
836
840 void wait_for_noise_data() {
841 std::unique_lock<std::mutex> lock(sde_sync_mutex_);
842
843 if (config_.sync_mode == IPCSyncMode::BLOCKING) {
844 sde_sync_cv_.wait(lock, [this] { return noise_data_available_.load(); });
845 } else if (config_.sync_mode == IPCSyncMode::TIMEOUT) {
846 sde_sync_cv_.wait_for(lock, config_.timeout, [this] { return noise_data_available_.load(); });
847 }
848
849 noise_data_available_ = false;
850 }
851};
852
853} // namespace diffeq::core::composable
Base decorator interface for integrator enhancements.
std::string get_status() const
Get channel status.
InterprocessConfig & config()
Access and modify interprocess configuration.
bool send_state(const S &state, typename IntegratorDecorator< S >::time_type time)
Send state data to other processes.
void step(typename IntegratorDecorator< S >::state_type &state, typename IntegratorDecorator< S >::time_type dt) override
Override step to handle IPC during integration.
const IPCStats & get_statistics() const
Get current IPC statistics.
bool is_connected() const
Check if channel is connected.
InterprocessDecorator(std::unique_ptr< AbstractIntegrator< S > > integrator, InterprocessConfig config={})
Construct interprocess decorator.
~InterprocessDecorator()
Destructor ensures proper cleanup.
void set_receive_callback(std::function< void(const S &, typename IntegratorDecorator< S >::time_type)> callback)
Set callback for received data.
void integrate(typename IntegratorDecorator< S >::state_type &state, typename IntegratorDecorator< S >::time_type dt, typename IntegratorDecorator< S >::time_type end_time) override
Override integrate to handle IPC during integration.
Configuration for interprocess communication.
void validate() const
Validate configuration parameters.