    std::mt19937_64 rng_;
    std::normal_distribution<double> normal_dist_;
    std::vector<double> batch_buffer_;

public:
    // Constructor signature reconstructed from the member initializers and the call sites
    // that pass a single integer seed.
    explicit SIMDNoiseGenerator(uint64_t seed) : rng_(seed), normal_dist_(0.0, 1.0) {}
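
    // Generate `count` samples of intensity * N(0, 1), dispatching to the widest SIMD
    // path available at compile time. The signature and the small-batch thresholds below
    // are reconstructed; call sites in this file pass only a count, so intensity defaults to 1.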
    std::vector<double> generate_batch(size_t count, double intensity = 1.0) {
        std::vector<double> result;
        result.reserve(count);

#if defined(__AVX2__)
        if (count >= 8) {
            generate_batch_avx2(result, count, intensity);
        } else {
            generate_batch_scalar(result, count, intensity);
        }
#elif defined(__SSE2__)
        if (count >= 4) {
            generate_batch_sse2(result, count, intensity);
        } else {
            generate_batch_scalar(result, count, intensity);
        }
#else
        generate_batch_scalar(result, count, intensity);
#endif

        return result;
    }
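
    // Scalar fallback: draw and scale one sample at a time.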
    void generate_batch_scalar(std::vector<double>& result, size_t count, double intensity) {
        for (size_t i = 0; i < count; ++i) {
            result.push_back(intensity * normal_dist_(rng_));
        }
    }
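
    // AVX2 path: draw normals into an aligned buffer, apply the intensity scaling with
    // 256-bit multiplies (two 4-double lanes per 8-sample block), then append the block.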
    void generate_batch_avx2(std::vector<double>& result, size_t count, double intensity) {
        const size_t simd_count = (count / 8) * 8;
        const __m256d intensity_vec = _mm256_set1_pd(intensity);

        for (size_t i = 0; i < simd_count; i += 8) {
            alignas(32) double values[8];
            for (int j = 0; j < 8; ++j) {
                values[j] = normal_dist_(rng_);
            }

            __m256d lo = _mm256_mul_pd(_mm256_load_pd(&values[0]), intensity_vec);
            __m256d hi = _mm256_mul_pd(_mm256_load_pd(&values[4]), intensity_vec);
            _mm256_store_pd(&values[0], lo);
            _mm256_store_pd(&values[4], hi);

            for (int j = 0; j < 8; ++j) {
                result.push_back(values[j]);
            }
        }

        // Scalar tail for the remaining count % 8 samples.
        for (size_t i = simd_count; i < count; ++i) {
            result.push_back(intensity * normal_dist_(rng_));
        }
    }
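
    // SSE2 path: same idea with 128-bit vectors (two 2-double lanes per 4-sample block).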
    void generate_batch_sse2(std::vector<double>& result, size_t count, double intensity) {
        const size_t simd_count = (count / 4) * 4;
        const __m128d intensity_vec = _mm_set1_pd(intensity);

        for (size_t i = 0; i < simd_count; i += 4) {
            alignas(16) double values[4];
            for (int j = 0; j < 4; ++j) {
                values[j] = normal_dist_(rng_);
            }

            __m128d lo = _mm_mul_pd(_mm_load_pd(&values[0]), intensity_vec);
            __m128d hi = _mm_mul_pd(_mm_load_pd(&values[2]), intensity_vec);
            _mm_store_pd(&values[0], lo);
            _mm_store_pd(&values[2], hi);

            for (int j = 0; j < 4; ++j) {
                result.push_back(values[j]);
            }
        }

        // Scalar tail for the remaining count % 4 samples.
        for (size_t i = simd_count; i < count; ++i) {
            result.push_back(intensity * normal_dist_(rng_));
        }
    }
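
    // Per-thread resources and shared state for the multi-threaded noise system: worker
    // threads with one generator and one lock-free queue each, atomic performance counters,
    // an optional precomputed sample cache, and lifecycle/synchronization primitives.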
    std::vector<std::thread> worker_threads_;
    std::vector<std::unique_ptr<SIMDNoiseGenerator>> generators_;
    std::vector<std::unique_ptr<LockFreeNoiseQueue<T>>> noise_queues_;

    std::atomic<size_t> total_noise_generated_{0};
    std::atomic<size_t> total_noise_consumed_{0};
    std::atomic<size_t> cache_hits_{0};
    std::atomic<size_t> cache_misses_{0};

    std::vector<std::vector<double>> precomputed_cache_;
    std::atomic<size_t> cache_index_{0};
    std::mutex cache_mutex_;

    std::atomic<bool> running_{false};
    std::condition_variable worker_cv_;
    std::mutex worker_mutex_;
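
    // Constructor initializer: take ownership of the runtime configuration.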
        : config_(std::move(config)) {
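
    // Single-request entry point (name and signature reconstructed from the per-mode
    // helpers it forwards to): dispatch according to the configured threading mode.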
    NoiseData<T> get_noise(T current_time, T dt, size_t dimensions) {
        total_noise_consumed_++;

        switch (config_.threading_mode) {
            case SDEThreadingMode::VECTORIZED:
                return get_vectorized_noise(current_time, dt, dimensions);
            case SDEThreadingMode::LOCK_FREE:
                return get_lockfree_noise(current_time, dt, dimensions);
            case SDEThreadingMode::MULTI_THREAD:
                return get_multithreaded_noise(current_time, dt, dimensions);
            default:
                return get_cached_noise(current_time, dt, dimensions);
        }
    }
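
    // Batch entry point (name reconstructed): route large requests to the parallel SIMD
    // batch generator and everything else to the per-simulation fallback.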
    std::vector<NoiseData<T>> generate_noise_batch(T current_time, T dt,
                                                   size_t dimensions,
                                                   size_t num_simulations) {
        if (config_.enable_simd && num_simulations >= config_.batch_size) {
            return generate_vectorized_batch(current_time, dt, dimensions, num_simulations);
        }
        return generate_standard_batch(current_time, dt, dimensions, num_simulations);
    }
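
    // Monte Carlo driver (signature partly reconstructed): split the simulations across
    // worker threads, run each block asynchronously, and merge the per-thread results.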
    template<typename Integrator, typename InitialCondition>
    std::vector<S> run_monte_carlo(std::function<std::unique_ptr<Integrator>()> integrator_factory,
                                   std::function<S()> initial_condition_generator,
                                   T dt, T end_time, size_t num_simulations) {
        std::vector<S> final_states;
        final_states.reserve(num_simulations);

        const size_t num_threads = config_.num_threads;
        const size_t sims_per_thread = num_simulations / num_threads;

        std::vector<std::future<std::vector<S>>> futures;

        for (size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
            size_t start_sim = thread_id * sims_per_thread;
            // The last thread also takes the remainder when num_simulations is not divisible.
            size_t end_sim = (thread_id == num_threads - 1) ? num_simulations
                                                            : (thread_id + 1) * sims_per_thread;

            futures.emplace_back(std::async(std::launch::async, [=, this]() {
                return run_thread_simulations(integrator_factory, initial_condition_generator,
                                              dt, end_time, start_sim, end_sim, thread_id);
            }));
        }

        for (auto& future : futures) {
            auto thread_results = future.get();
            final_states.insert(final_states.end(), thread_results.begin(), thread_results.end());
        }

        return final_states;
    }
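
    // Aggregated performance counters plus derived metrics. The struct and accessor names
    // are reconstructed around the fields and bodies present in the source.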
    struct PerformanceStats {
        size_t noise_generated;
        size_t noise_consumed;
        size_t cache_hits;
        size_t cache_misses;

        double cache_hit_rate() const {
            size_t total = cache_hits + cache_misses;
            return total > 0 ? static_cast<double>(cache_hits) / total : 0.0;
        }

        double throughput_msamples_per_sec(std::chrono::milliseconds elapsed_time) const {
            // samples / (ms * 1000) = millions of samples per second; guard against zero elapsed time.
            return elapsed_time.count() > 0 ? noise_generated / (elapsed_time.count() * 1000.0) : 0.0;
        }
    };

    PerformanceStats get_statistics() const {
        return {total_noise_generated_.load(), total_noise_consumed_.load(),
                cache_hits_.load(), cache_misses_.load()};
    }

    void reset_statistics() {
        total_noise_generated_ = 0;
        total_noise_consumed_ = 0;
        cache_hits_ = 0;
        cache_misses_ = 0;
    }
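
    // Exercise the configured noise path before timing production runs and report the
    // observed warmup throughput.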
    void warmup(size_t warmup_samples = 100000) {
        std::cout << "Warming up high-performance SDE system...\n";

        auto start = std::chrono::high_resolution_clock::now();

        for (size_t i = 0; i < warmup_samples; ++i) {
            // Loop body reconstructed: draw one single-dimension sample per iteration to
            // exercise the configured noise path.
            get_noise(static_cast<T>(0), static_cast<T>(0.01), 1);
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

        std::cout << "Warmup completed: " << warmup_samples << " samples in "
                  << duration.count() << "ms\n";
        std::cout << "Throughput: " << (warmup_samples / (duration.count() * 1000.0))
                  << " M samples/sec\n";
    }
    void initialize_system() {
        generators_.resize(config_.num_threads);
        noise_queues_.resize(config_.num_threads);

        for (size_t i = 0; i < config_.num_threads; ++i) {
            generators_[i] = std::make_unique<SIMDNoiseGenerator>(12345 + i);
            noise_queues_[i] = std::make_unique<LockFreeNoiseQueue<T>>();
        }

        if (config_.enable_precomputation) {
            initialize_precomputed_cache();
        }

        if (config_.threading_mode != SDEThreadingMode::SINGLE_THREAD) {
            start_worker_threads();
        }
    }
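
    // Fill each thread's precomputed sample cache in parallel and report the total count.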
    void initialize_precomputed_cache() {
        precomputed_cache_.resize(config_.num_threads);

        std::vector<std::future<void>> futures;

        for (size_t i = 0; i < config_.num_threads; ++i) {
            futures.emplace_back(std::async(std::launch::async, [this, i]() {
                auto& cache = precomputed_cache_[i];
                cache.reserve(config_.precompute_buffer_size);

                for (size_t j = 0; j < config_.precompute_buffer_size; ++j) {
                    cache.push_back(generators_[i]->generate_batch(1)[0]);
                }
            }));
        }

        for (auto& future : futures) {
            future.wait();
        }

        std::cout << "Precomputed " << (config_.num_threads * config_.precompute_buffer_size)
                  << " noise samples\n";
    }
    void start_worker_threads() {
        running_ = true;
        worker_threads_.resize(config_.num_threads);

        for (size_t i = 0; i < config_.num_threads; ++i) {
            worker_threads_[i] = std::thread([this, i]() {
                worker_thread_function(i);
            });

            if (config_.pin_threads) {
                cpu_set_t cpuset;
                CPU_ZERO(&cpuset);
                CPU_SET(i % std::thread::hardware_concurrency(), &cpuset);
                pthread_setaffinity_np(worker_threads_[i].native_handle(),
                                       sizeof(cpu_set_t), &cpuset);
            }
        }
    }
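
    // Producer loop: while the system is running, keep this thread's queue topped up with
    // batches of Wiener increments, backing off briefly whenever the queue is full enough.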
    void worker_thread_function(size_t thread_id) {
        while (running_.load()) {
            if (noise_queues_[thread_id]->size() < config_.queue_size / 4) {
                auto noise_batch = generators_[thread_id]->generate_batch(config_.batch_size);

                for (size_t i = 0; i < noise_batch.size(); ++i) {
                    NoiseData<T> data(static_cast<T>(i * 0.01), {noise_batch[i]},
                                      NoiseProcessType::WIENER);
                    noise_queues_[thread_id]->push(data);
                }

                total_noise_generated_ += noise_batch.size();
            } else {
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            }
        }
    }
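
    // Generate the requested increments on demand with the calling thread's generator.
    // Note that dt is not applied to the increments here.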
    NoiseData<T> get_vectorized_noise(T current_time, T dt, size_t dimensions) {
        size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % config_.num_threads;
        auto noise_values = generators_[thread_id]->generate_batch(dimensions);

        total_noise_generated_ += dimensions;
        return NoiseData<T>(current_time, std::move(noise_values), NoiseProcessType::WIENER);
    }
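
    // Try to satisfy the request from this thread's pre-filled lock-free queue; fall back
    // to on-demand generation when the queue is empty.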
    NoiseData<T> get_lockfree_noise(T current_time, T dt, size_t dimensions) {
        size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % config_.num_threads;

        NoiseData<T> result;
        if (noise_queues_[thread_id]->pop(result)) {
            return result;
        }

        return get_vectorized_noise(current_time, dt, dimensions);
    }
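
    // MULTI_THREAD mode currently delegates to the vectorized path.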
    NoiseData<T> get_multithreaded_noise(T current_time, T dt, size_t dimensions) {
        return get_vectorized_noise(current_time, dt, dimensions);
    }
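
    // Serve increments from the precomputed per-thread cache, advancing a shared atomic
    // cursor; fall back to on-demand generation when precomputation is disabled.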
    NoiseData<T> get_cached_noise(T current_time, T dt, size_t dimensions) {
        if (!config_.enable_precomputation || precomputed_cache_.empty()) {
            return get_vectorized_noise(current_time, dt, dimensions);
        }

        size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id()) % config_.num_threads;
        size_t index = cache_index_++ % config_.precompute_buffer_size;

        std::vector<double> values;
        values.reserve(dimensions);

        for (size_t i = 0; i < dimensions; ++i) {
            size_t cache_idx = (index + i) % precomputed_cache_[thread_id].size();
            values.push_back(precomputed_cache_[thread_id][cache_idx]);
        }

        return NoiseData<T>(current_time, std::move(values), NoiseProcessType::WIENER);
    }
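
    // Generate all samples for a batch request in parallel chunks of batch_size, then
    // scatter the flat sample stream back into one NoiseData record per simulation.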
    std::vector<NoiseData<T>> generate_vectorized_batch(T current_time, T dt,
                                                        size_t dimensions,
                                                        size_t num_simulations) {
        std::vector<NoiseData<T>> results;
        results.reserve(num_simulations);

        const size_t total_samples = dimensions * num_simulations;
        const size_t batches = (total_samples + config_.batch_size - 1) / config_.batch_size;

        std::vector<std::future<std::vector<double>>> futures;

        for (size_t batch = 0; batch < batches; ++batch) {
            size_t batch_start = batch * config_.batch_size;
            size_t batch_end = std::min(batch_start + config_.batch_size, total_samples);
            size_t batch_size = batch_end - batch_start;

            // Round-robin batches over the per-thread generators; note that batches mapped
            // to the same generator may run concurrently and are not synchronized here.
            size_t thread_id = batch % config_.num_threads;

            futures.emplace_back(std::async(std::launch::async, [this, thread_id, batch_size]() {
                return generators_[thread_id]->generate_batch(batch_size);
            }));
        }

        size_t sample_idx = 0;
        for (auto& future : futures) {
            auto batch_samples = future.get();

            for (double sample : batch_samples) {
                size_t sim_id = sample_idx / dimensions;
                size_t dim_id = sample_idx % dimensions;

                // Start a new NoiseData record when the first dimension of a simulation arrives.
                if (dim_id == 0) {
                    results.emplace_back(current_time + sim_id * dt, std::vector<double>{},
                                         NoiseProcessType::WIENER);
                    results.back().increments.reserve(dimensions);
                }

                results[sim_id].increments.push_back(sample);
                ++sample_idx;
            }
        }

        total_noise_generated_ += total_samples;
        return results;
    }
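
    // Per-simulation fallback used for small batches or when SIMD is disabled.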
    std::vector<NoiseData<T>> generate_standard_batch(T current_time, T dt,
                                                      size_t dimensions,
                                                      size_t num_simulations) {
        std::vector<NoiseData<T>> results;
        results.reserve(num_simulations);

        for (size_t i = 0; i < num_simulations; ++i) {
            // Loop body reconstructed: one single-request draw per simulation.
            results.push_back(get_noise(current_time + i * dt, dt, dimensions));
        }

        return results;
    }
    template<typename Integrator, typename InitialCondition>
    std::vector<S> run_thread_simulations(std::function<std::unique_ptr<Integrator>()> integrator_factory,
                                          std::function<S()> initial_condition_generator,
                                          T dt, T end_time,
                                          size_t start_sim, size_t end_sim,
                                          size_t thread_id) {
        std::vector<S> results;
        results.reserve(end_sim - start_sim);

        for (size_t sim = start_sim; sim < end_sim; ++sim) {
            auto integrator = integrator_factory();
            S state = initial_condition_generator();

            integrator->integrate(state, dt, end_time);
            results.push_back(state);
        }

        return results;
    }
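
    // Shutdown: join every worker thread that is still joinable, then release them.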
        for (auto& thread : worker_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        worker_threads_.clear();