Source code for nav.support.correlate

import math
from typing import Any, cast

import numpy as np
import numpy.typing as npt
from numpy.fft import fft2, fftfreq, ifft2, ifftshift
from pdslogger import PdsLogger
from scipy.ndimage import gaussian_filter

from nav.config import IMAGE_LOGGER
from nav.support.image import crop_center, normalize_array, pad_top_left
from nav.support.misc import mad_std
from nav.support.types import NDArrayBoolType, NDArrayFloatType

_NDArrayComplexType = npt.NDArray[np.complexfloating[Any, Any]]

# Floor for masked NCC denominators and weight sums (avoid divide-by-zero).
_NCC_EPS = 1e-12

# ==============================================================
# Small utilities
# ==============================================================


[docs] def int_to_signed(idx: int, size: int) -> int: """Map ``[0 .. size-1]`` argmax index to a signed displacement coordinate. Parameters: idx: Peak index from correlation search in ``[0, size)``. size: FFT / correlation length (modulus for wrapping). Returns: ``int`` signed offset equivalent to ``idx`` when ``idx < size // 2``, else ``idx - size`` (negative branch for peaks in the second half). """ return idx if idx < size // 2 else idx - size
# ============================================================== # Fourier helpers # ==============================================================
[docs] def fourier_shift(img: NDArrayFloatType, dy: float, dx: float) -> NDArrayFloatType: """Subpixel shift via Fourier shift theorem (positive = down/right).""" fy = fftfreq(img.shape[0])[:, None] fx = fftfreq(img.shape[1])[None, :] phase = np.exp(-2j * np.pi * (dy * fy + dx * fx)) return np.real(ifft2(fft2(img) * phase))
[docs] def upsampled_dft( X: _NDArrayComplexType, up_factor: int, region_sz: tuple[int, int], offsets: tuple[int, int], ) -> _NDArrayComplexType: """Localized upsampled DFT. From Guizar-Sicairos, 2008. "Efficient subpixel image registration via cross-correlation." Optics Leters, 33(2):156-158 """ X_v_size, X_u_size = X.shape region_v, region_u = region_sz oy, ox = offsets ky = ifftshift(np.arange(X_v_size)) kx = ifftshift(np.arange(X_u_size)) a = np.arange(region_v) - oy b = np.arange(region_u) - ox j2pi = 1j * 2.0 * np.pi # Use +j sign to evaluate localized inverse DFT samples (consistent with ifft). Er = np.exp((j2pi / (X_v_size * up_factor)) * (a[:, None] @ ky[None, :])) Ec = np.exp((j2pi / (X_u_size * up_factor)) * (kx[:, None] @ b[None, :])) return cast(_NDArrayComplexType, Er @ X @ Ec)
# ============================================================== # Masked NCC (linear correlation via padding) # ============================================================== # Default overlap and variance floors for the bi-directional masked NCC # (only used when ``data_mask`` is provided). ``w_frac_min`` rejects # shifts whose effective-overlap weight w(s) is less than this fraction of # max(w); ``var_frac_min`` similarly rejects shifts where the image or # model variance under the mask is a tiny fraction of the best case, which # is where floating-point noise in the denominator would otherwise produce # spurious |NCC| >> 1. _NCC_BIDIR_W_FRAC_MIN: float = 0.3 _NCC_BIDIR_VAR_FRAC_MIN: float = 0.1
[docs] def masked_ncc( image: NDArrayFloatType, model: NDArrayFloatType, mask: NDArrayBoolType, data_mask: NDArrayBoolType | None = None, ) -> tuple[NDArrayFloatType, NDArrayFloatType]: """Masked normalized cross-correlation surface between image and model. Computes shift-wise NCC (Pearson r) using FFT-based sums. Also returns the unnormalized NCC numerator (mean-subtracted covariance) as a diagnostic. The NCC surface contains values in [-1, 1] at every shift where the mask overlaps non-constant image content, and is the primary peak-selection surface: it is invariant to the image variance under the shifted mask and therefore does not suffer from the "scale bias" that causes the numerator to prefer shifts where the fixed template mask straddles bright/dark boundaries in the image (e.g., the limb of a Lambert-shaded body disc). The numerator surface scales with sqrt(image-variance-under-mask) times the NCC, which is useful as a sanity check but must not be used for peak finding when the template is dense. Parameters: image: Padded image array. model: Padded model array (same shape as image). mask: Padded boolean mask (same shape as image). True where model pixels are valid. data_mask: Optional padded boolean mask indicating where ``image`` contains real sensor data (True) versus zero-padded margin (False). When provided, the NCC is computed in its bi-directional form so that pixel pairs where the shifted model mask straddles zero-padded image pixels do not bias the normalization. This removes the edge-ridge artifact that otherwise appears at ``|dV| = extfov_margin_v`` when the body model extends into the extfov margin. When ``None``, the standard (single-mask) NCC formula is used. Returns: Tuple of (ncc, numerator) arrays, each with the same shape as the inputs. ``ncc`` is the Pearson-r surface used for peak localization; ``numerator`` is the mean-subtracted cross-covariance (unnormalized) returned as a diagnostic. When ``data_mask`` is provided, shifts with degenerate overlap or variance are set to ``-inf`` in the NCC surface. """ ref_shape = image.shape if image.shape != model.shape or image.shape != mask.shape: raise ValueError( 'masked_ncc: image, model, and mask must have the same shape; ' f'got image {image.shape}, model {model.shape}, mask {mask.shape}.' ) if data_mask is not None and data_mask.shape != ref_shape: raise ValueError( 'masked_ncc: data_mask must match image shape ' f'{ref_shape}; got data_mask {data_mask.shape}.' ) if data_mask is not None: return _masked_ncc_bidir(image, model, mask, data_mask) image_fft = fft2(image) mask_fft = fft2(mask) model_mask_fft = fft2(model * mask) sum_w: float = float(np.sum(mask)) safe_w = sum_w + _NCC_EPS # Shift-wise sums via FFT (take real to discard floating-point imaginary noise) sum_iw = np.real(ifft2(image_fft * np.conj(mask_fft))) sum_i2w = np.real(ifft2(fft2(image**2) * np.conj(mask_fft))) sum_imw = np.real(ifft2(image_fft * np.conj(model_mask_fft))) # Model stats (constant over shifts) sum_mw: float = float(np.sum(model * mask)) mean_m: float = sum_mw / safe_w ssd_mw: float = float(np.sum(((model - mean_m) * mask) ** 2)) + _NCC_EPS # Shift-wise image mean mean_i = sum_iw / safe_w # Numerator: sum of (I - mean_I)(M - mean_M) * W over the mask num = sum_imw - mean_i * sum_mw # Denominator: sqrt( SSD_I(s) * SSD_M ) # Epsilon inside the sqrt so the floor is sqrt(_NCC_EPS) rather than # _NCC_EPS; this prevents floating-point noise in the numerator from # producing |NCC| >> 1 at shifts where the image variance is zero. ssd_iw = sum_i2w - sum_iw**2 / safe_w ssd_iw[ssd_iw < 0] = 0.0 denom = np.sqrt(ssd_iw * ssd_mw + _NCC_EPS) ncc = num / denom return ncc, num
def _masked_ncc_bidir( image: NDArrayFloatType, model: NDArrayFloatType, mask: NDArrayBoolType, data_mask: NDArrayBoolType, ) -> tuple[NDArrayFloatType, NDArrayFloatType]: """Bi-directional (data-mask aware) masked NCC; see :func:`masked_ncc`. The weight at shift s is w(s) = sum_i data_mask(i) * mask(i - s) rather than a constant sum(mask); image and model statistics under the mask are computed likewise with data_mask(i) as a per-pixel inclusion factor. This eliminates the zero-padding bias that drives a spurious peak at ``|dV| = extfov_margin_v`` when the model extends into the padded margin. Shifts with small overlap or degenerate variance are set to ``-inf`` so that peak finding does not chase floating-point noise. """ image = np.asarray(image, np.float64) model = np.asarray(model, np.float64) mask_f = mask.astype(np.float64) dmask_f = data_mask.astype(np.float64) image_fft = fft2(image) image2_fft = fft2(image * image) dmask_fft = fft2(dmask_f) mask_fft = fft2(mask_f) model_mask_fft = fft2(model * mask_f) model2_mask_fft = fft2((model * model) * mask_f) # Effective overlap weight at each shift. w_d = np.real(ifft2(dmask_fft * np.conj(mask_fft))) w_d_max = float(w_d.max()) w_floor = max(_NCC_BIDIR_W_FRAC_MIN * w_d_max, _NCC_EPS) overlap_ok = w_d >= w_floor # Shift-wise sums. ``image`` is already zero outside data_mask, so the # I*mask and I^2*mask sums are equivalent to dmask*I*mask and # dmask*I^2*mask respectively. sum_iw = np.real(ifft2(image_fft * np.conj(mask_fft))) sum_i2w = np.real(ifft2(image2_fft * np.conj(mask_fft))) sum_imw = np.real(ifft2(image_fft * np.conj(model_mask_fft))) # Model stats are shift-dependent because dmask selects which model # pixels participate at each shift. sum_mw = np.real(ifft2(dmask_fft * np.conj(model_mask_fft))) sum_m2w = np.real(ifft2(dmask_fft * np.conj(model2_mask_fft))) safe_w = np.where(overlap_ok, w_d, w_floor) mean_i = sum_iw / safe_w num = sum_imw - mean_i * sum_mw ssd_iw = np.maximum(sum_i2w - sum_iw**2 / safe_w, 0.0) ssd_mw = np.maximum(sum_m2w - sum_mw**2 / safe_w, 0.0) # Reject shifts where the image or model has near-constant content # under the effective mask; their NCC is dominated by floating-point # noise in the denominator. ssd_i_max = float(ssd_iw.max()) ssd_m_max = float(ssd_mw.max()) var_ok = (ssd_iw > _NCC_BIDIR_VAR_FRAC_MIN * ssd_i_max) & ( ssd_mw > _NCC_BIDIR_VAR_FRAC_MIN * ssd_m_max ) valid = overlap_ok & var_ok denom = np.sqrt(ssd_iw * ssd_mw) ncc = np.where(valid, num / np.maximum(denom, _NCC_EPS), 0.0) # Mathematically |NCC| <= 1; floating-point error can push it slightly # above, so clamp before invalidating. ncc = np.clip(ncc, -1.0, 1.0) ncc[~valid] = -np.inf return ncc, num # ============================================================== # Peak metrics & selection # ==============================================================
[docs] def psr_metric(corr: NDArrayFloatType, rc: tuple[int, int], guard: int = 5) -> float: corr_v, corr_u = corr.shape row, col = rc peak = corr[row, col] y, x = np.ogrid[:corr_v, :corr_u] mask = (y - row) ** 2 + (x - col) ** 2 > guard**2 bg = corr[mask] # Exclude invalidated-shift sentinels (-inf) before computing background stats; # otherwise ``bg.mean()`` collapses to -inf and PSR becomes NaN. bg = bg[np.isfinite(bg)] if bg.size == 0: return float('nan') return float((peak - bg.mean()) / (bg.std() + 1e-12))
[docs] def pmr_metric(corr: NDArrayFloatType, peak_val: float) -> float: finite = corr[np.isfinite(corr)] if finite.size == 0: return float('nan') return float(peak_val / (finite.mean() + 1e-12))
[docs] def per_metric(corr: NDArrayFloatType, peak_val: float) -> float: finite = corr[np.isfinite(corr)] if finite.size == 0: return float('nan') return float(peak_val / (np.sqrt(np.sum(finite**2)) + 1e-12))
[docs] def nms_topk( corr: NDArrayFloatType, k: int = 5, radius: int = 5, max_offset_vu: tuple[int, int] | None = None, ) -> list[tuple[int, int, float]]: """Non-maximum suppression to get top-k peaks. Parameters: corr: 2-D correlation surface (V x U). k: Maximum number of peaks to return. radius: Suppression radius around each selected peak in pixels. max_offset_vu: If given, only positions whose signed offset satisfies ``|dV| <= max_offset_vu[0]`` and ``|dU| <= max_offset_vu[1]`` are eligible. Signed offsets are derived from the FFT-convention wrap-around used by :func:`int_to_signed`. Returns: List of ``(row, col, value)`` tuples for up to ``k`` peaks. """ corr_v, corr_u = corr.shape work = corr.copy() if max_offset_vu is not None: max_v, max_u = max_offset_vu rows = np.arange(corr_v) cols = np.arange(corr_u) # Signed offsets via FFT wrap-around convention (same as int_to_signed). signed_rows = np.where(rows < corr_v // 2, rows, rows - corr_v) signed_cols = np.where(cols < corr_u // 2, cols, cols - corr_u) work[np.abs(signed_rows) > max_v, :] = -np.inf work[:, np.abs(signed_cols) > max_u] = -np.inf peaks: list[tuple[int, int, float]] = [] if not np.any(np.isfinite(work)): return peaks for _ in range(k): idx = int(np.argmax(work)) v = float(work.flat[idx]) if not np.isfinite(v): break row_i, col_i = np.unravel_index(idx, work.shape) row, col = int(row_i), int(col_i) peaks.append((row, col, v)) y, x = np.ogrid[:corr_v, :corr_u] work[(y - row) ** 2 + (x - col) ** 2 <= radius**2] = -np.inf return peaks
# ============================================================== # Fisher / CRLB # ==============================================================
[docs] def fisher_covariance(model_aligned: NDArrayFloatType, sigma_n: float) -> NDArrayFloatType: sy = np.gradient(model_aligned, axis=0) sx = np.gradient(model_aligned, axis=1) Sxx = np.sum(sx * sx) Syy = np.sum(sy * sy) Sxy = np.sum(sx * sy) F = (1.0 / (sigma_n**2 + 1e-18)) * np.array([[Sxx, Sxy], [Sxy, Syy]]) if np.linalg.cond(F) > 1e10: # Degenerate case: return large uncertainty return np.diag([1e6, 1e6]) return np.linalg.pinv(F + 1e-12 * np.eye(2))
# ============================================================== # Single-scale, K-peak evaluation (with optional prior) # ==============================================================
[docs] def evaluate_candidate( *, image_pad: NDArrayFloatType, model_pad: NDArrayFloatType, mask_pad: NDArrayBoolType, corr: NDArrayFloatType, rc: tuple[int, int], upsample_factor: int, model_shape: tuple[int, int], image_shape: tuple[int, int], logger: PdsLogger, prior_shift: tuple[float, float] | None = None, prior_weight: float = 0.0, metric: str = 'psr', ) -> dict[str, Any]: """ Evaluate a candidate for the navigation. Parameters: image_pad: The padded image. model_pad: The padded model. mask_pad: The padded mask. corr: The correlation matrix. rc: The row and column of the candidate. upsample_factor: The upsample factor. model_shape: The shape of the model. image_shape: The shape of the image. prior_shift: The prior shift. prior_weight: The prior weight. metric: The metric to use for the navigation. Returns: A dictionary containing the navigation result. """ corr_v, corr_u = corr.shape p, q = rc dy_i, dx_i = int_to_signed(p, corr_v), int_to_signed(q, corr_u) # Subpixel refinement: local upsampled DFT of correlation spectrum numerator spec = fft2(image_pad) * np.conj(fft2(model_pad * mask_pad)) # Center of the local upsampled DFT window should be the middle of the # evaluation region (e.g., 3x3 -> center index 1), not half the upsample factor. # Using upsample_factor here caused increasing bias for large factors. # Use a region size that scales with upsample_factor so the true peak # (which can be ~0.5*upsample_factor samples from center) is inside the window. region_v = upsample_factor + 1 region_u = upsample_factor + 1 oy = region_v // 2 ox = region_u // 2 Up = upsampled_dft( spec, upsample_factor, (region_v, region_u), (oy - dy_i * upsample_factor, ox - dx_i * upsample_factor), ) upy, upx = np.unravel_index(np.argmax(np.abs(Up)), Up.shape) dy = dy_i + (upy - oy) / upsample_factor dx = dx_i + (upx - ox) / upsample_factor # Align combined model and compute residual stats model_h, model_w = model_shape image_h, image_w = image_shape model_shift = fourier_shift(model_pad[:model_h, :model_w], dy, dx) model_crop = crop_center(model_shift, (image_h, image_w)) image_crop = image_pad[:image_h, :image_w] resid = normalize_array(image_crop) - normalize_array(model_crop) sigma_n = mad_std(resid) if sigma_n <= 1e-12: # When mad_std(resid) returns a value <= 1e-12, the code falls back to # max(resid.std(), 1e-6). This might indicate a perfect match or a numerical # issue, but the fallback silently continues. Consider logging this condition # or investigating why the residual variance is zero. # Add logging or a warning: # self.logger.warning("Residual variance near zero; using fallback sigma") sigma_n = max(resid.std(), 1e-6) cov = fisher_covariance(model_crop, sigma_n) # Quality metric peak_val = corr[p, q] if metric == 'psr': quality = psr_metric(corr, (p, q)) elif metric == 'pmr': quality = pmr_metric(corr, peak_val) elif metric == 'per': quality = per_metric(corr, peak_val) else: raise ValueError(f"metric must be 'psr', 'pmr', or 'per', not '{metric}'") # Prior penalty (encourage pyramid consistency or external priors) if prior_shift is not None and prior_weight > 0.0: # TODO The prior penalty subtracts prior_weight * dist from quality, but the units/scale of # quality (PSR/PMR/PER) may not be comparable to distance. This could make the penalty # disproportionately strong or weak depending on the metric choice. # Consider normalizing the distance penalty or using a separate scoring function that # combines quality and distance in a principled way (e.g., weighted sum with normalized # components). dist = np.hypot(dy - prior_shift[0], dx - prior_shift[1]) quality -= prior_weight * dist return { 'offset': (float(dy), float(dx)), 'cov': cov, 'sigma_xy': (float(np.sqrt(cov[0, 0])), float(np.sqrt(cov[1, 1]))), 'quality': float(quality.real), 'peak_val': float(peak_val.real), 'rc': (int(p), int(q)), }
# ============================================================== # Pyramid wrapper with K-peak final selection # ==============================================================