"""Body reprojection and mosaicing utilities.
This module provides the BodyMosaic class for reprojecting planetary body
images onto latitude/longitude grids and accumulating them into mosaics.
"""
import enum
import logging
import math
from dataclasses import dataclass, field
from typing import Any, Final, Literal, cast
import numpy as np
import numpy.ma as ma
import oops
import polymath
from scipy.ndimage import maximum_filter
from nav.reproj._serialization import (
infer_format,
load_fits,
load_npz,
save_fits,
save_npz,
tuple_of_strings_field,
verify_dtype,
)
from nav.reproj.photometric_model import PhotometricModel
from nav.support.image import array_zoom
from nav.support.types import NDArrayBoolType, NDArrayFloatType, NDArrayIntType, PathLike
_LOGGING_NAME = __name__
# Slop values: must be smaller than the smallest resolution we will ever use
_LATITUDE_SLOP = 1e-6 # rad
_LONGITUDE_SLOP = 1e-6 # rad
_MIN_LATITUDE = -math.pi / 2.0
_MAX_LATITUDE = math.pi / 2.0 - _LATITUDE_SLOP * 2
_MAX_LONGITUDE = math.pi * 2.0 - _LONGITUDE_SLOP * 2
# Module-level defaults for BodyMosaic parameters
DEFAULT_LAT_RESOLUTION = 0.1 * math.pi / 180.0 # 0.1 degrees in rad
DEFAULT_LON_RESOLUTION = 0.1 * math.pi / 180.0
DEFAULT_MAX_INCIDENCE: float | None = None
DEFAULT_MAX_EMISSION: float | None = None
DEFAULT_MAX_RESOLUTION: float | None = None
DEFAULT_EDGE_MARGIN = 3
DEFAULT_ZOOM = 1
# ``BodyMosaic.__init__`` numeric validation (``bool`` subclasses ``int``).
_REAL_SCALAR_TYPES = (int, float, np.integer, np.floating)
_INTEGRAL_SCALAR_TYPES = (int, np.integer)
def _validate_navigation_uncertainty(value: object) -> float:
"""Validate ``navigation_uncertainty`` for :meth:`BodyMosaic.reproject`.
Returns:
``float(value)`` when valid.
Raises:
TypeError: If ``value`` is not a real scalar (``bool`` is rejected).
ValueError: If ``value`` is not finite or is negative.
"""
if isinstance(value, bool) or not isinstance(value, _REAL_SCALAR_TYPES):
raise TypeError(f'navigation_uncertainty must be a real number, got {type(value).__name__}')
nu = float(value)
if not math.isfinite(nu) or nu < 0.0:
raise ValueError(
'navigation_uncertainty must be finite and >= 0 (effective resolution uses '
'resolution * (1.0 + navigation_uncertainty)).'
)
return nu
# Lambert is set mainly based on the depth of craters - if the incidence angle
# is too large the crater interior is too shadowed and we don't get a good
# reprojection. If the emission angle is too large we can't see over the crater
# lip.
_LAMBERT_THRESHOLD = 0.0001
class _UseMosaicLimitsSentinel:
"""Singleton marker for :meth:`BodyMosaic.add` max-* keyword defaults."""
__slots__ = ()
def __repr__(self) -> str:
return 'USE_MOSAIC_LIMITS'
USE_MOSAIC_LIMITS: Final[_UseMosaicLimitsSentinel] = _UseMosaicLimitsSentinel()
"""Pass to :meth:`BodyMosaic.add` for ``max_incidence`` / ``max_emission`` / ``max_resolution``.
Uses the :class:`BodyMosaic` constructor limits instead of per-call overrides.
"""
def _create_repro_array(n_lat: int, n_lon: int, dtype: np.dtype) -> ma.MaskedArray:
"""Return an ``n_lat`` x ``n_lon`` zero array that is fully masked (reprojection grid slot)."""
arr = ma.zeros((n_lat, n_lon), dtype=dtype)
arr.mask = True
return arr
[docs]
class BodyMosaicMergeStrategy(enum.Enum):
"""Strategy for resolving per-pixel conflicts when adding data to a BodyMosaic.
Members:
BEST_RESOLUTION: Replace an existing pixel only when the new data has
strictly better effective resolution, or if the existing pixel is
empty (masked).
"""
BEST_RESOLUTION = 'best_resolution'
[docs]
@dataclass(frozen=True)
class BodyReprojResult:
"""Data returned by BodyMosaic.reproject().
This is a :func:`dataclasses.dataclass`; each field is documented on its own
member entry below.
"""
body_name: str
img: ma.MaskedArray
lat_resolution: float
lon_resolution: float
lat_idx_range: tuple[int, int]
lon_idx_range: tuple[int, int]
latlon_type: Literal['centric', 'graphic', 'squashed']
lon_direction: Literal['east', 'west']
resolution: ma.MaskedArray
eff_resolution: ma.MaskedArray
phase: ma.MaskedArray
emission: ma.MaskedArray
incidence: ma.MaskedArray
time: float
photometric_model_name: str | None
image_dtype: np.dtype
metadata_dtype: np.dtype
image_name: str = ''
sub_solar_lon: float = 0.0
sub_solar_lat: float = 0.0
sub_observer_lon: float = 0.0
sub_observer_lat: float = 0.0
[docs]
def save(
self,
path: PathLike,
*,
format_: str | None = None,
compress: bool = True,
) -> None:
"""Save this BodyReprojResult to a file.
Parameters:
path: Output path (``str``, ``pathlib.Path``, or ``filecache.FCPath``).
The format is inferred from the extension (.npz for NumPy archive,
.fits/.fit for FITS) unless ``format_`` is given.
format_: Explicit format: ``'npz'`` or ``'fits'``. Overrides
extension-based inference.
compress: If True (default), use compressed npz. Ignored for
FITS.
Raises:
ValueError: If the format cannot be inferred or is not
supported.
Example::
result = mosaic.reproject(obs)
result.save('reprojection.npz')
result.save('reprojection.fits', format_='fits')
reloaded = BodyReprojResult.load('reprojection.npz')
"""
fmt = infer_format(path, format_)
payload: dict[str, Any] = {
'body_name': self.body_name,
'img': self.img,
'lat_resolution': self.lat_resolution,
'lon_resolution': self.lon_resolution,
'lat_idx_range': self.lat_idx_range,
'lon_idx_range': self.lon_idx_range,
'latlon_type': self.latlon_type,
'lon_direction': self.lon_direction,
'resolution': self.resolution,
'eff_resolution': self.eff_resolution,
'phase': self.phase,
'emission': self.emission,
'incidence': self.incidence,
'time': self.time,
'photometric_model_name': self.photometric_model_name,
'image_dtype': self.image_dtype,
'metadata_dtype': self.metadata_dtype,
'image_name': self.image_name,
'sub_solar_lon': self.sub_solar_lon,
'sub_solar_lat': self.sub_solar_lat,
'sub_observer_lon': self.sub_observer_lon,
'sub_observer_lat': self.sub_observer_lat,
}
if fmt == 'npz':
save_npz(path, 'BodyReprojResult', 1, payload, compress=compress)
else:
save_fits(path, 'BodyReprojResult', 1, payload)
[docs]
@classmethod
def load(
cls,
path: PathLike,
*,
format_: str | None = None,
) -> 'BodyReprojResult':
"""Load a BodyReprojResult from a file.
Parameters:
path: Input path (``str``, ``pathlib.Path``, or ``filecache.FCPath``).
format_: Explicit format (``'npz'`` or ``'fits'``). If None,
inferred from the file extension.
Returns:
A new BodyReprojResult with the loaded data.
Raises:
ValueError: If the file's kind does not match, or if any array
dtype does not match the declared ``image_dtype`` /
``metadata_dtype``.
Example::
result = BodyReprojResult.load('reprojection.npz')
"""
fmt = infer_format(path, format_)
d = (
load_npz(path, 'BodyReprojResult')
if fmt == 'npz'
else load_fits(path, 'BodyReprojResult')
)
image_dtype = np.dtype(str(d['image_dtype']))
metadata_dtype = np.dtype(str(d['metadata_dtype']))
verify_dtype(
{
k: d[k]
for k in ('img', 'resolution', 'eff_resolution', 'phase', 'emission', 'incidence')
},
image_dtype=image_dtype,
metadata_dtype=metadata_dtype,
image_fields=['img'],
metadata_fields=['resolution', 'eff_resolution', 'phase', 'emission', 'incidence'],
)
lat_idx = d['lat_idx_range']
lat_idx_range: tuple[int, int] = (int(lat_idx[0]), int(lat_idx[1]))
lon_idx = d['lon_idx_range']
lon_idx_range: tuple[int, int] = (int(lon_idx[0]), int(lon_idx[1]))
phot = d.get('photometric_model_name')
photometric_model_name: str | None
if phot is not None and str(phot) != 'None':
photometric_model_name = str(phot)
else:
photometric_model_name = None
latlon_type_val = str(d['latlon_type'])
if latlon_type_val not in ('centric', 'graphic', 'squashed'):
raise ValueError(
f"latlon_type {latlon_type_val!r} is not one of 'centric', 'graphic', 'squashed'"
)
lon_direction_val = str(d['lon_direction'])
if lon_direction_val not in ('east', 'west'):
raise ValueError(f"lon_direction {lon_direction_val!r} is not one of 'east', 'west'")
return cls(
body_name=str(d['body_name']),
img=d['img'],
lat_resolution=float(d['lat_resolution']),
lon_resolution=float(d['lon_resolution']),
lat_idx_range=lat_idx_range,
lon_idx_range=lon_idx_range,
# latlon_type_val / lon_direction_val: validated against allowed literals above;
# mypy cannot narrow str to the Literal field types.
latlon_type=latlon_type_val, # type: ignore[arg-type]
lon_direction=lon_direction_val, # type: ignore[arg-type]
resolution=d['resolution'],
eff_resolution=d['eff_resolution'],
phase=d['phase'],
emission=d['emission'],
incidence=d['incidence'],
time=float(d['time']),
photometric_model_name=photometric_model_name,
image_dtype=image_dtype,
metadata_dtype=metadata_dtype,
image_name=str(d.get('image_name', '') or ''),
sub_solar_lon=float(d.get('sub_solar_lon', 0.0)),
sub_solar_lat=float(d.get('sub_solar_lat', 0.0)),
sub_observer_lon=float(d.get('sub_observer_lon', 0.0)),
sub_observer_lat=float(d.get('sub_observer_lat', 0.0)),
)
[docs]
@dataclass(frozen=True)
class BodyMosaicData:
"""Mosaic data returned by BodyMosaic retrieval methods.
This is a :func:`dataclasses.dataclass`; each field is documented on its own
member entry below.
"""
body_name: str
img: ma.MaskedArray
lat_resolution: float
lon_resolution: float
lat_range: tuple[float, float]
lon_range: tuple[float, float]
latlon_type: Literal['centric', 'graphic', 'squashed']
lon_direction: Literal['east', 'west']
resolution: ma.MaskedArray
eff_resolution: ma.MaskedArray
phase: ma.MaskedArray
emission: ma.MaskedArray
incidence: ma.MaskedArray
time: ma.MaskedArray
image_number: ma.MaskedArray
photometric_model_name: str | None
image_dtype: np.dtype
metadata_dtype: np.dtype
contributing_image_names: tuple[str, ...] = ()
sub_solar_lon_per_image: np.ndarray = field(
default_factory=lambda: np.empty((0,), dtype=np.float64)
)
sub_solar_lat_per_image: np.ndarray = field(
default_factory=lambda: np.empty((0,), dtype=np.float64)
)
sub_observer_lon_per_image: np.ndarray = field(
default_factory=lambda: np.empty((0,), dtype=np.float64)
)
sub_observer_lat_per_image: np.ndarray = field(
default_factory=lambda: np.empty((0,), dtype=np.float64)
)
[docs]
def save(
self,
path: PathLike,
*,
format_: str | None = None,
compress: bool = True,
) -> None:
"""Save this BodyMosaicData to a file.
Parameters:
path: Output path (``str``, ``pathlib.Path``, or ``filecache.FCPath``).
The format is inferred from the extension (.npz for NumPy archive,
.fits/.fit for FITS) unless ``format_`` is given.
format_: Explicit format: ``'npz'`` or ``'fits'``. Overrides
extension-based inference.
compress: If True (default), use compressed npz. Ignored for
FITS.
Raises:
ValueError: If the format cannot be inferred or is not
supported.
Example::
data = mosaic.to_bounded()
data.save('mimas.npz')
data.save('mimas.fits', format_='fits')
data.save('mimas.npz', compress=False)
reloaded = BodyMosaicData.load('mimas.npz')
"""
fmt = infer_format(path, format_)
payload: dict[str, Any] = {
'body_name': self.body_name,
'img': self.img,
'lat_resolution': self.lat_resolution,
'lon_resolution': self.lon_resolution,
'lat_range': self.lat_range,
'lon_range': self.lon_range,
'latlon_type': self.latlon_type,
'lon_direction': self.lon_direction,
'resolution': self.resolution,
'eff_resolution': self.eff_resolution,
'phase': self.phase,
'emission': self.emission,
'incidence': self.incidence,
'time': self.time,
'image_number': self.image_number,
'photometric_model_name': self.photometric_model_name,
'image_dtype': self.image_dtype,
'metadata_dtype': self.metadata_dtype,
'contributing_image_names': self.contributing_image_names,
'sub_solar_lon_per_image': self.sub_solar_lon_per_image,
'sub_solar_lat_per_image': self.sub_solar_lat_per_image,
'sub_observer_lon_per_image': self.sub_observer_lon_per_image,
'sub_observer_lat_per_image': self.sub_observer_lat_per_image,
}
if fmt == 'npz':
save_npz(path, 'BodyMosaicData', 1, payload, compress=compress)
else:
save_fits(path, 'BodyMosaicData', 1, payload)
[docs]
@classmethod
def load(
cls,
path: PathLike,
*,
format_: str | None = None,
) -> 'BodyMosaicData':
"""Load a BodyMosaicData from a file.
Parameters:
path: Input path (``str``, ``pathlib.Path``, or ``filecache.FCPath``).
format_: Explicit format (``'npz'`` or ``'fits'``). If None,
inferred from the file extension.
Returns:
A new BodyMosaicData with the loaded data.
Raises:
ValueError: If the file's kind does not match, or if any array
dtype does not match the declared ``image_dtype`` /
``metadata_dtype``, or if ``image_number`` is not uint16,
or if ``time`` is not float64.
Example::
data = BodyMosaicData.load('mimas.npz')
"""
fmt = infer_format(path, format_)
d = load_npz(path, 'BodyMosaicData') if fmt == 'npz' else load_fits(path, 'BodyMosaicData')
image_dtype = np.dtype(str(d['image_dtype']))
metadata_dtype = np.dtype(str(d['metadata_dtype']))
verify_dtype(
{
k: d[k]
for k in (
'img',
'resolution',
'eff_resolution',
'phase',
'emission',
'incidence',
'time',
'image_number',
)
},
image_dtype=image_dtype,
metadata_dtype=metadata_dtype,
image_fields=['img'],
metadata_fields=['resolution', 'eff_resolution', 'phase', 'emission', 'incidence'],
float64_fields=['time'],
)
lat_r = d['lat_range']
lat_range: tuple[float, float] = (float(lat_r[0]), float(lat_r[1]))
lon_r = d['lon_range']
lon_range: tuple[float, float] = (float(lon_r[0]), float(lon_r[1]))
phot = d.get('photometric_model_name')
photometric_model_name: str | None
if phot is not None and str(phot) != 'None':
photometric_model_name = str(phot)
else:
photometric_model_name = None
latlon_type_val = str(d['latlon_type'])
if latlon_type_val not in ('centric', 'graphic', 'squashed'):
raise ValueError(
f"latlon_type {latlon_type_val!r} is not one of 'centric', 'graphic', 'squashed'"
)
lon_direction_val = str(d['lon_direction'])
if lon_direction_val not in ('east', 'west'):
raise ValueError(f"lon_direction {lon_direction_val!r} is not one of 'east', 'west'")
contributing_image_names = tuple_of_strings_field(d.get('contributing_image_names'))
_raw_ssl_lon = d.get('sub_solar_lon_per_image')
_raw_ssl_lat = d.get('sub_solar_lat_per_image')
sub_solar_lon_per_image = (
np.asarray(_raw_ssl_lon, dtype=np.float64)
if _raw_ssl_lon is not None
else np.empty((0,), dtype=np.float64)
)
sub_solar_lat_per_image = (
np.asarray(_raw_ssl_lat, dtype=np.float64)
if _raw_ssl_lat is not None
else np.empty((0,), dtype=np.float64)
)
_raw_sol_lon = d.get('sub_observer_lon_per_image')
_raw_sol_lat = d.get('sub_observer_lat_per_image')
sub_observer_lon_per_image = (
np.asarray(_raw_sol_lon, dtype=np.float64)
if _raw_sol_lon is not None
else np.empty((0,), dtype=np.float64)
)
sub_observer_lat_per_image = (
np.asarray(_raw_sol_lat, dtype=np.float64)
if _raw_sol_lat is not None
else np.empty((0,), dtype=np.float64)
)
return cls(
body_name=str(d['body_name']),
img=d['img'],
lat_resolution=float(d['lat_resolution']),
lon_resolution=float(d['lon_resolution']),
lat_range=lat_range,
lon_range=lon_range,
# latlon_type_val / lon_direction_val: validated against allowed literals above;
# mypy cannot narrow str to the Literal field types.
latlon_type=latlon_type_val, # type: ignore[arg-type]
lon_direction=lon_direction_val, # type: ignore[arg-type]
resolution=d['resolution'],
eff_resolution=d['eff_resolution'],
phase=d['phase'],
emission=d['emission'],
incidence=d['incidence'],
time=d['time'],
image_number=d['image_number'],
photometric_model_name=photometric_model_name,
image_dtype=image_dtype,
metadata_dtype=metadata_dtype,
contributing_image_names=contributing_image_names,
sub_solar_lon_per_image=sub_solar_lon_per_image,
sub_solar_lat_per_image=sub_solar_lat_per_image,
sub_observer_lon_per_image=sub_observer_lon_per_image,
sub_observer_lat_per_image=sub_observer_lat_per_image,
)
[docs]
class BodyMosaic:
"""Reprojects and mosaics body images in latitude/longitude space.
Combines reprojected body images into a single mosaic, resolving conflicts
using a configurable merge strategy.
The coordinate grid uses the specified resolution in both latitude and
longitude. All angular values are in radians.
Internal storage uses a shifted circular buffer to handle longitude
wraparound (data spanning the 0/2*pi boundary) without allocating the
full 0..2*pi range.
Thread safety: reproject() uses oops backplane computation which is
not safe for concurrent use on the same observation. Do not call
reproject() from multiple threads on the same observation.
Example usage::
mosaic = BodyMosaic(body_name='MIMAS')
for obs in observations:
result = mosaic.reproject(obs, data=obs.data)
mosaic.add(result)
data = mosaic.to_bounded()
Attributes:
body_name: The name of the body.
"""
def __init__(
self,
*,
body_name: str,
lat_resolution: float = DEFAULT_LAT_RESOLUTION,
lon_resolution: float = DEFAULT_LON_RESOLUTION,
lat_range: tuple[float, float] | None = None,
lon_range: tuple[float, float] | None = None,
dynamic: bool = True,
max_incidence: float | None = DEFAULT_MAX_INCIDENCE,
max_emission: float | None = DEFAULT_MAX_EMISSION,
max_resolution: float | None = DEFAULT_MAX_RESOLUTION,
edge_margin: int = DEFAULT_EDGE_MARGIN,
zoom: int = DEFAULT_ZOOM,
latlon_type: Literal['centric', 'graphic', 'squashed'] = 'centric',
lon_direction: Literal['east', 'west'] = 'east',
photometric_model: PhotometricModel | None = None,
merge_strategy: BodyMosaicMergeStrategy = BodyMosaicMergeStrategy.BEST_RESOLUTION,
image_dtype: np.typing.DTypeLike = np.float64,
metadata_dtype: np.typing.DTypeLike = np.float32,
) -> None:
"""Initialize a BodyMosaic.
Parameters:
body_name: The name of the body (e.g. 'MIMAS').
lat_resolution: Latitude resolution (rad/pixel).
lon_resolution: Longitude resolution (rad/pixel).
lat_range: (min_lat, max_lat) extent of the mosaic in radians.
If None, use the full valid lat range.
lon_range: (min_lon, max_lon) longitude extent in radians.
If None, use the full valid lon range.
dynamic: If True, the internal arrays grow when add() receives data
outside the current bounds. If False, out-of-bounds data is
silently clipped to the configured lat_range / lon_range.
max_incidence: Maximum incidence angle (rad) for valid pixels. None
means no constraint.
max_emission: Maximum emission angle (rad) for valid pixels. None
means no constraint.
max_resolution: Maximum resolution (km/pixel) for valid pixels. None
means no constraint.
edge_margin: Number of edge pixels to discard (ISS images have
garbage near the edges).
zoom: Zoom factor for sub-pixel interpolation during reprojection.
latlon_type: Coordinate system for latitude and longitude. One of
'centric', 'graphic', or 'squashed'.
lon_direction: Longitude direction. One of 'east' or 'west'.
photometric_model: Photometric correction to apply during
reproject(). None (default) means no correction.
merge_strategy: How to resolve conflicts when the same pixel
appears in multiple reprojections. Defaults to
``BodyMosaicMergeStrategy.BEST_RESOLUTION``.
image_dtype: NumPy dtype for the reprojected brightness ``img``
array. Defaults to ``np.float64``.
metadata_dtype: NumPy dtype for geometry arrays (``resolution``,
``eff_resolution``, ``phase``, ``emission``, ``incidence``).
Defaults to ``np.float32``.
Raises:
TypeError: If ``lat_resolution``, ``lon_resolution``, ``zoom``,
``edge_margin``, or optional angle / resolution limits have an
invalid type.
ValueError: If latlon_type, lon_direction, merge_strategy, or any
numeric parameter is out of range.
Note:
``time`` is always stored as ``float64`` regardless of
``metadata_dtype``. ``image_number`` is always stored as
``uint16``, capping a single mosaic at 65 535 contributing images.
``add()`` raises ``OverflowError`` if that limit is exceeded.
"""
if latlon_type not in ('centric', 'graphic', 'squashed'):
raise ValueError(
f"latlon_type must be 'centric', 'graphic', or 'squashed', got {latlon_type!r}"
)
if lon_direction not in ('east', 'west'):
raise ValueError(f"lon_direction must be 'east' or 'west', got {lon_direction!r}")
if not isinstance(merge_strategy, BodyMosaicMergeStrategy):
raise ValueError(
f'merge_strategy must be a BodyMosaicMergeStrategy member, got {merge_strategy!r}'
)
if isinstance(lat_resolution, bool) or not isinstance(lat_resolution, _REAL_SCALAR_TYPES):
raise TypeError(
f'lat_resolution must be int or float, got {type(lat_resolution).__name__}'
)
if isinstance(lon_resolution, bool) or not isinstance(lon_resolution, _REAL_SCALAR_TYPES):
raise TypeError(
f'lon_resolution must be int or float, got {type(lon_resolution).__name__}'
)
lat_res_f = float(lat_resolution)
lon_res_f = float(lon_resolution)
if not math.isfinite(lat_res_f) or lat_res_f <= 0:
raise ValueError(f'lat_resolution must be finite and > 0, got {lat_resolution!r}')
if not math.isfinite(lon_res_f) or lon_res_f <= 0:
raise ValueError(f'lon_resolution must be finite and > 0, got {lon_resolution!r}')
if isinstance(zoom, bool) or not isinstance(zoom, _INTEGRAL_SCALAR_TYPES):
raise TypeError(f'zoom must be an int, got {type(zoom).__name__}')
zoom_i = int(zoom)
if zoom_i < 1:
raise ValueError(f'zoom must be >= 1, got {zoom!r}')
if isinstance(edge_margin, bool) or not isinstance(edge_margin, _INTEGRAL_SCALAR_TYPES):
raise TypeError(f'edge_margin must be an int, got {type(edge_margin).__name__}')
edge_margin_i = int(edge_margin)
if edge_margin_i < 0:
raise ValueError(f'edge_margin must be >= 0, got {edge_margin!r}')
if max_incidence is not None:
if isinstance(max_incidence, bool) or not isinstance(max_incidence, _REAL_SCALAR_TYPES):
raise TypeError(
'max_incidence must be int or float or None, got '
f'{type(max_incidence).__name__}'
)
mi = float(max_incidence)
if not math.isfinite(mi) or mi < 0.0 or mi > math.pi:
raise ValueError(
f'max_incidence must be in [0, pi] radians when set, got {max_incidence!r}'
)
if max_emission is not None:
if isinstance(max_emission, bool) or not isinstance(max_emission, _REAL_SCALAR_TYPES):
raise TypeError(
f'max_emission must be int or float or None, got {type(max_emission).__name__}'
)
me = float(max_emission)
if not math.isfinite(me) or me < 0.0 or me > math.pi:
raise ValueError(
f'max_emission must be in [0, pi] radians when set, got {max_emission!r}'
)
if max_resolution is not None:
if isinstance(max_resolution, bool) or not isinstance(
max_resolution, _REAL_SCALAR_TYPES
):
raise TypeError(
'max_resolution must be int or float or None, got '
f'{type(max_resolution).__name__}'
)
mr = float(max_resolution)
if not math.isfinite(mr) or mr <= 0.0:
raise ValueError(
'max_resolution (km/pixel) must be finite and > 0 when set, got '
f'{max_resolution!r}'
)
self.body_name = body_name
self._lat_resolution = lat_res_f
self._lon_resolution = lon_res_f
self._lat_range_init = lat_range
self._lon_range_init = lon_range
self._dynamic = dynamic
self._max_incidence = None if max_incidence is None else float(max_incidence)
self._max_emission = None if max_emission is None else float(max_emission)
self._max_resolution = None if max_resolution is None else float(max_resolution)
self._edge_margin = edge_margin_i
self._zoom = zoom_i
self._latlon_type = latlon_type
self._lon_direction = lon_direction
self._photometric_model = photometric_model
self._merge_strategy = merge_strategy
self._image_dtype: np.dtype = np.dtype(image_dtype)
self._metadata_dtype: np.dtype = np.dtype(metadata_dtype)
# Full-grid dimensions: floor(span/res)+1 so the last bin is not dropped
# (matches ``RingMosaic`` longitude count and avoids int() truncation).
self._n_full_lat = math.floor(math.pi / lat_res_f) + 1
self._n_full_lon = math.floor(2.0 * math.pi / lon_res_f) + 1
# Internal mosaic arrays (empty = not yet initialized)
self._img: NDArrayFloatType = np.empty((0, 0), dtype=self._image_dtype)
self._resolution: NDArrayFloatType = np.empty((0, 0), dtype=self._metadata_dtype)
self._eff_resolution: NDArrayFloatType = np.empty((0, 0), dtype=self._metadata_dtype)
self._phase: NDArrayFloatType = np.empty((0, 0), dtype=self._metadata_dtype)
self._emission: NDArrayFloatType = np.empty((0, 0), dtype=self._metadata_dtype)
self._incidence: NDArrayFloatType = np.empty((0, 0), dtype=self._metadata_dtype)
self._time: NDArrayFloatType = np.empty((0, 0), dtype=np.float64)
self._image_number: NDArrayIntType = np.empty((0, 0), dtype=np.uint16)
self._has_data: NDArrayBoolType = np.empty((0, 0), dtype=np.bool_)
# Current buffer dimensions
self._lat_min_bin: int = 0 # full-grid lat bin of internal row 0
self._lon_min_bin: int = 0 # full-grid lon bin of internal column 0
self._n_lat: int = 0
self._n_lon: int = 0
# Count of images added
self._image_count: int = 0
self._contributing_image_names: list[str] = []
self._sub_solar_lons: list[float] = []
self._sub_solar_lats: list[float] = []
self._sub_observer_lons: list[float] = []
self._sub_observer_lats: list[float] = []
# Pre-allocate if not dynamic, or if explicit ranges provided
if not dynamic or lat_range is not None or lon_range is not None:
self._preallocate(lat_range, lon_range)
# -----------------------------------------------------------------------
# Static grid utilities
# -----------------------------------------------------------------------
[docs]
@staticmethod
def generate_latitudes(
*,
latitude_start: float = _MIN_LATITUDE,
latitude_end: float = _MAX_LATITUDE,
lat_resolution: float = DEFAULT_LAT_RESOLUTION,
) -> NDArrayFloatType:
"""Generate a list of latitudes on resolution boundaries.
The returned latitudes are on lat_resolution grid boundaries and lie
within [latitude_start, latitude_end].
Parameters:
latitude_start: Minimum latitude (rad). Defaults to -pi/2.
latitude_end: Maximum latitude (rad). Defaults to pi/2 minus slop.
lat_resolution: Latitude resolution (rad/pixel).
Returns:
1-D array of latitudes (rad).
"""
start_idx = math.ceil(latitude_start / lat_resolution)
end_idx = math.floor(latitude_end / lat_resolution)
return np.arange(start_idx, end_idx + 1) * lat_resolution
[docs]
@staticmethod
def generate_longitudes(
*,
longitude_start: float = 0.0,
longitude_end: float = _MAX_LONGITUDE,
lon_resolution: float = DEFAULT_LON_RESOLUTION,
) -> NDArrayFloatType:
"""Generate a list of longitudes on resolution boundaries.
Parameters:
longitude_start: Minimum longitude (rad). Defaults to 0.
longitude_end: Maximum longitude (rad). Defaults to 2*pi minus slop.
lon_resolution: Longitude resolution (rad/pixel).
Returns:
1-D array of longitudes (rad).
"""
start_idx = math.ceil(longitude_start / lon_resolution)
end_idx = math.floor(longitude_end / lon_resolution)
return np.arange(start_idx, end_idx + 1) * lon_resolution
# -----------------------------------------------------------------------
# Coordinate conversion
# -----------------------------------------------------------------------
[docs]
def latitude_longitude_to_pixels(
self,
obs: Any,
latitude: Any,
longitude: Any,
) -> Any:
"""Convert latitude/longitude arrays to (U, V) image pixel coordinates.
Parameters:
obs: The Observation.
latitude: Latitude values (rad).
longitude: Longitude values (rad).
Returns:
UV pairs as a polymath UV object.
"""
latitude = polymath.Scalar.as_scalar(latitude)
longitude = polymath.Scalar.as_scalar(longitude)
if len(longitude) == 0:
return polymath.Pair(np.empty((0, 2)))
moon_surface = oops.Body.lookup(self.body_name).surface
if self._latlon_type == 'centric':
longitude = moon_surface.lon_from_centric(longitude)
latitude = moon_surface.lat_from_centric(latitude, longitude)
elif self._latlon_type == 'graphic':
longitude = moon_surface.lon_from_graphic(longitude)
latitude = moon_surface.lat_from_graphic(latitude, longitude)
if self._lon_direction == 'west':
longitude = (-longitude) % oops.TWOPI
return obs.uv_from_coords(moon_surface, (longitude, latitude))
def _body_reproj_result_outside_fov(
self,
obs: Any,
*,
mask_only: bool,
image_name: str,
navigation_uncertainty: float,
sub_solar_lon: float = 0.0,
sub_solar_lat: float = 0.0,
sub_observer_lon: float = 0.0,
sub_observer_lat: float = 0.0,
) -> BodyReprojResult:
"""Build a placeholder :class:`BodyReprojResult` when the body is off the detector.
Used when no valid latitude/longitude samples fall on the detector (outside
FOV or fully masked). Allocates only the minimal 2x2 placeholder window
(``lat_idx_range`` / ``lon_idx_range`` ``(0, 1)``). Geometry arrays are fully
masked; ``img`` is either a small fully-masked float grid or a boolean mask
grid when ``mask_only`` is True. Scalar metadata (``time``, ``image_name``, sub-solar /
sub-observer lon/lat, dtypes) is still populated so callers can merge counters
and serialize consistently with normal reprojections.
Parameters:
obs: Observation (midtime read from ``obs.midtime``).
mask_only: If True, ``img`` encodes a boolean valid-pixel mask; else a
masked float subgrid.
image_name: Label stored on the result.
navigation_uncertainty: Scales ``eff_resolution`` when not ``mask_only``.
sub_solar_lon, sub_solar_lat: Sub-solar coordinates (rad) for this obs.
sub_observer_lon, sub_observer_lat: Sub-observer coordinates (rad).
Returns:
A :class:`BodyReprojResult` with minimal lat/lon index ranges and masked
geometry fields suitable for ``BodyMosaic.add()`` bookkeeping.
"""
min_lat_pixel, max_lat_pixel = 0, 1
min_lon_pixel, max_lon_pixel = 0, 1
n_sub_lat = max_lat_pixel - min_lat_pixel + 1
n_sub_lon = max_lon_pixel - min_lon_pixel + 1
empty_ma = ma.zeros((n_sub_lat, n_sub_lon), dtype=self._metadata_dtype)
empty_ma.mask = True
if mask_only:
repro_img = np.zeros((n_sub_lat, n_sub_lon), dtype=np.bool_)
return BodyReprojResult(
body_name=self.body_name,
img=ma.MaskedArray(repro_img.astype(self._image_dtype)),
lat_resolution=self._lat_resolution,
lon_resolution=self._lon_resolution,
lat_idx_range=(min_lat_pixel, max_lat_pixel),
lon_idx_range=(min_lon_pixel, max_lon_pixel),
latlon_type=self._latlon_type,
lon_direction=self._lon_direction,
resolution=empty_ma.copy(),
eff_resolution=empty_ma.copy(),
phase=empty_ma.copy(),
emission=empty_ma.copy(),
incidence=empty_ma.copy(),
time=obs.midtime,
photometric_model_name=None,
image_dtype=self._image_dtype,
metadata_dtype=self._metadata_dtype,
image_name=image_name,
sub_solar_lon=sub_solar_lon,
sub_solar_lat=sub_solar_lat,
sub_observer_lon=sub_observer_lon,
sub_observer_lat=sub_observer_lat,
)
repro_img = _create_repro_array(n_sub_lat, n_sub_lon, self._image_dtype)
repro_res = _create_repro_array(n_sub_lat, n_sub_lon, self._metadata_dtype)
repro_phase = _create_repro_array(n_sub_lat, n_sub_lon, self._metadata_dtype)
repro_emission = _create_repro_array(n_sub_lat, n_sub_lon, self._metadata_dtype)
repro_incidence = _create_repro_array(n_sub_lat, n_sub_lon, self._metadata_dtype)
eff_res = repro_res.copy()
if navigation_uncertainty != 0.0:
eff_res = ma.MaskedArray(
eff_res.data * (1.0 + navigation_uncertainty), mask=eff_res.mask
)
phot_name = self._photometric_model.name if self._photometric_model is not None else None
return BodyReprojResult(
body_name=self.body_name,
img=repro_img.copy(),
lat_resolution=self._lat_resolution,
lon_resolution=self._lon_resolution,
lat_idx_range=(min_lat_pixel, max_lat_pixel),
lon_idx_range=(min_lon_pixel, max_lon_pixel),
latlon_type=self._latlon_type,
lon_direction=self._lon_direction,
resolution=repro_res.copy(),
eff_resolution=eff_res,
phase=repro_phase.copy(),
emission=repro_emission.copy(),
incidence=repro_incidence.copy(),
time=obs.midtime,
photometric_model_name=phot_name,
image_dtype=self._image_dtype,
metadata_dtype=self._metadata_dtype,
image_name=image_name,
sub_solar_lon=sub_solar_lon,
sub_solar_lat=sub_solar_lat,
sub_observer_lon=sub_observer_lon,
sub_observer_lat=sub_observer_lat,
)
# -----------------------------------------------------------------------
# Reprojection
# -----------------------------------------------------------------------
[docs]
def reproject(
self,
obs: Any,
*,
data: NDArrayFloatType | None = None,
navigation_uncertainty: float = 0.0,
mask_only: bool = False,
override_backplane: Any = None,
subimage_edges: tuple[int, int, int, int] | None = None,
mask_bad_areas: bool = False,
image_name: str = '',
) -> BodyReprojResult:
"""Reproject the body into a rectangular latitude/longitude space.
Parameters:
obs: The Observation (with its FOV already configured as needed).
data: Image data to reproject. Defaults to obs.data.
navigation_uncertainty: Maximum uncertainty in pixel pointing
(pixels), used to compute eff_resolution.
mask_only: If True, return only the valid-pixel boolean mask.
Other result fields will be empty masked arrays.
override_backplane: A Backplane to use instead of creating a new
one from obs.
subimage_edges: (u_min, u_max, v_min, v_max) describing the
subimage extent if override_backplane covers only a subimage.
mask_bad_areas: If True, expand bad-pixel regions before masking
to handle images with transmission errors.
image_name: Optional label stored on the result (e.g. source image stem).
Returns:
BodyReprojResult with the reprojected data.
When the body has no valid latitude/longitude on the detector (fully
outside the field of view or not projected onto the image), this returns
immediately with an empty result without building the lat/lon grid or
sampling image pixels.
Note:
The image data is taken from the zoomed, interpolated image, while
the incidence, emission, phase, and resolution are taken from the
original non-interpolated data and thus will be slightly more
coarse-grained. Backplane masks and limit tests are folded into
``ok_body_mask`` (see below); scatter uses that same mask so ``img``
is never valid where ``phase``, ``emission``, ``incidence``, or
``resolution`` would be masked or beyond configured limits.
Raises:
TypeError: If ``navigation_uncertainty`` is not a real number (``bool`` is not
accepted).
ValueError: If ``navigation_uncertainty`` is not finite or is ``< 0``.
"""
logger = logging.getLogger(_LOGGING_NAME + '.reproject')
navigation_uncertainty = _validate_navigation_uncertainty(navigation_uncertainty)
if data is None:
data = obs.data
bp = override_backplane if override_backplane is not None else oops.backplane.Backplane(obs)
sub_solar_lon = float(bp.sub_solar_longitude(self.body_name).vals)
sub_solar_lat = float(bp.sub_solar_latitude(self.body_name).vals)
sub_observer_lon = float(bp.sub_observer_longitude(self.body_name).vals)
sub_observer_lat = float(bp.sub_observer_latitude(self.body_name).vals)
if self._max_incidence is not None or not mask_only:
bp_incidence = bp.incidence_angle(self.body_name).mvals.astype(self._metadata_dtype)
if self._max_emission is not None or not mask_only or self._max_resolution is not None:
bp_emission = bp.emission_angle(self.body_name).mvals.astype(self._metadata_dtype)
if not mask_only or self._max_resolution is not None:
center_resolution = bp.center_resolution(self.body_name).vals
resolution = center_resolution / np.cos(bp_emission)
if not mask_only:
bp_phase = bp.phase_angle(self.body_name).mvals.astype(self._metadata_dtype)
bp_latitude = bp.latitude(self.body_name, lat_type=self._latlon_type)
body_mask_inv = ma.getmaskarray(bp_latitude.mvals).copy()
bp_latitude = bp_latitude.mvals.astype(self._metadata_dtype)
bp_lon_ma = bp.longitude(
self.body_name, direction=self._lon_direction, lon_type=self._latlon_type
)
lon_mask_inv = ma.getmaskarray(bp_lon_ma.mvals).copy()
bp_longitude = bp_lon_ma.mvals.astype(self._metadata_dtype)
# No surface point on the detector: skip grid construction and image sampling.
if np.all(np.logical_or(body_mask_inv, lon_mask_inv)):
logger.info(
'Body %s has no valid lat/lon on the detector (outside FOV); '
'skipping reprojection.',
self.body_name,
)
return self._body_reproj_result_outside_fov(
obs,
mask_only=mask_only,
image_name=image_name,
navigation_uncertainty=navigation_uncertainty,
sub_solar_lon=sub_solar_lon,
sub_solar_lat=sub_solar_lat,
sub_observer_lon=sub_observer_lon,
sub_observer_lat=sub_observer_lat,
)
if subimage_edges is not None:
u_min, u_max, v_min, v_max = subimage_edges
subimg = data[v_min : v_max + 1, u_min : u_max + 1]
else:
u_min = 0
v_min = 0
u_max = data.shape[1] - 1
v_max = data.shape[0] - 1
subimg = data
if not mask_only:
zero_mask = subimg == 0.0
if mask_bad_areas:
zero_mask = maximum_filter(zero_mask, 3)
edge_margin = self._edge_margin
if edge_margin > 0:
body_mask_inv[:edge_margin, :] = True
body_mask_inv[-edge_margin:, :] = True
body_mask_inv[:, :edge_margin] = True
body_mask_inv[:, -edge_margin:] = True
ok_body_mask_inv = body_mask_inv
# Invalid detector pixels: explicit MaskedArray mask bits, plus limit
# tests. Plain ``bp_* > limit`` drops mask on comparisons, so use
# ``ma.filled(..., True)`` so masked elements count as invalid.
if self._max_incidence is not None or not mask_only:
bad_inc = ma.getmaskarray(bp_incidence)
if self._max_incidence is not None:
bad_inc = np.logical_or(
bad_inc,
ma.filled(bp_incidence > self._max_incidence, True),
)
ok_body_mask_inv = np.logical_or(ok_body_mask_inv, bad_inc)
if self._max_emission is not None or not mask_only or self._max_resolution is not None:
bad_emi = ma.getmaskarray(bp_emission)
if self._max_emission is not None:
bad_emi = np.logical_or(
bad_emi,
ma.filled(bp_emission > self._max_emission, True),
)
ok_body_mask_inv = np.logical_or(ok_body_mask_inv, bad_emi)
if not mask_only or self._max_resolution is not None:
bad_res = ma.getmaskarray(resolution)
if self._max_resolution is not None:
bad_res = np.logical_or(
bad_res,
ma.filled(resolution > self._max_resolution, True),
)
ok_body_mask_inv = np.logical_or(ok_body_mask_inv, bad_res)
if not mask_only:
ok_body_mask_inv = np.logical_or(ok_body_mask_inv, ma.getmaskarray(bp_phase))
ok_body_mask_inv = np.logical_or(ok_body_mask_inv, zero_mask)
ok_body_mask = np.logical_not(ok_body_mask_inv)
empty_mask = not np.any(ok_body_mask)
latitude_pixels = self._n_full_lat
longitude_pixels = self._n_full_lon
if empty_mask:
min_lat_pixel, max_lat_pixel = 0, 1
min_lon_pixel, max_lon_pixel = 0, 1
else:
min_latitude = np.min(bp_latitude[ok_body_mask] + math.pi / 2.0)
max_latitude = np.max(bp_latitude[ok_body_mask] + math.pi / 2.0)
min_lat_pixel = int(np.floor(min_latitude / self._lat_resolution))
min_lat_pixel = np.clip(min_lat_pixel, 0, latitude_pixels - 1)
max_lat_pixel = int(np.ceil(max_latitude / self._lat_resolution))
max_lat_pixel = np.clip(max_lat_pixel, 0, latitude_pixels - 1)
min_longitude = np.min(bp_longitude[ok_body_mask])
max_longitude = np.max(bp_longitude[ok_body_mask])
min_lon_pixel = int(np.floor(min_longitude / self._lon_resolution))
min_lon_pixel = np.clip(min_lon_pixel, 0, longitude_pixels - 1)
max_lon_pixel = int(np.ceil(max_longitude / self._lon_resolution))
max_lon_pixel = np.clip(max_lon_pixel, 0, longitude_pixels - 1)
num_lat = max_lat_pixel - min_lat_pixel + 1
num_lon = max_lon_pixel - min_lon_pixel + 1
lat_bins = np.repeat(np.arange(min_lat_pixel, max_lat_pixel + 1, dtype=np.int32), num_lon)
lat_bins_act = lat_bins * self._lat_resolution - math.pi / 2.0
lon_bins = np.tile(np.arange(min_lon_pixel, max_lon_pixel + 1, dtype=np.int32), num_lat)
lon_bins_act = lon_bins * self._lon_resolution
logger.info(
'Lat Res %.3f deg / Lon Res %.3f deg / LatLon %s / LonDir %s',
math.degrees(self._lat_resolution),
math.degrees(self._lon_resolution),
self._latlon_type,
self._lon_direction,
)
if empty_mask:
logger.info('Empty body mask')
else:
logger.info(
'Latitude range %8.2f %8.2f',
math.degrees(np.min(bp_latitude[ok_body_mask])),
math.degrees(np.max(bp_latitude[ok_body_mask])),
)
logger.debug(
'Latitude bin range %8.2f %8.2f',
math.degrees(np.min(lat_bins_act)),
math.degrees(np.max(lat_bins_act)),
)
logger.info(
'Longitude range %8.2f %8.2f',
math.degrees(np.min(bp_longitude[ok_body_mask])),
math.degrees(np.max(bp_longitude[ok_body_mask])),
)
logger.debug(
'Longitude bin range %8.2f %8.2f',
math.degrees(np.min(lon_bins_act)),
math.degrees(np.max(lon_bins_act)),
)
if not mask_only:
logger.info(
'Resolution range %8.2f %8.2f',
np.min(resolution[ok_body_mask]),
np.max(resolution[ok_body_mask]),
)
logger.debug('Latitude pixel range %d %d', int(min_lat_pixel), int(max_lat_pixel))
logger.debug('Longitude pixel range %d %d', int(min_lon_pixel), int(max_lon_pixel))
uv = self.latitude_longitude_to_pixels(obs, lat_bins_act, lon_bins_act)
u_sv, v_sv = uv.to_scalars()
pixmask = ma.getmaskarray(u_sv.mvals)
u_pixels = u_sv.vals
v_pixels = v_sv.vals
goodumask = np.logical_and(u_pixels >= u_min, u_pixels <= u_max)
goodvmask = np.logical_and(v_pixels >= v_min, v_pixels <= v_max)
goodmask = goodumask & goodvmask & ~pixmask
good_u = (u_pixels[goodmask] - u_min).astype('int')
good_v = (v_pixels[goodmask] - v_min).astype('int')
good_lat = lat_bins[goodmask]
good_lon = lon_bins[goodmask]
shape = (latitude_pixels, longitude_pixels)
empty_ma = ma.zeros(shape, dtype=self._metadata_dtype)
empty_ma.mask = True
empty_sub = empty_ma[min_lat_pixel : max_lat_pixel + 1, min_lon_pixel : max_lon_pixel + 1]
if mask_only:
repro_img_full = np.zeros(shape, dtype=np.bool_)
repro_img_full[good_lat, good_lon] = True
repro_img_sub = repro_img_full[
min_lat_pixel : max_lat_pixel + 1, min_lon_pixel : max_lon_pixel + 1
]
# Return an abbreviated result; img is the bool mask on the same crop as metadata
return BodyReprojResult(
body_name=self.body_name,
img=ma.MaskedArray(repro_img_sub.astype(self._image_dtype)),
lat_resolution=self._lat_resolution,
lon_resolution=self._lon_resolution,
lat_idx_range=(min_lat_pixel, max_lat_pixel),
lon_idx_range=(min_lon_pixel, max_lon_pixel),
latlon_type=self._latlon_type,
lon_direction=self._lon_direction,
resolution=empty_sub.copy(),
eff_resolution=empty_sub.copy(),
phase=empty_sub.copy(),
emission=empty_sub.copy(),
incidence=empty_sub.copy(),
time=obs.midtime,
photometric_model_name=None,
image_dtype=self._image_dtype,
metadata_dtype=self._metadata_dtype,
image_name=image_name,
sub_solar_lon=sub_solar_lon,
sub_solar_lat=sub_solar_lat,
sub_observer_lon=sub_observer_lon,
sub_observer_lat=sub_observer_lat,
)
# Filter additional bad data
goodmask2 = subimg[good_v, good_u] != 0.0
good_u = good_u[goodmask2]
good_v = good_v[goodmask2]
good_lat = good_lat[goodmask2]
good_lon = good_lon[goodmask2]
# Lat/lon bin centers can map to (u, v) that are outside ``ok_body_mask``;
# keep scatter aligned with the same invalid mask as incidence/emission/etc.
# ``good_u`` / ``good_v`` are subimage-relative (``subimg`` = ``data`` when
# ``subimage_edges`` is None). ``ok_body_mask`` matches ``subimg`` when the
# backplane (e.g. ``override_backplane``) is built on the subimage; otherwise
# it is full-frame like ``data`` and needs ``u_min`` / ``v_min``.
if ok_body_mask.shape == subimg.shape:
ok_on_detector = ok_body_mask[good_v, good_u]
else:
ok_on_detector = ok_body_mask[good_v + v_min, good_u + u_min]
good_u = good_u[ok_on_detector]
good_v = good_v[ok_on_detector]
good_lat = good_lat[ok_on_detector]
good_lon = good_lon[ok_on_detector]
# ``good_u`` / ``good_v`` index the cropped ``subimg``; backplanes built on
# the full detector use full-frame shapes (same as ``ok_body_mask`` when it
# differs from ``subimg``). Align sampling indices with ``bp_*`` / ``resolution``.
if ok_body_mask.shape == subimg.shape:
bp_v, bp_u = good_v, good_u
else:
bp_v, bp_u = good_v + v_min, good_u + u_min
# Zoom for interpolation (always copy when zoom==1: ``subimg`` may alias
# ``obs.data``, which can be read-only mmap, and we must not mutate the obs.)
if self._zoom == 1:
zoom_data = np.array(subimg, copy=True)
else:
zoom_data = array_zoom(subimg, (self._zoom, self._zoom))
if not zoom_data.flags.writeable:
zoom_data = np.array(zoom_data, copy=True)
bad_data_mask = subimg == 0.0
if self._zoom == 1:
bad_data_mask_zoom = bad_data_mask
else:
bad_data_mask_zoom = array_zoom(bad_data_mask, (self._zoom, self._zoom))
zoom_data[bad_data_mask_zoom] = 0.0
interp_data = zoom_data[
(good_v * self._zoom).astype('int'), (good_u * self._zoom).astype('int')
]
# Apply photometric correction if requested
if self._photometric_model is not None:
inc = bp_incidence[bp_v, bp_u]
emi = bp_emission[bp_v, bp_u]
pha = bp_phase[bp_v, bp_u]
interp_data = self._photometric_model.correct(
interp_data, incidence=inc, emission=emi, phase=pha
)
repro_img_full = _create_repro_array(latitude_pixels, longitude_pixels, self._image_dtype)
repro_img_full[good_lat, good_lon] = interp_data
repro_res_full = _create_repro_array(
latitude_pixels, longitude_pixels, self._metadata_dtype
)
repro_res_full[good_lat, good_lon] = resolution[bp_v, bp_u]
repro_phase_full = _create_repro_array(
latitude_pixels, longitude_pixels, self._metadata_dtype
)
repro_phase_full[good_lat, good_lon] = bp_phase[bp_v, bp_u]
repro_emission_full = _create_repro_array(
latitude_pixels, longitude_pixels, self._metadata_dtype
)
repro_emission_full[good_lat, good_lon] = bp_emission[bp_v, bp_u]
repro_incidence_full = _create_repro_array(
latitude_pixels, longitude_pixels, self._metadata_dtype
)
repro_incidence_full[good_lat, good_lon] = bp_incidence[bp_v, bp_u]
sub = slice(min_lat_pixel, max_lat_pixel + 1), slice(min_lon_pixel, max_lon_pixel + 1)
eff_res = repro_res_full[sub].copy()
if navigation_uncertainty != 0.0:
eff_res = ma.MaskedArray(
eff_res.data * (1.0 + navigation_uncertainty), mask=eff_res.mask
)
phot_name = self._photometric_model.name if self._photometric_model is not None else None
return BodyReprojResult(
body_name=self.body_name,
img=repro_img_full[sub].copy(),
lat_resolution=self._lat_resolution,
lon_resolution=self._lon_resolution,
lat_idx_range=(min_lat_pixel, max_lat_pixel),
lon_idx_range=(min_lon_pixel, max_lon_pixel),
latlon_type=self._latlon_type,
lon_direction=self._lon_direction,
resolution=repro_res_full[sub].copy(),
eff_resolution=eff_res,
phase=repro_phase_full[sub].copy(),
emission=repro_emission_full[sub].copy(),
incidence=repro_incidence_full[sub].copy(),
time=obs.midtime,
photometric_model_name=phot_name,
image_dtype=self._image_dtype,
metadata_dtype=self._metadata_dtype,
image_name=image_name,
sub_solar_lon=sub_solar_lon,
sub_solar_lat=sub_solar_lat,
sub_observer_lon=sub_observer_lon,
sub_observer_lat=sub_observer_lat,
)
# -----------------------------------------------------------------------
# Mosaic accumulation
# -----------------------------------------------------------------------
[docs]
def add(
self,
repro: BodyReprojResult,
*,
resolution_threshold: float = 1.0,
copy_slop: int = 0,
max_incidence: float | None | _UseMosaicLimitsSentinel = USE_MOSAIC_LIMITS,
max_emission: float | None | _UseMosaicLimitsSentinel = USE_MOSAIC_LIMITS,
max_resolution: float | None | _UseMosaicLimitsSentinel = USE_MOSAIC_LIMITS,
) -> None:
"""Add a reprojected image to the mosaic.
For each valid pixel in repro, it is incorporated into the mosaic
according to the merge strategy.
Parameters:
repro: Result from reproject().
resolution_threshold: Factor by which repro's effective resolution
must exceed the mosaic's to trigger replacement. Should be
>= 1.0 (higher = stricter).
copy_slop: Number of additional pixels around each copied pixel to
also copy, reducing isolated-pixel artifacts.
max_incidence: Maximum incidence angle (rad) for pixels that may be
written. :data:`USE_MOSAIC_LIMITS` (default) uses the mosaic's
constructor value; an explicit ``float`` overrides for this call
only; ``None`` disables the incidence cutoff for this call.
max_emission: Same pattern for emission angle (rad).
max_resolution: Same pattern for center resolution in km/pixel
(compared to ``repro.resolution``, as in :meth:`reproject`).
Raises:
OverflowError: If the number of images added would exceed the
uint16 maximum of 65 535.
ValueError: If repro's resolution, coordinate system, or
photometric model name does not match the mosaic's configuration.
"""
if self._image_count > np.iinfo(np.uint16).max:
raise OverflowError(
f'image_count {self._image_count} exceeds uint16 max '
f'{np.iinfo(np.uint16).max}; cannot add more images'
)
self._validate_repro(repro)
lat_min = repro.lat_idx_range[0]
lat_max = repro.lat_idx_range[1]
lon_min = repro.lon_idx_range[0]
lon_max = repro.lon_idx_range[1]
# Expand internal arrays to include the reprojection range if dynamic
if self._dynamic:
self._expand_to_include(lat_min, lat_max, lon_min, lon_max)
elif self._n_lat == 0:
# dynamic=False but not yet allocated (shouldn't happen after _preallocate)
return
# Map repro lat/lon bins to internal row/col indices
repro_lat_bins = np.arange(lat_min, lat_max + 1, dtype=np.int32)
repro_lon_bins = np.arange(lon_min, lon_max + 1, dtype=np.int32)
all_lat_2d, all_lon_2d = np.meshgrid(repro_lat_bins, repro_lon_bins, indexing='ij')
all_lat: NDArrayIntType = all_lat_2d.ravel()
all_lon: NDArrayIntType = all_lon_2d.ravel()
# Valid pixels in the reprojection
repro_valid = ~ma.getmaskarray(repro.img).ravel()
valid_lat = all_lat[repro_valid]
valid_lon = all_lon[repro_valid]
if len(valid_lat) == 0:
self._contributing_image_names.append(repro.image_name)
self._sub_solar_lons.append(repro.sub_solar_lon)
self._sub_solar_lats.append(repro.sub_solar_lat)
self._sub_observer_lons.append(repro.sub_observer_lon)
self._sub_observer_lats.append(repro.sub_observer_lat)
self._image_count += 1
return
# Map to internal indices
rows = valid_lat - self._lat_min_bin
cols = (valid_lon - self._lon_min_bin) % self._n_full_lon
# Filter to in-range (for dynamic=False or after clipping)
in_range = (rows >= 0) & (rows < self._n_lat) & (cols < self._n_lon)
rows = rows[in_range]
cols = cols[in_range]
if len(rows) == 0:
self._contributing_image_names.append(repro.image_name)
self._sub_solar_lons.append(repro.sub_solar_lon)
self._sub_solar_lats.append(repro.sub_solar_lat)
self._sub_observer_lons.append(repro.sub_observer_lon)
self._sub_observer_lats.append(repro.sub_observer_lat)
self._image_count += 1
return
# Build flat index into repro arrays
repro_n_lon = lon_max - lon_min + 1
repro_flat = (valid_lat[in_range] - lat_min) * repro_n_lon + (valid_lon[in_range] - lon_min)
# Get repro values at valid positions
repro_img_flat = repro.img.data.ravel()[repro_flat]
repro_eff_res_flat = repro.eff_resolution.data.ravel()[repro_flat]
repro_res_flat = repro.resolution.data.ravel()[repro_flat]
repro_phase_flat = repro.phase.data.ravel()[repro_flat]
repro_emission_flat = repro.emission.data.ravel()[repro_flat]
repro_incidence_flat = repro.incidence.data.ravel()[repro_flat]
# Determine which pixels to replace based on the merge strategy.
existing_has_data = self._has_data[rows, cols]
existing_eff_res = self._eff_resolution[rows, cols]
if self._merge_strategy == BodyMosaicMergeStrategy.BEST_RESOLUTION:
replace_mask = np.logical_or(
~existing_has_data,
repro_eff_res_flat * resolution_threshold < existing_eff_res,
)
else:
raise NotImplementedError(f'Unsupported merge strategy: {self._merge_strategy!r}')
eff_mi = (
self._max_incidence
if max_incidence is USE_MOSAIC_LIMITS
else cast(float | None, max_incidence)
)
eff_me = (
self._max_emission
if max_emission is USE_MOSAIC_LIMITS
else cast(float | None, max_emission)
)
eff_mr = (
self._max_resolution
if max_resolution is USE_MOSAIC_LIMITS
else cast(float | None, max_resolution)
)
n_cand = len(rows)
limits_ok = np.ones(n_cand, dtype=np.bool_)
if eff_mi is not None:
inc_mask = ma.getmaskarray(repro.incidence).ravel()[repro_flat]
inc_data = repro.incidence.data.ravel()[repro_flat]
bad = np.logical_or(inc_mask, ~np.isfinite(inc_data))
bad = np.logical_or(bad, inc_data > eff_mi)
limits_ok &= ~bad
if eff_me is not None:
emi_mask = ma.getmaskarray(repro.emission).ravel()[repro_flat]
emi_data = repro.emission.data.ravel()[repro_flat]
bad = np.logical_or(emi_mask, ~np.isfinite(emi_data))
bad = np.logical_or(bad, emi_data > eff_me)
limits_ok &= ~bad
if eff_mr is not None:
res_mask = ma.getmaskarray(repro.resolution).ravel()[repro_flat]
res_data = repro.resolution.data.ravel()[repro_flat]
bad = np.logical_or(res_mask, ~np.isfinite(res_data))
bad = np.logical_or(bad, res_data > eff_mr)
limits_ok &= ~bad
replace_mask = np.logical_and(replace_mask, limits_ok)
if copy_slop > 0:
# Expand the replace mask using the slop radius
replace_full = np.zeros((self._n_lat, self._n_lon), dtype=np.bool_)
replace_full[rows, cols] = replace_mask
replace_full = maximum_filter(replace_full, copy_slop * 2 + 1)
replace_mask_2d = replace_full[rows, cols]
replace_mask = replace_mask & replace_mask_2d
r_rows = rows[replace_mask]
r_cols = cols[replace_mask]
self._img[r_rows, r_cols] = repro_img_flat[replace_mask]
self._resolution[r_rows, r_cols] = repro_res_flat[replace_mask]
self._eff_resolution[r_rows, r_cols] = repro_eff_res_flat[replace_mask]
self._phase[r_rows, r_cols] = repro_phase_flat[replace_mask]
self._emission[r_rows, r_cols] = repro_emission_flat[replace_mask]
self._incidence[r_rows, r_cols] = repro_incidence_flat[replace_mask]
self._image_number[r_rows, r_cols] = self._image_count
self._time[r_rows, r_cols] = repro.time
self._has_data[r_rows, r_cols] = True
self._contributing_image_names.append(repro.image_name)
self._sub_solar_lons.append(repro.sub_solar_lon)
self._sub_solar_lats.append(repro.sub_solar_lat)
self._sub_observer_lons.append(repro.sub_observer_lon)
self._sub_observer_lats.append(repro.sub_observer_lat)
self._image_count += 1
# -----------------------------------------------------------------------
# Mosaic retrieval
# -----------------------------------------------------------------------
@property
def bounds(self) -> tuple[tuple[float, float], tuple[float, float]] | None:
"""Current data extent as ((lat_min, lat_max), (lon_min, lon_max)).
Returns the bounding box of all valid data currently in the mosaic.
Longitude bounds are in the contiguous arc order (min may be greater
than max if the data wraps around the 0/2*pi boundary).
Returns:
((lat_min, lat_max), (lon_min, lon_max)) in radians, or None if
no data has been added yet.
"""
if self._n_lat == 0 or not np.any(self._has_data):
return None
lat_min = self._lat_min_bin * self._lat_resolution - math.pi / 2.0
lat_max = (self._lat_min_bin + self._n_lat - 1) * self._lat_resolution - math.pi / 2.0
lon_min = self._lon_min_bin * self._lon_resolution
lon_max = ((self._lon_min_bin + self._n_lon - 1) % self._n_full_lon) * self._lon_resolution
return (lat_min, lat_max), (lon_min, lon_max)
[docs]
def to_bounded(
self,
*,
lat_range: tuple[float, float] | None = None,
lon_range: tuple[float, float] | None = None,
) -> BodyMosaicData:
"""Return the mosaic clipped to the given lat/lon range.
Parameters:
lat_range: (min_lat, max_lat) in radians. Defaults to current data
bounds.
lon_range: (min_lon, max_lon) in radians. May wrap around 0/2*pi
(e.g., (5.9, 0.3) means from 5.9 rad through 2*pi to 0.3 rad).
Defaults to current data bounds.
Returns:
BodyMosaicData with the clipped mosaic.
Raises:
ValueError: If no data has been added yet and no range is specified.
"""
if self._n_lat == 0 or not np.any(self._has_data):
raise ValueError('No data in mosaic; cannot return bounded view')
current_bounds = self.bounds
assert current_bounds is not None
if lat_range is None:
lat_range = current_bounds[0]
if lon_range is None:
lon_range = current_bounds[1]
# Convert to bin ranges
lat_min_bin = round((lat_range[0] + math.pi / 2.0) / self._lat_resolution)
lat_max_bin = round((lat_range[1] + math.pi / 2.0) / self._lat_resolution)
lat_min_bin = max(0, min(lat_min_bin, self._n_full_lat - 1))
lat_max_bin = max(0, min(lat_max_bin, self._n_full_lat - 1))
lon_min_bin = round(lon_range[0] / self._lon_resolution)
lon_max_bin = round(lon_range[1] / self._lon_resolution)
# Handle wraparound
if lon_min_bin <= lon_max_bin:
lon_bins = np.arange(lon_min_bin, lon_max_bin + 1)
else:
# Wraps: from lon_min to end, then from 0 to lon_max
lon_bins = np.concatenate(
[
np.arange(lon_min_bin, self._n_full_lon),
np.arange(0, lon_max_bin + 1),
]
)
lat_bins = np.arange(lat_min_bin, lat_max_bin + 1)
n_out_lat = len(lat_bins)
n_out_lon = len(lon_bins)
return self._extract_region(lat_bins, lon_bins, n_out_lat, n_out_lon, lat_range, lon_range)
[docs]
def to_full(self) -> BodyMosaicData:
"""Return the mosaic as a full latitude x longitude grid.
Returns:
BodyMosaicData covering the full -pi/2 to pi/2 x 0 to 2*pi grid.
Pixels with no data are masked.
"""
lat_range = (
0 * self._lat_resolution - math.pi / 2.0,
(self._n_full_lat - 1) * self._lat_resolution - math.pi / 2.0,
)
lon_range = (0.0, (self._n_full_lon - 1) * self._lon_resolution)
lat_bins = np.arange(self._n_full_lat)
lon_bins = np.arange(self._n_full_lon)
return self._extract_region(
lat_bins, lon_bins, self._n_full_lat, self._n_full_lon, lat_range, lon_range
)
# -----------------------------------------------------------------------
# Private helpers
# -----------------------------------------------------------------------
def _preallocate(
self,
lat_range: tuple[float, float] | None,
lon_range: tuple[float, float] | None,
) -> None:
"""Pre-allocate internal arrays based on given ranges.
When a range is None, the full valid range is used so that
``dynamic=False`` with no explicit range produces a fully-allocated
global mosaic rather than an empty one.
Parameters:
lat_range: (min_lat, max_lat) in radians, or None for full range.
lon_range: (min_lon, max_lon) in radians, or None for full range.
"""
if lat_range is not None:
lat_min_bin = math.ceil((lat_range[0] + math.pi / 2.0) / self._lat_resolution)
lat_max_bin = math.floor((lat_range[1] + math.pi / 2.0) / self._lat_resolution)
lat_min_bin = max(0, lat_min_bin)
lat_max_bin = min(self._n_full_lat - 1, lat_max_bin)
n_lat = lat_max_bin - lat_min_bin + 1
else:
lat_min_bin = 0
lat_max_bin = self._n_full_lat - 1
n_lat = self._n_full_lat
if lon_range is not None:
lon_min_bin = math.ceil(lon_range[0] / self._lon_resolution)
lon_max_bin = math.floor(lon_range[1] / self._lon_resolution)
lon_min_bin = max(0, lon_min_bin)
lon_max_bin = min(self._n_full_lon - 1, lon_max_bin)
n_lon = lon_max_bin - lon_min_bin + 1
else:
lon_min_bin = 0
lon_max_bin = self._n_full_lon - 1
n_lon = self._n_full_lon
self._lat_min_bin = lat_min_bin
self._lon_min_bin = lon_min_bin
self._n_lat = n_lat
self._n_lon = n_lon
if n_lat > 0 and n_lon > 0:
self._allocate(n_lat, n_lon)
def _allocate(self, n_lat: int, n_lon: int) -> None:
"""Allocate fresh internal arrays of the given shape."""
self._img = np.zeros((n_lat, n_lon), dtype=self._image_dtype)
self._resolution = np.zeros((n_lat, n_lon), dtype=self._metadata_dtype)
self._eff_resolution = np.zeros((n_lat, n_lon), dtype=self._metadata_dtype)
self._phase = np.zeros((n_lat, n_lon), dtype=self._metadata_dtype)
self._emission = np.zeros((n_lat, n_lon), dtype=self._metadata_dtype)
self._incidence = np.zeros((n_lat, n_lon), dtype=self._metadata_dtype)
self._time = np.zeros((n_lat, n_lon), dtype=np.float64)
self._image_number = np.zeros((n_lat, n_lon), dtype=np.uint16)
self._has_data = np.zeros((n_lat, n_lon), dtype=np.bool_)
def _expand_to_include(self, lat_min: int, lat_max: int, lon_min: int, lon_max: int) -> None:
"""Expand internal arrays to cover the given lat/lon bin range."""
if self._n_lat == 0:
# First allocation
self._lat_min_bin = lat_min
self._lon_min_bin = lon_min
self._n_lat = lat_max - lat_min + 1
self._n_lon = lon_max - lon_min + 1
self._allocate(self._n_lat, self._n_lon)
return
self._expand_lat(lat_min, lat_max)
self._expand_lon(lon_min, lon_max)
def _expand_lat(self, new_lat_min: int, new_lat_max: int) -> None:
"""Expand latitude dimension to include [new_lat_min, new_lat_max]."""
old_lat_min = self._lat_min_bin
old_lat_max = old_lat_min + self._n_lat - 1
new_min = min(old_lat_min, new_lat_min)
new_max = max(old_lat_max, new_lat_max)
if new_min == old_lat_min and new_max == old_lat_max:
return # No expansion needed
n_new = new_max - new_min + 1
row_offset = old_lat_min - new_min
new_img = np.zeros((n_new, self._n_lon), dtype=self._image_dtype)
new_res = np.zeros((n_new, self._n_lon), dtype=self._metadata_dtype)
new_eff = np.zeros((n_new, self._n_lon), dtype=self._metadata_dtype)
new_phase = np.zeros((n_new, self._n_lon), dtype=self._metadata_dtype)
new_emi = np.zeros((n_new, self._n_lon), dtype=self._metadata_dtype)
new_inc = np.zeros((n_new, self._n_lon), dtype=self._metadata_dtype)
new_time = np.zeros((n_new, self._n_lon), dtype=np.float64)
new_imgnum = np.zeros((n_new, self._n_lon), dtype=np.uint16)
new_has = np.zeros((n_new, self._n_lon), dtype=np.bool_)
sl = slice(row_offset, row_offset + self._n_lat)
new_img[sl] = self._img
new_res[sl] = self._resolution
new_eff[sl] = self._eff_resolution
new_phase[sl] = self._phase
new_emi[sl] = self._emission
new_inc[sl] = self._incidence
new_time[sl] = self._time
new_imgnum[sl] = self._image_number
new_has[sl] = self._has_data
self._img = new_img
self._resolution = new_res
self._eff_resolution = new_eff
self._phase = new_phase
self._emission = new_emi
self._incidence = new_inc
self._time = new_time
self._image_number = new_imgnum
self._has_data = new_has
self._lat_min_bin = new_min
self._n_lat = n_new
def _expand_lon(self, new_lon_min: int, new_lon_max: int) -> None:
"""Expand longitude circular buffer to include [new_lon_min, new_lon_max]."""
# Check if already fully covered
new_min_col = (new_lon_min - self._lon_min_bin) % self._n_full_lon
new_max_col = (new_lon_max - self._lon_min_bin) % self._n_full_lon
if new_min_col < self._n_lon and new_max_col < self._n_lon:
return # Both already in range
# Determine cost of extending right vs left
# Right: keep _lon_min_bin, extend to cover max_col_needed
max_col_needed = max(self._n_lon - 1, new_min_col, new_max_col)
right_cost = max_col_needed + 1 - self._n_lon
# Left: slide start to new_lon_min
shift = (self._lon_min_bin - new_lon_min) % self._n_full_lon
old_max_col_from_new_start = shift + self._n_lon - 1
new_max_col_from_new_start = (new_lon_max - new_lon_min) % self._n_full_lon
max_col_from_new_start = max(old_max_col_from_new_start, new_max_col_from_new_start)
left_cost = shift
if right_cost <= left_cost:
# Extend right
n_new = max_col_needed + 1
self._expand_lon_impl(shift_left=0, n_new=n_new)
else:
# Extend left
n_new = max_col_from_new_start + 1
self._expand_lon_impl(shift_left=shift, n_new=n_new, new_lon_min=new_lon_min)
def _expand_lon_impl(
self,
*,
shift_left: int,
n_new: int,
new_lon_min: int | None = None,
) -> None:
"""Perform the actual longitude array reallocation.
Parameters:
shift_left: Number of new columns to insert at the left (beginning).
n_new: Total new number of columns.
new_lon_min: New longitude min bin (only set when shifting left).
"""
new_img = np.zeros((self._n_lat, n_new), dtype=self._image_dtype)
new_res = np.zeros((self._n_lat, n_new), dtype=self._metadata_dtype)
new_eff = np.zeros((self._n_lat, n_new), dtype=self._metadata_dtype)
new_phase = np.zeros((self._n_lat, n_new), dtype=self._metadata_dtype)
new_emi = np.zeros((self._n_lat, n_new), dtype=self._metadata_dtype)
new_inc = np.zeros((self._n_lat, n_new), dtype=self._metadata_dtype)
new_time = np.zeros((self._n_lat, n_new), dtype=np.float64)
new_imgnum = np.zeros((self._n_lat, n_new), dtype=np.uint16)
new_has = np.zeros((self._n_lat, n_new), dtype=np.bool_)
sl = slice(shift_left, shift_left + self._n_lon)
new_img[:, sl] = self._img
new_res[:, sl] = self._resolution
new_eff[:, sl] = self._eff_resolution
new_phase[:, sl] = self._phase
new_emi[:, sl] = self._emission
new_inc[:, sl] = self._incidence
new_time[:, sl] = self._time
new_imgnum[:, sl] = self._image_number
new_has[:, sl] = self._has_data
self._img = new_img
self._resolution = new_res
self._eff_resolution = new_eff
self._phase = new_phase
self._emission = new_emi
self._incidence = new_inc
self._time = new_time
self._image_number = new_imgnum
self._has_data = new_has
self._n_lon = n_new
if new_lon_min is not None:
self._lon_min_bin = new_lon_min
def _extract_region(
self,
lat_bins: NDArrayIntType,
lon_bins: NDArrayIntType,
n_out_lat: int,
n_out_lon: int,
lat_range: tuple[float, float],
lon_range: tuple[float, float],
) -> BodyMosaicData:
"""Extract a rectangular region from the internal arrays.
Parameters:
lat_bins: Full-grid latitude bin indices to extract.
lon_bins: Full-grid longitude bin indices to extract.
n_out_lat: Number of latitude rows in output.
n_out_lon: Number of longitude columns in output.
lat_range: (min, max) latitude extent in radians.
lon_range: (min, max) longitude extent in radians.
Returns:
BodyMosaicData for the specified region.
"""
out_img = np.zeros((n_out_lat, n_out_lon), dtype=self._image_dtype)
out_res = np.zeros((n_out_lat, n_out_lon), dtype=self._metadata_dtype)
out_eff = np.zeros((n_out_lat, n_out_lon), dtype=self._metadata_dtype)
out_phase = np.zeros((n_out_lat, n_out_lon), dtype=self._metadata_dtype)
out_emi = np.zeros((n_out_lat, n_out_lon), dtype=self._metadata_dtype)
out_inc = np.zeros((n_out_lat, n_out_lon), dtype=self._metadata_dtype)
out_time = np.zeros((n_out_lat, n_out_lon), dtype=np.float64)
out_imgnum = np.zeros((n_out_lat, n_out_lon), dtype=np.uint16)
out_has = np.zeros((n_out_lat, n_out_lon), dtype=np.bool_)
if self._has_data.any():
cols = (lon_bins - self._lon_min_bin) % self._n_full_lon
rows = lat_bins - self._lat_min_bin
col_valid = cols < self._n_lon
row_valid = (rows >= 0) & (rows < self._n_lat)
cols_c = np.clip(cols, 0, self._n_lon - 1)
rows_c = np.clip(rows, 0, self._n_lat - 1)
valid = (
row_valid[:, np.newaxis]
& col_valid[np.newaxis, :]
& self._has_data[rows_c[:, np.newaxis], cols_c[np.newaxis, :]]
)
out_r, out_c = np.nonzero(valid)
src_r = rows_c[out_r]
src_c = cols_c[out_c]
out_img[out_r, out_c] = self._img[src_r, src_c]
out_res[out_r, out_c] = self._resolution[src_r, src_c]
out_eff[out_r, out_c] = self._eff_resolution[src_r, src_c]
out_phase[out_r, out_c] = self._phase[src_r, src_c]
out_emi[out_r, out_c] = self._emission[src_r, src_c]
out_inc[out_r, out_c] = self._incidence[src_r, src_c]
out_time[out_r, out_c] = self._time[src_r, src_c]
out_imgnum[out_r, out_c] = self._image_number[src_r, src_c]
out_has[out_r, out_c] = True
mask = ~out_has
phot_name = self._photometric_model.name if self._photometric_model is not None else None
return BodyMosaicData(
body_name=self.body_name,
img=ma.MaskedArray(out_img, mask=mask),
lat_resolution=self._lat_resolution,
lon_resolution=self._lon_resolution,
lat_range=lat_range,
lon_range=lon_range,
latlon_type=self._latlon_type,
lon_direction=self._lon_direction,
resolution=ma.MaskedArray(out_res, mask=mask),
eff_resolution=ma.MaskedArray(out_eff, mask=mask),
phase=ma.MaskedArray(out_phase, mask=mask),
emission=ma.MaskedArray(out_emi, mask=mask),
incidence=ma.MaskedArray(out_inc, mask=mask),
time=ma.MaskedArray(out_time, mask=mask),
image_number=ma.MaskedArray(out_imgnum, mask=mask),
photometric_model_name=phot_name,
image_dtype=self._image_dtype,
metadata_dtype=self._metadata_dtype,
contributing_image_names=tuple(self._contributing_image_names),
sub_solar_lon_per_image=np.array(self._sub_solar_lons, dtype=np.float64),
sub_solar_lat_per_image=np.array(self._sub_solar_lats, dtype=np.float64),
sub_observer_lon_per_image=np.array(self._sub_observer_lons, dtype=np.float64),
sub_observer_lat_per_image=np.array(self._sub_observer_lats, dtype=np.float64),
)
def _validate_repro(self, repro: BodyReprojResult) -> None:
"""Validate that repro is compatible with this mosaic's configuration."""
if repro.body_name != self.body_name:
raise ValueError(
f'body_name mismatch: mosaic={self.body_name!r}, repro={repro.body_name!r}'
)
if abs(repro.lat_resolution - self._lat_resolution) > 1e-12:
raise ValueError(
f'lat_resolution mismatch: mosaic={self._lat_resolution}, '
f'repro={repro.lat_resolution}'
)
if abs(repro.lon_resolution - self._lon_resolution) > 1e-12:
raise ValueError(
f'lon_resolution mismatch: mosaic={self._lon_resolution}, '
f'repro={repro.lon_resolution}'
)
if repro.latlon_type != self._latlon_type:
raise ValueError(
f'latlon_type mismatch: mosaic={self._latlon_type!r}, repro={repro.latlon_type!r}'
)
if repro.lon_direction != self._lon_direction:
raise ValueError(
f'lon_direction mismatch: mosaic={self._lon_direction!r}, '
f'repro={repro.lon_direction!r}'
)
mosaic_phot_name = (
self._photometric_model.name if self._photometric_model is not None else None
)
if repro.photometric_model_name != mosaic_phot_name:
raise ValueError(
f'photometric_model_name mismatch: mosaic={mosaic_phot_name!r}, '
f'repro={repro.photometric_model_name!r}'
)