Skip to content

API Levels

canvodpy exposes four API levels, each targeting a different use case. All levels produce the same (epoch, sid) xarray Dataset format; they differ in how much infrastructure they manage for you.


Quick Comparison

L1: Convenience L2: Fluent L3: Site + Pipeline L4: Functional
Pattern process_date(...) .read().augment().result() site.pipeline().process_date(...) read_rinex(path)
Ephemeris Automatic (SP3/CLK) .augment(source=...) Automatic (SP3/CLK) augment_with_ephemeris(ds)
Store writes Automatic (Icechunk) Optional .to_store() Automatic (Icechunk) None (NetCDF)
File discovery FilenameMapper FilenameMapper FilenameMapper Caller provides paths
Dask parallelism Yes No Yes No
Deduplication 3-layer None 3-layer None
Best for Daily cron jobs Interactive exploration Multi-day batch runs Airflow / custom pipelines

Level 1: Convenience Functions

One-liner entry points that handle everything internally.

from canvodpy import process_date, calculate_vod

# Process one day: read → augment → write to store
process_date("Rosalia", "2025001")

# Compute VOD from stored data
calculate_vod("Rosalia", "canopy_01", "reference_01", "2025001")

Internally, process_date() creates a Pipeline, spawns Dask workers, discovers files via FilenameMapper, downloads SP3/CLK ephemerides, runs Hermite interpolation, computes theta/phi, writes to Icechunk with 3-layer deduplication, and shuts down.


Level 2: Fluent Workflow

Chainable deferred API for interactive use. Steps are recorded and executed on a terminal call (.result(), .to_store()).

import canvodpy

ds = (
    canvodpy.workflow("Rosalia")
    .read("2025001")
    .augment(source="final")     # SP3/CLK ephemeris
    .result()
)
ds = (
    canvodpy.workflow("Rosalia")
    .read("2025001")
    .augment(source="broadcast")   # SBF SatVisibility or RINEX NAV
    .result()
)
vod_ds = (
    canvodpy.workflow("Rosalia")
    .read("2025001")
    .augment(source="final")
    .vod("canopy_01", "reference_01")
    .result()
)
(
    canvodpy.workflow("Rosalia")
    .read("2025001")
    .augment(source="final")
    .to_store()    # terminal: writes to Icechunk
)

Deferred execution

.read(), .augment(), .vod() do not execute immediately. They append to an internal plan. Execution happens on .result() or .to_store().


Level 3: Site + Pipeline

Object-oriented API for batch processing. Holds a Dask cluster across calls.

from canvodpy import Site

site = Site("Rosalia")

with site.pipeline(n_workers=8) as pipeline:
    for date_key, datasets in pipeline.process_range("2025001", "2025007"):
        print(f"{date_key}: {sum(ds.sizes['epoch'] for ds in datasets.values())} epochs")

        # Optional: compute VOD inline
        site.vod.compute_day(datasets, "canopy_01_vs_reference_01")

Level 3 is functionally identical to Level 1 — the orchestrator runs the same code path. The difference is ergonomic: Level 3 reuses the Dask cluster across multiple process_date() calls, avoiding repeated cluster setup/teardown.


Level 4: Functional API

Pure stateless functions for Airflow or custom pipelines. The caller provides file paths and manages all orchestration.

from canvodpy.functional import read_rinex, augment_with_ephemeris, calculate_vod

# Read a single file
ds = read_rinex("station.25o")

# Add satellite geometry (downloads SP3/CLK if needed)
ds = augment_with_ephemeris(ds, site_name="Rosalia", source="final")

# Compute VOD
vod_ds = calculate_vod(canopy_ds, reference_ds)
from canvodpy.functional import read_rinex_to_file, calculate_vod_to_file

# Returns path string (XCom-serializable)
obs_path = read_rinex_to_file("station.25o", output="obs.nc")
vod_path = calculate_vod_to_file(canopy_path, ref_path, output="vod.nc")
from canvod.virtualiconvname import FilenameMapper

mapper = FilenameMapper(site="Rosalia", receiver="canopy_01")
files = mapper.discover("2025001")

datasets = [read_rinex(f) for f in files]

Ephemeris Sources

All levels support three ephemeris sources for computing satellite geometry (theta, phi). The source determines accuracy, latency, and internet requirements.

Source comparison

Source Accuracy Latency Internet Input files
Agency final (SP3/CLK) ~2-3 cm orbit 12-18 days Required SP3 + CLK from COD/ESA/IGS
SBF broadcast ~1-2 m orbit Immediate None SBF binary (SatVisibility block)
RINEX NAV broadcast ~1-2 m orbit Immediate None .YYp / .YYn nav files

Accuracy perspective

A 2 m orbit error at 20,200 km altitude produces <0.00001 deg angular error in theta/phi — six orders of magnitude below GNSS measurement noise. For VOD applications, broadcast ephemerides are more than sufficient.

How each source works

