"""Statistical analysis of sweep data and chaotic-region plotting."""
from __future__ import annotations
import argparse
from pathlib import Path
from chaotic_pfc.analysis.stats import SummaryRow
from chaotic_pfc.analysis.sweep import FILTER_TYPES
[docs]
def add_parser(subparsers: argparse._SubParsersAction) -> None:
"""Register the ``run analysis`` group with sub-subcommands."""
p = subparsers.add_parser(
"analysis",
help="Statistical analysis of Lyapunov sweep results and chaotic-region figures.",
description="Summarise and rank sweep data or plot chaotic-region union/density maps.",
)
# Optional sub-subcommands (plot-chaotic-regions, plot-chaotic-density)
analysis_sub = p.add_subparsers(
dest="analysis_action",
title="actions",
metavar="<action>",
)
_add_chaotic_plot_parsers(analysis_sub)
_add_export_tables_parser(analysis_sub)
# Default: run statistical analysis (backward compatible)
p.add_argument(
"--data-dir", default="data/sweeps", help="Root sweep directory (default: data/sweeps)"
)
p.add_argument(
"--report", action="store_true", help="Generate comprehensive statistical report"
)
p.add_argument(
"--json",
default="data/analysis_summary.json",
help="Output JSON path (default: data/analysis_summary.json)",
)
p.set_defaults(_run=run)
def _add_chaotic_plot_parsers(sub: argparse._SubParsersAction) -> None:
"""Register ``plot-chaotic-map`` and ``plot-chaotic-density``."""
p_cm = sub.add_parser(
"plot-chaotic-map",
help="Binary union of chaotic regions across all window × filter sweeps.",
)
p_cm.add_argument(
"--sweep-dir",
default="data/sweeps",
help="Root sweep directory (default: data/sweeps)",
)
p_cm.add_argument(
"--output",
default="figures/sweeps/fig_chaotic_map",
help="Output stem (default: figures/sweeps/fig_chaotic_map)",
)
p_cm.add_argument("--color", default="red", help="Colour for chaotic cells (default: red)")
p_cm.add_argument("--lang", default="pt", choices=["pt", "en"], help="Language (default: pt)")
p_cm.set_defaults(_run=_run_chaotic_map)
p_cd = sub.add_parser(
"plot-chaotic-density",
help="Chaos density — how many configurations agree at each grid point.",
)
p_cd.add_argument(
"--sweep-dir",
default="data/sweeps",
help="Root sweep directory (default: data/sweeps)",
)
p_cd.add_argument(
"--output",
default="figures/sweeps/fig_chaotic_density",
help="Output stem (default: figures/sweeps/fig_chaotic_density)",
)
p_cd.add_argument("--cmap", default="viridis", help="Colormap (default: viridis)")
p_cd.add_argument("--lang", default="pt", choices=["pt", "en"], help="Language (default: pt)")
p_cd.set_defaults(_run=_run_chaotic_density)
def _add_export_tables_parser(sub: argparse._SubParsersAction) -> None:
"""Register ``export-tables`` sub-subcommand."""
p_et = sub.add_parser(
"export-tables",
help="Export analysis tables as LaTeX (.tex) files ready for PFC.",
)
p_et.add_argument(
"--sweep-dir",
default="data/sweeps",
help="Root sweep directory (default: data/sweeps)",
)
p_et.add_argument(
"--output-dir",
default="data/analysis_output/tables",
help="Output directory for .tex files (default: data/analysis_output/tables)",
)
p_et.add_argument(
"--lang",
default="all",
choices=["all", "pt", "en"],
help="Language(s) to generate: all (default), pt, en",
)
p_et.add_argument(
"--format",
default="latex",
choices=["latex"],
help="Output format (default: latex; csv/json reserved for future)",
)
p_et.add_argument(
"--bootstrap-seed",
type=int,
default=42,
help="RNG seed for bootstrap CI (default: 42)",
)
p_et.set_defaults(_run=_run_export_tables)
def _run_export_tables(args: argparse.Namespace) -> int:
"""Generate the four LaTeX analysis tables."""
from pathlib import Path
from chaotic_pfc.analysis.latex_export import (
export_consolidated_extended_table,
export_consolidated_full_ranking,
export_consolidated_top_k_table,
export_extended_top_k_table,
export_full_ranking_table,
export_kaiser_beta_optimal_table,
export_sweet_spots_table,
export_top_k_table,
)
from chaotic_pfc.analysis.stats import (
consolidate_kaiser,
kaiser_beta_optimal,
load_all_sweeps,
rank_configurations,
sweet_spot_per_filter,
top_k_per_filter,
)
sweep_dir = Path(args.sweep_dir)
output_dir = Path(args.output_dir)
if not sweep_dir.is_dir():
print(f"ERROR: sweep directory not found: {sweep_dir}")
return 1
langs = ["pt", "en"] if args.lang == "all" else [args.lang]
# Load data once
print(f"Loading sweeps from {sweep_dir} ...")
sweeps = load_all_sweeps(sweep_dir)
print(f" {len(sweeps)} sweeps loaded")
# Compute structures once
top_k = top_k_per_filter(sweeps, k=3, bootstrap_seed=args.bootstrap_seed)
ranking = rank_configurations(sweeps, bootstrap_seed=args.bootstrap_seed)
sweet = sweet_spot_per_filter(sweeps)
for lang in langs:
out = output_dir / lang
out.mkdir(parents=True, exist_ok=True)
paths: list[Path] = [
export_top_k_table(top_k, out / "tab_top_k.tex", lang=lang),
export_extended_top_k_table(top_k, out / "tab_top_k_extended.tex", lang=lang),
export_full_ranking_table(ranking, out / "tab_full_ranking.tex", lang=lang),
export_sweet_spots_table(sweet, out / "tab_sweet_spots.tex", lang=lang),
]
print(f"\n[{lang}]")
for p in paths:
lines = p.read_text(encoding="utf-8").count("\n")
size = p.stat().st_size
print(f" {p.name:<30} {lines:>5} lines {size:>7} bytes")
# ── Consolidated (Kaiser best-β only) ──────────────────────────
consolidated = consolidate_kaiser(sweeps)
cons_top_k = top_k_per_filter(consolidated, k=3, bootstrap_seed=args.bootstrap_seed)
cons_ranking = rank_configurations(consolidated, bootstrap_seed=args.bootstrap_seed)
beta_opt = kaiser_beta_optimal(sweeps, bootstrap_seed=args.bootstrap_seed)
cons_paths: list[Path] = [
export_consolidated_top_k_table(
cons_top_k, out / "tab_consolidated_top_k.tex", lang=lang
),
export_consolidated_extended_table(
cons_top_k, out / "tab_consolidated_extended.tex", lang=lang
),
export_consolidated_full_ranking(
cons_ranking, out / "tab_consolidated_full_ranking.tex", lang=lang
),
export_kaiser_beta_optimal_table(
beta_opt, out / "tab_kaiser_beta_optimal.tex", lang=lang
),
]
print(" [consolidated — Kaiser best-β only]")
for p in cons_paths:
lines = p.read_text(encoding="utf-8").count("\n")
size = p.stat().st_size
print(f" {p.name:<30} {lines:>5} lines {size:>7} bytes")
print(f"\nDone. Files in {output_dir.resolve()}/")
return 0
def _run_chaotic_map(args: argparse.Namespace) -> int:
"""Invoke :func:`~chaotic_pfc.analysis.sweep_plotting.plot_chaotic_map`."""
import matplotlib.pyplot as plt
from chaotic_pfc.analysis.sweep_plotting import plot_chaotic_map
fig = plot_chaotic_map(
args.sweep_dir,
save_path=args.output,
color=args.color,
lang=args.lang,
)
plt.close(fig)
stem = Path(args.output)
print(f" Saved → {stem.with_suffix('.svg')}")
print(f" Saved → {stem.with_suffix('.png')}")
return 0
def _run_chaotic_density(args: argparse.Namespace) -> int:
"""Invoke :func:`~chaotic_pfc.analysis.sweep_plotting.plot_chaotic_density`."""
import matplotlib.pyplot as plt
from chaotic_pfc.analysis.sweep_plotting import plot_chaotic_density
fig = plot_chaotic_density(
args.sweep_dir,
save_path=args.output,
cmap=args.cmap,
lang=args.lang,
)
plt.close(fig)
stem = Path(args.output)
print(f" Saved → {stem.with_suffix('.svg')}")
print(f" Saved → {stem.with_suffix('.png')}")
return 0
def _hr(title: str = "", width: int = 68) -> None:
if title:
print(f"\n{'─' * width}")
print(f" {title}")
print("─" * width)
else:
print("─" * width)
[docs]
def run(args: argparse.Namespace) -> int:
"""Execute the ``analysis`` experiment."""
from chaotic_pfc.analysis.stats import (
best_chaos_preserving,
bootstrap_confidence,
chaos_margin,
compare_filter_types,
correlation_matrix,
export_summary_json,
lmax_distribution,
optimal_parameters,
transition_boundary,
)
data_dir = Path(args.data_dir)
print("=" * 68)
print(" STATISTICAL ANALYSIS — Lyapunov Sweep (Henon FIR)")
print("=" * 68)
# ── 1. Summary table ───────────────────────────────────────────────
path = export_summary_json(data_dir, args.json)
print(f" Full table → {path}")
# ── 2. Filter type comparison ──────────────────────────────────────
_hr("FILTER TYPE COMPARISON")
print(
f" {'Filter':<12} {'%Chaotic':>8} {'%Periodic':>10} {'%Divergent':>10} {'λ_max mean':>10} {'λ_max max':>10}"
)
print(" " + "-" * 62)
from chaotic_pfc.analysis.stats import summary_table
agg: dict[str, list[SummaryRow]] = {}
for row in summary_table(data_dir):
agg.setdefault(row["filter_type"], []).append(row)
for ft in FILTER_TYPES:
entries = agg.get(ft, [])
if not entries:
continue
pct_c = round(float(sum(e["pct_chaotic"] for e in entries) / len(entries)), 1)
pct_p = round(float(sum(e["pct_periodic"] for e in entries) / len(entries)), 1)
pct_d = round(float(sum(e["pct_divergent"] for e in entries) / len(entries)), 1)
mn_l = round(float(sum(e["mean_lmax"] for e in entries) / len(entries)), 4)
mx_l = round(float(max(e["max_lmax"] for e in entries)), 4)
print(
f" {ft:<12} {pct_c:>7.1f}% {pct_p:>9.1f}% {pct_d:>9.1f}% {mn_l:>10.4f} {mx_l:>10.4f}"
)
# ── 3. Distribution ────────────────────────────────────────────────
_hr("λ_max DISTRIBUTION BY FILTER")
dist = lmax_distribution(data_dir)
for ft in FILTER_TYPES:
d = dist[ft]
print(
f" {ft:<12} n={d['n']:>7,} μ={d['mean']:>8.4f} σ={d['std']:>7.4f} "
f"skewness={d['skewness']:>7.4f}"
)
# ── 4. Transition boundaries ───────────────────────────────────────
import numpy as np
_hr("TRANSITION BOUNDARY (1st chaotic cutoff per order)")
for ft in FILTER_TYPES:
orders, cutoffs = transition_boundary(data_dir, filter_type=ft)
if len(orders) == 0:
print(f" {ft:<12} no data")
continue
n_chaotic = np.sum(~np.isnan(cutoffs))
print(f" {ft:<12} {n_chaotic}/{len(orders)} orders with chaotic transition")
if n_chaotic > 0:
valid = cutoffs[~np.isnan(cutoffs)]
print(
f" cutoff transition: {valid[0]:.4f}–{valid[-1]:.4f} (median: {np.median(valid):.4f})"
)
# ── 5. Spectral robustness ─────────────────────────────────────────
_hr("SPECTRAL ROBUSTNESS (chaotic region width)")
for ft in FILTER_TYPES:
orders, widths = chaos_margin(data_dir, filter_type=ft)
if len(orders) == 0:
print(f" {ft:<12} no data")
continue
nonzero = widths[widths > 0]
pct = 100 * len(nonzero) / len(orders)
print(
f" {ft:<12} {len(nonzero)}/{len(orders)} orders ({pct:.0f}%) with chaos "
f"mean width={np.mean(widths):.4f} max={np.max(widths):.4f}"
)
# ── 6. Correlation ─────────────────────────────────────────────────
_hr("SPEARMAN CORRELATION")
corr = correlation_matrix(data_dir)
print(f" n = {corr['n']:,} finite points")
print(f" ρ(order, λ_max) = {corr['order_vs_lmax']:+.4f}")
print(f" ρ(cutoff, λ_max) = {corr['cutoff_vs_lmax']:+.4f}")
# ── 7. Bootstrap CI ────────────────────────────────────────────────
_hr("BOOTSTRAP 95% CI (λ_max mean)")
ci = bootstrap_confidence(data_dir)
print(f" {'Filter':<12} {'Mean':>8} {'CI 2.5%':>9} {'CI 97.5%':>9} {'n':>7}")
print(" " + "-" * 48)
for ft in FILTER_TYPES:
d_ci = ci[ft]
if "mean" not in d_ci:
continue
print(
f" {ft:<12} {d_ci['mean']:>8.4f} {d_ci['ci_low']:>9.4f} {d_ci['ci_high']:>9.4f} {d_ci['n']:>7,}"
)
# ── 8. Best and optimal ────────────────────────────────────────────
_hr("TOP 5 — HIGHEST % CHAOTIC")
for rank, row in enumerate(best_chaos_preserving(data_dir, top_n=5), start=1):
print(
f" {rank}. {row['window']:<15} / {row['filter_type']:<10} {row['pct_chaotic']:>5.1f}% λ_mean={row['mean_lmax']:.4f}"
)
_hr("OPTIMAL PARAMETERS (highest finite λ_max)")
for rank, opt_row in enumerate(optimal_parameters(data_dir, top_n=5), start=1):
print(
f" {rank}. {opt_row['window']:<15} / {opt_row['filter_type']:<10} order={opt_row['order']:>3} ωc={opt_row['cutoff']:.4f} λ_max={opt_row['lmax']:.6f}"
)
# ── 9. Beta sweep ──────────────────────────────────────────────────
kaiser_dir = data_dir / "kaiser"
if kaiser_dir.is_dir():
_hr("KAISER β-SWEEP — evolution with β")
from chaotic_pfc.analysis.stats import beta_summary
bs = beta_summary(kaiser_dir)
for ft in sorted(bs):
betas = sorted(bs[ft])
if len(betas) < 2:
continue
first = bs[ft][betas[0]]["pct_chaotic"]
last = bs[ft][betas[-1]]["pct_chaotic"]
arrow = "↑" if last > first else "↓"
print(
f" {ft:<12} β={betas[0]:.1f} → {betas[-1]:.1f} "
f"{first:>5.1f}% → {last:>5.1f}% ({arrow} {abs(last - first):.1f}pp)"
)
# ── 10. Interpretation ─────────────────────────────────────────────
_hr("INTERPRETATION")
cmp = compare_filter_types(data_dir)
best_ft = max((ft for ft in cmp if cmp[ft]), key=lambda ft: cmp[ft]["mean_pct_chaotic"])
worst_ft = min((ft for ft in cmp if cmp[ft]), key=lambda ft: cmp[ft]["mean_pct_chaotic"])
print(f" • {best_ft} best preserves chaos ({cmp[best_ft]['mean_pct_chaotic']}% of grid)")
print(f" • {worst_ft} most suppresses ({cmp[worst_ft]['mean_pct_chaotic']}% chaotic)")
print(
f" • order × λ_max correlation: {corr['order_vs_lmax']:+.4f} (higher orders {'increase' if corr['order_vs_lmax'] > 0 else 'decrease'} λ_max)"
)
print(
f" • cutoff × λ_max correlation: {corr['cutoff_vs_lmax']:+.4f} (higher cutoffs {'increase' if corr['cutoff_vs_lmax'] > 0 else 'decrease'} λ_max)"
)
print(" • Positive skew in distributions → long tails of high λ_max (strong chaos in islands)")
print(f"\n{'=' * 68}")
print(" Analysis complete.")
return 0