Source code for chaotic_pfc.comms.receiver

"""
receiver.py
===========
Chaos-synchronisation demodulator. Implements the
:class:`~chaotic_pfc.comms.protocols.Receiver` protocol — a callable that
extracts the original message from a transmitted (and possibly
channel-distorted) carrier.

Both receivers rely on the Pecora-Carroll synchronisation principle:
running a second copy of the Hénon oscillator driven by the received
signal causes it to track the transmitter's state after a short
transient. The recovered message is then extracted as the difference
between the driving signal and the local state estimate.

Two receivers are provided, mirroring
:mod:`chaotic_pfc.comms.transmitter`:

* :func:`receive` — 2-D Hénon demodulator. The message is recovered via

  .. math::

      \\hat{m}[n] = \\frac{r[n] - y_1[n]}{\\mu}

  where ``y_1`` is the local 2-D state driven by ``r``.

* :func:`receive_order_n` — higher-order demodulator. Uses the same
  FIR filter the transmitter used. The carrier is taken from the
  filtered state ``y[2][n]`` and the message follows the same formula
  with ``y_1`` replaced by ``v = y[2]``.
"""

import numpy as np
from numpy.typing import NDArray

from ..dynamics.maps import henon_n4_step_inplace


[docs] def receive( r: NDArray, mu: float = 0.01, a: float = 1.4, b: float = 0.3, y0: float = 0.0, z0: float = 0.0, ) -> NDArray: """Demodulate the carrier ``r`` via 2-D Hénon synchronisation. The local oscillator is driven by ``r`` itself: .. math:: y_1[n+1] &= a - r[n]^2 + b \\, y_2[n] \\\\ y_2[n+1] &= y_1[n] This is the Pecora-Carroll trick: once ``y`` locks onto the transmitter's state, ``r[n] - y_1[n]`` equals the instantaneous modulation and dividing by ``μ`` recovers the message. Parameters ---------- r Received carrier, shape ``(N,)``. Usually the output of :func:`chaotic_pfc.comms.channel.ideal_channel` or :func:`chaotic_pfc.comms.channel.fir_channel`. mu Modulation depth — must match the value used at the transmitter, otherwise the recovered amplitude is rescaled. a, b Hénon parameters — must match the transmitter's. y0, z0 Initial conditions of the local oscillator. A random pair is a safe default; synchronisation converges regardless after a short transient. Returns ------- ndarray, shape (N,) Recovered message estimate ``m̂[n] = (r[n] - y_1[n]) / μ``. Transient samples (first few hundred) may differ from the true message while the local oscillator locks in; see :data:`chaotic_pfc.config.CommConfig.transient` for the default rejection window. Implements: :class:`~chaotic_pfc.comms.protocols.Receiver`.""" N = len(r) y1 = np.empty(N + 1) y2 = np.empty(N + 1) y1[0], y2[0] = y0, z0 m_hat = np.empty(N) for k in range(N): y1[k + 1] = a - r[k] ** 2 + b * y2[k] y2[k + 1] = y1[k] m_hat[k] = (r[k] - y1[k]) / mu return m_hat
[docs] def receive_order_n( r: NDArray, fir_coeffs: NDArray, mu: float = 0.01, a: float = 1.4, b: float = 0.3, y0: NDArray | None = None, seed: int | None = None, ) -> tuple[NDArray, NDArray]: """Demodulate the carrier ``r`` via N-th order Hénon synchronisation. Runs a local copy of the N-th order filtered Hénon map driven by ``r``, and recovers the message from the difference between the received sample and the local filtered state ``v = y[2]``: .. math:: \\hat{m}[n] = \\frac{r[n] - v[n]}{\\mu} Parameters ---------- r Received carrier, shape ``(N,)``. fir_coeffs FIR feedback coefficients, shape ``(Nc,)``. Must match the ones used at the transmitter — otherwise synchronisation fails. mu, a, b Must match the transmitter's parameters. y0 Optional explicit initial state, shape ``(Nc,)``. If omitted, a random state is drawn from ``Uniform(0, 0.5)``. seed Seed for the RNG used when ``y0=None``. Has no effect when ``y0`` is provided. Returns ------- m_hat : ndarray, shape (N,) Recovered message estimate. state : ndarray, shape (Nc, N + 1) Full state trajectory of the local oscillator. Column 0 holds ``y0``; each subsequent column is the next iterate. Note: this function does NOT implement :class:`~chaotic_pfc.comms.protocols.Receiver` because it returns a ``(recovered, state)`` tuple instead of a single ndarray. Use :func:`receive` (the 2-D variant) for code that types against the Receiver protocol.""" N = len(r) Nc = len(fir_coeffs) rng = np.random.default_rng(seed) c = np.asarray(fir_coeffs, dtype=float) state = np.zeros((Nc, N + 1)) state[:, 0] = y0 if y0 is not None else 0.5 * rng.random(Nc) m_hat = np.empty(N) for i in range(N): v = state[2, i] m_hat[i] = (r[i] - v) / mu henon_n4_step_inplace(state[:, i + 1], state[:, i], r[i], a, b, c) return m_hat, state