DiffEq - Modern C++ ODE Integration Library 1.0.0
High-performance C++ library for solving ODEs with async signal processing
Loading...
Searching...
No Matches
asio_integration_demo.cpp
1#include <diffeq.hpp>
2#include <boost/asio.hpp>
3#include <boost/asio/thread_pool.hpp>
4#include <boost/asio/co_spawn.hpp>
5#include <boost/asio/detached.hpp>
6#include <boost/asio/use_awaitable.hpp>
7#include <iostream>
8#include <vector>
9#include <memory>
10#include <chrono>
11#include <random>
12
13namespace asio = boost::asio;
14
26template<typename State>
28private:
29 asio::io_context io_context_;
30 asio::thread_pool thread_pool_;
31 std::unique_ptr<diffeq::core::AbstractIntegrator<State>> integrator_;
32
33 // 任务队列和状态管理
34 std::vector<std::future<void>> pending_tasks_;
35 std::atomic<size_t> completed_integrations_{0};
36 std::atomic<size_t> total_integrations_{0};
37
38public:
45 size_t thread_count = std::thread::hardware_concurrency())
46 : thread_pool_(thread_count)
47 , integrator_(std::move(integrator)) {
48
49 if (!integrator_) {
50 throw std::invalid_argument("Integrator cannot be null");
51 }
52 }
53
59 thread_pool_.join();
60 }
61
69 template<typename PostTask>
70 void integrate_async(State initial_state,
71 typename diffeq::core::AbstractIntegrator<State>::time_type dt,
72 typename diffeq::core::AbstractIntegrator<State>::time_type end_time,
73 PostTask&& post_integration_task) {
74
75 ++total_integrations_;
76
77 // 使用 asio::co_spawn 启动协程
78 asio::co_spawn(io_context_,
79 [this, initial_state = std::move(initial_state), dt, end_time,
80 task = std::forward<PostTask>(post_integration_task)]() mutable -> asio::awaitable<void> {
81
82 try {
83 // 在线程池中执行积分计算
84 auto integration_result = co_await asio::co_spawn(thread_pool_,
85 [this, &initial_state, dt, end_time]() -> asio::awaitable<std::pair<State, double>> {
86
87 // 执行积分
88 integrator_->set_time(0.0);
89 integrator_->integrate(initial_state, dt, end_time);
90
91 co_return std::make_pair(initial_state, integrator_->current_time());
92 }, asio::use_awaitable);
93
94 // 积分完成,执行后续任务
95 co_await asio::co_spawn(thread_pool_,
96 [task = std::move(task), state = std::move(integration_result.first),
97 final_time = integration_result.second]() mutable -> asio::awaitable<void> {
98
99 // 执行用户定义的后处理任务
100 task(state, final_time);
101 co_return;
102 }, asio::use_awaitable);
103
104 ++completed_integrations_;
105
106 } catch (const std::exception& e) {
107 std::cerr << "Integration task failed: " << e.what() << std::endl;
108 }
109 }, asio::detached);
110 }
111
/**
 * @brief Submit every entry of a task list through integrate_async().
 *
 * @param tasks Range whose elements expose `initial_state`, `dt`, `end_time`
 *              and `post_task` members. `initial_state` and `post_task` are
 *              moved out of each element, so the entries are left in a
 *              moved-from state even when the range is passed as an lvalue.
 */
