DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
sde_synchronization.hpp
1#pragma once
2
3#include "interprocess_decorator.hpp"
4#include <functional>
5#include <memory>
6#include <chrono>
7#include <random>
8
9namespace diffeq::core::composable {
10
14enum class SDESyncMode {
15 IMMEDIATE, // Noise data required immediately
16 BUFFERED, // Buffer noise data for smooth delivery
17 INTERPOLATED, // Interpolate between noise samples
18 GENERATED // Generate noise locally with synchronized seed
19};
20
24enum class NoiseProcessType {
25 WIENER, // Standard Wiener process (Brownian motion)
26 COLORED_NOISE, // Colored noise with correlation
27 JUMP_PROCESS, // Jump diffusion process
28 LEVY_PROCESS, // Lévy process
29 CUSTOM // Custom noise process
30};
31
36 SDESyncMode sync_mode{SDESyncMode::BUFFERED};
37 NoiseProcessType noise_type{NoiseProcessType::WIENER};
38
39 // Timing parameters
40 std::chrono::microseconds max_noise_delay{1000}; // 1ms
41 std::chrono::microseconds noise_buffer_time{5000}; // 5ms
42 std::chrono::microseconds sync_timeout{10000}; // 10ms
43
44 // Noise generation parameters
45 uint64_t random_seed{12345};
46 double noise_intensity{1.0};
47 size_t noise_dimensions{1};
48
49 // Buffering and interpolation
50 size_t buffer_size{1000};
51 bool enable_interpolation{true};
52 double interpolation_tolerance{1e-6};
53
54 // Network synchronization
55 bool enable_time_sync{true};
56 std::chrono::microseconds time_sync_interval{1000}; // 1ms
57
61 void validate() const {
62 if (max_noise_delay <= std::chrono::microseconds{0}) {
63 throw std::invalid_argument("max_noise_delay must be positive");
64 }
65
66 if (noise_buffer_time <= std::chrono::microseconds{0}) {
67 throw std::invalid_argument("noise_buffer_time must be positive");
68 }
69
70 if (sync_timeout <= std::chrono::microseconds{0}) {
71 throw std::invalid_argument("sync_timeout must be positive");
72 }
73
74 if (noise_dimensions == 0) {
75 throw std::invalid_argument("noise_dimensions must be positive");
76 }
77
78 if (buffer_size == 0) {
79 throw std::invalid_argument("buffer_size must be positive");
80 }
81
82 if (interpolation_tolerance <= 0) {
83 throw std::invalid_argument("interpolation_tolerance must be positive");
84 }
85 }
86};
87
91template<typename T>
92struct NoiseData {
93 T timestamp;
94 std::vector<double> increments;
95 NoiseProcessType type;
96 uint64_t sequence_number{0};
97
98 NoiseData(T time, std::vector<double> inc, NoiseProcessType t)
99 : timestamp(time), increments(std::move(inc)), type(t) {}
100};
101
105template<typename T>
107public:
108 virtual ~NoiseProcessGenerator() = default;
109
110 virtual NoiseData<T> generate_increment(T current_time, T dt) = 0;
111 virtual void reset_seed(uint64_t seed) = 0;
112 virtual std::string get_process_name() const = 0;
113};
114
118template<typename T>
120private:
121 std::mt19937_64 rng_;
122 std::normal_distribution<double> normal_dist_;
123 size_t dimensions_;
124 double intensity_;
125
126public:
127 explicit WienerProcessGenerator(size_t dimensions, double intensity = 1.0, uint64_t seed = 12345)
128 : rng_(seed), normal_dist_(0.0, 1.0), dimensions_(dimensions), intensity_(intensity) {}
129
130 NoiseData<T> generate_increment(T current_time, T dt) override {
131 std::vector<double> increments;
132 increments.reserve(dimensions_);
133
134 // Generate independent Gaussian increments
135 double sqrt_dt = std::sqrt(static_cast<double>(dt));
136 for (size_t i = 0; i < dimensions_; ++i) {
137 increments.push_back(intensity_ * normal_dist_(rng_) * sqrt_dt);
138 }
139
140 return NoiseData<T>(current_time, std::move(increments), NoiseProcessType::WIENER);
141 }
142
143 void reset_seed(uint64_t seed) override {
144 rng_.seed(seed);
145 }
146
147 std::string get_process_name() const override {
148 return "Wiener Process (Brownian Motion)";
149 }
150};
151
162template<system_state S, can_be_time T = double>
164private:
165 SDESyncConfig config_;
166 std::unique_ptr<NoiseProcessGenerator<T>> local_generator_;
167 std::vector<NoiseData<T>> noise_buffer_;
168 std::mutex noise_mutex_;
169 std::condition_variable noise_cv_;
170 std::atomic<bool> noise_available_{false};
171
172 // Time synchronization
173 std::chrono::steady_clock::time_point reference_time_;
174 std::atomic<T> synchronized_time_{};
175
176 // Statistics
177 size_t noise_requests_{0};
178 size_t noise_timeouts_{0};
179 size_t interpolations_{0};
180
181public:
186 explicit SDESynchronizer(SDESyncConfig config = {})
187 : config_(std::move(config)), reference_time_(std::chrono::steady_clock::now()) {
188
189 config_.validate();
190 initialize_local_generator();
191 }
192
200 NoiseData<T> get_noise_increment(T current_time, T dt) {
201 noise_requests_++;
202
203 switch (config_.sync_mode) {
204 case SDESyncMode::IMMEDIATE:
205 return get_immediate_noise(current_time, dt);
206 case SDESyncMode::BUFFERED:
207 return get_buffered_noise(current_time, dt);
208 case SDESyncMode::INTERPOLATED:
209 return get_interpolated_noise(current_time, dt);
210 case SDESyncMode::GENERATED:
211 return get_generated_noise(current_time, dt);
212 default:
213 throw std::runtime_error("Unknown SDE synchronization mode");
214 }
215 }
216
221 void submit_noise_data(const NoiseData<T>& noise_data) {
222 std::lock_guard<std::mutex> lock(noise_mutex_);
223
224 // Insert in chronological order
225 auto it = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), noise_data,
226 [](const NoiseData<T>& a, const NoiseData<T>& b) {
227 return a.timestamp < b.timestamp;
228 });
229
230 noise_buffer_.insert(it, noise_data);
231
232 // Limit buffer size
233 if (noise_buffer_.size() > config_.buffer_size) {
234 noise_buffer_.erase(noise_buffer_.begin());
235 }
236
237 noise_available_ = true;
238 noise_cv_.notify_all();
239 }
240
245 template<typename IPCDecorator>
246 void configure_with_ipc(IPCDecorator& ipc_decorator) {
247 // Set up callback to receive noise data
248 ipc_decorator.set_receive_callback([this](const S& state, T time) {
249 // Deserialize noise data from state
250 // This is simplified - real implementation would need proper serialization
251 if (state.size() >= config_.noise_dimensions) {
252 std::vector<double> increments(state.begin(), state.begin() + config_.noise_dimensions);
253 NoiseData<T> noise_data(time, std::move(increments), config_.noise_type);
254 submit_noise_data(noise_data);
255 }
256 });
257 }
258
266 template<typename ProducerIntegrator, typename ConsumerIntegrator>
267 static std::pair<std::unique_ptr<AbstractIntegrator<S, T>>, std::unique_ptr<AbstractIntegrator<S, T>>>
268 create_synchronized_pair(std::unique_ptr<ProducerIntegrator> producer_integrator,
269 std::unique_ptr<ConsumerIntegrator> consumer_integrator,
270 InterprocessConfig ipc_config,
271 SDESyncConfig sync_config = {}) {
272
273 // Configure producer (noise generator)
274 InterprocessConfig producer_config = ipc_config;
275 producer_config.direction = IPCDirection::PRODUCER;
276
277 auto producer = make_builder(std::move(producer_integrator))
278 .with_interprocess(producer_config)
279 .build();
280
281 // Configure consumer (noise receiver)
282 InterprocessConfig consumer_config = ipc_config;
283 consumer_config.direction = IPCDirection::CONSUMER;
284 consumer_config.sync_mode = IPCSyncMode::BLOCKING; // SDE needs synchronous noise
285
286 auto consumer = make_builder(std::move(consumer_integrator))
287 .with_interprocess(consumer_config)
288 .build();
289
290 return {std::move(producer), std::move(consumer)};
291 }
292
296 struct SyncStats {
297 size_t noise_requests;
298 size_t noise_timeouts;
299 size_t interpolations;
300 double timeout_rate() const {
301 return noise_requests > 0 ? static_cast<double>(noise_timeouts) / noise_requests : 0.0;
302 }
303 };
304
305 SyncStats get_statistics() const {
306 return {noise_requests_, noise_timeouts_, interpolations_};
307 }
308
313 noise_requests_ = 0;
314 noise_timeouts_ = 0;
315 interpolations_ = 0;
316 }
317
318private:
322 void initialize_local_generator() {
323 switch (config_.noise_type) {
324 case NoiseProcessType::WIENER:
325 local_generator_ = std::make_unique<WienerProcessGenerator<T>>(
326 config_.noise_dimensions, config_.noise_intensity, config_.random_seed);
327 break;
328 default:
329 local_generator_ = std::make_unique<WienerProcessGenerator<T>>(
330 config_.noise_dimensions, config_.noise_intensity, config_.random_seed);
331 break;
332 }
333 }
334
338 NoiseData<T> get_immediate_noise(T current_time, T dt) {
339 std::unique_lock<std::mutex> lock(noise_mutex_);
340
341 if (noise_cv_.wait_for(lock, config_.sync_timeout, [this] { return noise_available_.load(); })) {
342 // Find noise data at or near current time
343 auto it = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), current_time,
344 [](const NoiseData<T>& noise, T time) {
345 return noise.timestamp < time;
346 });
347
348 if (it != noise_buffer_.end()) {
349 NoiseData<T> result = *it;
350 noise_buffer_.erase(it);
351 noise_available_ = !noise_buffer_.empty();
352 return result;
353 }
354 }
355
356 // Timeout - generate locally or throw
357 noise_timeouts_++;
358 return local_generator_->generate_increment(current_time, dt);
359 }
360
364 NoiseData<T> get_buffered_noise(T current_time, T dt) {
365 std::lock_guard<std::mutex> lock(noise_mutex_);
366
367 // Look for buffered noise near current time
368 for (auto it = noise_buffer_.begin(); it != noise_buffer_.end(); ++it) {
369 if (std::abs(it->timestamp - current_time) < static_cast<T>(config_.max_noise_delay.count()) / 1e6) {
370 NoiseData<T> result = *it;
371 noise_buffer_.erase(it);
372 return result;
373 }
374 }
375
376 // No suitable buffered noise - generate locally
377 return local_generator_->generate_increment(current_time, dt);
378 }
379
383 NoiseData<T> get_interpolated_noise(T current_time, T dt) {
384 std::lock_guard<std::mutex> lock(noise_mutex_);
385
386 if (noise_buffer_.size() < 2) {
387 return local_generator_->generate_increment(current_time, dt);
388 }
389
390 // Find surrounding noise data points
391 auto upper = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), current_time,
392 [](const NoiseData<T>& noise, T time) {
393 return noise.timestamp < time;
394 });
395
396 if (upper == noise_buffer_.begin() || upper == noise_buffer_.end()) {
397 return local_generator_->generate_increment(current_time, dt);
398 }
399
400 auto lower = std::prev(upper);
401
402 // Linear interpolation
403 T alpha = (current_time - lower->timestamp) / (upper->timestamp - lower->timestamp);
404
405 std::vector<double> interpolated_increments;
406 interpolated_increments.reserve(config_.noise_dimensions);
407
408 for (size_t i = 0; i < config_.noise_dimensions && i < lower->increments.size() && i < upper->increments.size(); ++i) {
409 double interpolated = (1 - alpha) * lower->increments[i] + alpha * upper->increments[i];
410 interpolated_increments.push_back(interpolated);
411 }
412
413 interpolations_++;
414 return NoiseData<T>(current_time, std::move(interpolated_increments), config_.noise_type);
415 }
416
420 NoiseData<T> get_generated_noise(T current_time, T dt) {
421 return local_generator_->generate_increment(current_time, dt);
422 }
423};
424
425// ============================================================================
426// CONVENIENCE FUNCTIONS FOR SDE SYNCHRONIZATION
427// ============================================================================
428
436template<system_state S, can_be_time T = double>
437auto create_wiener_process_generator(size_t dimensions, double intensity = 1.0, const std::string& channel_name = "wiener_process") {
438 // This would be a specialized integrator that generates Wiener process increments
439 // and sends them via IPC. Implementation would depend on specific requirements.
440
441 InterprocessConfig ipc_config;
442 ipc_config.direction = IPCDirection::PRODUCER;
443 ipc_config.channel_name = channel_name;
444
445 SDESyncConfig sync_config;
446 sync_config.noise_type = NoiseProcessType::WIENER;
447 sync_config.noise_dimensions = dimensions;
448 sync_config.noise_intensity = intensity;
449
450 // Note: This is a placeholder - would need actual Wiener process integrator implementation
451 return std::make_pair(ipc_config, sync_config);
452}
453
460template<typename SDEIntegrator>
461auto configure_for_external_noise(std::unique_ptr<SDEIntegrator> integrator, const std::string& channel_name = "wiener_process") {
462 InterprocessConfig ipc_config;
463 ipc_config.direction = IPCDirection::CONSUMER;
464 ipc_config.channel_name = channel_name;
465 ipc_config.sync_mode = IPCSyncMode::BLOCKING; // SDE requires synchronous noise
466
467 return make_builder(std::move(integrator))
468 .with_interprocess(ipc_config)
469 .build();
470}
471
472} // namespace diffeq::core::composable
473
SDE synchronization helper for coordinating noise processes.
void configure_with_ipc(IPCDecorator &ipc_decorator)
Configure for use with InterprocessDecorator.
static std::pair< std::unique_ptr< AbstractIntegrator< S, T > >, std::unique_ptr< AbstractIntegrator< S, T > > > create_synchronized_pair(std::unique_ptr< ProducerIntegrator > producer_integrator, std::unique_ptr< ConsumerIntegrator > consumer_integrator, InterprocessConfig ipc_config, SDESyncConfig sync_config={})
Create synchronized SDE integrator pair.
NoiseData< T > get_noise_increment(T current_time, T dt)
Get noise increment for SDE integration.
void submit_noise_data(const NoiseData< T > &noise_data)
Submit noise data from external process.
SDESynchronizer(SDESyncConfig config={})
Construct SDE synchronizer.
Configuration for interprocess communication.
Configuration for SDE synchronization.
void validate() const
Validate configuration.