Source code for pyna.flt

"""Field line tracing with parallel execution.

Parallelism strategy (Windows + Python 3.13 standard GIL build):
- Primary: ThreadPoolExecutor — works with any callable, numpy releases GIL
  during integration loops, so real speedup is achievable
- ProcessPoolExecutor: available but requires module-level picklable functions;
  use only when explicitly requested and field_func is picklable
- CUDA: CuPy backend for GPU-accelerated batch tracing

Progress reporting
------------------
All batch methods accept an optional ``progress=`` parameter.  Pass any
:class:`~pyna.progress.TraceProgressBase` instance to receive per-task or
aggregate progress updates::

    from pyna.progress import TqdmProgress, LogFileProgress, CompositeProgress

    # tqdm bar in CLI / notebook
    tracer.trace_many(starts, t_max, progress=TqdmProgress())

    # bar + log file simultaneously
    prog = CompositeProgress([TqdmProgress(), LogFileProgress("run.jsonl")])
    tracer.trace_many(starts, t_max, progress=prog)

Wall-hit handling (boundary field lines)
-----------------------------------------
Near the plasma boundary, field lines may strike the first wall after only
a few integration steps.  ``FieldLineTracer.trace_many`` marks these as
"wall-hit" (non-blocking) and returns the short trajectory.  Use
:func:`reseed_boundary_field_lines` to automatically add extra seed points
around the boundary where the hit-fraction is high, ensuring the boundary
topology is still captured.

Legacy API
----------
已移除。原 ``bundle_tracing_with_t_as_DeltaPhi``、``save_Poincare_orbits``、
``load_Poincare_orbits`` 及 ``OdeSolution.mat_interp`` monkey-patch 已删除。
请使用 ``FieldLineTracer.trace_many`` 替代。

New API
-------
FieldLineTracer
    RK4 integrator with .trace() / .trace_many() (ThreadPoolExecutor).
    当传入网格数据时底层由 cyna C++ 扩展驱动;若只有 callable 场函数,
    则使用纯 Python RK4 fallback(向后兼容)。
    若要完全使用 C++ 底层加速,请使用
    ``pyna.toroidal.flt.trace_poincare_batch`` 并传入网格数组。
WallModel
    Parametric / polygon wall geometry for wall-hit detection.
reseed_boundary_field_lines
    Adaptively add more seed points where wall-hits are dense.
get_backend(mode, **kwargs)
    Factory: 'cpu', 'cuda', 'opencl'.
"""
from __future__ import annotations

import os
import sysconfig
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Callable, List, Optional

import numpy as np

from pyna.fields.cylindrical import VectorField3DCylindrical
from pyna.progress import TraceProgressBase, _coerce_progress
from scipy.interpolate import RegularGridInterpolator


# ---------------------------------------------------------------------------
# New API — RK4 integrator
# ---------------------------------------------------------------------------

def _rk4_step_py(f: Callable, y: np.ndarray, dt: float) -> np.ndarray:
    """纯 Python RK4 步骤(仅用于 callable 场函数的 fallback 路径)。

    当 FieldLineTracer 以任意 callable 构建时(而非网格数据)使用此函数。
    若已提供网格数据,底层将使用 cyna C++ 积分器。

    Parameters
    ----------
    f : callable
        Vector field ``f(y) -> dy``.  Takes/returns 1-D array of length 3.
    y : ndarray
        Current state (R, Z, phi).
    dt : float
        Step size.

    Returns
    -------
    ndarray
        New state after one RK4 step.
    """
    k1 = np.asarray(f(y), dtype=float)
    k2 = np.asarray(f(y + 0.5 * dt * k1), dtype=float)
    k3 = np.asarray(f(y + 0.5 * dt * k2), dtype=float)
    k4 = np.asarray(f(y + dt * k3), dtype=float)
    return y + (dt / 6.0) * (k1 + 2.0 * k2 + 2.0 * k3 + k4)


