2#include <boost/asio.hpp>
3#include <boost/asio/thread_pool.hpp>
4#include <boost/asio/co_spawn.hpp>
5#include <boost/asio/detached.hpp>
6#include <boost/asio/use_awaitable.hpp>
13namespace asio = boost::asio;
26template<
typename State>
29 asio::io_context io_context_;
30 asio::thread_pool thread_pool_;
31 std::unique_ptr<diffeq::core::AbstractIntegrator<State>> integrator_;
34 std::vector<std::future<void>> pending_tasks_;
35 std::atomic<size_t> completed_integrations_{0};
36 std::atomic<size_t> total_integrations_{0};
45 size_t thread_count = std::thread::hardware_concurrency())
46 : thread_pool_(thread_count)
47 , integrator_(std::move(integrator)) {
50 throw std::invalid_argument(
"Integrator cannot be null");
69 template<
typename PostTask>
71 typename diffeq::core::AbstractIntegrator<State>::time_type dt,
72 typename diffeq::core::AbstractIntegrator<State>::time_type end_time,
73 PostTask&& post_integration_task) {
75 ++total_integrations_;
78 asio::co_spawn(io_context_,
79 [
this, initial_state = std::move(initial_state), dt, end_time,
80 task = std::forward<PostTask>(post_integration_task)]()
mutable -> asio::awaitable<void> {
84 auto integration_result =
co_await asio::co_spawn(thread_pool_,
85 [
this, &initial_state, dt, end_time]() -> asio::awaitable<std::pair<State, double>> {
88 integrator_->set_time(0.0);
89 integrator_->integrate(initial_state, dt, end_time);
91 co_return std::make_pair(initial_state, integrator_->current_time());
92 }, asio::use_awaitable);
95 co_await asio::co_spawn(thread_pool_,
96 [task = std::move(task), state = std::move(integration_result.first),
97 final_time = integration_result.second]() mutable -> asio::awaitable<void> {
100 task(state, final_time);
102 }, asio::use_awaitable);
104 ++completed_integrations_;
106 }
catch (
const std::exception& e) {
107 std::cerr <<
"Integration task failed: " << e.what() << std::endl;
116 template<
typename TaskList>
118 for (
auto& task : tasks) {
120 task.dt, task.end_time,
121 std::move(task.post_task));
129 void run(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) {
130 if (timeout != std::chrono::milliseconds::max()) {
132 asio::steady_timer timer(io_context_, timeout);
133 timer.async_wait([
this](
const asio::error_code&) {
145 while (completed_integrations_.load() < total_integrations_.load()) {
146 std::this_thread::sleep_for(std::chrono::milliseconds(10));
154 return {completed_integrations_.load(), total_integrations_.load()};
161 completed_integrations_.store(0);
162 total_integrations_.store(0);
173 void operator()(std::vector<double>& dx,
const std::vector<double>& x,
double t)
const {
174 dx[0] = alpha * x[0] - beta * x[0] * x[1];
175 dx[1] = -beta * x[1] + alpha * x[0] * x[1];
182 std::vector<std::pair<std::vector<double>,
double>> trajectory_data_;
183 std::mutex data_mutex_;
190 std::lock_guard<std::mutex> lock(data_mutex_);
193 std::cout <<
"分析轨迹数据: 最终状态 = ["
194 << final_state[0] <<
", " << final_state[1]
195 <<
"], 时间 = " << final_time << std::endl;
198 double stability_metric = std::abs(final_state[0] - final_state[1]);
199 std::cout <<
"稳定性指标: " << stability_metric << std::endl;
201 trajectory_data_.emplace_back(final_state, final_time);
208 return trajectory_data_;
215 std::string filename_prefix_;
216 std::atomic<size_t> save_count_{0};
220 : filename_prefix_(std::move(prefix)) {}
226 auto count = ++save_count_;
227 std::string filename = filename_prefix_ + std::to_string(count) +
".dat";
230 std::cout <<
"保存轨迹到文件: " << filename
231 <<
" (状态: [" << final_state[0] <<
", " << final_state[1]
232 <<
"], 时间: " << final_time <<
")" << std::endl;
241 std::cout <<
"=== Boost.Asio 与积分器集成示例 ===" << std::endl;
244 auto integrator = std::make_unique<diffeq::RK4Integrator<std::vector<double>>>();
254 std::vector<std::pair<double, double>> parameter_sets = {
255 {0.5, 0.3}, {0.8, 0.2}, {0.3, 0.7}, {0.6, 0.4},
256 {0.4, 0.6}, {0.7, 0.3}, {0.2, 0.8}, {0.9, 0.1}
259 std::cout <<
"\n启动 " << parameter_sets.size() <<
" 个异步积分任务..." << std::endl;
262 for (
size_t i = 0; i < parameter_sets.size(); ++i) {
263 const auto& [alpha, beta] = parameter_sets[i];
267 manager.integrate_async(
271 [&analyzer, &saver, alpha, beta, i](
const std::vector<double>& final_state,
double final_time) {
272 std::cout <<
"任务 " << i <<
" 完成 (α=" << alpha <<
", β=" << beta <<
")" << std::endl;
276 saver.save_trajectory(final_state, final_time);
282 std::cout <<
"\n运行事件循环..." << std::endl;
283 auto start_time = std::chrono::high_resolution_clock::now();
287 auto end_time = std::chrono::high_resolution_clock::now();
288 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
291 auto [completed, total] = manager.get_progress();
292 std::cout <<
"\n=== 执行完成 ===" << std::endl;
293 std::cout <<
"完成的任务: " << completed <<
"/" << total << std::endl;
294 std::cout <<
"总耗时: " << duration.count() <<
"ms" << std::endl;
295 std::cout <<
"平均每个任务: " << (duration.count() / total) <<
"ms" << std::endl;
299 std::cout <<
"\n收集的轨迹数据点: " << trajectory_data.size() << std::endl;
~AsioIntegrationManager()
析构函数 - 确保所有任务完成
void integrate_async(State initial_state, typename diffeq::core::AbstractIntegrator< State >::time_type dt, typename diffeq::core::AbstractIntegrator< State >::time_type end_time, PostTask &&post_integration_task)
异步执行积分任务
void integrate_batch_async(TaskList &&tasks)
批量执行多个积分任务
std::pair< size_t, size_t > get_progress() const
获取完成统计
void run(std::chrono::milliseconds timeout=std::chrono::milliseconds::max())
运行事件循环
void wait_for_all_tasks()
等待所有任务完成
AsioIntegrationManager(std::unique_ptr< diffeq::core::AbstractIntegrator< State > > integrator, size_t thread_count=std::thread::hardware_concurrency())
构造函数
void analyze_and_adjust_parameters(const std::vector< double > &final_state, double final_time)
分析轨迹数据并调整参数
const std::vector< std::pair< std::vector< double >, double > > & get_trajectory_data() const
获取分析结果
void save_trajectory(const std::vector< double > &final_state, double final_time)
保存轨迹数据
Modern C++ ODE Integration Library with Real-time Signal Processing.