Source code for chaotic_pfc.comms.channel

"""
channel.py
==========
Transmission-channel models sitting between the chaotic transmitter and
the receiver.

Six channels are provided:

* :func:`ideal_channel` — noiseless pass-through used to establish a
  best-case baseline for chaos synchronisation.
* :func:`fir_channel` — a symmetric FIR low-pass filter built on top of
  :func:`scipy.signal.firwin`, representing a band-limited physical
  link.
* :func:`awgn` — additive white Gaussian noise.
* :func:`channel_impulsive` — AWGN with Middleton Class-A impulsive noise.
* :func:`channel_multipath` — multipath channel with configurable tap
  delays and gains.

All accept the transmitted signal ``s`` as a 1-D array and return the
received signal ``r`` of the same length. The FIR channel additionally
returns its filter coefficients so they can be overlaid on the PSD
plots produced by :mod:`chaotic_pfc.plotting`.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
from scipy.signal import firwin, lfilter


[docs] def ideal_channel(s: NDArray) -> NDArray: """Transmit ``s`` through a noiseless, distortion-free channel. This is literally a copy of the input. It is still defined as a function (rather than aliased to ``numpy.ndarray.copy``) so that the receiver-side code can always call ``ideal_channel(s)`` consistently regardless of the channel configuration chosen by the caller. Parameters ---------- s Transmitted signal, shape ``(N,)``. Returns ------- ndarray, shape (N,) An independent copy of ``s``. Mutating the output does not affect the input. Implements: :class:`~chaotic_pfc.comms.protocols.Channel`.""" return s.copy()
[docs] def fir_channel( s: NDArray, cutoff: float = 0.99, num_taps: int = 201, window: str = "hamming", ) -> tuple[NDArray, NDArray]: """Transmit ``s`` through a symmetric FIR low-pass channel. The channel is built with :func:`scipy.signal.firwin` using ``pass_zero=True`` (true low-pass) and unit sampling frequency normalisation (``fs=2.0``). The DC gain of the resulting filter is normalised to ``1.0``. Parameters ---------- s Transmitted signal, shape ``(N,)``. cutoff Normalised cutoff frequency ``ω_c / π ∈ (0, 1)``. Defaults to ``0.99`` — i.e. the channel passes almost everything, mimicking a very lightly band-limited link. num_taps Length of the FIR filter in samples. Must be a positive integer. window Window function passed through to ``firwin``. Any name accepted by :func:`scipy.signal.get_window` is valid — common choices are ``"hamming"``, ``"hann"``, ``"blackman"``. Returns ------- r : ndarray, shape (N,) Received signal, ``s`` filtered through the FIR coefficients. h : ndarray, shape (num_taps,) Filter coefficients, useful for overlaying the channel response on PSD plots. Notes ----- Unlike :func:`ideal_channel`, this function returns the filter taps alongside the filtered signal. It therefore does **not** satisfy :class:`~chaotic_pfc.comms.protocols.Channel` (which expects a single ``NDArray`` return).""" h = firwin(numtaps=num_taps, cutoff=cutoff, window=window, pass_zero=True, fs=2.0) r = lfilter(h, [1.0], s) return r, h
# ── Channel models ───────────────────────────────────────────────────────────
[docs] def awgn(sig: NDArray, snr_db: float, rng: np.random.Generator | None = None) -> NDArray: """Add white Gaussian noise to *sig* for a given SNR in dB. The noise power is ``E[|sig|²] / 10^(SNR/10)``, so a higher SNR means less noise. Parameters ---------- sig Signal samples, shape ``(N,)``. snr_db Signal-to-noise ratio in dB. ``snr_db → ∞`` gives the original signal; ``snr_db = 0`` gives equal-power noise. rng NumPy ``Generator``. ``None`` (default) creates a fresh ``default_rng()`` with no fixed seed, so results are not reproducible across calls. Pass a seeded generator for deterministic noise. Returns ------- ndarray, shape (N,) Signal with additive white Gaussian noise. Notes ----- The SNR is defined as :math:`E[|s|^2] / 10^{SNR/10}` [Haykin01]_. References ---------- .. [Haykin01] S. Haykin. "Communication Systems." 4th ed., Wiley, 2001. """ if rng is None: rng = np.random.default_rng() p = float(np.mean(sig**2)) / 10 ** (snr_db / 10) return sig + rng.normal(0.0, float(np.sqrt(p)), sig.shape)
[docs] def channel_impulsive( sig: NDArray, snr_db: float, prob_impulse: float = 0.01, amp_factor: float = 10.0, rng: np.random.Generator | None = None, ) -> NDArray: """AWGN channel with Middleton Class-A impulsive noise. A fraction *prob_impulse* of samples receive an additional impulse of randomised sign and amplitude *amp_factor* × ``std(sig)`` on top of the AWGN background. Parameters ---------- sig Signal samples, shape ``(N,)``. snr_db Background AWGN signal-to-noise ratio in dB. prob_impulse Probability a sample is hit by an impulse (e.g. 0.01 = 1 %). amp_factor Impulse amplitude in multiples of the signal std. rng Random generator. ``None`` creates a fresh ``default_rng()``. Returns ------- ndarray, shape (N,) Signal with AWGN + Middleton Class-A impulsive noise. """ if rng is None: rng = np.random.default_rng() rx = awgn(sig, snr_db, rng) std = float(np.std(sig)) mask = rng.random(len(sig)) < prob_impulse rx[mask] += amp_factor * std * rng.choice(np.array([-1.0, 1.0]), mask.sum()) return rx
[docs] def channel_multipath( sig: NDArray, snr_db: float, delays: list[int] | None = None, gains: list[float] | None = None, rng: np.random.Generator | None = None, ) -> NDArray: """Multipath channel with configurable tap delays and gains. Simulates a frequency-selective channel by summing delayed copies of the input. Output power is normalised to the input power, then AWGN is added at the requested SNR. Parameters ---------- sig Signal samples, shape ``(N,)``. snr_db AWGN signal-to-noise ratio after multipath combining, in dB. delays Delay of each path in samples (default: ``[0, 3, 7, 15]``). gains Attenuation per path (default: ``[1.0, 0.6, 0.4, 0.2]``). rng Random generator. ``None`` creates a fresh ``default_rng()``. Returns ------- ndarray, shape (N,) Multipath-combined signal with AWGN. """ if delays is None: delays = [0, 3, 7, 15] if gains is None: gains = [1.0, 0.6, 0.4, 0.2] N = len(sig) out = np.zeros(N) for d, g in zip(delays, gains, strict=True): if d == 0: out += g * sig else: out[d:] += g * sig[: N - d] out *= float(np.sqrt(np.mean(sig**2))) / (float(np.std(out)) + 1e-12) return awgn(out, snr_db, rng)