# Source code for nav.nav_orchestrator.orchestrator

"""NavOrchestrator — top-level driver for autonomous navigation.

The orchestrator turns one observation into one ``NavResult``:

1. Build a ``NavContext`` (image, masks, classifier verdict, provenance).
2. Iterate registered ``NavModel`` instances and gather features and
   annotations from each.
3. Apply the ``FeatureReliabilityGate`` to drop bad-data features.
4. Run every feasible prior-free technique on the surviving features.
5. Combine pass-1 results via the ``ensemble`` function to derive a prior.
6. Run prior-required techniques against the derived prior.
7. Combine the union of pass-1 and pass-2 results via ``ensemble``.

Glob-pattern filters at construction time let an operator restrict which
models or techniques run for debugging (``only_models='body:MIMAS'``,
``only_techniques='!StarFieldFromCatalogNav'``).
"""

from __future__ import annotations

import dataclasses
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, Literal

import numpy as np

from nav.annotation import Annotations
from nav.config import Config
from nav.feature.feature import NavFeature
from nav.feature.feature_type import NavFeatureType
from nav.feature.reliability import FeatureReliabilityGate, GatedFeatureRecord
from nav.nav_model import NavModel
from nav.nav_orchestrator.ensemble import EnsembleConfig, ensemble
from nav.nav_orchestrator.feature_summary import NavFeatureSummary
from nav.nav_orchestrator.image_classifier import (
    ImageQualityThresholds,
    NavImageClassifier,
)
from nav.nav_orchestrator.image_classifier_result import (
    ImageClass,
    NavImageClassifierResult,
)
from nav.nav_orchestrator.image_derivatives import (
    ImageDerivativesConfig,
    compute_all_image_derivatives,
)
from nav.nav_orchestrator.instrument_config import (
    InstrumentSettings,
    instrument_settings_from_obs,
)
from nav.nav_orchestrator.nav_context import NavContext
from nav.nav_orchestrator.nav_result import NavResult
from nav.nav_orchestrator.provenance import (
    Provenance,
    collect_provenance_metadata,
)
from nav.nav_orchestrator.status_reason_info import STATUS_REASON_INFO_TEMPLATE
from nav.nav_technique.nav_technique import (
    NavTechnique,
    filter_technique_names,
    technique_tier,
)
from nav.nav_technique.technique_result import NavTechniqueResult
from nav.support.filters import NavFilterKind, NavFilterSpec, apply_filter
from nav.support.image_quality import cosmic_ray_mask, saturation_mask
from nav.support.nav_base import NavBase
from nav.support.noise_estimate import estimate_image_noise_sigma
from nav.support.status_reason import NavStatusReason

if TYPE_CHECKING:  # pragma: no cover - typing-only import
    from nav.obs import ObsSnapshotInst

__all__ = [
    'NavOrchestrator',
    'OrchestratorPrep',
]


@dataclass(frozen=True)
class OrchestratorPrep:
    """Per-image artifacts produced by :meth:`NavOrchestrator.prepare`.

    Bundles every prep-phase output a downstream caller might want. The
    autonomous :meth:`NavOrchestrator.navigate` does not use this struct (it
    inlines the prep so it can short-circuit on hard-failure images), but the
    manual-navigation driver does: it wraps the operator's chosen offset in a
    full :class:`NavResult` populated from ``provenance`` /
    ``image_classifier`` / ``feature_inventory`` / ``model_metadata`` /
    ``annotations`` so the manual pipeline writes the same ``_metadata.json``
    and ``_summary.png`` files the autonomous pipeline does.

    The dataclass is frozen: fields cannot be rebound after construction
    (the containers they reference are not copied or frozen).

    Parameters:
        context: NavContext built from the observation.
        provenance: Reproducibility envelope shared with the autonomous
            pipeline.
        image_classifier: Image-quality classifier verdict.
        features: Either the gated-kept feature list (when
            ``prepare(..., apply_gate=True)``) or every emitted feature (when
            ``apply_gate=False``). The manual driver always requests the
            un-gated list because the operator visually overrides gate
            decisions.
        feature_inventory: Per-feature summary entries. When the gate ran,
            both kept and gated features are included with their ``gated``
            flag set accordingly; when the gate was skipped, every entry has
            ``gated=False``.
        model_metadata: Per-NavModel diagnostic dicts keyed by model name.
        annotations: Merged annotation collection assembled from every built
            NavModel's ``to_annotations``.
    """

    context: NavContext
    provenance: Provenance
    image_classifier: NavImageClassifierResult
    features: list[NavFeature]
    feature_inventory: list[NavFeatureSummary]
    model_metadata: dict[str, dict[str, Any]]
    annotations: Annotations


