Source code for nav.reproj.rings

"""Ring reprojection and mosaicing utilities.

This module provides the RingMosaic class for reprojecting planetary ring
images onto radius/longitude grids and accumulating them into sparse mosaics.

Thread safety: RingMosaic.reproject() is not thread-safe because it may
temporarily mutate obs.fov and oops global precision settings. Concurrent
calls from separate threads will interfere.
"""

import enum
import logging
import math
from dataclasses import dataclass
from typing import Any

import numpy as np
import numpy.ma as ma
import oops
import scipy.ndimage as nd
from polymath import Scalar, Vector3

from nav.reproj._context_managers import _reduced_oops_precision
from nav.reproj._serialization import (
    infer_format,
    load_fits,
    load_npz,
    orbit_model_from_dict,
    orbit_model_to_dict,
    save_fits,
    save_npz,
    tuple_of_strings_field,
    verify_dtype,
)
from nav.reproj.photometric_model import PhotometricModel
from nav.reproj.ring_orbit_model import RingOrbitModel
from nav.support.image import array_unzoom, array_zoom
from nav.support.types import NDArrayBoolType, NDArrayFloatType, NDArrayIntType, PathLike

_LOGGING_NAME = 'nav.' + __name__


def _ring_plane_surface(ring_body_name: str) -> Any:
    """Return the oops ring surface used for longitude/radius <-> pixel conversion.

    Nav and oops backplanes use names like ``'saturn:ring'``. The corresponding
    ``oops.Body`` registry entry for the unbounded plane is ``SATURN_RING_PLANE``,
    not ``SATURN_RING``; the parent planet's ``ring_body`` attribute points at the
    correct body. For a two-part ``name:ring`` string we use that; otherwise we
    treat ``ring_body_name`` as a direct ``Body.lookup`` key (underscores for
    colons, uppercased), e.g. ``SATURN_MAIN_RINGS``.
    """
    parts = ring_body_name.lower().split(':')
    if len(parts) == 2 and parts[1] == 'ring':
        parent = oops.Body.lookup(parts[0].upper())
        ring_body = parent.ring_body
        if ring_body is None:
            raise ValueError(
                f'No ring plane is defined for body {parts[0]!r} '
                f'(ring_body_name was {ring_body_name!r})'
            )
        return ring_body.surface
    return oops.Body.lookup(ring_body_name.replace(':', '_').upper()).surface