template<typename TaskList>
void integrate_batch_async(TaskList&& tasks) {
    for (auto& job : tasks) {
        auto& start_state = job.initial_state;
        auto& follow_up = job.post_task;
        integrate_async(std::move(start_state), job.dt, job.end_time, std::move(follow_up));
    }
}
124
129 void run(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
130 if (timeout != std::chrono::milliseconds::max()) {
131 // 设置超时
132 asio::steady_timer timer(io_context_, timeout);
133 timer.async_wait([this](const asio::error_code&) {
134 io_context_.stop();
135 });
136 }
137
138 io_context_.run();
139 }
140
145 while (completed_integrations_.load() < total_integrations_.load()) {
146 std::this_thread::sleep_for(std::chrono::milliseconds(10));
147 }
148 }
149
153 std::pair<size_t, size_t> get_progress() const {
154 return {completed_integrations_.load(), total_integrations_.load()};
155 }
156
160 void reset_stats() {
161 completed_integrations_.store(0);
162 total_integrations_.store(0);
163 }
164};
165
166// 示例:参数化 ODE 系统
168 double alpha;
169 double beta;
170
171 ParameterizedSystem(double a, double b) : alpha(a), beta(b) {}
172
173 void operator()(std::vector<double>& dx, const std::vector<double>& x, double t) const {
174 dx[0] = alpha * x[0] - beta * x[0] * x[1]; // 捕食者-被捕食者模型
175 dx[1] = -beta * x[1] + alpha * x[0] * x[1];
176 }
177};
178
179// 示例:数据分析任务
181private:
182 std::vector<std::pair<std::vector<double>, double>> trajectory_data_;
183 std::mutex data_mutex_;
184
185public:
189 void analyze_and_adjust_parameters(const std::vector<double>& final_state, double final_time) {
190 std::lock_guard<std::mutex> lock(data_mutex_);
191
192 // 模拟数据分析
193 std::cout << "分析轨迹数据: 最终状态 = ["
194 << final_state[0] << ", " << final_state[1]
195 << "], 时间 = " << final_time << std::endl;
196
197 // 基于分析结果调整参数(这里只是示例)
198 double stability_metric = std::abs(final_state[0] - final_state[1]);
199 std::cout << "稳定性指标: " << stability_metric << std::endl;
200
201 trajectory_data_.emplace_back(final_state, final_time);
202 }
203
207 const std::vector<std::pair<std::vector<double>, double>>& get_trajectory_data() const {
208 return trajectory_data_;
209 }
210};
211
212// 示例:轨迹保存任务
214private:
215 std::string filename_prefix_;
216 std::atomic<size_t> save_count_{0};
217
218public:
219 explicit TrajectorySaver(std::string prefix = "trajectory_")
220 : filename_prefix_(std::move(prefix)) {}
221
225 void save_trajectory(const std::vector<double>& final_state, double final_time) {
226 auto count = ++save_count_;
227 std::string filename = filename_prefix_ + std::to_string(count) + ".dat";
228
229 // 模拟文件保存操作
230 std::cout << "保存轨迹到文件: " << filename
231 << " (状态: [" << final_state[0] << ", " << final_state[1]
232 << "], 时间: " << final_time << ")" << std::endl;
233
234 // 这里可以实际写入文件
235 // std::ofstream file(filename);
236 // file << final_time << " " << final_state[0] << " " << final_state[1] << "\n";
237 }
238};
239
240int main() {
241 std::cout << "=== Boost.Asio 与积分器集成示例 ===" << std::endl;
242
243 // 创建积分器
244 auto integrator = std::make_unique<diffeq::RK4Integrator<std::vector<double>>>();
245
246 // 创建异步管理器
247 AsioIntegrationManager<std::vector<double>> manager(std::move(integrator), 4);
248
249 // 创建后处理组件
250 DataAnalyzer analyzer;
251 TrajectorySaver saver("async_traj_");
252
253 // 定义不同的参数组合
254 std::vector<std::pair<double, double>> parameter_sets = {
255 {0.5, 0.3}, {0.8, 0.2}, {0.3, 0.7}, {0.6, 0.4},
256 {0.4, 0.6}, {0.7, 0.3}, {0.2, 0.8}, {0.9, 0.1}
257 };
258
259 std::cout << "\n启动 " << parameter_sets.size() << " 个异步积分任务..." << std::endl;
260
261 // 启动多个异步积分任务
262 for (size_t i = 0; i < parameter_sets.size(); ++i) {
263 const auto& [alpha, beta] = parameter_sets[i];
264
265 // 设置系统
266 ParameterizedSystem system(alpha, beta);
267 manager.integrate_async(
268 {1.0, 0.5}, // 初始状态
269 0.01, // 时间步长
270 10.0, // 结束时间
271 [&analyzer, &saver, alpha, beta, i](const std::vector<double>& final_state, double final_time) {
272 std::cout << "任务 " << i << " 完成 (α=" << alpha << ", β=" << beta << ")" << std::endl;
273
274 // 并行执行数据分析和轨迹保存
275 analyzer.analyze_and_adjust_parameters(final_state, final_time);
276 saver.save_trajectory(final_state, final_time);
277 }
278 );
279 }
280
281 // 运行事件循环
282 std::cout << "\n运行事件循环..." << std::endl;
283 auto start_time = std::chrono::high_resolution_clock::now();
284
285 manager.run();
286
287 auto end_time = std::chrono::high_resolution_clock::now();
288 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
289
290 // 显示结果
291 auto [completed, total] = manager.get_progress();
292 std::cout << "\n=== 执行完成 ===" << std::endl;
293 std::cout << "完成的任务: " << completed << "/" << total << std::endl;
294 std::cout << "总耗时: " << duration.count() << "ms" << std::endl;
295 std::cout << "平均每个任务: " << (duration.count() / total) << "ms" << std::endl;
296
297 // 显示分析结果
298 const auto& trajectory_data = analyzer.get_trajectory_data();
299 std::cout << "\n收集的轨迹数据点: " << trajectory_data.size() << std::endl;
300
301 return 0;
302}
使用 boost.asio 的异步积分器包装器
~AsioIntegrationManager()
析构函数 - 确保所有任务完成
void integrate_async(State initial_state, typename diffeq::core::AbstractIntegrator< State >::time_type dt, typename diffeq::core::AbstractIntegrator< State >::time_type end_time, PostTask &&post_integration_task)
异步执行积分任务
void integrate_batch_async(TaskList &&tasks)
批量执行多个积分任务
std::pair< size_t, size_t > get_progress() const
获取完成统计
void run(std::chrono::milliseconds timeout=std::chrono::milliseconds::max())
运行事件循环
void wait_for_all_tasks()
等待所有任务完成
AsioIntegrationManager(std::unique_ptr< diffeq::core::AbstractIntegrator< State > > integrator, size_t thread_count=std::thread::hardware_concurrency())
构造函数
void analyze_and_adjust_parameters(const std::vector< double > &final_state, double final_time)
分析轨迹数据并调整参数
const std::vector< std::pair< std::vector< double >, double > > & get_trajectory_data() const
获取分析结果
void save_trajectory(const std::vector< double > &final_state, double final_time)
保存轨迹数据
Modern C++ ODE Integration Library with Real-time Signal Processing.