"""
Statistical analysis of Lyapunov sweep results.
Provides quick programmatic access to the full sweep dataset:
summary tables, regime classification, parameter ranking, and
comparisons across windows, filter types, and Kaiser beta sweeps.
Return types
------------
All public functions that return ``dict`` have corresponding
:class:`~typing.TypedDict` definitions in this module so that IDE
autocompletion and mypy can statically verify the result shape:
* :class:`SummaryRow` — one row of :func:`summary_table`
* :class:`FilterTypeAggregate` — output of :func:`compare_filter_types`
* :class:`OptimalParams` — one entry of :func:`optimal_parameters`
* :class:`LmaxDistribution` — output of :func:`lmax_distribution`
* :class:`CorrelationMatrix` — output of :func:`correlation_matrix`
* :class:`BootstrapConfidence` — one entry of :func:`bootstrap_confidence`
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import TypedDict
import numpy as np
from numpy.typing import NDArray
from .sweep import FILTER_TYPES, SweepResult, load_sweep
[docs]
class SummaryRow(TypedDict):
"""One row of :func:`summary_table`."""
window: str
filter_type: str
n_orders: int
n_cutoffs: int
pct_chaotic: float
pct_periodic: float
pct_divergent: float
mean_lmax: float
max_lmax: float
beta: float | None
[docs]
class FilterTypeAggregate(TypedDict, total=False):
"""Aggregate statistics for a single filter type."""
n_sweeps: int
mean_pct_chaotic: float
mean_pct_periodic: float
mean_pct_divergent: float
mean_lmax: float
[docs]
class OptimalParams(TypedDict):
"""One optimal (order, cutoff) pair from :func:`optimal_parameters`."""
window: str
filter_type: str
order: int
cutoff: float
lmax: float
[docs]
class LmaxDistribution(TypedDict):
"""Distribution statistics for lambda_max per filter type.
Returned by :func:`lmax_distribution`.
"""
hist: list[float]
edges: list[float]
mean: float
std: float
skewness: float
n: int
[docs]
class CorrelationMatrix(TypedDict):
"""Spearman correlation results from :func:`correlation_matrix`."""
order_vs_lmax: float
cutoff_vs_lmax: float
n: int
[docs]
class BootstrapConfidence(TypedDict, total=False):
"""Bootstrap 95% CI entry for one filter type."""
mean: float
ci_low: float
ci_high: float
n: int
[docs]
class AreaSummary(TypedDict):
"""Chaotic-area breakdown for a single :class:`~.sweep.SweepResult`.
Returned by :func:`area_summary`.
"""
n_chaotic: int
n_periodic: int
n_divergent: int
n_total: int
pct_chaotic: float
pct_chaotic_finite: float
[docs]
class LmaxStats(TypedDict):
"""Descriptive statistics of :math:`\\lambda_{\\max}` for a subset of points.
Returned by :func:`lmax_statistics`.
"""
mean: float
median: float
std: float
min: float
max: float
ci_95_low: float
ci_95_high: float
n_used: int
[docs]
class ConfigRank(TypedDict):
"""One entry of :func:`rank_configurations`.
Ranks a (filter_type, window) combination by chaotic-area coverage.
Kaiser-beta sweeps include the beta value and a synthetic window key.
"""
rank: int
filter_type: str
window: str
n_chaotic: int
pct_chaotic: float
pct_chaotic_finite: float
lmax_mean: float
lmax_max: float
lmax_std: float
lmax_ci_95_low: float
lmax_ci_95_high: float
beta: float | None
[docs]
class SweetSpot(TypedDict):
"""Single grid point with the highest :math:`\\lambda_{\\max}` for one filter type.
Returned per filter type by :func:`sweet_spot_per_filter`.
"""
filter_type: str
window: str
n_z: int
omega_c: float
lmax: float
lmax_ci_95_low: float | None
lmax_ci_95_high: float | None
[docs]
class KaiserBetaOptimal(TypedDict):
"""Best Kaiser β per filter type (by chaotic area).
Returned by :func:`kaiser_beta_optimal`.
"""
filter_type: str
beta: float
n_chaotic: int
pct_chaotic_finite: float
lmax_mean: float
lmax_max: float
def _discover_all(data_dir: str | Path = "data/sweeps") -> list[SweepResult]:
"""Load every ``variables_lyapunov.npz`` under *data_dir*."""
results: list[SweepResult] = []
for path in sorted(Path(data_dir).rglob("variables_lyapunov.npz")):
results.append(load_sweep(path))
return results
def _safe_stats(h: NDArray) -> tuple[float, float]:
"""Mean and max of *h* ignoring NaN and Inf.
Returns ``(NaN, NaN)`` when all entries of *h* are non-finite
(e.g. a fully-divergent sweep). Callers are expected to handle
this gracefully when formatting output.
"""
finite = h[np.isfinite(h)]
if finite.size == 0:
return float("nan"), float("nan")
return float(np.mean(finite)), float(np.max(finite))
[docs]
def summary_table(
data_dir: str | Path = "data/sweeps",
) -> list[SummaryRow]:
"""Return one row per sweep with key statistics.
Each row is a :class:`SummaryRow` with: ``window``, ``filter_type``,
``n_orders``, ``n_cutoffs``, ``pct_chaotic``, ``pct_periodic``,
``pct_divergent``, ``mean_lmax``, ``max_lmax``, ``beta`` (Kaiser only).
This is the foundation of all downstream analyses — every other
public function in this module either calls or derives from this table.
"""
rows: list[dict] = []
for result in _discover_all(data_dir):
h = result.h
chaotic = np.sum((~np.isnan(h)) & (h > 0))
periodic = np.sum((~np.isnan(h)) & (h <= 0))
divergent = np.sum(np.isnan(h))
total = h.size
mean_lmax, max_lmax = _safe_stats(h)
row: dict = {
"window": result.window,
"filter_type": result.filter_type,
"n_orders": len(result.orders),
"n_cutoffs": len(result.cutoffs),
"pct_chaotic": round(100 * chaotic / total, 1),
"pct_periodic": round(100 * periodic / total, 1),
"pct_divergent": round(100 * divergent / total, 1),
"mean_lmax": round(mean_lmax, 4),
"max_lmax": round(max_lmax, 4),
"beta": result.metadata.get("kaiser_beta"),
}
rows.append(row)
return rows # type: ignore[return-value]
[docs]
def area_summary(sweep_result: SweepResult) -> AreaSummary:
"""Compute chaotic-area coverage for a single sweep.
Classifies each grid point (order, cutoff) as chaotic
(:math:`\\lambda_{\\max} > 0`, finite), periodic
(:math:`\\lambda_{\\max} \\le 0`, finite), or divergent
(NaN or Inf). Returns absolute counts and two complementary
percentages: one over the full grid, another excluding
divergent points so the chaotic proportion is not
artificially deflated by parameter regions where the
FIR filter itself becomes ill-conditioned.
Parameters
----------
sweep_result : SweepResult
A loaded sweep.
Returns
-------
AreaSummary
Counts and percentages for the three dynamical regimes.
"""
h = sweep_result.h
finite = np.isfinite(h)
chaotic = np.sum(finite & (h > 0))
periodic = np.sum(finite & (h <= 0))
divergent = np.sum(~finite)
total = h.size
pct_chaotic = round(100 * chaotic / total, 1) if total > 0 else 0.0
n_finite = total - divergent
pct_chaotic_finite = round(100 * chaotic / n_finite, 1) if n_finite > 0 else 0.0
return {
"n_chaotic": int(chaotic),
"n_periodic": int(periodic),
"n_divergent": int(divergent),
"n_total": int(total),
"pct_chaotic": pct_chaotic,
"pct_chaotic_finite": pct_chaotic_finite,
}
[docs]
def lmax_statistics(
sweep_result: SweepResult,
region: str = "chaotic",
n_bootstrap: int = 1000,
seed: int = 42,
) -> LmaxStats:
"""Compute descriptive statistics of :math:`\\lambda_{\\max}` for a subset of grid points.
Filters the sweep grid by dynamical regime then computes mean,
median, standard deviation, min, max, and a bootstrap 95%
confidence interval for the mean.
Parameters
----------
sweep_result : SweepResult
region : {"chaotic", "all_finite", "periodic"}
Subset of grid points to include:
* ``"chaotic"`` — only points with :math:`\\lambda_{\\max} > 0` (finite)
* ``"all_finite"`` — all finite points (excludes NaN/Inf)
* ``"periodic"`` — only points with :math:`\\lambda_{\\max} \\le 0` (finite)
n_bootstrap : int
Number of bootstrap resamples (default 1000).
seed : int
RNG seed for reproducibility (default 42).
Returns
-------
LmaxStats
Descriptive statistics including bootstrap 95% CI.
When fewer than 3 valid points exist, the CI fields are NaN.
"""
from scipy.stats import bootstrap as scipy_bootstrap
h = sweep_result.h.ravel()
finite = np.isfinite(h)
if region == "chaotic":
mask = finite & (h > 0)
elif region == "periodic":
mask = finite & (h <= 0)
elif region == "all_finite":
mask = finite
else:
raise ValueError(
f"Unknown region {region!r}; expected 'chaotic', 'all_finite', or 'periodic'"
)
vals = h[mask]
n_used = len(vals)
if n_used < 3:
return {
"mean": 0.0,
"median": 0.0,
"std": 0.0,
"min": 0.0,
"max": 0.0,
"ci_95_low": float("nan"),
"ci_95_high": float("nan"),
"n_used": n_used,
}
ci_result = scipy_bootstrap(
(vals,),
np.mean,
n_resamples=n_bootstrap,
confidence_level=0.95,
random_state=seed,
method="BCa",
)
return {
"mean": round(float(np.mean(vals)), 6),
"median": round(float(np.median(vals)), 6),
"std": round(float(np.std(vals, ddof=1)), 6),
"min": round(float(np.min(vals)), 6),
"max": round(float(np.max(vals)), 6),
"ci_95_low": round(float(ci_result.confidence_interval.low), 6),
"ci_95_high": round(float(ci_result.confidence_interval.high), 6),
"n_used": n_used,
}
# ── Ranking helpers ─────────────────────────────────────────────────────────
_LOADED_SWEEPS_CACHE: dict[Path, dict[tuple[str, str], SweepResult]] = {}
[docs]
def load_all_sweeps(
sweep_dir: str | Path = "data/sweeps",
) -> dict[tuple[str, str], SweepResult]:
"""Load every ``variables_lyapunov.npz`` under *sweep_dir*.
Returns a dict keyed by ``(filter_type, window)`` for non-Kaiser
sweeps or ``(filter_type, "kaiser_<beta>")`` for Kaiser beta-sweeps.
Results are cached in memory so repeated calls are free.
Parameters
----------
sweep_dir
Root directory containing per-configuration subdirectories
and an optional ``kaiser/<filter_type>/beta_<N>`` subtree.
Returns
-------
dict
Mapping from ``(filter_type, window_key)`` to :class:`SweepResult`.
"""
sweep_dir = Path(sweep_dir).resolve()
if sweep_dir in _LOADED_SWEEPS_CACHE:
return _LOADED_SWEEPS_CACHE[sweep_dir]
loaded: dict[tuple[str, str], SweepResult] = {}
for path in sorted(sweep_dir.rglob("variables_lyapunov.npz")):
result = load_sweep(path)
if result.window == "kaiser":
beta = result.metadata.get("kaiser_beta", 0.0)
key = (result.filter_type, f"kaiser_{float(beta):.2f}")
else:
key = (result.filter_type, result.window)
loaded[key] = result
_LOADED_SWEEPS_CACHE[sweep_dir] = loaded
return loaded
[docs]
def rank_configurations(
sweep_results: dict[tuple[str, str], SweepResult],
bootstrap_seed: int = 42,
) -> list[ConfigRank]:
"""Rank every (filter_type, window) combination by chaotic grid-point count.
Parameters
----------
sweep_results
Mapping from ``(filter_type, window_key)`` to a loaded sweep.
Typically obtained via :func:`load_all_sweeps`.
bootstrap_seed
RNG seed forwarded to :func:`lmax_statistics` (default 42).
Returns
-------
list of ConfigRank
Descending by ``n_chaotic``. Rank is 1-indexed.
"""
entries: list[ConfigRank] = []
for (ft, win), result in sweep_results.items():
area = area_summary(result)
lmax = lmax_statistics(result, region="chaotic", seed=bootstrap_seed)
beta_raw = result.metadata.get("kaiser_beta")
beta = round(float(beta_raw), 2) if beta_raw is not None else None
entries.append(
{
"rank": 0,
"filter_type": ft,
"window": win,
"n_chaotic": area["n_chaotic"],
"pct_chaotic": area["pct_chaotic"],
"pct_chaotic_finite": area["pct_chaotic_finite"],
"lmax_mean": lmax["mean"],
"lmax_max": lmax["max"],
"lmax_std": lmax["std"],
"lmax_ci_95_low": lmax["ci_95_low"],
"lmax_ci_95_high": lmax["ci_95_high"],
"beta": beta,
}
)
entries.sort(key=lambda e: e["n_chaotic"], reverse=True)
for i, e in enumerate(entries):
e["rank"] = i + 1
return entries
[docs]
def top_k_per_filter(
sweep_results: dict[tuple[str, str], SweepResult],
k: int = 3,
bootstrap_seed: int = 42,
) -> dict[str, list[ConfigRank]]:
"""Return the top-*k* configurations for each filter type.
Calls :func:`rank_configurations` internally, then groups the
global ranking by ``filter_type``, keeping at most *k* entries
per group.
Parameters
----------
sweep_results
Mapping from ``(filter_type, window_key)`` to loaded sweeps.
k
Maximum number of entries per filter type (default 3).
bootstrap_seed
Forwarded to :func:`rank_configurations`.
Returns
-------
dict of str → list of ConfigRank
Keys are filter types; values are lists of at most *k*
:class:`ConfigRank` entries, ordered by descending
``n_chaotic``.
"""
ranking = rank_configurations(sweep_results, bootstrap_seed=bootstrap_seed)
out: dict[str, list[ConfigRank]] = {}
for entry in ranking:
ft = entry["filter_type"]
out.setdefault(ft, []).append(entry)
for ft in list(out):
entries = out[ft][:k]
for i, e in enumerate(entries):
e["rank"] = i + 1
out[ft] = entries
return out
[docs]
def consolidate_kaiser(
sweep_results: dict[tuple[str, str], SweepResult],
) -> dict[tuple[str, str], SweepResult]:
"""Keep only the best Kaiser β per filter type, by chaotic point count.
Non-Kaiser windows are passed through unchanged. This produces a
balanced dataset where each window family appears once per filter
type, making window-to-window comparisons fair.
Parameters
----------
sweep_results
Mapping from :func:`load_all_sweeps`.
Returns
-------
dict
Same shape as *sweep_results*, but with at most one Kaiser
entry per ``filter_type`` (the β with the most chaotic points).
"""
consolidated: dict[tuple[str, str], SweepResult] = {}
best_kaiser: dict[str, tuple[str, int]] = {}
for (ft, win), result in sweep_results.items():
if result.window == "kaiser":
area = area_summary(result)
n_chaotic = area["n_chaotic"]
if ft not in best_kaiser or n_chaotic > best_kaiser[ft][1]:
best_kaiser[ft] = (win, n_chaotic)
else:
consolidated[(ft, win)] = result
for ft, (win, _) in best_kaiser.items():
consolidated[(ft, win)] = sweep_results[(ft, win)]
return consolidated
[docs]
def kaiser_beta_optimal(
sweep_results: dict[tuple[str, str], SweepResult],
bootstrap_seed: int = 42,
) -> dict[str, KaiserBetaOptimal]:
"""Report the best Kaiser β per filter type with extended statistics.
Parameters
----------
sweep_results
Mapping from :func:`load_all_sweeps`.
bootstrap_seed
Forwarded to :func:`lmax_statistics`.
Returns
-------
dict of str → KaiserBetaOptimal
"""
out: dict[str, KaiserBetaOptimal] = {}
for (ft, _win), result in sweep_results.items():
if result.window != "kaiser":
continue
beta = float(result.metadata.get("kaiser_beta", 0.0))
area = area_summary(result)
lmax = lmax_statistics(result, region="chaotic", seed=bootstrap_seed)
if ft not in out or area["n_chaotic"] > out[ft]["n_chaotic"]:
out[ft] = {
"filter_type": ft,
"beta": beta,
"n_chaotic": area["n_chaotic"],
"pct_chaotic_finite": area["pct_chaotic_finite"],
"lmax_mean": lmax["mean"],
"lmax_max": lmax["max"],
}
return out
[docs]
def sweet_spot_per_filter(
sweep_results: dict[tuple[str, str], SweepResult],
) -> dict[str, SweetSpot]:
"""Identify the grid point with the highest :math:`\\lambda_{\\max}` per filter type.
For each filter type, scans all available window sweeps and returns
the exact ``(N_z, omega_c, window)`` where ``lambda_max`` is
globally maximal.
The 95% confidence interval is computed from the per-point standard
deviation ``h_std`` and ``n_initial`` metadata via the
t-distribution. If ``h_std`` is NaN or ``n_initial`` is unavailable,
the CI fields are ``None``.
Parameters
----------
sweep_results
Mapping from ``(filter_type, window_key)`` to loaded sweeps.
Returns
-------
dict of str → SweetSpot
One entry per filter type that has at least one finite point.
"""
from scipy.stats import t as t_dist
best: dict[str, SweetSpot] = {}
for (ft, win), result in sweep_results.items():
h, h_std, orders, cutoffs = result.h, result.h_std, result.orders, result.cutoffs
n_initial = result.metadata.get("n_initial", None)
for i in range(len(orders)):
for j in range(len(cutoffs)):
val = h[i, j]
if not np.isfinite(val):
continue
if ft not in best or val > best[ft]["lmax"]:
std_val = h_std[i, j]
if (
n_initial is not None
and n_initial > 1
and np.isfinite(std_val)
and std_val > 0
):
se = std_val / np.sqrt(n_initial)
t_crit = t_dist.ppf(0.975, n_initial - 1)
ci_low = round(float(val - t_crit * se), 6)
ci_high = round(float(val + t_crit * se), 6)
else:
ci_low = None
ci_high = None
best[ft] = {
"filter_type": ft,
"window": win,
"n_z": int(orders[i]),
"omega_c": round(float(cutoffs[j]), 4),
"lmax": round(float(val), 6),
"lmax_ci_95_low": ci_low,
"lmax_ci_95_high": ci_high,
}
return best
[docs]
def best_chaos_preserving(
data_dir: str | Path = "data/sweeps",
top_n: int = 5,
) -> list[SummaryRow]:
"""Rank sweeps by percentage of chaotic grid points (descending).
Returns the *top_n* entries with the most chaotic coverage.
"""
rows = summary_table(data_dir)
rows.sort(key=lambda r: r["pct_chaotic"], reverse=True)
return rows[:top_n]
[docs]
def compare_filter_types(
data_dir: str | Path = "data/sweeps",
) -> dict[str, FilterTypeAggregate]:
"""Aggregate statistics per filter type across all windows.
Returns a ``dict`` keyed by filter type (``"lowpass"``, ...) with
each value being a :class:`FilterTypeAggregate` containing mean
percentages and lambda_max across all windows that use that filter type.
"""
rows = summary_table(data_dir)
agg: dict[str, list[SummaryRow]] = {ft: [] for ft in FILTER_TYPES}
for row in rows:
agg[row["filter_type"]].append(row)
out: dict[str, FilterTypeAggregate] = {}
for ft, entries in agg.items():
if not entries:
out[ft] = {}
continue
out[ft] = {
"n_sweeps": len(entries),
"mean_pct_chaotic": round(float(np.mean([e["pct_chaotic"] for e in entries])), 1),
"mean_pct_periodic": round(float(np.mean([e["pct_periodic"] for e in entries])), 1),
"mean_pct_divergent": round(float(np.mean([e["pct_divergent"] for e in entries])), 1),
"mean_lmax": round(float(np.mean([e["mean_lmax"] for e in entries])), 4),
}
return out
[docs]
def optimal_parameters(
data_dir: str | Path = "data/sweeps",
window: str | None = None,
filter_type: str | None = None,
top_n: int = 3,
) -> list[OptimalParams]:
"""Find the (order, cutoff) pairs with the highest λ_max.
Filters results by *window* and *filter_type* if given.
"""
best: list[dict] = []
for result in _discover_all(data_dir):
if window is not None and result.window != window:
continue
if filter_type is not None and result.filter_type != filter_type:
continue
h, orders, cutoffs = result.h, result.orders, result.cutoffs
for i in range(len(orders)):
for j in range(len(cutoffs)):
val = h[i, j]
if np.isnan(val) or np.isinf(val):
continue
best.append(
{
"window": result.window,
"filter_type": result.filter_type,
"order": int(orders[i]),
"cutoff": round(float(cutoffs[j]), 4),
"lmax": round(float(val), 6),
}
)
best.sort(key=lambda x: x["lmax"], reverse=True)
return best[:top_n] # type: ignore[return-value]
[docs]
def export_summary_json(
data_dir: str | Path = "data/sweeps",
output: str | Path = "data/analysis_summary.json",
) -> Path:
"""Write the full summary table to a JSON file."""
output = Path(output)
rows = summary_table(data_dir)
output.write_text(json.dumps(rows, indent=2, default=str))
return output
# ── Kaiser β-sweep analysis ──────────────────────────────────────────────────
[docs]
def beta_summary(
data_dir: str | Path = "data/sweeps/kaiser",
) -> dict[str, dict[float, dict]]:
"""Aggregate per-β statistics for each filter type.
Returns a nested dict: ``{filter_type: {beta: {pct_chaotic, mean_lmax, ...}}}``
"""
out: dict[str, dict[float, dict]] = {}
for ft in FILTER_TYPES:
ft_dir = Path(data_dir) / ft
if not ft_dir.is_dir():
continue
out[ft] = {}
for path in sorted(ft_dir.rglob("variables_lyapunov.npz")):
result = load_sweep(path)
beta = float(result.metadata.get("kaiser_beta", float("nan")))
if np.isnan(beta):
continue
h = result.h
chaotic = np.sum((~np.isnan(h)) & (h > 0))
total = h.size
mn, mx = _safe_stats(h)
out[ft][beta] = {
"pct_chaotic": round(100 * chaotic / total, 1),
"mean_lmax": round(mn, 4),
"max_lmax": round(mx, 4),
}
return out
[docs]
def beta_curve(
data_dir: str | Path = "data/sweeps/kaiser",
filter_type: str = "lowpass",
) -> tuple[NDArray, NDArray]:
"""Return (betas, pct_chaotic) arrays for a single filter type.
Useful for plotting the β-dependence of chaotic coverage.
Returns empty arrays when *filter_type* is absent from the data.
"""
summary = beta_summary(data_dir)
if filter_type not in summary:
return np.array([]), np.array([])
betas = np.array(sorted(summary[filter_type]))
pct = np.array([summary[filter_type][b]["pct_chaotic"] for b in betas])
return betas, pct
# ── Advanced statistical analyses ────────────────────────────────────────────
[docs]
def lmax_distribution(
data_dir: str | Path = "data/sweeps",
bins: int = 50,
) -> dict[str, LmaxDistribution]:
"""Histogram of λ_max values per filter type.
Returns
-------
dict
``{filter_type: {"hist": counts, "edges": bin_edges, "mean": float,
"std": float, "skewness": float, "n": int}}``
"""
from scipy.stats import skew
all_vals: dict[str, list[float]] = {ft: [] for ft in FILTER_TYPES}
for result in _discover_all(data_dir):
vals = result.h.ravel()
vals = vals[np.isfinite(vals)]
all_vals[result.filter_type].extend(vals.tolist())
out: dict[str, LmaxDistribution] = {}
for ft, val_list in all_vals.items():
arr = np.array(val_list)
if len(arr) == 0:
out[ft] = {"hist": [], "edges": [], "mean": 0.0, "std": 0.0, "skewness": 0.0, "n": 0}
continue
hist, edges = np.histogram(arr, bins=bins)
out[ft] = {
"hist": hist.tolist(),
"edges": edges.tolist(),
"mean": round(float(np.mean(arr)), 4),
"std": round(float(np.std(arr)), 4),
"skewness": round(float(skew(arr)), 4),
"n": len(arr),
}
return out
[docs]
def transition_boundary(
data_dir: str | Path = "data/sweeps",
window: str | None = None,
filter_type: str = "lowpass",
) -> tuple[NDArray, NDArray]:
"""Find the cutoff where each order transitions from non-chaotic to chaotic.
For each order, returns the *first* cutoff (lowest frequency) where
λ_max > 0, or NaN if no chaotic point exists.
Returns
-------
orders : ndarray
Filter orders.
cutoffs : ndarray
First chaotic cutoff per order (NaN if never chaotic).
"""
best: dict[int, tuple[float, float]] = {}
for result in _discover_all(data_dir):
if window is not None and result.window != window:
continue
if result.filter_type != filter_type:
continue
h, orders, cutoffs = result.h, result.orders, result.cutoffs
for i in range(len(orders)):
for j in range(len(cutoffs)):
val = h[i, j]
if not np.isfinite(val):
continue
if val > 0:
order = int(orders[i])
wc = float(cutoffs[j])
if order not in best or wc < best[order][0]:
best[order] = (wc, float(val))
if not best:
return np.array([]), np.array([])
orders_sorted = np.array(sorted(best))
cutoffs_sorted = np.array([best[o][0] for o in orders_sorted])
return orders_sorted, cutoffs_sorted
[docs]
def chaos_margin(
data_dir: str | Path = "data/sweeps",
window: str | None = None,
filter_type: str = "lowpass",
) -> tuple[NDArray, NDArray]:
"""For each order, compute the width of the chaotic region in cutoff space.
Returns
-------
orders : ndarray
widths : ndarray
Fraction of cutoffs where λ_max > 0 per order.
"""
accum: dict[int, list[float]] = {}
for result in _discover_all(data_dir):
if window is not None and result.window != window:
continue
if result.filter_type != filter_type:
continue
h, orders = result.h, result.orders
for i in range(len(orders)):
order = int(orders[i])
row = h[i, :]
chaotic_count = int(np.sum(np.isfinite(row) & (row > 0)))
total = int(np.sum(np.isfinite(row)))
if total > 0:
accum.setdefault(order, []).append(chaotic_count / total)
if not accum:
return np.array([]), np.array([])
orders_sorted = np.array(sorted(accum))
widths = np.array([float(np.mean(accum[o])) for o in orders_sorted])
return orders_sorted, widths
[docs]
def correlation_matrix(
data_dir: str | Path = "data/sweeps",
) -> CorrelationMatrix:
"""Spearman correlation between (order, cutoff, λ_max) across all sweeps.
Returns
-------
dict
``{"order_vs_lmax": rho, "cutoff_vs_lmax": rho, "n": int}``
"""
from scipy.stats import spearmanr
orders_all: list[float] = []
cutoffs_all: list[float] = []
lmax_all: list[float] = []
for result in _discover_all(data_dir):
h, orders, cutoffs = result.h, result.orders, result.cutoffs
for i in range(len(orders)):
for j in range(len(cutoffs)):
val = h[i, j]
if not np.isfinite(val):
continue
orders_all.append(float(orders[i]))
cutoffs_all.append(float(cutoffs[j]))
lmax_all.append(float(val))
if len(lmax_all) < 3:
return {"order_vs_lmax": 0.0, "cutoff_vs_lmax": 0.0, "n": len(lmax_all)}
rho_o, _ = spearmanr(orders_all, lmax_all)
rho_c, _ = spearmanr(cutoffs_all, lmax_all)
return {
"order_vs_lmax": round(float(rho_o), 4),
"cutoff_vs_lmax": round(float(rho_c), 4),
"n": len(lmax_all),
}
[docs]
def bootstrap_confidence(
data_dir: str | Path = "data/sweeps",
n_bootstrap: int = 1000,
seed: int = 42,
) -> dict[str, BootstrapConfidence]:
"""Bootstrap 95% CI for mean λ_max per filter type.
Returns
-------
dict
``{filter_type: {"mean": float, "ci_low": float, "ci_high": float}}``
"""
rng = np.random.default_rng(seed)
all_vals: dict[str, list[float]] = {ft: [] for ft in FILTER_TYPES}
for result in _discover_all(data_dir):
vals = result.h.ravel()
vals = vals[np.isfinite(vals)]
all_vals[result.filter_type].extend(vals.tolist())
out: dict[str, dict] = {}
for ft, val_list in all_vals.items():
arr = np.array(val_list)
if len(arr) == 0:
out[ft] = {}
continue
means = np.array(
[np.mean(rng.choice(arr, size=len(arr), replace=True)) for _ in range(n_bootstrap)]
)
out[ft] = {
"mean": round(float(np.mean(arr)), 4),
"ci_low": round(float(np.percentile(means, 2.5)), 4),
"ci_high": round(float(np.percentile(means, 97.5)), 4),
"n": len(arr),
}
return out # type: ignore[return-value]