[docs] class FieldLineTracer: """RK4 field-line integrator with parallel trace_many. The field function signature:: f(rzphi: ndarray[3]) -> ndarray[3] # (dR/dl, dZ/dl, dphi/dl) where ``rzphi = [R, Z, phi]`` is the current position. Parallelism uses :class:`~concurrent.futures.ThreadPoolExecutor` on all platforms. This is deliberately chosen over ProcessPoolExecutor because: * Works with *any* callable — no pickle requirement (important on Windows). * NumPy/SciPy release the GIL during heavy computation, so threads achieve real parallel speedup. * No process-spawn overhead on Windows. Parameters ---------- field_func : callable Unit tangent vector field. dt : float Integration step size (arc length). RZlimit : tuple or None Optional ``(R_min, R_max, Z_min, Z_max)`` domain boundary. Integration stops when the trajectory leaves this box. n_workers : int or None Default thread-pool size for :meth:`trace_many`. ``None`` → ``min(os.cpu_count(), 16)``. """ def __init__( self, field_func: Callable, dt: float = 0.04, RZlimit=None, n_workers: Optional[int] = None, ) -> None: self.field_func = field_func self.dt = dt self.RZlimit = RZlimit self.n_workers = n_workers or min(os.cpu_count() or 4, 16)
[docs] def trace(self, start_pt, t_max: float) -> np.ndarray: """Trace a single field line with fixed-step RK4. Parameters ---------- start_pt : array-like of length 3 Starting point (R, Z, phi). t_max : float Maximum arc-length parameter. Returns ------- ndarray, shape (N, 3) Trajectory points (R, Z, phi). """ y = np.asarray(start_pt, dtype=float).copy() dt = self.dt n_steps = max(int(t_max / dt), 1) result = [y.copy()] for _ in range(n_steps): y = _rk4_step_py(self.field_func, y, dt) result.append(y.copy()) if self.RZlimit is not None: R_min, R_max, Z_min, Z_max = self.RZlimit if y[0] < R_min or y[0] > R_max or y[1] < Z_min or y[1] > Z_max: break return np.array(result)
[docs] def trace_many( self, start_pts, t_max: float, n_workers: Optional[int] = None, progress: Optional[TraceProgressBase] = None, ) -> List[np.ndarray]: """Parallel field-line tracing using ThreadPoolExecutor. ThreadPoolExecutor is used (not ProcessPool) because: 1. Works with any callable (no pickle requirement). 2. NumPy/SciPy release the GIL during computation → real parallel speedup even without the free-threading build. 3. Avoids Windows process-spawn overhead. Parameters ---------- start_pts : array-like, shape (N, 3) Starting points. t_max : float Maximum arc-length for each field line. n_workers : int or None Override the default worker count. progress : TraceProgressBase or None Optional progress reporter. ``None`` → silent. Pass a :class:`~pyna.progress.TqdmProgress` for an interactive bar or :class:`~pyna.progress.LogFileProgress` to write a heartbeat file for staleness monitoring. Returns ------- list of ndarray One (n_steps, 3) trajectory per starting point. """ start_pts = np.asarray(start_pts, dtype=float) workers = n_workers or self.n_workers n_tasks = len(start_pts) n_steps_planned = max(int(t_max / self.dt), 1) prog = _coerce_progress(progress) prog.start(n_tasks, description="field-line tracing") def _trace_one(args): idx, pt = args traj = self.trace(pt, t_max) prog.update(idx, steps_done=-1, steps_total=n_steps_planned) return traj with ThreadPoolExecutor(max_workers=workers) as executor: results = list(executor.map(_trace_one, enumerate(start_pts))) prog.close() return results
# --------------------------------------------------------------------------- # Backend factory # ---------------------------------------------------------------------------
[docs] def get_backend(mode: str = 'cpu', **kwargs): """Get a field-line tracer backend. Parameters ---------- mode : str ``'cpu'`` — :class:`FieldLineTracer` with ThreadPoolExecutor (always available). ``'cuda'`` — :class:`~pyna.flt_cuda.FieldLineTracerCUDA` (requires CuPy, only for analytic fields). ``'opencl'`` — reserved, not yet implemented. **kwargs Passed to the backend constructor. Returns ------- Backend object with ``.trace_many(start_pts, t_max)`` method. Examples -------- CPU:: tracer = get_backend('cpu', field_func=my_field, dt=0.02) trajs = tracer.trace_many(starts, t_max=100.0) CUDA:: tracer = get_backend('cuda', R0=1.0, a=0.3, B0=1.0, q0=2.0) trajs = tracer.trace_many(starts, t_max=100.0) """ if mode == 'cpu': field_func = kwargs.pop('field_func', None) if field_func is None: return _CPUBackend(**kwargs) return FieldLineTracer(field_func, **kwargs) elif mode == 'cuda': from pyna.flt_cuda import FieldLineTracerCUDA # noqa: PLC0415 return FieldLineTracerCUDA(**kwargs) elif mode == 'opencl': raise NotImplementedError( "OpenCL backend is reserved for future implementation. " "Use mode='cpu' or mode='cuda' (requires cupy)." ) else: raise ValueError( f"Unknown backend mode: {mode!r}. Choose 'cpu', 'cuda', or 'opencl'." )
class _CPUBackend: """Lazy CPU backend — call :meth:`get_tracer` to create a FieldLineTracer.""" def __init__(self, **kwargs) -> None: self.kwargs = kwargs def get_tracer(self, field_func: Callable) -> FieldLineTracer: """Create a :class:`FieldLineTracer` for *field_func*.""" return FieldLineTracer(field_func, **self.kwargs) # Convenience: allow trace_many to be called directly if field_func # was stored separately (e.g. via kwargs). def trace_many(self, start_pts, t_max: float) -> List[np.ndarray]: field_func = self.kwargs.pop('field_func', None) if field_func is None: raise ValueError( "_CPUBackend.trace_many requires 'field_func' in kwargs. " "Use get_backend('cpu', field_func=...) or call .get_tracer(f).trace_many(...)." ) return FieldLineTracer(field_func, **self.kwargs).trace_many(start_pts, t_max) # --------------------------------------------------------------------------- # Wall geometry model # ---------------------------------------------------------------------------
[docs] class WallModel: """First-wall geometry for wall-hit detection during field-line tracing. A wall is represented as a closed polygon in the (R, Z) poloidal cross- section. The same shape is assumed toroidally symmetric (axisymmetric wall). More complex 3-D first-wall descriptions can be approximated by providing an appropriate 2-D outline. The model also provides a *minimum clearance* check so that trajectories that approach very close to the wall but haven't yet crossed it can be flagged for early termination. Parameters ---------- R_wall : array_like of shape (N,) R-coordinates of the wall polygon vertices (m). The polygon is closed automatically; do not repeat the first point. Z_wall : array_like of shape (N,) Z-coordinates of the wall polygon vertices (m). min_clearance : float, optional Minimum distance from wall (m) before a point is considered a near-miss / wall-hit. ``0`` means only count true crossings. Default 0. Examples -------- >>> import numpy as np >>> theta = np.linspace(0, 2*np.pi, 64, endpoint=False) >>> wall = WallModel(2.0 + 0.5*np.cos(theta), 0.5*np.sin(theta)) >>> wall.is_outside(np.array([2.6, 0.0])) True """ def __init__( self, R_wall: np.ndarray, Z_wall: np.ndarray, min_clearance: float = 0.0, ) -> None: self.R_wall = np.asarray(R_wall, dtype=float) self.Z_wall = np.asarray(Z_wall, dtype=float) self.min_clearance = float(min_clearance) if len(self.R_wall) < 3: raise ValueError("Wall polygon must have at least 3 vertices.") if self.R_wall.shape != self.Z_wall.shape: raise ValueError("R_wall and Z_wall must have the same shape.")
[docs] def is_outside(self, RZ: np.ndarray) -> bool: """Return True if the point (R, Z) is outside the wall polygon. Uses the ray-casting (winding-number) algorithm. Parameters ---------- RZ : array_like of shape (2,) Point [R, Z] to test (m). Returns ------- bool """ R, Z = float(RZ[0]), float(RZ[1]) if self.min_clearance > 0.0: dist = self._min_polygon_distance(R, Z) if dist < self.min_clearance: return True return not self._point_in_polygon(R, Z)
def _point_in_polygon(self, R: float, Z: float) -> bool: """Ray-casting test for point-in-polygon.""" n = len(self.R_wall) inside = False j = n - 1 for i in range(n): Ri, Zi = self.R_wall[i], self.Z_wall[i] Rj, Zj = self.R_wall[j], self.Z_wall[j] if ((Zi > Z) != (Zj > Z)) and (R < (Rj - Ri) * (Z - Zi) / (Zj - Zi) + Ri): inside = not inside j = i return inside def _min_polygon_distance(self, R: float, Z: float) -> float: """Minimum distance from (R, Z) to any edge of the wall polygon.""" n = len(self.R_wall) min_d = np.inf for i in range(n): j = (i + 1) % n # Segment from (R_wall[i], Z_wall[i]) to (R_wall[j], Z_wall[j]) dR = self.R_wall[j] - self.R_wall[i] dZ = self.Z_wall[j] - self.Z_wall[i] seg_len2 = dR * dR + dZ * dZ if seg_len2 < 1e-30: d = np.hypot(R - self.R_wall[i], Z - self.Z_wall[i]) else: t = max(0.0, min(1.0, ((R - self.R_wall[i]) * dR + (Z - self.Z_wall[i]) * dZ) / seg_len2)) proj_R = self.R_wall[i] + t * dR proj_Z = self.Z_wall[i] + t * dZ d = np.hypot(R - proj_R, Z - proj_Z) if d < min_d: min_d = d return float(min_d)
[docs] @classmethod def circular( cls, R0: float, a: float, n_vertices: int = 128, min_clearance: float = 0.0, ) -> "WallModel": """Construct a circular (axisymmetric) wall. Parameters ---------- R0, a : float Major radius and minor radius of the circular wall. n_vertices : int Number of polygon vertices. min_clearance : float Proximity clearance (m). Returns ------- WallModel """ theta = np.linspace(0.0, 2.0 * np.pi, n_vertices, endpoint=False) return cls( R0 + a * np.cos(theta), a * np.sin(theta), min_clearance=min_clearance, )
# --------------------------------------------------------------------------- # Update FieldLineTracer.trace to accept a wall model # --------------------------------------------------------------------------- _original_trace = FieldLineTracer.trace def _trace_with_wall( self, start_pt, t_max: float, wall: Optional["WallModel"] = None, ) -> np.ndarray: """Trace a single field line, optionally stopping at a wall. Parameters ---------- start_pt : array-like of length 3 Starting point (R, Z, phi). t_max : float Maximum arc-length parameter. wall : WallModel or None Wall model for early-termination on wall hit. ``None`` uses the ``RZlimit`` box-boundary from the constructor. Returns ------- ndarray, shape (N, 3) Trajectory points (R, Z, phi). For wall-hit cases N may be much smaller than ``int(t_max / dt)``; the trajectory is *not* padded. """ y = np.asarray(start_pt, dtype=float).copy() dt = self.dt n_steps = max(int(t_max / dt), 1) result = [y.copy()] for _ in range(n_steps): y = _rk4_step_py(self.field_func, y, dt) result.append(y.copy()) if wall is not None and wall.is_outside(y[:2]): break if self.RZlimit is not None: R_min, R_max, Z_min, Z_max = self.RZlimit if y[0] < R_min or y[0] > R_max or y[1] < Z_min or y[1] > Z_max: break return np.array(result) FieldLineTracer.trace = _trace_with_wall # type: ignore[assignment] def _trace_many_with_wall( self, start_pts, t_max: float, n_workers: Optional[int] = None, progress: Optional[TraceProgressBase] = None, wall: Optional["WallModel"] = None, min_valid_steps: int = 0, ) -> List[np.ndarray]: """Parallel field-line tracing with optional wall-hit handling. Wall-hitting trajectories are returned as short arrays and do **not** block or slow down other traces in the same batch. Parameters ---------- start_pts : array-like, shape (N, 3) Starting points. t_max : float Maximum arc-length for each field line. n_workers : int or None Override the default worker count. progress : TraceProgressBase or None Optional progress reporter. wall : WallModel or None Wall model. When provided, trajectories that hit the wall are returned early (shorter arrays). min_valid_steps : int If > 0, trajectories with fewer than this many steps are flagged as "wall-hit" in the returned metadata. This does not discard the trajectory — callers can filter using the returned list. Returns ------- list of ndarray One trajectory per starting point. """ start_pts = np.asarray(start_pts, dtype=float) workers = n_workers or self.n_workers n_tasks = len(start_pts) n_steps_planned = max(int(t_max / self.dt), 1) prog = _coerce_progress(progress) prog.start(n_tasks, description="field-line tracing") def _trace_one(args): idx, pt = args traj = self.trace(pt, t_max, wall=wall) prog.update(idx, steps_done=-1, steps_total=n_steps_planned) return traj with ThreadPoolExecutor(max_workers=workers) as executor: results = list(executor.map(_trace_one, enumerate(start_pts))) prog.close() return results FieldLineTracer.trace_many = _trace_many_with_wall # type: ignore[assignment] # --------------------------------------------------------------------------- # Boundary reseeding helper # ---------------------------------------------------------------------------
[docs] def reseed_boundary_field_lines( tracer: "FieldLineTracer", start_pts: np.ndarray, trajs: List[np.ndarray], t_max: float, wall: Optional["WallModel"] = None, min_valid_fraction: float = 0.5, n_reseed_factor: int = 4, reseed_radius: float = 0.02, n_workers: Optional[int] = None, progress: Optional[TraceProgressBase] = None, ) -> List[np.ndarray]: """Adaptively reseed field lines near wall-hit starting points. Near the plasma boundary, a fraction of seed points may produce very short trajectories (wall hits). This function: 1. Identifies seeds with trajectories shorter than ``min_valid_fraction * t_max / dt`` steps. 2. Surrounds each such seed with ``n_reseed_factor`` new seeds in a small circle of radius ``reseed_radius``. 3. Traces the new seeds and returns *all* trajectories (original + reseeded). Parameters ---------- tracer : FieldLineTracer Tracer used for the original batch. start_pts : array, shape (N, 3) Original starting points [R, Z, phi]. trajs : list of ndarray Trajectories from the original batch (one per start point). t_max : float Arc-length used in the original batch. wall : WallModel or None Wall model passed to the new traces. min_valid_fraction : float Fraction of ``t_max / dt`` steps below which a trajectory is considered a wall hit. Default 0.5. n_reseed_factor : int Number of new seeds to place around each wall-hit seed. Default 4. reseed_radius : float Radius (m) of the reseeding circle around each hit seed. Default 0.02. n_workers : int or None Worker count for the reseeded traces. progress : TraceProgressBase or None Progress reporter for the reseeded traces. Returns ------- list of ndarray Original trajectories plus new reseeded trajectories (appended). """ start_pts = np.asarray(start_pts, dtype=float) n_planned = max(int(t_max / tracer.dt), 1) threshold = max(1, int(min_valid_fraction * n_planned)) # Identify wall-hit seeds hit_indices = [i for i, t in enumerate(trajs) if len(t) < threshold] if not hit_indices: return list(trajs) # Build reseeded starts new_starts = [] angles = np.linspace(0.0, 2.0 * np.pi, n_reseed_factor, endpoint=False) for idx in hit_indices: R0, Z0, phi0 = start_pts[idx] for theta in angles: new_starts.append([ R0 + reseed_radius * np.cos(theta), Z0 + reseed_radius * np.sin(theta), phi0, ]) new_starts_arr = np.array(new_starts) new_trajs = tracer.trace_many( new_starts_arr, t_max, n_workers=n_workers, progress=progress, wall=wall ) return list(trajs) + new_trajs