Airflow Integration¶
canVODpy ships with Airflow-compatible task functions and a DAG template that automate the daily GNSS processing pipeline. One DAG is generated per configured research site.
How It Works¶
The pipeline mirrors what PipelineOrchestrator does internally, but breaks it into four independently retriable Airflow tasks:
graph TD
A["check_rinex"] --> B["fetch_aux_data"]
B --> C["process_rinex"]
C --> D["calculate_vod"]
- check_rinex — Scans the configured receiver directories for the requested date. Both canopy and reference must have RINEX files present (same logic as `PairDataDirMatcher`). Raises `RuntimeError` if either is missing — Airflow retries later.
- fetch_aux_data — Downloads SP3 orbit and CLK clock products from public FTP servers, Hermite-interpolates ephemerides and piecewise-linear-interpolates clocks to match the RINEX sampling rate, and writes the result to a temporary Zarr store. If products are not yet available (data too recent), the FTP download raises `RuntimeError` — Airflow retries on the next scheduled run.
- process_rinex — Reads each RINEX file, augments it with satellite positions, clock offsets, and spherical coordinates from the Zarr store, and writes to the site's Icechunk RINEX store. Deduplication via `"File Hash"` makes re-runs safe.
- calculate_vod — Reads canopy + reference from the RINEX store, runs the `TauOmegaZerothOrder` retrieval for each active analysis pair, and writes results to the VOD store.
Retry-Driven Scheduling¶
The DAG runs every 6 hours with 3 retries per task. This handles two common lag scenarios:
| Scenario | Typical lag | What happens |
|---|---|---|
| RINEX files not yet transferred | Hours | check_rinex fails, retried next run |
| SP3/CLK products not yet published | 1-18 days (rapid to final) | fetch_aux_data fails, retried next run |
Once both RINEX files and orbit/clock products are available, the full chain completes in a single run. Already-processed dates are skipped via "File Hash" deduplication.
Expected failures are normal
check_rinex and fetch_aux_data failing on recent dates is by design. IGS final orbit products lag ~12-18 days, rapid products ~1 day. The DAG retries until the data appears.
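The fail-and-retry contract can be sketched as a plain function that raises when its inputs are missing, plus a loop standing in for Airflow's scheduler. The names here (`check_products_available`, `run_with_retries`) are illustrative, not part of canVODpy:

```python
# Sketch of the fail-and-retry contract: a task raises RuntimeError when
# its inputs are not yet available, and the scheduler (simulated here by
# a simple loop) tries again later.
def check_products_available(published_dates: set[str], yyyydoy: str) -> dict:
    """Raise RuntimeError if orbit/clock products for `yyyydoy` are missing."""
    if yyyydoy not in published_dates:
        raise RuntimeError(f"SP3/CLK products for {yyyydoy} not yet published")
    return {"yyyydoy": yyyydoy, "ready": True}


def run_with_retries(fn, *args, retries: int = 3):
    """Mimic Airflow's retry behaviour for a single task."""
    for attempt in range(retries + 1):
        try:
            return fn(*args)
        except RuntimeError:
            if attempt == retries:
                raise  # retries exhausted — surface the failure
```

In Airflow the loop is replaced by the scheduler's `retries` / `retry_delay` settings; the task body only needs to raise.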
Directory Structure¶
Task functions expect the directory layout defined in sites.yaml:
gnss_site_data_root/ # from sites.yaml → gnss_site_data_root
├── 01_reference/ # from receivers.reference_01.directory
│ ├── 25001/ # YYDDD date subdirectory
│ │ ├── ROSA00TUW_R_...rnx
│ │ └── ...
│ ├── 25002/
│ └── ...
├── 02_canopy/ # from receivers.canopy_01.directory
│ ├── 25001/
│ │ └── ROSA00TUW_R_...rnx
│ └── ...
└── (aux data stored separately via config)
check_rinex uses the same _has_rinex_files() function that PairDataDirMatcher uses — it globs for *.??o, *.rnx, *.RNX, and related patterns in each receiver's YYDDD subdirectory. A date is "ready" only when all configured receivers have at least one RINEX file.
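The glob-based readiness check can be sketched as follows. This is an illustrative stand-in for `_has_rinex_files()`, not the actual implementation, and the pattern list is abbreviated to the ones named above:

```python
from pathlib import Path

# Abbreviated pattern list; the real function also covers related patterns.
RINEX_PATTERNS = ("*.??o", "*.rnx", "*.RNX")


def has_rinex_files(receiver_dir: Path, yyddd: str) -> bool:
    """Return True if the receiver's YYDDD subdirectory holds >= 1 RINEX file."""
    date_dir = receiver_dir / yyddd
    if not date_dir.is_dir():
        return False
    return any(f for pattern in RINEX_PATTERNS for f in date_dir.glob(pattern))
```

A date counts as "ready" only when this check passes for every configured receiver.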
Task Functions¶
All four functions live in canvodpy.workflows.tasks. They accept only primitives (str, dict, list, None) and return JSON-serializable dicts suitable for Airflow XCom. Internally they delegate to existing canVODpy machinery.
from canvodpy.workflows.tasks import (
check_rinex,
fetch_aux_data,
process_rinex,
calculate_vod,
)
check_rinex¶
result = check_rinex(site="Rosalia", yyyydoy="2025001")
# {
# "site": "Rosalia",
# "yyyydoy": "2025001",
# "ready": True,
# "receivers": {
# "canopy_01": {"has_files": True, "files": [...], "count": 1},
# "reference_01": {"has_files": True, "files": [...], "count": 4},
# }
# }
Raises RuntimeError when any receiver is missing files — Airflow marks the task as failed and retries according to retry_delay.
fetch_aux_data¶
result = fetch_aux_data(site="Rosalia", yyyydoy="2025001")
# {
# "site": "Rosalia",
# "yyyydoy": "2025001",
# "aux_zarr_path": "/tmp/canvod/aux_2025001.zarr",
# "sampling_interval_s": 30.0,
# "n_epochs": 2880,
# "n_sids": 384,
# }
The sampling interval is auto-detected from the RINEX v3 long filename (e.g. 05S = 5 s). Falls back to 30 s if detection fails.
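Interval detection from the RINEX v3 long filename might look like the sketch below (a hypothetical re-implementation, not canVODpy's actual parser), keyed on the data-frequency field such as `30S` in `..._01D_30S_MO.rnx`:

```python
import re

# RINEX v3 long filenames encode the data frequency as two digits plus a
# unit letter (S/M/H/D), e.g. "_30S_" -> 30 seconds, "_05S_" -> 5 seconds.
_INTERVAL_RE = re.compile(r"_(\d{2})([SMHD])_[A-Z]{2}\.(rnx|RNX|crx)$")
_UNIT_SECONDS = {"S": 1.0, "M": 60.0, "H": 3600.0, "D": 86400.0}


def sampling_interval_s(filename: str, default: float = 30.0) -> float:
    """Parse the data-frequency field; fall back to `default` on failure."""
    match = _INTERVAL_RE.search(filename)
    if match is None:
        return default
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
```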
FTP credentials
Downloads from NASA CDDIS require an Earthdata account email.
Set nasa_earthdata_acc_mail in config/processing.yaml.
Without it, the pipeline falls back to ESA/BKG mirrors.
process_rinex¶
result = process_rinex(
site="Rosalia",
yyyydoy="2025001",
aux_zarr_path="/tmp/canvod/aux_2025001.zarr",
receiver_files=rinex_info["receivers"], # from check_rinex
)
# {
# "site": "Rosalia",
# "yyyydoy": "2025001",
# "receivers_processed": ["canopy_01", "reference_01"],
# "files_written": 5,
# }
Processing is sequential within the task — no Dask or multiprocessing. Airflow handles parallelism across sites and dates via its executor.
Store group naming
Canopy receivers write to a group named after the receiver
(e.g. canopy_01). Reference receivers are expanded via scs_from
in the config and write to groups like reference_01_canopy_01.
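The naming convention above can be sketched as a small helper. The function name and receiver-config shape are hypothetical; only the `scs_from` key and the resulting group names come from the documentation:

```python
# Hypothetical sketch of the group-naming convention: canopy receivers
# keep their own name; reference receivers get one group per canopy
# receiver listed under `scs_from`.
def store_groups(receivers: dict[str, dict]) -> list[str]:
    groups = []
    for name, cfg in receivers.items():
        scs_from = cfg.get("scs_from")
        if scs_from:  # reference receiver: expand per paired canopy
            groups.extend(f"{name}_{canopy}" for canopy in scs_from)
        else:         # canopy receiver: group named after the receiver
            groups.append(name)
    return groups
```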
calculate_vod¶
result = calculate_vod(site="Rosalia", yyyydoy="2025001")
# {
# "site": "Rosalia",
# "yyyydoy": "2025001",
# "analyses": {
# "canopy_01_vs_reference_01": {
# "mean_vod": 0.42,
# "std_vod": 0.15,
# "n_epochs": 2700,
# }
# }
# }
DAG Template¶
The file dags/gnss_daily_processing.py uses Airflow's TaskFlow API (@dag / @task decorators). It reads all configured sites from sites.yaml and creates one DAG per site, named canvod_{site_name}.
# Simplified — see dags/gnss_daily_processing.py for the full version
rinex_info = check_rinex(site, yyyydoy) # fail if RINEX missing
aux_info = fetch_aux_data(site, yyyydoy) # fail if SP3/CLK missing
proc_info = process_rinex(site, yyyydoy,
aux_info["aux_zarr_path"],
rinex_info["receivers"])
calculate_vod(site, yyyydoy)
| DAG parameter | Value | Why |
|---|---|---|
| `schedule` | `0 */6 * * *` (every 6 hours) | Retries for missing RINEX or SP3/CLK |
| `catchup` | `False` | Process only the current date |
| `max_active_runs` | `1` | Prevent parallel writes to the same Icechunk store |
| `retries` | `3` | Covers multi-day product publication delays |
| `retry_delay` | `6 hours` | Aligned with schedule interval |
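These parameters map onto a DAG definition roughly as follows. This is a sketch of the keyword arguments only (`canvod_rosalia` is an illustrative DAG id), not a copy of `dags/gnss_daily_processing.py`:

```python
from datetime import timedelta

# Illustrative mapping of the tabled parameters onto @dag keyword arguments.
DAG_KWARGS = {
    "dag_id": "canvod_rosalia",
    "schedule": "0 */6 * * *",  # every 6 hours
    "catchup": False,           # process only the current date
    "max_active_runs": 1,       # serialize writes to the Icechunk store
    "default_args": {
        "retries": 3,                        # covers multi-day product lag
        "retry_delay": timedelta(hours=6),   # aligned with the schedule
    },
}
```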
Deployment¶
1. Install canvodpy in the Airflow environment¶
uv pip install -e ./canvodpy
2. Link the DAGs directory¶
Copy or symlink dags/ into Airflow's dags_folder:
ln -s /path/to/canvodpy/dags /path/to/airflow/dags/canvod
3. Verify configuration¶
The DAGs read site definitions from canvodpy's YAML config files.
Ensure config/sites.yaml and config/processing.yaml are accessible
from the Airflow worker. The gnss_site_data_root for each site must
point to the directory containing receiver subdirectories.
4. Set the start date¶
Override start_date in the DAG definition or set it via an Airflow
Variable.
Design Decisions¶
- Primitive-only parameters — All task arguments are `str`, `dict`, `list`, or `None`. No xarray objects or `Path` instances cross task boundaries. XCom payloads stay small and JSON-serializable.
- Fail-and-retry for availability — Missing RINEX files or unavailable SP3/CLK products raise `RuntimeError`. Airflow retries automatically — no polling loops or sensor operators needed.
- Idempotent via File Hash — `process_rinex` checks `"File Hash"` before writing, so re-running the same date is a no-op. `fetch_aux_data` rewrites the Zarr cache. Safe to retry any task.
- Reuses existing code — Tasks call into `AuxDataPipeline`, `preprocess_with_hermite_aux`, and `GnssResearchSite`. The workflow layer is a thin scheduling wrapper — no pipeline rewrite.
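The primitive-only contract can be verified with a JSON round-trip: any value a task returns must survive `json.dumps`/`json.loads` unchanged. A minimal checker (hypothetical helper, not part of canVODpy) might look like:

```python
import json


def assert_xcom_safe(payload) -> None:
    """Raise if `payload` is not safely XCom/JSON-serializable.

    json.dumps raises TypeError for unsupported types (e.g. Path, xarray
    objects); the equality check catches lossy round-trips.
    """
    if json.loads(json.dumps(payload)) != payload:
        raise ValueError("payload changed across JSON round-trip")
```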
Running Multiple Configurations¶
Two processing scenarios are common:
| Scenario | Reader | Ephemeris | DAG |
|---|---|---|---|
| a — agency quality, delayed | RINEX | SP3/CLK (final) | canvod_{site}_rinex |
| b — same-day, broadcast geometry | SBF | SBF SatVisibility | canvod_{site}_sbf |
Both DAGs are generated automatically per site. To run them with different config files (e.g. different store paths, observables, or ephemeris product types), use one of the three patterns below.
Option 1 — dag_run.conf at trigger time¶
Pass a config_path override when triggering manually. The task functions
read it from context["dag_run"].conf and fall back to the default config
if absent.
# Scenario a — RINEX + final SP3/CLK
airflow dags trigger canvod_rosalia_rinex \
--conf '{"config_path": "/etc/canvod/sites_rinex_agency.yaml"}'
# Scenario b — SBF + broadcast geometry
airflow dags trigger canvod_rosalia_sbf \
--conf '{"config_path": "/etc/canvod/sites_sbf_broadcast.yaml"}'
Or via the af CLI:
af runs trigger canvod_rosalia_rinex \
-F config_path=/etc/canvod/sites_rinex_agency.yaml
This is the simplest approach for one-off or backfill runs.
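Inside a task, resolving the override might look like the sketch below. The helper name and default path are illustrative; only the `dag_run.conf` lookup with a fallback is described by the documentation:

```python
# Sketch: read config_path from the Airflow context's dag_run.conf,
# falling back to the default when the key or the dag_run is absent.
DEFAULT_CONFIG = "config/sites.yaml"  # illustrative default


def resolve_config_path(context: dict, default: str = DEFAULT_CONFIG) -> str:
    """Return dag_run.conf['config_path'] if present, else the default."""
    dag_run = context.get("dag_run")
    conf = (getattr(dag_run, "conf", None) or {}) if dag_run else {}
    return conf.get("config_path") or default
```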
Backfill DAG already supports conf
canvod_backfill accepts site, branch, start_date, and end_date
as params — add config_path to the same --conf dict to backfill with
a specific config.
Option 2 — Airflow Variables (environment-level config)¶
Store the config path as an Airflow Variable. This is appropriate when an entire Airflow environment (prod / dev / staging) should always use a specific config — no per-run override needed.
# Set once per environment
airflow variables set canvod_config_path /etc/canvod/sites_production.yaml
Tasks retrieve it with:
from airflow.models import Variable
config_path = Variable.get("canvod_config_path", default_var=None)
Two Airflow deployments can point at entirely different Icechunk stores and processing parameters without any code changes.
Option 3 — DAG-level Param default¶
For a permanent two-scenario setup where each DAG always defaults to a
specific config, define config_path as a Param in the DAG definition:
from airflow.models.param import Param
@dag(
dag_id=f"canvod_{site}_rinex",
params={
"config_path": Param(
default="/etc/canvod/sites_rinex_agency.yaml",
type="string",
description="Path to sites.yaml — override per run if needed",
)
},
...
)
def rinex_dag(): ...
The Airflow UI exposes config_path as an editable field when triggering
the DAG manually, so ad-hoc overrides remain possible without CLI flags.
Config differences between scenarios¶
The two scenarios typically differ in these processing.yaml fields:
# Scenario a — RINEX + agency ephemeris
processing:
ephemeris_source: final # SP3/CLK, ~12-18 day lag
keep_rnx_vars: [SNR, Pseudorange, Phase, Doppler]
store_radial_distance: true
# Scenario b — SBF + broadcast ephemeris
processing:
ephemeris_source: broadcast # Embedded in SBF binary, same-day
store_sbf_raw_observables: true # SNR_raw, Phase_raw, Pseudorange_unsmoothed
store_radial_distance: true
Separate sites_rinex.yaml and sites_sbf.yaml files can share the same
site geometry and receiver layout while differing only in the processing block.
Calling Tasks Without Airflow¶
The task functions are plain Python functions with no Airflow dependency. Use them directly for scripting or debugging:
from canvodpy.workflows.tasks import (
check_rinex, fetch_aux_data, process_rinex, calculate_vod,
)
rinex = check_rinex("Rosalia", "2025001")
aux = fetch_aux_data("Rosalia", "2025001")
proc = process_rinex("Rosalia", "2025001", aux["aux_zarr_path"],
rinex["receivers"])
vod = calculate_vod("Rosalia", "2025001")
File Layout¶
dags/
gnss_daily_processing.py DAG template (one DAG per site)
README.md Deployment notes
canvodpy/src/canvodpy/workflows/
__init__.py Re-exports task functions
tasks.py check_rinex, fetch_aux_data,
process_rinex, calculate_vod