"""Per-instrument configuration consumed by the orchestrator.
Each ``ObsSnapshotInst`` carries its already-resolved per-camera config
mapping on ``obs.inst_config`` (set by ``ObsInst.from_file``). This
module reads the ``data_units``, ``noise``, and
``image_quality_thresholds`` blocks defined in
``config_4N0_inst_*.yaml`` and returns the values the orchestrator
needs.
The returned ``InstrumentSettings`` is a plain dataclass so the
orchestrator can branch on ``data_units`` without re-reading raw YAML.
"""
from __future__ import annotations
import math
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Literal, cast
from nav.nav_orchestrator.image_classifier import ImageQualityThresholds
# Public API of this module: the settings dataclass and the factory
# function that builds it from an observation's ``inst_config``.
__all__ = [
'InstrumentSettings',
'instrument_settings_from_obs',
]
# Closed set of values accepted for the ``data_units`` config key; used as
# the annotation of ``InstrumentSettings.data_units`` and as the ``cast``
# target once ``instrument_settings_from_obs`` has validated the raw string.
DataUnits = Literal['raw_dn', 'calibrated_if']
"""The two recognised per-instrument data-unit kinds.
``raw_dn`` instruments expose pixels in raw analog-to-digital counts; their
saturation, blank, and noise thresholds are quoted in DN. ``calibrated_if``
instruments expose pixels in I/F (incidence-corrected reflectance);
DN-keyed thresholds are unavailable and the I/F variants are used instead.
"""
@dataclass(frozen=True)
class InstrumentSettings:
    """Immutable bundle of per-instrument values used during navigation.

    Instances are built by ``instrument_settings_from_obs`` so the
    orchestrator can branch on ``data_units`` without touching raw YAML.

    Parameters:
        data_units: Either ``raw_dn`` or ``calibrated_if``, as declared in
            ``config_4N0_inst_*.yaml``.
        saturation_dn: Saturation level in DN for this instrument;
            ``None`` when the instrument is calibrated-IF.
        marker_value: Sentinel that marks missing data (``0`` for most
            raw instruments, ``NaN`` for calibrated-IF).
        thresholds: Image-quality thresholds handed to the classifier,
            already expressed in the matching units (DN for ``raw_dn``,
            I/F for ``calibrated_if``).
        fit_camera_rotation: Per-camera switch that enables 3-DoF
            technique fits.
        max_rotation_deg: Largest rotation magnitude permitted when
            ``fit_camera_rotation`` is True.
    """

    # Field order is part of the positional constructor interface.
    data_units: DataUnits
    saturation_dn: float | None
    marker_value: float
    thresholds: ImageQualityThresholds
    fit_camera_rotation: bool
    max_rotation_deg: float
def _coerce_marker_value(value: Any) -> float:
    """Convert a YAML ``marker_value`` entry into a float.

    ``None`` and the string ``"nan"`` (any case, surrounding whitespace
    ignored) both become ``NaN``. Any other string is rejected with
    ``ValueError``; every remaining value is passed through ``float()``.
    """
    if isinstance(value, str):
        # The only spelling accepted in string form is "nan".
        if value.strip().lower() != 'nan':
            raise ValueError(f'noise.marker_value string {value!r} must be "NaN" or numeric')
        return math.nan
    return math.nan if value is None else float(value)
def _required_float(block: Any, key: str, *, location: str) -> float:
    """Fetch a mandatory, finite number from a YAML mapping.

    Parameters:
        block: The mapping to read from; ``None`` is treated the same as
            a mapping that lacks ``key``.
        key: Name of the field to read.
        location: Dotted path of ``block`` in the config, used only to
            build error messages.

    Returns:
        The field value converted to ``float``.

    Raises:
        ValueError: If the field is absent, is a ``bool`` or any other
            non-``int``/``float`` type, or is NaN / +-Inf. Non-finite
            values are refused at load time because they would otherwise
            flow into thresholds that are compared against pixel data.
    """
    if block is None or key not in block:
        raise ValueError(f'{location} missing required field {key!r}')

    raw = block[key]
    # bool is a subclass of int, so it has to be excluded explicitly.
    is_plain_number = isinstance(raw, (int, float)) and not isinstance(raw, bool)
    if not is_plain_number:
        raise ValueError(f'{location}.{key} must be numeric; got {type(raw).__name__}={raw!r}')

    number = float(raw)
    if math.isnan(number) or math.isinf(number):
        raise ValueError(f'{location}.{key} must be a finite numeric value; got {raw!r}')
    return number
