Source code for chaotic_pfc.analysis.stats

"""
Statistical analysis of Lyapunov sweep results.

Provides quick programmatic access to the full sweep dataset:
summary tables, regime classification, parameter ranking, and
comparisons across windows, filter types, and Kaiser beta sweeps.

Return types
------------
All public functions that return ``dict`` have corresponding
:class:`~typing.TypedDict` definitions in this module so that IDE
autocompletion and mypy can statically verify the result shape:

* :class:`SummaryRow` — one row of :func:`summary_table`
* :class:`FilterTypeAggregate` — output of :func:`compare_filter_types`
* :class:`OptimalParams` — one entry of :func:`optimal_parameters`
* :class:`LmaxDistribution` — output of :func:`lmax_distribution`
* :class:`CorrelationMatrix` — output of :func:`correlation_matrix`
* :class:`BootstrapConfidence` — one entry of :func:`bootstrap_confidence`
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import TypedDict

import numpy as np
from numpy.typing import NDArray

from .sweep import FILTER_TYPES, SweepResult, load_sweep


[docs] class SummaryRow(TypedDict): """One row of :func:`summary_table`.""" window: str filter_type: str n_orders: int n_cutoffs: int pct_chaotic: float pct_periodic: float pct_divergent: float mean_lmax: float max_lmax: float beta: float | None
[docs] class FilterTypeAggregate(TypedDict, total=False): """Aggregate statistics for a single filter type.""" n_sweeps: int mean_pct_chaotic: float mean_pct_periodic: float mean_pct_divergent: float mean_lmax: float
[docs] class OptimalParams(TypedDict): """One optimal (order, cutoff) pair from :func:`optimal_parameters`.""" window: str filter_type: str order: int cutoff: float lmax: float
[docs] class LmaxDistribution(TypedDict): """Distribution statistics for lambda_max per filter type. Returned by :func:`lmax_distribution`. """ hist: list[float] edges: list[float] mean: float std: float skewness: float n: int
[docs] class CorrelationMatrix(TypedDict): """Spearman correlation results from :func:`correlation_matrix`.""" order_vs_lmax: float cutoff_vs_lmax: float n: int
[docs] class BootstrapConfidence(TypedDict, total=False): """Bootstrap 95% CI entry for one filter type.""" mean: float ci_low: float ci_high: float n: int
[docs] class AreaSummary(TypedDict): """Chaotic-area breakdown for a single :class:`~.sweep.SweepResult`. Returned by :func:`area_summary`. """ n_chaotic: int n_periodic: int n_divergent: int n_total: int pct_chaotic: float pct_chaotic_finite: float
[docs] class LmaxStats(TypedDict): """Descriptive statistics of :math:`\\lambda_{\\max}` for a subset of points. Returned by :func:`lmax_statistics`. """ mean: float median: float std: float min: float max: float ci_95_low: float ci_95_high: float n_used: int
[docs] class ConfigRank(TypedDict): """One entry of :func:`rank_configurations`. Ranks a (filter_type, window) combination by chaotic-area coverage. Kaiser-beta sweeps include the beta value and a synthetic window key. """ rank: int filter_type: str window: str n_chaotic: int pct_chaotic: float pct_chaotic_finite: float lmax_mean: float lmax_max: float lmax_std: float lmax_ci_95_low: float lmax_ci_95_high: float beta: float | None
[docs] class SweetSpot(TypedDict): """Single grid point with the highest :math:`\\lambda_{\\max}` for one filter type. Returned per filter type by :func:`sweet_spot_per_filter`. """ filter_type: str window: str n_z: int omega_c: float lmax: float lmax_ci_95_low: float | None lmax_ci_95_high: float | None
[docs] class KaiserBetaOptimal(TypedDict): """Best Kaiser β per filter type (by chaotic area). Returned by :func:`kaiser_beta_optimal`. """ filter_type: str beta: float n_chaotic: int pct_chaotic_finite: float lmax_mean: float lmax_max: float
def _discover_all(data_dir: str | Path = "data/sweeps") -> list[SweepResult]: """Load every ``variables_lyapunov.npz`` under *data_dir*.""" results: list[SweepResult] = [] for path in sorted(Path(data_dir).rglob("variables_lyapunov.npz")): results.append(load_sweep(path)) return results def _safe_stats(h: NDArray) -> tuple[float, float]: """Mean and max of *h* ignoring NaN and Inf. Returns ``(NaN, NaN)`` when all entries of *h* are non-finite (e.g. a fully-divergent sweep). Callers are expected to handle this gracefully when formatting output. """ finite = h[np.isfinite(h)] if finite.size == 0: return float("nan"), float("nan") return float(np.mean(finite)), float(np.max(finite))
[docs] def summary_table( data_dir: str | Path = "data/sweeps", ) -> list[SummaryRow]: """Return one row per sweep with key statistics. Each row is a :class:`SummaryRow` with: ``window``, ``filter_type``, ``n_orders``, ``n_cutoffs``, ``pct_chaotic``, ``pct_periodic``, ``pct_divergent``, ``mean_lmax``, ``max_lmax``, ``beta`` (Kaiser only). This is the foundation of all downstream analyses — every other public function in this module either calls or derives from this table. """ rows: list[dict] = [] for result in _discover_all(data_dir): h = result.h chaotic = np.sum((~np.isnan(h)) & (h > 0)) periodic = np.sum((~np.isnan(h)) & (h <= 0)) divergent = np.sum(np.isnan(h)) total = h.size mean_lmax, max_lmax = _safe_stats(h) row: dict = { "window": result.window, "filter_type": result.filter_type, "n_orders": len(result.orders), "n_cutoffs": len(result.cutoffs), "pct_chaotic": round(100 * chaotic / total, 1), "pct_periodic": round(100 * periodic / total, 1), "pct_divergent": round(100 * divergent / total, 1), "mean_lmax": round(mean_lmax, 4), "max_lmax": round(max_lmax, 4), "beta": result.metadata.get("kaiser_beta"), } rows.append(row) return rows # type: ignore[return-value]
[docs] def area_summary(sweep_result: SweepResult) -> AreaSummary: """Compute chaotic-area coverage for a single sweep. Classifies each grid point (order, cutoff) as chaotic (:math:`\\lambda_{\\max} > 0`, finite), periodic (:math:`\\lambda_{\\max} \\le 0`, finite), or divergent (NaN or Inf). Returns absolute counts and two complementary percentages: one over the full grid, another excluding divergent points so the chaotic proportion is not artificially deflated by parameter regions where the FIR filter itself becomes ill-conditioned. Parameters ---------- sweep_result : SweepResult A loaded sweep. Returns ------- AreaSummary Counts and percentages for the three dynamical regimes. """ h = sweep_result.h finite = np.isfinite(h) chaotic = np.sum(finite & (h > 0)) periodic = np.sum(finite & (h <= 0)) divergent = np.sum(~finite) total = h.size pct_chaotic = round(100 * chaotic / total, 1) if total > 0 else 0.0 n_finite = total - divergent pct_chaotic_finite = round(100 * chaotic / n_finite, 1) if n_finite > 0 else 0.0 return { "n_chaotic": int(chaotic), "n_periodic": int(periodic), "n_divergent": int(divergent), "n_total": int(total), "pct_chaotic": pct_chaotic, "pct_chaotic_finite": pct_chaotic_finite, }
[docs] def lmax_statistics( sweep_result: SweepResult, region: str = "chaotic", n_bootstrap: int = 1000, seed: int = 42, ) -> LmaxStats: """Compute descriptive statistics of :math:`\\lambda_{\\max}` for a subset of grid points. Filters the sweep grid by dynamical regime then computes mean, median, standard deviation, min, max, and a bootstrap 95% confidence interval for the mean. Parameters ---------- sweep_result : SweepResult region : {"chaotic", "all_finite", "periodic"} Subset of grid points to include: * ``"chaotic"`` — only points with :math:`\\lambda_{\\max} > 0` (finite) * ``"all_finite"`` — all finite points (excludes NaN/Inf) * ``"periodic"`` — only points with :math:`\\lambda_{\\max} \\le 0` (finite) n_bootstrap : int Number of bootstrap resamples (default 1000). seed : int RNG seed for reproducibility (default 42). Returns ------- LmaxStats Descriptive statistics including bootstrap 95% CI. When fewer than 3 valid points exist, the CI fields are NaN. """ from scipy.stats import bootstrap as scipy_bootstrap h = sweep_result.h.ravel() finite = np.isfinite(h) if region == "chaotic": mask = finite & (h > 0) elif region == "periodic": mask = finite & (h <= 0) elif region == "all_finite": mask = finite else: raise ValueError( f"Unknown region {region!r}; expected 'chaotic', 'all_finite', or 'periodic'" ) vals = h[mask] n_used = len(vals) if n_used < 3: return { "mean": 0.0, "median": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "ci_95_low": float("nan"), "ci_95_high": float("nan"), "n_used": n_used, } ci_result = scipy_bootstrap( (vals,), np.mean, n_resamples=n_bootstrap, confidence_level=0.95, random_state=seed, method="BCa", ) return { "mean": round(float(np.mean(vals)), 6), "median": round(float(np.median(vals)), 6), "std": round(float(np.std(vals, ddof=1)), 6), "min": round(float(np.min(vals)), 6), "max": round(float(np.max(vals)), 6), "ci_95_low": round(float(ci_result.confidence_interval.low), 6), "ci_95_high": round(float(ci_result.confidence_interval.high), 6), "n_used": n_used, }
# ── Ranking helpers ───────────────────────────────────────────────────────── _LOADED_SWEEPS_CACHE: dict[Path, dict[tuple[str, str], SweepResult]] = {}
[docs] def load_all_sweeps( sweep_dir: str | Path = "data/sweeps", ) -> dict[tuple[str, str], SweepResult]: """Load every ``variables_lyapunov.npz`` under *sweep_dir*. Returns a dict keyed by ``(filter_type, window)`` for non-Kaiser sweeps or ``(filter_type, "kaiser_<beta>")`` for Kaiser beta-sweeps. Results are cached in memory so repeated calls are free. Parameters ---------- sweep_dir Root directory containing per-configuration subdirectories and an optional ``kaiser/<filter_type>/beta_<N>`` subtree. Returns ------- dict Mapping from ``(filter_type, window_key)`` to :class:`SweepResult`. """ sweep_dir = Path(sweep_dir).resolve() if sweep_dir in _LOADED_SWEEPS_CACHE: return _LOADED_SWEEPS_CACHE[sweep_dir] loaded: dict[tuple[str, str], SweepResult] = {} for path in sorted(sweep_dir.rglob("variables_lyapunov.npz")): result = load_sweep(path) if result.window == "kaiser": beta = result.metadata.get("kaiser_beta", 0.0) key = (result.filter_type, f"kaiser_{float(beta):.2f}") else: key = (result.filter_type, result.window) loaded[key] = result _LOADED_SWEEPS_CACHE[sweep_dir] = loaded return loaded
[docs] def rank_configurations( sweep_results: dict[tuple[str, str], SweepResult], bootstrap_seed: int = 42, ) -> list[ConfigRank]: """Rank every (filter_type, window) combination by chaotic grid-point count. Parameters ---------- sweep_results Mapping from ``(filter_type, window_key)`` to a loaded sweep. Typically obtained via :func:`load_all_sweeps`. bootstrap_seed RNG seed forwarded to :func:`lmax_statistics` (default 42). Returns ------- list of ConfigRank Descending by ``n_chaotic``. Rank is 1-indexed. """ entries: list[ConfigRank] = [] for (ft, win), result in sweep_results.items(): area = area_summary(result) lmax = lmax_statistics(result, region="chaotic", seed=bootstrap_seed) beta_raw = result.metadata.get("kaiser_beta") beta = round(float(beta_raw), 2) if beta_raw is not None else None entries.append( { "rank": 0, "filter_type": ft, "window": win, "n_chaotic": area["n_chaotic"], "pct_chaotic": area["pct_chaotic"], "pct_chaotic_finite": area["pct_chaotic_finite"], "lmax_mean": lmax["mean"], "lmax_max": lmax["max"], "lmax_std": lmax["std"], "lmax_ci_95_low": lmax["ci_95_low"], "lmax_ci_95_high": lmax["ci_95_high"], "beta": beta, } ) entries.sort(key=lambda e: e["n_chaotic"], reverse=True) for i, e in enumerate(entries): e["rank"] = i + 1 return entries
[docs] def top_k_per_filter( sweep_results: dict[tuple[str, str], SweepResult], k: int = 3, bootstrap_seed: int = 42, ) -> dict[str, list[ConfigRank]]: """Return the top-*k* configurations for each filter type. Calls :func:`rank_configurations` internally, then groups the global ranking by ``filter_type``, keeping at most *k* entries per group. Parameters ---------- sweep_results Mapping from ``(filter_type, window_key)`` to loaded sweeps. k Maximum number of entries per filter type (default 3). bootstrap_seed Forwarded to :func:`rank_configurations`. Returns ------- dict of str → list of ConfigRank Keys are filter types; values are lists of at most *k* :class:`ConfigRank` entries, ordered by descending ``n_chaotic``. """ ranking = rank_configurations(sweep_results, bootstrap_seed=bootstrap_seed) out: dict[str, list[ConfigRank]] = {} for entry in ranking: ft = entry["filter_type"] out.setdefault(ft, []).append(entry) for ft in list(out): entries = out[ft][:k] for i, e in enumerate(entries): e["rank"] = i + 1 out[ft] = entries return out
[docs] def consolidate_kaiser( sweep_results: dict[tuple[str, str], SweepResult], ) -> dict[tuple[str, str], SweepResult]: """Keep only the best Kaiser β per filter type, by chaotic point count. Non-Kaiser windows are passed through unchanged. This produces a balanced dataset where each window family appears once per filter type, making window-to-window comparisons fair. Parameters ---------- sweep_results Mapping from :func:`load_all_sweeps`. Returns ------- dict Same shape as *sweep_results*, but with at most one Kaiser entry per ``filter_type`` (the β with the most chaotic points). """ consolidated: dict[tuple[str, str], SweepResult] = {} best_kaiser: dict[str, tuple[str, int]] = {} for (ft, win), result in sweep_results.items(): if result.window == "kaiser": area = area_summary(result) n_chaotic = area["n_chaotic"] if ft not in best_kaiser or n_chaotic > best_kaiser[ft][1]: best_kaiser[ft] = (win, n_chaotic) else: consolidated[(ft, win)] = result for ft, (win, _) in best_kaiser.items(): consolidated[(ft, win)] = sweep_results[(ft, win)] return consolidated
[docs] def kaiser_beta_optimal( sweep_results: dict[tuple[str, str], SweepResult], bootstrap_seed: int = 42, ) -> dict[str, KaiserBetaOptimal]: """Report the best Kaiser β per filter type with extended statistics. Parameters ---------- sweep_results Mapping from :func:`load_all_sweeps`. bootstrap_seed Forwarded to :func:`lmax_statistics`. Returns ------- dict of str → KaiserBetaOptimal """ out: dict[str, KaiserBetaOptimal] = {} for (ft, _win), result in sweep_results.items(): if result.window != "kaiser": continue beta = float(result.metadata.get("kaiser_beta", 0.0)) area = area_summary(result) lmax = lmax_statistics(result, region="chaotic", seed=bootstrap_seed) if ft not in out or area["n_chaotic"] > out[ft]["n_chaotic"]: out[ft] = { "filter_type": ft, "beta": beta, "n_chaotic": area["n_chaotic"], "pct_chaotic_finite": area["pct_chaotic_finite"], "lmax_mean": lmax["mean"], "lmax_max": lmax["max"], } return out
[docs] def sweet_spot_per_filter( sweep_results: dict[tuple[str, str], SweepResult], ) -> dict[str, SweetSpot]: """Identify the grid point with the highest :math:`\\lambda_{\\max}` per filter type. For each filter type, scans all available window sweeps and returns the exact ``(N_z, omega_c, window)`` where ``lambda_max`` is globally maximal. The 95% confidence interval is computed from the per-point standard deviation ``h_std`` and ``n_initial`` metadata via the t-distribution. If ``h_std`` is NaN or ``n_initial`` is unavailable, the CI fields are ``None``. Parameters ---------- sweep_results Mapping from ``(filter_type, window_key)`` to loaded sweeps. Returns ------- dict of str → SweetSpot One entry per filter type that has at least one finite point. """ from scipy.stats import t as t_dist best: dict[str, SweetSpot] = {} for (ft, win), result in sweep_results.items(): h, h_std, orders, cutoffs = result.h, result.h_std, result.orders, result.cutoffs n_initial = result.metadata.get("n_initial", None) for i in range(len(orders)): for j in range(len(cutoffs)): val = h[i, j] if not np.isfinite(val): continue if ft not in best or val > best[ft]["lmax"]: std_val = h_std[i, j] if ( n_initial is not None and n_initial > 1 and np.isfinite(std_val) and std_val > 0 ): se = std_val / np.sqrt(n_initial) t_crit = t_dist.ppf(0.975, n_initial - 1) ci_low = round(float(val - t_crit * se), 6) ci_high = round(float(val + t_crit * se), 6) else: ci_low = None ci_high = None best[ft] = { "filter_type": ft, "window": win, "n_z": int(orders[i]), "omega_c": round(float(cutoffs[j]), 4), "lmax": round(float(val), 6), "lmax_ci_95_low": ci_low, "lmax_ci_95_high": ci_high, } return best
[docs] def best_chaos_preserving( data_dir: str | Path = "data/sweeps", top_n: int = 5, ) -> list[SummaryRow]: """Rank sweeps by percentage of chaotic grid points (descending). Returns the *top_n* entries with the most chaotic coverage. """ rows = summary_table(data_dir) rows.sort(key=lambda r: r["pct_chaotic"], reverse=True) return rows[:top_n]
[docs] def compare_filter_types( data_dir: str | Path = "data/sweeps", ) -> dict[str, FilterTypeAggregate]: """Aggregate statistics per filter type across all windows. Returns a ``dict`` keyed by filter type (``"lowpass"``, ...) with each value being a :class:`FilterTypeAggregate` containing mean percentages and lambda_max across all windows that use that filter type. """ rows = summary_table(data_dir) agg: dict[str, list[SummaryRow]] = {ft: [] for ft in FILTER_TYPES} for row in rows: agg[row["filter_type"]].append(row) out: dict[str, FilterTypeAggregate] = {} for ft, entries in agg.items(): if not entries: out[ft] = {} continue out[ft] = { "n_sweeps": len(entries), "mean_pct_chaotic": round(float(np.mean([e["pct_chaotic"] for e in entries])), 1), "mean_pct_periodic": round(float(np.mean([e["pct_periodic"] for e in entries])), 1), "mean_pct_divergent": round(float(np.mean([e["pct_divergent"] for e in entries])), 1), "mean_lmax": round(float(np.mean([e["mean_lmax"] for e in entries])), 4), } return out
[docs] def optimal_parameters( data_dir: str | Path = "data/sweeps", window: str | None = None, filter_type: str | None = None, top_n: int = 3, ) -> list[OptimalParams]: """Find the (order, cutoff) pairs with the highest λ_max. Filters results by *window* and *filter_type* if given. """ best: list[dict] = [] for result in _discover_all(data_dir): if window is not None and result.window != window: continue if filter_type is not None and result.filter_type != filter_type: continue h, orders, cutoffs = result.h, result.orders, result.cutoffs for i in range(len(orders)): for j in range(len(cutoffs)): val = h[i, j] if np.isnan(val) or np.isinf(val): continue best.append( { "window": result.window, "filter_type": result.filter_type, "order": int(orders[i]), "cutoff": round(float(cutoffs[j]), 4), "lmax": round(float(val), 6), } ) best.sort(key=lambda x: x["lmax"], reverse=True) return best[:top_n] # type: ignore[return-value]
[docs] def export_summary_json( data_dir: str | Path = "data/sweeps", output: str | Path = "data/analysis_summary.json", ) -> Path: """Write the full summary table to a JSON file.""" output = Path(output) rows = summary_table(data_dir) output.write_text(json.dumps(rows, indent=2, default=str)) return output
# ── Kaiser β-sweep analysis ──────────────────────────────────────────────────
[docs] def beta_summary( data_dir: str | Path = "data/sweeps/kaiser", ) -> dict[str, dict[float, dict]]: """Aggregate per-β statistics for each filter type. Returns a nested dict: ``{filter_type: {beta: {pct_chaotic, mean_lmax, ...}}}`` """ out: dict[str, dict[float, dict]] = {} for ft in FILTER_TYPES: ft_dir = Path(data_dir) / ft if not ft_dir.is_dir(): continue out[ft] = {} for path in sorted(ft_dir.rglob("variables_lyapunov.npz")): result = load_sweep(path) beta = float(result.metadata.get("kaiser_beta", float("nan"))) if np.isnan(beta): continue h = result.h chaotic = np.sum((~np.isnan(h)) & (h > 0)) total = h.size mn, mx = _safe_stats(h) out[ft][beta] = { "pct_chaotic": round(100 * chaotic / total, 1), "mean_lmax": round(mn, 4), "max_lmax": round(mx, 4), } return out
[docs] def beta_curve( data_dir: str | Path = "data/sweeps/kaiser", filter_type: str = "lowpass", ) -> tuple[NDArray, NDArray]: """Return (betas, pct_chaotic) arrays for a single filter type. Useful for plotting the β-dependence of chaotic coverage. Returns empty arrays when *filter_type* is absent from the data. """ summary = beta_summary(data_dir) if filter_type not in summary: return np.array([]), np.array([]) betas = np.array(sorted(summary[filter_type])) pct = np.array([summary[filter_type][b]["pct_chaotic"] for b in betas]) return betas, pct
# ── Advanced statistical analyses ────────────────────────────────────────────
[docs] def lmax_distribution( data_dir: str | Path = "data/sweeps", bins: int = 50, ) -> dict[str, LmaxDistribution]: """Histogram of λ_max values per filter type. Returns ------- dict ``{filter_type: {"hist": counts, "edges": bin_edges, "mean": float, "std": float, "skewness": float, "n": int}}`` """ from scipy.stats import skew all_vals: dict[str, list[float]] = {ft: [] for ft in FILTER_TYPES} for result in _discover_all(data_dir): vals = result.h.ravel() vals = vals[np.isfinite(vals)] all_vals[result.filter_type].extend(vals.tolist()) out: dict[str, LmaxDistribution] = {} for ft, val_list in all_vals.items(): arr = np.array(val_list) if len(arr) == 0: out[ft] = {"hist": [], "edges": [], "mean": 0.0, "std": 0.0, "skewness": 0.0, "n": 0} continue hist, edges = np.histogram(arr, bins=bins) out[ft] = { "hist": hist.tolist(), "edges": edges.tolist(), "mean": round(float(np.mean(arr)), 4), "std": round(float(np.std(arr)), 4), "skewness": round(float(skew(arr)), 4), "n": len(arr), } return out
[docs] def transition_boundary( data_dir: str | Path = "data/sweeps", window: str | None = None, filter_type: str = "lowpass", ) -> tuple[NDArray, NDArray]: """Find the cutoff where each order transitions from non-chaotic to chaotic. For each order, returns the *first* cutoff (lowest frequency) where λ_max > 0, or NaN if no chaotic point exists. Returns ------- orders : ndarray Filter orders. cutoffs : ndarray First chaotic cutoff per order (NaN if never chaotic). """ best: dict[int, tuple[float, float]] = {} for result in _discover_all(data_dir): if window is not None and result.window != window: continue if result.filter_type != filter_type: continue h, orders, cutoffs = result.h, result.orders, result.cutoffs for i in range(len(orders)): for j in range(len(cutoffs)): val = h[i, j] if not np.isfinite(val): continue if val > 0: order = int(orders[i]) wc = float(cutoffs[j]) if order not in best or wc < best[order][0]: best[order] = (wc, float(val)) if not best: return np.array([]), np.array([]) orders_sorted = np.array(sorted(best)) cutoffs_sorted = np.array([best[o][0] for o in orders_sorted]) return orders_sorted, cutoffs_sorted
[docs] def chaos_margin( data_dir: str | Path = "data/sweeps", window: str | None = None, filter_type: str = "lowpass", ) -> tuple[NDArray, NDArray]: """For each order, compute the width of the chaotic region in cutoff space. Returns ------- orders : ndarray widths : ndarray Fraction of cutoffs where λ_max > 0 per order. """ accum: dict[int, list[float]] = {} for result in _discover_all(data_dir): if window is not None and result.window != window: continue if result.filter_type != filter_type: continue h, orders = result.h, result.orders for i in range(len(orders)): order = int(orders[i]) row = h[i, :] chaotic_count = int(np.sum(np.isfinite(row) & (row > 0))) total = int(np.sum(np.isfinite(row))) if total > 0: accum.setdefault(order, []).append(chaotic_count / total) if not accum: return np.array([]), np.array([]) orders_sorted = np.array(sorted(accum)) widths = np.array([float(np.mean(accum[o])) for o in orders_sorted]) return orders_sorted, widths
[docs] def correlation_matrix( data_dir: str | Path = "data/sweeps", ) -> CorrelationMatrix: """Spearman correlation between (order, cutoff, λ_max) across all sweeps. Returns ------- dict ``{"order_vs_lmax": rho, "cutoff_vs_lmax": rho, "n": int}`` """ from scipy.stats import spearmanr orders_all: list[float] = [] cutoffs_all: list[float] = [] lmax_all: list[float] = [] for result in _discover_all(data_dir): h, orders, cutoffs = result.h, result.orders, result.cutoffs for i in range(len(orders)): for j in range(len(cutoffs)): val = h[i, j] if not np.isfinite(val): continue orders_all.append(float(orders[i])) cutoffs_all.append(float(cutoffs[j])) lmax_all.append(float(val)) if len(lmax_all) < 3: return {"order_vs_lmax": 0.0, "cutoff_vs_lmax": 0.0, "n": len(lmax_all)} rho_o, _ = spearmanr(orders_all, lmax_all) rho_c, _ = spearmanr(cutoffs_all, lmax_all) return { "order_vs_lmax": round(float(rho_o), 4), "cutoff_vs_lmax": round(float(rho_c), 4), "n": len(lmax_all), }
[docs] def bootstrap_confidence( data_dir: str | Path = "data/sweeps", n_bootstrap: int = 1000, seed: int = 42, ) -> dict[str, BootstrapConfidence]: """Bootstrap 95% CI for mean λ_max per filter type. Returns ------- dict ``{filter_type: {"mean": float, "ci_low": float, "ci_high": float}}`` """ rng = np.random.default_rng(seed) all_vals: dict[str, list[float]] = {ft: [] for ft in FILTER_TYPES} for result in _discover_all(data_dir): vals = result.h.ravel() vals = vals[np.isfinite(vals)] all_vals[result.filter_type].extend(vals.tolist()) out: dict[str, dict] = {} for ft, val_list in all_vals.items(): arr = np.array(val_list) if len(arr) == 0: out[ft] = {} continue means = np.array( [np.mean(rng.choice(arr, size=len(arr), replace=True)) for _ in range(n_bootstrap)] ) out[ft] = { "mean": round(float(np.mean(arr)), 4), "ci_low": round(float(np.percentile(means, 2.5)), 4), "ci_high": round(float(np.percentile(means, 97.5)), 4), "n": len(arr), } return out # type: ignore[return-value]