Skip to content

canvod.auxiliary API Reference

SP3 ephemeris and CLK clock correction processing, interpolation, coordinate transformations, and the GNSS product registry.

Package

canvod-aux: Auxiliary data augmentation for GNSS VOD analysis

Handles downloading, parsing, and interpolating SP3 ephemerides and clock corrections for GNSS satellite data processing.

Sp3File

Bases: AuxFile

Handler for SP3 orbit files with multi-product support.

Now supports all IGS analysis centers via product registry: COD, GFZ, ESA, JPL, IGS, WHU, GRG, SHA

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True.

Attributes

date : str String in YYYYDOY format. agency : str Agency code (e.g., "COD", "GFZ", "ESA"). product_type : str Product type ("final", "rapid"). ftp_server : str Base URL for downloads. local_dir : Path Local storage directory. add_velocities : bool | None, default True Whether to compute velocities. dimensionless : bool | None, default True Whether to strip units (store magnitudes only). product_spec : ProductSpec | None, optional Product specification resolved from the registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class Sp3File(AuxFile):
    """Handler for SP3 orbit files with multi-product support.

    Now supports all IGS analysis centers via product registry:
    COD, GFZ, ESA, JPL, IGS, WHU, GRG, SHA

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True`.

    Attributes
    ----------
    date : str
        String in YYYYDOY format.
    agency : str
        Agency code (e.g., "COD", "GFZ", "ESA").
    product_type : str
        Product type ("final", "rapid").
    ftp_server : str
        Base URL for downloads.
    local_dir : Path
        Local storage directory.
    add_velocities : bool | None, default True
        Whether to compute velocities.
    dimensionless : bool | None, default True
        Whether to strip units (store magnitudes only).
    product_spec : ProductSpec | None, optional
        Product specification resolved from the registry.
    """

    date: str
    agency: str
    product_type: str
    ftp_server: str
    local_dir: Path
    add_velocities: bool | None = True
    dimensionless: bool | None = True
    product_spec: ProductSpec | None = None

    def __post_init__(self) -> None:
        """Initialize with product validation."""
        self.file_type = ["orbit"]
        self.local_dir = Path(self.local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)

        # Validate product exists in registry
        self.product_spec = get_product_spec(self.agency, self.product_type)

        super().__post_init__()

    def get_interpolation_strategy(self) -> Interpolator:
        """Get appropriate interpolation strategy for SP3 files."""
        config = Sp3Config(
            use_velocities=bool(self.add_velocities),
            fallback_method="linear",
        )
        return Sp3InterpolationStrategy(config=config)

    def generate_filename_based_on_type(self) -> Path:
        """Generate filename using product registry.

        Pattern: {PREFIX}_{YYYYDOY}0000_{DURATION}_{SAMPLING}_ORB.SP3

        Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3
        """
        assert self.product_spec is not None, (
            "product_spec must be set via \
            __post_init__"
        )
        prefix = self.product_spec.prefix
        duration = self.product_spec.duration
        sampling = self.product_spec.sampling_rate

        return Path(f"{prefix}_{self.date}0000_{duration}_{sampling}_ORB.SP3")

    def download_aux_file(self) -> None:
        """Download SP3 file, trying all servers from the product registry.

        Servers are tried in priority order from products.toml. Auth-required
        servers are skipped when no credentials are configured. Warnings are
        printed when falling back to alternate servers.

        Raises
        ------
        RuntimeError
            If download fails from all available servers.
        ValueError
            If GPS week calculation fails.
        """
        assert self.product_spec is not None, (
            "product_spec must be set via \
            __post_init__"
        )
        orbit_file = self.generate_filename_based_on_type()
        gps_week = get_gps_week_from_filename(orbit_file)

        ftp_path = self.product_spec.ftp_path_pattern.format(
            gps_week=gps_week,
            file=f"{orbit_file}.gz",
        )
        destination = self.local_dir / orbit_file
        file_info = {
            "gps_week": gps_week,
            "filename": orbit_file,
            "type": "orbit",
            "agency": self.agency,
        }

        self.download_with_fallback(ftp_path, destination, file_info, self.product_spec)
        print(f"Downloaded orbit file for {self.agency} on date {self.date}")

    def read_file(self) -> xr.Dataset:
        """Read and validate SP3 file.

        Returns
        -------
        xr.Dataset
            Dataset with satellite positions (X, Y, Z) in meters.

        Raises
        ------
        FileNotFoundError
            If file does not exist.
        ValueError
            If validation fails.
        """
        # Use dedicated parser
        assert self.fpath is not None, "fpath must be set before calling read_file"
        parser = Sp3Parser(self.fpath, dimensionless=bool(self.dimensionless))
        dataset = parser.parse()

        # Validate format
        validator = Sp3Validator(dataset, self.fpath)
        result = validator.validate()

        if not result.is_valid:
            raise ValueError(f"SP3 validation failed:\n{result.summary()}")

        # Add metadata
        dataset = self._add_metadata(dataset)

        # Compute velocities if requested
        if self.add_velocities:
            dataset = self.compute_velocity(dataset)

        return dataset

    def _add_metadata(self, ds: xr.Dataset) -> xr.Dataset:
        """Add file-level metadata to dataset."""
        assert self.fpath is not None, "fpath must be set before calling _add_metadata"
        assert self.product_spec is not None, (
            "product_spec must be set before \
            calling _add_metadata"
        )
        ds.attrs = {
            "file": str(self.fpath.name),
            "agency": self.agency,
            "agency_name": self.product_spec.agency_name,
            "product_type": self.product_type,
            "ftp_server": self.ftp_server,
            "date": self.date,
            "sampling_rate": self.product_spec.sampling_rate,
            "duration": self.product_spec.duration,
        }
        return ds

    def compute_velocity(self, ds: xr.Dataset) -> xr.Dataset:
        """Compute satellite velocities from position data.

        Uses central differences for interior points, forward/backward
        differences for endpoints.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset with X, Y, Z coordinates.

        Returns
        -------
        xr.Dataset
            Dataset augmented with Vx, Vy, Vz velocities.
        """
        # Calculate time step
        time_diffs = np.diff(ds["epoch"].values)
        dt = np.median(time_diffs).astype("timedelta64[s]").astype(float)

        # Initialize velocity arrays
        vx = np.zeros_like(ds["X"].values)
        vy = np.zeros_like(ds["Y"].values)
        vz = np.zeros_like(ds["Z"].values)

        # Central difference for interior points
        vx[1:-1] = (ds["X"].values[2:] - ds["X"].values[:-2]) / (2 * dt)
        vy[1:-1] = (ds["Y"].values[2:] - ds["Y"].values[:-2]) / (2 * dt)
        vz[1:-1] = (ds["Z"].values[2:] - ds["Z"].values[:-2]) / (2 * dt)

        # Forward difference for first point
        vx[0] = (ds["X"].values[1] - ds["X"].values[0]) / dt
        vy[0] = (ds["Y"].values[1] - ds["Y"].values[0]) / dt
        vz[0] = (ds["Z"].values[1] - ds["Z"].values[0]) / dt

        # Backward difference for last point
        vx[-1] = (ds["X"].values[-1] - ds["X"].values[-2]) / dt
        vy[-1] = (ds["Y"].values[-1] - ds["Y"].values[-2]) / dt
        vz[-1] = (ds["Z"].values[-1] - ds["Z"].values[-2]) / dt

        # Add units if needed
        if not self.dimensionless:
            vx = vx * (UREG.meter / UREG.second)
            vy = vy * (UREG.meter / UREG.second)
            vz = vz * (UREG.meter / UREG.second)

        # Add to dataset
        ds = ds.assign(
            Vx=(("epoch", "sv"), vx),
            Vy=(("epoch", "sv"), vy),
            Vz=(("epoch", "sv"), vz),
        )

        # Add velocity attributes
        for var, attrs in self._get_velocity_attributes(dt).items():
            if var in ds:
                ds[var].attrs = attrs

        return ds

    def _get_velocity_attributes(
        self,
        dt: float,
    ) -> dict[str, dict[str, str | float]]:
        """Get standardized attributes for velocity variables."""
        base_attrs = {
            "units": "m/s",
            "computation_method": "central_difference",
            "time_step": float(dt),
            "reference_frame": "ECEF",
        }

        return {
            "Vx": {
                "long_name": "x-component of velocity",
                "standard_name": "v_x",
                "short_name": "v_x",
                "axis": "v_x",
                **base_attrs,
            },
            "Vy": {
                "long_name": "y-component of velocity",
                "standard_name": "v_y",
                "short_name": "v_y",
                "axis": "v_y",
                **base_attrs,
            },
            "Vz": {
                "long_name": "z-component of velocity",
                "standard_name": "v_z",
                "short_name": "v_z",
                "axis": "v_z",
                **base_attrs,
            },
        }

__post_init__()

Initialize with product validation.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
62
63
64
65
66
67
68
69
70
71
def __post_init__(self) -> None:
    """Initialize with product validation."""
    self.file_type = ["orbit"]
    self.local_dir = Path(self.local_dir)
    self.local_dir.mkdir(parents=True, exist_ok=True)

    # Validate product exists in registry
    self.product_spec = get_product_spec(self.agency, self.product_type)

    super().__post_init__()

get_interpolation_strategy()

Get appropriate interpolation strategy for SP3 files.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
73
74
75
76
77
78
79
def get_interpolation_strategy(self) -> Interpolator:
    """Get appropriate interpolation strategy for SP3 files."""
    config = Sp3Config(
        use_velocities=bool(self.add_velocities),
        fallback_method="linear",
    )
    return Sp3InterpolationStrategy(config=config)

generate_filename_based_on_type()

Generate filename using product registry.

Pattern: {PREFIX}{YYYYDOY}0000_ORB.SP3}_{SAMPLING

Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def generate_filename_based_on_type(self) -> Path:
    """Generate filename using product registry.

    Pattern: {PREFIX}_{YYYYDOY}0000_{DURATION}_{SAMPLING}_ORB.SP3

    Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3
    """
    assert self.product_spec is not None, (
        "product_spec must be set via \
        __post_init__"
    )
    prefix = self.product_spec.prefix
    duration = self.product_spec.duration
    sampling = self.product_spec.sampling_rate

    return Path(f"{prefix}_{self.date}0000_{duration}_{sampling}_ORB.SP3")

download_aux_file()

Download SP3 file, trying all servers from the product registry.

Servers are tried in priority order from products.toml. Auth-required servers are skipped when no credentials are configured. Warnings are printed when falling back to alternate servers.

Raises

RuntimeError If download fails from all available servers. ValueError If GPS week calculation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def download_aux_file(self) -> None:
    """Download SP3 file, trying all servers from the product registry.

    Servers are tried in priority order from products.toml. Auth-required
    servers are skipped when no credentials are configured. Warnings are
    printed when falling back to alternate servers.

    Raises
    ------
    RuntimeError
        If download fails from all available servers.
    ValueError
        If GPS week calculation fails.
    """
    assert self.product_spec is not None, (
        "product_spec must be set via \
        __post_init__"
    )
    orbit_file = self.generate_filename_based_on_type()
    gps_week = get_gps_week_from_filename(orbit_file)

    ftp_path = self.product_spec.ftp_path_pattern.format(
        gps_week=gps_week,
        file=f"{orbit_file}.gz",
    )
    destination = self.local_dir / orbit_file
    file_info = {
        "gps_week": gps_week,
        "filename": orbit_file,
        "type": "orbit",
        "agency": self.agency,
    }

    self.download_with_fallback(ftp_path, destination, file_info, self.product_spec)
    print(f"Downloaded orbit file for {self.agency} on date {self.date}")

read_file()

Read and validate SP3 file.

Returns

xr.Dataset Dataset with satellite positions (X, Y, Z) in meters.

Raises

FileNotFoundError If file does not exist. ValueError If validation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def read_file(self) -> xr.Dataset:
    """Read and validate SP3 file.

    Returns
    -------
    xr.Dataset
        Dataset with satellite positions (X, Y, Z) in meters.

    Raises
    ------
    FileNotFoundError
        If file does not exist.
    ValueError
        If validation fails.
    """
    # Use dedicated parser
    assert self.fpath is not None, "fpath must be set before calling read_file"
    parser = Sp3Parser(self.fpath, dimensionless=bool(self.dimensionless))
    dataset = parser.parse()

    # Validate format
    validator = Sp3Validator(dataset, self.fpath)
    result = validator.validate()

    if not result.is_valid:
        raise ValueError(f"SP3 validation failed:\n{result.summary()}")

    # Add metadata
    dataset = self._add_metadata(dataset)

    # Compute velocities if requested
    if self.add_velocities:
        dataset = self.compute_velocity(dataset)

    return dataset

compute_velocity(ds)

Compute satellite velocities from position data.

Uses central differences for interior points, forward/backward differences for endpoints.

Parameters

ds : xr.Dataset Dataset with X, Y, Z coordinates.

Returns

xr.Dataset Dataset augmented with Vx, Vy, Vz velocities.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def compute_velocity(self, ds: xr.Dataset) -> xr.Dataset:
    """Compute satellite velocities from position data.

    Uses central differences for interior points, forward/backward
    differences for endpoints.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with X, Y, Z coordinates.

    Returns
    -------
    xr.Dataset
        Dataset augmented with Vx, Vy, Vz velocities.
    """
    # Calculate time step
    time_diffs = np.diff(ds["epoch"].values)
    dt = np.median(time_diffs).astype("timedelta64[s]").astype(float)

    # Initialize velocity arrays
    vx = np.zeros_like(ds["X"].values)
    vy = np.zeros_like(ds["Y"].values)
    vz = np.zeros_like(ds["Z"].values)

    # Central difference for interior points
    vx[1:-1] = (ds["X"].values[2:] - ds["X"].values[:-2]) / (2 * dt)
    vy[1:-1] = (ds["Y"].values[2:] - ds["Y"].values[:-2]) / (2 * dt)
    vz[1:-1] = (ds["Z"].values[2:] - ds["Z"].values[:-2]) / (2 * dt)

    # Forward difference for first point
    vx[0] = (ds["X"].values[1] - ds["X"].values[0]) / dt
    vy[0] = (ds["Y"].values[1] - ds["Y"].values[0]) / dt
    vz[0] = (ds["Z"].values[1] - ds["Z"].values[0]) / dt

    # Backward difference for last point
    vx[-1] = (ds["X"].values[-1] - ds["X"].values[-2]) / dt
    vy[-1] = (ds["Y"].values[-1] - ds["Y"].values[-2]) / dt
    vz[-1] = (ds["Z"].values[-1] - ds["Z"].values[-2]) / dt

    # Add units if needed
    if not self.dimensionless:
        vx = vx * (UREG.meter / UREG.second)
        vy = vy * (UREG.meter / UREG.second)
        vz = vz * (UREG.meter / UREG.second)

    # Add to dataset
    ds = ds.assign(
        Vx=(("epoch", "sv"), vx),
        Vy=(("epoch", "sv"), vy),
        Vz=(("epoch", "sv"), vz),
    )

    # Add velocity attributes
    for var, attrs in self._get_velocity_attributes(dt).items():
        if var in ds:
            ds[var].attrs = attrs

    return ds

ClkFile

Bases: AuxFile

Handler for GNSS clock files in CLK format.

This class reads and processes clock offset files containing satellite clock corrections. It handles the parsing of CLK format files and provides the data in xarray Dataset format.

Supports multiple analysis centers via product registry with proper FTP paths and filename conventions.

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True.

Attributes

date : str String in YYYYDOY format representing the start date. agency : str Analysis center identifier (e.g., "COD", "GFZ"). product_type : str Product type ("final", "rapid", "ultrarapid"). ftp_server : str Base URL for file downloads. local_dir : Path Local storage directory. dimensionless : bool | None, default True If True, outputs magnitude-only values (no units attached).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ClkFile(AuxFile):
    """Handler for GNSS clock files in CLK format.

    This class reads and processes clock offset files containing satellite clock
    corrections. It handles the parsing of CLK format files and provides the data
    in xarray Dataset format.

    Supports multiple analysis centers via product registry with proper FTP paths
    and filename conventions.

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True`.

    Attributes
    ----------
    date : str
        String in YYYYDOY format representing the start date.
    agency : str
        Analysis center identifier (e.g., "COD", "GFZ").
    product_type : str
        Product type ("final", "rapid", "ultrarapid").
    ftp_server : str
        Base URL for file downloads.
    local_dir : Path
        Local storage directory.
    dimensionless : bool | None, default True
        If True, outputs magnitude-only values (no units attached).
    """

    date: str
    agency: str
    product_type: str
    ftp_server: str
    local_dir: Path
    dimensionless: bool | None = True

    def __post_init__(self) -> None:
        """Initialize CLK file handler."""
        self.file_type = ["clock"]
        self.local_dir = Path(self.local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)
        super().__post_init__()

    def get_interpolation_strategy(self) -> Interpolator:
        """Get appropriate interpolation strategy for CLK files."""
        config = ClockConfig(
            window_size=9,
            jump_threshold=1e-6,
        )
        return ClockInterpolationStrategy(config=config)

    def generate_filename_based_on_type(self) -> Path:
        """Generate standard CLK filename using product registry.

        Uses product registry to get correct prefix for the agency/product combination.
        Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

        Returns
        -------
        Path
            Filename according to CLK conventions.

        Raises
        ------
        ValueError
            If agency/product combination not in registry.
        """
        # Get product spec from registry
        spec = get_product_spec(self.agency, self.product_type)

        # CLK files use 30S sampling for most products
        sampling = "30S"  # Could be made configurable via registry

        return Path(f"{spec.prefix}_{self.date}0000_01D_{sampling}_CLK.CLK")

    def download_aux_file(self) -> None:
        """Download CLK file, trying all servers from the product registry.

        Servers are tried in priority order from products.toml. Auth-required
        servers are skipped when no credentials are configured. Warnings are
        printed when falling back to alternate servers.

        Raises
        ------
        RuntimeError
            If file cannot be downloaded from any available server.
        ValueError
            If GPS week calculation fails.
        """
        clock_file = self.generate_filename_based_on_type()
        gps_week = get_gps_week_from_filename(clock_file)

        spec = get_product_spec(self.agency, self.product_type)
        ftp_path = spec.ftp_path_pattern.format(
            gps_week=gps_week,
            file=f"{clock_file}.gz",
        )
        destination = self.local_dir / clock_file
        file_info = {
            "gps_week": gps_week,
            "filename": clock_file,
            "type": "clock",
            "agency": self.agency,
        }

        self.download_with_fallback(ftp_path, destination, file_info, spec)
        print(f"Downloaded clock file for {self.agency} on date {self.date}")

    def read_file(self) -> xr.Dataset:
        """Read and parse CLK file into xarray Dataset.

        Uses modular parser for data extraction and validator for quality checks.
        Applies unit conversion from microseconds to seconds.

        Returns
        -------
        xr.Dataset
            Clock offsets with dimensions (epoch, sv). Values are in seconds
            (or dimensionless if specified).
        """
        # Parse file using modular parser
        assert self.fpath is not None, "fpath must be set before calling read_file"
        epochs, satellites, clock_offsets = parse_clk_file(self.fpath)

        # Convert units (microseconds → seconds)
        clock_offsets = (UREG.microsecond * clock_offsets).to("s")

        if self.dimensionless:
            clock_offsets = clock_offsets.magnitude

        # Create dataset
        ds = xr.Dataset(
            data_vars={"clock_offset": (("epoch", "sv"), clock_offsets)},
            coords={
                "epoch": np.array(epochs, dtype="datetime64[ns]"),
                "sv": np.array(satellites),
            },
        )

        # Validate and add metadata
        validation = validate_clk_dataset(ds)
        ds = self._prepare_dataset(ds, validation)

        return ds

    def _prepare_dataset(self, ds: xr.Dataset, validation: dict) -> xr.Dataset:
        """Add metadata and attributes to dataset.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset from parsing.
        validation : dict
            Validation results dictionary.

        Returns
        -------
        xr.Dataset
            Dataset with complete metadata.
        """
        assert self.fpath is not None, (
            "fpath must be set \
            before calling _prepare_dataset"
        )
        ds.attrs = {
            "file": str(self.fpath.name),
            "agency": self.agency,
            "product_type": self.product_type,
            "ftp_server": self.ftp_server,
            "date": self.date,
            "valid_data_percent": validation["valid_data_percent"],
            "num_epochs": validation["num_epochs"],
            "num_satellites": validation["num_satellites"],
        }

        # Add variable attributes
        ds.clock_offset.attrs = {
            "long_name": "Satellite clock offset",
            "standard_name": "clock_offset",
            "units": "seconds",
            "description": "Clock correction for each satellite",
        }

        return ds

__post_init__()

Initialize CLK file handler.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
65
66
67
68
69
70
def __post_init__(self) -> None:
    """Initialize CLK file handler."""
    self.file_type = ["clock"]
    self.local_dir = Path(self.local_dir)
    self.local_dir.mkdir(parents=True, exist_ok=True)
    super().__post_init__()

get_interpolation_strategy()

Get appropriate interpolation strategy for CLK files.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
72
73
74
75
76
77
78
def get_interpolation_strategy(self) -> Interpolator:
    """Get appropriate interpolation strategy for CLK files."""
    config = ClockConfig(
        window_size=9,
        jump_threshold=1e-6,
    )
    return ClockInterpolationStrategy(config=config)

generate_filename_based_on_type()

Generate standard CLK filename using product registry.

Uses product registry to get correct prefix for the agency/product combination. Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

Returns

Path Filename according to CLK conventions.

Raises

ValueError If agency/product combination not in registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def generate_filename_based_on_type(self) -> Path:
    """Generate standard CLK filename using product registry.

    Uses product registry to get correct prefix for the agency/product combination.
    Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

    Returns
    -------
    Path
        Filename according to CLK conventions.

    Raises
    ------
    ValueError
        If agency/product combination not in registry.
    """
    # Get product spec from registry
    spec = get_product_spec(self.agency, self.product_type)

    # CLK files use 30S sampling for most products
    sampling = "30S"  # Could be made configurable via registry

    return Path(f"{spec.prefix}_{self.date}0000_01D_{sampling}_CLK.CLK")

download_aux_file()

Download CLK file, trying all servers from the product registry.

Servers are tried in priority order from products.toml. Auth-required servers are skipped when no credentials are configured. Warnings are printed when falling back to alternate servers.

Raises

RuntimeError If file cannot be downloaded from any available server. ValueError If GPS week calculation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def download_aux_file(self) -> None:
    """Download CLK file, trying all servers from the product registry.

    Servers are tried in priority order from products.toml. Auth-required
    servers are skipped when no credentials are configured. Warnings are
    printed when falling back to alternate servers.

    Raises
    ------
    RuntimeError
        If file cannot be downloaded from any available server.
    ValueError
        If GPS week calculation fails.
    """
    clock_file = self.generate_filename_based_on_type()
    gps_week = get_gps_week_from_filename(clock_file)

    spec = get_product_spec(self.agency, self.product_type)
    ftp_path = spec.ftp_path_pattern.format(
        gps_week=gps_week,
        file=f"{clock_file}.gz",
    )
    destination = self.local_dir / clock_file
    file_info = {
        "gps_week": gps_week,
        "filename": clock_file,
        "type": "clock",
        "agency": self.agency,
    }

    self.download_with_fallback(ftp_path, destination, file_info, spec)
    print(f"Downloaded clock file for {self.agency} on date {self.date}")

read_file()

Read and parse CLK file into xarray Dataset.

Uses modular parser for data extraction and validator for quality checks. Applies unit conversion from microseconds to seconds.

Returns

xr.Dataset Clock offsets with dimensions (epoch, sv). Values are in seconds (or dimensionless if specified).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def read_file(self) -> xr.Dataset:
    """Read and parse CLK file into xarray Dataset.

    Uses modular parser for data extraction and validator for quality checks.
    Applies unit conversion from microseconds to seconds.

    Returns
    -------
    xr.Dataset
        Clock offsets with dimensions (epoch, sv). Values are in seconds
        (or dimensionless if specified).
    """
    # Parse file using modular parser
    assert self.fpath is not None, "fpath must be set before calling read_file"
    epochs, satellites, clock_offsets = parse_clk_file(self.fpath)

    # Convert units (microseconds → seconds)
    clock_offsets = (UREG.microsecond * clock_offsets).to("s")

    if self.dimensionless:
        clock_offsets = clock_offsets.magnitude

    # Create dataset
    ds = xr.Dataset(
        data_vars={"clock_offset": (("epoch", "sv"), clock_offsets)},
        coords={
            "epoch": np.array(epochs, dtype="datetime64[ns]"),
            "sv": np.array(satellites),
        },
    )

    # Validate and add metadata
    validation = validate_clk_dataset(ds)
    ds = self._prepare_dataset(ds, validation)

    return ds

AuxFile

Bases: ABC

Abstract base class for GNSS auxiliary files (SP3, CLK, IONEX, etc.).

This class provides two ways to create instances: 1. from_datetime_date(): Create from a datetime.date object and metadata 2. from_file(): Create directly from an existing file path

The class handles both newly downloaded files and existing local files, maintaining consistent behavior regardless of how the instance is created.

FTP Server Configuration:

  • user_email: Optional email for NASA CDDIS authentication
  • If None: Uses ESA FTP server exclusively (no authentication required)
  • If provided: Enables NASA CDDIS as fallback server (requires registration)
  • To enable CDDIS, set nasa_earthdata_acc_mail in config/processing.yaml

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True, and it uses ABC to define required subclass hooks.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class AuxFile(ABC):
    """Abstract base class for GNSS auxiliary files (SP3, CLK, IONEX, etc.).

    This class provides two ways to create instances:
    1. from_datetime_date(): Create from a datetime.date object and metadata
    2. from_file(): Create directly from an existing file path

    The class handles both newly downloaded files and existing local files,
    maintaining consistent behavior regardless of how the instance is created.

    FTP Server Configuration:
    ------------------------
    - user_email: Optional email for NASA CDDIS authentication
      - If None: Uses ESA FTP server exclusively (no authentication required)
      - If provided: Enables NASA CDDIS as fallback server (requires registration)
      - To enable CDDIS, set nasa_earthdata_acc_mail in config/processing.yaml

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True`, and
    it uses `ABC` to define required subclass hooks.
    """

    date: str
    agency: str
    product_type: str
    ftp_server: str
    local_dir: Path
    file_type: list[str] = Field(default_factory=list)
    fpath: Path | None = None
    user_email: str | None = None
    downloader: FileDownloader | None = None
    _data: xr.Dataset | None = Field(default=None, init=False)

    def __post_init__(self) -> None:
        """Initialize after dataclass creation.

        Sets up paths, downloader, and verifies local file existence.
        """
        if not self.file_type:
            self.file_type = ["unknown"]
        self.local_dir = Path(self.local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)

        if self.downloader is None:
            self.downloader = FtpDownloader(user_email=self.user_email)

        self.fpath = self.check_file_exists()

    @classmethod
    def from_datetime_date(
        cls,
        date: datetime.date,
        agency: str,
        product_type: str,
        ftp_server: str,
        local_dir: Path,
        **kwargs: Any,
    ) -> AuxFile:
        """Create an AuxFile instance from a datetime.date.

        Parameters
        ----------
        date : datetime.date
            Date for the desired auxiliary file.
        agency : str
            Agency providing the data (e.g., "COD", "IGS").
        product_type : str
            Product type ("final", "rapid", "ultrarapid").
        ftp_server : str
            Base URL for file downloads.
        local_dir : Path
            Directory for storing files locally.
        **kwargs : Any
            Extra keyword arguments for subclass construction.

        Returns
        -------
        AuxFile
            A new instance of the AuxFile subclass.
        """
        yyyydoy = YYYYDOY.from_date(date=date).to_str()
        return cls(
            date=yyyydoy,
            agency=agency,
            product_type=product_type,
            ftp_server=ftp_server,
            local_dir=local_dir,
            **kwargs,
        )

    @classmethod
    def from_file(cls, fpath: Path | str, **kwargs: Any) -> AuxFile:
        """Create an AuxFile instance from an existing file path.

        Parameters
        ----------
        fpath : Path
            Path to the existing GNSS file.
        **kwargs : Any
            Extra keyword arguments for subclass construction.

        Returns
        -------
        AuxFile
            A new instance of the AuxFile subclass.

        Raises
        ------
        FileNotFoundError
            If the specified file does not exist.
        """
        fpath = Path(fpath)
        if not fpath.exists():
            raise FileNotFoundError(f"File not found: {fpath}")

        fname = fpath.name
        agency = fname[0:3]
        yyyydoy = fname.split("_")[1][0:7]

        return cls(
            date=yyyydoy,
            agency=agency,
            product_type="final",
            ftp_server="N/A",
            local_dir=fpath.parent,
            fpath=fpath,
            **kwargs,
        )

    def download_file(
        self,
        url: str,
        destination: Path,
        file_info: dict | None = None,
    ) -> Path:
        """Download a file using the configured downloader.

        Parameters
        ----------
        url : str
            Download URL.
        destination : Path
            Local file destination.
        file_info : dict, optional
            Extra info passed to the downloader.

        Returns
        -------
        Path
            Path to the downloaded file.
        """
        if self.downloader is None:
            raise RuntimeError("No downloader is configured")
        return self.downloader.download(url, destination, file_info)

    def download_with_fallback(
        self,
        ftp_path: str,
        destination: Path,
        file_info: dict | None = None,
        product_spec: Any | None = None,
    ) -> Path:
        """Download a file, trying all servers from the product spec in priority order.

        Iterates through the product's FTP server list (from products.toml),
        skipping servers that require authentication when no credentials are
        configured. Warns when falling back to an alternate server. Raises a
        clear error when all servers fail.

        Parameters
        ----------
        ftp_path : str
            Server-relative path (e.g. ``/gnss/products/2345/FILE.gz``).
        destination : Path
            Local file destination.
        file_info : dict, optional
            Extra context passed to the downloader.
        product_spec : ProductSpec, optional
            Product specification with ``ftp_servers`` list. If None, falls
            back to single-server download using ``self.ftp_server``.

        Returns
        -------
        Path
            Path to the downloaded file.

        Raises
        ------
        RuntimeError
            If download fails from all available servers.
        """
        if self.downloader is None:
            raise RuntimeError("No downloader is configured")

        # Fall back to legacy single-server path when no product spec
        if product_spec is None or not product_spec.ftp_servers:
            url = f"{self.ftp_server}{ftp_path}"
            return self.downloader.download(url, destination, file_info)

        # Sort servers by priority (lowest number = highest priority)
        servers = sorted(product_spec.ftp_servers, key=lambda s: s.priority)

        # Filter out auth-required servers when no credentials
        available_servers = []
        skipped_servers = []
        for server in servers:
            if server.requires_auth and self.user_email is None:
                skipped_servers.append(server)
            else:
                available_servers.append(server)

        if not available_servers:
            server_names = ", ".join(s.url for s in skipped_servers)
            raise RuntimeError(
                f"No FTP servers available for {product_spec.prefix}.\n"
                f"All servers require authentication: {server_names}\n"
                f"Set nasa_earthdata_acc_mail in config/processing.yaml to "
                f"enable NASA CDDIS access.\n"
                f"Register at: https://urs.earthdata.nasa.gov/users/new"
            )

        errors: list[str] = []
        for i, server in enumerate(available_servers):
            url = f"{server.url.rstrip('/')}{ftp_path}"
            try:
                result = self.downloader.download(url, destination, file_info)
                if i > 0:
                    # We got here via fallback — log which server succeeded
                    print(
                        f"  ✓ Download succeeded from fallback server: "
                        f"{server.description or server.url}"
                    )
                return result
            except Exception as e:
                if self.downloader._is_network_error(e):
                    raise RuntimeError(
                        "No internet connection detected. "
                        "Please check your network and try again."
                    ) from e

                server_label = server.description or server.url
                error_msg = f"{server_label}: {e!s}"
                errors.append(error_msg)
                print(f"  ⚠ Download failed from {server_label}: {e!s}")

                # Warn about fallback if there are more servers to try
                if i < len(available_servers) - 1:
                    next_server = available_servers[i + 1]
                    next_label = next_server.description or next_server.url
                    print(f"  → Trying fallback server: {next_label}")

        # All servers exhausted
        product_label = f"{product_spec.agency_code}/{product_spec.product_type}"
        error_detail = "\n".join(f"  - {e}" for e in errors)
        if skipped_servers:
            skip_detail = "\n".join(
                f"  - {s.description or s.url} (requires auth)" for s in skipped_servers
            )
            auth_hint = (
                f"\n\nSkipped servers (no credentials):\n{skip_detail}\n"
                f"Set nasa_earthdata_acc_mail in config/processing.yaml "
                f"to enable these servers."
            )
        else:
            auth_hint = ""

        raise RuntimeError(
            f"Failed to download {product_label} from all available servers.\n"
            f"\nFile: {destination.name}\n"
            f"\nServer errors:\n{error_detail}"
            f"{auth_hint}"
        )

    @abstractmethod
    def read_file(self) -> xr.Dataset:
        """Read and parse the auxiliary file.

        Returns
        -------
        xr.Dataset
            Parsed dataset representation of the file.
        """
        pass

    @abstractmethod
    def get_interpolation_strategy(self) -> Interpolator:
        """Get the interpolation strategy for this file type."""
        pass

    @property
    def data(self) -> xr.Dataset:
        """Access the file's data, loading it if necessary."""
        if self._data is None:
            self._data = self.read_file()
            strategy = self.get_interpolation_strategy()
            self._data.attrs["interpolator_config"] = strategy.to_attrs()
        return self._data

    def check_file_exists(self) -> Path:
        """Verify file exists locally or download it if needed.

        Returns
        -------
        Path
            Local file path.
        """
        filename = self.generate_filename_based_on_type()
        file_path = self.local_dir / filename
        if not file_path.exists():
            print(f"File {file_path} does not exist. Downloading...")
            self.download_aux_file()
        else:
            print(f"File {file_path} exists.")
        return file_path

    @abstractmethod
    def generate_filename_based_on_type(self) -> Path:
        """Generate the appropriate filename for this type of auxiliary file."""
        pass

    @abstractmethod
    def download_aux_file(self) -> None:
        """Download the auxiliary file from the specified FTP server."""
        pass

data property

Access the file's data, loading it if necessary.

__post_init__()

Initialize after dataclass creation.

Sets up paths, downloader, and verifies local file existence.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __post_init__(self) -> None:
    """Initialize after dataclass creation.

    Sets up paths, downloader, and verifies local file existence.
    """
    if not self.file_type:
        self.file_type = ["unknown"]
    self.local_dir = Path(self.local_dir)
    self.local_dir.mkdir(parents=True, exist_ok=True)

    if self.downloader is None:
        self.downloader = FtpDownloader(user_email=self.user_email)

    self.fpath = self.check_file_exists()

from_datetime_date(date, agency, product_type, ftp_server, local_dir, **kwargs) classmethod

Create an AuxFile instance from a datetime.date.

Parameters

date : datetime.date Date for the desired auxiliary file. agency : str Agency providing the data (e.g., "COD", "IGS"). product_type : str Product type ("final", "rapid", "ultrarapid"). ftp_server : str Base URL for file downloads. local_dir : Path Directory for storing files locally. **kwargs : Any Extra keyword arguments for subclass construction.

Returns

AuxFile A new instance of the AuxFile subclass.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@classmethod
def from_datetime_date(
    cls,
    date: datetime.date,
    agency: str,
    product_type: str,
    ftp_server: str,
    local_dir: Path,
    **kwargs: Any,
) -> AuxFile:
    """Create an AuxFile instance from a datetime.date.

    Parameters
    ----------
    date : datetime.date
        Date for the desired auxiliary file.
    agency : str
        Agency providing the data (e.g., "COD", "IGS").
    product_type : str
        Product type ("final", "rapid", "ultrarapid").
    ftp_server : str
        Base URL for file downloads.
    local_dir : Path
        Directory for storing files locally.
    **kwargs : Any
        Extra keyword arguments for subclass construction.

    Returns
    -------
    AuxFile
        A new instance of the AuxFile subclass.
    """
    yyyydoy = YYYYDOY.from_date(date=date).to_str()
    return cls(
        date=yyyydoy,
        agency=agency,
        product_type=product_type,
        ftp_server=ftp_server,
        local_dir=local_dir,
        **kwargs,
    )

from_file(fpath, **kwargs) classmethod

Create an AuxFile instance from an existing file path.

Parameters

fpath : Path Path to the existing GNSS file. **kwargs : Any Extra keyword arguments for subclass construction.

Returns

AuxFile A new instance of the AuxFile subclass.

Raises

FileNotFoundError If the specified file does not exist.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@classmethod
def from_file(cls, fpath: Path | str, **kwargs: Any) -> AuxFile:
    """Create an AuxFile instance from an existing file path.

    Parameters
    ----------
    fpath : Path
        Path to the existing GNSS file.
    **kwargs : Any
        Extra keyword arguments for subclass construction.

    Returns
    -------
    AuxFile
        A new instance of the AuxFile subclass.

    Raises
    ------
    FileNotFoundError
        If the specified file does not exist.
    """
    fpath = Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"File not found: {fpath}")

    fname = fpath.name
    agency = fname[0:3]
    yyyydoy = fname.split("_")[1][0:7]

    return cls(
        date=yyyydoy,
        agency=agency,
        product_type="final",
        ftp_server="N/A",
        local_dir=fpath.parent,
        fpath=fpath,
        **kwargs,
    )

