Source code for chaotic_pfc.cli.comm_order_n

"""Communication using N-th order Henon with internal FIR filter."""

from __future__ import annotations

import argparse
from pathlib import Path

from ._common import add_lang_flag, add_save_display_flags, compute_psds, pick_backend, save_or_show


[docs] def add_parser(subparsers: argparse._SubParsersAction) -> None: """Register the ``run comm-order-n`` subcommand.""" from chaotic_pfc.config import DEFAULT_CONFIG as cfg p = subparsers.add_parser( "comm-order-n", help="Transmitter/receiver using an N-th order Henon with internal FIR.", description="Communication using N-th order Henon with internal FIR filter.", ) p.add_argument("--N", type=int, default=cfg.comm.N) p.add_argument("--mu", type=float, default=cfg.comm.mu) p.add_argument("--period", type=int, default=cfg.comm.message_period) p.add_argument( "--cutoff", type=float, default=cfg.channel.cutoff, help="External channel cutoff" ) p.add_argument("--taps", type=int, default=cfg.channel.num_taps, help="External channel taps") p.add_argument( "--internal-cutoff", type=float, default=cfg.internal_fir.cutoff, dest="internal_cutoff", help="Internal FIR cutoff inside the Hénon oscillator", ) p.add_argument( "--Nc", type=int, default=cfg.internal_fir.num_taps, dest="Nc", help="Internal FIR order (number of coefficients inside the oscillator)", ) add_save_display_flags(p) add_lang_flag(p) p.set_defaults(_run=run)
[docs] def run(args: argparse.Namespace) -> int: """Execute the ``comm-order-n`` experiment.""" headless = pick_backend(args.no_display) import numpy as np from scipy.signal import firwin from chaotic_pfc._i18n import t from chaotic_pfc.comms.channel import fir_channel from chaotic_pfc.comms.receiver import receive_order_n from chaotic_pfc.comms.transmitter import transmit_order_n from chaotic_pfc.config import DEFAULT_CONFIG as cfg from chaotic_pfc.dynamics.signals import binary_message from chaotic_pfc.plotting.figures import plot_comm_grid a, b = cfg.comm.henon.a, cfg.comm.henon.b fmt = cfg.plot.fmt c = firwin( numtaps=args.Nc, cutoff=args.internal_cutoff, window="hamming", pass_zero=True, fs=2.0 ) Nc = len(c) fdir = Path(cfg.plot.figures_dir) if args.save: fdir.mkdir(parents=True, exist_ok=True) rng = np.random.default_rng(cfg.seed) x0 = 0.5 * rng.random(Nc) y0 = 0.5 * rng.random(Nc) m = binary_message(args.N, period=args.period) s, _ = transmit_order_n(m, c, mu=args.mu, a=a, b=b, x0=x0) r, h = fir_channel(s, cutoff=args.cutoff, num_taps=args.taps) m_hat, _ = receive_order_n(r, c, mu=args.mu, a=a, b=b, y0=y0) mse = np.mean((m[cfg.comm.transient :] - m_hat[cfg.comm.transient :]) ** 2) print(f"[05] N-th order Henon | N={args.N:,} mu={args.mu} Nc={Nc}") print(f" MSE (n > {cfg.comm.transient}): {mse:.4e}") omega, psd_m, psd_s, psd_r, psd_mhat = compute_psds(m, s, r, m_hat, cfg.spectral) win = slice(cfg.plot.time_window_start, min(cfg.plot.time_window_end, 1000)) save_path = str(fdir / f"comm_order_n.{fmt}") if args.save else None mhat_ss = m_hat[cfg.comm.transient :] mhat_max = np.percentile(np.abs(mhat_ss), 99) y_lim_mhat = ( (-min(mhat_max * 1.1, 300), min(mhat_max * 1.1, 300)) if mhat_max > 5 else (-1.5, 1.5) ) fig = plot_comm_grid( np.arange(args.N), m, s, r, m_hat, omega, psd_m, psd_s, psd_r, psd_mhat, time_window=win, suptitle=( t("comm.order_n", lang=args.lang) + r" ($N_c=" + str(Nc) + r",\; \mu=" + str(args.mu) + r"$)" ), y_lim_mhat=y_lim_mhat, h_channel=h, save_path=save_path, lang=args.lang, ) save_or_show(fig, headless, save_path, args) return 0