flowchart LR
    subgraph Agency["Agency Final (SP3/CLK)"]
        A1[Download SP3+CLK] --> A2[Hermite interpolation]
        A2 --> A3["ECEF → θ, φ, r"]
    end

    subgraph SBF["SBF Broadcast"]
        B1["SBF file scan"] --> B2["SatVisibility block"]
        B2 --> B3["θ, φ directly from receiver"]
    end

    subgraph NAV["RINEX NAV Broadcast"]
        C1["Parse .YYp nav file"] --> C2["Keplerian propagation"]
        C2 --> C3["ECEF → θ, φ, r"]
    end

    Agency --> DS["ds with theta, phi"]
    SBF --> DS
    NAV --> DS

Usage across levels

# Level 1/3: config-driven (processing.yaml)
# ephemeris_source: "final" | "broadcast" | "auto"

# Level 2: explicit step
.augment(source="final")      # SP3/CLK
.augment(source="broadcast")  # SBF SatVisibility or RINEX NAV

# Level 4: explicit function
augment_with_ephemeris(ds, site_name="Rosalia", source="final")
augment_with_ephemeris(ds, site_name="Rosalia", source="broadcast")

EphemerisProvider architecture

All three sources implement the same abstract interface:

class EphemerisProvider(ABC):
    @abstractmethod
    def augment_dataset(self, ds, receiver_position) -> xr.Dataset:
        """Add theta, phi (and optionally r) to the observation dataset."""

    @abstractmethod
    def preprocess_day(self, date, site_config) -> Path | None:
        """Download/prepare ephemeris for a day. Returns cache path or None."""
Provider preprocess_day() augment_dataset()
AgencyEphemerisProvider Downloads SP3/CLK, Hermite interpolation → Zarr cache Opens Zarr, selects epochs, compute_spherical_coordinates()
SbfBroadcastProvider No-op (geometry embedded in file) Extracts theta/phi from sbf_obs auxiliary dataset
RinexNavProvider Parses NAV file, Keplerian propagation → Zarr cache Opens Zarr, selects epochs, compute_spherical_coordinates()

Data Flow Diagram

flowchart TD
    subgraph Input["Data Ingestion"]
        FILES["GNSS Files<br/>(RINEX / SBF)"]
        EPHEM["Ephemeris Source<br/>(SP3 / SBF / NAV)"]
    end

    subgraph Discovery["File Discovery"]
        FM["FilenameMapper<br/>canvod-virtualiconvname"]
    end

    subgraph Reading["Parsing"]
        READER["GNSSDataReader<br/>Rnxv3Obs / SbfReader"]
    end

    subgraph Augmentation["Geometry Augmentation"]
        EP["EphemerisProvider<br/>Agency / SBF / NAV"]
        SCS["θ, φ, r coordinates"]
    end

    subgraph Storage["Versioned Storage"]
        ICE["Icechunk Store<br/>(epoch × sid)"]
        DEDUP["3-Layer Dedup<br/>hash + temporal + intra-batch"]
    end

    subgraph Analysis["VOD Analysis"]
        VOD["VodComputer<br/>tau-omega model"]
        GRID["Grid Assignment<br/>equal-area / geodesic"]
    end

    FILES --> FM --> READER
    EPHEM --> EP --> SCS
    READER --> SCS
    SCS --> DEDUP --> ICE
    ICE --> VOD --> GRID

    style Input fill:#fff3e0,stroke:#e65100
    style Discovery fill:#e3f2fd,stroke:#1565c0
    style Reading fill:#ffecb3,stroke:#f57c00
    style Augmentation fill:#e1f5fe,stroke:#0277bd
    style Storage fill:#f3e5f5,stroke:#4a148c
    style Analysis fill:#e8f5e9,stroke:#2e7d32

What each level handles

Step L1 L2 L3 L4
File discovery caller
Reading
Ephemeris augmentation auto .augment() auto augment_with_ephemeris()
Deduplication
Store write auto .to_store() auto
VOD computation calculate_vod() .vod() site.vod.compute_day() calculate_vod()
Dask parallelism

VOD Computation

VOD is computed via VodComputer, which offers two strategies:

# Compute VOD immediately after processing
with site.pipeline() as pipeline:
    for date_key, datasets in pipeline.process_range("2025001", "2025007"):
        site.vod.compute_day(datasets, "canopy_01_vs_reference_01")
# Recompute VOD for an entire time range from the RINEX store
site.vod.compute_bulk(
    "canopy_01_vs_reference_01",
    start="2025001",
    end="2025031",
)

Both strategies use the same core: rechunk → clear encodings → VODFactory.create()calculator.calculate_vod() → write to VOD store.


Choosing the Right Level

I want to process data daily as a cron job

Level 1 (process_date) or Level 3 (site.pipeline()). Both handle everything: file discovery, ephemeris, store writes, dedup. Level 3 is better if you process multiple days in one run (reuses Dask cluster).

I want to explore data interactively in a notebook

Level 2 (fluent workflow). Chain .read().augment().result() to get an in-memory Dataset without side effects. Add .vod() to compute VOD inline.

I want to integrate with Airflow

Level 4 (functional). Use *_to_file variants that return path strings for XCom serialization. Each function is stateless and pure.

I want to read a single file quickly

Level 4: read_rinex("file.rnx") or use the reader directly: SbfReader(fpath="file.sbf").to_ds().


Next in the trail: Getting Started · Audit Suite · Architecture · AI Development