
canvod-ops

Purpose

The canvod-ops package provides a configurable preprocessing pipeline that transforms raw GNSS observation datasets during ingestion. Operations are applied as a chain: each operation receives a dataset, transforms it, and passes the result to the next.


Pipeline

A Pipeline is an ordered sequence of operations (Op instances). Calling it passes the dataset through each operation in turn, feeding each operation's output to the next. It also collects the per-operation metadata (OpResult) into a PipelineResult:

```python
from canvod.ops import Pipeline, TemporalAggregate, GridAssignment

pipeline = Pipeline([
    TemporalAggregate(freq="1min", method="mean"),
    GridAssignment(grid_type="equal_area", angular_resolution=2.0),
])

# ds_in: the raw xarray.Dataset with (epoch, sid) dimensions
ds_out, result = pipeline(ds_in)

# result.to_metadata_dict() → stored in dataset attrs for reproducibility
```
```mermaid
flowchart TD
    RAW["`**Raw Dataset**
    epoch, sid`"]
    RAW --> TA["`**TemporalAggregate**
    1-min mean`"]
    TA --> GA["`**GridAssignment**
    2-deg equal-area`"]
    GA --> OUT["`**Enriched Dataset**
    + PipelineResult`"]
```
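Running a pipeline is essentially a left fold over its operations. Each op's output dataset becomes the next op's input, and each op's result record is appended to a list. The following is a minimal plain-Python sketch of that control flow, not canvod's implementation. SketchPipeline, the dict-shaped result records and the toy string "dataset" are hypothetical stand-ins for Pipeline, OpResult and xr.Dataset.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

# Hypothetical op signature: takes a dataset, returns (new_dataset, result_record).
OpFn = Callable[[Any], tuple[Any, dict]]


@dataclass
class SketchPipeline:
    """Illustrative stand-in for canvod.ops.Pipeline (not the real class)."""

    ops: list[OpFn] = field(default_factory=list)

    def __call__(self, ds: Any) -> tuple[Any, list[dict]]:
        results: list[dict] = []
        for op in self.ops:
            ds, result = op(ds)  # the output of one op feeds the next
            results.append(result)
        return ds, results


def add_suffix(suffix: str) -> OpFn:
    # Toy op factory: "transforms" a string and reports what it did.
    def op(ds: str) -> tuple[str, dict]:
        return ds + suffix, {"op_name": f"add{suffix}"}
    return op


pipeline = SketchPipeline([add_suffix("_agg"), add_suffix("_grid")])
ds_out, results = pipeline("raw")
print(ds_out)                           # raw_agg_grid
print([r["op_name"] for r in results])  # ['add_agg', 'add_grid']
```

Keeping the ops as plain callables makes the order explicit: swapping two entries in the list changes what each op sees.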

Built-in Operations

TemporalAggregate

Aggregates observations into regular time bins. It reduces the number of epochs by grouping them into bins of the target frequency and computing the mean or median of each bin.

```python
from canvod.ops import TemporalAggregate

op = TemporalAggregate(freq="1min", method="mean")
ds_out, result = op(ds_in)
```

| Parameter | Default | Description |
|---|---|---|
| freq | "1min" | Target frequency (pandas offset alias) |
| method | "mean" | Aggregation: "mean" or "median" |

If the dataset is already sampled at the target frequency or coarser, the operation is a no-op and returns the dataset unchanged.
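Binning means flooring each epoch to the start of its bin. The no-op check compares the data's sampling interval against the bin width.

The stdlib sketch below rests on several assumptions:
- It takes a timedelta bin width rather than a pandas offset alias.
- Bin edges are anchored at 1970-01-01.
- The median epoch spacing is used as the sampling interval.

floor_to_bin and is_noop are hypothetical helpers, not canvod functions.

```python
from datetime import datetime, timedelta
from statistics import median

ORIGIN = datetime(1970, 1, 1)  # assumed anchor for bin edges


def floor_to_bin(t: datetime, freq: timedelta) -> datetime:
    """Start of the freq-wide bin that contains t."""
    return t - (t - ORIGIN) % freq


def is_noop(epochs: list[datetime], freq: timedelta) -> bool:
    """True if the data are already sampled at freq or coarser (median spacing)."""
    ts = sorted(set(epochs))
    if len(ts) < 2:
        return True  # a single epoch has nothing to aggregate
    spacing = median(b - a for a, b in zip(ts, ts[1:]))
    return spacing >= freq


freq = timedelta(minutes=1)
five_second = [datetime(2024, 5, 1, 12, 0, s) for s in range(0, 60, 5)]
one_minute = [datetime(2024, 5, 1, 12, m) for m in range(3)]

print(floor_to_bin(datetime(2024, 5, 1, 12, 0, 35), freq))  # 2024-05-01 12:00:00
print(is_noop(five_second, freq))  # False: 5 s data gets aggregated
print(is_noop(one_minute, freq))   # True: already at 1 min
```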

Per-SID independence

TemporalAggregate groups by (time_bin, sid) before computing the aggregate. This is critical because each SID (satellite + band + code) observes the canopy from a different sky position (θ, φ). Averaging VOD or SNR values across satellites within a time bin would conflate spatial variability (different view angles) with temporal variability, producing a physically meaningless average.

Geometry coordinates (θ, φ) are averaged per SID, giving the centroid of all contributing sky positions. Using .first() instead would attach a single, arbitrary observation's geometry to the averaged value. That is misleading, because it does not represent where the average came from.
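The per-SID grouping and the centroid geometry can be illustrated without xarray. In this minimal sketch, observations are assumed to be flat records of (epoch in seconds, sid, SNR, θ, φ) with angles in degrees. The SID labels and aggregate_per_sid are hypothetical.

Geometry is always averaged, even when method="median", matching the coordinate table for TemporalAggregate. The φ mean is a plain arithmetic mean, which breaks if a bin straddles the 0/360° azimuth wrap.

```python
from collections import defaultdict
from statistics import mean, median

# Hypothetical flat record: (epoch_seconds, sid, snr, theta_deg, phi_deg).
Record = tuple[int, str, float, float, float]


def aggregate_per_sid(records: list[Record], freq_s: int, method: str = "mean") -> dict:
    """Aggregate values per (time_bin, sid); geometry becomes the per-SID centroid."""
    reduce_value = {"mean": mean, "median": median}[method]
    groups: dict[tuple[int, str], list[Record]] = defaultdict(list)
    for rec in records:
        t, sid = rec[0], rec[1]
        groups[(t - t % freq_s, sid)].append(rec)  # never mix different SIDs

    out = {}
    for key, recs in groups.items():
        out[key] = {
            "snr": reduce_value(r[2] for r in recs),
            # Centroid of contributing sky positions (naive mean: no 0/360 wrap handling).
            "theta": mean(r[3] for r in recs),
            "phi": mean(r[4] for r in recs),
        }
    return out


records = [
    (0, "G01|L1|C", 40.0, 30.0, 100.0),
    (30, "G01|L1|C", 42.0, 31.0, 101.0),
    (0, "G02|L1|C", 20.0, 70.0, 250.0),
    (30, "G02|L1|C", 22.0, 71.0, 251.0),
]
out = aggregate_per_sid(records, freq_s=60)
print(out[(0, "G01|L1|C")])          # {'snr': 41.0, 'theta': 30.5, 'phi': 100.5}
print(mean(r[2] for r in records))   # 31.0: pooled across SIDs, matches no sky position
```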

Coordinate handling:

| Coordinate type | Example | Aggregation |
|---|---|---|
| Data variables | VOD, SNR | Mean or median per (time_bin, sid) |
| Epoch×SID coords | phi, theta | Mean per (time_bin, sid) (centroid) |
| SID-only coords | sv, band, code | Preserved unchanged |
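The coordinate table amounts to a dispatch on each variable's role and dimensions. The sketch below shows that dispatch, assuming a hypothetical schema dict (variable name mapped to its dims and a data-variable flag) in place of an xr.Dataset. aggregation_plan is illustrative only. The "unhandled" fallback for other dimension layouts is an assumption, because the table does not cover that case.

```python
# Hypothetical schema: variable name -> (dims, is_data_var).
SCHEMA = {
    "VOD": (("epoch", "sid"), True),
    "SNR": (("epoch", "sid"), True),
    "phi": (("epoch", "sid"), False),
    "theta": (("epoch", "sid"), False),
    "sv": (("sid",), False),
    "band": (("sid",), False),
    "code": (("sid",), False),
}


def aggregation_plan(schema: dict, method: str = "mean") -> dict[str, str]:
    """Map each variable to its aggregation rule, following the coordinate table."""
    plan = {}
    for name, (dims, is_data_var) in schema.items():
        if is_data_var:
            plan[name] = method      # mean or median per (time_bin, sid)
        elif set(dims) == {"epoch", "sid"}:
            plan[name] = "mean"      # geometry: per-SID centroid, regardless of method
        elif set(dims) == {"sid"}:
            plan[name] = "preserve"  # constant per SID, copied unchanged
        else:
            plan[name] = "unhandled"  # assumption: layouts outside the table
    return plan


print(aggregation_plan(SCHEMA, method="median"))
```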

Anti-pattern: naive xarray resampling

A plain ds.resample(epoch="1D").mean() preserves the sid dimension (xarray resamples along epoch only), so per-SID independence is maintained. However, it does not distinguish between data variables and geometry coordinates, and does not handle sid-only coords explicitly. For production use, prefer TemporalAggregate.

GridAssignment

Assigns each observation to a spatial grid cell based on its spherical coordinates (phi, theta). Adds a cell_id_* variable to the dataset.

```python
from canvod.ops import GridAssignment

op = GridAssignment(grid_type="equal_area", angular_resolution=2.0)
ds_out, result = op(ds_in)
# ds_out now has cell_id_equal_area_2.0deg(epoch, sid)
```

| Parameter | Default | Description |
|---|---|---|
| grid_type | "equal_area" | Grid builder name |
| angular_resolution | 2.0 | Resolution in degrees |

If the dataset is missing phi or theta coordinates, the operation is skipped.
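This page does not describe how the equal_area builder lays out its cells. The sketch below uses one straightforward construction, purely as an assumption:
- The hemisphere is split into zenith-angle bands of width angular_resolution.
- Each band gets a number of azimuth cells proportional to sin θ, so cells cover roughly equal solid angle.

Angles are assumed to be in degrees, with θ measured from zenith (0 to 90) and φ as azimuth. band_sizes, assign_cell and cell_var_name are hypothetical helpers. The variable-naming pattern is only inferred from the cell_id_equal_area_2.0deg example.

```python
import math


def cell_var_name(grid_type: str, angular_resolution: float) -> str:
    # Naming pattern inferred from the "cell_id_equal_area_2.0deg" example.
    return f"cell_id_{grid_type}_{angular_resolution}deg"


def band_sizes(res: float) -> list[int]:
    """Azimuth cell count per zenith band, scaled by sin(theta) for ~equal areas."""
    sizes = []
    for i in range(math.ceil(90.0 / res)):
        theta_mid = math.radians((i + 0.5) * res)
        sizes.append(max(1, round(360.0 * math.sin(theta_mid) / res)))
    return sizes


def assign_cell(theta: float, phi: float, res: float) -> int:
    """Global cell index for polar angle theta and azimuth phi (degrees)."""
    sizes = band_sizes(res)
    band = min(int(theta // res), len(sizes) - 1)  # clamp theta == 90 into last band
    n_az = sizes[band]
    az = min(int((phi % 360.0) // (360.0 / n_az)), n_az - 1)  # guard float edge
    return sum(sizes[:band]) + az  # offset by all cells in lower bands


print(cell_var_name("equal_area", 2.0))  # cell_id_equal_area_2.0deg
print(band_sizes(2.0)[:3])               # [3, 9, 16]: few cells near zenith
print(assign_cell(3.0, 0.0, 2.0))        # 3: first cell of the second band
```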


Op and OpResult

All operations inherit from the Op abstract base class:

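```python
import time

import xarray as xr

from canvod.ops import Op, OpResult


class MyOp(Op):
    @property
    def name(self) -> str:
        return "my_operation"

    def __call__(self, ds: xr.Dataset) -> tuple[xr.Dataset, OpResult]:
        start = time.perf_counter()
        input_shape = dict(ds.sizes)  # record dimensions before transforming

        ds_out = ds  # transform ds here

        return ds_out, OpResult(
            op_name=self.name,
            parameters={},  # this op's configuration, e.g. {"freq": "1min"}
            input_shape=input_shape,
            output_shape=dict(ds_out.sizes),
            duration_seconds=time.perf_counter() - start,
        )
```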

Each OpResult records:

  • Operation name and parameters
  • Input/output dataset dimensions
  • Execution time
  • Optional notes (e.g., "no-op: data already at target frequency")

The PipelineResult aggregates all OpResult objects and can be serialized to dataset attributes via to_metadata_dict().
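netCDF attributes cannot hold nested dictionaries. A common way to make such records fit into dataset attrs is to serialize them as a single JSON string.

This sketch shows that approach. It mirrors the OpResult fields listed above in hypothetical dataclasses (OpResultSketch, PipelineResultSketch). The attribute key "preprocessing_pipeline" and the JSON encoding are assumptions, not canvod's actual format. The shapes in the usage lines are illustrative values only.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class OpResultSketch:
    """Hypothetical mirror of the OpResult fields described above."""
    op_name: str
    parameters: dict
    input_shape: dict[str, int]
    output_shape: dict[str, int]
    duration_seconds: float
    notes: Optional[str] = None  # e.g. "no-op: data already at target frequency"


@dataclass
class PipelineResultSketch:
    results: list[OpResultSketch] = field(default_factory=list)

    def to_metadata_dict(self) -> dict[str, str]:
        # Flatten nested records into one JSON string so they fit in dataset attrs.
        return {"preprocessing_pipeline": json.dumps([asdict(r) for r in self.results])}


res = PipelineResultSketch([
    OpResultSketch("temporal_aggregate", {"freq": "1min", "method": "mean"},
                   {"epoch": 3600, "sid": 120}, {"epoch": 60, "sid": 120}, 0.42),
])
attrs = res.to_metadata_dict()  # e.g. ds_out.attrs.update(attrs)
restored = json.loads(attrs["preprocessing_pipeline"])
print(restored[0]["output_shape"])  # {'epoch': 60, 'sid': 120}
```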


Config-Driven Pipeline

The default pipeline is built from the preprocessing section of the processing.yaml configuration:

```yaml
preprocessing:
  temporal_aggregation:
    enabled: true
    freq: "1min"
    method: mean
  grid_assignment:
    enabled: true
    grid_type: equal_area
    angular_resolution: 2.0
```
```python
from canvod.ops import build_default_pipeline

pipeline = build_default_pipeline()  # reads from config
ds_out, result = pipeline(ds_in)
```

Set enabled: false to skip an operation. The pipeline is empty if all operations are disabled.
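Turning the preprocessing section into a list of operations mostly means filtering on enabled and passing the remaining keys through as constructor arguments. The returned kwargs line up with the TemporalAggregate and GridAssignment parameters above.

The sketch below is one way to do this, under stated assumptions:
- The YAML has already been parsed into a dict, for example with yaml.safe_load from PyYAML, which is not called here.
- Ops run in a fixed order.
- A section without an enabled key counts as disabled.

plan_from_config and OP_ORDER are hypothetical, not part of build_default_pipeline.

```python
# Parsed "preprocessing" section (hand-written here instead of loading YAML).
CONFIG = {
    "temporal_aggregation": {"enabled": True, "freq": "1min", "method": "mean"},
    "grid_assignment": {"enabled": True, "grid_type": "equal_area", "angular_resolution": 2.0},
}

# Assumed fixed execution order: aggregate in time first, then assign grid cells.
OP_ORDER = ["temporal_aggregation", "grid_assignment"]


def plan_from_config(preprocessing: dict) -> list[tuple[str, dict]]:
    """Return (section name, kwargs) for each enabled op, dropping the 'enabled' flag."""
    plan = []
    for key in OP_ORDER:
        section = preprocessing.get(key, {})
        if not section.get("enabled", False):  # assumption: missing flag means disabled
            continue
        kwargs = {k: v for k, v in section.items() if k != "enabled"}
        plan.append((key, kwargs))
    return plan


print(plan_from_config(CONFIG))
```

If every section is disabled, the plan is empty, which corresponds to the empty pipeline described above.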