download_file(url, destination, file_info=None)

Download a file using the configured downloader.

Parameters

url : str Download URL. destination : Path Local file destination. file_info : dict, optional Extra info passed to the downloader.

Returns

Path Path to the downloaded file.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def download_file(
    self,
    url: str,
    destination: Path,
    file_info: dict | None = None,
) -> Path:
    """Download a file using the configured downloader.

    Parameters
    ----------
    url : str
        Download URL.
    destination : Path
        Local file destination.
    file_info : dict, optional
        Extra info passed to the downloader.

    Returns
    -------
    Path
        Path to the downloaded file.
    """
    if self.downloader is None:
        raise RuntimeError("No downloader is configured")
    return self.downloader.download(url, destination, file_info)

download_with_fallback(ftp_path, destination, file_info=None, product_spec=None)

Download a file, trying all servers from the product spec in priority order.

Iterates through the product's FTP server list (from products.toml), skipping servers that require authentication when no credentials are configured. Warns when falling back to an alternate server. Raises a clear error when all servers fail.

Parameters

ftp_path : str Server-relative path (e.g. /gnss/products/2345/FILE.gz). destination : Path Local file destination. file_info : dict, optional Extra context passed to the downloader. product_spec : ProductSpec, optional Product specification with ftp_servers list. If None, falls back to single-server download using self.ftp_server.

Returns

Path Path to the downloaded file.

Raises

RuntimeError If download fails from all available servers.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def download_with_fallback(
    self,
    ftp_path: str,
    destination: Path,
    file_info: dict | None = None,
    product_spec: Any | None = None,
) -> Path:
    """Download a file, trying all servers from the product spec in priority order.

    Iterates through the product's FTP server list (from products.toml),
    skipping servers that require authentication when no credentials are
    configured. Warns when falling back to an alternate server. Raises a
    clear error when all servers fail.

    Parameters
    ----------
    ftp_path : str
        Server-relative path (e.g. ``/gnss/products/2345/FILE.gz``).
    destination : Path
        Local file destination.
    file_info : dict, optional
        Extra context passed to the downloader.
    product_spec : ProductSpec, optional
        Product specification with ``ftp_servers`` list. If None, falls
        back to single-server download using ``self.ftp_server``.

    Returns
    -------
    Path
        Path to the downloaded file.

    Raises
    ------
    RuntimeError
        If download fails from all available servers.
    """
    if self.downloader is None:
        raise RuntimeError("No downloader is configured")

    # Fall back to legacy single-server path when no product spec
    if product_spec is None or not product_spec.ftp_servers:
        url = f"{self.ftp_server}{ftp_path}"
        return self.downloader.download(url, destination, file_info)

    # Sort servers by priority (lowest number = highest priority)
    servers = sorted(product_spec.ftp_servers, key=lambda s: s.priority)

    # Filter out auth-required servers when no credentials
    available_servers = []
    skipped_servers = []
    for server in servers:
        if server.requires_auth and self.user_email is None:
            skipped_servers.append(server)
        else:
            available_servers.append(server)

    if not available_servers:
        server_names = ", ".join(s.url for s in skipped_servers)
        raise RuntimeError(
            f"No FTP servers available for {product_spec.prefix}.\n"
            f"All servers require authentication: {server_names}\n"
            f"Set nasa_earthdata_acc_mail in config/processing.yaml to "
            f"enable NASA CDDIS access.\n"
            f"Register at: https://urs.earthdata.nasa.gov/users/new"
        )

    errors: list[str] = []
    for i, server in enumerate(available_servers):
        url = f"{server.url.rstrip('/')}{ftp_path}"
        try:
            result = self.downloader.download(url, destination, file_info)
            if i > 0:
                # We got here via fallback — log which server succeeded
                print(
                    f"  ✓ Download succeeded from fallback server: "
                    f"{server.description or server.url}"
                )
            return result
        except Exception as e:
            if self.downloader._is_network_error(e):
                raise RuntimeError(
                    "No internet connection detected. "
                    "Please check your network and try again."
                ) from e

            server_label = server.description or server.url
            error_msg = f"{server_label}: {e!s}"
            errors.append(error_msg)
            print(f"  ⚠ Download failed from {server_label}: {e!s}")

            # Warn about fallback if there are more servers to try
            if i < len(available_servers) - 1:
                next_server = available_servers[i + 1]
                next_label = next_server.description or next_server.url
                print(f"  → Trying fallback server: {next_label}")

    # All servers exhausted
    product_label = f"{product_spec.agency_code}/{product_spec.product_type}"
    error_detail = "\n".join(f"  - {e}" for e in errors)
    if skipped_servers:
        skip_detail = "\n".join(
            f"  - {s.description or s.url} (requires auth)" for s in skipped_servers
        )
        auth_hint = (
            f"\n\nSkipped servers (no credentials):\n{skip_detail}\n"
            f"Set nasa_earthdata_acc_mail in config/processing.yaml "
            f"to enable these servers."
        )
    else:
        auth_hint = ""

    raise RuntimeError(
        f"Failed to download {product_label} from all available servers.\n"
        f"\nFile: {destination.name}\n"
        f"\nServer errors:\n{error_detail}"
        f"{auth_hint}"
    )

read_file() abstractmethod

Read and parse the auxiliary file.

Returns

xr.Dataset Parsed dataset representation of the file.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
292
293
294
295
296
297
298
299
300
301
@abstractmethod
def read_file(self) -> xr.Dataset:
    """Read and parse the auxiliary file.

    Returns
    -------
    xr.Dataset
        Parsed dataset representation of the file.
    """
    pass

get_interpolation_strategy() abstractmethod

Get the interpolation strategy for this file type.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
303
304
305
306
@abstractmethod
def get_interpolation_strategy(self) -> Interpolator:
    """Get the interpolation strategy for this file type."""
    pass

check_file_exists()

Verify file exists locally or download it if needed.

Returns

Path Local file path.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def check_file_exists(self) -> Path:
    """Verify file exists locally or download it if needed.

    Returns
    -------
    Path
        Local file path.
    """
    filename = self.generate_filename_based_on_type()
    file_path = self.local_dir / filename
    if not file_path.exists():
        print(f"File {file_path} does not exist. Downloading...")
        self.download_aux_file()
    else:
        print(f"File {file_path} exists.")
    return file_path

generate_filename_based_on_type() abstractmethod

Generate the appropriate filename for this type of auxiliary file.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
334
335
336
337
@abstractmethod
def generate_filename_based_on_type(self) -> Path:
    """Generate the appropriate filename for this type of auxiliary file."""
    pass

download_aux_file() abstractmethod

Download the auxiliary file from the specified FTP server.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/core/base.py
339
340
341
342
@abstractmethod
def download_aux_file(self) -> None:
    """Download the auxiliary file from the specified FTP server."""
    pass

Interpolator

Bases: ABC

Abstract base class for interpolation strategies.

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True and uses ABC to define required interpolation hooks.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class Interpolator(ABC):
    """Abstract base class for interpolation strategies.

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True` and
    uses `ABC` to define required interpolation hooks.
    """

    config: InterpolatorConfig

    @abstractmethod
    def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
        """Interpolate dataset to match target epochs.

        Parameters
        ----------
        ds : xr.Dataset
            Source dataset with (epoch, sid) dimensions.
        target_epochs : np.ndarray
            Target epoch grid (datetime64).

        Returns
        -------
        xr.Dataset
            Interpolated dataset at target epochs.
        """
        pass

    def to_attrs(self) -> dict[str, Any]:
        """Convert interpolator to attrs-compatible dictionary."""
        return {
            "interpolator_type": self.__class__.__name__,
            "config": self.config.to_dict(),
        }

interpolate(ds, target_epochs) abstractmethod

Interpolate dataset to match target epochs.

Parameters

ds : xr.Dataset Source dataset with (epoch, sid) dimensions. target_epochs : np.ndarray Target epoch grid (datetime64).

Returns

xr.Dataset Interpolated dataset at target epochs.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@abstractmethod
def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
    """Interpolate dataset to match target epochs.

    Parameters
    ----------
    ds : xr.Dataset
        Source dataset with (epoch, sid) dimensions.
    target_epochs : np.ndarray
        Target epoch grid (datetime64).

    Returns
    -------
    xr.Dataset
        Interpolated dataset at target epochs.
    """
    pass

to_attrs()

Convert interpolator to attrs-compatible dictionary.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
91
92
93
94
95
96
def to_attrs(self) -> dict[str, Any]:
    """Convert interpolator to attrs-compatible dictionary."""
    return {
        "interpolator_type": self.__class__.__name__,
        "config": self.config.to_dict(),
    }

InterpolatorConfig

Base class for interpolator configuration.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
21
22
23
24
25
26
class InterpolatorConfig:
    """Base class for interpolator configuration."""

    def to_dict(self) -> dict[str, Any]:
        """Convert config to dictionary for attrs storage."""
        return dict(vars(self))

to_dict()

Convert config to dictionary for attrs storage.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
24
25
26
def to_dict(self) -> dict[str, Any]:
    """Convert config to dictionary for attrs storage."""
    return dict(vars(self))

DatasetMatcher

Match auxiliary datasets to a reference RINEX dataset temporally.

Handles temporal alignment of datasets with different sampling rates using appropriate interpolation strategies. The reference dataset (typically RINEX observations) remains unchanged while auxiliary datasets are interpolated to match its epochs.

The matcher: 1. Validates all datasets have required dimensions (epoch, sid) 2. Determines relative temporal resolutions 3. Applies appropriate interpolation: - Higher resolution aux → nearest neighbor - Lower resolution aux → specialized interpolator from metadata

Examples

from canvod.auxiliary.matching import DatasetMatcher

matcher = DatasetMatcher() matched = matcher.match_datasets( ... rinex_ds, ... ephemerides=sp3_data, ... clock=clk_data ... )

Auxiliary datasets now aligned to RINEX epochs

len(matched['ephemerides'].epoch) == len(rinex_ds.epoch) True len(matched['clock'].epoch) == len(rinex_ds.epoch) True

Notes

  • Reference dataset should be the RINEX observations
  • Auxiliary datasets should have 'interpolator_config' in attrs
  • If no interpolator config, falls back to nearest neighbor
  • Temporal distance is tracked for quality assessment
