DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
parallel_decorator.hpp
#pragma once

#include "integrator_decorator.hpp"
#include <algorithm>
#include <cstddef>
#include <memory>
#include <ranges>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>

// Check for parallel execution support
#if defined(__cpp_lib_execution) && __cpp_lib_execution >= 201902L
    #include <execution>
    #define PARALLEL_EXECUTION_AVAILABLE
#endif

namespace diffeq::core::composable {

/**
 * @brief Configuration for parallel execution.
 */
struct ParallelConfig {
    size_t max_threads{0};              // 0 = auto-detect
    size_t chunk_size{1};               // Minimum work unit size
    bool enable_auto_chunking{true};
    double load_balance_threshold{0.1}; // 10% load imbalance tolerance

    // Validation settings
    bool validate_thread_count{true};
    size_t min_threads{1};
    size_t max_threads_limit{std::thread::hardware_concurrency() * 2};

    /**
     * @brief Validate configuration parameters.
     */
    void validate() const {
        if (validate_thread_count && max_threads > 0) {
            if (max_threads < min_threads) {
                throw std::invalid_argument("max_threads must be >= " + std::to_string(min_threads));
            }
            if (max_threads > max_threads_limit) {
                throw std::invalid_argument("max_threads exceeds system limit of " +
                                            std::to_string(max_threads_limit));
            }
        }

        if (chunk_size == 0) {
            throw std::invalid_argument("chunk_size must be positive");
        }

        if (load_balance_threshold < 0.0 || load_balance_threshold > 1.0) {
            throw std::invalid_argument("load_balance_threshold must be between 0.0 and 1.0");
        }
    }
};

/**
 * @brief Parallel execution decorator - adds batch processing to any integrator.
 */
template<system_state S>
class ParallelDecorator : public IntegratorDecorator<S> {
private:
    ParallelConfig config_;

public:
    /**
     * @brief Construct parallel decorator.
     */
    explicit ParallelDecorator(std::unique_ptr<AbstractIntegrator<S>> integrator,
                               ParallelConfig config = {})
        : IntegratorDecorator<S>(std::move(integrator)), config_(std::move(config)) {

        config_.validate();

        // Auto-detect optimal thread count if not specified
        if (config_.max_threads == 0) {
            config_.max_threads = std::thread::hardware_concurrency();
            if (config_.max_threads == 0) {
                config_.max_threads = 1; // Fallback if detection fails
            }
        }
    }

    /**
     * @brief Integrate multiple states in parallel.
     */
    template<typename StateRange>
    void integrate_batch(StateRange&& states, typename IntegratorDecorator<S>::time_type dt,
                         typename IntegratorDecorator<S>::time_type end_time) {
        const size_t batch_size = std::ranges::size(states);

        if (batch_size == 0) {
            return; // Nothing to do
        }

        if (batch_size == 1 || config_.max_threads == 1) {
            // Sequential processing for single state or single thread
            for (auto& state : states) {
                this->wrapped_integrator_->integrate(state, dt, end_time);
            }
            return;
        }

        // Parallel processing
        try {
#ifdef PARALLEL_EXECUTION_AVAILABLE
            std::for_each(std::execution::par_unseq,
                          std::ranges::begin(states), std::ranges::end(states),
                          [this, dt, end_time](auto& state) {
                              // Create thread-local copy of integrator
                              auto local_integrator = this->create_copy();
                              local_integrator->integrate(state, dt, end_time);
                          });
#else
            // Sequential fallback for platforms without parallel execution support
            for (auto& state : states) {
                this->wrapped_integrator_->integrate(state, dt, end_time);
            }
#endif
        } catch (const std::exception& e) {
            // Fall back to sequential processing if the parallel algorithm itself
            // fails (e.g. std::bad_alloc). Note: an exception escaping the lambda
            // under std::execution::par_unseq calls std::terminate and never
            // reaches this handler.
            for (auto& state : states) {
                this->wrapped_integrator_->integrate(state, dt, end_time);
            }
        }
    }

    /**
     * @brief Monte Carlo integration with parallel execution.
     */
    template<typename Generator, typename Processor>
    auto integrate_monte_carlo(size_t num_simulations, Generator&& generator,
                               Processor&& processor, typename IntegratorDecorator<S>::time_type dt,
                               typename IntegratorDecorator<S>::time_type end_time) {
        using result_type = std::invoke_result_t<Processor, S>;
        std::vector<result_type> results(num_simulations);

        if (num_simulations == 0) {
            return results; // Nothing to do
        }

        if (num_simulations == 1 || config_.max_threads == 1) {
            // Sequential processing
            for (size_t i = 0; i < num_simulations; ++i) {
                auto state = generator(i);
                this->wrapped_integrator_->integrate(state, dt, end_time);
                results[i] = processor(state);
            }
            return results;
        }

        // Parallel processing
        try {
#ifdef PARALLEL_EXECUTION_AVAILABLE
            // size_t{0} (rather than 0UL) keeps the iota value type identical to
            // num_simulations on all platforms, including 64-bit Windows.
            auto indices = std::views::iota(size_t{0}, num_simulations);
            std::for_each(std::execution::par_unseq,
                          indices.begin(), indices.end(),
                          [&](size_t i) {
                              auto local_integrator = this->create_copy();
                              auto state = generator(i);
                              local_integrator->integrate(state, dt, end_time);
                              results[i] = processor(state); // distinct slots, no race
                          });
#else
            // Sequential fallback for platforms without parallel execution support
            for (size_t i = 0; i < num_simulations; ++i) {
                auto state = generator(i);
                this->wrapped_integrator_->integrate(state, dt, end_time);
                results[i] = processor(state);
            }
#endif
        } catch (const std::exception& e) {
            // Fall back to sequential processing if the parallel algorithm itself fails
            for (size_t i = 0; i < num_simulations; ++i) {
                auto state = generator(i);
                this->wrapped_integrator_->integrate(state, dt, end_time);
                results[i] = processor(state);
            }
        }

        return results;
    }

    /**
     * @brief Chunked parallel processing with load balancing.
     */
    template<typename StateRange>
    void integrate_batch_chunked(StateRange&& states, typename IntegratorDecorator<S>::time_type dt,
                                 typename IntegratorDecorator<S>::time_type end_time) {
        const size_t batch_size = std::ranges::size(states);

        if (batch_size <= config_.chunk_size || config_.max_threads == 1) {
            integrate_batch(std::forward<StateRange>(states), dt, end_time);
            return;
        }

        // Calculate optimal chunk size for load balancing
        const size_t effective_chunk_size = config_.enable_auto_chunking ?
            calculate_optimal_chunk_size(batch_size) : config_.chunk_size;

        // Process in chunks
        auto states_begin = std::ranges::begin(states);
        auto states_end = std::ranges::end(states);

        // != rather than < so the loop also works for non-random-access iterators
        for (auto chunk_start = states_begin; chunk_start != states_end;) {
            auto chunk_end = chunk_start;
            std::advance(chunk_end, std::min(effective_chunk_size,
                         static_cast<size_t>(std::distance(chunk_start, states_end))));

            // Process this chunk in parallel
#ifdef PARALLEL_EXECUTION_AVAILABLE
            std::for_each(std::execution::par_unseq, chunk_start, chunk_end,
                          [this, dt, end_time](auto& state) {
                              auto local_integrator = this->create_copy();
                              local_integrator->integrate(state, dt, end_time);
                          });
#else
            // Sequential fallback for platforms without parallel execution support
            std::for_each(chunk_start, chunk_end,
                          [this, dt, end_time](auto& state) {
                              this->wrapped_integrator_->integrate(state, dt, end_time);
                          });
#endif

            chunk_start = chunk_end;
        }
    }

    /**
     * @brief Access and modify parallel configuration.
     */
    ParallelConfig& config() { return config_; }
    const ParallelConfig& config() const { return config_; }

    /**
     * @brief Update parallel configuration with validation.
     */
    void update_config(ParallelConfig new_config) {
        new_config.validate();
        config_ = std::move(new_config);
    }

    /**
     * @brief Get optimal number of threads for current configuration.
     */
    size_t get_optimal_thread_count() const {
        return config_.max_threads;
    }

private:
    /**
     * @brief Create a thread-local copy of the wrapped integrator.
     */
    std::unique_ptr<AbstractIntegrator<S>> create_copy() {
        // This is a placeholder - the actual implementation depends on the
        // integrator type. Could use a factory pattern, a registry, or require
        // integrators to be copyable.
        throw std::runtime_error("Integrator copying not implemented - need factory pattern");
    }

    /**
     * @brief Calculate a chunk size that balances load across available threads.
     */
    size_t calculate_optimal_chunk_size(size_t total_size) const {
        // Simple heuristic: distribute work evenly across available threads
        const size_t base_chunk_size = std::max(config_.chunk_size,
                                                total_size / config_.max_threads);

        // Allow chunks to grow by the configured tolerance for load balancing
        const size_t adjusted_size = static_cast<size_t>(base_chunk_size *
                                                         (1.0 + config_.load_balance_threshold));

        return std::min(adjusted_size, total_size);
    }
};

} // namespace diffeq::core::composable
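The private create_copy() above is deliberately a placeholder. One possible direction, shown purely as a sketch: require cloneable integrators through a small interface and dispatch to it from the decorator. CloneableIntegrator, its clone() signature, and the dynamic_cast dispatch below are illustrative assumptions, not part of the published DiffEq API.

// Hypothetical helper interface; names and signatures are assumptions for
// illustration only, not the published DiffEq API.
template<typename S>
class CloneableIntegrator : public AbstractIntegrator<S> {
public:
    // Each concrete integrator duplicates itself, so every worker thread can
    // own an independent instance with no shared mutable state.
    virtual std::unique_ptr<AbstractIntegrator<S>> clone() const = 0;
};

// create_copy() could then forward to the wrapped integrator:
//
//     if (auto* cloneable =
//             dynamic_cast<CloneableIntegrator<S>*>(this->wrapped_integrator_.get())) {
//         return cloneable->clone();
//     }
//     throw std::runtime_error("wrapped integrator is not cloneable");

A virtual clone() keeps the decorator agnostic to concrete integrator types, at the cost of one allocation per state per call; a per-thread integrator cache would amortize that if it matters.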
IntegratorDecorator< S >
    Base decorator interface for integrator enhancements.
ParallelDecorator
    Parallel execution decorator - adds batch processing to any integrator.
ParallelDecorator(std::unique_ptr< AbstractIntegrator< S > > integrator, ParallelConfig config={})
    Construct parallel decorator.
void integrate_batch(StateRange &&states, typename IntegratorDecorator< S >::time_type dt, typename IntegratorDecorator< S >::time_type end_time)
    Integrate multiple states in parallel.
void integrate_batch_chunked(StateRange &&states, typename IntegratorDecorator< S >::time_type dt, typename IntegratorDecorator< S >::time_type end_time)
    Chunked parallel processing with load balancing.
auto integrate_monte_carlo(size_t num_simulations, Generator &&generator, Processor &&processor, typename IntegratorDecorator< S >::time_type dt, typename IntegratorDecorator< S >::time_type end_time)
    Monte Carlo integration with parallel execution.
ParallelConfig & config()
    Access and modify parallel configuration.
void update_config(ParallelConfig new_config)
    Update parallel configuration with validation.
size_t get_optimal_thread_count() const
    Get optimal number of threads for current configuration.
ParallelConfig
    Configuration for parallel execution.
void validate() const
    Validate configuration parameters.
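For orientation, a minimal usage sketch of the batch and Monte Carlo entry points. RK4Integrator and its right-hand-side callback are stand-ins for whichever concrete AbstractIntegrator<State> the library provides, and State is assumed to satisfy the system_state concept; the parallel code paths additionally require a working create_copy().

#include <array>
#include <memory>
#include <numeric>
#include <vector>

using State = std::array<double, 2>;
using namespace diffeq::core::composable;

int main() {
    // Hypothetical concrete integrator for the harmonic oscillator x'' = -x.
    auto base = std::make_unique<RK4Integrator<State>>(
        [](double /*t*/, const State& y, State& dydt) {
            dydt[0] = y[1];   // x' = v
            dydt[1] = -y[0];  // v' = -x
        });

    ParallelConfig cfg;
    cfg.max_threads = 4;  // 0 would auto-detect

    ParallelDecorator<State> parallel(std::move(base), cfg);

    // Batch mode: advance 100 independent initial conditions to t = 1.0.
    std::vector<State> states(100, State{1.0, 0.0});
    parallel.integrate_batch(states, /*dt=*/0.01, /*end_time=*/1.0);

    // Monte Carlo mode: perturb the initial condition per sample, reduce each
    // trajectory to a scalar (here, energy), and collect the results.
    auto energies = parallel.integrate_monte_carlo(
        1000,
        [](size_t i) { return State{1.0 + 1e-3 * i, 0.0}; },       // generator
        [](const State& y) { return y[0] * y[0] + y[1] * y[1]; },  // processor
        0.01, 1.0);

    const double mean = std::accumulate(energies.begin(), energies.end(), 0.0)
                        / static_cast<double>(energies.size());
    (void)mean;
    return 0;
}

The generator runs once per sample index and the processor reduces each final state immediately, so per-sample memory stays bounded even for large num_simulations.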