[docs] class RingMosaicMergeStrategy(enum.Enum): """Strategy for resolving per-longitude-column conflicts when adding data to a RingMosaic. Members: BEST_RESOLUTION: Replace an existing longitude column only when the new data has strictly better mean radial resolution for that column. MOST_COVERAGE_THEN_RESOLUTION: Fill empty (missing) longitude columns first; for already-present columns, replace only when the new data has better mean radial resolution. """ BEST_RESOLUTION = 'best_resolution' MOST_COVERAGE_THEN_RESOLUTION = 'most_coverage_then_resolution'
# Main ring radius bounds used when no explicit range is given RINGS_MIN_RADIUS = oops.body.SATURN_MAIN_RINGS[0] RINGS_MAX_RADIUS = oops.body.SATURN_MAIN_RINGS[1] # Slop values: must be smaller than the smallest resolution we will ever use _LONGITUDE_SLOP = 1e-6 # rad _RADIUS_SLOP = 1e-6 # km _MAX_LONGITUDE = math.pi * 2.0 - _LONGITUDE_SLOP * 2 # ``bool`` subclasses ``int``; reject for range endpoints. _REAL_SCALAR_TYPES = (int, float, np.integer, np.floating) def _validate_reproject_longitude_range(longitude_range: object) -> tuple[float, float]: """Validate ``RingMosaic.reproject(..., longitude_range=...)``; return finite floats.""" if not isinstance(longitude_range, (tuple, list)): raise TypeError( 'reproject() longitude_range must be a tuple or list of two numbers (rad), ' f'got {type(longitude_range).__name__}' ) if len(longitude_range) != 2: raise ValueError( 'reproject() longitude_range must have exactly two elements (start, end) in radians, ' f'got length {len(longitude_range)}' ) lon_start, lon_end = longitude_range[0], longitude_range[1] for label, x in (('start', lon_start), ('end', lon_end)): if isinstance(x, bool) or not isinstance(x, _REAL_SCALAR_TYPES): raise TypeError( f'reproject() longitude_range {label} must be int or float, got {type(x).__name__}' ) lon_start_f = float(lon_start) lon_end_f = float(lon_end) if not math.isfinite(lon_start_f) or not math.isfinite(lon_end_f): raise ValueError( 'reproject() longitude_range values must be finite; ' f'got start={longitude_range[0]!r}, end={longitude_range[1]!r}' ) if lon_start_f < 0.0 or lon_start_f > _MAX_LONGITUDE: raise ValueError( 'reproject() longitude_range start must satisfy 0 <= start <= _MAX_LONGITUDE (rad); ' f'got start={lon_start_f!r}' ) if lon_end_f < 0.0 or lon_end_f > _MAX_LONGITUDE: raise ValueError( 'reproject() longitude_range end must satisfy 0 <= end <= _MAX_LONGITUDE (rad); ' f'got end={lon_end_f!r}' ) if lon_start_f > lon_end_f: raise ValueError( 'reproject() longitude_range must have start <= end (rad); ' f'got start={lon_start_f!r}, end={lon_end_f!r}' ) return lon_start_f, lon_end_f def _validate_reproject_radius_range(radius_range: object) -> tuple[float, float]: """Validate ``RingMosaic.reproject(..., radius_range=...)``; return finite floats. ``inner < outer`` is required in both modes: absolute radii (km) when ``orbit_model is None``, or signed offsets (km) from the orbital radius when a model is used. """ if not isinstance(radius_range, (tuple, list)): raise TypeError( 'reproject() radius_range must be a tuple or list of two numbers (km), ' f'got {type(radius_range).__name__}' ) if len(radius_range) != 2: raise ValueError( 'reproject() radius_range must have exactly two elements (inner, outer) in km, ' f'got length {len(radius_range)}' ) ri, ro = radius_range[0], radius_range[1] for label, x in (('inner', ri), ('outer', ro)): if isinstance(x, bool) or not isinstance(x, _REAL_SCALAR_TYPES): raise TypeError( f'reproject() radius_range {label} must be int or float, got {type(x).__name__}' ) ri_f = float(ri) ro_f = float(ro) if not math.isfinite(ri_f) or not math.isfinite(ro_f): raise ValueError( 'reproject() radius_range values must be finite; ' f'got inner={radius_range[0]!r}, outer={radius_range[1]!r}' ) if ri_f >= ro_f: raise ValueError( 'reproject() radius_range must have inner < outer (km). With orbit_model=None ' 'these are absolute radii; with a model they are signed offsets from the orbital ' f'radius at each (longitude, time). Got inner={ri_f!r}, outer={ro_f!r}.' ) return ri_f, ro_f def _validate_reproject_zoom_amt(zoom_amt: object) -> tuple[int, int]: """Validate ``RingMosaic.reproject(..., zoom_amt=...)``; return ``(radial, longitude)`` ints.""" if isinstance(zoom_amt, (list, tuple)): if len(zoom_amt) != 2: raise ValueError( 'reproject() zoom_amt as a sequence must have exactly two elements ' f'(radial, longitude zoom factors), got length {len(zoom_amt)}' ) out: list[int] = [] for i, z in enumerate(zoom_amt, start=1): if isinstance(z, (bool, np.bool_)): raise TypeError( f'reproject() zoom_amt element {i} must be int-like (not bool), got bool' ) if not isinstance(z, (int, np.integer)): raise TypeError( f'reproject() zoom_amt element {i} must be int or numpy integer, ' f'got {type(z).__name__}' ) out.append(int(z)) return (out[0], out[1]) if isinstance(zoom_amt, (bool, np.bool_)): raise TypeError( 'reproject() zoom_amt must be int (not bool) or a tuple/list of two ints, got bool' ) if isinstance(zoom_amt, (int, np.integer)): z = int(zoom_amt) return (z, z) raise TypeError( 'reproject() zoom_amt must be int (not bool) or a tuple/list of two ints, ' f'got {type(zoom_amt).__name__}' ) # Module-level defaults for RingMosaic parameters DEFAULT_LONGITUDE_RESOLUTION = 0.02 * math.pi / 180.0 # 0.02 degrees in rad DEFAULT_RADIUS_RESOLUTION = 5.0 # km DEFAULT_ZOOM = (1, 1) DEFAULT_MARGIN = 3
[docs] @dataclass(frozen=True) class RingReprojResult: """Data returned by RingMosaic.reproject(). The image and per-longitude metadata arrays are always sparse: only longitude columns with valid data are included. Use ``longitude_antimask`` to reconstruct the actual longitude values. The interpretation of ``radius_inner`` / ``radius_outer`` and longitudes depends on whether ``orbit_model`` is ``None``: - ``orbit_model is None``: longitudes are inertial J2000 ring longitudes; ``radius_inner`` and ``radius_outer`` are absolute ring radii in km. - ``orbit_model is not None``: longitudes are co-rotating; ``radius_inner`` and ``radius_outer`` are signed offsets in km from the orbit's actual radial position at each (longitude, time). For an eccentric orbit, that radial position varies between ``a (1 - e)`` and ``a (1 + e)``; the offset semantics make an eccentric ring appear as a straight line in the reprojection. This is a :func:`dataclasses.dataclass`; each field is documented on its own member entry below. """ body_name: str img: ma.MaskedArray longitude_resolution: float radius_resolution: float radius_inner: float radius_outer: float longitude_antimask: NDArrayBoolType mean_radial_resolution: NDArrayFloatType mean_angular_resolution: NDArrayFloatType mean_phase: NDArrayFloatType mean_emission: NDArrayFloatType incidence: float time: float orbit_model: RingOrbitModel | None image_dtype: np.dtype metadata_dtype: np.dtype photometric_model_name: str | None = None image_name: str = ''
[docs] def save( self, path: PathLike, *, format_: str | None = None, compress: bool = True, ) -> None: """Save this RingReprojResult to a file. Parameters: path: Output path (``str``, ``pathlib.Path``, or ``filecache.FCPath``). The format is inferred from the extension (.npz for NumPy archive, .fits/.fit for FITS) unless ``format_`` is given. format_: Explicit format: ``'npz'`` or ``'fits'``. compress: If True (default), use compressed npz. Ignored for FITS. Raises: ValueError: If the format cannot be inferred or is not supported. Example:: result = ring_mosaic.reproject(obs) result.save('ring_reproj.npz') reloaded = RingReprojResult.load('ring_reproj.npz') """ fmt = infer_format(path, format_) payload: dict[str, Any] = { 'body_name': self.body_name, 'img': self.img, 'longitude_resolution': self.longitude_resolution, 'radius_resolution': self.radius_resolution, 'radius_inner': self.radius_inner, 'radius_outer': self.radius_outer, 'longitude_antimask': self.longitude_antimask, 'mean_radial_resolution': self.mean_radial_resolution, 'mean_angular_resolution': self.mean_angular_resolution, 'mean_phase': self.mean_phase, 'mean_emission': self.mean_emission, 'incidence': self.incidence, 'time': self.time, 'orbit_model': orbit_model_to_dict(self.orbit_model), 'image_dtype': self.image_dtype, 'metadata_dtype': self.metadata_dtype, 'photometric_model_name': self.photometric_model_name, 'image_name': self.image_name, } if fmt == 'npz': save_npz(path, 'RingReprojResult', 1, payload, compress=compress) else: save_fits(path, 'RingReprojResult', 1, payload)
[docs] @classmethod def load( cls, path: PathLike, *, format_: str | None = None, ) -> 'RingReprojResult': """Load a RingReprojResult from a file. Parameters: path: Input path (``str``, ``pathlib.Path``, or ``filecache.FCPath``). format_: Explicit format (``'npz'`` or ``'fits'``). If None, inferred from the file extension. Returns: A new RingReprojResult with the loaded data. Raises: ValueError: If the file's kind does not match, or if any array dtype does not match the declared ``image_dtype`` / ``metadata_dtype``. Example:: result = RingReprojResult.load('ring_reproj.npz') """ fmt = infer_format(path, format_) d = ( load_npz(path, 'RingReprojResult') if fmt == 'npz' else load_fits(path, 'RingReprojResult') ) _REQUIRED_KEYS_REPROJ = { 'image_dtype', 'metadata_dtype', 'body_name', 'img', 'longitude_resolution', 'radius_resolution', 'radius_inner', 'radius_outer', 'longitude_antimask', 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', 'incidence', 'time', } missing = _REQUIRED_KEYS_REPROJ - d.keys() if missing: raise ValueError( f'RingReprojResult file is missing required keys: {sorted(missing)}. ' f'Found keys: {sorted(d.keys())!r}. ' 'If this path ends in .fits but the file is actually an npz archive, ' 'rename it to .npz or pass format_= explicitly; otherwise the file may be ' 'truncated or not written by RingReprojResult.save().' ) image_dtype = np.dtype(str(d['image_dtype'])) metadata_dtype = np.dtype(str(d['metadata_dtype'])) verify_dtype( { k: d[k] for k in ( 'img', 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', ) }, image_dtype=image_dtype, metadata_dtype=metadata_dtype, image_fields=['img'], metadata_fields=[ 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', ], ) # Reconstruct orbit model from flattened dict keys orbit_d: dict[str, Any] = {} prefix = 'orbit_model__' for k, v in d.items(): if k.startswith(prefix): orbit_d[k[len(prefix) :]] = v orbit_model = orbit_model_from_dict(orbit_d) if orbit_d else None phot = d.get('photometric_model_name') if phot is None or (isinstance(phot, str) and phot.strip() == ''): photometric_model_name: str | None = None else: photometric_model_name = str(phot) return cls( body_name=str(d['body_name']), img=d['img'], longitude_resolution=float(d['longitude_resolution']), radius_resolution=float(d['radius_resolution']), radius_inner=float(d['radius_inner']), radius_outer=float(d['radius_outer']), longitude_antimask=np.asarray(d['longitude_antimask'], dtype=np.bool_), mean_radial_resolution=d['mean_radial_resolution'], mean_angular_resolution=d['mean_angular_resolution'], mean_phase=d['mean_phase'], mean_emission=d['mean_emission'], incidence=float(d['incidence']), time=float(d['time']), orbit_model=orbit_model, image_dtype=image_dtype, metadata_dtype=metadata_dtype, photometric_model_name=photometric_model_name, image_name=str(d.get('image_name', '') or ''), )
[docs] @dataclass(frozen=True) class RingMosaicData: """Mosaic data returned by RingMosaic retrieval methods. The meaning of ``radius_inner`` / ``radius_outer`` and longitudes depends on whether ``orbit_model_name`` is ``None``: - ``orbit_model_name is None``: longitudes are inertial J2000 ring longitudes; ``radius_inner`` and ``radius_outer`` are absolute ring radii in km. - ``orbit_model_name is not None``: longitudes are co-rotating in the named orbit model's frame; ``radius_inner`` and ``radius_outer`` are signed offsets in km from the orbital radius at each (longitude, time) — i.e. from ``orbit_model.radius_at_longitude(inertial_lon, et)``. This is a :func:`dataclasses.dataclass`; each field is documented on its own member entry below. """ body_name: str ring_body_name: str shadow_body_name: str longitude_resolution: float radius_resolution: float radius_inner: float radius_outer: float longitude_antimask: NDArrayBoolType img: ma.MaskedArray longitude_range: tuple[float, float] | None mean_radial_resolution: ma.MaskedArray mean_angular_resolution: ma.MaskedArray mean_phase: ma.MaskedArray mean_emission: ma.MaskedArray mean_incidence: float image_number: ma.MaskedArray time: ma.MaskedArray image_dtype: np.dtype metadata_dtype: np.dtype contributing_image_names: tuple[str, ...] = () orbit_model_name: str | None = None photometric_model_name: str | None = None
[docs] def save( self, path: PathLike, *, format_: str | None = None, compress: bool = True, ) -> None: """Save this RingMosaicData to a file. Parameters: path: Output path (``str``, ``pathlib.Path``, or ``filecache.FCPath``). The format is inferred from the extension (.npz for NumPy archive, .fits/.fit for FITS) unless ``format_`` is given. format_: Explicit format: ``'npz'`` or ``'fits'``. compress: If True (default), use compressed npz. Ignored for FITS. Raises: ValueError: If the format cannot be inferred or is not supported. Example:: data = ring_mosaic.to_bounded(longitude_range=(0.0, math.pi)) data.save('saturn_rings.npz') data.save('saturn_rings.fits', format_='fits') reloaded = RingMosaicData.load('saturn_rings.npz') """ fmt = infer_format(path, format_) payload: dict[str, Any] = { 'body_name': self.body_name, 'ring_body_name': self.ring_body_name, 'shadow_body_name': self.shadow_body_name, 'longitude_resolution': self.longitude_resolution, 'radius_resolution': self.radius_resolution, 'radius_inner': self.radius_inner, 'radius_outer': self.radius_outer, 'longitude_antimask': self.longitude_antimask, 'img': self.img, 'longitude_range': self.longitude_range, 'mean_radial_resolution': self.mean_radial_resolution, 'mean_angular_resolution': self.mean_angular_resolution, 'mean_phase': self.mean_phase, 'mean_emission': self.mean_emission, 'mean_incidence': self.mean_incidence, 'image_number': self.image_number, 'time': self.time, 'image_dtype': self.image_dtype, 'metadata_dtype': self.metadata_dtype, 'contributing_image_names': self.contributing_image_names, 'orbit_model_name': self.orbit_model_name, 'photometric_model_name': self.photometric_model_name, } if fmt == 'npz': save_npz(path, 'RingMosaicData', 1, payload, compress=compress) else: save_fits(path, 'RingMosaicData', 1, payload)
[docs] @classmethod def load( cls, path: PathLike, *, format_: str | None = None, ) -> 'RingMosaicData': """Load a RingMosaicData from a file. Parameters: path: Input path (``str``, ``pathlib.Path``, or ``filecache.FCPath``). format_: Explicit format (``'npz'`` or ``'fits'``). If None, inferred from the file extension. Returns: A new RingMosaicData with the loaded data. Raises: ValueError: If the file's kind does not match, or if any array dtype does not match the declared ``image_dtype`` / ``metadata_dtype``, or if ``image_number`` is not uint16, or if ``time`` is not float64. Example:: data = RingMosaicData.load('saturn_rings.npz') """ fmt = infer_format(path, format_) d = load_npz(path, 'RingMosaicData') if fmt == 'npz' else load_fits(path, 'RingMosaicData') _REQUIRED_KEYS_MOSAIC = { 'image_dtype', 'metadata_dtype', 'body_name', 'ring_body_name', 'shadow_body_name', 'img', 'longitude_resolution', 'radius_resolution', 'radius_inner', 'radius_outer', 'longitude_antimask', 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', 'mean_incidence', 'time', 'image_number', } missing = _REQUIRED_KEYS_MOSAIC - d.keys() if missing: raise ValueError( f'RingMosaicData file is missing required keys: {sorted(missing)}. ' f'Found keys: {sorted(d.keys())!r}. ' 'If the path ends in .fits but the payload is npz (ZIP), rename to .npz ' 'or pass format_=; otherwise the file may be corrupt or from another tool.' ) image_dtype = np.dtype(str(d['image_dtype'])) metadata_dtype = np.dtype(str(d['metadata_dtype'])) verify_dtype( { k: d[k] for k in ( 'img', 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', 'time', 'image_number', ) }, image_dtype=image_dtype, metadata_dtype=metadata_dtype, image_fields=['img'], metadata_fields=[ 'mean_radial_resolution', 'mean_angular_resolution', 'mean_phase', 'mean_emission', ], float64_fields=['time'], ) lon_range_raw = d.get('longitude_range') longitude_range: tuple[float, float] | None if lon_range_raw is None: longitude_range = None else: longitude_range = (float(lon_range_raw[0]), float(lon_range_raw[1])) contributing_image_names = tuple_of_strings_field(d.get('contributing_image_names')) om_raw = d.get('orbit_model_name') orbit_model_name = None if om_raw is None else str(om_raw) pm_raw = d.get('photometric_model_name') if pm_raw is None or (isinstance(pm_raw, str) and pm_raw.strip() == ''): photometric_model_name: str | None = None else: photometric_model_name = str(pm_raw) return cls( body_name=str(d['body_name']), ring_body_name=str(d['ring_body_name']), shadow_body_name=str(d['shadow_body_name']), longitude_resolution=float(d['longitude_resolution']), radius_resolution=float(d['radius_resolution']), radius_inner=float(d['radius_inner']), radius_outer=float(d['radius_outer']), longitude_antimask=np.asarray(d['longitude_antimask'], dtype=np.bool_), img=d['img'], longitude_range=longitude_range, mean_radial_resolution=d['mean_radial_resolution'], mean_angular_resolution=d['mean_angular_resolution'], mean_phase=d['mean_phase'], mean_emission=d['mean_emission'], mean_incidence=float(d['mean_incidence']), image_number=d['image_number'], time=d['time'], image_dtype=image_dtype, metadata_dtype=metadata_dtype, contributing_image_names=contributing_image_names, orbit_model_name=orbit_model_name, photometric_model_name=photometric_model_name, )
[docs] class RingMosaic: """Sparse ring mosaic accumulator. Reprojects ring images onto a radius/longitude grid and accumulates them into a mosaic using true sparse longitude storage. Only longitude columns that actually contain reprojected data are stored; new columns are inserted in sorted order using batched ``np.insert`` calls. The interpretation of ``radius_inner`` / ``radius_outer`` and longitudes depends on ``orbit_model``: - ``orbit_model is None`` (default): longitudes are inertial J2000 ring longitudes; ``radius_inner`` and ``radius_outer`` are absolute ring radii in km. - ``orbit_model is not None``: longitudes are co-rotating in the model's frame; ``radius_inner`` and ``radius_outer`` are signed offsets in km from the orbital radius at each (longitude, time). For an eccentric orbit, that radial position varies between ``a (1 - e)`` and ``a (1 + e)``. The offset semantics make an eccentric ring appear as a straight line in the reprojection. A typical 2000 km wide window centred on the orbit uses ``radius_inner = -1000`` and ``radius_outer = +1000``. Parameters: body_name: The planet name (e.g. 'SATURN'). Used to derive the oops ring body name and shadow body name for backplane calls. radius_inner: Inner radius (km, absolute) when ``orbit_model is None``; signed offset (km) from the orbital radius at each (longitude, time) otherwise. radius_outer: Outer radius (km, absolute) when ``orbit_model is None``; signed offset (km) from the orbital radius at each (longitude, time) otherwise. Must be greater than ``radius_inner``. longitude_resolution: Longitude bin width (rad/pixel). radius_resolution: Radius bin height (km/pixel). merge_strategy: How to resolve conflicts when the same longitude column appears in multiple reprojections. orbit_model: Default RingOrbitModel for co-rotating frame reprojections; selects the offset-radius semantics described above. Can be overridden per-call in reproject(); the override must agree with this default. image_dtype: NumPy dtype for the reprojected brightness ``img`` array. Defaults to ``np.float64``. metadata_dtype: NumPy dtype for geometry arrays (``mean_radial_resolution``, ``mean_angular_resolution``, ``mean_phase``, ``mean_emission``). Defaults to ``np.float32``. photometric_model: Optional photometric correction applied to ``img`` in ``reproject()`` before accumulation. ``None`` means raw brightness. Notes: reproject() is not thread-safe because it mutates obs.fov and oops global precision settings. ``time`` is always stored as ``float64`` regardless of ``metadata_dtype``. ``image_number`` is always stored as ``uint16``; ``_image_count`` is stored as the image_number before incrementing, so valid image_number values span 0..65535 (a total of 65,536 images). ``add()`` raises ``OverflowError`` after the 65,536th image is added. """ def __init__( self, body_name: str, radius_inner: float, radius_outer: float, *, longitude_resolution: float = DEFAULT_LONGITUDE_RESOLUTION, radius_resolution: float = DEFAULT_RADIUS_RESOLUTION, merge_strategy: RingMosaicMergeStrategy = ( RingMosaicMergeStrategy.MOST_COVERAGE_THEN_RESOLUTION ), orbit_model: RingOrbitModel | None = None, image_dtype: np.typing.DTypeLike = np.float64, metadata_dtype: np.typing.DTypeLike = np.float32, photometric_model: PhotometricModel | None = None, ) -> None: """Initialize an empty RingMosaic.""" if radius_inner >= radius_outer: raise ValueError( f'radius_inner ({radius_inner}) must be less than radius_outer ({radius_outer})' ) if longitude_resolution <= 0 or longitude_resolution >= 2 * math.pi: raise ValueError( f'longitude_resolution must be positive and < 2*pi, got {longitude_resolution!r}' ) if radius_resolution <= 0: raise ValueError(f'radius_resolution must be positive, got {radius_resolution!r}') self._body_name = body_name self._ring_body_name = body_name.lower() + ':ring' self._shadow_body_name = body_name.lower() self._radius_inner = radius_inner self._radius_outer = radius_outer self._lon_resolution = longitude_resolution self._rad_resolution = radius_resolution self._merge_strategy = merge_strategy self._orbit_model = orbit_model self._image_dtype: np.dtype = np.dtype(image_dtype) self._metadata_dtype: np.dtype = np.dtype(metadata_dtype) self._photometric_model = photometric_model self._n_radius = math.ceil((radius_outer - radius_inner + _RADIUS_SLOP) / radius_resolution) # Span [0, _MAX_LONGITUDE] (slightly below 2π); matches bin indexing that clamps to # ``_MAX_LONGITUDE`` (e.g. ``longitude_valid_range(..., longitude_end=_MAX_LONGITUDE)``). self._n_full_lon = math.floor(_MAX_LONGITUDE / longitude_resolution) + 1 # Sparse storage: only valid longitude columns are held. # _antimask[i] is True iff longitude bin i has data. self._antimask: NDArrayBoolType = np.zeros(self._n_full_lon, dtype=np.bool_) self._img_sparse: ma.MaskedArray = ma.MaskedArray( np.empty((self._n_radius, 0), dtype=self._image_dtype), mask=np.ones((self._n_radius, 0), dtype=np.bool_), ) self._mean_radial_res: NDArrayFloatType = np.empty(0, dtype=self._metadata_dtype) self._mean_angular_res: NDArrayFloatType = np.empty(0, dtype=self._metadata_dtype) self._mean_phase: NDArrayFloatType = np.empty(0, dtype=self._metadata_dtype) self._mean_emission: NDArrayFloatType = np.empty(0, dtype=self._metadata_dtype) self._mean_incidence_sum: float = 0.0 self._mean_incidence_count: int = 0 self._mean_incidence: float = 0.0 self._image_number: NDArrayIntType = np.empty(0, dtype=np.uint16) self._time: NDArrayFloatType = np.empty(0, dtype=np.float64) self._image_count: int = 0 self._contributing_image_names: list[str] = [] # ------------------------------------------------------------------ # Public read-only properties # ------------------------------------------------------------------ @property def body_name(self) -> str: """The planet name supplied at construction.""" return self._body_name @property def ring_body_name(self) -> str: """oops body name for the ring plane (e.g. 'saturn:ring').""" return self._ring_body_name @property def shadow_body_name(self) -> str: """oops body name for the shadow-casting planet (e.g. 'saturn').""" return self._shadow_body_name @property def bounds(self) -> tuple[float, float] | None: """Longitude extent of current mosaic data as (min, max) in radians. Returns None if no data has been added yet. """ true_indices = np.where(self._antimask)[0] if len(true_indices) == 0: return None return ( float(true_indices[0]) * self._lon_resolution, float(true_indices[-1]) * self._lon_resolution, ) # ------------------------------------------------------------------ # Static grid generation utilities # ------------------------------------------------------------------
[docs] @staticmethod def generate_longitudes( longitude_start: float = 0.0, longitude_end: float = _MAX_LONGITUDE, *, longitude_resolution: float = DEFAULT_LONGITUDE_RESOLUTION, ) -> NDArrayFloatType: """Generate a longitude array aligned to grid boundaries. Parameters: longitude_start: Start of the range (rad). Defaults to 0. longitude_end: End of the range (rad). Defaults to just under 2*pi. longitude_resolution: Step size (rad/pixel). Returns: 1-D array of longitudes (rad) on resolution boundaries, with no value less than longitude_start or greater than longitude_end. """ if longitude_resolution <= 0: raise ValueError(f'longitude_resolution must be positive, got {longitude_resolution!r}') start_idx = math.ceil(longitude_start / longitude_resolution) end_idx = math.floor(longitude_end / longitude_resolution) return np.arange(start_idx, end_idx + 1) * longitude_resolution
[docs] @staticmethod def generate_radii( radius_inner: float, radius_outer: float, *, radius_resolution: float = DEFAULT_RADIUS_RESOLUTION, ) -> NDArrayFloatType: """Generate a radius array from inner to outer. Parameters: radius_inner: Inner radius (km). radius_outer: Outer radius (km). radius_resolution: Step size (km/pixel). Returns: 1-D array of radii (km) starting at radius_inner with step radius_resolution, ending at or just before radius_outer. """ if radius_resolution <= 0: raise ValueError(f'radius_resolution must be positive, got {radius_resolution!r}') if radius_outer < radius_inner: raise ValueError( f'radius_outer ({radius_outer}) must be >= radius_inner ({radius_inner})' ) n = math.ceil((radius_outer - radius_inner + _RADIUS_SLOP) / radius_resolution) return np.arange(n) * radius_resolution + radius_inner
# ------------------------------------------------------------------ # Coordinate conversion # ------------------------------------------------------------------
[docs] @staticmethod def longitude_radius_to_pixels( obs: Any, longitude: NDArrayFloatType, radius: NDArrayFloatType, *, orbit_model: RingOrbitModel | None = None, ring_body_name: str = 'saturn:ring', ) -> tuple[NDArrayFloatType, NDArrayFloatType]: """Convert longitude/radius pairs to image pixel coordinates. If orbit_model is given, the longitude values are treated as co-rotating and are converted to inertial before the coordinate lookup. Parameters: obs: The Observation whose FOV is used. longitude: Longitude array (rad). radius: Radius array (km). orbit_model: RingOrbitModel for co-rotating frame conversion, or None for inertial longitude. ring_body_name: oops ring body name for the surface lookup. Returns: Tuple of (u, v) floating-point pixel coordinate arrays. """ longitude = Scalar(longitude) radius = Scalar(radius) if orbit_model is not None: longitude = orbit_model.corotating_to_inertial(longitude, obs.midtime) if len(longitude) == 0: return np.zeros(0, dtype=float), np.zeros(0, dtype=float) ring_surface = _ring_plane_surface(ring_body_name) obs_event = oops.Event(obs.midtime, (Vector3.ZERO, Vector3.ZERO), obs.path, obs.frame) _, obs_event = ring_surface.photon_to_event_by_coords(obs_event, (radius, longitude)) uv = obs.fov.uv_from_los(obs_event.neg_arr_ap) u, v = uv.to_scalars() return u.vals, v.vals
[docs] @staticmethod def orbit_pixels( obs: Any, orbit_model: RingOrbitModel, *, ring_body_name: str = 'saturn:ring', longitude_step: float = 0.002 * math.pi / 180.0, ) -> tuple[NDArrayFloatType, NDArrayFloatType]: """Return (u, v) pixel pairs for an orbit model feature in an image. Computes the full 0..2pi longitude range, converts each to image coordinates, and filters to those inside the FOV. Parameters: obs: The Observation (with its FOV already configured as needed). orbit_model: The ring orbit model to trace. ring_body_name: oops ring body name. longitude_step: Longitude step for sampling (rad). Returns: Tuple of (u_pixels, v_pixels) integer pixel coordinate arrays. """ longitudes, radii = orbit_model.longitude_radius(obs.midtime, step=longitude_step) bp = obs.ext_bp u_min = 0 v_min = 0 u_max = obs.extdata_shape_uv[0] - 1 v_max = obs.extdata_shape_uv[1] - 1 bp_radius = bp.ring_radius(ring_body_name) bp_longitude = bp.ring_longitude(ring_body_name) min_bp_radius = bp_radius.min() max_bp_radius = bp_radius.max() min_bp_longitude = bp_longitude.min() max_bp_longitude = bp_longitude.max() goodr = (radii >= min_bp_radius) & (radii <= max_bp_radius) goodl = (longitudes >= min_bp_longitude) & (longitudes <= max_bp_longitude) good = goodr & goodl longitudes = longitudes[good] radii = radii[good] u_pix, v_pix = RingMosaic.longitude_radius_to_pixels( obs, longitudes, radii, ring_body_name=ring_body_name ) in_fov = (u_pix >= u_min) & (u_pix <= u_max) & (v_pix >= v_min) & (v_pix <= v_max) return u_pix[in_fov], v_pix[in_fov]
# ------------------------------------------------------------------ # Reprojection # ------------------------------------------------------------------
[docs] def reproject( self, obs: Any, data: ma.MaskedArray | None = None, *, longitude_range: tuple[float, float] | None = None, radius_range: tuple[float, float] | None = None, margin: int = DEFAULT_MARGIN, zoom_amt: int | tuple[int, int] = DEFAULT_ZOOM, orbit_model: RingOrbitModel | None = None, uv_range: tuple[int, int, int, int] | None = None, omit_shadow: bool = True, image_name: str = '', ) -> RingReprojResult: """Reproject the ring in an image to a sparse radius/longitude grid. Always returns a sparse RingReprojResult: only longitude columns with valid data are stored in the result. The returned longitude_antimask indicates which of the ``n_full_lon`` bins are present. Parameters: obs: The Observation (with its FOV already configured as needed). data: Image data to reproject. If None, uses obs.data. longitude_range: (start, end) longitude limits (rad). Defaults to the full 0..2pi range. radius_range: (inner, outer) radius limits (km). Defaults to the mosaic's own radius_inner/outer. margin: Number of edge pixels to exclude. Must be >= 1. zoom_amt: Positive integer or (radial, longitude) tuple giving the zoom factor for sub-pixel interpolation. Negative values select spline interpolation order (not yet supported). orbit_model: RingOrbitModel for co-rotating frame conversion. Overrides the default set at construction. None uses inertial longitude. uv_range: (start_u, end_u, start_v, end_v) to restrict the image region reprojected. omit_shadow: If True, mask pixels inside the planet's shadow. image_name: Optional label stored on the result (e.g. source image stem). Returns: RingReprojResult with sparse image and metadata arrays. Raises: NotImplementedError: If negative zoom_amt (spline order) is requested. TypeError: If ``longitude_range`` or ``radius_range`` is not a tuple/list of numeric endpoints, or an endpoint has the wrong type. ValueError: If ``longitude_range`` or ``radius_range`` has wrong length, non-finite values, longitudes outside ``[0, _MAX_LONGITUDE]``, ``start > end``, ``radius_inner >= radius_outer``, if ``zoom_amt`` is a sequence with length other than 2, if ``n_longitude_bins_zoom`` is not a multiple of ``l_zoom_amt``, or for other argument errors. TypeError: If ``zoom_amt`` is not an int (excluding ``bool``) or a length-2 tuple/list of ints, or an element is not int-like. """ logger = logging.getLogger(_LOGGING_NAME + '.reproject') logger.debug( 'longitude_range=%s radius_range=%s zoom=%s', longitude_range, radius_range, zoom_amt, ) if margin < 1: raise ValueError(f'margin must be >= 1, got {margin!r}') if orbit_model is None: orbit_model = self._orbit_model elif orbit_model != self._orbit_model: raise ValueError( 'reproject() orbit_model must match the orbit_model passed to ' "RingMosaic's constructor (radius_inner / radius_outer semantics " 'are tied to that choice).' ) if data is None: data = obs.data data = data.view(ma.MaskedArray) if radius_range is None: radius_inner = self._radius_inner radius_outer = self._radius_outer else: # radius_range uses the same convention as the constructor's # radius_inner / radius_outer: absolute km if orbit_model is None, # signed offsets from orbit_model.a otherwise. radius_inner, radius_outer = _validate_reproject_radius_range(radius_range) if longitude_range is None: longitude_start = 0.0 longitude_end = _MAX_LONGITUDE else: longitude_start, longitude_end = _validate_reproject_longitude_range(longitude_range) zoom_radial, zoom_longitude = _validate_reproject_zoom_amt(zoom_amt) if zoom_radial > 0: r_zoom_amt: int = zoom_radial r_spline_order = 0 else: r_zoom_amt = 1 r_spline_order = -zoom_radial if zoom_longitude > 0: l_zoom_amt: int = zoom_longitude l_spline_order = 0 else: l_zoom_amt = 1 l_spline_order = -zoom_longitude with _reduced_oops_precision(): return self._reproject_inner( obs=obs, data=data, longitude_start=longitude_start, longitude_end=longitude_end, radius_inner=radius_inner, radius_outer=radius_outer, margin=margin, r_zoom_amt=r_zoom_amt, l_zoom_amt=l_zoom_amt, r_spline_order=r_spline_order, l_spline_order=l_spline_order, orbit_model=orbit_model, uv_range=uv_range, omit_shadow=omit_shadow, logger=logger, image_name=image_name, )
def _reproject_inner( self, *, obs: Any, data: ma.MaskedArray, longitude_start: float, longitude_end: float, radius_inner: float, radius_outer: float, margin: int, r_zoom_amt: int, l_zoom_amt: int, r_spline_order: int, l_spline_order: int, orbit_model: RingOrbitModel | None, uv_range: tuple[int, int, int, int] | None, omit_shadow: bool, logger: logging.Logger, image_name: str, ) -> RingReprojResult: """Inner reprojection logic, runs with reduced oops precision.""" if r_spline_order != 0 or l_spline_order != 0: raise NotImplementedError( 'Spline interpolation (negative zoom_amt) is not yet supported' ) meshgrid = None start_u = 0 end_u = obs.data_shape_uv[0] - 1 start_v = 0 end_v = obs.data_shape_uv[1] - 1 if uv_range is not None: start_u, end_u, start_v, end_v = uv_range meshgrid = oops.Meshgrid.for_fov( obs.fov, origin=(start_u + 0.5, start_v + 0.5), limit=(end_u + 0.5, end_v + 0.5), swap=True, ) bp = oops.backplane.Backplane(obs, meshgrid) logger.debug('Computing radius backplane') bp_radius = bp.ring_radius(self._ring_body_name) logger.debug('Computing longitude backplane') bp_longitude = bp.ring_longitude(self._ring_body_name) if ma.is_masked(data): bp_radius = bp_radius.remask_or(data.mask) bp_longitude = bp_longitude.remask_or(data.mask) logger.debug('Computing geometry backplanes') bp_radial_res = bp.ring_radial_resolution(self._ring_body_name) bp_angular_res = bp.ring_angular_resolution(self._ring_body_name) bp_phase = bp.phase_angle(self._ring_body_name) bp_emission = bp.emission_angle(self._ring_body_name) bp_incidence = bp.incidence_angle(self._ring_body_name) if ma.is_masked(data): bp_radial_res = bp_radial_res.remask_or(data.mask) bp_angular_res = bp_angular_res.remask_or(data.mask) bp_phase = bp_phase.remask_or(data.mask) bp_emission = bp_emission.remask_or(data.mask) bp_incidence = bp_incidence.remask_or(data.mask) if omit_shadow: logger.debug('Computing shadow mask') shadow = bp.where_inside_shadow(self._ring_body_name, self._shadow_body_name).vals data = ma.masked_where(shadow, data) # Match image masking so per-column geometry means exclude shadowed pixels. bp_radius = bp_radius.remask_or(shadow) bp_longitude = bp_longitude.remask_or(shadow) bp_radial_res = bp_radial_res.remask_or(shadow) bp_angular_res = bp_angular_res.remask_or(shadow) bp_phase = bp_phase.remask_or(shadow) bp_emission = bp_emission.remask_or(shadow) bp_incidence = bp_incidence.remask_or(shadow) if orbit_model is not None: # Per-pixel signed offset from the orbit model's radius at each # pixel's inertial longitude. The radius filter uses these offsets # because radius_inner / radius_outer are themselves offsets from # the orbital radius at each (longitude, time) in the orbit-model # semantics. ``radius_at_longitude`` calls ``np.cos`` internally, # which does not handle polymath Scalars, so we feed it the raw # numpy values; subtracting the resulting numpy array from the # ``bp_radius`` Scalar preserves the existing mask. model_radii = orbit_model.radius_at_longitude(bp_longitude.vals, obs.midtime) bp_radius_filter = bp_radius - model_radii bp_longitude = orbit_model.inertial_to_corotating(bp_longitude, obs.midtime) else: bp_radius_filter = bp_radius n_radius_bins = math.ceil( (radius_outer - radius_inner + _RADIUS_SLOP) / self._rad_resolution ) n_radius_bins_zoom = n_radius_bins * r_zoom_amt full_min_lon_bin = int(np.floor(longitude_start / self._lon_resolution)) full_max_lon_bin = int(np.floor(longitude_end / self._lon_resolution)) n_full_lon_bins = full_max_lon_bin - full_min_lon_bin + 1 restr_bp_lon = bp_longitude[ (bp_longitude >= longitude_start) & (bp_longitude <= longitude_end) & (bp_radius_filter >= radius_inner) & (bp_radius_filter <= radius_outer) ] bp_lon_binned = np.floor( (restr_bp_lon.vals - longitude_start) / self._lon_resolution ).astype('int') full_good_antimask = np.zeros(n_full_lon_bins, dtype=np.bool_) full_good_antimask[bp_lon_binned] = True filter_width = int(1.0 / np.degrees(self._lon_resolution)) if filter_width > 1: full_good_antimask = nd.maximum_filter1d(full_good_antimask, filter_width, mode='wrap') full_lon_bins: NDArrayIntType = np.arange(n_full_lon_bins, dtype=np.int32) if l_zoom_amt == 1: full_lon_bins_zoom: NDArrayFloatType = full_lon_bins.astype(np.float64) else: full_lon_bins_zoom = np.arange(n_full_lon_bins * l_zoom_amt) / float(l_zoom_amt) full_good_antimask_zoom = array_zoom(full_good_antimask, (l_zoom_amt,)) lon_bins_restr: NDArrayIntType = full_lon_bins[full_good_antimask] lon_bins_restr_zoom: NDArrayFloatType if l_zoom_amt == 1: lon_bins_restr_zoom = lon_bins_restr.astype(np.float64) else: lon_bins_restr_zoom = full_lon_bins_zoom[full_good_antimask_zoom] n_lon_bins = len(lon_bins_restr) n_lon_bins_zoom = len(lon_bins_restr_zoom) if (n_lon_bins_zoom % l_zoom_amt) != 0: raise ValueError( f'n_lon_bins_zoom ({n_lon_bins_zoom}) is not a multiple of ' f'l_zoom_amt ({l_zoom_amt})' ) long_bins = np.tile(np.arange(n_lon_bins), n_radius_bins) long_bins_act = np.tile( lon_bins_restr * self._lon_resolution + longitude_start, n_radius_bins ) if r_zoom_amt == 1 and l_zoom_amt == 1: long_bins_zoom = long_bins long_bins_act_zoom = long_bins_act else: long_bins_zoom = np.tile(np.arange(n_lon_bins_zoom), n_radius_bins_zoom) long_bins_act_zoom = np.tile( lon_bins_restr_zoom * self._lon_resolution + longitude_start, n_radius_bins_zoom, ) rad_bins = np.repeat(np.arange(n_radius_bins), n_lon_bins) if r_zoom_amt == 1 and l_zoom_amt == 1: rad_bins_zoom = rad_bins else: rad_bins_zoom = np.repeat(np.arange(n_radius_bins_zoom), n_lon_bins_zoom) rad_bins_act: NDArrayFloatType rad_bins_act_zoom: NDArrayFloatType if orbit_model is not None: # radius_inner is a signed offset from the orbital radius at each # (longitude, time); the absolute radius at column c, row r is # (offset_at_row_r) + model_r(c). This makes an eccentric ring # appear as a straight line in the reprojection. inertial_lons_act = orbit_model.corotating_to_inertial(long_bins_act, obs.midtime) rad_bins_act = ( rad_bins * self._rad_resolution + radius_inner + orbit_model.radius_at_longitude(inertial_lons_act, obs.midtime) ) if r_zoom_amt == 1 and l_zoom_amt == 1: rad_bins_act_zoom = rad_bins_act else: inertial_lons_zoom = orbit_model.corotating_to_inertial( long_bins_act_zoom, obs.midtime ) rad_offset_zoom = orbit_model.radius_at_longitude(inertial_lons_zoom, obs.midtime) rad_bins_act_zoom = ( rad_bins_zoom / float(r_zoom_amt) * self._rad_resolution + radius_inner + rad_offset_zoom ) else: rad_bins_act = rad_bins * self._rad_resolution + radius_inner if r_zoom_amt == 1 and l_zoom_amt == 1: rad_bins_act_zoom = rad_bins_act else: raise NotImplementedError( 'Zoom with non-corotating inertial radius is not yet implemented' ) u_frac_zoom, v_frac_zoom = self.longitude_radius_to_pixels( obs, long_bins_act_zoom, rad_bins_act_zoom, orbit_model=orbit_model, ring_body_name=self._ring_body_name, ) u_frac_zoom_rect = u_frac_zoom.reshape((n_radius_bins_zoom, n_lon_bins_zoom)) u_frac = u_frac_zoom_rect[::r_zoom_amt, ::l_zoom_amt].reshape(long_bins_act.shape) v_frac_zoom_rect = v_frac_zoom.reshape((n_radius_bins_zoom, n_lon_bins_zoom)) v_frac = v_frac_zoom_rect[::r_zoom_amt, ::l_zoom_amt].reshape(long_bins_act.shape) u_frac -= start_u v_frac -= start_v u_pix = np.floor(u_frac).astype('int') v_pix = np.floor(v_frac).astype('int') good = ( (u_pix >= margin) & (u_pix <= end_u - start_u - margin) & (v_pix >= margin) & (v_pix <= end_v - start_v - margin) ) u_frac = u_frac[good] v_frac = v_frac[good] u_pix = u_pix[good] v_pix = v_pix[good] if r_zoom_amt == 1 and l_zoom_amt == 1: u_pix_zoom, v_pix_zoom = u_pix, v_pix good_zoom = good else: u_frac_zoom -= start_u v_frac_zoom -= start_v u_pix_zoom = np.floor(u_frac_zoom).astype('int') v_pix_zoom = np.floor(v_frac_zoom).astype('int') good_zoom = ( (u_pix_zoom >= margin) & (u_pix_zoom <= end_u - start_u - margin) & (v_pix_zoom >= margin) & (v_pix_zoom <= end_v - start_v - margin) ) u_pix_zoom = u_pix_zoom[good_zoom] v_pix_zoom = v_pix_zoom[good_zoom] good_rad = rad_bins[good] good_lon = long_bins[good] zoomed = r_zoom_amt != 1 or l_zoom_amt != 1 good_rad_zoom = rad_bins_zoom[good_zoom] if zoomed else good_rad good_lon_zoom = long_bins_zoom[good_zoom] if zoomed else good_lon restr_data = data[v_pix_zoom, u_pix_zoom] repro_img: ma.MaskedArray = ma.zeros( (n_radius_bins_zoom, n_lon_bins_zoom), dtype=self._image_dtype ) repro_img[:, :] = ma.masked repro_img[good_rad_zoom, good_lon_zoom] = restr_data repro_img = ma.MaskedArray(array_unzoom(repro_img, (r_zoom_amt, l_zoom_amt))) good_lon_antimask = ~ma.getmaskarray(ma.sum(repro_img, axis=0)) repro_radial_res = ma.zeros((n_radius_bins, n_lon_bins), dtype=self._metadata_dtype) repro_radial_res[:] = ma.masked repro_radial_res[good_rad, good_lon] = bp_radial_res.mvals[v_pix, u_pix] radial_res_antimask = ~ma.getmaskarray(ma.mean(repro_radial_res, axis=0)) good_lon_antimask &= radial_res_antimask repro_angular_res = ma.zeros((n_radius_bins, n_lon_bins), dtype=self._metadata_dtype) repro_angular_res[:] = ma.masked repro_angular_res[good_rad, good_lon] = bp_angular_res.mvals[v_pix, u_pix] repro_phase = ma.zeros((n_radius_bins, n_lon_bins), dtype=self._metadata_dtype) repro_phase[:] = ma.masked repro_phase[good_rad, good_lon] = bp_phase.mvals[v_pix, u_pix] repro_emission = ma.zeros((n_radius_bins, n_lon_bins), dtype=self._metadata_dtype) repro_emission[:] = ma.masked repro_emission[good_rad, good_lon] = bp_emission.mvals[v_pix, u_pix] mean_incidence = ma.mean(bp_incidence.mvals[v_pix, u_pix]) repro_incidence = float(mean_incidence) if mean_incidence is not ma.masked else float('nan') # Compress to sparse representation repro_img = repro_img[:, good_lon_antimask] repro_mean_radial_res = ma.mean(repro_radial_res[:, good_lon_antimask], axis=0).filled(0.0) repro_mean_angular_res = ma.mean(repro_angular_res[:, good_lon_antimask], axis=0).filled( 0.0 ) repro_mean_phase = ma.mean(repro_phase[:, good_lon_antimask], axis=0).filled(0.0) repro_mean_emission = ma.mean(repro_emission[:, good_lon_antimask], axis=0).filled(0.0) # Full antimask for the longitude range covered new_antimask = np.zeros(self._n_full_lon, dtype=np.bool_) new_antimask[lon_bins_restr[good_lon_antimask] + full_min_lon_bin] = True logger.debug('Reprojection complete: %d valid longitudes', int(good_lon_antimask.sum())) photometric_model_name: str | None = None if self._photometric_model is not None and math.isfinite(float(repro_incidence)): photometric_model_name = self._photometric_model.name n_rad_sparse, n_lon_sparse = repro_img.shape phase_2d = np.broadcast_to( np.asarray(repro_mean_phase, dtype=np.float64)[np.newaxis, :], (n_rad_sparse, n_lon_sparse), ) emission_2d = np.broadcast_to( np.asarray(repro_mean_emission, dtype=np.float64)[np.newaxis, :], (n_rad_sparse, n_lon_sparse), ) incidence_2d = np.broadcast_to( np.asarray(repro_incidence, dtype=np.float64), (n_rad_sparse, n_lon_sparse), ) mask_img = ma.getmaskarray(repro_img) d_work = np.asarray(repro_img.filled(np.nan), dtype=np.float64) d_work = np.nan_to_num(d_work, nan=0.0) inc_w = np.nan_to_num(incidence_2d, nan=0.0) emi_w = np.nan_to_num(emission_2d, nan=0.0) pha_w = np.nan_to_num(phase_2d, nan=0.0) corr = self._photometric_model.correct( d_work, incidence=inc_w, emission=emi_w, phase=pha_w ) out_dtype = repro_img.dtype corr = np.asarray(corr, dtype=out_dtype) corr = np.where(mask_img, np.nan, corr) repro_img = ma.masked_array(corr, mask=mask_img) return RingReprojResult( body_name=self._body_name, longitude_resolution=self._lon_resolution, radius_resolution=self._rad_resolution, radius_inner=radius_inner, radius_outer=radius_outer, longitude_antimask=new_antimask, img=repro_img, mean_radial_resolution=np.asarray(repro_mean_radial_res, dtype=self._metadata_dtype), mean_angular_resolution=np.asarray(repro_mean_angular_res, dtype=self._metadata_dtype), mean_phase=np.asarray(repro_mean_phase, dtype=self._metadata_dtype), mean_emission=np.asarray(repro_mean_emission, dtype=self._metadata_dtype), incidence=repro_incidence, time=obs.midtime, orbit_model=orbit_model, image_dtype=self._image_dtype, metadata_dtype=self._metadata_dtype, photometric_model_name=photometric_model_name, image_name=image_name, ) # ------------------------------------------------------------------ # Mosaic accumulation # ------------------------------------------------------------------ def _validate_repro_compatible(self, repro: RingReprojResult) -> None: """Raise ``ValueError`` if ``repro`` does not match this mosaic's grid and dtypes.""" if repro.body_name != self._body_name: raise ValueError( 'RingMosaic.add(): body mismatch (field body): ' f'mosaic={self._body_name!r}, repro={repro.body_name!r}' ) if abs(repro.longitude_resolution - self._lon_resolution) > 1e-12: raise ValueError( 'RingMosaic.add(): grid resolution mismatch (field longitude_resolution): ' f'mosaic={self._lon_resolution!r}, repro={repro.longitude_resolution!r}' ) if abs(repro.radius_resolution - self._rad_resolution) > 1e-9: raise ValueError( 'RingMosaic.add(): grid resolution mismatch (field radius_resolution): ' f'mosaic={self._rad_resolution!r}, repro={repro.radius_resolution!r}' ) if not math.isclose(repro.radius_inner, self._radius_inner, rel_tol=0.0, abs_tol=1e-9): raise ValueError( 'RingMosaic.add(): radius bounds mismatch (field radius_inner): ' f'mosaic={self._radius_inner!r}, repro={repro.radius_inner!r}' ) if not math.isclose(repro.radius_outer, self._radius_outer, rel_tol=0.0, abs_tol=1e-9): raise ValueError( 'RingMosaic.add(): radius bounds mismatch (field radius_outer): ' f'mosaic={self._radius_outer!r}, repro={repro.radius_outer!r}' ) if np.dtype(repro.image_dtype) != np.dtype(self._image_dtype): raise ValueError( 'RingMosaic.add(): dtype mismatch (field image_dtype): ' f'mosaic={self._image_dtype!r}, repro={repro.image_dtype!r}' ) if np.dtype(repro.metadata_dtype) != np.dtype(self._metadata_dtype): raise ValueError( 'RingMosaic.add(): dtype mismatch (field metadata_dtype): ' f'mosaic={self._metadata_dtype!r}, repro={repro.metadata_dtype!r}' ) if repro.longitude_antimask.shape != (self._n_full_lon,): raise ValueError( 'RingMosaic.add(): grid mismatch (field longitude_antimask length): ' f'mosaic expects {self._n_full_lon} longitude bins, repro has ' f'{int(np.size(repro.longitude_antimask))}' ) if repro.img.ndim != 2: raise ValueError( 'RingMosaic.add(): repro.img must be 2-D ' f'(field sparse img), got shape {repro.img.shape!r}' ) if repro.img.shape[0] != self._n_radius: raise ValueError( 'RingMosaic.add(): grid mismatch (field radius bin count): ' f'mosaic has {self._n_radius} radius bins, repro.img.shape[0]={repro.img.shape[0]}' ) n_valid = int(repro.img.shape[1]) for field_name, arr in ( ('mean_radial_resolution', repro.mean_radial_resolution), ('mean_angular_resolution', repro.mean_angular_resolution), ('mean_phase', repro.mean_phase), ('mean_emission', repro.mean_emission), ): if arr.ndim != 1 or int(arr.shape[0]) != n_valid: raise ValueError( 'RingMosaic.add(): sparse column count mismatch ' f'(field {field_name} vs repro.img): ' f'img has {n_valid} columns, {field_name}.shape={arr.shape!r}' )
[docs] def add(self, repro: RingReprojResult) -> None: """Add a reprojected image to the mosaic. New longitude columns are inserted into the sparse arrays using a single batched ``np.insert`` call. Existing columns are updated according to the merge strategy. The reprojection's orbit model and photometric model must match the mosaic's. ``orbit_model`` must be the same instance (or both ``None``); ``photometric_model_name`` must equal the mosaic's photometric model name (or both ``None``). Mixing different settings would silently corrupt the mosaic because radii and longitudes have different meanings under different orbit models. Parameters: repro: Sparse reprojection result from reproject(). Raises: OverflowError: If the number of images added would exceed the uint16 maximum of 65 535. ValueError: If the reprojection's body name, grid resolution, radius bounds, dtypes, sparse shapes, ``longitude_antimask`` true count vs. sparse column count, orbit model, or photometric model does not match the mosaic's. """ self._validate_repro_compatible(repro) n_lon_true = int(np.count_nonzero(repro.longitude_antimask)) n_sparse_cols = int(repro.img.shape[1]) if n_lon_true != n_sparse_cols: raise ValueError( 'RingMosaic.add(): RingReprojResult.longitude_antimask must have one True ' 'entry per sparse column in repro.img; ' f'count_nonzero(longitude_antimask)={n_lon_true}, ' f'repro.img.shape[1]={n_sparse_cols}' ) # RingOrbitModel is a frozen dataclass with value-based equality # (auto-generated __eq__), so a model loaded from disk is equal to # but not identical to the in-memory mosaic instance. Compare by # value, not identity. if repro.orbit_model != self._orbit_model: mosaic_name = self._orbit_model.name if self._orbit_model is not None else None repro_name = repro.orbit_model.name if repro.orbit_model is not None else None raise ValueError( 'RingMosaic.add(): reprojection orbit model does not match ' f'mosaic orbit model (mosaic={mosaic_name!r}, repro={repro_name!r}). ' 'All reprojections added to a mosaic must use the same orbit ' 'model so that radius and longitude semantics agree.' ) mosaic_phot_name = ( self._photometric_model.name if self._photometric_model is not None else None ) if repro.photometric_model_name != mosaic_phot_name: raise ValueError( 'RingMosaic.add(): reprojection photometric model does not match ' f'mosaic photometric model (mosaic={mosaic_phot_name!r}, ' f'repro={repro.photometric_model_name!r}).' ) if self._image_count > np.iinfo(np.uint16).max: raise OverflowError( f'image_count {self._image_count} exceeds uint16 max ' f'{np.iinfo(np.uint16).max}; cannot add more images' ) valid_bins = np.where(repro.longitude_antimask)[0] if len(valid_bins) == 0: return existing_bins = np.where(self._antimask)[0] new_mask = ~self._antimask[valid_bins] new_bins = valid_bins[new_mask] old_bins = valid_bins[~new_mask] if len(new_bins) > 0: self._insert_new_columns(new_bins, valid_bins, repro) # Update antimask immediately so _update_existing_columns # sees the correct sparse column offsets. self._antimask[new_bins] = True if len(old_bins) > 0: self._update_existing_columns(old_bins, valid_bins, existing_bins, repro) self._antimask[valid_bins] = True self._contributing_image_names.append(repro.image_name) self._image_count += 1 if math.isfinite(repro.incidence): self._mean_incidence_sum += float(repro.incidence) self._mean_incidence_count += 1 self._mean_incidence = ( self._mean_incidence_sum / float(self._mean_incidence_count) if self._mean_incidence_count > 0 else 0.0 )
def _insert_new_columns( self, new_bins: NDArrayIntType, valid_bins: NDArrayIntType, repro: RingReprojResult, ) -> None: """Insert columns for brand-new longitude bins via batched np.insert.""" # For each new bin, find where it sits among the currently-stored # columns (which correspond to sorted existing_bins). existing_bins = np.where(self._antimask)[0] insert_positions = np.searchsorted(existing_bins, new_bins) # Indices of new_bins in the repro valid_bins array new_repro_idx = np.searchsorted(valid_bins, new_bins) # Expand all sparse arrays by inserting the new columns new_img_cols = repro.img[:, new_repro_idx] # [n_radius, n_new] new_img_data = ma.getdata(new_img_cols) new_img_mask = ma.getmaskarray(new_img_cols) img_data = np.insert(ma.getdata(self._img_sparse), insert_positions, new_img_data, axis=1) img_mask = np.insert( ma.getmaskarray(self._img_sparse), insert_positions, new_img_mask, axis=1 ) self._img_sparse = ma.MaskedArray(img_data, mask=img_mask) self._mean_radial_res = np.insert( self._mean_radial_res, insert_positions, repro.mean_radial_resolution[new_repro_idx] ) self._mean_angular_res = np.insert( self._mean_angular_res, insert_positions, repro.mean_angular_resolution[new_repro_idx] ) self._mean_phase = np.insert( self._mean_phase, insert_positions, repro.mean_phase[new_repro_idx] ) self._mean_emission = np.insert( self._mean_emission, insert_positions, repro.mean_emission[new_repro_idx] ) self._image_number = np.insert( self._image_number, insert_positions, np.full(len(new_bins), self._image_count, dtype=np.uint16), ) self._time = np.insert( self._time, insert_positions, np.full(len(new_bins), repro.time, dtype=np.float64), ) def _update_existing_columns( self, old_bins: NDArrayIntType, valid_bins: NDArrayIntType, existing_bins: NDArrayIntType, repro: RingReprojResult, ) -> None: """Update columns that already exist in the mosaic per merge strategy.""" # sparse index of each old bin in current storage existing_bins_current = np.where(self._antimask)[0] sparse_idx = np.searchsorted(existing_bins_current, old_bins) # index of each old bin in repro.valid_bins repro_idx = np.searchsorted(valid_bins, old_bins) for k in range(len(old_bins)): si = sparse_idx[k] ri = repro_idx[k] if self._should_replace(si, ri, repro): self._img_sparse[:, si] = repro.img[:, ri] self._mean_radial_res[si] = repro.mean_radial_resolution[ri] self._mean_angular_res[si] = repro.mean_angular_resolution[ri] self._mean_phase[si] = repro.mean_phase[ri] self._mean_emission[si] = repro.mean_emission[ri] self._image_number[si] = self._image_count self._time[si] = repro.time def _should_replace( self, sparse_idx: int, repro_idx: int, repro: RingReprojResult, ) -> bool: """Return True if the repro column should replace the existing sparse column.""" if self._merge_strategy == RingMosaicMergeStrategy.BEST_RESOLUTION: return bool(repro.mean_radial_resolution[repro_idx] < self._mean_radial_res[sparse_idx]) # MOST_COVERAGE_THEN_RESOLUTION existing_valid = int((~ma.getmaskarray(self._img_sparse[:, sparse_idx])).sum()) new_valid = int((~ma.getmaskarray(repro.img[:, repro_idx])).sum()) if new_valid != existing_valid: return new_valid > existing_valid return bool(repro.mean_radial_resolution[repro_idx] < self._mean_radial_res[sparse_idx]) # ------------------------------------------------------------------ # Retrieval # ------------------------------------------------------------------
[docs] def to_sparse(self) -> RingMosaicData: """Return the mosaic in its native sparse representation. The returned img has shape [n_radius, n_valid_longitudes] and the longitude_antimask has length n_full_lon with True at each stored column. Returns: RingMosaicData with the sparse internal arrays. """ return self._build_result( img=self._img_sparse, antimask=self._antimask.copy(), longitude_range=None, per_lon_1d=True, )
[docs] def to_full(self) -> RingMosaicData: """Return the mosaic as a dense full-circle array. The returned img has shape [n_radius, n_full_lon]. Longitude columns with no data are fully masked. Returns: RingMosaicData with a full-circle dense array. """ full_img_data = np.zeros((self._n_radius, self._n_full_lon), dtype=self._image_dtype) full_img_mask = np.ones((self._n_radius, self._n_full_lon), dtype=np.bool_) full_mean_rad_res = np.zeros(self._n_full_lon, dtype=self._metadata_dtype) full_mean_ang_res = np.zeros(self._n_full_lon, dtype=self._metadata_dtype) full_mean_phase = np.zeros(self._n_full_lon, dtype=self._metadata_dtype) full_mean_emission = np.zeros(self._n_full_lon, dtype=self._metadata_dtype) full_img_number = np.zeros(self._n_full_lon, dtype=np.uint16) full_time = np.zeros(self._n_full_lon, dtype=np.float64) full_mask_1d = np.ones(self._n_full_lon, dtype=np.bool_) valid_bins = np.where(self._antimask)[0] if len(valid_bins) > 0: full_img_data[:, valid_bins] = ma.getdata(self._img_sparse) full_img_mask[:, valid_bins] = ma.getmaskarray(self._img_sparse) full_mean_rad_res[valid_bins] = self._mean_radial_res full_mean_ang_res[valid_bins] = self._mean_angular_res full_mean_phase[valid_bins] = self._mean_phase full_mean_emission[valid_bins] = self._mean_emission full_img_number[valid_bins] = self._image_number full_time[valid_bins] = self._time full_mask_1d[valid_bins] = False return RingMosaicData( body_name=self._body_name, ring_body_name=self._ring_body_name, shadow_body_name=self._shadow_body_name, longitude_resolution=self._lon_resolution, radius_resolution=self._rad_resolution, radius_inner=self._radius_inner, radius_outer=self._radius_outer, longitude_antimask=self._antimask.copy(), img=ma.MaskedArray(full_img_data, mask=full_img_mask), longitude_range=None, mean_radial_resolution=ma.MaskedArray(full_mean_rad_res, mask=full_mask_1d), mean_angular_resolution=ma.MaskedArray(full_mean_ang_res, mask=full_mask_1d), mean_phase=ma.MaskedArray(full_mean_phase, mask=full_mask_1d), mean_emission=ma.MaskedArray(full_mean_emission, mask=full_mask_1d), mean_incidence=self._mean_incidence, image_number=ma.MaskedArray(full_img_number, mask=full_mask_1d), time=ma.MaskedArray(full_time, mask=full_mask_1d), image_dtype=self._image_dtype, metadata_dtype=self._metadata_dtype, contributing_image_names=tuple(self._contributing_image_names), orbit_model_name=self._orbit_model.name if self._orbit_model else None, photometric_model_name=( self._photometric_model.name if self._photometric_model is not None else None ), )
[docs] def to_bounded( self, *, longitude_range: tuple[float, float], ) -> RingMosaicData: """Return the mosaic restricted to the given longitude range. The returned img has shape [n_radius, n_bins_in_range] where every bin from the start to end of the range is included. Bins without data are fully masked. Parameters: longitude_range: (start, end) in radians. Returns: RingMosaicData covering exactly the requested longitude range. Raises: TypeError: If ``longitude_range`` is not a tuple/list of two numbers, or an endpoint has the wrong type (same rules as :meth:`reproject`). ValueError: If endpoints are non-finite, outside ``[0, _MAX_LONGITUDE]``, or ``start > end`` (same rules as :meth:`reproject`). """ lon_start_f, lon_end_f = _validate_reproject_longitude_range(longitude_range) start_bin = round(lon_start_f / self._lon_resolution) end_bin = round(lon_end_f / self._lon_resolution) n_bins = end_bin - start_bin + 1 bounded_img_data = np.zeros((self._n_radius, n_bins), dtype=self._image_dtype) bounded_img_mask = np.ones((self._n_radius, n_bins), dtype=np.bool_) bounded_mean_rad_res = np.zeros(n_bins, dtype=self._metadata_dtype) bounded_mean_ang_res = np.zeros(n_bins, dtype=self._metadata_dtype) bounded_mean_phase = np.zeros(n_bins, dtype=self._metadata_dtype) bounded_mean_emission = np.zeros(n_bins, dtype=self._metadata_dtype) bounded_img_number = np.zeros(n_bins, dtype=np.uint16) bounded_time = np.zeros(n_bins, dtype=np.float64) bounded_mask_1d = np.ones(n_bins, dtype=np.bool_) bounded_antimask = np.zeros(self._n_full_lon, dtype=np.bool_) valid_global_bins = np.where(self._antimask)[0] in_range = (valid_global_bins >= start_bin) & (valid_global_bins <= end_bin) range_global_bins = valid_global_bins[in_range] if len(range_global_bins) > 0: lbs = range_global_bins - start_bin sparse_idxs = np.searchsorted(valid_global_bins, range_global_bins) bounded_img_data[:, lbs] = ma.getdata(self._img_sparse)[:, sparse_idxs] bounded_img_mask[:, lbs] = ma.getmaskarray(self._img_sparse)[:, sparse_idxs] bounded_mean_rad_res[lbs] = self._mean_radial_res[sparse_idxs] bounded_mean_ang_res[lbs] = self._mean_angular_res[sparse_idxs] bounded_mean_phase[lbs] = self._mean_phase[sparse_idxs] bounded_mean_emission[lbs] = self._mean_emission[sparse_idxs] bounded_img_number[lbs] = self._image_number[sparse_idxs] bounded_time[lbs] = self._time[sparse_idxs] bounded_mask_1d[lbs] = False bounded_antimask[range_global_bins] = True return RingMosaicData( body_name=self._body_name, ring_body_name=self._ring_body_name, shadow_body_name=self._shadow_body_name, longitude_resolution=self._lon_resolution, radius_resolution=self._rad_resolution, radius_inner=self._radius_inner, radius_outer=self._radius_outer, longitude_antimask=bounded_antimask, img=ma.MaskedArray(bounded_img_data, mask=bounded_img_mask), longitude_range=(lon_start_f, lon_end_f), mean_radial_resolution=ma.MaskedArray(bounded_mean_rad_res, mask=bounded_mask_1d), mean_angular_resolution=ma.MaskedArray(bounded_mean_ang_res, mask=bounded_mask_1d), mean_phase=ma.MaskedArray(bounded_mean_phase, mask=bounded_mask_1d), mean_emission=ma.MaskedArray(bounded_mean_emission, mask=bounded_mask_1d), mean_incidence=self._mean_incidence, image_number=ma.MaskedArray(bounded_img_number, mask=bounded_mask_1d), time=ma.MaskedArray(bounded_time, mask=bounded_mask_1d), image_dtype=self._image_dtype, metadata_dtype=self._metadata_dtype, contributing_image_names=tuple(self._contributing_image_names), orbit_model_name=self._orbit_model.name if self._orbit_model else None, photometric_model_name=( self._photometric_model.name if self._photometric_model is not None else None ), )
def _build_result( self, *, img: ma.MaskedArray, antimask: NDArrayBoolType, longitude_range: tuple[float, float] | None, per_lon_1d: bool, ) -> RingMosaicData: """Build a RingMosaicData for to_sparse().""" if per_lon_1d: mask_1d = np.zeros(img.shape[1], dtype=np.bool_) else: mask_1d = np.ones(img.shape[1], dtype=np.bool_) return RingMosaicData( body_name=self._body_name, ring_body_name=self._ring_body_name, shadow_body_name=self._shadow_body_name, longitude_resolution=self._lon_resolution, radius_resolution=self._rad_resolution, radius_inner=self._radius_inner, radius_outer=self._radius_outer, longitude_antimask=antimask, img=img.copy(), longitude_range=longitude_range, mean_radial_resolution=ma.MaskedArray(self._mean_radial_res.copy(), mask=mask_1d), mean_angular_resolution=ma.MaskedArray(self._mean_angular_res.copy(), mask=mask_1d), mean_phase=ma.MaskedArray(self._mean_phase.copy(), mask=mask_1d), mean_emission=ma.MaskedArray(self._mean_emission.copy(), mask=mask_1d), mean_incidence=self._mean_incidence, image_number=ma.MaskedArray(self._image_number.copy(), mask=mask_1d), time=ma.MaskedArray(self._time.copy(), mask=mask_1d), image_dtype=self._image_dtype, metadata_dtype=self._metadata_dtype, contributing_image_names=tuple(self._contributing_image_names), orbit_model_name=self._orbit_model.name if self._orbit_model else None, photometric_model_name=( self._photometric_model.name if self._photometric_model is not None else None ), )