Source code in packages/canvod-auxiliary/src/canvod/auxiliary/matching/dataset_matcher.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class DatasetMatcher:
    """Match auxiliary datasets to a reference RINEX dataset temporally.

    Handles temporal alignment of datasets with different sampling rates
    using appropriate interpolation strategies. The reference dataset
    (typically RINEX observations) remains unchanged while auxiliary
    datasets are interpolated to match its epochs.

    The matcher:
    1. Validates all datasets have required dimensions (epoch, sid)
    2. Determines relative temporal resolutions
    3. Applies appropriate interpolation:
       - Higher resolution aux → nearest neighbor
       - Lower resolution aux → specialized interpolator from metadata

    Examples
    --------
    >>> from canvod.auxiliary.matching import DatasetMatcher
    >>>
    >>> matcher = DatasetMatcher()
    >>> matched = matcher.match_datasets(
    ...     rinex_ds,
    ...     ephemerides=sp3_data,
    ...     clock=clk_data
    ... )
    >>>
    >>> # Auxiliary datasets now aligned to RINEX epochs
    >>> len(matched['ephemerides'].epoch) == len(rinex_ds.epoch)
    True
    >>> len(matched['clock'].epoch) == len(rinex_ds.epoch)
    True

    Notes
    -----
    - Reference dataset should be the RINEX observations
    - Auxiliary datasets should have 'interpolator_config' in attrs
    - If no interpolator config, falls back to nearest neighbor
    - Temporal distance is tracked for quality assessment
    """

    def match_datasets(
        self, reference_ds: xr.Dataset, **aux_datasets: xr.Dataset
    ) -> dict[str, xr.Dataset]:
        """Match auxiliary datasets to reference dataset epochs.

        Parameters
        ----------
        reference_ds : xr.Dataset
            Primary dataset (usually RINEX observations) that defines
            the target epoch timeline. This dataset remains unchanged.
        **aux_datasets : dict[str, xr.Dataset]
            Named auxiliary datasets to align to reference epochs.
            Keys become the names in the returned dict.
            Example: ephemerides=sp3_data, clock=clk_data

        Returns
        -------
        dict[str, xr.Dataset]
            Dictionary of matched auxiliary datasets, all aligned to
            reference_ds.epoch. Keys match the input **aux_datasets keys.

        Raises
        ------
        ValueError
            - If no auxiliary datasets provided
            - If datasets missing required dimensions
            - If interpolation config missing (warns, doesn't raise)

        Examples
        --------
        >>> matcher = DatasetMatcher()
        >>> matched = matcher.match_datasets(
        ...     rinex_ds,
        ...     ephemerides=sp3_data,
        ...     clock=clk_data
        ... )
        >>> matched.keys()
        dict_keys(['ephemerides', 'clock'])
        """
        self._validate_inputs(reference_ds, aux_datasets)

        ref_interval = self._get_temporal_interval(reference_ds)
        matched = self._match_temporal_resolution(
            reference_ds, ref_interval, aux_datasets
        )

        return matched

    def _validate_inputs(
        self,
        reference_ds: xr.Dataset,
        aux_datasets: dict[str, xr.Dataset],
    ) -> None:
        """Validate input datasets.

        Checks:
        1. At least one auxiliary dataset provided
        2. All datasets have required dimensions (epoch, sid)
        3. Interpolation configuration available (warns if missing)

        Parameters
        ----------
        reference_ds : xr.Dataset
            Reference dataset.
        aux_datasets : dict[str, xr.Dataset]
            Auxiliary datasets.

        Raises
        ------
        ValueError
            If validation fails.
        """
        if not aux_datasets:
            raise ValueError("At least one auxiliary dataset required")

        self._validate_dimensions(reference_ds, "Reference dataset")

        for name, ds in aux_datasets.items():
            self._validate_dimensions(ds, f"Dataset '{name}'")
            self._validate_interpolation_config(ds, name)

    def _validate_dimensions(self, ds: xr.Dataset, name: str) -> None:
        """Check dataset has required dimensions.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset to validate.
        name : str
            Dataset name for error messages.

        Raises
        ------
        ValueError
            If missing 'epoch' or 'sid' dimension.
        """
        if "epoch" not in ds.dims or "sid" not in ds.dims:
            raise ValueError(f"{name} missing required dimension 'epoch' or 'sid'")

    def _validate_interpolation_config(
        self,
        ds: xr.Dataset,
        name: str,
    ) -> None:
        """Verify dataset has interpolation configuration.

        Issues warning if missing - interpolation will still work
        via fallback to nearest neighbor.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset to check.
        name : str
            Dataset name for warning message.
        """
        if "interpolator_config" not in ds.attrs:
            warnings.warn(
                f"Dataset '{name}' missing interpolation configuration. "
                "Will use nearest-neighbor interpolation.",
                UserWarning,
                stacklevel=2,
            )

    def _get_temporal_interval(self, ds: xr.Dataset) -> float:
        """Calculate temporal interval of dataset in seconds.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset with epoch dimension.

        Returns
        -------
        float
            Interval between first two epochs in seconds.
        """
        time_diff = ds["epoch"][1] - ds["epoch"][0]
        return float(time_diff.values)

    def _match_temporal_resolution(
        self,
        reference_ds: xr.Dataset,
        ref_interval: float,
        aux_datasets: dict[str, xr.Dataset],
    ) -> dict[str, xr.Dataset]:
        """Match temporal resolution of auxiliary datasets to reference.

        Strategy depends on relative sampling rates:
        - Higher resolution aux (finer sampling) → nearest neighbor
        - Lower resolution aux (coarser sampling) → specialized interpolator

        Parameters
        ----------
        reference_ds : xr.Dataset
            Reference dataset defining target epochs.
        ref_interval : float
            Reference dataset temporal interval in seconds.
        aux_datasets : dict[str, xr.Dataset]
            Auxiliary datasets to interpolate.

        Returns
        -------
        dict[str, xr.Dataset]
            Matched datasets aligned to reference epochs.
        """
        matched_datasets = {}

        for name, ds in aux_datasets.items():
            ds_interval = self._get_temporal_interval(ds)

            if ds_interval < ref_interval:
                # Higher resolution → simple nearest neighbor
                matched_datasets[name] = ds.interp(
                    epoch=reference_ds.epoch, method="nearest"
                )
            else:
                # Lower resolution → use specialized interpolator
                if "interpolator_config" in ds.attrs:
                    interpolator = create_interpolator_from_attrs(ds.attrs)
                    matched_datasets[name] = interpolator.interpolate(
                        ds, reference_ds.epoch
                    )
                else:
                    # Fallback to nearest neighbor
                    matched_datasets[name] = ds.interp(
                        epoch=reference_ds.epoch, method="nearest"
                    )

                # Track temporal distance for quality assessment
                temporal_distance = abs(matched_datasets[name]["epoch"] - ds["epoch"])
                matched_datasets[name][f"{name.lower()}_temporal_distance"] = (
                    temporal_distance
                )

        return matched_datasets

match_datasets(reference_ds, **aux_datasets)

Match auxiliary datasets to reference dataset epochs.

Parameters

reference_ds : xr.Dataset Primary dataset (usually RINEX observations) that defines the target epoch timeline. This dataset remains unchanged. **aux_datasets : dict[str, xr.Dataset] Named auxiliary datasets to align to reference epochs. Keys become the names in the returned dict. Example: ephemerides=sp3_data, clock=clk_data

Returns

dict[str, xr.Dataset] Dictionary of matched auxiliary datasets, all aligned to reference_ds.epoch. Keys match the input **aux_datasets keys.

Raises

