"""Sweep orchestration: FIR precomputation, run_sweep, and helpers."""
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
from numpy.typing import NDArray
from scipy.signal import firwin
from ._kernel import _build_task_order, _sweep_kernel
from ._types import _KAISER_BETA, FILTER_TYPES, WINDOWS, SweepResult
def _precompute_perturbations(
orders: NDArray,
n_cutoffs: int,
n_initial: int,
seed: int | None,
) -> NDArray:
"""Build the deterministic perturbation tensor consumed by the kernel.
Returns an array of shape ``(Ncoef, n_cutoffs, n_initial, max_Ns)``
filled with uniform ``[0, 1)`` samples, where ``max_Ns = orders.max()``.
"""
Ncoef = len(orders)
max_Ns = int(orders.max()) if Ncoef > 0 else 0
rng = np.random.default_rng(seed)
return rng.random((Ncoef, n_cutoffs, n_initial, max_Ns), dtype=np.float64)
[docs]
def precompute_fir_bank(
orders: Sequence[int] | NDArray,
cutoffs: NDArray,
filter_type: str,
window: str,
*,
kaiser_beta: float = _KAISER_BETA,
bandwidth: float = 0.2,
) -> tuple[NDArray, NDArray]:
"""Build the FIR coefficient tensor and gain matrix for a sweep.
Parameters
----------
orders
Filter orders to sweep. Values will be cast to ``int``.
cutoffs
Normalised cutoff frequencies in (0, 1).
filter_type
``"lowpass"``, ``"highpass"``, ``"bandpass"``, or ``"bandstop"``.
window
Window name in :data:`WINDOWS`.
kaiser_beta
Shape parameter of the Kaiser window. Ignored unless
``window == "kaiser"``. Must be ``>= 0``.
bandwidth
Width of the pass/stop band when *filter_type* is
``"bandpass"`` or ``"bandstop"``. Ignored for lowpass and
highpass. Clamped so edges stay inside ``(0, 1)``.
Returns
-------
fir_bank : ndarray, shape (Ncoef, Ncut, max_order)
``fir_bank[i, j, :order_i]`` holds the FIR coefficients for the
``(order_i, cutoff_j)`` pair, zero-padded on the right.
gains : ndarray, shape (Ncoef, Ncut)
DC gain (sum of coefficients) of each filter.
"""
if filter_type not in FILTER_TYPES:
raise ValueError(f"filter_type must be one of {FILTER_TYPES}, got {filter_type!r}")
if window not in WINDOWS:
raise ValueError(f"window must be one of {WINDOWS}, got {window!r}")
if window == "kaiser" and kaiser_beta < 0:
raise ValueError(f"kaiser_beta must be >= 0, got {kaiser_beta!r}")
orders_arr = np.asarray(orders, dtype=np.int64)
if filter_type in ("highpass", "bandstop"):
even = orders_arr[orders_arr % 2 == 0]
if even.size:
raise ValueError(
f"{filter_type} requires odd orders; got even values "
f"{even.tolist()}. Use np.arange(start|1, stop, 2) or "
"filter your orders to odd integers."
)
Ncoef = len(orders_arr)
Ncut = len(cutoffs)
max_taps = int(orders_arr.max())
fir_bank = np.zeros((Ncoef, Ncut, max_taps))
gains = np.zeros((Ncoef, Ncut))
win_arg = ("kaiser", float(kaiser_beta)) if window == "kaiser" else window
use_band = filter_type in ("bandpass", "bandstop")
bw_half = bandwidth / 2.0 if use_band else 0.0
for i, Nss in enumerate(orders_arr):
numtaps = int(Nss)
for j, wc in enumerate(cutoffs):
if use_band:
low = max(1e-5, float(wc) - bw_half)
high = min(1.0 - 1e-5, float(wc) + bw_half)
c = firwin(numtaps, [low, high], pass_zero=filter_type, window=win_arg)
else:
c = firwin(numtaps, wc, pass_zero=filter_type, window=win_arg)
fir_bank[i, j, :numtaps] = c.astype(np.float64)
gains[i, j] = float(np.sum(c))
return fir_bank, gains
[docs]
def quick_sweep_params() -> tuple[NDArray, NDArray, NDArray, dict[str, float | int]]:
"""Return the (orders_lp, orders_hp, cutoffs, params) for quick-sweep mode.
These are small grids suitable for testing and CI smoke runs.
"""
return (
np.arange(2, 8),
np.arange(3, 9, 2),
np.linspace(0.1, 0.9, 10),
dict(Nitera=50, Nmap=200, n_initial=3, bandwidth=0.15),
)
[docs]
def run_sweep(
window: str = "hamming",
filter_type: str = "lowpass",
*,
orders: Sequence[int] | NDArray | None = None,
cutoffs: NDArray | None = None,
Nitera: int = 500,
Nmap: int = 3000,
n_initial: int = 25,
alpha: float = 1.4,
beta: float = 0.3,
seed: int | None = 42,
warmup: bool = True,
kaiser_beta: float = _KAISER_BETA,
bandwidth: float = 0.2,
adaptive: bool = False,
Nmap_min: int = 500,
tol: float = 1e-3,
) -> SweepResult:
"""Run a full Lyapunov sweep for one (window, filter) combination.
Parameters
----------
window, filter_type
FIR configuration (see :data:`WINDOWS`, :data:`FILTER_TYPES`).
orders
Filter orders to sweep. Defaults to ``np.arange(2, 42)`` for
lowpass and ``np.arange(3, 43, 2)`` for highpass (which requires
odd tap counts).
cutoffs
Cutoff frequencies. Defaults to 100 points linearly spaced in
``(0, 1)``.
Nitera
Burn-in iterations applied to the map before starting the
Lyapunov accumulation.
Nmap
Maximum number of iterations used to estimate each Lyapunov
exponent. Also the exact iteration count when ``adaptive=False``.
bandwidth
Width of the pass/stop band for bandpass/bandstop filters.
n_initial
Number of random initial conditions averaged per grid point.
alpha, beta
Henon parameters (alpha, beta).
seed
Seed for the RNG used to shuffle initial conditions. ``None``
leaves the RNG state untouched.
warmup
Run a tiny sweep first to trigger Numba JIT compilation.
kaiser_beta
Shape parameter of the Kaiser window.
adaptive
When ``True``, enable adaptive early-stop in the Lyapunov kernels.
Nmap_min
Minimum iterations before the adaptive criterion may fire.
tol
Convergence tolerance for the adaptive criterion.
Returns
-------
SweepResult
Full result object.
"""
if adaptive:
if Nmap_min < 1:
raise ValueError(f"Nmap_min must be >= 1, got {Nmap_min}")
if Nmap_min > Nmap:
raise ValueError(f"Nmap_min ({Nmap_min}) must be <= Nmap ({Nmap})")
if tol <= 0:
raise ValueError(f"tol must be > 0, got {tol}")
if Nmap_min == Nmap:
raise ValueError(
"adaptive=True with Nmap_min == Nmap is a no-op; "
"set Nmap_min < Nmap or pass adaptive=False."
)
kernel_Nmap_min = Nmap_min
kernel_tol = float(tol)
else:
kernel_Nmap_min = Nmap
kernel_tol = 0.0
if orders is None:
orders = (
np.arange(3, 43, 2) if filter_type in ("highpass", "bandstop") else np.arange(2, 42)
)
if cutoffs is None:
cutoffs = np.linspace(1e-5, 1.0 - 1e-5, 100)
orders_arr = np.asarray(orders, dtype=np.int64)
cutoffs_arr = np.asarray(cutoffs, dtype=np.float64)
fir_bank, gains = precompute_fir_bank(
orders_arr,
cutoffs_arr,
filter_type,
window,
kaiser_beta=kaiser_beta,
bandwidth=bandwidth,
)
perturbations = _precompute_perturbations(
orders_arr,
len(cutoffs_arr),
n_initial,
seed,
)
task_order = _build_task_order(orders_arr, len(cutoffs_arr))
if warmup:
_warmup_perturbations = _precompute_perturbations(
orders_arr[:2],
3,
2,
seed=None,
)
_warmup_order = _build_task_order(orders_arr[:2], 3)
_sweep_kernel(
orders_arr[:2].astype(np.float64),
cutoffs_arr[:3],
fir_bank[:2, :3, :],
gains[:2, :3],
_warmup_perturbations,
_warmup_order,
10,
50,
50,
0.0,
2,
alpha,
beta,
)
h, h_std, n_iters_used = _sweep_kernel(
orders_arr.astype(np.float64),
cutoffs_arr,
fir_bank,
gains,
perturbations,
task_order,
Nitera,
Nmap,
kernel_Nmap_min,
kernel_tol,
n_initial,
alpha,
beta,
)
return SweepResult(
h=h,
h_std=h_std,
orders=orders_arr,
cutoffs=cutoffs_arr,
window=window,
filter_type=filter_type,
n_iters_used=n_iters_used,
metadata={
"Nitera": Nitera,
"Nmap": Nmap,
"n_initial": n_initial,
"alpha": alpha,
"beta": beta,
"seed": seed,
"kaiser_beta": kaiser_beta,
"adaptive": adaptive,
"Nmap_min": Nmap_min if adaptive else None,
"tol": tol if adaptive else None,
},
)