_HARD_FAILURE_TO_REASON: dict[ImageClass, NavStatusReason] = {
    'blank': NavStatusReason.NO_SIGNAL_IN_IMAGE,
    'fully_overexposed': NavStatusReason.IMAGE_OVEREXPOSED,
    'mostly_missing_data': NavStatusReason.MISSING_DATA_DOMINANT,
    'corrupt': NavStatusReason.IMAGE_CORRUPT,
}
"""Image-classifier classes that short-circuit before any technique runs.

Maps each hard-failure ``ImageClass`` to the matching ``NavStatusReason``
returned by the orchestrator's preflight. Reading from this dict is the sole
admission test for the hard-failure short-circuit.
"""


@dataclass
class _ModelRegistry:
    """Registry of NavModel instances built per-image by the orchestrator.

    Concrete NavModel subclasses do not auto-register at import time because
    they require an observation; the orchestrator instantiates them per image
    and registers them here.

    Parameters:
        models: List of constructed NavModel instances for the current image.
    """

    models: list[NavModel] = field(default_factory=list)

    def filter_by_glob(self, patterns: str | list[str]) -> list[NavModel]:
        """Return models whose ``name`` matches the glob pattern list.

        Model names follow the ``prefix:VALUE`` convention (``rings:SATURN``,
        ``body:DIONE``, plain ``stars`` for the catalog-driven star model).
        This helper normalizes user patterns to that convention so the common
        shorthand forms match without requiring explicit globs:

        - ``"rings"`` is expanded to ``"rings:*"`` so a pattern missing a
          colon matches every namespaced model under that prefix.
          ``"stars"`` (which has no colon in its model name) continues to
          match itself directly.
        - The value part of ``"prefix:VALUE"`` is normalized to uppercase so
          ``"body:saturn"`` matches ``"body:SATURN"``.

        The leading-``!`` exclusion convention is preserved.

        Parameters:
            patterns: Single pattern or list of patterns from the user.

        Returns:
            The registered models whose names survive the filter, in
            registration order.
        """
        names = [m.name for m in self.models]
        normalized = _normalize_model_patterns(patterns, names)
        # A set makes the per-model membership test O(1); iterating
        # ``self.models`` (not the filter output) keeps registration order.
        kept = set(filter_technique_names(names, normalized))
        return [m for m in self.models if m.name in kept]


def _normalize_model_patterns(patterns: str | list[str], names: list[str]) -> list[str]:
    """Normalize model glob patterns to the ``prefix:VALUE`` convention.

    See :meth:`_ModelRegistry.filter_by_glob` for the rules.

    Parameters:
        patterns: Single pattern or list of patterns from the user.
        names: Available model names; used to detect whether the
            ``prefix:*`` expansion makes sense (any name contains ``:``).

    Returns:
        A list of normalized patterns suitable for
        :func:`filter_technique_names`. A plain prefix-only token contributes
        two entries (the literal and the ``prefix:*`` expansion); every other
        token contributes exactly one.
    """
    if isinstance(patterns, str):
        patterns = [patterns]
    has_namespaced_models = any(':' in n for n in names)
    out: list[str] = []
    for raw in patterns:
        # Split off the exclusion marker so it can be re-attached to every
        # pattern derived from this token.
        if raw.startswith('!'):
            prefix, body = '!', raw[1:]
        else:
            prefix, body = '', raw
        if ':' in body:
            # Only the part after the first colon is upper-cased.
            head, _, tail = body.partition(':')
            normalized = f'{head}:{tail.upper()}'
        elif has_namespaced_models and body and not any(c in body for c in '*?['):
            # Plain prefix-only token (e.g. "rings"): expand to "rings:*"
            # so it matches every namespaced model under that prefix.
            # Names that have no namespace (like "stars") still match
            # via the unmodified pattern below.
            normalized = f'{body}:*'
            out.append(prefix + body)  # also keep the literal for "stars"
        else:
            # Already a glob (or empty, or no namespaced models exist):
            # pass through untouched.
            normalized = body
        out.append(prefix + normalized)
    return out