ValueError - If no auxiliary datasets provided - If datasets missing required dimensions - If interpolation config missing (warns, doesn't raise)

Examples

matcher = DatasetMatcher() matched = matcher.match_datasets( ... rinex_ds, ... ephemerides=sp3_data, ... clock=clk_data ... ) matched.keys() dict_keys(['ephemerides', 'clock'])

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/matching/dataset_matcher.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def match_datasets(
    self, reference_ds: xr.Dataset, **aux_datasets: xr.Dataset
) -> dict[str, xr.Dataset]:
    """Match auxiliary datasets to reference dataset epochs.

    Parameters
    ----------
    reference_ds : xr.Dataset
        Primary dataset (usually RINEX observations) that defines
        the target epoch timeline. This dataset remains unchanged.
    **aux_datasets : dict[str, xr.Dataset]
        Named auxiliary datasets to align to reference epochs.
        Keys become the names in the returned dict.
        Example: ephemerides=sp3_data, clock=clk_data

    Returns
    -------
    dict[str, xr.Dataset]
        Dictionary of matched auxiliary datasets, all aligned to
        reference_ds.epoch. Keys match the input **aux_datasets keys.

    Raises
    ------
    ValueError
        - If no auxiliary datasets provided
        - If datasets missing required dimensions
        - If interpolation config missing (warns, doesn't raise)

    Examples
    --------
    >>> matcher = DatasetMatcher()
    >>> matched = matcher.match_datasets(
    ...     rinex_ds,
    ...     ephemerides=sp3_data,
    ...     clock=clk_data
    ... )
    >>> matched.keys()
    dict_keys(['ephemerides', 'clock'])
    """
    self._validate_inputs(reference_ds, aux_datasets)

    ref_interval = self._get_temporal_interval(reference_ds)
    matched = self._match_temporal_resolution(
        reference_ds, ref_interval, aux_datasets
    )

    return matched

ECEFPosition dataclass

Earth-Centered, Earth-Fixed (ECEF) position in meters.

ECEF is a Cartesian coordinate system with: - Origin at Earth's center of mass - X-axis pointing to 0° latitude, 0° longitude (Prime Meridian at Equator) - Y-axis pointing to 0° latitude, 90° East longitude - Z-axis pointing to North Pole

Parameters

x : float X coordinate in meters. y : float Y coordinate in meters. z : float Z coordinate in meters.

Examples

From RINEX dataset metadata

pos = ECEFPosition.from_ds_metadata(rinex_ds) print(f"X: {pos.x:.3f} m")

Manual creation

pos = ECEFPosition(x=4194304.123, y=176481.234, z=4780013.456) lat, lon, alt = pos.to_geodetic()

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@dataclass(frozen=True)
class ECEFPosition:
    """Earth-Centered, Earth-Fixed (ECEF) position in meters.

    ECEF is a Cartesian coordinate system with:
    - Origin at Earth's center of mass
    - X-axis pointing to 0° latitude, 0° longitude (Prime Meridian at Equator)
    - Y-axis pointing to 0° latitude, 90° East longitude
    - Z-axis pointing to North Pole

    Parameters
    ----------
    x : float
        X coordinate in meters.
    y : float
        Y coordinate in meters.
    z : float
        Z coordinate in meters.

    Examples
    --------
    >>> # From RINEX dataset metadata
    >>> pos = ECEFPosition.from_ds_metadata(rinex_ds)
    >>> print(f"X: {pos.x:.3f} m")

    >>> # Manual creation
    >>> pos = ECEFPosition(x=4194304.123, y=176481.234, z=4780013.456)
    >>> lat, lon, alt = pos.to_geodetic()
    """

    x: float  # meters
    y: float  # meters
    z: float  # meters

    def to_geodetic(self) -> tuple[float, float, float]:
        """Convert ECEF to geodetic coordinates.

        Returns
        -------
        tuple[float, float, float]
            (latitude, longitude, altitude) where:
            - latitude: degrees [-90, 90]
            - longitude: degrees [-180, 180]
            - altitude: meters above WGS84 ellipsoid
        """
        lat, lon, alt = pm.ecef2geodetic(self.x, self.y, self.z)
        return lat, lon, alt

    @classmethod
    def from_ds_metadata(cls, ds: xr.Dataset) -> Self:
        """Extract ECEF position from RINEX dataset metadata.

        Reads from standard RINEX header attributes.

        Parameters
        ----------
        ds : xr.Dataset
            RINEX dataset with position in attributes.

        Returns
        -------
        ECEFPosition
            Receiver position in ECEF.

        Raises
        ------
        KeyError
            If position attributes not found in dataset.

        Examples
        --------
        >>> from canvod.readers import Rnxv3Obs
        >>> rnx = Rnxv3Obs(fpath="station.24o")
        >>> ds = rnx.to_ds()
        >>> pos = ECEFPosition.from_ds_metadata(ds)
        """
        # Try different attribute names
        if "APPROX POSITION X" in ds.attrs:
            # Standard RINEX v3 format
            x = ds.attrs["APPROX POSITION X"]
            y = ds.attrs["APPROX POSITION Y"]
            z = ds.attrs["APPROX POSITION Z"]
        elif "Approximate Position" in ds.attrs:
            # Alternative format: "X=..., Y=..., Z=..."
            pos = ds.attrs["Approximate Position"]
            pos_parts = pos.split(",")

            def sanitize(s: str) -> float:
                return float(s.split("=")[1].strip().split()[0])

            x = sanitize(pos_parts[0])
            y = sanitize(pos_parts[1])
            z = sanitize(pos_parts[2])
        else:
            raise KeyError(
                "Position not found in dataset attributes. "
                "Expected 'APPROX POSITION X/Y/Z' or 'Approximate Position'"
            )

        return cls(x=x, y=y, z=z)

    def __repr__(self) -> str:
        return f"ECEFPosition(x={self.x:.3f}, y={self.y:.3f}, z={self.z:.3f})"

to_geodetic()

Convert ECEF to geodetic coordinates.

Returns

tuple[float, float, float] (latitude, longitude, altitude) where: - latitude: degrees [-90, 90] - longitude: degrees [-180, 180] - altitude: meters above WGS84 ellipsoid

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def to_geodetic(self) -> tuple[float, float, float]:
    """Convert ECEF to geodetic coordinates.

    Returns
    -------
    tuple[float, float, float]
        (latitude, longitude, altitude) where:
        - latitude: degrees [-90, 90]
        - longitude: degrees [-180, 180]
        - altitude: meters above WGS84 ellipsoid
    """
    lat, lon, alt = pm.ecef2geodetic(self.x, self.y, self.z)
    return lat, lon, alt

from_ds_metadata(ds) classmethod

Extract ECEF position from RINEX dataset metadata.

Reads from standard RINEX header attributes.

Parameters

ds : xr.Dataset RINEX dataset with position in attributes.

Returns

ECEFPosition Receiver position in ECEF.

Raises

KeyError If position attributes not found in dataset.

Examples

from canvod.readers import Rnxv3Obs rnx = Rnxv3Obs(fpath="station.24o") ds = rnx.to_ds() pos = ECEFPosition.from_ds_metadata(ds)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@classmethod
def from_ds_metadata(cls, ds: xr.Dataset) -> Self:
    """Extract ECEF position from RINEX dataset metadata.

    Reads from standard RINEX header attributes.

    Parameters
    ----------
    ds : xr.Dataset
        RINEX dataset with position in attributes.

    Returns
    -------
    ECEFPosition
        Receiver position in ECEF.

    Raises
    ------
    KeyError
        If position attributes not found in dataset.

    Examples
    --------
    >>> from canvod.readers import Rnxv3Obs
    >>> rnx = Rnxv3Obs(fpath="station.24o")
    >>> ds = rnx.to_ds()
    >>> pos = ECEFPosition.from_ds_metadata(ds)
    """
    # Try different attribute names
    if "APPROX POSITION X" in ds.attrs:
        # Standard RINEX v3 format
        x = ds.attrs["APPROX POSITION X"]
        y = ds.attrs["APPROX POSITION Y"]
        z = ds.attrs["APPROX POSITION Z"]
    elif "Approximate Position" in ds.attrs:
        # Alternative format: "X=..., Y=..., Z=..."
        pos = ds.attrs["Approximate Position"]
        pos_parts = pos.split(",")

        def sanitize(s: str) -> float:
            return float(s.split("=")[1].strip().split()[0])

        x = sanitize(pos_parts[0])
        y = sanitize(pos_parts[1])
        z = sanitize(pos_parts[2])
    else:
        raise KeyError(
            "Position not found in dataset attributes. "
            "Expected 'APPROX POSITION X/Y/Z' or 'Approximate Position'"
        )

    return cls(x=x, y=y, z=z)

GeodeticPosition dataclass

Geodetic (WGS84) position.

Parameters

lat : float Latitude in degrees [-90, 90]. lon : float Longitude in degrees [-180, 180]. alt : float Altitude in meters above WGS84 ellipsoid.

Examples

pos = GeodeticPosition(lat=48.208, lon=16.373, alt=200.0) print(f"Vienna: {pos.lat}°N, {pos.lon}°E, {pos.alt}m")

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@dataclass(frozen=True)
class GeodeticPosition:
    """Geodetic (WGS84) position.

    Parameters
    ----------
    lat : float
        Latitude in degrees [-90, 90].
    lon : float
        Longitude in degrees [-180, 180].
    alt : float
        Altitude in meters above WGS84 ellipsoid.

    Examples
    --------
    >>> pos = GeodeticPosition(lat=48.208, lon=16.373, alt=200.0)
    >>> print(f"Vienna: {pos.lat}°N, {pos.lon}°E, {pos.alt}m")
    """

    lat: float  # degrees
    lon: float  # degrees
    alt: float  # meters

    def to_ecef(self) -> ECEFPosition:
        """Convert geodetic to ECEF coordinates.

        Returns
        -------
        ECEFPosition
            Position in ECEF frame.
        """
        x, y, z = pm.geodetic2ecef(self.lat, self.lon, self.alt)
        return ECEFPosition(x=x, y=y, z=z)

    def __repr__(self) -> str:
        return (
            f"GeodeticPosition(lat={self.lat:.6f}°, "
            f"lon={self.lon:.6f}°, alt={self.alt:.1f}m)"
        )

to_ecef()

Convert geodetic to ECEF coordinates.

Returns

ECEFPosition Position in ECEF frame.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
143
144
145
146
147
148
149
150
151
152
def to_ecef(self) -> ECEFPosition:
    """Convert geodetic to ECEF coordinates.

    Returns
    -------
    ECEFPosition
        Position in ECEF frame.
    """
    x, y, z = pm.geodetic2ecef(self.lat, self.lon, self.alt)
    return ECEFPosition(x=x, y=y, z=z)

Preprocessing

Preprocessing utilities for auxiliary GNSS data.

Handles conversion of raw auxiliary data (SP3, CLK) from satellite vehicle (sv) dimension to signal ID (sid) dimension required for matching with RINEX data.

Matches gnssvodpy.icechunk_manager.preprocessing.IcechunkPreprocessor exactly.

flush_sid_accumulators()

Return accumulated SID issues and clear the module-level accumulators.

Called once per RINEX file at the end of preprocess_with_hermite_aux. The returned dict is aggregated by the main process across all files for a receiver and logged once per receiver run.

Returns

dict[str, list[str]] Keys: "not_in_global_space", "dropped_by_filter".

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def flush_sid_accumulators() -> dict[str, list[str]]:
    """Return accumulated SID issues and clear the module-level accumulators.

    Called once per RINEX file at the end of ``preprocess_with_hermite_aux``.
    The returned dict is aggregated by the main process across all files for
    a receiver and logged once per receiver run.

    Returns
    -------
    dict[str, list[str]]
        Keys: ``"not_in_global_space"``, ``"dropped_by_filter"``.
    """
    global _accumulated_not_in_global_space, _accumulated_dropped_by_filter
    result: dict[str, list[str]] = {
        "not_in_global_space": sorted(_accumulated_not_in_global_space),
        "dropped_by_filter": sorted(_accumulated_dropped_by_filter),
    }
    _accumulated_not_in_global_space = set()
    _accumulated_dropped_by_filter = set()
    return result

create_sv_to_sid_mapping(svs, aggregate_glonass_fdma=True)

Build mapping from each SV to its possible SIDs.

Builds all SIDs from known band/code combinations.

Parameters

svs : list[str] List of space vehicles (e.g., ["G01", "E02"]). aggregate_glonass_fdma : bool, default True Whether to aggregate GLONASS FDMA bands.

Returns

dict[str, list[str]] Mapping from sv → list of SIDs.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def create_sv_to_sid_mapping(
    svs: list[str], aggregate_glonass_fdma: bool = True
) -> dict[str, list[str]]:
    """Build mapping from each SV to its possible SIDs.

    Builds all SIDs from known band/code combinations.

    Parameters
    ----------
    svs : list[str]
        List of space vehicles (e.g., ["G01", "E02"]).
    aggregate_glonass_fdma : bool, default True
        Whether to aggregate GLONASS FDMA bands.

    Returns
    -------
    dict[str, list[str]]
        Mapping from sv → list of SIDs.
    """
    mapper = SignalIDMapper(aggregate_glonass_fdma=aggregate_glonass_fdma)
    systems = {
        "G": GPS(),
        "E": GALILEO(),
        "R": GLONASS(aggregate_fdma=aggregate_glonass_fdma),
        "C": BEIDOU(),
        "I": IRNSS(),
        "S": SBAS(),
        "J": QZSS(),
    }

    sv_to_sids: dict[str, list[str]] = {}
    for sv in svs:
        sys_letter = sv[0]
        if sys_letter not in systems:
            continue

        system = systems[sys_letter]
        sids = []
        if sys_letter in mapper.SYSTEM_BANDS:
            for _, band in mapper.SYSTEM_BANDS[sys_letter].items():
                codes = system.BAND_CODES.get(band, ["X"])
                sids.extend(f"{sv}|{band}|{code}" for code in codes)

        sv_to_sids[sv] = sorted(sids)

    return sv_to_sids

map_aux_sv_to_sid(aux_ds, fill_value=np.nan, aggregate_glonass_fdma=True)

Transform auxiliary dataset from sv → sid dimension.

Each sv in the dataset is expanded to all its possible SIDs. Values are replicated across SIDs for the same satellite.

Parameters

aux_ds : xr.Dataset Dataset with 'sv' dimension. fill_value : float, default np.nan Fill value for missing entries. aggregate_glonass_fdma : bool, default True Whether to aggregate GLONASS FDMA bands.

Returns

xr.Dataset Dataset with 'sid' dimension replacing 'sv'.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def map_aux_sv_to_sid(
    aux_ds: xr.Dataset,
    fill_value: float = np.nan,
    aggregate_glonass_fdma: bool = True,
) -> xr.Dataset:
    """Transform auxiliary dataset from sv → sid dimension.

    Each sv in the dataset is expanded to all its possible SIDs.
    Values are replicated across SIDs for the same satellite.

    Parameters
    ----------
    aux_ds : xr.Dataset
        Dataset with 'sv' dimension.
    fill_value : float, default np.nan
        Fill value for missing entries.
    aggregate_glonass_fdma : bool, default True
        Whether to aggregate GLONASS FDMA bands.

    Returns
    -------
    xr.Dataset
        Dataset with 'sid' dimension replacing 'sv'.
    """
    svs = aux_ds["sv"].values.tolist()
    sv_to_sids = create_sv_to_sid_mapping(svs, aggregate_glonass_fdma)
    all_sids = sorted({sid for sv in svs for sid in sv_to_sids.get(sv, [])})

    new_data_vars = {}
    for name, arr in aux_ds.data_vars.items():
        if "sv" in arr.dims:
            sv_dim = arr.dims.index("sv")
            new_shape = list(arr.shape)
            new_shape[sv_dim] = len(all_sids)
            expanded = np.full(new_shape, fill_value, dtype=arr.dtype)

            for sv_idx, sv in enumerate(svs):
                for sid in sv_to_sids.get(sv, []):
                    sid_idx = all_sids.index(sid)
                    if sv_dim == 0:
                        expanded[sid_idx, ...] = arr.values[sv_idx, ...]
                    elif sv_dim == 1:
                        expanded[..., sid_idx] = arr.values[..., sv_idx]
                    else:
                        idx_new = tuple(
                            sid_idx if i == sv_dim else slice(None)
                            for i in range(len(new_shape))
                        )
                        idx_old = tuple(
                            sv_idx if i == sv_dim else slice(None)
                            for i in range(len(arr.shape))
                        )
                        expanded[idx_new] = arr.values[idx_old]

            new_dims = list(arr.dims)
            new_dims[sv_dim] = "sid"
            new_data_vars[name] = (tuple(new_dims), expanded, arr.attrs)
        else:
            new_data_vars[name] = arr

    # Coordinates
    new_coords = {
        **{k: v for k, v in aux_ds.coords.items() if k != "sv"},
        "sid": ("sid", all_sids),
    }

    return xr.Dataset(new_data_vars, coords=new_coords, attrs=aux_ds.attrs.copy())

pad_to_global_sid(ds, keep_sids=None, aggregate_glonass_fdma=True)

Pad dataset so it has all possible SIDs across all constellations. Ensures consistent sid dimension for appending to Icechunk.

Parameters

ds : xr.Dataset Dataset with 'sid' dimension. keep_sids : list[str] | None Optional list of specific SIDs to keep. If None, keeps all. aggregate_glonass_fdma : bool, default True Whether to aggregate GLONASS FDMA bands.

Returns

xr.Dataset Dataset padded with NaN for missing SIDs.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def pad_to_global_sid(
    ds: xr.Dataset,
    keep_sids: list[str] | None = None,
    aggregate_glonass_fdma: bool = True,
) -> xr.Dataset:
    """Pad dataset so it has all possible SIDs across all constellations.
    Ensures consistent sid dimension for appending to Icechunk.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with 'sid' dimension.
    keep_sids : list[str] | None
        Optional list of specific SIDs to keep. If None, keeps all.
    aggregate_glonass_fdma : bool, default True
        Whether to aggregate GLONASS FDMA bands.

    Returns
    -------
    xr.Dataset
        Dataset padded with NaN for missing SIDs.
    """
    mapper = SignalIDMapper(aggregate_glonass_fdma=aggregate_glonass_fdma)
    systems = {
        "G": GPS(),
        "E": GALILEO(),
        "R": GLONASS(aggregate_fdma=aggregate_glonass_fdma),
        "C": BEIDOU(),
        "I": IRNSS(),
        "S": SBAS(),
        "J": QZSS(),
    }

    # Generate all possible SIDs
    sids = [
        f"{sv}|{band}|{code}"
        for sys_letter, bands in mapper.SYSTEM_BANDS.items()
        for _, band in bands.items()
        for sv in systems[sys_letter].svs
        for code in systems[sys_letter].BAND_CODES.get(band, ["X"])
    ]
    sids = sorted(sids)
    global_sid_set = set(sids)

    # Accumulate SIDs that fall outside the constellation model's universe.
    # These are logged once per receiver run by the pipeline (not per file).
    if "sid" in ds.coords:
        ds_sids = set(ds.sid.values)
        unknown_sids = ds_sids - global_sid_set
        if unknown_sids:
            _accumulated_not_in_global_space.update(unknown_sids)

    # Filter to keep_sids if provided
    if keep_sids is not None and len(keep_sids) > 0:
        keep_set = set(keep_sids)
        sids_before = set(sids)
        sids = sorted(sids_before.intersection(keep_set))

        # Accumulate observed SIDs dropped by keep_sids filter.
        # Logged once per receiver run by the pipeline (not per file).
        if "sid" in ds.coords:
            ds_sids = set(ds.sid.values)
            dropped_by_filter = (ds_sids & sids_before) - keep_set
            if dropped_by_filter:
                _accumulated_dropped_by_filter.update(dropped_by_filter)

    ds_padded = ds.reindex({"sid": np.array(sids, dtype=object)}, fill_value=np.nan)
    return _fill_sid_coords_from_sid_strings(ds_padded, mapper)

normalize_sid_dtype(ds)

normalize_sid_dtype(ds: xr.Dataset) -> xr.Dataset
normalize_sid_dtype(ds: None) -> None

Ensure sid coordinate uses object dtype.

Parameters

ds : xr.Dataset Dataset with 'sid' coordinate.

Returns

xr.Dataset Dataset with sid as object dtype.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def normalize_sid_dtype(ds: xr.Dataset | None) -> xr.Dataset | None:
    """Ensure sid coordinate uses object dtype.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with 'sid' coordinate.

    Returns
    -------
    xr.Dataset
        Dataset with sid as object dtype.
    """
    if ds is None:
        return ds
    if "sid" in ds.coords and ds.sid.dtype.kind == "U":
        ds = ds.assign_coords(
            sid=xr.Variable("sid", ds.sid.values.astype(object), ds.sid.attrs)
        )
    return ds

strip_fillvalue(ds)

strip_fillvalue(ds: xr.Dataset) -> xr.Dataset
strip_fillvalue(ds: None) -> None

Remove _FillValue attrs/encodings.

Parameters

ds : xr.Dataset Dataset to clean.

Returns

xr.Dataset Dataset with _FillValue attributes removed.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def strip_fillvalue(ds: xr.Dataset | None) -> xr.Dataset | None:
    """Remove _FillValue attrs/encodings.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset to clean.

    Returns
    -------
    xr.Dataset
        Dataset with _FillValue attributes removed.
    """
    if ds is None:
        return ds
    for v in ds.data_vars:
        ds[v].attrs.pop("_FillValue", None)
        ds[v].encoding.pop("_FillValue", None)
    return ds

add_future_datavars(ds, var_config)

Add placeholder data variables from a configuration dictionary.

Parameters

ds : xr.Dataset Dataset to add variables to. var_config : dict[str, dict[str, Any]] Configuration dict with structure: { "var_name": { "fill_value": value, "dtype": numpy dtype, "attrs": {attribute dict} } }

Returns

xr.Dataset Dataset with new variables added.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def add_future_datavars(
    ds: xr.Dataset, var_config: dict[str, dict[str, Any]]
) -> xr.Dataset:
    """Add placeholder data variables from a configuration dictionary.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset to add variables to.
    var_config : dict[str, dict[str, Any]]
        Configuration dict with structure:
        {
            "var_name": {
                "fill_value": value,
                "dtype": numpy dtype,
                "attrs": {attribute dict}
            }
        }

    Returns
    -------
    xr.Dataset
        Dataset with new variables added.
    """
    n_epochs, n_sids = ds.sizes["epoch"], ds.sizes["sid"]
    for name, cfg in var_config.items():
        if name not in ds:
            arr = np.full((n_epochs, n_sids), cfg["fill_value"], dtype=cfg["dtype"])
            ds[name] = (("epoch", "sid"), arr, cfg["attrs"])
    return ds

prep_aux_ds(aux_ds, fill_value=np.nan, aggregate_glonass_fdma=True, keep_sids=None)

Preprocess auxiliary dataset before writing to Icechunk.

Performs complete 4-step preprocessing: 1. Convert sv → sid dimension 2. Pad to global sid list (all constellations) or filter to keep_sids 3. Normalize sid dtype to object 4. Strip _FillValue attributes

This matches gnssvodpy.icechunk_manager.preprocessing.IcechunkPreprocessor.prep_aux_ds().

Parameters

aux_ds : xr.Dataset Dataset with 'sv' dimension. fill_value : float, default np.nan Fill value for missing entries. aggregate_glonass_fdma : bool, default True Whether to aggregate GLONASS FDMA bands. keep_sids : list[str] | None, default None List of specific SIDs to keep. If None, keeps all possible SIDs.

Returns

xr.Dataset Fully preprocessed dataset ready for Icechunk or interpolation.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
def prep_aux_ds(
    aux_ds: xr.Dataset,
    fill_value: float = np.nan,
    aggregate_glonass_fdma: bool = True,
    keep_sids: list[str] | None = None,
) -> xr.Dataset:
    """Preprocess auxiliary dataset before writing to Icechunk.

    Performs complete 4-step preprocessing:
    1. Convert sv → sid dimension
    2. Pad to global sid list (all constellations) or filter to keep_sids
    3. Normalize sid dtype to object
    4. Strip _FillValue attributes

    This matches
    gnssvodpy.icechunk_manager.preprocessing.IcechunkPreprocessor.prep_aux_ds().

    Parameters
    ----------
    aux_ds : xr.Dataset
        Dataset with 'sv' dimension.
    fill_value : float, default np.nan
        Fill value for missing entries.
    aggregate_glonass_fdma : bool, default True
        Whether to aggregate GLONASS FDMA bands.
    keep_sids : list[str] | None, default None
        List of specific SIDs to keep. If None, keeps all possible SIDs.

    Returns
    -------
    xr.Dataset
        Fully preprocessed dataset ready for Icechunk or interpolation.
    """
    ds = map_aux_sv_to_sid(aux_ds, fill_value, aggregate_glonass_fdma)
    ds = pad_to_global_sid(
        ds, keep_sids=keep_sids, aggregate_glonass_fdma=aggregate_glonass_fdma
    )
    ds = normalize_sid_dtype(ds)
    ds = strip_fillvalue(ds)
    return ds

preprocess_aux_for_interpolation(aux_ds, fill_value=np.nan, full_preprocessing=False, aggregate_glonass_fdma=True)

Preprocess auxiliary dataset before interpolation.

Converts satellite vehicle (sv) dimension to Signal ID (sid) dimension, which is required for matching with RINEX observations after interpolation.

Parameters

aux_ds : xr.Dataset Raw auxiliary dataset with 'sv' dimension. fill_value : float, default np.nan Fill value for missing entries. full_preprocessing : bool, default False If True, applies full 4-step preprocessing (pad_to_global_sid, normalize_sid_dtype, strip_fillvalue). If False, only converts sv → sid (sufficient for interpolation). aggregate_glonass_fdma : bool, default True Whether to aggregate GLONASS FDMA bands.

Returns

xr.Dataset Preprocessed dataset with 'sid' dimension.

Notes

This must be called BEFORE interpolation. The workflow is: 1. Load raw SP3/CLK data (sv dimension) 2. Convert sv → sid (this function) 3. Interpolate to target epochs 4. Match with RINEX data (sid dimension)

For most interpolation use cases, full_preprocessing=False is sufficient. Use full_preprocessing=True when preparing data for Icechunk storage.

Examples

Load raw SP3 data

sp3_data = Sp3File(...).to_dataset() sp3_data.dims

Preprocess before interpolation (minimal)

sp3_preprocessed = preprocess_aux_for_interpolation(sp3_data) sp3_preprocessed.dims

Preprocess before Icechunk (full)

sp3_preprocessed = preprocess_aux_for_interpolation( ... sp3_data, ... full_preprocessing=True, ... ) sp3_preprocessed.dims {'epoch': 96, 'sid': ~2000} # Padded to all possible sids

Now interpolate

sp3_interp = interpolator.interpolate(sp3_preprocessed, target_epochs)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/preprocessing.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def preprocess_aux_for_interpolation(
    aux_ds: xr.Dataset,
    fill_value: float = np.nan,
    full_preprocessing: bool = False,
    aggregate_glonass_fdma: bool = True,
) -> xr.Dataset:
    """Preprocess auxiliary dataset before interpolation.

    Converts satellite vehicle (sv) dimension to Signal ID (sid) dimension,
    which is required for matching with RINEX observations after interpolation.

    Parameters
    ----------
    aux_ds : xr.Dataset
        Raw auxiliary dataset with 'sv' dimension.
    fill_value : float, default np.nan
        Fill value for missing entries.
    full_preprocessing : bool, default False
        If True, applies full 4-step preprocessing (pad_to_global_sid,
        normalize_sid_dtype, strip_fillvalue). If False, only converts
        sv → sid (sufficient for interpolation).
    aggregate_glonass_fdma : bool, default True
        Whether to aggregate GLONASS FDMA bands.

    Returns
    -------
    xr.Dataset
        Preprocessed dataset with 'sid' dimension.

    Notes
    -----
    This must be called BEFORE interpolation. The workflow is:
    1. Load raw SP3/CLK data (sv dimension)
    2. Convert sv → sid (this function)
    3. Interpolate to target epochs
    4. Match with RINEX data (sid dimension)

    For most interpolation use cases, `full_preprocessing=False` is sufficient.
    Use `full_preprocessing=True` when preparing data for Icechunk storage.

    Examples
    --------
    >>> # Load raw SP3 data
    >>> sp3_data = Sp3File(...).to_dataset()
    >>> sp3_data.dims
    {'epoch': 96, 'sv': 32}
    >>>
    >>> # Preprocess before interpolation (minimal)
    >>> sp3_preprocessed = preprocess_aux_for_interpolation(sp3_data)
    >>> sp3_preprocessed.dims
    {'epoch': 96, 'sid': 384}
    >>>
    >>> # Preprocess before Icechunk (full)
    >>> sp3_preprocessed = preprocess_aux_for_interpolation(
    ...     sp3_data,
    ...     full_preprocessing=True,
    ... )
    >>> sp3_preprocessed.dims
    {'epoch': 96, 'sid': ~2000}  # Padded to all possible sids
    >>>
    >>> # Now interpolate
    >>> sp3_interp = interpolator.interpolate(sp3_preprocessed, target_epochs)
    """
    if full_preprocessing:
        return prep_aux_ds(aux_ds, fill_value, aggregate_glonass_fdma)
    else:
        return map_aux_sv_to_sid(aux_ds, fill_value, aggregate_glonass_fdma)

Interpolation

Interpolation strategies for GNSS auxiliary data.

ClockConfig

Bases: InterpolatorConfig

Configuration for clock correction interpolation.

Attributes

window_size : int, default 9 Window size for discontinuity detection. jump_threshold : float, default 1e-6 Threshold for detecting clock jumps (seconds).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclass
class ClockConfig(InterpolatorConfig):
    """Configuration for clock correction interpolation.

    Attributes
    ----------
    window_size : int, default 9
        Window size for discontinuity detection.
    jump_threshold : float, default 1e-6
        Threshold for detecting clock jumps (seconds).
    """

    window_size: int = 9
    jump_threshold: float = 1e-6

ClockInterpolationStrategy

Bases: Interpolator

Piecewise linear interpolation for clock corrections.

Detects and handles discontinuities (clock jumps) properly.

Examples

from canvod.auxiliary.interpolation import ( ... ClockInterpolationStrategy, ClockConfig, ... )

config = ClockConfig(window_size=9, jump_threshold=1e-6) interpolator = ClockInterpolationStrategy(config=config)

Interpolate to RINEX epochs

clk_interp = interpolator.interpolate(clk_data, rinex_epochs)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ClockInterpolationStrategy(Interpolator):
    """Piecewise linear interpolation for clock corrections.

    Detects and handles discontinuities (clock jumps) properly.

    Examples
    --------
    >>> from canvod.auxiliary.interpolation import (
    ...     ClockInterpolationStrategy, ClockConfig,
    ... )
    >>>
    >>> config = ClockConfig(window_size=9, jump_threshold=1e-6)
    >>> interpolator = ClockInterpolationStrategy(config=config)
    >>>
    >>> # Interpolate to RINEX epochs
    >>> clk_interp = interpolator.interpolate(clk_data, rinex_epochs)
    """

    config: ClockConfig

    def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
        """Interpolate clock corrections with discontinuity handling."""
        # Determine satellite dimension (sv or sid)
        sat_dim = "sv" if "sv" in ds.dims else "sid"
        sat_values = ds[sat_dim].values

        result_ds = xr.Dataset(coords={"epoch": target_epochs, sat_dim: sat_values})

        # Convert epochs to seconds
        t_source = (
            (ds["epoch"] - ds["epoch"].values[0])
            .values.astype("timedelta64[s]")
            .astype(float)
        )

        t_target = (
            (target_epochs - ds["epoch"].values[0])
            .astype("timedelta64[s]")
            .astype(float)
        )

        # Find clock variables
        clock_vars = [
            var
            for var in ds.data_vars
            if isinstance(var, str)
            and any(c in var for c in ["clock", "clk", "Clock", "CLK", "clock_offset"])
        ]

        if not clock_vars:
            raise ValueError("No clock variables found in dataset")

        # Process each clock variable
        for var in clock_vars:
            data = ds[var].values
            output = np.full((len(target_epochs), len(sat_values)), np.nan)

            # Parallel processing per satellite
            with ThreadPoolExecutor() as executor:
                futures = []
                for sat_idx in range(len(sat_values)):
                    futures.append(
                        executor.submit(
                            self._interpolate_sat_clock,
                            data[:, sat_idx],
                            t_source,
                            t_target,
                            self.config.jump_threshold,
                        )
                    )

                # Collect results
                for sat_idx, future in enumerate(futures):
                    output[:, sat_idx] = future.result()

            result_ds[var] = (("epoch", sat_dim), output)
            if var in ds:
                result_ds[var].attrs = ds[var].attrs

        return result_ds

    def _interpolate_sat_clock(
        self,
        data: np.ndarray,
        t_source: np.ndarray,
        t_target: np.ndarray,
        threshold: float,
    ) -> np.ndarray:
        """Interpolate clock data for a single satellite.

        Handles discontinuities by splitting into segments.

        Parameters
        ----------
        data : np.ndarray
            Source clock data for one satellite.
        t_source : np.ndarray
            Source epochs in seconds from start.
        t_target : np.ndarray
            Target epochs in seconds from start.
        threshold : float
            Jump threshold for discontinuity detection.

        Returns
        -------
        np.ndarray
            Interpolated values aligned to `t_target`.
        """
        output = np.full_like(t_target, np.nan)

        # Skip if all data is NaN
        if np.all(np.isnan(data)):
            return output

        # Find valid data points
        valid_mask = ~np.isnan(data)
        if not np.any(valid_mask):
            return output

        valid_data = data[valid_mask]
        valid_time = t_source[valid_mask]

        # Detect discontinuities (clock jumps)
        jumps = np.where(np.abs(np.diff(valid_data)) > threshold)[0]
        segments = np.split(np.arange(len(valid_time)), jumps + 1)

        # Interpolate each continuous segment
        for seg in segments:
            if len(seg) < 2:
                continue

            seg_time = valid_time[seg]
            seg_data = valid_data[seg]

            # Find target points in this segment
            mask = (t_target >= seg_time[0]) & (t_target <= seg_time[-1])
            if not np.any(mask):
                continue

            # Linear interpolation within segment
            interpolator = interp1d(
                seg_time, seg_data, bounds_error=False, fill_value=np.nan
            )
            output[mask] = interpolator(t_target[mask])

        return output

interpolate(ds, target_epochs)

Interpolate clock corrections with discontinuity handling.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
    """Interpolate clock corrections with discontinuity handling."""
    # Determine satellite dimension (sv or sid)
    sat_dim = "sv" if "sv" in ds.dims else "sid"
    sat_values = ds[sat_dim].values

    result_ds = xr.Dataset(coords={"epoch": target_epochs, sat_dim: sat_values})

    # Convert epochs to seconds
    t_source = (
        (ds["epoch"] - ds["epoch"].values[0])
        .values.astype("timedelta64[s]")
        .astype(float)
    )

    t_target = (
        (target_epochs - ds["epoch"].values[0])
        .astype("timedelta64[s]")
        .astype(float)
    )

    # Find clock variables
    clock_vars = [
        var
        for var in ds.data_vars
        if isinstance(var, str)
        and any(c in var for c in ["clock", "clk", "Clock", "CLK", "clock_offset"])
    ]

    if not clock_vars:
        raise ValueError("No clock variables found in dataset")

    # Process each clock variable
    for var in clock_vars:
        data = ds[var].values
        output = np.full((len(target_epochs), len(sat_values)), np.nan)

        # Parallel processing per satellite
        with ThreadPoolExecutor() as executor:
            futures = []
            for sat_idx in range(len(sat_values)):
                futures.append(
                    executor.submit(
                        self._interpolate_sat_clock,
                        data[:, sat_idx],
                        t_source,
                        t_target,
                        self.config.jump_threshold,
                    )
                )

            # Collect results
            for sat_idx, future in enumerate(futures):
                output[:, sat_idx] = future.result()

        result_ds[var] = (("epoch", sat_dim), output)
        if var in ds:
            result_ds[var].attrs = ds[var].attrs

    return result_ds

Interpolator

Bases: ABC

Abstract base class for interpolation strategies.

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True and uses ABC to define required interpolation hooks.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class Interpolator(ABC):
    """Abstract base class for interpolation strategies.

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True` and
    uses `ABC` to define required interpolation hooks.
    """

    config: InterpolatorConfig

    @abstractmethod
    def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
        """Interpolate dataset to match target epochs.

        Parameters
        ----------
        ds : xr.Dataset
            Source dataset with (epoch, sid) dimensions.
        target_epochs : np.ndarray
            Target epoch grid (datetime64).

        Returns
        -------
        xr.Dataset
            Interpolated dataset at target epochs.
        """
        pass

    def to_attrs(self) -> dict[str, Any]:
        """Convert interpolator to attrs-compatible dictionary."""
        return {
            "interpolator_type": self.__class__.__name__,
            "config": self.config.to_dict(),
        }

interpolate(ds, target_epochs) abstractmethod

Interpolate dataset to match target epochs.

Parameters

ds : xr.Dataset Source dataset with (epoch, sid) dimensions. target_epochs : np.ndarray Target epoch grid (datetime64).

Returns

xr.Dataset Interpolated dataset at target epochs.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@abstractmethod
def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
    """Interpolate dataset to match target epochs.

    Parameters
    ----------
    ds : xr.Dataset
        Source dataset with (epoch, sid) dimensions.
    target_epochs : np.ndarray
        Target epoch grid (datetime64).

    Returns
    -------
    xr.Dataset
        Interpolated dataset at target epochs.
    """
    pass

to_attrs()

Convert interpolator to attrs-compatible dictionary.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
91
92
93
94
95
96
def to_attrs(self) -> dict[str, Any]:
    """Convert interpolator to attrs-compatible dictionary."""
    return {
        "interpolator_type": self.__class__.__name__,
        "config": self.config.to_dict(),
    }

InterpolatorConfig

Base class for interpolator configuration.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
21
22
23
24
25
26
class InterpolatorConfig:
    """Base class for interpolator configuration."""

    def to_dict(self) -> dict[str, Any]:
        """Convert config to dictionary for attrs storage."""
        return dict(vars(self))

to_dict()

Convert config to dictionary for attrs storage.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
24
25
26
def to_dict(self) -> dict[str, Any]:
    """Convert config to dictionary for attrs storage."""
    return dict(vars(self))

Sp3Config

Bases: InterpolatorConfig

Configuration for SP3 ephemeris interpolation.

Attributes

use_velocities : bool, default True Use Hermite splines with satellite velocities if available. fallback_method : str, default 'linear' Interpolation method when velocities are unavailable.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class Sp3Config(InterpolatorConfig):
    """Configuration for SP3 ephemeris interpolation.

    Attributes
    ----------
    use_velocities : bool, default True
        Use Hermite splines with satellite velocities if available.
    fallback_method : str, default 'linear'
        Interpolation method when velocities are unavailable.
    """

    use_velocities: bool = True
    fallback_method: str = "linear"

Sp3InterpolationStrategy

Bases: Interpolator

Hermite cubic spline interpolation for SP3 ephemeris data.

Uses satellite velocities (Vx, Vy, Vz) for higher accuracy. Falls back to linear interpolation if velocities unavailable.

Examples

from canvod.auxiliary.interpolation import Sp3InterpolationStrategy, Sp3Config

config = Sp3Config(use_velocities=True, fallback_method='linear') interpolator = Sp3InterpolationStrategy(config=config)

Interpolate to RINEX epochs

sp3_interp = interpolator.interpolate(sp3_data, rinex_epochs)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class Sp3InterpolationStrategy(Interpolator):
    """Hermite cubic spline interpolation for SP3 ephemeris data.

    Uses satellite velocities (Vx, Vy, Vz) for higher accuracy.
    Falls back to linear interpolation if velocities unavailable.

    Examples
    --------
    >>> from canvod.auxiliary.interpolation import Sp3InterpolationStrategy, Sp3Config
    >>>
    >>> config = Sp3Config(use_velocities=True, fallback_method='linear')
    >>> interpolator = Sp3InterpolationStrategy(config=config)
    >>>
    >>> # Interpolate to RINEX epochs
    >>> sp3_interp = interpolator.interpolate(sp3_data, rinex_epochs)
    """

    config: Sp3Config

    def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
        """Interpolate SP3 ephemeris to target epochs."""
        if self.config.use_velocities and all(v in ds for v in ["Vx", "Vy", "Vz"]):
            return self._interpolate_with_velocities(ds, target_epochs)
        return self._interpolate_positions_only(ds, target_epochs)

    def _interpolate_with_velocities(
        self, ds: xr.Dataset, target_epochs: np.ndarray
    ) -> xr.Dataset:
        """Hermite interpolation using satellite velocities."""
        # Determine satellite dimension (sv or sid)
        sat_dim = "sv" if "sv" in ds.dims else "sid"
        sat_values = ds[sat_dim].values

        result_ds = xr.Dataset(coords={"epoch": target_epochs, sat_dim: sat_values})

        # Convert epochs to seconds (relative to first epoch)
        t_source = (
            (ds["epoch"] - ds["epoch"].values[0])
            .values.astype("timedelta64[s]")
            .astype(float)
        )

        t_target = (
            (target_epochs - ds["epoch"].values[0])
            .astype("timedelta64[s]")
            .astype(float)
        )

        # Pre-allocate output arrays
        n_targets = len(target_epochs)
        n_sats = len(sat_values)
        coords = {
            "X": np.empty((n_targets, n_sats)),
            "Y": np.empty((n_targets, n_sats)),
            "Z": np.empty((n_targets, n_sats)),
        }

        vels = {
            "Vx": np.empty((n_targets, n_sats)),
            "Vy": np.empty((n_targets, n_sats)),
            "Vz": np.empty((n_targets, n_sats)),
        }

        # Parallel interpolation per satellite
        with ThreadPoolExecutor() as executor:
            futures = []
            for i, sat in enumerate(sat_values):
                futures.append(
                    executor.submit(
                        self._interpolate_sat,
                        ds,
                        sat,
                        sat_dim,
                        t_source,
                        t_target,
                        i,
                        coords,
                        vels,
                    )
                )

            # Wait for completion
            for future in futures:
                future.result()

        # Assign results to dataset
        for coord, data in coords.items():
            result_ds[coord] = (("epoch", sat_dim), data)
            if coord in ds:
                result_ds[coord].attrs = ds[coord].attrs

        for vel, data in vels.items():
            result_ds[vel] = (("epoch", sat_dim), data)
            if vel in ds:
                result_ds[vel].attrs = ds[vel].attrs

        return result_ds

    def _interpolate_sat(
        self,
        ds: xr.Dataset,
        sat: str,
        sat_dim: str,
        t_source: np.ndarray,
        t_target: np.ndarray,
        idx: int,
        coords: dict[str, np.ndarray],
        vels: dict[str, np.ndarray],
    ) -> None:
        """Interpolate a single satellite using Hermite splines.

        Parameters
        ----------
        ds : xr.Dataset
            Source dataset.
        sat : str
            Satellite identifier.
        sat_dim : str
            Dimension name for satellites ("sv" or "sid").
        t_source : np.ndarray
            Source epochs in seconds from start.
        t_target : np.ndarray
            Target epochs in seconds from start.
        idx : int
            Output index for this satellite.
        coords : dict[str, np.ndarray]
            Output position arrays to fill.
        vels : dict[str, np.ndarray]
            Output velocity arrays to fill.
        """
        for coord, vel in [("X", "Vx"), ("Y", "Vy"), ("Z", "Vz")]:
            # Select by satellite dimension name
            pos = ds[coord].sel({sat_dim: sat}).values
            vel_data = ds[vel].sel({sat_dim: sat}).values

            # Skip if data is all NaN
            if not np.isfinite(pos).any() or not np.isfinite(vel_data).any():
                coords[coord][:, idx] = np.nan
                if vels is not None:
                    vels[vel][:, idx] = np.nan
                continue

            # Hermite cubic spline interpolation
            interpolator = CubicHermiteSpline(t_source, pos, vel_data)
            coords[coord][:, idx] = interpolator(t_target)
            if vels is not None:
                vels[vel][:, idx] = interpolator.derivative()(t_target)

    def _interpolate_positions_only(
        self, ds: xr.Dataset, target_epochs: np.ndarray
    ) -> xr.Dataset:
        """Fallback: simple linear interpolation for positions only."""
        return ds.interp(
            epoch=target_epochs,
            method=self.config.fallback_method,  # ty: ignore[invalid-argument-type]
        )

interpolate(ds, target_epochs)

Interpolate SP3 ephemeris to target epochs.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
119
120
121
122
123
def interpolate(self, ds: xr.Dataset, target_epochs: np.ndarray) -> xr.Dataset:
    """Interpolate SP3 ephemeris to target epochs."""
    if self.config.use_velocities and all(v in ds for v in ["Vx", "Vy", "Vz"]):
        return self._interpolate_with_velocities(ds, target_epochs)
    return self._interpolate_positions_only(ds, target_epochs)

create_interpolator_from_attrs(attrs)

Recreate interpolator instance from dataset attributes.

Parameters

attrs : dict Dataset attributes containing interpolator_config.

Returns

Interpolator Reconstructed interpolator instance.

Examples

Save interpolator config in dataset

ds.attrs['interpolator_config'] = interpolator.to_attrs()

Later, recreate interpolator

interpolator = create_interpolator_from_attrs(ds.attrs)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/interpolation/interpolator.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def create_interpolator_from_attrs(attrs: dict[str, Any]) -> Interpolator:
    """Recreate interpolator instance from dataset attributes.

    Parameters
    ----------
    attrs : dict
        Dataset attributes containing interpolator_config.

    Returns
    -------
    Interpolator
        Reconstructed interpolator instance.

    Examples
    --------
    >>> # Save interpolator config in dataset
    >>> ds.attrs['interpolator_config'] = interpolator.to_attrs()
    >>>
    >>> # Later, recreate interpolator
    >>> interpolator = create_interpolator_from_attrs(ds.attrs)
    """
    interpolator_type = attrs["interpolator_config"]["interpolator_type"]
    config_dict = attrs["interpolator_config"]["config"]

    if interpolator_type == "Sp3InterpolationStrategy":
        config = Sp3Config(**config_dict)
        return Sp3InterpolationStrategy(config=config)
    elif interpolator_type == "ClockInterpolationStrategy":
        config = ClockConfig(**config_dict)
        return ClockInterpolationStrategy(config=config)
    else:
        raise ValueError(f"Unknown interpolator type: {interpolator_type}")

Ephemeris (SP3)

Ephemeris (satellite orbit) data handling.

This module provides tools for reading, parsing, and validating satellite ephemeris data from SP3 format files, as well as the EphemerisProvider ABC for augmenting GNSS datasets with angular coordinates.

Sp3Parser

Parser for SP3 (Standard Product #3) orbit files.

Handles parsing of SP3 format files containing precise satellite orbit data. Implements optimized single-pass reading for performance.

Parameters

fpath : Path Path to SP3 file. dimensionless : bool, default True If True, strip units from output.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/parser.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
class Sp3Parser:
    """Parser for SP3 (Standard Product #3) orbit files.

    Handles parsing of SP3 format files containing precise satellite orbit data.
    Implements optimized single-pass reading for performance.

    Parameters
    ----------
    fpath : Path
        Path to SP3 file.
    dimensionless : bool, default True
        If True, strip units from output.
    """

    def __init__(self, fpath: Path, dimensionless: bool = True) -> None:
        """Initialize SP3 parser."""
        self.fpath = Path(fpath)
        self.dimensionless = dimensionless

    def parse(self) -> xr.Dataset:
        """Parse SP3 file to xarray Dataset.

        Returns
        -------
        xr.Dataset
            Dataset with satellite positions (X, Y, Z) in meters.

        Raises
        ------
        FileNotFoundError
            If file does not exist.
        ValueError
            If file format is invalid.
        """
        if not self.fpath.exists():
            raise FileNotFoundError(f"SP3 file not found: {self.fpath}")

        epochs: list[datetime.datetime] = []
        epoch_data: list[tuple[int, str, float, float, float]] = []
        svs: set[str] = set()
        current_epoch_idx: int = -1

        with open(self.fpath) as f:
            # Skip header until first epoch marker
            for line in f:
                if line.startswith("*"):
                    current_epoch = self._parse_epoch_line(line)
                    epochs.append(current_epoch)
                    current_epoch_idx += 1
                    break

            # Single pass through file
            for line in f:
                if line.startswith("*"):
                    current_epoch = self._parse_epoch_line(line)
                    epochs.append(current_epoch)
                    current_epoch_idx += 1

                elif line.startswith("P"):
                    sv_code = line[1:4].strip()
                    svs.add(sv_code)

                    x = self._parse_coordinate(line[4:18])
                    y = self._parse_coordinate(line[18:32])
                    z = self._parse_coordinate(line[32:46])

                    epoch_data.append(
                        (
                            current_epoch_idx,
                            sv_code,
                            x if x is not None else np.nan,
                            y if y is not None else np.nan,
                            z if z is not None else np.nan,
                        )
                    )

                elif line.startswith("EOF"):
                    break

        # Build arrays
        sv_list = sorted(svs)
        sv_idx = {sv: i for i, sv in enumerate(sv_list)}
        shape = (len(epochs), len(sv_list))

        x_data = np.full(shape, np.nan)
        y_data = np.full(shape, np.nan)
        z_data = np.full(shape, np.nan)

        for epoch_idx, sv, x, y, z in epoch_data:
            j = sv_idx[sv]
            x_data[epoch_idx, j] = x
            y_data[epoch_idx, j] = y
            z_data[epoch_idx, j] = z

        # Convert to meters
        x_data = (x_data * UREG.kilometer).to(UREG.meter)
        y_data = (y_data * UREG.kilometer).to(UREG.meter)
        z_data = (z_data * UREG.kilometer).to(UREG.meter)

        if self.dimensionless:
            x_data = x_data.magnitude
            y_data = y_data.magnitude
            z_data = z_data.magnitude

        # Create dataset
        ds = xr.Dataset(
            data_vars={
                "X": (("epoch", "sv"), x_data),
                "Y": (("epoch", "sv"), y_data),
                "Z": (("epoch", "sv"), z_data),
            },
            coords={
                "epoch": np.array(epochs, dtype="datetime64[ns]"),
                "sv": np.array(sv_list),
            },
        )

        # Add variable attributes
        for var, attrs in self._get_variable_attributes().items():
            if var in ds:
                ds[var].attrs = attrs

        return ds

    def _parse_epoch_line(self, line: str) -> datetime.datetime:
        """Parse epoch marker line.

        Parameters
        ----------
        line : str
            Line starting with "*" containing timestamp.

        Returns
        -------
        datetime.datetime
            Parsed datetime.

        Examples
        --------
        >>> _parse_epoch_line("* 2024 1 15 0 0 0.00000000")
        datetime.datetime(2024, 1, 15, 0, 0)
        """
        parts = line.split()
        return datetime.datetime(
            year=int(parts[1]),
            month=int(parts[2]),
            day=int(parts[3]),
            hour=int(parts[4]),
            minute=int(parts[5]),
            second=0,
        )

    def _parse_coordinate(self, coord_str: str) -> float | None:
        """Parse coordinate field from SP3 file.

        Handles edge cases like missing spaces and invalid data markers.

        Parameters
        ----------
        coord_str : str
            14-character coordinate string.

        Returns
        -------
        float | None
            Coordinate value in kilometers, or None if missing.

        Raises
        ------
        ValueError
            If coordinate cannot be parsed.
        """
        coord_str = coord_str.strip()
        if not coord_str or coord_str == "999999.999999":
            return None

        try:
            return float(coord_str)
        except ValueError:
            # Handle missing spaces between fields
            if "." in coord_str:
                parts = coord_str.split(".")
                if len(parts) == 2:
                    integer_part = parts[0].replace(" ", "")
                    return float(f"{integer_part}.{parts[1]}")
            raise

    def _get_variable_attributes(self) -> dict[str, dict[str, str]]:
        """Get standardized attributes for position variables."""
        return {
            "X": {
                "long_name": "x-coordinate in ECEF",
                "standard_name": r"x_{ECEF}",
                "short_name": "x",
                "units": "m",
                "axis": "x",
                "description": "x-coordinate in ECEF frame",
            },
            "Y": {
                "long_name": "y-coordinate in ECEF",
                "standard_name": r"y_{ECEF}",
                "short_name": "y",
                "units": "m",
                "axis": "y",
                "description": "y-coordinate in ECEF frame",
            },
            "Z": {
                "long_name": "z-coordinate in ECEF",
                "standard_name": r"z_{ECEF}",
                "short_name": "z",
                "units": "m",
                "axis": "z",
                "description": "z-coordinate in ECEF frame",
            },
        }

__init__(fpath, dimensionless=True)

Initialize SP3 parser.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/parser.py
26
27
28
29
def __init__(self, fpath: Path, dimensionless: bool = True) -> None:
    """Initialize SP3 parser."""
    self.fpath = Path(fpath)
    self.dimensionless = dimensionless

parse()

Parse SP3 file to xarray Dataset.

Returns

xr.Dataset Dataset with satellite positions (X, Y, Z) in meters.

Raises

FileNotFoundError If file does not exist. ValueError If file format is invalid.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/parser.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def parse(self) -> xr.Dataset:
    """Parse SP3 file to xarray Dataset.

    Returns
    -------
    xr.Dataset
        Dataset with satellite positions (X, Y, Z) in meters.

    Raises
    ------
    FileNotFoundError
        If file does not exist.
    ValueError
        If file format is invalid.
    """
    if not self.fpath.exists():
        raise FileNotFoundError(f"SP3 file not found: {self.fpath}")

    epochs: list[datetime.datetime] = []
    epoch_data: list[tuple[int, str, float, float, float]] = []
    svs: set[str] = set()
    current_epoch_idx: int = -1

    with open(self.fpath) as f:
        # Skip header until first epoch marker
        for line in f:
            if line.startswith("*"):
                current_epoch = self._parse_epoch_line(line)
                epochs.append(current_epoch)
                current_epoch_idx += 1
                break

        # Single pass through file
        for line in f:
            if line.startswith("*"):
                current_epoch = self._parse_epoch_line(line)
                epochs.append(current_epoch)
                current_epoch_idx += 1

            elif line.startswith("P"):
                sv_code = line[1:4].strip()
                svs.add(sv_code)

                x = self._parse_coordinate(line[4:18])
                y = self._parse_coordinate(line[18:32])
                z = self._parse_coordinate(line[32:46])

                epoch_data.append(
                    (
                        current_epoch_idx,
                        sv_code,
                        x if x is not None else np.nan,
                        y if y is not None else np.nan,
                        z if z is not None else np.nan,
                    )
                )

            elif line.startswith("EOF"):
                break

    # Build arrays
    sv_list = sorted(svs)
    sv_idx = {sv: i for i, sv in enumerate(sv_list)}
    shape = (len(epochs), len(sv_list))

    x_data = np.full(shape, np.nan)
    y_data = np.full(shape, np.nan)
    z_data = np.full(shape, np.nan)

    for epoch_idx, sv, x, y, z in epoch_data:
        j = sv_idx[sv]
        x_data[epoch_idx, j] = x
        y_data[epoch_idx, j] = y
        z_data[epoch_idx, j] = z

    # Convert to meters
    x_data = (x_data * UREG.kilometer).to(UREG.meter)
    y_data = (y_data * UREG.kilometer).to(UREG.meter)
    z_data = (z_data * UREG.kilometer).to(UREG.meter)

    if self.dimensionless:
        x_data = x_data.magnitude
        y_data = y_data.magnitude
        z_data = z_data.magnitude

    # Create dataset
    ds = xr.Dataset(
        data_vars={
            "X": (("epoch", "sv"), x_data),
            "Y": (("epoch", "sv"), y_data),
            "Z": (("epoch", "sv"), z_data),
        },
        coords={
            "epoch": np.array(epochs, dtype="datetime64[ns]"),
            "sv": np.array(sv_list),
        },
    )

    # Add variable attributes
    for var, attrs in self._get_variable_attributes().items():
        if var in ds:
            ds[var].attrs = attrs

    return ds

AgencyEphemerisProvider

Bases: EphemerisProvider

SP3/CLK-based ephemeris from analysis centres.

Downloads final/rapid/ultra products from COD, ESA, IGS etc., interpolates via Hermite cubic splines, and computes theta/phi via ECEF→spherical coordinate transformation.

Parameters

agency : str Analysis centre code ("COD", "ESA", "GFZ", "JPL"). product_type : str Product type ("final", "rapid", "ultra"). aux_data_dir : Path, optional Directory for cached aux Zarr files. keep_sids : list[str], optional SID filter list. store_radial_distance : bool Whether to keep r in the output.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class AgencyEphemerisProvider(EphemerisProvider):
    """SP3/CLK-based ephemeris from analysis centres.

    Downloads final/rapid/ultra products from COD, ESA, IGS etc.,
    interpolates via Hermite cubic splines, and computes theta/phi
    via ECEF→spherical coordinate transformation.

    Parameters
    ----------
    agency : str
        Analysis centre code (``"COD"``, ``"ESA"``, ``"GFZ"``, ``"JPL"``).
    product_type : str
        Product type (``"final"``, ``"rapid"``, ``"ultra"``).
    aux_data_dir : Path, optional
        Directory for cached aux Zarr files.
    keep_sids : list[str], optional
        SID filter list.
    store_radial_distance : bool
        Whether to keep ``r`` in the output.
    """

    def __init__(
        self,
        agency: str = "COD",
        product_type: str = "final",
        aux_data_dir: Path | None = None,
        keep_sids: list[str] | None = None,
        store_radial_distance: bool = False,
    ) -> None:
        self.agency = agency
        self.product_type = product_type
        self.aux_data_dir = aux_data_dir
        self.keep_sids = keep_sids
        self.store_radial_distance = store_radial_distance
        self._aux_zarr_path: Path | None = None

    def preprocess_day(
        self,
        date: str,
        site_config: Any,
    ) -> Path | None:
        """Download SP3/CLK and interpolate to observation epochs.

        Creates a Zarr store with interpolated satellite positions (X, Y, Z)
        and clock corrections at the observation sampling rate.

        Parameters
        ----------
        date : str
            Date in ``YYYYDOY`` format.
        site_config : Any
            Site configuration (needs ``gnss_site_data_root``, receiver info).

        Returns
        -------
        Path
            Path to the preprocessed aux Zarr store.
        """
        import shutil

        import numpy as np
        import xarray as xr

        from canvod.auxiliary.interpolation import (
            ClockConfig,
            ClockInterpolationStrategy,
            Sp3Config,
            Sp3InterpolationStrategy,
        )
        from canvod.auxiliary.pipeline import AuxDataPipeline

        year = int(date[:4])
        doy = int(date[4:])

        # Determine aux output directory
        aux_dir = self.aux_data_dir or Path(site_config.gnss_site_data_root)
        aux_zarr_path = aux_dir / f"aux_{date}.zarr"

        # Always reprocess (cheap; avoids stale caches)
        if aux_zarr_path.exists():
            shutil.rmtree(aux_zarr_path)

        # Build MatchedDirs stub (only yyyydoy is used by the pipeline)
        from canvod.readers.matching.models import MatchedDirs
        from canvod.utils.tools import YYYYDOY

        yyyydoy = YYYYDOY.from_str(date)
        matched_dirs = MatchedDirs(
            canopy_data_dir=aux_dir,
            reference_data_dir=aux_dir,
            yyyydoy=yyyydoy,
        )

        pipeline = AuxDataPipeline.create_standard(
            matched_dirs=matched_dirs,
            aux_file_path=aux_dir,
            agency=self.agency,
            product_type=self.product_type,
            keep_sids=self.keep_sids,
        )
        pipeline.load_all()

        # Generate target epoch grid (5s default)
        sampling_interval = 5.0
        day_start = np.datetime64(f"{year:04d}-01-01", "D") + np.timedelta64(
            doy - 1,
            "D",
        )
        n_epochs = int(24 * 3600 / sampling_interval)
        target_epochs = day_start + np.arange(n_epochs) * np.timedelta64(
            int(sampling_interval), "s"
        )

        # Interpolate ephemerides (Hermite cubic)
        sp3_config = Sp3Config(use_velocities=True, fallback_method="linear")
        sp3_interp = Sp3InterpolationStrategy(config=sp3_config)
        ephem_ds = pipeline.get("ephemerides")
        ephem_interp = sp3_interp.interpolate(ephem_ds, target_epochs)

        # Interpolate clock (piecewise linear)
        clock_config = ClockConfig(window_size=9, jump_threshold=1e-6)
        clock_interp_strategy = ClockInterpolationStrategy(config=clock_config)
        clock_ds = pipeline.get("clock")
        clock_interp = clock_interp_strategy.interpolate(clock_ds, target_epochs)

        # Merge and write
        aux_processed = xr.merge([ephem_interp, clock_interp])
        aux_processed.to_zarr(aux_zarr_path, mode="w")

        self._aux_zarr_path = aux_zarr_path
        return aux_zarr_path

    def augment_dataset(
        self,
        ds: xr.Dataset,
        receiver_position: ECEFPosition,
    ) -> xr.Dataset:
        """Add theta/phi to dataset using preprocessed SP3/CLK data.

        Parameters
        ----------
        ds : xr.Dataset
            GNSS observation dataset.
        receiver_position : ECEFPosition
            Receiver ECEF position.

        Returns
        -------
        xr.Dataset
            Augmented dataset with theta, phi (and optionally r).

        Raises
        ------
        RuntimeError
            If ``preprocess_day()`` has not been called.
        """
        import xarray as xr

        from canvod.auxiliary.position.spherical_coords import (
            add_spherical_coords_to_dataset,
            compute_spherical_coordinates,
        )

        if self._aux_zarr_path is None:
            raise RuntimeError(
                "Call preprocess_day() before augment_dataset(). "
                "The aux data must be prepared first."
            )

        # Open preprocessed aux data and align to observation epochs
        aux_store = xr.open_zarr(self._aux_zarr_path, decode_timedelta=True)
        aux_slice = aux_store.sel(epoch=ds.epoch, method="nearest").load()

        # Inner join on SIDs
        common_sids = sorted(set(ds.sid.values) & set(aux_slice.sid.values))
        if not common_sids:
            raise ValueError(
                f"No common SIDs between dataset ({len(ds.sid)}) "
                f"and aux data ({len(aux_slice.sid)})"
            )

        ds = ds.sel(sid=common_sids)
        aux_slice = aux_slice.sel(sid=common_sids)

        # Compute spherical coordinates
        sat_x = aux_slice["X"].values
        sat_y = aux_slice["Y"].values
        sat_z = aux_slice["Z"].values
        r, theta, phi = compute_spherical_coordinates(
            sat_x,
            sat_y,
            sat_z,
            receiver_position,
        )
        ds = add_spherical_coords_to_dataset(ds, r, theta, phi)

        if not self.store_radial_distance and "r" in ds:
            ds = ds.drop_vars("r")

        # Add clock if available
        if "clock" in aux_slice.data_vars:
            ds = ds.assign({"clock": aux_slice["clock"]})

        return ds

preprocess_day(date, site_config)

Download SP3/CLK and interpolate to observation epochs.

Creates a Zarr store with interpolated satellite positions (X, Y, Z) and clock corrections at the observation sampling rate.

Parameters

date : str Date in YYYYDOY format. site_config : Any Site configuration (needs gnss_site_data_root, receiver info).

Returns

Path Path to the preprocessed aux Zarr store.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def preprocess_day(
    self,
    date: str,
    site_config: Any,
) -> Path | None:
    """Download SP3/CLK and interpolate to observation epochs.

    Creates a Zarr store with interpolated satellite positions (X, Y, Z)
    and clock corrections at the observation sampling rate.

    Parameters
    ----------
    date : str
        Date in ``YYYYDOY`` format.
    site_config : Any
        Site configuration (needs ``gnss_site_data_root``, receiver info).

    Returns
    -------
    Path
        Path to the preprocessed aux Zarr store.
    """
    import shutil

    import numpy as np
    import xarray as xr

    from canvod.auxiliary.interpolation import (
        ClockConfig,
        ClockInterpolationStrategy,
        Sp3Config,
        Sp3InterpolationStrategy,
    )
    from canvod.auxiliary.pipeline import AuxDataPipeline

    year = int(date[:4])
    doy = int(date[4:])

    # Determine aux output directory
    aux_dir = self.aux_data_dir or Path(site_config.gnss_site_data_root)
    aux_zarr_path = aux_dir / f"aux_{date}.zarr"

    # Always reprocess (cheap; avoids stale caches)
    if aux_zarr_path.exists():
        shutil.rmtree(aux_zarr_path)

    # Build MatchedDirs stub (only yyyydoy is used by the pipeline)
    from canvod.readers.matching.models import MatchedDirs
    from canvod.utils.tools import YYYYDOY

    yyyydoy = YYYYDOY.from_str(date)
    matched_dirs = MatchedDirs(
        canopy_data_dir=aux_dir,
        reference_data_dir=aux_dir,
        yyyydoy=yyyydoy,
    )

    pipeline = AuxDataPipeline.create_standard(
        matched_dirs=matched_dirs,
        aux_file_path=aux_dir,
        agency=self.agency,
        product_type=self.product_type,
        keep_sids=self.keep_sids,
    )
    pipeline.load_all()

    # Generate target epoch grid (5s default)
    sampling_interval = 5.0
    day_start = np.datetime64(f"{year:04d}-01-01", "D") + np.timedelta64(
        doy - 1,
        "D",
    )
    n_epochs = int(24 * 3600 / sampling_interval)
    target_epochs = day_start + np.arange(n_epochs) * np.timedelta64(
        int(sampling_interval), "s"
    )

    # Interpolate ephemerides (Hermite cubic)
    sp3_config = Sp3Config(use_velocities=True, fallback_method="linear")
    sp3_interp = Sp3InterpolationStrategy(config=sp3_config)
    ephem_ds = pipeline.get("ephemerides")
    ephem_interp = sp3_interp.interpolate(ephem_ds, target_epochs)

    # Interpolate clock (piecewise linear)
    clock_config = ClockConfig(window_size=9, jump_threshold=1e-6)
    clock_interp_strategy = ClockInterpolationStrategy(config=clock_config)
    clock_ds = pipeline.get("clock")
    clock_interp = clock_interp_strategy.interpolate(clock_ds, target_epochs)

    # Merge and write
    aux_processed = xr.merge([ephem_interp, clock_interp])
    aux_processed.to_zarr(aux_zarr_path, mode="w")

    self._aux_zarr_path = aux_zarr_path
    return aux_zarr_path

augment_dataset(ds, receiver_position)

Add theta/phi to dataset using preprocessed SP3/CLK data.

Parameters

ds : xr.Dataset GNSS observation dataset. receiver_position : ECEFPosition Receiver ECEF position.

Returns

xr.Dataset Augmented dataset with theta, phi (and optionally r).

Raises

RuntimeError If preprocess_day() has not been called.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def augment_dataset(
    self,
    ds: xr.Dataset,
    receiver_position: ECEFPosition,
) -> xr.Dataset:
    """Add theta/phi to dataset using preprocessed SP3/CLK data.

    Parameters
    ----------
    ds : xr.Dataset
        GNSS observation dataset.
    receiver_position : ECEFPosition
        Receiver ECEF position.

    Returns
    -------
    xr.Dataset
        Augmented dataset with theta, phi (and optionally r).

    Raises
    ------
    RuntimeError
        If ``preprocess_day()`` has not been called.
    """
    import xarray as xr

    from canvod.auxiliary.position.spherical_coords import (
        add_spherical_coords_to_dataset,
        compute_spherical_coordinates,
    )

    if self._aux_zarr_path is None:
        raise RuntimeError(
            "Call preprocess_day() before augment_dataset(). "
            "The aux data must be prepared first."
        )

    # Open preprocessed aux data and align to observation epochs
    aux_store = xr.open_zarr(self._aux_zarr_path, decode_timedelta=True)
    aux_slice = aux_store.sel(epoch=ds.epoch, method="nearest").load()

    # Inner join on SIDs
    common_sids = sorted(set(ds.sid.values) & set(aux_slice.sid.values))
    if not common_sids:
        raise ValueError(
            f"No common SIDs between dataset ({len(ds.sid)}) "
            f"and aux data ({len(aux_slice.sid)})"
        )

    ds = ds.sel(sid=common_sids)
    aux_slice = aux_slice.sel(sid=common_sids)

    # Compute spherical coordinates
    sat_x = aux_slice["X"].values
    sat_y = aux_slice["Y"].values
    sat_z = aux_slice["Z"].values
    r, theta, phi = compute_spherical_coordinates(
        sat_x,
        sat_y,
        sat_z,
        receiver_position,
    )
    ds = add_spherical_coords_to_dataset(ds, r, theta, phi)

    if not self.store_radial_distance and "r" in ds:
        ds = ds.drop_vars("r")

    # Add clock if available
    if "clock" in aux_slice.data_vars:
        ds = ds.assign({"clock": aux_slice["clock"]})

    return ds

EphemerisProvider

Bases: ABC

Abstract base class for satellite ephemeris providers.

Implementations augment a GNSS dataset with angular coordinates (theta, phi) relative to a receiver position.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class EphemerisProvider(ABC):
    """Abstract base class for satellite ephemeris providers.

    Implementations augment a GNSS dataset with angular coordinates
    (theta, phi) relative to a receiver position.
    """

    @abstractmethod
    def augment_dataset(
        self,
        ds: xr.Dataset,
        receiver_position: ECEFPosition,
    ) -> xr.Dataset:
        """Add theta and phi (and optionally r) to *ds*.

        Parameters
        ----------
        ds : xr.Dataset
            GNSS observation dataset with ``(epoch, sid)`` dims.
        receiver_position : ECEFPosition
            Receiver ECEF coordinates.

        Returns
        -------
        xr.Dataset
            Dataset with ``theta``, ``phi`` (and optionally ``r``) added.
        """

    @abstractmethod
    def preprocess_day(
        self,
        date: str,
        site_config: Any,
    ) -> Path | None:
        """Download / prepare ephemeris data for one day.

        Parameters
        ----------
        date : str
            Date in ``YYYYDOY`` format.
        site_config : Any
            Site configuration object providing receiver info, data root, etc.

        Returns
        -------
        Path or None
            Path to cached/preprocessed ephemeris data, or ``None``
            if preparation is not needed (e.g. broadcast provider).
        """

augment_dataset(ds, receiver_position) abstractmethod

Add theta and phi (and optionally r) to ds.

Parameters

ds : xr.Dataset GNSS observation dataset with (epoch, sid) dims. receiver_position : ECEFPosition Receiver ECEF coordinates.

Returns

xr.Dataset Dataset with theta, phi (and optionally r) added.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@abstractmethod
def augment_dataset(
    self,
    ds: xr.Dataset,
    receiver_position: ECEFPosition,
) -> xr.Dataset:
    """Add theta and phi (and optionally r) to *ds*.

    Parameters
    ----------
    ds : xr.Dataset
        GNSS observation dataset with ``(epoch, sid)`` dims.
    receiver_position : ECEFPosition
        Receiver ECEF coordinates.

    Returns
    -------
    xr.Dataset
        Dataset with ``theta``, ``phi`` (and optionally ``r``) added.
    """

preprocess_day(date, site_config) abstractmethod

Download / prepare ephemeris data for one day.

Parameters

date : str Date in YYYYDOY format. site_config : Any Site configuration object providing receiver info, data root, etc.

Returns

Path or None Path to cached/preprocessed ephemeris data, or None if preparation is not needed (e.g. broadcast provider).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@abstractmethod
def preprocess_day(
    self,
    date: str,
    site_config: Any,
) -> Path | None:
    """Download / prepare ephemeris data for one day.

    Parameters
    ----------
    date : str
        Date in ``YYYYDOY`` format.
    site_config : Any
        Site configuration object providing receiver info, data root, etc.

    Returns
    -------
    Path or None
        Path to cached/preprocessed ephemeris data, or ``None``
        if preparation is not needed (e.g. broadcast provider).
    """

SbfBroadcastProvider

Bases: EphemerisProvider

Ephemeris from SBF SatVisibility broadcast data.

Transfers theta/phi directly from SBF metadata datasets, skipping external orbit/clock downloads entirely. Only works when the source format is SBF.

Parameters

canopy_file : Path, optional Path to canopy SBF file for reference receivers in shared-position mode. When provided, the canopy file's geometry overrides the reference file's own geometry. canopy_reader_format : str Reader format for the canopy file.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
class SbfBroadcastProvider(EphemerisProvider):
    """Ephemeris from SBF SatVisibility broadcast data.

    Transfers theta/phi directly from SBF metadata datasets,
    skipping external orbit/clock downloads entirely.  Only works
    when the source format is SBF.

    Parameters
    ----------
    canopy_file : Path, optional
        Path to canopy SBF file for reference receivers in shared-position
        mode.  When provided, the canopy file's geometry overrides the
        reference file's own geometry.
    canopy_reader_format : str
        Reader format for the canopy file.
    """

    def __init__(
        self,
        canopy_file: Path | None = None,
        canopy_reader_format: str = "sbf",
    ) -> None:
        self.canopy_file = canopy_file
        self.canopy_reader_format = canopy_reader_format

    def preprocess_day(
        self,
        date: str,
        site_config: Any,
    ) -> Path | None:
        """No preprocessing needed for broadcast ephemeris."""
        return None

    def augment_dataset(
        self,
        ds: xr.Dataset,
        receiver_position: ECEFPosition,
        *,
        aux_datasets: dict[str, xr.Dataset] | None = None,
    ) -> xr.Dataset:
        """Add theta/phi from SBF SatVisibility metadata.

        Parameters
        ----------
        ds : xr.Dataset
            SBF observation dataset.
        receiver_position : ECEFPosition
            Receiver ECEF position (unused but required by ABC).
        aux_datasets : dict, optional
            Auxiliary datasets from ``reader.to_ds_and_auxiliary()``.
            Must contain ``"sbf_obs"`` with theta/phi.

        Returns
        -------
        xr.Dataset
            Dataset with theta/phi added from SBF broadcast.
        """
        import numpy as np

        # Determine source of sbf_obs metadata
        if self.canopy_file is not None:
            from canvodpy.factories import ReaderFactory

            canopy_rnx = ReaderFactory.create(
                self.canopy_reader_format,
                fpath=self.canopy_file,
            )
            _, canopy_aux = canopy_rnx.to_ds_and_auxiliary(
                keep_data_vars=None,
                write_global_attrs=False,
            )
            meta_ds = canopy_aux.get("sbf_obs")
        elif aux_datasets is not None:
            meta_ds = aux_datasets.get("sbf_obs")
        else:
            raise ValueError(
                "SbfBroadcastProvider requires either canopy_file or "
                "aux_datasets with 'sbf_obs' key."
            )

        if (
            meta_ds is None
            or "broadcast_theta" not in meta_ds
            or "broadcast_phi" not in meta_ds
        ):
            raise ValueError(
                "sbf_obs metadata does not contain broadcast_theta/broadcast_phi. "
                "Cannot use broadcast ephemeris."
            )

        from canvod.auxiliary.position.spherical_coords import (
            add_broadcast_spherical_coords_to_dataset,
        )

        # Extract broadcast geometry (already in radians from reader)
        bt = meta_ds["broadcast_theta"]
        bp = meta_ds["broadcast_phi"]

        # Align to observation epoch space
        if "epoch" in bt.dims:
            common_epochs = np.intersect1d(ds.epoch.values, bt.epoch.values)
            bt = bt.sel(epoch=common_epochs).reindex(
                epoch=ds.epoch.values, fill_value=np.nan
            )
            bp = bp.sel(epoch=common_epochs).reindex(
                epoch=ds.epoch.values, fill_value=np.nan
            )

        # Align to observation SID space
        common_sids = sorted(set(ds.sid.values) & set(bt.sid.values))
        bt = bt.sel(sid=common_sids).reindex(sid=ds.sid.values, fill_value=np.nan)
        bp = bp.sel(sid=common_sids).reindex(sid=ds.sid.values, fill_value=np.nan)

        # Use .values to prevent coord leakage from meta_ds (pdop, hdop, …)
        return add_broadcast_spherical_coords_to_dataset(ds, bt.values, bp.values)

preprocess_day(date, site_config)

No preprocessing needed for broadcast ephemeris.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
317
318
319
320
321
322
323
def preprocess_day(
    self,
    date: str,
    site_config: Any,
) -> Path | None:
    """No preprocessing needed for broadcast ephemeris."""
    return None

augment_dataset(ds, receiver_position, *, aux_datasets=None)

Add theta/phi from SBF SatVisibility metadata.

Parameters

ds : xr.Dataset SBF observation dataset. receiver_position : ECEFPosition Receiver ECEF position (unused but required by ABC). aux_datasets : dict, optional Auxiliary datasets from reader.to_ds_and_auxiliary(). Must contain "sbf_obs" with theta/phi.

Returns

xr.Dataset Dataset with theta/phi added from SBF broadcast.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/provider.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
def augment_dataset(
    self,
    ds: xr.Dataset,
    receiver_position: ECEFPosition,
    *,
    aux_datasets: dict[str, xr.Dataset] | None = None,
) -> xr.Dataset:
    """Add theta/phi from SBF SatVisibility metadata.

    Parameters
    ----------
    ds : xr.Dataset
        SBF observation dataset.
    receiver_position : ECEFPosition
        Receiver ECEF position (unused but required by ABC).
    aux_datasets : dict, optional
        Auxiliary datasets from ``reader.to_ds_and_auxiliary()``.
        Must contain ``"sbf_obs"`` with theta/phi.

    Returns
    -------
    xr.Dataset
        Dataset with theta/phi added from SBF broadcast.
    """
    import numpy as np

    # Determine source of sbf_obs metadata
    if self.canopy_file is not None:
        from canvodpy.factories import ReaderFactory

        canopy_rnx = ReaderFactory.create(
            self.canopy_reader_format,
            fpath=self.canopy_file,
        )
        _, canopy_aux = canopy_rnx.to_ds_and_auxiliary(
            keep_data_vars=None,
            write_global_attrs=False,
        )
        meta_ds = canopy_aux.get("sbf_obs")
    elif aux_datasets is not None:
        meta_ds = aux_datasets.get("sbf_obs")
    else:
        raise ValueError(
            "SbfBroadcastProvider requires either canopy_file or "
            "aux_datasets with 'sbf_obs' key."
        )

    if (
        meta_ds is None
        or "broadcast_theta" not in meta_ds
        or "broadcast_phi" not in meta_ds
    ):
        raise ValueError(
            "sbf_obs metadata does not contain broadcast_theta/broadcast_phi. "
            "Cannot use broadcast ephemeris."
        )

    from canvod.auxiliary.position.spherical_coords import (
        add_broadcast_spherical_coords_to_dataset,
    )

    # Extract broadcast geometry (already in radians from reader)
    bt = meta_ds["broadcast_theta"]
    bp = meta_ds["broadcast_phi"]

    # Align to observation epoch space
    if "epoch" in bt.dims:
        common_epochs = np.intersect1d(ds.epoch.values, bt.epoch.values)
        bt = bt.sel(epoch=common_epochs).reindex(
            epoch=ds.epoch.values, fill_value=np.nan
        )
        bp = bp.sel(epoch=common_epochs).reindex(
            epoch=ds.epoch.values, fill_value=np.nan
        )

    # Align to observation SID space
    common_sids = sorted(set(ds.sid.values) & set(bt.sid.values))
    bt = bt.sel(sid=common_sids).reindex(sid=ds.sid.values, fill_value=np.nan)
    bp = bp.sel(sid=common_sids).reindex(sid=ds.sid.values, fill_value=np.nan)

    # Use .values to prevent coord leakage from meta_ds (pdop, hdop, …)
    return add_broadcast_spherical_coords_to_dataset(ds, bt.values, bp.values)

Sp3File

Bases: AuxFile

Handler for SP3 orbit files with multi-product support.

Now supports all IGS analysis centers via product registry: COD, GFZ, ESA, JPL, IGS, WHU, GRG, SHA

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True.

Attributes

date : str String in YYYYDOY format. agency : str Agency code (e.g., "COD", "GFZ", "ESA"). product_type : str Product type ("final", "rapid"). ftp_server : str Base URL for downloads. local_dir : Path Local storage directory. add_velocities : bool | None, default True Whether to compute velocities. dimensionless : bool | None, default True Whether to strip units (store magnitudes only). product_spec : ProductSpec | None, optional Product specification resolved from the registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class Sp3File(AuxFile):
    """Handler for SP3 orbit files with multi-product support.

    Now supports all IGS analysis centers via product registry:
    COD, GFZ, ESA, JPL, IGS, WHU, GRG, SHA

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True`.

    Attributes
    ----------
    date : str
        String in YYYYDOY format.
    agency : str
        Agency code (e.g., "COD", "GFZ", "ESA").
    product_type : str
        Product type ("final", "rapid").
    ftp_server : str
        Base URL for downloads.
    local_dir : Path
        Local storage directory.
    add_velocities : bool | None, default True
        Whether to compute velocities.
    dimensionless : bool | None, default True
        Whether to strip units (store magnitudes only).
    product_spec : ProductSpec | None, optional
        Product specification resolved from the registry.
    """

    date: str
    agency: str
    product_type: str
    ftp_server: str
    local_dir: Path
    add_velocities: bool | None = True
    dimensionless: bool | None = True
    product_spec: ProductSpec | None = None

    def __post_init__(self) -> None:
        """Initialize with product validation."""
        self.file_type = ["orbit"]
        self.local_dir = Path(self.local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)

        # Validate product exists in registry
        self.product_spec = get_product_spec(self.agency, self.product_type)

        super().__post_init__()

    def get_interpolation_strategy(self) -> Interpolator:
        """Get appropriate interpolation strategy for SP3 files."""
        config = Sp3Config(
            use_velocities=bool(self.add_velocities),
            fallback_method="linear",
        )
        return Sp3InterpolationStrategy(config=config)

    def generate_filename_based_on_type(self) -> Path:
        """Generate filename using product registry.

        Pattern: {PREFIX}_{YYYYDOY}0000_{DURATION}_{SAMPLING}_ORB.SP3

        Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3
        """
        assert self.product_spec is not None, (
            "product_spec must be set via \
            __post_init__"
        )
        prefix = self.product_spec.prefix
        duration = self.product_spec.duration
        sampling = self.product_spec.sampling_rate

        return Path(f"{prefix}_{self.date}0000_{duration}_{sampling}_ORB.SP3")

    def download_aux_file(self) -> None:
        """Download SP3 file, trying all servers from the product registry.

        Servers are tried in priority order from products.toml. Auth-required
        servers are skipped when no credentials are configured. Warnings are
        printed when falling back to alternate servers.

        Raises
        ------
        RuntimeError
            If download fails from all available servers.
        ValueError
            If GPS week calculation fails.
        """
        assert self.product_spec is not None, (
            "product_spec must be set via \
            __post_init__"
        )
        orbit_file = self.generate_filename_based_on_type()
        gps_week = get_gps_week_from_filename(orbit_file)

        ftp_path = self.product_spec.ftp_path_pattern.format(
            gps_week=gps_week,
            file=f"{orbit_file}.gz",
        )
        destination = self.local_dir / orbit_file
        file_info = {
            "gps_week": gps_week,
            "filename": orbit_file,
            "type": "orbit",
            "agency": self.agency,
        }

        self.download_with_fallback(ftp_path, destination, file_info, self.product_spec)
        print(f"Downloaded orbit file for {self.agency} on date {self.date}")

    def read_file(self) -> xr.Dataset:
        """Read and validate SP3 file.

        Returns
        -------
        xr.Dataset
            Dataset with satellite positions (X, Y, Z) in meters.

        Raises
        ------
        FileNotFoundError
            If file does not exist.
        ValueError
            If validation fails.
        """
        # Use dedicated parser
        assert self.fpath is not None, "fpath must be set before calling read_file"
        parser = Sp3Parser(self.fpath, dimensionless=bool(self.dimensionless))
        dataset = parser.parse()

        # Validate format
        validator = Sp3Validator(dataset, self.fpath)
        result = validator.validate()

        if not result.is_valid:
            raise ValueError(f"SP3 validation failed:\n{result.summary()}")

        # Add metadata
        dataset = self._add_metadata(dataset)

        # Compute velocities if requested
        if self.add_velocities:
            dataset = self.compute_velocity(dataset)

        return dataset

    def _add_metadata(self, ds: xr.Dataset) -> xr.Dataset:
        """Add file-level metadata to dataset."""
        assert self.fpath is not None, "fpath must be set before calling _add_metadata"
        assert self.product_spec is not None, (
            "product_spec must be set before \
            calling _add_metadata"
        )
        ds.attrs = {
            "file": str(self.fpath.name),
            "agency": self.agency,
            "agency_name": self.product_spec.agency_name,
            "product_type": self.product_type,
            "ftp_server": self.ftp_server,
            "date": self.date,
            "sampling_rate": self.product_spec.sampling_rate,
            "duration": self.product_spec.duration,
        }
        return ds

    def compute_velocity(self, ds: xr.Dataset) -> xr.Dataset:
        """Compute satellite velocities from position data.

        Uses central differences for interior points, forward/backward
        differences for endpoints.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset with X, Y, Z coordinates.

        Returns
        -------
        xr.Dataset
            Dataset augmented with Vx, Vy, Vz velocities.
        """
        # Calculate time step
        time_diffs = np.diff(ds["epoch"].values)
        dt = np.median(time_diffs).astype("timedelta64[s]").astype(float)

        # Initialize velocity arrays
        vx = np.zeros_like(ds["X"].values)
        vy = np.zeros_like(ds["Y"].values)
        vz = np.zeros_like(ds["Z"].values)

        # Central difference for interior points
        vx[1:-1] = (ds["X"].values[2:] - ds["X"].values[:-2]) / (2 * dt)
        vy[1:-1] = (ds["Y"].values[2:] - ds["Y"].values[:-2]) / (2 * dt)
        vz[1:-1] = (ds["Z"].values[2:] - ds["Z"].values[:-2]) / (2 * dt)

        # Forward difference for first point
        vx[0] = (ds["X"].values[1] - ds["X"].values[0]) / dt
        vy[0] = (ds["Y"].values[1] - ds["Y"].values[0]) / dt
        vz[0] = (ds["Z"].values[1] - ds["Z"].values[0]) / dt

        # Backward difference for last point
        vx[-1] = (ds["X"].values[-1] - ds["X"].values[-2]) / dt
        vy[-1] = (ds["Y"].values[-1] - ds["Y"].values[-2]) / dt
        vz[-1] = (ds["Z"].values[-1] - ds["Z"].values[-2]) / dt

        # Add units if needed
        if not self.dimensionless:
            vx = vx * (UREG.meter / UREG.second)
            vy = vy * (UREG.meter / UREG.second)
            vz = vz * (UREG.meter / UREG.second)

        # Add to dataset
        ds = ds.assign(
            Vx=(("epoch", "sv"), vx),
            Vy=(("epoch", "sv"), vy),
            Vz=(("epoch", "sv"), vz),
        )

        # Add velocity attributes
        for var, attrs in self._get_velocity_attributes(dt).items():
            if var in ds:
                ds[var].attrs = attrs

        return ds

    def _get_velocity_attributes(
        self,
        dt: float,
    ) -> dict[str, dict[str, str | float]]:
        """Get standardized attributes for velocity variables."""
        base_attrs = {
            "units": "m/s",
            "computation_method": "central_difference",
            "time_step": float(dt),
            "reference_frame": "ECEF",
        }

        return {
            "Vx": {
                "long_name": "x-component of velocity",
                "standard_name": "v_x",
                "short_name": "v_x",
                "axis": "v_x",
                **base_attrs,
            },
            "Vy": {
                "long_name": "y-component of velocity",
                "standard_name": "v_y",
                "short_name": "v_y",
                "axis": "v_y",
                **base_attrs,
            },
            "Vz": {
                "long_name": "z-component of velocity",
                "standard_name": "v_z",
                "short_name": "v_z",
                "axis": "v_z",
                **base_attrs,
            },
        }

__post_init__()

Initialize with product validation.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
62
63
64
65
66
67
68
69
70
71
def __post_init__(self) -> None:
    """Initialize with product validation."""
    self.file_type = ["orbit"]
    self.local_dir = Path(self.local_dir)
    self.local_dir.mkdir(parents=True, exist_ok=True)

    # Validate product exists in registry
    self.product_spec = get_product_spec(self.agency, self.product_type)

    super().__post_init__()

get_interpolation_strategy()

Get appropriate interpolation strategy for SP3 files.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
73
74
75
76
77
78
79
def get_interpolation_strategy(self) -> Interpolator:
    """Get appropriate interpolation strategy for SP3 files."""
    config = Sp3Config(
        use_velocities=bool(self.add_velocities),
        fallback_method="linear",
    )
    return Sp3InterpolationStrategy(config=config)

generate_filename_based_on_type()

Generate filename using product registry.

Pattern: {PREFIX}{YYYYDOY}0000_ORB.SP3}_{SAMPLING

Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def generate_filename_based_on_type(self) -> Path:
    """Generate filename using product registry.

    Pattern: {PREFIX}_{YYYYDOY}0000_{DURATION}_{SAMPLING}_ORB.SP3

    Example: COD0MGXFIN_20240150000_01D_05M_ORB.SP3
    """
    assert self.product_spec is not None, (
        "product_spec must be set via \
        __post_init__"
    )
    prefix = self.product_spec.prefix
    duration = self.product_spec.duration
    sampling = self.product_spec.sampling_rate

    return Path(f"{prefix}_{self.date}0000_{duration}_{sampling}_ORB.SP3")

download_aux_file()

Download SP3 file, trying all servers from the product registry.

Servers are tried in priority order from products.toml. Auth-required servers are skipped when no credentials are configured. Warnings are printed when falling back to alternate servers.

Raises

RuntimeError If download fails from all available servers. ValueError If GPS week calculation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def download_aux_file(self) -> None:
    """Download SP3 file, trying all servers from the product registry.

    Servers are tried in priority order from products.toml. Auth-required
    servers are skipped when no credentials are configured. Warnings are
    printed when falling back to alternate servers.

    Raises
    ------
    RuntimeError
        If download fails from all available servers.
    ValueError
        If GPS week calculation fails.
    """
    assert self.product_spec is not None, (
        "product_spec must be set via \
        __post_init__"
    )
    orbit_file = self.generate_filename_based_on_type()
    gps_week = get_gps_week_from_filename(orbit_file)

    ftp_path = self.product_spec.ftp_path_pattern.format(
        gps_week=gps_week,
        file=f"{orbit_file}.gz",
    )
    destination = self.local_dir / orbit_file
    file_info = {
        "gps_week": gps_week,
        "filename": orbit_file,
        "type": "orbit",
        "agency": self.agency,
    }

    self.download_with_fallback(ftp_path, destination, file_info, self.product_spec)
    print(f"Downloaded orbit file for {self.agency} on date {self.date}")

read_file()

Read and validate SP3 file.

Returns

xr.Dataset Dataset with satellite positions (X, Y, Z) in meters.

Raises

FileNotFoundError If file does not exist. ValueError If validation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def read_file(self) -> xr.Dataset:
    """Read and validate SP3 file.

    Returns
    -------
    xr.Dataset
        Dataset with satellite positions (X, Y, Z) in meters.

    Raises
    ------
    FileNotFoundError
        If file does not exist.
    ValueError
        If validation fails.
    """
    # Use dedicated parser
    assert self.fpath is not None, "fpath must be set before calling read_file"
    parser = Sp3Parser(self.fpath, dimensionless=bool(self.dimensionless))
    dataset = parser.parse()

    # Validate format
    validator = Sp3Validator(dataset, self.fpath)
    result = validator.validate()

    if not result.is_valid:
        raise ValueError(f"SP3 validation failed:\n{result.summary()}")

    # Add metadata
    dataset = self._add_metadata(dataset)

    # Compute velocities if requested
    if self.add_velocities:
        dataset = self.compute_velocity(dataset)

    return dataset

compute_velocity(ds)

Compute satellite velocities from position data.

Uses central differences for interior points, forward/backward differences for endpoints.

Parameters

ds : xr.Dataset Dataset with X, Y, Z coordinates.

Returns

xr.Dataset Dataset augmented with Vx, Vy, Vz velocities.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/reader.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def compute_velocity(self, ds: xr.Dataset) -> xr.Dataset:
    """Compute satellite velocities from position data.

    Uses central differences for interior points, forward/backward
    differences for endpoints.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with X, Y, Z coordinates.

    Returns
    -------
    xr.Dataset
        Dataset augmented with Vx, Vy, Vz velocities.
    """
    # Calculate time step
    time_diffs = np.diff(ds["epoch"].values)
    dt = np.median(time_diffs).astype("timedelta64[s]").astype(float)

    # Initialize velocity arrays
    vx = np.zeros_like(ds["X"].values)
    vy = np.zeros_like(ds["Y"].values)
    vz = np.zeros_like(ds["Z"].values)

    # Central difference for interior points
    vx[1:-1] = (ds["X"].values[2:] - ds["X"].values[:-2]) / (2 * dt)
    vy[1:-1] = (ds["Y"].values[2:] - ds["Y"].values[:-2]) / (2 * dt)
    vz[1:-1] = (ds["Z"].values[2:] - ds["Z"].values[:-2]) / (2 * dt)

    # Forward difference for first point
    vx[0] = (ds["X"].values[1] - ds["X"].values[0]) / dt
    vy[0] = (ds["Y"].values[1] - ds["Y"].values[0]) / dt
    vz[0] = (ds["Z"].values[1] - ds["Z"].values[0]) / dt

    # Backward difference for last point
    vx[-1] = (ds["X"].values[-1] - ds["X"].values[-2]) / dt
    vy[-1] = (ds["Y"].values[-1] - ds["Y"].values[-2]) / dt
    vz[-1] = (ds["Z"].values[-1] - ds["Z"].values[-2]) / dt

    # Add units if needed
    if not self.dimensionless:
        vx = vx * (UREG.meter / UREG.second)
        vy = vy * (UREG.meter / UREG.second)
        vz = vz * (UREG.meter / UREG.second)

    # Add to dataset
    ds = ds.assign(
        Vx=(("epoch", "sv"), vx),
        Vy=(("epoch", "sv"), vy),
        Vz=(("epoch", "sv"), vz),
    )

    # Add velocity attributes
    for var, attrs in self._get_velocity_attributes(dt).items():
        if var in ds:
            ds[var].attrs = attrs

    return ds

Sp3Validator

Validator for SP3 orbit files.

Performs format and data quality checks on parsed SP3 datasets.

Parameters

dataset : xr.Dataset Parsed SP3 dataset. fpath : Path Path to the original file.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/validator.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Sp3Validator:
    """Validator for SP3 orbit files.

    Performs format and data quality checks on parsed SP3 datasets.

    Parameters
    ----------
    dataset : xr.Dataset
        Parsed SP3 dataset.
    fpath : Path
        Path to the original file.
    """

    def __init__(self, dataset: xr.Dataset, fpath: Path) -> None:
        """Initialize validator."""
        self.dataset = dataset
        self.fpath = Path(fpath)
        self.result = FileValidationResult(
            is_valid=True,
            errors=[],
            warnings=[],
            file_path=self.fpath,
            file_type="SP3",
        )

    def validate(self) -> FileValidationResult:
        """Run all validation checks.

        Returns
        -------
        FileValidationResult
            Validation result with errors and warnings.
        """
        self._check_required_variables()
        self._check_required_coordinates()
        self._check_data_quality()

        return self.result

    def _check_required_variables(self) -> None:
        """Check that required variables are present."""
        required = ["X", "Y", "Z"]
        missing = [var for var in required if var not in self.dataset.data_vars]

        if missing:
            self.result.add_error(f"Missing required variables: {missing}")

    def _check_required_coordinates(self) -> None:
        """Check that required coordinates are present."""
        required = ["epoch", "sv"]
        missing = [coord for coord in required if coord not in self.dataset.coords]

        if missing:
            self.result.add_error(f"Missing required coordinates: {missing}")

    def _check_data_quality(self) -> None:
        """Check data quality metrics."""
        if "X" not in self.dataset:
            return

        # Check for excessive NaN values
        for var in ["X", "Y", "Z"]:
            nan_count = self.dataset[var].isnull().sum().item()
            total_count = self.dataset[var].size
            nan_percentage = (nan_count / total_count) * 100

            if nan_percentage > 50:
                self.result.add_error(
                    f"Variable {var} has {nan_percentage:.1f}% NaN values "
                    f"(>50% threshold)"
                )
            elif nan_percentage > 10:
                self.result.add_warning(
                    f"Variable {var} has {nan_percentage:.1f}% NaN values"
                )

    def get_summary(self) -> str:
        """Get validation summary."""
        return self.result.summary()

__init__(dataset, fpath)

Initialize validator.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/validator.py
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, dataset: xr.Dataset, fpath: Path) -> None:
    """Initialize validator."""
    self.dataset = dataset
    self.fpath = Path(fpath)
    self.result = FileValidationResult(
        is_valid=True,
        errors=[],
        warnings=[],
        file_path=self.fpath,
        file_type="SP3",
    )

validate()

Run all validation checks.

Returns

FileValidationResult Validation result with errors and warnings.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/validator.py
35
36
37
38
39
40
41
42
43
44
45
46
47
def validate(self) -> FileValidationResult:
    """Run all validation checks.

    Returns
    -------
    FileValidationResult
        Validation result with errors and warnings.
    """
    self._check_required_variables()
    self._check_required_coordinates()
    self._check_data_quality()

    return self.result

get_summary()

Get validation summary.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/ephemeris/validator.py
86
87
88
def get_summary(self) -> str:
    """Get validation summary."""
    return self.result.summary()

Clock (CLK)

Clock correction data handling.

This module provides tools for reading, parsing, and validating satellite clock correction data from RINEX CLK format files.

ClkFile

Bases: AuxFile

Handler for GNSS clock files in CLK format.

This class reads and processes clock offset files containing satellite clock corrections. It handles the parsing of CLK format files and provides the data in xarray Dataset format.

Supports multiple analysis centers via product registry with proper FTP paths and filename conventions.

Notes

This is a Pydantic dataclass with arbitrary_types_allowed=True.

Attributes

date : str String in YYYYDOY format representing the start date. agency : str Analysis center identifier (e.g., "COD", "GFZ"). product_type : str Product type ("final", "rapid", "ultrarapid"). ftp_server : str Base URL for file downloads. local_dir : Path Local storage directory. dimensionless : bool | None, default True If True, outputs magnitude-only values (no units attached).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ClkFile(AuxFile):
    """Handler for GNSS clock files in CLK format.

    This class reads and processes clock offset files containing satellite clock
    corrections. It handles the parsing of CLK format files and provides the data
    in xarray Dataset format.

    Supports multiple analysis centers via product registry with proper FTP paths
    and filename conventions.

    Notes
    -----
    This is a Pydantic dataclass with `arbitrary_types_allowed=True`.

    Attributes
    ----------
    date : str
        String in YYYYDOY format representing the start date.
    agency : str
        Analysis center identifier (e.g., "COD", "GFZ").
    product_type : str
        Product type ("final", "rapid", "ultrarapid").
    ftp_server : str
        Base URL for file downloads.
    local_dir : Path
        Local storage directory.
    dimensionless : bool | None, default True
        If True, outputs magnitude-only values (no units attached).
    """

    date: str
    agency: str
    product_type: str
    ftp_server: str
    local_dir: Path
    dimensionless: bool | None = True

    def __post_init__(self) -> None:
        """Initialize CLK file handler."""
        self.file_type = ["clock"]
        self.local_dir = Path(self.local_dir)
        self.local_dir.mkdir(parents=True, exist_ok=True)
        super().__post_init__()

    def get_interpolation_strategy(self) -> Interpolator:
        """Get appropriate interpolation strategy for CLK files."""
        config = ClockConfig(
            window_size=9,
            jump_threshold=1e-6,
        )
        return ClockInterpolationStrategy(config=config)

    def generate_filename_based_on_type(self) -> Path:
        """Generate standard CLK filename using product registry.

        Uses product registry to get correct prefix for the agency/product combination.
        Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

        Returns
        -------
        Path
            Filename according to CLK conventions.

        Raises
        ------
        ValueError
            If agency/product combination not in registry.
        """
        # Get product spec from registry
        spec = get_product_spec(self.agency, self.product_type)

        # CLK files use 30S sampling for most products
        sampling = "30S"  # Could be made configurable via registry

        return Path(f"{spec.prefix}_{self.date}0000_01D_{sampling}_CLK.CLK")

    def download_aux_file(self) -> None:
        """Download CLK file, trying all servers from the product registry.

        Servers are tried in priority order from products.toml. Auth-required
        servers are skipped when no credentials are configured. Warnings are
        printed when falling back to alternate servers.

        Raises
        ------
        RuntimeError
            If file cannot be downloaded from any available server.
        ValueError
            If GPS week calculation fails.
        """
        clock_file = self.generate_filename_based_on_type()
        gps_week = get_gps_week_from_filename(clock_file)

        spec = get_product_spec(self.agency, self.product_type)
        ftp_path = spec.ftp_path_pattern.format(
            gps_week=gps_week,
            file=f"{clock_file}.gz",
        )
        destination = self.local_dir / clock_file
        file_info = {
            "gps_week": gps_week,
            "filename": clock_file,
            "type": "clock",
            "agency": self.agency,
        }

        self.download_with_fallback(ftp_path, destination, file_info, spec)
        print(f"Downloaded clock file for {self.agency} on date {self.date}")

    def read_file(self) -> xr.Dataset:
        """Read and parse CLK file into xarray Dataset.

        Uses modular parser for data extraction and validator for quality checks.
        Applies unit conversion from microseconds to seconds.

        Returns
        -------
        xr.Dataset
            Clock offsets with dimensions (epoch, sv). Values are in seconds
            (or dimensionless if specified).
        """
        # Parse file using modular parser
        assert self.fpath is not None, "fpath must be set before calling read_file"
        epochs, satellites, clock_offsets = parse_clk_file(self.fpath)

        # Convert units (microseconds → seconds)
        clock_offsets = (UREG.microsecond * clock_offsets).to("s")

        if self.dimensionless:
            clock_offsets = clock_offsets.magnitude

        # Create dataset
        ds = xr.Dataset(
            data_vars={"clock_offset": (("epoch", "sv"), clock_offsets)},
            coords={
                "epoch": np.array(epochs, dtype="datetime64[ns]"),
                "sv": np.array(satellites),
            },
        )

        # Validate and add metadata
        validation = validate_clk_dataset(ds)
        ds = self._prepare_dataset(ds, validation)

        return ds

    def _prepare_dataset(self, ds: xr.Dataset, validation: dict) -> xr.Dataset:
        """Add metadata and attributes to dataset.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset from parsing.
        validation : dict
            Validation results dictionary.

        Returns
        -------
        xr.Dataset
            Dataset with complete metadata.
        """
        assert self.fpath is not None, (
            "fpath must be set \
            before calling _prepare_dataset"
        )
        ds.attrs = {
            "file": str(self.fpath.name),
            "agency": self.agency,
            "product_type": self.product_type,
            "ftp_server": self.ftp_server,
            "date": self.date,
            "valid_data_percent": validation["valid_data_percent"],
            "num_epochs": validation["num_epochs"],
            "num_satellites": validation["num_satellites"],
        }

        # Add variable attributes
        ds.clock_offset.attrs = {
            "long_name": "Satellite clock offset",
            "standard_name": "clock_offset",
            "units": "seconds",
            "description": "Clock correction for each satellite",
        }

        return ds

__post_init__()

Initialize CLK file handler.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
65
66
67
68
69
70
def __post_init__(self) -> None:
    """Initialize CLK file handler."""
    self.file_type = ["clock"]
    self.local_dir = Path(self.local_dir)
    self.local_dir.mkdir(parents=True, exist_ok=True)
    super().__post_init__()

get_interpolation_strategy()

Get appropriate interpolation strategy for CLK files.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
72
73
74
75
76
77
78
def get_interpolation_strategy(self) -> Interpolator:
    """Get appropriate interpolation strategy for CLK files."""
    config = ClockConfig(
        window_size=9,
        jump_threshold=1e-6,
    )
    return ClockInterpolationStrategy(config=config)

generate_filename_based_on_type()

Generate standard CLK filename using product registry.

Uses product registry to get correct prefix for the agency/product combination. Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

Returns

Path Filename according to CLK conventions.

Raises

ValueError If agency/product combination not in registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def generate_filename_based_on_type(self) -> Path:
    """Generate standard CLK filename using product registry.

    Uses product registry to get correct prefix for the agency/product combination.
    Filename format: {PREFIX}_{YYYYDOY}0000_01D_30S_CLK.CLK

    Returns
    -------
    Path
        Filename according to CLK conventions.

    Raises
    ------
    ValueError
        If agency/product combination not in registry.
    """
    # Get product spec from registry
    spec = get_product_spec(self.agency, self.product_type)

    # CLK files use 30S sampling for most products
    sampling = "30S"  # Could be made configurable via registry

    return Path(f"{spec.prefix}_{self.date}0000_01D_{sampling}_CLK.CLK")

download_aux_file()

Download CLK file, trying all servers from the product registry.

Servers are tried in priority order from products.toml. Auth-required servers are skipped when no credentials are configured. Warnings are printed when falling back to alternate servers.

Raises

RuntimeError If file cannot be downloaded from any available server. ValueError If GPS week calculation fails.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def download_aux_file(self) -> None:
    """Download CLK file, trying all servers from the product registry.

    Servers are tried in priority order from products.toml. Auth-required
    servers are skipped when no credentials are configured. Warnings are
    printed when falling back to alternate servers.

    Raises
    ------
    RuntimeError
        If file cannot be downloaded from any available server.
    ValueError
        If GPS week calculation fails.
    """
    clock_file = self.generate_filename_based_on_type()
    gps_week = get_gps_week_from_filename(clock_file)

    spec = get_product_spec(self.agency, self.product_type)
    ftp_path = spec.ftp_path_pattern.format(
        gps_week=gps_week,
        file=f"{clock_file}.gz",
    )
    destination = self.local_dir / clock_file
    file_info = {
        "gps_week": gps_week,
        "filename": clock_file,
        "type": "clock",
        "agency": self.agency,
    }

    self.download_with_fallback(ftp_path, destination, file_info, spec)
    print(f"Downloaded clock file for {self.agency} on date {self.date}")

read_file()

Read and parse CLK file into xarray Dataset.

Uses modular parser for data extraction and validator for quality checks. Applies unit conversion from microseconds to seconds.

Returns

xr.Dataset Clock offsets with dimensions (epoch, sv). Values are in seconds (or dimensionless if specified).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/reader.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def read_file(self) -> xr.Dataset:
    """Read and parse CLK file into xarray Dataset.

    Uses modular parser for data extraction and validator for quality checks.
    Applies unit conversion from microseconds to seconds.

    Returns
    -------
    xr.Dataset
        Clock offsets with dimensions (epoch, sv). Values are in seconds
        (or dimensionless if specified).
    """
    # Parse file using modular parser
    assert self.fpath is not None, "fpath must be set before calling read_file"
    epochs, satellites, clock_offsets = parse_clk_file(self.fpath)

    # Convert units (microseconds → seconds)
    clock_offsets = (UREG.microsecond * clock_offsets).to("s")

    if self.dimensionless:
        clock_offsets = clock_offsets.magnitude

    # Create dataset
    ds = xr.Dataset(
        data_vars={"clock_offset": (("epoch", "sv"), clock_offsets)},
        coords={
            "epoch": np.array(epochs, dtype="datetime64[ns]"),
            "sv": np.array(satellites),
        },
    )

    # Validate and add metadata
    validation = validate_clk_dataset(ds)
    ds = self._prepare_dataset(ds, validation)

    return ds

parse_clk_data(file_handle)

Parse CLK data records using two-pass strategy.

Two-Pass Strategy

Pass 1: Collect all unique epochs and satellites Pass 2: Fill data arrays with clock offsets

This ensures we capture all satellites, even if they're not in the header, and handles satellites appearing/disappearing during the time period.

Parameters

file_handle : TextIO Open file object positioned after header.

Returns

tuple[list[datetime.datetime], list[str], np.ndarray] (epochs, satellites, clock_offsets) where: - epochs: list of datetime objects - satellites: sorted list of satellite codes - clock_offsets: 2D array (epochs × satellites) in microseconds

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/parser.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def parse_clk_data(
    file_handle: TextIO,
) -> tuple[list[datetime.datetime], list[str], np.ndarray]:
    """Parse CLK data records using two-pass strategy.

    Two-Pass Strategy:
        Pass 1: Collect all unique epochs and satellites
        Pass 2: Fill data arrays with clock offsets

    This ensures we capture all satellites, even if they're not in the header,
    and handles satellites appearing/disappearing during the time period.

    Parameters
    ----------
    file_handle : TextIO
        Open file object positioned after header.

    Returns
    -------
    tuple[list[datetime.datetime], list[str], np.ndarray]
        (epochs, satellites, clock_offsets) where:
        - epochs: list of datetime objects
        - satellites: sorted list of satellite codes
        - clock_offsets: 2D array (epochs × satellites) in microseconds
    """
    # First pass: collect all epochs and satellites
    epochs = []
    satellites = set()
    clock_records = []
    current_epoch = None

    for line in file_handle:
        if line.startswith("AS"):
            parts = line.split()

            # Parse epoch
            epoch = datetime.datetime(
                year=int(parts[2]),
                month=int(parts[3]),
                day=int(parts[4]),
                hour=int(parts[5]),
                minute=int(parts[6]),
                second=int(float(parts[7])),
            )

            # Store unique epochs and satellites
            if epoch != current_epoch:
                current_epoch = epoch
                epochs.append(epoch)

            sv_code = parts[1]
            satellites.add(sv_code)

            # Store complete record
            clock_offset = float(parts[9])  # microseconds
            clock_records.append((epoch, sv_code, clock_offset))

    # Create lookup dictionaries for efficient indexing
    epoch_idx = {epoch: i for i, epoch in enumerate(epochs)}
    sv_list = sorted(satellites)
    sv_idx = {sv: i for i, sv in enumerate(sv_list)}

    # Pre-allocate array with NaN
    shape = (len(epochs), len(sv_list))
    clock_offsets = np.full(shape, np.nan)

    # Fill array using collected records
    for epoch, sv, offset in clock_records:
        i = epoch_idx[epoch]
        j = sv_idx[sv]
        clock_offsets[i, j] = offset

    return epochs, sv_list, clock_offsets

parse_clk_file(filepath)

Parse complete CLK file.

Parameters

filepath : Path Path to CLK file.

Returns

tuple[list[datetime.datetime], list[str], np.ndarray] (epochs, satellites, clock_offsets).

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/parser.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def parse_clk_file(
    filepath: Path,
) -> tuple[list[datetime.datetime], list[str], np.ndarray]:
    """Parse complete CLK file.

    Parameters
    ----------
    filepath : Path
        Path to CLK file.

    Returns
    -------
    tuple[list[datetime.datetime], list[str], np.ndarray]
        (epochs, satellites, clock_offsets).
    """
    with filepath.open() as f:
        # Parse header (skip it, we get satellites from data anyway)
        _ = parse_clk_header(f)

        # Parse data
        epochs, satellites, clock_offsets = parse_clk_data(f)

    return epochs, satellites, clock_offsets

parse_clk_header(file_handle)

Parse CLK file header to extract satellite list.

Parameters

file_handle : TextIO Open file object positioned at start.

Returns

set[str] Satellite identifiers (e.g., "G01", "R01").

Raises

ValueError If header format is invalid.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/parser.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def parse_clk_header(file_handle: TextIO) -> set[str]:
    """Parse CLK file header to extract satellite list.

    Parameters
    ----------
    file_handle : TextIO
        Open file object positioned at start.

    Returns
    -------
    set[str]
        Satellite identifiers (e.g., "G01", "R01").

    Raises
    ------
    ValueError
        If header format is invalid.
    """
    satellites = set()

    for line in file_handle:
        if "OF SOLN SATS" in line:
            _ = int(line[4:6])
        elif "PRN LIST" in line:
            # Parse PRN list line(s)
            parts = line.split()[2:]  # Skip 'PRN' and 'LIST'
            for prn in parts:
                if len(prn) == 3:  # Valid PRN format (e.g., G01)
                    satellites.add(prn)
        elif "END OF HEADER" in line:
            break

    return satellites

check_clk_data_quality(ds, min_coverage=80.0)

Check if CLK data meets minimum quality requirements.

Parameters

ds : xr.Dataset Dataset from a CLK file. min_coverage : float, default 80.0 Minimum required data coverage percentage.

Returns

bool True if data quality is acceptable.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/validator.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def check_clk_data_quality(ds: xr.Dataset, min_coverage: float = 80.0) -> bool:
    """Check if CLK data meets minimum quality requirements.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset from a CLK file.
    min_coverage : float, default 80.0
        Minimum required data coverage percentage.

    Returns
    -------
    bool
        True if data quality is acceptable.
    """
    results = validate_clk_dataset(ds)

    # Must have all required components
    if not (results["has_clock_offset"] and results["has_epoch"] and results["has_sv"]):
        return False

    # Must have monotonic epochs
    if not results["epochs_monotonic"]:
        return False

    # Must meet minimum coverage requirement
    if results["valid_data_percent"] < min_coverage:
        return False

    return True

validate_clk_dataset(ds)

Validate CLK dataset structure and data quality.

Checks

  • Required variable exists (clock_offset)
  • Required coordinates exist (epoch, sv)
  • Data completeness (percentage of valid values)
  • Temporal consistency (monotonic epochs)

Parameters

ds : xr.Dataset Dataset from a CLK file.

Returns

dict[str, bool | float | int] Validation results with keys: - has_clock_offset - has_epoch - has_sv - valid_data_percent - epochs_monotonic - num_epochs - num_satellites

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/clock/validator.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def validate_clk_dataset(ds: xr.Dataset) -> dict[str, bool | float | int]:
    """Validate CLK dataset structure and data quality.

    Checks:
        - Required variable exists (clock_offset)
        - Required coordinates exist (epoch, sv)
        - Data completeness (percentage of valid values)
        - Temporal consistency (monotonic epochs)

    Parameters
    ----------
    ds : xr.Dataset
        Dataset from a CLK file.

    Returns
    -------
    dict[str, bool | float | int]
        Validation results with keys:
        - has_clock_offset
        - has_epoch
        - has_sv
        - valid_data_percent
        - epochs_monotonic
        - num_epochs
        - num_satellites
    """
    results = {}

    # Check required variable
    results["has_clock_offset"] = "clock_offset" in ds.data_vars

    # Check required coordinates
    results["has_epoch"] = "epoch" in ds.coords
    results["has_sv"] = "sv" in ds.coords

    if results["has_clock_offset"]:
        # Calculate data completeness
        clock_data = ds["clock_offset"].values
        total_values = clock_data.size
        valid_values = np.sum(~np.isnan(clock_data))
        results["valid_data_percent"] = (valid_values / total_values) * 100
    else:
        results["valid_data_percent"] = 0.0

    if results["has_epoch"]:
        # Check temporal consistency
        epochs = ds["epoch"].values
        results["epochs_monotonic"] = np.all(epochs[:-1] <= epochs[1:])
        results["num_epochs"] = len(epochs)
    else:
        results["epochs_monotonic"] = False
        results["num_epochs"] = 0

    if results["has_sv"]:
        results["num_satellites"] = len(ds["sv"])
    else:
        results["num_satellites"] = 0

    return results

Position and Coordinates

Position and coordinate transformations for GNSS data.

Provides ECEF/geodetic position representations and spherical coordinate computation for satellite-receiver geometry analysis.

ECEFPosition dataclass

Earth-Centered, Earth-Fixed (ECEF) position in meters.

ECEF is a Cartesian coordinate system with: - Origin at Earth's center of mass - X-axis pointing to 0° latitude, 0° longitude (Prime Meridian at Equator) - Y-axis pointing to 0° latitude, 90° East longitude - Z-axis pointing to North Pole

Parameters

x : float X coordinate in meters. y : float Y coordinate in meters. z : float Z coordinate in meters.

Examples

From RINEX dataset metadata

pos = ECEFPosition.from_ds_metadata(rinex_ds) print(f"X: {pos.x:.3f} m")

Manual creation

pos = ECEFPosition(x=4194304.123, y=176481.234, z=4780013.456) lat, lon, alt = pos.to_geodetic()

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@dataclass(frozen=True)
class ECEFPosition:
    """Earth-Centered, Earth-Fixed (ECEF) position in meters.

    ECEF is a Cartesian coordinate system with:
    - Origin at Earth's center of mass
    - X-axis pointing to 0° latitude, 0° longitude (Prime Meridian at Equator)
    - Y-axis pointing to 0° latitude, 90° East longitude
    - Z-axis pointing to North Pole

    Parameters
    ----------
    x : float
        X coordinate in meters.
    y : float
        Y coordinate in meters.
    z : float
        Z coordinate in meters.

    Examples
    --------
    >>> # From RINEX dataset metadata
    >>> pos = ECEFPosition.from_ds_metadata(rinex_ds)
    >>> print(f"X: {pos.x:.3f} m")

    >>> # Manual creation
    >>> pos = ECEFPosition(x=4194304.123, y=176481.234, z=4780013.456)
    >>> lat, lon, alt = pos.to_geodetic()
    """

    x: float  # meters
    y: float  # meters
    z: float  # meters

    def to_geodetic(self) -> tuple[float, float, float]:
        """Convert ECEF to geodetic coordinates.

        Returns
        -------
        tuple[float, float, float]
            (latitude, longitude, altitude) where:
            - latitude: degrees [-90, 90]
            - longitude: degrees [-180, 180]
            - altitude: meters above WGS84 ellipsoid
        """
        lat, lon, alt = pm.ecef2geodetic(self.x, self.y, self.z)
        return lat, lon, alt

    @classmethod
    def from_ds_metadata(cls, ds: xr.Dataset) -> Self:
        """Extract ECEF position from RINEX dataset metadata.

        Reads from standard RINEX header attributes.

        Parameters
        ----------
        ds : xr.Dataset
            RINEX dataset with position in attributes.

        Returns
        -------
        ECEFPosition
            Receiver position in ECEF.

        Raises
        ------
        KeyError
            If position attributes not found in dataset.

        Examples
        --------
        >>> from canvod.readers import Rnxv3Obs
        >>> rnx = Rnxv3Obs(fpath="station.24o")
        >>> ds = rnx.to_ds()
        >>> pos = ECEFPosition.from_ds_metadata(ds)
        """
        # Try different attribute names
        if "APPROX POSITION X" in ds.attrs:
            # Standard RINEX v3 format
            x = ds.attrs["APPROX POSITION X"]
            y = ds.attrs["APPROX POSITION Y"]
            z = ds.attrs["APPROX POSITION Z"]
        elif "Approximate Position" in ds.attrs:
            # Alternative format: "X=..., Y=..., Z=..."
            pos = ds.attrs["Approximate Position"]
            pos_parts = pos.split(",")

            def sanitize(s: str) -> float:
                return float(s.split("=")[1].strip().split()[0])

            x = sanitize(pos_parts[0])
            y = sanitize(pos_parts[1])
            z = sanitize(pos_parts[2])
        else:
            raise KeyError(
                "Position not found in dataset attributes. "
                "Expected 'APPROX POSITION X/Y/Z' or 'Approximate Position'"
            )

        return cls(x=x, y=y, z=z)

    def __repr__(self) -> str:
        return f"ECEFPosition(x={self.x:.3f}, y={self.y:.3f}, z={self.z:.3f})"

to_geodetic()

Convert ECEF to geodetic coordinates.

Returns

tuple[float, float, float] (latitude, longitude, altitude) where: - latitude: degrees [-90, 90] - longitude: degrees [-180, 180] - altitude: meters above WGS84 ellipsoid

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def to_geodetic(self) -> tuple[float, float, float]:
    """Convert ECEF to geodetic coordinates.

    Returns
    -------
    tuple[float, float, float]
        (latitude, longitude, altitude) where:
        - latitude: degrees [-90, 90]
        - longitude: degrees [-180, 180]
        - altitude: meters above WGS84 ellipsoid
    """
    lat, lon, alt = pm.ecef2geodetic(self.x, self.y, self.z)
    return lat, lon, alt

from_ds_metadata(ds) classmethod

Extract ECEF position from RINEX dataset metadata.

Reads from standard RINEX header attributes.

Parameters

ds : xr.Dataset RINEX dataset with position in attributes.

Returns

ECEFPosition Receiver position in ECEF.

Raises

KeyError If position attributes not found in dataset.

Examples

from canvod.readers import Rnxv3Obs rnx = Rnxv3Obs(fpath="station.24o") ds = rnx.to_ds() pos = ECEFPosition.from_ds_metadata(ds)

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@classmethod
def from_ds_metadata(cls, ds: xr.Dataset) -> Self:
    """Extract ECEF position from RINEX dataset metadata.

    Reads from standard RINEX header attributes.

    Parameters
    ----------
    ds : xr.Dataset
        RINEX dataset with position in attributes.

    Returns
    -------
    ECEFPosition
        Receiver position in ECEF.

    Raises
    ------
    KeyError
        If position attributes not found in dataset.

    Examples
    --------
    >>> from canvod.readers import Rnxv3Obs
    >>> rnx = Rnxv3Obs(fpath="station.24o")
    >>> ds = rnx.to_ds()
    >>> pos = ECEFPosition.from_ds_metadata(ds)
    """
    # Try different attribute names
    if "APPROX POSITION X" in ds.attrs:
        # Standard RINEX v3 format
        x = ds.attrs["APPROX POSITION X"]
        y = ds.attrs["APPROX POSITION Y"]
        z = ds.attrs["APPROX POSITION Z"]
    elif "Approximate Position" in ds.attrs:
        # Alternative format: "X=..., Y=..., Z=..."
        pos = ds.attrs["Approximate Position"]
        pos_parts = pos.split(",")

        def sanitize(s: str) -> float:
            return float(s.split("=")[1].strip().split()[0])

        x = sanitize(pos_parts[0])
        y = sanitize(pos_parts[1])
        z = sanitize(pos_parts[2])
    else:
        raise KeyError(
            "Position not found in dataset attributes. "
            "Expected 'APPROX POSITION X/Y/Z' or 'Approximate Position'"
        )

    return cls(x=x, y=y, z=z)

GeodeticPosition dataclass

Geodetic (WGS84) position.

Parameters

lat : float Latitude in degrees [-90, 90]. lon : float Longitude in degrees [-180, 180]. alt : float Altitude in meters above WGS84 ellipsoid.

Examples

pos = GeodeticPosition(lat=48.208, lon=16.373, alt=200.0) print(f"Vienna: {pos.lat}°N, {pos.lon}°E, {pos.alt}m")

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@dataclass(frozen=True)
class GeodeticPosition:
    """Geodetic (WGS84) position.

    Parameters
    ----------
    lat : float
        Latitude in degrees [-90, 90].
    lon : float
        Longitude in degrees [-180, 180].
    alt : float
        Altitude in meters above WGS84 ellipsoid.

    Examples
    --------
    >>> pos = GeodeticPosition(lat=48.208, lon=16.373, alt=200.0)
    >>> print(f"Vienna: {pos.lat}°N, {pos.lon}°E, {pos.alt}m")
    """

    lat: float  # degrees
    lon: float  # degrees
    alt: float  # meters

    def to_ecef(self) -> ECEFPosition:
        """Convert geodetic to ECEF coordinates.

        Returns
        -------
        ECEFPosition
            Position in ECEF frame.
        """
        x, y, z = pm.geodetic2ecef(self.lat, self.lon, self.alt)
        return ECEFPosition(x=x, y=y, z=z)

    def __repr__(self) -> str:
        return (
            f"GeodeticPosition(lat={self.lat:.6f}°, "
            f"lon={self.lon:.6f}°, alt={self.alt:.1f}m)"
        )

to_ecef()

Convert geodetic to ECEF coordinates.

Returns

ECEFPosition Position in ECEF frame.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/position.py
143
144
145
146
147
148
149
150
151
152
def to_ecef(self) -> ECEFPosition:
    """Convert geodetic to ECEF coordinates.

    Returns
    -------
    ECEFPosition
        Position in ECEF frame.
    """
    x, y, z = pm.geodetic2ecef(self.lat, self.lon, self.alt)
    return ECEFPosition(x=x, y=y, z=z)

add_broadcast_spherical_coords_to_dataset(ds, theta, phi)

Add broadcast spherical coordinates to xarray Dataset.

Same convention as :func:add_spherical_coords_to_dataset but attrs reflect that values come from the SBF broadcast navigation solution rather than independently-computed SP3/CLK ephemerides.

Parameters

ds : xr.Dataset Dataset with 'epoch' and 'sid' dimensions. theta : np.ndarray Polar angles in radians [0, π]. phi : np.ndarray Azimuthal angles in radians [0, 2π).

Returns

xr.Dataset Dataset with phi and theta variables added.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/spherical_coords.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def add_broadcast_spherical_coords_to_dataset(
    ds: xr.Dataset,
    theta: np.ndarray,
    phi: np.ndarray,
) -> xr.Dataset:
    """Add broadcast spherical coordinates to xarray Dataset.

    Same convention as :func:`add_spherical_coords_to_dataset` but attrs
    reflect that values come from the SBF broadcast navigation solution
    rather than independently-computed SP3/CLK ephemerides.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with 'epoch' and 'sid' dimensions.
    theta : np.ndarray
        Polar angles in radians [0, π].
    phi : np.ndarray
        Azimuthal angles in radians [0, 2π).

    Returns
    -------
    xr.Dataset
        Dataset with phi and theta variables added.
    """
    ds["theta"] = xr.DataArray(
        theta,
        coords=[ds["epoch"], ds["sid"]],
        dims=["epoch", "sid"],
        attrs=_BROADCAST_OBS_THETA_ATTRS,
    )
    ds["phi"] = xr.DataArray(
        phi,
        coords=[ds["epoch"], ds["sid"]],
        dims=["epoch", "sid"],
        attrs=_BROADCAST_OBS_PHI_ATTRS,
    )
    return ds

add_spherical_coords_to_dataset(ds, r, theta, phi)

Add spherical coordinates to xarray Dataset with proper metadata.

Parameters

ds : xr.Dataset Dataset with 'epoch' and 'sid' dimensions. r : np.ndarray Radial distances in meters. theta : np.ndarray Polar angles in radians [0, π]. phi : np.ndarray Azimuthal angles in radians [0, 2π).

Returns

xr.Dataset Dataset with phi, theta, r variables added.

Notes

Variables are added with CF-compliant attributes following physics convention.

Examples

After computing spherical coordinates

r, theta, phi = compute_spherical_coordinates(sat_x, sat_y, sat_z, rx_pos)

Add to RINEX dataset

augmented_ds = add_spherical_coords_to_dataset(rinex_ds, r, theta, phi) print(augmented_ds.phi.attrs['description'])

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/spherical_coords.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def add_spherical_coords_to_dataset(
    ds: xr.Dataset,
    r: np.ndarray,
    theta: np.ndarray,
    phi: np.ndarray,
) -> xr.Dataset:
    """Add spherical coordinates to xarray Dataset with proper metadata.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with 'epoch' and 'sid' dimensions.
    r : np.ndarray
        Radial distances in meters.
    theta : np.ndarray
        Polar angles in radians [0, π].
    phi : np.ndarray
        Azimuthal angles in radians [0, 2π).

    Returns
    -------
    xr.Dataset
        Dataset with phi, theta, r variables added.

    Notes
    -----
    Variables are added with CF-compliant attributes following physics
    convention.

    Examples
    --------
    >>> # After computing spherical coordinates
    >>> r, theta, phi = compute_spherical_coordinates(sat_x, sat_y, sat_z, rx_pos)
    >>>
    >>> # Add to RINEX dataset
    >>> augmented_ds = add_spherical_coords_to_dataset(rinex_ds, r, theta, phi)
    >>> print(augmented_ds.phi.attrs['description'])
    """
    ds = ds.assign(
        {
            "phi": xr.DataArray(
                phi,
                coords=[ds["epoch"], ds["sid"]],
                dims=["epoch", "sid"],
                attrs={
                    "long_name": "Azimuthal angle (navigation convention)",
                    "short_name": "φ",
                    "units": "rad",
                    "description": (
                        "Azimuthal angle from North in ENU frame, clockwise"
                    ),
                    "valid_range": [0.0, 2 * np.pi],
                    "convention": "navigation (0=North, π/2=East, π=South, 3π/2=West)",
                },
            ),
            "theta": xr.DataArray(
                theta,
                coords=[ds["epoch"], ds["sid"]],
                dims=["epoch", "sid"],
                attrs={
                    "long_name": "Polar angle (physics convention)",
                    "short_name": "θ",
                    "units": "rad",
                    "description": "Polar angle from zenith (+z/Up)",
                    "valid_range": [0.0, np.pi / 2],
                    "convention": "physics (0=zenith, π/2=horizon)",
                },
            ),
            "r": xr.DataArray(
                r,
                coords=[ds["epoch"], ds["sid"]],
                dims=["epoch", "sid"],
                attrs={
                    "long_name": "Distance",
                    "short_name": "r",
                    "units": "m",
                    "description": "Distance between satellite and receiver",
                },
            ),
        }
    )
    return ds

compute_spherical_coordinates(sat_x, sat_y, sat_z, rx_pos)

Compute spherical coordinates (r, theta, phi) in navigation convention.

Uses local ENU (East-North-Up) topocentric frame centered at receiver.

Navigation Convention: - theta: Polar angle from +z axis (zenith), [0, π] radians * theta = 0 → zenith (straight up) * theta = π/2 → horizon * theta > π/2 → below horizon (set to NaN) - phi: Azimuthal angle from North, clockwise, [0, 2π) radians * phi = 0 → North * phi = π/2 → East * phi = π → South * phi = 3π/2 → West - r: Radial distance in meters

Note

The phi convention follows geographic/navigation standards where: - 0° points North (positive Y in ENU frame) - Angles increase clockwise when viewed from above - This is computed as arctan2(East, North), giving North=0° reference - Differs from physics convention which uses East=0° reference

Parameters

sat_x : np.ndarray Satellite X coordinates in ECEF (meters). sat_y : np.ndarray Satellite Y coordinates in ECEF (meters). sat_z : np.ndarray Satellite Z coordinates in ECEF (meters). rx_pos : ECEFPosition Receiver position in ECEF.

Returns

tuple[np.ndarray, np.ndarray, np.ndarray] (r, theta, phi) where: - r: distances in meters - theta: polar angles in radians [0, π] - phi: azimuthal angles in radians [0, 2π)

Notes

Satellites below horizon (theta > π/2) are set to NaN.

Examples

from canvod.auxiliary.position import ( ... ECEFPosition, compute_spherical_coordinates, ... )

Receiver position

rx = ECEFPosition(x=4194304.0, y=176481.0, z=4780013.0)

Satellite positions (example)

sat_x = np.array([16364123.0, 10205789.0]) sat_y = np.array([12123456.0, -8901234.0]) sat_z = np.array([18456789.0, 21234567.0])

r, theta, phi = compute_spherical_coordinates(sat_x, sat_y, sat_z, rx) print(f"Distance: {r[0]/1e6:.2f} Mm") print(f"Polar angle: {np.degrees(theta[0]):.1f}°") print(f"Azimuth: {np.degrees(phi[0]):.1f}°")

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/position/spherical_coords.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def compute_spherical_coordinates(
    sat_x: np.ndarray | float,
    sat_y: np.ndarray | float,
    sat_z: np.ndarray | float,
    rx_pos: ECEFPosition,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute spherical coordinates (r, theta, phi) in navigation convention.

    Uses local ENU (East-North-Up) topocentric frame centered at receiver.

    Navigation Convention:
    - theta: Polar angle from +z axis (zenith), [0, π] radians
      * theta = 0 → zenith (straight up)
      * theta = π/2 → horizon
      * theta > π/2 → below horizon (set to NaN)
    - phi: Azimuthal angle from North, clockwise, [0, 2π) radians
      * phi = 0 → North
      * phi = π/2 → East
      * phi = π → South
      * phi = 3π/2 → West
    - r: Radial distance in meters

    Note
    ----
    The phi convention follows geographic/navigation standards where:
    - 0° points North (positive Y in ENU frame)
    - Angles increase clockwise when viewed from above
    - This is computed as arctan2(East, North), giving North=0° reference
    - Differs from physics convention which uses East=0° reference

    Parameters
    ----------
    sat_x : np.ndarray
        Satellite X coordinates in ECEF (meters).
    sat_y : np.ndarray
        Satellite Y coordinates in ECEF (meters).
    sat_z : np.ndarray
        Satellite Z coordinates in ECEF (meters).
    rx_pos : ECEFPosition
        Receiver position in ECEF.

    Returns
    -------
    tuple[np.ndarray, np.ndarray, np.ndarray]
        (r, theta, phi) where:
        - r: distances in meters
        - theta: polar angles in radians [0, π]
        - phi: azimuthal angles in radians [0, 2π)

    Notes
    -----
    Satellites below horizon (theta > π/2) are set to NaN.

    Examples
    --------
    >>> from canvod.auxiliary.position import (
    ...     ECEFPosition, compute_spherical_coordinates,
    ... )
    >>>
    >>> # Receiver position
    >>> rx = ECEFPosition(x=4194304.0, y=176481.0, z=4780013.0)
    >>>
    >>> # Satellite positions (example)
    >>> sat_x = np.array([16364123.0, 10205789.0])
    >>> sat_y = np.array([12123456.0, -8901234.0])
    >>> sat_z = np.array([18456789.0, 21234567.0])
    >>>
    >>> r, theta, phi = compute_spherical_coordinates(sat_x, sat_y, sat_z, rx)
    >>> print(f"Distance: {r[0]/1e6:.2f} Mm")
    >>> print(f"Polar angle: {np.degrees(theta[0]):.1f}°")
    >>> print(f"Azimuth: {np.degrees(phi[0]):.1f}°")
    """
    # Receiver ECEF coordinates
    rx_x = rx_pos.x
    rx_y = rx_pos.y
    rx_z = rx_pos.z

    # Convert receiver ECEF to geodetic (lat, lon, alt)
    lat, lon, alt = pm.ecef2geodetic(rx_x, rx_y, rx_z)

    # Convert satellite ECEF to ENU (East-North-Up) relative to receiver
    e, n, u = pm.ecef2enu(sat_x, sat_y, sat_z, lat, lon, alt)

    # Compute radial distance
    r = np.sqrt(e**2 + n**2 + u**2)

    # Compute theta: polar angle from +z (Up) axis
    # Clamp u/r to [-1, 1] to handle numerical errors
    cos_theta = np.clip(u / r, -1.0, 1.0)
    theta = np.arccos(cos_theta)

    # Mask satellites below horizon (u < 0 means below horizon)
    below_horizon = u < 0

    # Compute phi: azimuthal angle from North, clockwise (navigation convention)
    # phi = arctan2(East, North) gives North=0°, East=90°, South=180°, West=270°
    phi = np.arctan2(e, n)
    phi = np.mod(phi, 2 * np.pi)  # Wrap to [0, 2π)

    # Set below-horizon satellites to NaN
    r = np.where(below_horizon, np.nan, r)
    theta = np.where(below_horizon, np.nan, theta)
    phi = np.where(below_horizon, np.nan, phi)

    return r, theta, phi

Product Registry

Product registry and specifications for IGS analysis centers.

ProductSpec

Bases: BaseModel

Product specification with structural validation.

Validates data structure only (fast). FTP availability validated during download (lazy, fail-fast).

Notes

This is a Pydantic BaseModel.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/products/registry_config.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class ProductSpec(BaseModel):
    """
    Product specification with structural validation.

    Validates data structure only (fast).
    FTP availability validated during download (lazy, fail-fast).

    Notes
    -----
    This is a Pydantic `BaseModel`.
    """

    model_config = ConfigDict(extra="forbid")

    agency_code: str = Field(
        min_length=3,
        max_length=3,
        pattern=r"^[A-Z]{3}$",
    )
    agency_name: str
    product_type: str = Field(
        pattern=r"^(final|rapid|ultrarapid|real-time|near-real-time)$",
    )
    prefix: str = Field(
        min_length=9,
        max_length=10,
        pattern=r"^[A-Z0-9]+$",
    )
    sampling_rate: str = Field(pattern=r"^\d{2}[SMHD]$")
    duration: str = Field(pattern=r"^\d{2}[DH]$")
    available_formats: list[str]
    ftp_servers: list[FtpServerConfig]
    ftp_path_pattern: str
    description: str = ""

    @field_validator("prefix")
    @classmethod
    def validate_prefix_matches_agency(cls, v: str, info) -> str:
        """Validate prefix starts with agency code."""
        if "agency_code" in info.data:
            agency = info.data["agency_code"]
            if not v.startswith(agency):
                raise ValueError(f"Prefix must start with agency code {agency}")
        return v

validate_prefix_matches_agency(v, info) classmethod

Validate prefix starts with agency code.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/products/registry_config.py
70
71
72
73
74
75
76
77
78
@field_validator("prefix")
@classmethod
def validate_prefix_matches_agency(cls, v: str, info) -> str:
    """Validate prefix starts with agency code."""
    if "agency_code" in info.data:
        agency = info.data["agency_code"]
        if not v.startswith(agency):
            raise ValueError(f"Prefix must start with agency code {agency}")
    return v

get_product_spec(agency, product_type)

Get product spec from global registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/products/registry_config.py
234
235
236
def get_product_spec(agency: str, product_type: str) -> ProductSpec:
    """Get product spec from global registry."""
    return get_registry().get_product_spec(agency, product_type)

list_agencies()

List agencies from global registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/products/registry_config.py
239
240
241
def list_agencies() -> list[str]:
    """List agencies from global registry."""
    return get_registry().list_agencies()

get_products_for_agency(agency)

Get products for agency from global registry.

Source code in packages/canvod-auxiliary/src/canvod/auxiliary/products/registry_config.py
249
250
251
def get_products_for_agency(agency: str) -> list[str]:
    """Get products for agency from global registry."""
    return get_registry().get_products_for_agency(agency)