def instrument_settings_from_obs(obs: Any) -> InstrumentSettings:
"""Build the orchestrator's per-instrument settings from an obs.
Reads ``obs.inst_config`` (an ``AttrDict`` populated by
``ObsInst.from_file``) and returns a frozen ``InstrumentSettings``
instance. When ``inst_config`` is ``None`` (test fixtures, simulated
obs without per-instrument wiring), defaults appropriate for an
untyped ``raw_dn`` instrument are returned: ``saturation_dn=None``
(no saturation mask) and the standard ``ImageQualityThresholds``
defaults so the legacy code path stays unchanged.
Parameters:
obs: An ``ObsSnapshotInst`` (or a stand-in carrying the
``inst_config`` attribute).
Returns:
An ``InstrumentSettings`` instance.
Raises:
ValueError: If ``inst_config`` is supplied but missing
required fields, or carries an unrecognised ``data_units``
value.
"""
inst_config = getattr(obs, 'inst_config', None)
if inst_config is None:
return InstrumentSettings(
data_units='raw_dn',
saturation_dn=None,
marker_value=0.0,
thresholds=ImageQualityThresholds(),
fit_camera_rotation=False,
max_rotation_deg=5.0,
)
if not isinstance(inst_config, Mapping):
raise TypeError(
f'obs.inst_config must be a mapping (dict / AttrDict); got {type(inst_config).__name__}'
)
data_units_raw = inst_config.get('data_units')
if not isinstance(data_units_raw, str) or data_units_raw not in (
'raw_dn',
'calibrated_if',
):
raise ValueError(
'instrument config missing required field data_units '
f"(must be 'raw_dn' or 'calibrated_if'); got {data_units_raw!r}"
)
data_units: DataUnits = cast(DataUnits, data_units_raw)
noise = inst_config.get('noise')
iqt_block = inst_config.get('image_quality_thresholds')
if not isinstance(iqt_block, Mapping):
raise ValueError(
'instrument config missing required image_quality_thresholds block '
f'(or block is not a mapping); got {type(iqt_block).__name__}'
)
if noise is not None and not isinstance(noise, Mapping):
raise TypeError(
f'instrument config noise block must be a mapping; got {type(noise).__name__}'
)
if data_units == 'raw_dn':
if noise is None:
raise ValueError(
"instrument config with data_units='raw_dn' missing required noise block"
)
saturation_dn: float | None = _required_float(noise, 'saturation_dn', location='noise')
marker_value = _coerce_marker_value(noise.get('marker_value', 0))
thresholds = ImageQualityThresholds(
saturation_threshold_dn=_required_float(
iqt_block,
'saturation_threshold_dn',
location='image_quality_thresholds',
),
missing_data_marker_dn=marker_value,
max_saturation_frac_clean=float(iqt_block.get('max_overexposed_frac_clean', 0.80)),
max_missing_frac_clean=float(iqt_block.get('max_missing_frac_clean', 0.30)),
partial_dropout_min_frac=float(iqt_block.get('partial_dropout_min_frac', 0.05)),
blank_max_dn=_required_float(
iqt_block, 'blank_max_dn', location='image_quality_thresholds'
),
noisy_threshold=_required_float(
iqt_block, 'noisy_threshold_dn', location='image_quality_thresholds'
),
)
else:
# calibrated_if: DN-keyed fields are unavailable, and the
# saturation gate is intentionally disabled. An I/F-keyed
# saturation threshold is meaningless — the same physical
# full-well DN maps to a different I/F value for every
# combination of exposure time, filter, and gain — so we never
# compute one. The orchestrator emits an empty saturation mask
# (see ``_build_saturation_mask``) and the classifier is
# handed an ``inf`` threshold so ``saturation_frac`` is always
# 0.0 and the ``fully_overexposed`` early-out cannot fire.
# Reject any explicit ``saturation_threshold_if`` so stale
# configs surface immediately rather than silently no-op.
marker_value = _coerce_marker_value(
noise.get('marker_value', float('nan')) if noise is not None else float('nan')
)
saturation_dn = None
if 'saturation_threshold_if' in iqt_block:
raise ValueError(
'calibrated_if instrument must not declare '
'image_quality_thresholds.saturation_threshold_if; the '
'saturation gate is off for calibrated images because I/F '
'depends on exposure / filter / gain (see Phase 10 §F)'
)
thresholds = ImageQualityThresholds(
saturation_threshold_dn=math.inf,
missing_data_marker_dn=marker_value,
max_saturation_frac_clean=float(iqt_block.get('max_overexposed_frac_clean', 0.80)),
max_missing_frac_clean=float(iqt_block.get('max_missing_frac_clean', 0.30)),
partial_dropout_min_frac=float(iqt_block.get('partial_dropout_min_frac', 0.05)),
blank_max_dn=_required_float(
iqt_block, 'blank_max_if', location='image_quality_thresholds'
),
noisy_threshold=_required_float(
iqt_block, 'noisy_threshold_if', location='image_quality_thresholds'
),
)
fit_rotation = bool(inst_config.get('fit_camera_rotation', False))
max_rotation = float(inst_config.get('max_rotation_deg', 5.0))
if not math.isfinite(max_rotation) or max_rotation <= 0.0:
raise ValueError(f'max_rotation_deg must be finite and > 0; got {max_rotation!r}')
return InstrumentSettings(
data_units=data_units,
saturation_dn=saturation_dn,
marker_value=marker_value,
thresholds=thresholds,
fit_camera_rotation=fit_rotation,
max_rotation_deg=max_rotation,
)