121 std::mt19937_64 rng_;
122 std::normal_distribution<double> normal_dist_;
128 : rng_(seed), normal_dist_(0.0, 1.0), dimensions_(dimensions), intensity_(intensity) {}
130 NoiseData<T> generate_increment(T current_time, T dt)
override {
131 std::vector<double> increments;
132 increments.reserve(dimensions_);
135 double sqrt_dt = std::sqrt(
static_cast<double>(dt));
136 for (
size_t i = 0; i < dimensions_; ++i) {
137 increments.push_back(intensity_ * normal_dist_(rng_) * sqrt_dt);
140 return NoiseData<T>(current_time, std::move(increments), NoiseProcessType::WIENER);
143 void reset_seed(uint64_t seed)
override {
147 std::string get_process_name()
const override {
148 return "Wiener Process (Brownian Motion)";
166 std::unique_ptr<NoiseProcessGenerator<T>> local_generator_;
167 std::vector<NoiseData<T>> noise_buffer_;
168 std::mutex noise_mutex_;
169 std::condition_variable noise_cv_;
170 std::atomic<bool> noise_available_{
false};
173 std::chrono::steady_clock::time_point reference_time_;
174 std::atomic<T> synchronized_time_{};
177 size_t noise_requests_{0};
178 size_t noise_timeouts_{0};
179 size_t interpolations_{0};
187 : config_(std::move(config)), reference_time_(std::chrono::steady_clock::now()) {
190 initialize_local_generator();
203 switch (config_.sync_mode) {
204 case SDESyncMode::IMMEDIATE:
205 return get_immediate_noise(current_time, dt);
206 case SDESyncMode::BUFFERED:
207 return get_buffered_noise(current_time, dt);
208 case SDESyncMode::INTERPOLATED:
209 return get_interpolated_noise(current_time, dt);
210 case SDESyncMode::GENERATED:
211 return get_generated_noise(current_time, dt);
213 throw std::runtime_error(
"Unknown SDE synchronization mode");
222 std::lock_guard<std::mutex> lock(noise_mutex_);
225 auto it = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), noise_data,
227 return a.timestamp < b.timestamp;
230 noise_buffer_.insert(it, noise_data);
233 if (noise_buffer_.size() > config_.buffer_size) {
234 noise_buffer_.erase(noise_buffer_.begin());
237 noise_available_ =
true;
238 noise_cv_.notify_all();
245 template<
typename IPCDecorator>
248 ipc_decorator.set_receive_callback([
this](
const S& state, T time) {
251 if (state.size() >= config_.noise_dimensions) {
252 std::vector<double> increments(state.begin(), state.begin() + config_.noise_dimensions);
253 NoiseData<T> noise_data(time, std::move(increments), config_.noise_type);
254 submit_noise_data(noise_data);
266 template<
typename ProducerIntegrator,
typename ConsumerIntegrator>
267 static std::pair<std::unique_ptr<AbstractIntegrator<S, T>>, std::unique_ptr<AbstractIntegrator<S, T>>>
269 std::unique_ptr<ConsumerIntegrator> consumer_integrator,
275 producer_config.direction = IPCDirection::PRODUCER;
277 auto producer = make_builder(std::move(producer_integrator))
278 .with_interprocess(producer_config)
283 consumer_config.direction = IPCDirection::CONSUMER;
284 consumer_config.sync_mode = IPCSyncMode::BLOCKING;
286 auto consumer = make_builder(std::move(consumer_integrator))
287 .with_interprocess(consumer_config)
290 return {std::move(producer), std::move(consumer)};
297 size_t noise_requests;
298 size_t noise_timeouts;
299 size_t interpolations;
300 double timeout_rate()
const {
301 return noise_requests > 0 ?
static_cast<double>(noise_timeouts) / noise_requests : 0.0;
306 return {noise_requests_, noise_timeouts_, interpolations_};
322 void initialize_local_generator() {
323 switch (config_.noise_type) {
324 case NoiseProcessType::WIENER:
325 local_generator_ = std::make_unique<WienerProcessGenerator<T>>(
326 config_.noise_dimensions, config_.noise_intensity, config_.random_seed);
329 local_generator_ = std::make_unique<WienerProcessGenerator<T>>(
330 config_.noise_dimensions, config_.noise_intensity, config_.random_seed);
338 NoiseData<T> get_immediate_noise(T current_time, T dt) {
339 std::unique_lock<std::mutex> lock(noise_mutex_);
341 if (noise_cv_.wait_for(lock, config_.sync_timeout, [
this] { return noise_available_.load(); })) {
343 auto it = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), current_time,
344 [](
const NoiseData<T>& noise, T time) {
345 return noise.timestamp < time;
348 if (it != noise_buffer_.end()) {
349 NoiseData<T> result = *it;
350 noise_buffer_.erase(it);
351 noise_available_ = !noise_buffer_.empty();
358 return local_generator_->generate_increment(current_time, dt);
364 NoiseData<T> get_buffered_noise(T current_time, T dt) {
365 std::lock_guard<std::mutex> lock(noise_mutex_);
368 for (
auto it = noise_buffer_.begin(); it != noise_buffer_.end(); ++it) {
369 if (std::abs(it->timestamp - current_time) <
static_cast<T
>(config_.max_noise_delay.count()) / 1e6) {
370 NoiseData<T> result = *it;
371 noise_buffer_.erase(it);
377 return local_generator_->generate_increment(current_time, dt);
383 NoiseData<T> get_interpolated_noise(T current_time, T dt) {
384 std::lock_guard<std::mutex> lock(noise_mutex_);
386 if (noise_buffer_.size() < 2) {
387 return local_generator_->generate_increment(current_time, dt);
391 auto upper = std::lower_bound(noise_buffer_.begin(), noise_buffer_.end(), current_time,
392 [](
const NoiseData<T>& noise, T time) {
393 return noise.timestamp < time;
396 if (upper == noise_buffer_.begin() || upper == noise_buffer_.end()) {
397 return local_generator_->generate_increment(current_time, dt);
400 auto lower = std::prev(upper);
403 T alpha = (current_time - lower->timestamp) / (upper->timestamp - lower->timestamp);
405 std::vector<double> interpolated_increments;
406 interpolated_increments.reserve(config_.noise_dimensions);
408 for (
size_t i = 0; i < config_.noise_dimensions && i < lower->increments.size() && i < upper->increments.size(); ++i) {
409 double interpolated = (1 - alpha) * lower->increments[i] + alpha * upper->increments[i];
410 interpolated_increments.push_back(interpolated);
414 return NoiseData<T>(current_time, std::move(interpolated_increments), config_.noise_type);
420 NoiseData<T> get_generated_noise(T current_time, T dt) {
421 return local_generator_->generate_increment(current_time, dt);