def _feature_source_bodies(feature: NavFeature) -> frozenset[str]:
    """Return the set of body names a feature was emitted for.

    Reads the structured :attr:`NavFeature.body_name`; ring and star features
    have no body and yield an empty set.

    Parameters:
        feature: The feature to inspect.

    Returns:
        A one-element frozenset holding ``feature.body_name`` when it is
        truthy, otherwise an empty frozenset.
    """
    return frozenset({feature.body_name}) if feature.body_name else frozenset()


def _bodies_with_non_spurious_primary(
    results: list[NavTechniqueResult],
) -> frozenset[str]:
    """Return the set of body names that have a non-spurious primary result.

    Reads each result's structured ``source_bodies`` (populated by the body
    techniques from the consumed features) rather than parsing
    ``feature_ids``.

    Parameters:
        results: Technique results to scan.

    Returns:
        The union of ``source_bodies`` over every result that is not flagged
        ``spurious`` and whose technique tier is ``'primary'``.
    """
    covered: set[str] = set()
    for r in results:
        if r.spurious:
            continue
        if technique_tier(r.technique_name) != 'primary':
            continue
        covered.update(r.source_bodies)
    return frozenset(covered)


def _summary_from_feature(
    feature: NavFeature, *, gated: bool, gate_reason: str | None
) -> NavFeatureSummary:
    """Project a NavFeature down to a NavFeatureSummary.

    Parameters:
        feature: The feature to summarize.
        gated: Value stored in the summary's ``gated`` field.
        gate_reason: Value stored in the summary's ``gate_reason`` field.

    Returns:
        A ``NavFeatureSummary`` copying the feature's id, type, source model
        and reliability, plus its integer bounding box.
    """
    bbox = _bbox_from_geometry(feature)
    return NavFeatureSummary(
        feature_id=feature.feature_id,
        feature_type=feature.feature_type,
        source_model=feature.source_model,
        reliability=feature.reliability,
        gated=gated,
        gate_reason=gate_reason,
        bbox_extfov_vu=bbox,
    )


def _bbox_from_geometry(feature: NavFeature) -> tuple[int, int, int, int]:
    """Return the ``bbox_extfov_vu`` tuple for a feature's geometry payload.

    Every ``NavFeatureGeometry`` variant carries ``bbox_extfov_vu``; direct
    attribute access is safe because the union excludes any payload that
    lacks it.

    Parameters:
        feature: The feature whose ``geometry.bbox_extfov_vu`` is read.

    Returns:
        The first four bbox elements, each coerced with ``int()``.
    """
    bbox = feature.geometry.bbox_extfov_vu
    return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))


def _format_reliability_breakdown(breakdown: Any) -> str:
    """Format a ``NavReliabilityBreakdown`` as ``key=value`` for the per-feature DEBUG log.

    Only fields with non-``None`` values are rendered so the line stays
    readable; floats round to three decimals, bools render as
    ``True``/``False`` directly.

    Parameters:
        breakdown: A dataclass instance (it is passed to
            :func:`dataclasses.fields`).

    Returns:
        The rendered ``name=value`` pairs joined by ``', '``; an empty string
        when every field is ``None``.
    """
    parts: list[str] = []
    for f in dataclasses.fields(breakdown):
        value = getattr(breakdown, f.name)
        if value is None:
            continue
        if isinstance(value, float):
            parts.append(f'{f.name}={value:.3f}')
        else:
            parts.append(f'{f.name}={value}')
    return ', '.join(parts)