Source code for chaotic_pfc.cli.analysis

"""Statistical analysis of sweep data and chaotic-region plotting."""

from __future__ import annotations

import argparse
from pathlib import Path

from chaotic_pfc.analysis.stats import SummaryRow
from chaotic_pfc.analysis.sweep import FILTER_TYPES


[docs] def add_parser(subparsers: argparse._SubParsersAction) -> None: """Register the ``run analysis`` group with sub-subcommands.""" p = subparsers.add_parser( "analysis", help="Statistical analysis of Lyapunov sweep results and chaotic-region figures.", description="Summarise and rank sweep data or plot chaotic-region union/density maps.", ) # Optional sub-subcommands (plot-chaotic-regions, plot-chaotic-density) analysis_sub = p.add_subparsers( dest="analysis_action", title="actions", metavar="<action>", ) _add_chaotic_plot_parsers(analysis_sub) _add_export_tables_parser(analysis_sub) # Default: run statistical analysis (backward compatible) p.add_argument( "--data-dir", default="data/sweeps", help="Root sweep directory (default: data/sweeps)" ) p.add_argument( "--report", action="store_true", help="Generate comprehensive statistical report" ) p.add_argument( "--json", default="data/analysis_summary.json", help="Output JSON path (default: data/analysis_summary.json)", ) p.set_defaults(_run=run)
def _add_chaotic_plot_parsers(sub: argparse._SubParsersAction) -> None: """Register ``plot-chaotic-map`` and ``plot-chaotic-density``.""" p_cm = sub.add_parser( "plot-chaotic-map", help="Binary union of chaotic regions across all window × filter sweeps.", ) p_cm.add_argument( "--sweep-dir", default="data/sweeps", help="Root sweep directory (default: data/sweeps)", ) p_cm.add_argument( "--output", default="figures/sweeps/fig_chaotic_map", help="Output stem (default: figures/sweeps/fig_chaotic_map)", ) p_cm.add_argument("--color", default="red", help="Colour for chaotic cells (default: red)") p_cm.add_argument("--lang", default="pt", choices=["pt", "en"], help="Language (default: pt)") p_cm.set_defaults(_run=_run_chaotic_map) p_cd = sub.add_parser( "plot-chaotic-density", help="Chaos density — how many configurations agree at each grid point.", ) p_cd.add_argument( "--sweep-dir", default="data/sweeps", help="Root sweep directory (default: data/sweeps)", ) p_cd.add_argument( "--output", default="figures/sweeps/fig_chaotic_density", help="Output stem (default: figures/sweeps/fig_chaotic_density)", ) p_cd.add_argument("--cmap", default="viridis", help="Colormap (default: viridis)") p_cd.add_argument("--lang", default="pt", choices=["pt", "en"], help="Language (default: pt)") p_cd.set_defaults(_run=_run_chaotic_density) def _add_export_tables_parser(sub: argparse._SubParsersAction) -> None: """Register ``export-tables`` sub-subcommand.""" p_et = sub.add_parser( "export-tables", help="Export analysis tables as LaTeX (.tex) files ready for PFC.", ) p_et.add_argument( "--sweep-dir", default="data/sweeps", help="Root sweep directory (default: data/sweeps)", ) p_et.add_argument( "--output-dir", default="data/analysis_output/tables", help="Output directory for .tex files (default: data/analysis_output/tables)", ) p_et.add_argument( "--lang", default="all", choices=["all", "pt", "en"], help="Language(s) to generate: all (default), pt, en", ) p_et.add_argument( "--format", default="latex", choices=["latex"], help="Output format (default: latex; csv/json reserved for future)", ) p_et.add_argument( "--bootstrap-seed", type=int, default=42, help="RNG seed for bootstrap CI (default: 42)", ) p_et.set_defaults(_run=_run_export_tables) def _run_export_tables(args: argparse.Namespace) -> int: """Generate the four LaTeX analysis tables.""" from pathlib import Path from chaotic_pfc.analysis.latex_export import ( export_consolidated_extended_table, export_consolidated_full_ranking, export_consolidated_top_k_table, export_extended_top_k_table, export_full_ranking_table, export_kaiser_beta_optimal_table, export_sweet_spots_table, export_top_k_table, ) from chaotic_pfc.analysis.stats import ( consolidate_kaiser, kaiser_beta_optimal, load_all_sweeps, rank_configurations, sweet_spot_per_filter, top_k_per_filter, ) sweep_dir = Path(args.sweep_dir) output_dir = Path(args.output_dir) if not sweep_dir.is_dir(): print(f"ERROR: sweep directory not found: {sweep_dir}") return 1 langs = ["pt", "en"] if args.lang == "all" else [args.lang] # Load data once print(f"Loading sweeps from {sweep_dir} ...") sweeps = load_all_sweeps(sweep_dir) print(f" {len(sweeps)} sweeps loaded") # Compute structures once top_k = top_k_per_filter(sweeps, k=3, bootstrap_seed=args.bootstrap_seed) ranking = rank_configurations(sweeps, bootstrap_seed=args.bootstrap_seed) sweet = sweet_spot_per_filter(sweeps) for lang in langs: out = output_dir / lang out.mkdir(parents=True, exist_ok=True) paths: list[Path] = [ export_top_k_table(top_k, out / "tab_top_k.tex", lang=lang), export_extended_top_k_table(top_k, out / "tab_top_k_extended.tex", lang=lang), export_full_ranking_table(ranking, out / "tab_full_ranking.tex", lang=lang), export_sweet_spots_table(sweet, out / "tab_sweet_spots.tex", lang=lang), ] print(f"\n[{lang}]") for p in paths: lines = p.read_text(encoding="utf-8").count("\n") size = p.stat().st_size print(f" {p.name:<30} {lines:>5} lines {size:>7} bytes") # ── Consolidated (Kaiser best-β only) ────────────────────────── consolidated = consolidate_kaiser(sweeps) cons_top_k = top_k_per_filter(consolidated, k=3, bootstrap_seed=args.bootstrap_seed) cons_ranking = rank_configurations(consolidated, bootstrap_seed=args.bootstrap_seed) beta_opt = kaiser_beta_optimal(sweeps, bootstrap_seed=args.bootstrap_seed) cons_paths: list[Path] = [ export_consolidated_top_k_table( cons_top_k, out / "tab_consolidated_top_k.tex", lang=lang ), export_consolidated_extended_table( cons_top_k, out / "tab_consolidated_extended.tex", lang=lang ), export_consolidated_full_ranking( cons_ranking, out / "tab_consolidated_full_ranking.tex", lang=lang ), export_kaiser_beta_optimal_table( beta_opt, out / "tab_kaiser_beta_optimal.tex", lang=lang ), ] print(" [consolidated — Kaiser best-β only]") for p in cons_paths: lines = p.read_text(encoding="utf-8").count("\n") size = p.stat().st_size print(f" {p.name:<30} {lines:>5} lines {size:>7} bytes") print(f"\nDone. Files in {output_dir.resolve()}/") return 0 def _run_chaotic_map(args: argparse.Namespace) -> int: """Invoke :func:`~chaotic_pfc.analysis.sweep_plotting.plot_chaotic_map`.""" import matplotlib.pyplot as plt from chaotic_pfc.analysis.sweep_plotting import plot_chaotic_map fig = plot_chaotic_map( args.sweep_dir, save_path=args.output, color=args.color, lang=args.lang, ) plt.close(fig) stem = Path(args.output) print(f" Saved → {stem.with_suffix('.svg')}") print(f" Saved → {stem.with_suffix('.png')}") return 0 def _run_chaotic_density(args: argparse.Namespace) -> int: """Invoke :func:`~chaotic_pfc.analysis.sweep_plotting.plot_chaotic_density`.""" import matplotlib.pyplot as plt from chaotic_pfc.analysis.sweep_plotting import plot_chaotic_density fig = plot_chaotic_density( args.sweep_dir, save_path=args.output, cmap=args.cmap, lang=args.lang, ) plt.close(fig) stem = Path(args.output) print(f" Saved → {stem.with_suffix('.svg')}") print(f" Saved → {stem.with_suffix('.png')}") return 0 def _hr(title: str = "", width: int = 68) -> None: if title: print(f"\n{'─' * width}") print(f" {title}") print("─" * width) else: print("─" * width)
[docs] def run(args: argparse.Namespace) -> int: """Execute the ``analysis`` experiment.""" from chaotic_pfc.analysis.stats import ( best_chaos_preserving, bootstrap_confidence, chaos_margin, compare_filter_types, correlation_matrix, export_summary_json, lmax_distribution, optimal_parameters, transition_boundary, ) data_dir = Path(args.data_dir) print("=" * 68) print(" STATISTICAL ANALYSIS — Lyapunov Sweep (Henon FIR)") print("=" * 68) # ── 1. Summary table ─────────────────────────────────────────────── path = export_summary_json(data_dir, args.json) print(f" Full table → {path}") # ── 2. Filter type comparison ────────────────────────────────────── _hr("FILTER TYPE COMPARISON") print( f" {'Filter':<12} {'%Chaotic':>8} {'%Periodic':>10} {'%Divergent':>10} {'λ_max mean':>10} {'λ_max max':>10}" ) print(" " + "-" * 62) from chaotic_pfc.analysis.stats import summary_table agg: dict[str, list[SummaryRow]] = {} for row in summary_table(data_dir): agg.setdefault(row["filter_type"], []).append(row) for ft in FILTER_TYPES: entries = agg.get(ft, []) if not entries: continue pct_c = round(float(sum(e["pct_chaotic"] for e in entries) / len(entries)), 1) pct_p = round(float(sum(e["pct_periodic"] for e in entries) / len(entries)), 1) pct_d = round(float(sum(e["pct_divergent"] for e in entries) / len(entries)), 1) mn_l = round(float(sum(e["mean_lmax"] for e in entries) / len(entries)), 4) mx_l = round(float(max(e["max_lmax"] for e in entries)), 4) print( f" {ft:<12} {pct_c:>7.1f}% {pct_p:>9.1f}% {pct_d:>9.1f}% {mn_l:>10.4f} {mx_l:>10.4f}" ) # ── 3. Distribution ──────────────────────────────────────────────── _hr("λ_max DISTRIBUTION BY FILTER") dist = lmax_distribution(data_dir) for ft in FILTER_TYPES: d = dist[ft] print( f" {ft:<12} n={d['n']:>7,} μ={d['mean']:>8.4f} σ={d['std']:>7.4f} " f"skewness={d['skewness']:>7.4f}" ) # ── 4. Transition boundaries ─────────────────────────────────────── import numpy as np _hr("TRANSITION BOUNDARY (1st chaotic cutoff per order)") for ft in FILTER_TYPES: orders, cutoffs = transition_boundary(data_dir, filter_type=ft) if len(orders) == 0: print(f" {ft:<12} no data") continue n_chaotic = np.sum(~np.isnan(cutoffs)) print(f" {ft:<12} {n_chaotic}/{len(orders)} orders with chaotic transition") if n_chaotic > 0: valid = cutoffs[~np.isnan(cutoffs)] print( f" cutoff transition: {valid[0]:.4f}{valid[-1]:.4f} (median: {np.median(valid):.4f})" ) # ── 5. Spectral robustness ───────────────────────────────────────── _hr("SPECTRAL ROBUSTNESS (chaotic region width)") for ft in FILTER_TYPES: orders, widths = chaos_margin(data_dir, filter_type=ft) if len(orders) == 0: print(f" {ft:<12} no data") continue nonzero = widths[widths > 0] pct = 100 * len(nonzero) / len(orders) print( f" {ft:<12} {len(nonzero)}/{len(orders)} orders ({pct:.0f}%) with chaos " f"mean width={np.mean(widths):.4f} max={np.max(widths):.4f}" ) # ── 6. Correlation ───────────────────────────────────────────────── _hr("SPEARMAN CORRELATION") corr = correlation_matrix(data_dir) print(f" n = {corr['n']:,} finite points") print(f" ρ(order, λ_max) = {corr['order_vs_lmax']:+.4f}") print(f" ρ(cutoff, λ_max) = {corr['cutoff_vs_lmax']:+.4f}") # ── 7. Bootstrap CI ──────────────────────────────────────────────── _hr("BOOTSTRAP 95% CI (λ_max mean)") ci = bootstrap_confidence(data_dir) print(f" {'Filter':<12} {'Mean':>8} {'CI 2.5%':>9} {'CI 97.5%':>9} {'n':>7}") print(" " + "-" * 48) for ft in FILTER_TYPES: d_ci = ci[ft] if "mean" not in d_ci: continue print( f" {ft:<12} {d_ci['mean']:>8.4f} {d_ci['ci_low']:>9.4f} {d_ci['ci_high']:>9.4f} {d_ci['n']:>7,}" ) # ── 8. Best and optimal ──────────────────────────────────────────── _hr("TOP 5 — HIGHEST % CHAOTIC") for rank, row in enumerate(best_chaos_preserving(data_dir, top_n=5), start=1): print( f" {rank}. {row['window']:<15} / {row['filter_type']:<10} {row['pct_chaotic']:>5.1f}% λ_mean={row['mean_lmax']:.4f}" ) _hr("OPTIMAL PARAMETERS (highest finite λ_max)") for rank, opt_row in enumerate(optimal_parameters(data_dir, top_n=5), start=1): print( f" {rank}. {opt_row['window']:<15} / {opt_row['filter_type']:<10} order={opt_row['order']:>3} ωc={opt_row['cutoff']:.4f} λ_max={opt_row['lmax']:.6f}" ) # ── 9. Beta sweep ────────────────────────────────────────────────── kaiser_dir = data_dir / "kaiser" if kaiser_dir.is_dir(): _hr("KAISER β-SWEEP — evolution with β") from chaotic_pfc.analysis.stats import beta_summary bs = beta_summary(kaiser_dir) for ft in sorted(bs): betas = sorted(bs[ft]) if len(betas) < 2: continue first = bs[ft][betas[0]]["pct_chaotic"] last = bs[ft][betas[-1]]["pct_chaotic"] arrow = "↑" if last > first else "↓" print( f" {ft:<12} β={betas[0]:.1f}{betas[-1]:.1f} " f"{first:>5.1f}% → {last:>5.1f}% ({arrow} {abs(last - first):.1f}pp)" ) # ── 10. Interpretation ───────────────────────────────────────────── _hr("INTERPRETATION") cmp = compare_filter_types(data_dir) best_ft = max((ft for ft in cmp if cmp[ft]), key=lambda ft: cmp[ft]["mean_pct_chaotic"]) worst_ft = min((ft for ft in cmp if cmp[ft]), key=lambda ft: cmp[ft]["mean_pct_chaotic"]) print(f" • {best_ft} best preserves chaos ({cmp[best_ft]['mean_pct_chaotic']}% of grid)") print(f" • {worst_ft} most suppresses ({cmp[worst_ft]['mean_pct_chaotic']}% chaotic)") print( f" • order × λ_max correlation: {corr['order_vs_lmax']:+.4f} (higher orders {'increase' if corr['order_vs_lmax'] > 0 else 'decrease'} λ_max)" ) print( f" • cutoff × λ_max correlation: {corr['cutoff_vs_lmax']:+.4f} (higher cutoffs {'increase' if corr['cutoff_vs_lmax'] > 0 else 'decrease'} λ_max)" ) print(" • Positive skew in distributions → long tails of high λ_max (strong chaos in islands)") print(f"\n{'=' * 68}") print(" Analysis complete.") return 0