Skip to content

canvod.ops API Reference

Configurable preprocessing pipeline for GNSS observation datasets.

Pipeline

canvod.ops — Preprocessing operations pipeline for GNSS VOD data.

Op

Bases: ABC

Abstract base class for a preprocessing operation.

Each Op is a callable carrying config set at construction time. At call time it is a pure Dataset -> (Dataset, OpResult) transform.

Source code in packages/canvod-ops/src/canvod/ops/base.py
25
26
27
28
29
30
31
32
33
34
35
36
37
class Op(ABC):
    """Abstract base class for a preprocessing operation.

    Concrete subclasses capture all configuration at construction time;
    invoking the instance is then a pure ``Dataset -> (Dataset, OpResult)``
    transform.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier for this operation (used in logs and records)."""

    @abstractmethod
    def __call__(self, ds: xr.Dataset) -> tuple[xr.Dataset, OpResult]:
        """Apply the operation to *ds*, returning the new dataset and its record."""

OpResult dataclass

Immutable record of a single preprocessing operation.

Source code in packages/canvod-ops/src/canvod/ops/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
@dataclass(frozen=True)
class OpResult:
    """Immutable record of a single preprocessing operation.

    Captures which op ran and with what parameters, the dataset shapes
    before and after, the wall-clock duration, and optional free-form notes.
    """

    op_name: str
    parameters: dict[str, Any]
    input_shape: dict[str, int]
    output_shape: dict[str, int]
    duration_seconds: float
    notes: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return this record as a plain dictionary (recursively converted)."""
        return asdict(self)

Pipeline

Ordered chain of :class:`~canvod.ops.base.Op` instances.

Source code in packages/canvod-ops/src/canvod/ops/pipeline.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Pipeline:
    """Ordered chain of :class:`~canvod.ops.base.Op` instances."""

    def __init__(self, ops: list[Op] | None = None) -> None:
        # Copy defensively so later mutation of the caller's list has no effect.
        self._ops: list[Op] = [] if not ops else list(ops)

    def add(self, op: Op) -> Pipeline:
        """Append an operation and return self for chaining."""
        self._ops.append(op)
        return self

    def __call__(self, ds: xr.Dataset) -> tuple[xr.Dataset, PipelineResult]:
        start = time.perf_counter()
        collected: list[OpResult] = []

        # Each op rebinds ds, threading the dataset through the chain.
        for op in self._ops:
            logger.info("running_op", op_name=op.name)
            ds, record = op(ds)
            collected.append(record)

        elapsed = time.perf_counter() - start
        summary = PipelineResult(
            results=collected,
            total_duration_seconds=elapsed,
        )

        logger.info(
            "pipeline_complete",
            n_ops=len(collected),
            duration_s=round(elapsed, 2),
        )
        return ds, summary

add(op)

Append an operation and return self for chaining.

Source code in packages/canvod-ops/src/canvod/ops/pipeline.py
36
37
38
39
def add(self, op: Op) -> Pipeline:
    """Add *op* to the end of the chain; returns ``self`` so calls can be chained."""
    self._ops.extend((op,))
    return self

PipelineResult dataclass

Aggregated result from running a full pipeline.

Source code in packages/canvod-ops/src/canvod/ops/pipeline.py
15
16
17
18
19
20
21
22
23
24
25
26
27
@dataclass
class PipelineResult:
    """Aggregated result from running a full pipeline."""

    results: list[OpResult] = field(default_factory=list)
    total_duration_seconds: float = 0.0

    def to_metadata_dict(self) -> dict[str, Any]:
        """Serialise to a dict suitable for ``ds.attrs``."""
        per_op = [entry.to_dict() for entry in self.results]
        return {
            "preprocessing_ops": per_op,
            "preprocessing_total_seconds": self.total_duration_seconds,
        }

to_metadata_dict()

Serialise to a dict suitable for ds.attrs.

Source code in packages/canvod-ops/src/canvod/ops/pipeline.py
22
23
24
25
26
27
def to_metadata_dict(self) -> dict[str, Any]:
    """Serialise to a dict suitable for ``ds.attrs``."""
    per_op = [entry.to_dict() for entry in self.results]
    return {
        "preprocessing_ops": per_op,
        "preprocessing_total_seconds": self.total_duration_seconds,
    }

build_default_pipeline(config=None)

Build the default preprocessing pipeline from config.

Parameters

config : PreprocessingConfig | None Explicit config. If None, attempts to load from the user's config files via load_config(). Falls back to PreprocessingConfig() defaults if no config is available.

Returns

Pipeline Ready-to-call pipeline.

Source code in packages/canvod-ops/src/canvod/ops/registry.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def build_default_pipeline(
    config: PreprocessingConfig | None = None,
) -> Pipeline:
    """Build the default preprocessing pipeline from config.

    Parameters
    ----------
    config : PreprocessingConfig | None
        Explicit config. If ``None``, attempts to load from the user's
        config files via ``load_config()``. Falls back to
        ``PreprocessingConfig()`` defaults if no config is available.

    Returns
    -------
    Pipeline
        Ready-to-call pipeline.
    """
    if config is None:
        # Best-effort: user config first, library defaults as fallback.
        try:
            from canvod.utils.config import load_config

            config = load_config().processing.preprocessing
        except Exception:
            config = PreprocessingConfig()

    pipeline = Pipeline()

    temporal = config.temporal_aggregation
    if temporal.enabled:
        pipeline.add(
            TemporalAggregate(
                freq=temporal.freq,
                method=temporal.method,
            )
        )

    gridding = config.grid_assignment
    if gridding.enabled:
        pipeline.add(
            GridAssignment(
                grid_type=gridding.grid_type,
                angular_resolution=gridding.angular_resolution,
            )
        )

    return pipeline

Temporal

Temporal aggregation operation.

TemporalAggregate

Bases: Op

Aggregate an (epoch, sid) dataset to regular time bins.

Parameters

freq : str Target frequency as a pandas offset alias (e.g. "1min", "30s"). method : str Aggregation method: "mean" or "median".

Source code in packages/canvod-ops/src/canvod/ops/temporal.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class TemporalAggregate(Op):
    """Aggregate an ``(epoch, sid)`` dataset to regular time bins.

    Parameters
    ----------
    freq : str
        Target frequency as a pandas offset alias (e.g. ``"1min"``, ``"30s"``).
    method : str
        Aggregation method: ``"mean"`` or ``"median"``.

    Raises
    ------
    ValueError
        If ``method`` is not ``"mean"`` or ``"median"``.
    """

    def __init__(self, freq: str = "1min", method: str = "mean") -> None:
        if method not in ("mean", "median"):
            msg = f"Unsupported aggregation method: {method!r}"
            raise ValueError(msg)
        self._freq = freq
        self._method = method

    @property
    def name(self) -> str:
        return "temporal_aggregate"

    def __call__(self, ds: xr.Dataset) -> tuple[xr.Dataset, OpResult]:
        """Aggregate ``ds`` to the configured frequency.

        Returns the (possibly unchanged) dataset and an :class:`OpResult`
        record. The call is a no-op when the data's median epoch spacing
        is already at or coarser than the requested frequency.
        """
        t0 = time.perf_counter()
        params: dict[str, Any] = {"freq": self._freq, "method": self._method}
        input_shape = {str(k): int(v) for k, v in dict(ds.sizes).items()}

        # --- Early exit if data is already at or coarser than requested freq ---
        requested_ns = int(pd.tseries.frequencies.to_offset(self._freq).nanos)
        requested_td = pd.Timedelta(requested_ns, unit="ns")
        median_spacing = _median_epoch_spacing(ds.epoch.values)

        if median_spacing >= requested_td:
            logger.info(
                "temporal_aggregation_skipped",
                median_spacing=str(median_spacing),
                requested=str(requested_td),
            )
            result = OpResult(
                op_name=self.name,
                parameters=params,
                input_shape=input_shape,
                output_shape=input_shape,
                duration_seconds=time.perf_counter() - t0,
                notes=f"no-op: median spacing {median_spacing} >= {requested_td}",
            )
            return ds, result

        polars_freq = _convert_to_polars_freq(self._freq)

        # --- Identify coordinate roles ---
        # sid-only coords pass through untouched; (epoch, sid) coords are
        # aggregated alongside the data variables.
        sid_only_coords: list[str] = []
        epoch_sid_coords: list[str] = []
        for cname, coord in ds.coords.items():
            if cname in ("epoch", "sid"):
                continue
            dims = coord.dims
            if dims == ("sid",):
                sid_only_coords.append(str(cname))
            elif set(dims) == {"epoch", "sid"}:
                epoch_sid_coords.append(str(cname))

        data_var_names: list[str] = [str(v) for v in ds.data_vars]

        # --- Build long-form Polars DataFrame ---
        epoch_vals = ds.epoch.values  # datetime64
        sid_vals = ds.sid.values

        agg_columns: list[str] = data_var_names + epoch_sid_coords
        n_epoch, n_sid = len(epoch_vals), len(sid_vals)

        # Fix: build each column exactly once (previously every column was
        # initialised to an empty list and then unconditionally overwritten).
        # Each (epoch, sid) array is raveled row-major, so repeating epochs
        # and tiling sids lines the key columns up with the flattened values.
        rows: dict[str, list[Any]] = {
            "epoch": np.repeat(epoch_vals, n_sid).tolist(),
            "sid": np.tile(sid_vals, n_epoch).tolist(),
        }
        for col in agg_columns:
            arr = ds[col].values if col in data_var_names else ds.coords[col].values
            # arr shape is (epoch, sid)
            rows[col] = arr.ravel().tolist()

        df = pl.DataFrame(rows)

        # --- Truncate + group_by ---
        df = df.with_columns(
            pl.col("epoch")
            .cast(pl.Datetime("ns"))
            .dt.truncate(polars_freq)
            .alias("time_bin")
        )

        agg_exprs = []
        for col in agg_columns:
            if self._method == "mean":
                agg_exprs.append(pl.col(col).mean().alias(col))
            else:
                agg_exprs.append(pl.col(col).median().alias(col))

        grouped = (
            df.group_by(["time_bin", "sid"]).agg(agg_exprs).sort(["time_bin", "sid"])
        )

        # --- Pivot back to (epoch, sid) ---
        new_epochs = grouped["time_bin"].unique().sort().to_numpy()
        new_sids = sid_vals  # sid dimension unchanged

        n_new_epoch = len(new_epochs)
        n_new_sid = len(new_sids)

        # Build index maps for fast (time_bin, sid) -> array position lookup.
        sid_to_idx = {s: i for i, s in enumerate(new_sids)}
        epoch_to_idx = {np.datetime64(e): i for i, e in enumerate(new_epochs)}

        # Pre-allocate arrays; NaN marks bins with no observations.
        var_arrays: dict[str, np.ndarray] = {}
        for col in agg_columns:
            var_arrays[col] = np.full(
                (n_new_epoch, n_new_sid), np.nan, dtype=np.float64
            )

        # Fill from grouped
        g_time = grouped["time_bin"].to_numpy()
        g_sid = grouped["sid"].to_numpy()

        for col in agg_columns:
            g_vals = grouped[col].to_numpy()
            arr = var_arrays[col]
            for k in range(len(g_time)):
                ei = epoch_to_idx.get(np.datetime64(g_time[k]))
                si = sid_to_idx.get(g_sid[k])
                if ei is not None and si is not None:
                    arr[ei, si] = g_vals[k]

        # --- Rebuild xarray Dataset ---
        new_coords: dict[str, Any] = {
            "epoch": new_epochs,
            "sid": new_sids,
        }
        # Preserve sid-only coords
        for cname in sid_only_coords:
            new_coords[cname] = ds.coords[cname]

        # Add aggregated (epoch, sid) coords
        for cname in epoch_sid_coords:
            new_coords[cname] = (("epoch", "sid"), var_arrays.pop(cname))

        new_data_vars: dict[str, Any] = {}
        for vname in data_var_names:
            new_data_vars[vname] = (("epoch", "sid"), var_arrays[vname])

        out = xr.Dataset(new_data_vars, coords=new_coords, attrs=ds.attrs.copy())

        duration = time.perf_counter() - t0
        output_shape = {str(k): int(v) for k, v in dict(out.sizes).items()}

        logger.info(
            "temporal_aggregation_complete",
            input_shape=input_shape,
            output_shape=output_shape,
            duration_s=round(duration, 2),
        )

        result = OpResult(
            op_name=self.name,
            parameters=params,
            input_shape=input_shape,
            output_shape=output_shape,
            duration_seconds=duration,
        )
        return out, result

temporal_aggregate(ds, freq='1min', method='mean')

Convenience function: temporally aggregate a dataset.

Parameters

ds : xr.Dataset Input dataset with (epoch, sid) dimensions. freq : str Target frequency. method : str "mean" or "median".

Returns

xr.Dataset Aggregated dataset.

Source code in packages/canvod-ops/src/canvod/ops/temporal.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def temporal_aggregate(
    ds: xr.Dataset,
    freq: str = "1min",
    method: str = "mean",
) -> xr.Dataset:
    """Convenience function: temporally aggregate a dataset.

    Parameters
    ----------
    ds : xr.Dataset
        Input dataset with ``(epoch, sid)`` dimensions.
    freq : str
        Target frequency.
    method : str
        ``"mean"`` or ``"median"``.

    Returns
    -------
    xr.Dataset
        Aggregated dataset.
    """
    # Run the op and discard the OpResult record.
    aggregated, _result = TemporalAggregate(freq=freq, method=method)(ds)
    return aggregated

Grid

Grid cell assignment operation.

GridAssignment

Bases: Op

Assign each (epoch, sid) observation to a grid cell.

Parameters

grid_type : str Grid builder name (e.g. "equal_area"). angular_resolution : float Grid resolution in degrees.

Source code in packages/canvod-ops/src/canvod/ops/grid.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class GridAssignment(Op):
    """Assign each ``(epoch, sid)`` observation to a grid cell.

    Adds a ``cell_id_<grid_type>_<resolution>deg`` variable mapping every
    observation with finite ``phi``/``theta`` angles to its nearest grid
    cell. Observations with non-finite angles get NaN. The input dataset
    is not mutated: the new variable is attached to the returned dataset
    only, matching the pure-transform contract of ``Op``.

    Parameters
    ----------
    grid_type : str
        Grid builder name (e.g. ``"equal_area"``).
    angular_resolution : float
        Grid resolution in degrees.
    """

    def __init__(
        self,
        grid_type: str = "equal_area",
        angular_resolution: float = 2.0,
    ) -> None:
        self._grid_type = grid_type
        self._angular_resolution = angular_resolution
        self._grid = None  # lazy: built on first call, reused afterwards

    @property
    def name(self) -> str:
        return "grid_assign"

    def _get_grid(self):
        """Build (once) and cache the hemispheric grid for this config."""
        if self._grid is None:
            from canvod.grids import create_hemigrid

            self._grid = create_hemigrid(
                cast(Any, self._grid_type),
                angular_resolution=self._angular_resolution,
            )
        return self._grid

    def __call__(self, ds: xr.Dataset) -> tuple[xr.Dataset, OpResult]:
        """Assign grid cell IDs; returns the new dataset and an OpResult.

        Skips (returns ``ds`` unchanged) when the dataset lacks
        ``phi``/``theta`` coords with ``(epoch, sid)`` dims.
        """
        t0 = time.perf_counter()
        params: dict[str, Any] = {
            "grid_type": self._grid_type,
            "angular_resolution": self._angular_resolution,
        }
        input_shape = {str(k): int(v) for k, v in dict(ds.sizes).items()}

        # Prerequisite check: both angle coords must exist with (epoch, sid) dims.
        has_phi = "phi" in ds.coords and set(ds.coords["phi"].dims) == {"epoch", "sid"}
        has_theta = "theta" in ds.coords and set(ds.coords["theta"].dims) == {
            "epoch",
            "sid",
        }

        if not (has_phi and has_theta):
            logger.warning(
                "Grid assignment skipped — dataset missing phi/theta (epoch,sid) coords"
            )
            result = OpResult(
                op_name=self.name,
                parameters=params,
                input_shape=input_shape,
                output_shape=input_shape,
                duration_seconds=time.perf_counter() - t0,
                notes="skipped: missing phi/theta coords",
            )
            return ds, result

        grid = self._get_grid()
        grid_name = f"{self._grid_type}_{self._angular_resolution}deg"

        # Inline cell assignment using KDTree (avoids add_cell_ids_to_vod_fast
        # which assumes a "VOD" data variable exists).
        from canvod.grids.operations import _build_kdtree, _query_points

        tree = _build_kdtree(grid)
        cell_id_col = grid.grid["cell_id"].to_numpy()

        phi_vals = ds.coords["phi"].values.ravel()
        theta_vals = ds.coords["theta"].values.ravel()
        valid = np.isfinite(phi_vals) & np.isfinite(theta_vals)

        # NaN marks observations that could not be assigned (non-finite angles).
        cell_ids = np.full(len(phi_vals), np.nan, dtype=np.float64)
        if np.any(valid):
            cell_ids[valid] = _query_points(
                tree, cell_id_col, phi_vals[valid], theta_vals[valid]
            )

        shape_2d = ds.coords["phi"].shape
        cell_ids_2d = cell_ids.reshape(shape_2d)

        coord_name = f"cell_id_{grid_name}"
        # Fix: use .assign() instead of in-place `ds[coord_name] = ...` so the
        # caller's dataset is left unmodified — Op is documented as a pure
        # Dataset -> (Dataset, OpResult) transform.
        ds = ds.assign({coord_name: (("epoch", "sid"), cell_ids_2d)})

        n_assigned = int(np.sum(np.isfinite(cell_ids_2d)))
        n_unique = len(np.unique(cell_ids[np.isfinite(cell_ids)]))
        duration = time.perf_counter() - t0

        logger.info(
            "grid_assignment_complete",
            n_cells=n_unique,
            n_assigned=n_assigned,
            duration_s=round(duration, 2),
        )

        output_shape = {str(k): int(v) for k, v in dict(ds.sizes).items()}
        result = OpResult(
            op_name=self.name,
            parameters=params,
            input_shape=input_shape,
            output_shape=output_shape,
            duration_seconds=duration,
        )
        return ds, result

grid_assign(ds, grid_type='equal_area', angular_resolution=2.0)

Convenience function: assign grid cells to a dataset.

Parameters

ds : xr.Dataset Input dataset with phi(epoch, sid) and theta(epoch, sid) coords. grid_type : str Grid builder name. angular_resolution : float Resolution in degrees.

Returns

xr.Dataset Dataset with cell_id_<grid_name>(epoch, sid) variable added.

Source code in packages/canvod-ops/src/canvod/ops/grid.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def grid_assign(
    ds: xr.Dataset,
    grid_type: str = "equal_area",
    angular_resolution: float = 2.0,
) -> xr.Dataset:
    """Convenience function: assign grid cells to a dataset.

    Parameters
    ----------
    ds : xr.Dataset
        Input dataset with ``phi(epoch, sid)`` and ``theta(epoch, sid)`` coords.
    grid_type : str
        Grid builder name.
    angular_resolution : float
        Resolution in degrees.

    Returns
    -------
    xr.Dataset
        Dataset with ``cell_id_<grid_name>(epoch, sid)`` variable added.
    """
    # Run the op and discard the OpResult record.
    assigner = GridAssignment(
        grid_type=grid_type,
        angular_resolution=angular_resolution,
    )
    assigned, _result = assigner(ds)
    return assigned