canvod.virtualiconvname API Reference

Filename convention, mapping, validation, and cataloging.

Convention

Pydantic v2 model for the canVOD file naming convention.

Convention:

{SIT}{T}{NN}{AGC}_R_{YYYY}{DOY}{HHMM}_{PERIOD}_{SAMPLING}_{CONTENT}.{TYPE}[.{COMPRESSION}]

SIT         3-char site ID, uppercase            (e.g. ROS, HAI, FON, LBS)
T           Receiver type, single uppercase char  (R=reference, A=active/below-canopy)
NN          Receiver number, zero-padded 01-99
AGC         3-char data provider ID, uppercase    (e.g. GFZ, TUD, TUW, MPI)
_R          Literal - 'R' for Receiver
YYYY        4-digit year
DOY         3-digit day of year (001-366)
HHMM        Start time, hours + minutes
PERIOD      Batch size: 2-digit value + unit      (e.g. 01D, 15M, 01H)
SAMPLING    Data frequency: 2-digit value + unit  (e.g. 01S, 05S, 05M)
CONTENT     2-char user field, default 'AA'
TYPE        File type, lowercase                  (rnx, sbf, ubx, nmea)
COMPRESSION Optional compression extension        (zip, gz, bz2, zst, ...)

Examples:

HAIA01GFZ_R_20250010000_01D_01S_AA.rnx.zip
ROSR01TUW_R_20250010000_01D_05S_AA.rnx
ROSR35TUW_R_20232221530_15M_05S_AA.sbf
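The convention above can be captured in a single regular expression. The sketch below is an illustration built from the field table, not the package's actual `_FILENAME_RE`, which may differ in detail:

```python
import re

# Illustrative regex for the canVOD convention; each named group mirrors
# one field from the table above.
FILENAME_RE = re.compile(
    r"^(?P<site>[A-Z]{3})"
    r"(?P<receiver_type>[RA])"
    r"(?P<receiver_number>\d{2})"
    r"(?P<agency>[A-Z]{3})"
    r"_R_"
    r"(?P<year>\d{4})(?P<doy>\d{3})(?P<hour>\d{2})(?P<minute>\d{2})"
    r"_(?P<period>\d{2}[SMHD])"
    r"_(?P<sampling>\d{2}[SMHD])"
    r"_(?P<content>[A-Z0-9]{2})"
    r"\.(?P<file_type>rnx|sbf|ubx|nmea)"
    r"(?:\.(?P<compression>zip|gz|bz2|zst))?$"
)

m = FILENAME_RE.match("HAIA01GFZ_R_20250010000_01D_01S_AA.rnx.zip")
```

All three examples above match this pattern; the optional trailing group captures the compression extension when present.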

CanVODFilename

Structured representation of a canVOD-compliant filename.

Construct directly with keyword arguments, or parse from a filename string with from_filename(). Render back to a string with the name property.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/convention.py
@dataclass(config=ConfigDict(frozen=True, str_strip_whitespace=True))
class CanVODFilename:
    """Structured representation of a canVOD-compliant filename.

    Construct directly with keyword arguments, or parse from a filename string
    with :meth:`from_filename`.  Render back to a string with :attr:`name`.
    """

    site: SiteId
    receiver_type: ReceiverType
    receiver_number: Annotated[int, Field(ge=1, le=99)]
    agency: AgencyId
    year: Annotated[int, Field(ge=2000, le=2099)]
    doy: Annotated[int, Field(ge=1, le=366)]
    hour: Annotated[int, Field(ge=0, le=23)] = 0
    minute: Annotated[int, Field(ge=0, le=59)] = 0
    period: Duration = "01D"
    sampling: Duration = "05S"
    content: ContentCode = "AA"
    file_type: FileType = FileType.RNX
    compression: str | None = None

    # -- Validators -----------------------------------------------------------

    @field_validator("compression")
    @classmethod
    def _validate_compression(cls, v: str | None) -> str | None:
        if v is not None:
            v = v.lower().strip()
            if not v:
                return None
        return v

    # -- Computed properties --------------------------------------------------

    @property
    def sampling_interval(self) -> timedelta:
        """Sampling frequency as a timedelta."""
        return _duration_to_timedelta(self.sampling)

    @property
    def batch_duration(self) -> timedelta:
        """Batch / file period as a timedelta."""
        return _duration_to_timedelta(self.period)

    @property
    def name(self) -> str:
        """Full filename including optional compression extension."""
        stem = (
            f"{self.site}{self.receiver_type.value}"
            f"{self.receiver_number:02d}"
            f"{self.agency}"
            f"_R"
            f"_{self.year:04d}{self.doy:03d}{self.hour:02d}{self.minute:02d}"
            f"_{self.period}"
            f"_{self.sampling}"
            f"_{self.content}"
            f".{self.file_type.value}"
        )
        if self.compression:
            return f"{stem}.{self.compression}"
        return stem

    @property
    def stem(self) -> str:
        """Filename without the compression extension (if any)."""
        parts = self.name.rsplit(".", 1) if self.compression else [self.name]
        return parts[0]

    # -- Constructors ---------------------------------------------------------

    @classmethod
    def from_filename(cls, filename: str) -> Self:
        """Parse a canVOD-compliant filename string into a model instance.

        Accepts a bare filename (no directory components).  Leading path
        segments are stripped automatically.

        Raises
        ------
        ValueError
            If the filename does not match the convention.
        """
        # Strip any directory prefix
        basename = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]

        m = _FILENAME_RE.match(basename)
        if m is None:
            raise ValueError(
                f"Filename does not match the canVOD convention: {basename!r}"
            )

        return cls(
            site=m.group("site"),
            receiver_type=ReceiverType(m.group("receiver_type")),
            receiver_number=int(m.group("receiver_number")),
            agency=m.group("agency"),
            year=int(m.group("year")),
            doy=int(m.group("doy")),
            hour=int(m.group("hour")),
            minute=int(m.group("minute")),
            period=m.group("period"),
            sampling=m.group("sampling"),
            content=m.group("content"),
            file_type=FileType(m.group("file_type")),
            compression=m.group("compression"),
        )

    def __str__(self) -> str:
        return self.name

sampling_interval property

Sampling frequency as a timedelta.

batch_duration property

Batch / file period as a timedelta.
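Both properties delegate to a `_duration_to_timedelta` helper that is not shown on this page. A minimal sketch of what such a helper does, assuming the convention's unit letters S, M, H, D:

```python
from datetime import timedelta

# Illustrative conversion of a "2-digit value + unit" duration code
# (e.g. "05S", "15M", "01D") into a timedelta.
_UNITS = {
    "S": timedelta(seconds=1),
    "M": timedelta(minutes=1),
    "H": timedelta(hours=1),
    "D": timedelta(days=1),
}

def duration_to_timedelta(duration: str) -> timedelta:
    value, unit = int(duration[:2]), duration[2]
    return value * _UNITS[unit]
```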

name property

Full filename including optional compression extension.

stem property

Filename without the compression extension (if any).

from_filename(filename) classmethod

Parse a canVOD-compliant filename string into a model instance.

Accepts a bare filename (no directory components). Leading path segments are stripped automatically.

Raises

ValueError
    If the filename does not match the convention.
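The path-stripping step in the source handles both separator styles with two rsplit passes, with no pathlib needed. Shown standalone (the example path is illustrative):

```python
# Strip any directory prefix, whether '/' or '\\' separated.
filename = "data\\25001/ROSR01TUW_R_20250010000_01D_05S_AA.rnx"
basename = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
```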

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/convention.py
@classmethod
def from_filename(cls, filename: str) -> Self:
    """Parse a canVOD-compliant filename string into a model instance.

    Accepts a bare filename (no directory components).  Leading path
    segments are stripped automatically.

    Raises
    ------
    ValueError
        If the filename does not match the convention.
    """
    # Strip any directory prefix
    basename = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]

    m = _FILENAME_RE.match(basename)
    if m is None:
        raise ValueError(
            f"Filename does not match the canVOD convention: {basename!r}"
        )

    return cls(
        site=m.group("site"),
        receiver_type=ReceiverType(m.group("receiver_type")),
        receiver_number=int(m.group("receiver_number")),
        agency=m.group("agency"),
        year=int(m.group("year")),
        doy=int(m.group("doy")),
        hour=int(m.group("hour")),
        minute=int(m.group("minute")),
        period=m.group("period"),
        sampling=m.group("sampling"),
        content=m.group("content"),
        file_type=FileType(m.group("file_type")),
        compression=m.group("compression"),
    )

ReceiverType

Bases: StrEnum

Char 4: receiver role at the site.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/convention.py
class ReceiverType(StrEnum):
    """Char 4: receiver role at the site."""

    REFERENCE = "R"
    ACTIVE = "A"

FileType

Bases: StrEnum

File format / observation type.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/convention.py
class FileType(StrEnum):
    """File format / observation type."""

    RNX = "rnx"
    SBF = "sbf"
    UBX = "ubx"
    NMEA = "nmea"

Mapping

Virtual renaming engine: map physical files to canVOD conventional names.

The FilenameMapper discovers files on disk according to the configured directory layout and source pattern, then wraps each in a VirtualFile that pairs the physical path with its canVOD conventional name.

VirtualFile dataclass

Physical file mapped to its canVOD conventional name.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
@dataclass(frozen=True)
class VirtualFile:
    """Physical file mapped to its canVOD conventional name."""

    physical_path: Path
    conventional_name: CanVODFilename

    @property
    def canonical_str(self) -> str:
        """The conventional filename as a string."""
        return self.conventional_name.name

    def open(self, mode: str = "rb"):
        """Open the physical file."""
        return self.physical_path.open(mode)

canonical_str property

The conventional filename as a string.

open(mode='rb')

Open the physical file.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
def open(self, mode: str = "rb"):
    """Open the physical file."""
    return self.physical_path.open(mode)

FilenameMapper

Maps physical files to canVOD conventional names.

Parameters

site_naming
    Site-level naming config.
receiver_naming
    Receiver-level naming config.
receiver_type
    Whether this receiver is "reference" or "canopy".
receiver_base_dir
    Absolute path to the receiver's data directory.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
class FilenameMapper:
    """Maps physical files to canVOD conventional names.

    Parameters
    ----------
    site_naming
        Site-level naming config.
    receiver_naming
        Receiver-level naming config.
    receiver_type
        Whether this receiver is ``"reference"`` or ``"canopy"``.
    receiver_base_dir
        Absolute path to the receiver's data directory.
    """

    def __init__(
        self,
        site_naming: SiteNamingConfig,
        receiver_naming: ReceiverNamingConfig,
        receiver_type: Literal["reference", "canopy"],
        receiver_base_dir: Path,
    ) -> None:
        self.site_naming = site_naming
        self.receiver_naming = receiver_naming
        self.receiver_type_str = receiver_type
        self.receiver_base_dir = receiver_base_dir

        self._rx_type = (
            ReceiverType.REFERENCE
            if receiver_type == "reference"
            else ReceiverType.ACTIVE
        )

    def discover_all(self) -> list[VirtualFile]:
        """Discover all files and map them to conventional names."""
        files = self._discover_files()
        results = []
        for path in natsorted(files, key=lambda p: p.name):
            try:
                vf = self.map_single_file(path)
                results.append(vf)
            except (ValueError, KeyError):
                continue
        return results

    def discover_for_date(self, year: int, doy: int) -> list[VirtualFile]:
        """Discover files for a specific date."""
        layout = self.receiver_naming.directory_layout

        if layout == DirectoryLayout.YYDDD_SUBDIRS:
            yy = year % 100
            dir_name = f"{yy:02d}{doy:03d}"
            search_dir = self.receiver_base_dir / dir_name
        elif layout == DirectoryLayout.YYYYDDD_SUBDIRS:
            dir_name = f"{year:04d}{doy:03d}"
            search_dir = self.receiver_base_dir / dir_name
        else:
            # FLAT: discover all and filter
            all_files = self.discover_all()
            return [
                vf
                for vf in all_files
                if vf.conventional_name.year == year and vf.conventional_name.doy == doy
            ]

        if not search_dir.is_dir():
            return []

        files = self._glob_in_dir(search_dir)
        results = []
        for path in natsorted(files, key=lambda p: p.name):
            try:
                vf = self.map_single_file(path, year=year, doy=doy)
                results.append(vf)
            except (ValueError, KeyError):
                continue
        return results

    def map_single_file(
        self, file_path: Path, *, year: int | None = None, doy: int | None = None
    ) -> VirtualFile:
        """Map a single physical file to its canVOD conventional name.

        Parameters
        ----------
        file_path
            Path to the physical file.
        year, doy
            Optional date override (e.g. from directory name).

        Raises
        ------
        ValueError
            If the file cannot be matched or mapped.
        """
        filename = file_path.name
        pattern_name = self.receiver_naming.source_pattern

        result = match_pattern(filename, pattern_name)
        if result is None:
            raise ValueError(f"No pattern matched for {filename!r}")

        pat, m = result
        groups = m.groupdict()

        # Validate station code if configured
        expected_station = self.receiver_naming.source_station
        if expected_station and "station" in groups:
            actual_station = groups["station"]
            if actual_station.lower() != expected_station.lower():
                raise ValueError(
                    f"Station code mismatch for {filename!r}: "
                    f"expected {expected_station!r}, got {actual_station!r}"
                )

        # Extract year
        if year is None:
            if "year" in groups and groups["year"] is not None:
                year = int(groups["year"])
            elif "yy" in groups and groups["yy"] is not None:
                year = resolve_year_from_yy(int(groups["yy"]))
            else:
                raise ValueError(f"Cannot determine year from {filename!r}")

        # Extract DOY
        if doy is None:
            if "doy" in groups and groups["doy"] is not None:
                doy = int(groups["doy"])
            else:
                raise ValueError(f"Cannot determine DOY from {filename!r}")

        # Extract hour
        if "hour" in groups and groups["hour"] is not None:
            hour = int(groups["hour"])
        elif "hour_letter" in groups and groups["hour_letter"] is not None:
            hour = hour_letter_to_int(groups["hour_letter"])
        else:
            hour = 0

        # Extract minute
        if "minute" in groups and groups["minute"] is not None:
            minute = int(groups["minute"])
        else:
            minute = 0

        # Sampling and period from regex or config defaults
        sampling = (
            groups.get("sampling")
            or self.receiver_naming.sampling
            or self.site_naming.default_sampling
        )
        period = groups.get("period")
        if not period:
            # For RINEX v2 / SBF: hour_letter='0' means daily file
            hour_letter = groups.get("hour_letter")
            if hour_letter == "0" and hour == 0 and minute == 0:
                period = "01D"
            else:
                period = self.receiver_naming.period or self.site_naming.default_period

        # Content from config
        content = self.receiver_naming.content or self.site_naming.default_content

        # Agency from config
        agency = self.receiver_naming.agency or self.site_naming.agency

        # File type and compression from extension
        file_type, compression = _detect_file_type(file_path)

        conventional = CanVODFilename(
            site=self.site_naming.site_id,
            receiver_type=self._rx_type,
            receiver_number=self.receiver_naming.receiver_number,
            agency=agency,
            year=year,
            doy=doy,
            hour=hour,
            minute=minute,
            period=period,
            sampling=sampling,
            content=content,
            file_type=file_type,
            compression=compression,
        )

        return VirtualFile(physical_path=file_path, conventional_name=conventional)

    @staticmethod
    def detect_overlaps(
        vfs: list[VirtualFile],
    ) -> list[tuple[VirtualFile, VirtualFile]]:
        """Detect temporal overlaps among virtual files.

        Groups files by ``(year, doy)`` and checks whether any file's time
        range contains or overlaps another's.  A ``01D`` file alongside
        ``15M`` files for the same day is the canonical overlap case.

        Returns
        -------
        list[tuple[VirtualFile, VirtualFile]]
            Pairs of overlapping files.
        """
        from collections import defaultdict

        by_date: dict[tuple[int, int], list[VirtualFile]] = defaultdict(list)
        for vf in vfs:
            cn = vf.conventional_name
            by_date[(cn.year, cn.doy)].append(vf)

        overlaps: list[tuple[VirtualFile, VirtualFile]] = []
        for group in by_date.values():
            if len(group) < 2:
                continue
            # Compute (start_minutes, end_minutes) for each file
            ranges: list[tuple[int, int, VirtualFile]] = []
            for vf in group:
                cn = vf.conventional_name
                start_min = cn.hour * 60 + cn.minute
                duration_sec = int(cn.batch_duration.total_seconds())
                end_min = start_min + duration_sec // 60
                ranges.append((start_min, end_min, vf))

            # O(n^2) pairwise check — fine for <100 files per day
            for i in range(len(ranges)):
                for j in range(i + 1, len(ranges)):
                    s_i, e_i, vf_i = ranges[i]
                    s_j, e_j, vf_j = ranges[j]
                    # Overlap if intervals intersect
                    if s_i < e_j and s_j < e_i:
                        overlaps.append((vf_i, vf_j))
        return overlaps

    # -- Private helpers ------------------------------------------------------

    def _discover_files(self) -> list[Path]:
        """Discover all data files according to directory layout."""
        layout = self.receiver_naming.directory_layout

        if layout == DirectoryLayout.FLAT:
            return self._glob_in_dir(self.receiver_base_dir)

        # Subdirectory layouts
        if not self.receiver_base_dir.is_dir():
            return []

        if layout == DirectoryLayout.YYDDD_SUBDIRS:
            dir_pattern = re.compile(r"^\d{5}$")
        else:
            dir_pattern = re.compile(r"^\d{7}$")

        files: list[Path] = []
        for subdir in sorted(self.receiver_base_dir.iterdir()):
            if subdir.is_dir() and dir_pattern.match(subdir.name):
                files.extend(self._glob_in_dir(subdir))
        return files

    def _glob_in_dir(self, directory: Path) -> list[Path]:
        """Glob for data files in a directory using the source pattern's globs."""
        pattern_name = self.receiver_naming.source_pattern

        if pattern_name == "auto":
            globs: set[str] = set()
            for name in auto_match_order():
                globs.update(BUILTIN_PATTERNS[name].file_globs)
        else:
            globs = set(BUILTIN_PATTERNS[pattern_name].file_globs)

        files: list[Path] = []
        seen: set[Path] = set()
        for g in sorted(globs):
            for path in directory.glob(g):
                if path.is_file() and path not in seen:
                    seen.add(path)
                    files.append(path)
        return files

discover_all()

Discover all files and map them to conventional names.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
def discover_all(self) -> list[VirtualFile]:
    """Discover all files and map them to conventional names."""
    files = self._discover_files()
    results = []
    for path in natsorted(files, key=lambda p: p.name):
        try:
            vf = self.map_single_file(path)
            results.append(vf)
        except (ValueError, KeyError):
            continue
    return results

discover_for_date(year, doy)

Discover files for a specific date.
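For the subdirectory layouts, the method derives the directory name from the date before globbing: YYDDD (5 digits) or YYYYDDD (7 digits). A standalone sketch, with an illustrative date:

```python
# Subdirectory names as derived in the source for the two layouts.
year, doy = 2025, 42
yyddd = f"{year % 100:02d}{doy:03d}"      # YYDDD_SUBDIRS layout
yyyyddd = f"{year:04d}{doy:03d}"          # YYYYDDD_SUBDIRS layout
```

The FLAT layout has no date-derived directory, so the source instead discovers everything and filters on the parsed year and DOY.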

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
def discover_for_date(self, year: int, doy: int) -> list[VirtualFile]:
    """Discover files for a specific date."""
    layout = self.receiver_naming.directory_layout

    if layout == DirectoryLayout.YYDDD_SUBDIRS:
        yy = year % 100
        dir_name = f"{yy:02d}{doy:03d}"
        search_dir = self.receiver_base_dir / dir_name
    elif layout == DirectoryLayout.YYYYDDD_SUBDIRS:
        dir_name = f"{year:04d}{doy:03d}"
        search_dir = self.receiver_base_dir / dir_name
    else:
        # FLAT: discover all and filter
        all_files = self.discover_all()
        return [
            vf
            for vf in all_files
            if vf.conventional_name.year == year and vf.conventional_name.doy == doy
        ]

    if not search_dir.is_dir():
        return []

    files = self._glob_in_dir(search_dir)
    results = []
    for path in natsorted(files, key=lambda p: p.name):
        try:
            vf = self.map_single_file(path, year=year, doy=doy)
            results.append(vf)
        except (ValueError, KeyError):
            continue
    return results

map_single_file(file_path, *, year=None, doy=None)

Map a single physical file to its canVOD conventional name.

Parameters

file_path
    Path to the physical file.
year, doy
    Optional date override (e.g. from directory name).

Raises

ValueError
    If the file cannot be matched or mapped.
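For sampling and period, the source resolves values through a fallback chain: the regex group first, then the receiver-level config, then the site-level default. A standalone sketch of that precedence (the dict values here are illustrative):

```python
# Fallback precedence used when mapping: regex group, then
# receiver-level config, then site-level default.
groups = {"sampling": None, "period": "15M"}
receiver = {"sampling": "01S", "period": None}
site = {"default_sampling": "05S", "default_period": "01D"}

sampling = groups.get("sampling") or receiver["sampling"] or site["default_sampling"]
period = groups.get("period") or receiver["period"] or site["default_period"]
```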

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
def map_single_file(
    self, file_path: Path, *, year: int | None = None, doy: int | None = None
) -> VirtualFile:
    """Map a single physical file to its canVOD conventional name.

    Parameters
    ----------
    file_path
        Path to the physical file.
    year, doy
        Optional date override (e.g. from directory name).

    Raises
    ------
    ValueError
        If the file cannot be matched or mapped.
    """
    filename = file_path.name
    pattern_name = self.receiver_naming.source_pattern

    result = match_pattern(filename, pattern_name)
    if result is None:
        raise ValueError(f"No pattern matched for {filename!r}")

    pat, m = result
    groups = m.groupdict()

    # Validate station code if configured
    expected_station = self.receiver_naming.source_station
    if expected_station and "station" in groups:
        actual_station = groups["station"]
        if actual_station.lower() != expected_station.lower():
            raise ValueError(
                f"Station code mismatch for {filename!r}: "
                f"expected {expected_station!r}, got {actual_station!r}"
            )

    # Extract year
    if year is None:
        if "year" in groups and groups["year"] is not None:
            year = int(groups["year"])
        elif "yy" in groups and groups["yy"] is not None:
            year = resolve_year_from_yy(int(groups["yy"]))
        else:
            raise ValueError(f"Cannot determine year from {filename!r}")

    # Extract DOY
    if doy is None:
        if "doy" in groups and groups["doy"] is not None:
            doy = int(groups["doy"])
        else:
            raise ValueError(f"Cannot determine DOY from {filename!r}")

    # Extract hour
    if "hour" in groups and groups["hour"] is not None:
        hour = int(groups["hour"])
    elif "hour_letter" in groups and groups["hour_letter"] is not None:
        hour = hour_letter_to_int(groups["hour_letter"])
    else:
        hour = 0

    # Extract minute
    if "minute" in groups and groups["minute"] is not None:
        minute = int(groups["minute"])
    else:
        minute = 0

    # Sampling and period from regex or config defaults
    sampling = (
        groups.get("sampling")
        or self.receiver_naming.sampling
        or self.site_naming.default_sampling
    )
    period = groups.get("period")
    if not period:
        # For RINEX v2 / SBF: hour_letter='0' means daily file
        hour_letter = groups.get("hour_letter")
        if hour_letter == "0" and hour == 0 and minute == 0:
            period = "01D"
        else:
            period = self.receiver_naming.period or self.site_naming.default_period

    # Content from config
    content = self.receiver_naming.content or self.site_naming.default_content

    # Agency from config
    agency = self.receiver_naming.agency or self.site_naming.agency

    # File type and compression from extension
    file_type, compression = _detect_file_type(file_path)

    conventional = CanVODFilename(
        site=self.site_naming.site_id,
        receiver_type=self._rx_type,
        receiver_number=self.receiver_naming.receiver_number,
        agency=agency,
        year=year,
        doy=doy,
        hour=hour,
        minute=minute,
        period=period,
        sampling=sampling,
        content=content,
        file_type=file_type,
        compression=compression,
    )

    return VirtualFile(physical_path=file_path, conventional_name=conventional)

detect_overlaps(vfs) staticmethod

Detect temporal overlaps among virtual files.

Groups files by (year, doy) and checks whether any file's time range contains or overlaps another's. A 01D file alongside 15M files for the same day is the canonical overlap case.

Returns

list[tuple[VirtualFile, VirtualFile]]
    Pairs of overlapping files.
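The pairwise check in the source reduces to the standard half-open interval intersection test: two ranges `[s, e)` overlap iff each starts before the other ends. A minimal sketch, with times as minutes since midnight:

```python
# Half-open interval intersection, as used by detect_overlaps.
def intervals_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    return a[0] < b[1] and b[0] < a[1]

daily = (0, 1440)   # a 01D file covers the whole day
batch = (930, 945)  # a 15M file starting at 15:30
```

Adjacent batches that merely touch (one ends exactly where the next starts) do not count as overlapping under this test.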

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/mapping.py
@staticmethod
def detect_overlaps(
    vfs: list[VirtualFile],
) -> list[tuple[VirtualFile, VirtualFile]]:
    """Detect temporal overlaps among virtual files.

    Groups files by ``(year, doy)`` and checks whether any file's time
    range contains or overlaps another's.  A ``01D`` file alongside
    ``15M`` files for the same day is the canonical overlap case.

    Returns
    -------
    list[tuple[VirtualFile, VirtualFile]]
        Pairs of overlapping files.
    """
    from collections import defaultdict

    by_date: dict[tuple[int, int], list[VirtualFile]] = defaultdict(list)
    for vf in vfs:
        cn = vf.conventional_name
        by_date[(cn.year, cn.doy)].append(vf)

    overlaps: list[tuple[VirtualFile, VirtualFile]] = []
    for group in by_date.values():
        if len(group) < 2:
            continue
        # Compute (start_minutes, end_minutes) for each file
        ranges: list[tuple[int, int, VirtualFile]] = []
        for vf in group:
            cn = vf.conventional_name
            start_min = cn.hour * 60 + cn.minute
            duration_sec = int(cn.batch_duration.total_seconds())
            end_min = start_min + duration_sec // 60
            ranges.append((start_min, end_min, vf))

        # O(n^2) pairwise check — fine for <100 files per day
        for i in range(len(ranges)):
            for j in range(i + 1, len(ranges)):
                s_i, e_i, vf_i = ranges[i]
                s_j, e_j, vf_j = ranges[j]
                # Overlap if intervals intersect
                if s_i < e_j and s_j < e_i:
                    overlaps.append((vf_i, vf_j))
    return overlaps

Recipe

Naming recipe: user-defined mapping from arbitrary filenames to canVOD names.

A recipe is a small YAML file that describes:

1. The canonical identity of a receiver (site, agency, sampling, etc.)
2. How to extract date/time fields from the user's physical filenames.

The field extraction is a sequential left-to-right walk over the filename. Each entry in the fields list consumes characters and either extracts a named value or skips literal characters.

Recognized field names

  • year         4-digit year (e.g. 2025)
  • yy           2-digit year (80-99 → 19xx, 00-79 → 20xx)
  • doy          day of year (1-366)
  • month        month (01-12)
  • day          day of month (01-31)
  • hour         hour (00-23)
  • minute       minute (00-59)
  • hour_letter  RINEX hour code (a-x, single char)
  • skip         ignore these characters
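Two of these fields need a conversion step after extraction. The helper names below match those used in the mapping source (resolve_year_from_yy, hour_letter_to_int); the bodies are sketches of the rules stated above, not the package's actual implementations:

```python
# 2-digit year pivot: 80-99 -> 1900s, 00-79 -> 2000s.
def resolve_year_from_yy(yy: int) -> int:
    return 1900 + yy if yy >= 80 else 2000 + yy

# RINEX session letters: 'a' = hour 00 ... 'x' = hour 23.
def hour_letter_to_int(letter: str) -> int:
    return ord(letter.lower()) - ord("a")
```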

Example recipe (YAML)

name: rosalia_reference
description: Rosalia forest, reference receiver

site: ROS
agency: TUW
receiver_number: 1
receiver_type: reference
sampling: "05S"
period: "15M"
content: "AA"
file_type: rnx

layout: yyddd_subdirs
glob: "*.??o"

# Example: rref001a15.25o
fields:
  - skip: 4
  - doy: 3
  - hour_letter: 1
  - minute: 2
  - skip: 1
  - yy: 2
  - skip: 1

Example recipe for exotic filenames

# Example: STATION_2025_042_00_15.rinex
fields:
  - skip: 8
  - year: 4
  - skip: 1
  - doy: 3
  - skip: 1
  - hour: 2
  - skip: 1
  - minute: 2
  - skip: 6
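The left-to-right walk described above can be sketched as a plain function; the real NamingRecipe.parse_filename may differ in detail (e.g. it converts numeric fields to int):

```python
# Sequential field extraction: each entry consumes `width` characters,
# either capturing them under a name or skipping them.
def extract_fields(filename: str, fields: list[dict[str, int]]) -> dict[str, str]:
    pos, out = 0, {}
    for i, entry in enumerate(fields):
        (name, width), = entry.items()
        chunk = filename[pos:pos + width]
        if len(chunk) < width:
            raise ValueError(f"fields[{i}]: {filename!r} too short for {name!r}")
        if name != "skip":
            out[name] = chunk
        pos += width
    return out

# Field list from the Rosalia recipe above; matches e.g. rref001a15.25o
rosalia_fields = [{"skip": 4}, {"doy": 3}, {"hour_letter": 1}, {"minute": 2},
                  {"skip": 1}, {"yy": 2}, {"skip": 1}]
```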

NamingRecipe

Bases: BaseModel

A user-defined mapping from arbitrary filenames to canVOD names.

Serialize to YAML for sharing, to JSON for API transport.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
class NamingRecipe(BaseModel):
    """A user-defined mapping from arbitrary filenames to canVOD names.

    Serialize to YAML for sharing, to JSON for API transport.
    """

    name: str = Field(description="Recipe identifier (e.g. 'rosalia_reference')")
    description: str = ""

    # Canonical identity
    site: SiteId
    agency: AgencyId
    receiver_number: int = Field(ge=1, le=99)
    receiver_type: Literal["reference", "canopy"] = "canopy"
    sampling: Duration = "05S"
    period: Duration = "15M"
    content: ContentCode = "AA"
    file_type: Literal["rnx", "sbf", "ubx", "nmea"] = "rnx"

    # Directory layout
    layout: DirectoryLayout = DirectoryLayout.YYDDD_SUBDIRS

    # File discovery
    glob: str = Field(
        description="Glob pattern to find files (e.g. '*.??o', '*.rinex')"
    )

    # Field extraction: sequential left-to-right
    # Each entry is a single-key dict: {field_name: width}
    fields: list[dict[str, int]] = Field(
        description="Sequential field extraction. Each entry: {field_name: width}"
    )

    @model_validator(mode="after")
    def _validate_fields(self) -> NamingRecipe:
        for i, entry in enumerate(self.fields):
            if len(entry) != 1:
                msg = (
                    f"fields[{i}]: each entry must be a single "
                    f"key-value pair, got {entry}"
                )
                raise ValueError(msg)
            field_name = next(iter(entry))
            width = entry[field_name]
            if field_name not in KNOWN_FIELDS:
                msg = (
                    f"fields[{i}]: unknown field '{field_name}'. "
                    f"Known: {sorted(KNOWN_FIELDS)}"
                )
                raise ValueError(msg)
            if not isinstance(width, int) or width < 1:
                msg = f"fields[{i}]: width must be a positive integer, got {width}"
                raise ValueError(msg)
        return self

    # -- Parsing ---------------------------------------------------------------

    def parse_filename(self, filename: str) -> dict[str, str | int]:
        """Extract fields from a physical filename.

        Parameters
        ----------
        filename
            Bare filename (no directory components).

        Returns
        -------
        dict
            Extracted field values. Integer fields (year, doy, etc.) are
            returned as ``int``.  ``hour_letter`` is returned as ``str``.
            ``skip`` fields are not included.

        Raises
        ------
        ValueError
            If the filename is too short for the field spec.
        """
        pos = 0
        result: dict[str, str | int] = {}

        for entry in self.fields:
            field_name = next(iter(entry))
            width = entry[field_name]

            if pos + width > len(filename):
                msg = (
                    f"Filename {filename!r} too short: need {width} chars "
                    f"at position {pos} for '{field_name}', "
                    f"but only {len(filename) - pos} remain"
                )
                raise ValueError(msg)

            raw = filename[pos : pos + width]
            pos += width

            if field_name == "skip":
                continue
            elif field_name == "hour_letter":
                result["hour_letter"] = raw
            else:
                try:
                    result[field_name] = int(raw)
                except ValueError:
                    msg = (
                        f"Cannot parse '{field_name}' as integer "
                        f"from {raw!r} in {filename!r}"
                    )
                    raise ValueError(msg) from None

        return result

    def to_virtual_file(self, file_path: Path) -> VirtualFile:
        """Map a physical file to a VirtualFile using this recipe.

        Parameters
        ----------
        file_path
            Path to the physical file.

        Returns
        -------
        VirtualFile
            The mapped virtual file.

        Raises
        ------
        ValueError
            If the filename cannot be parsed.
        """
        parsed = self.parse_filename(file_path.name)

        def _require_int(parsed_key: str) -> int:
            value = parsed[parsed_key]
            if isinstance(value, int):
                return value
            msg = (
                f"Recipe '{self.name}': expected integer field '{parsed_key}' "
                f"for {file_path.name!r}, got {value!r}"
            )
            raise ValueError(msg)

        def _require_str(parsed_key: str) -> str:
            value = parsed[parsed_key]
            if isinstance(value, str):
                return value
            msg = (
                f"Recipe '{self.name}': expected string field '{parsed_key}' "
                f"for {file_path.name!r}, got {value!r}"
            )
            raise ValueError(msg)

        # Resolve year
        if "year" in parsed:
            year = _require_int("year")
        elif "yy" in parsed:
            year = resolve_year_from_yy(_require_int("yy"))
        else:
            raise ValueError(
                f"Recipe '{self.name}': no 'year' or 'yy' field "
                f"in parsed result for {file_path.name!r}"
            )

        # Resolve DOY (from doy directly, or from month+day)
        if "doy" in parsed:
            doy = _require_int("doy")
        elif "month" in parsed and "day" in parsed:
            from datetime import date

            doy = (
                date(year, _require_int("month"), _require_int("day"))
                .timetuple()
                .tm_yday
            )
        else:
            raise ValueError(
                f"Recipe '{self.name}': no 'doy' or 'month'+'day' fields "
                f"in parsed result for {file_path.name!r}"
            )

        # Resolve hour
        if "hour" in parsed:
            hour = _require_int("hour")
        elif "hour_letter" in parsed:
            hour = hour_letter_to_int(_require_str("hour_letter"))
        else:
            hour = 0

        # Resolve minute
        minute = _require_int("minute") if "minute" in parsed else 0

        # Determine period: daily if hour=0 and minute=0 and no hour field
        period = self.period
        if hour == 0 and minute == 0:
            has_hour = any("hour" in e or "hour_letter" in e for e in self.fields)
            if not has_hour:
                period = "01D"

        rx_type = (
            ReceiverType.REFERENCE
            if self.receiver_type == "reference"
            else ReceiverType.ACTIVE
        )

        conventional = CanVODFilename(
            site=self.site,
            receiver_type=rx_type,
            receiver_number=self.receiver_number,
            agency=self.agency,
            year=year,
            doy=doy,
            hour=hour,
            minute=minute,
            period=period,
            sampling=self.sampling,
            content=self.content,
            file_type=FileType(self.file_type),
        )

        return VirtualFile(physical_path=file_path, conventional_name=conventional)

    @property
    def expected_length(self) -> int:
        """Total number of characters consumed by the field spec."""
        return sum(next(iter(e.values())) for e in self.fields)

    def matches(self, filename: str) -> bool:
        """Check if a filename can be parsed by this recipe."""
        if len(filename) != self.expected_length:
            return False
        try:
            self.parse_filename(filename)
            return True
        except ValueError:
            return False

    # -- Serialization ---------------------------------------------------------

    def to_yaml(self) -> str:
        """Serialize to YAML string."""
        return yaml.dump(
            self.model_dump(mode="json"),
            default_flow_style=False,
            sort_keys=False,
            allow_unicode=True,
        )

    def save(self, path: Path) -> None:
        """Write recipe to a YAML file."""
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(self.to_yaml(), encoding="utf-8")

    @classmethod
    def from_yaml(cls, text: str) -> NamingRecipe:
        """Load from a YAML string."""
        data = yaml.safe_load(text)
        return cls.model_validate(data)

    @classmethod
    def load(cls, path: Path) -> NamingRecipe:
        """Load from a YAML file."""
        text = path.read_text(encoding="utf-8")
        return cls.from_yaml(text)

expected_length property

Total number of characters consumed by the field spec.

parse_filename(filename)

Extract fields from a physical filename.

Parameters

filename
    Bare filename (no directory components).

Returns

dict
    Extracted field values. Integer fields (year, doy, etc.) are returned as int. hour_letter is returned as str. skip fields are not included.

Raises

ValueError
    If the filename is too short for the field spec.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
def parse_filename(self, filename: str) -> dict[str, str | int]:
    """Extract fields from a physical filename.

    Parameters
    ----------
    filename
        Bare filename (no directory components).

    Returns
    -------
    dict
        Extracted field values. Integer fields (year, doy, etc.) are
        returned as ``int``.  ``hour_letter`` is returned as ``str``.
        ``skip`` fields are not included.

    Raises
    ------
    ValueError
        If the filename is too short for the field spec.
    """
    pos = 0
    result: dict[str, str | int] = {}

    for entry in self.fields:
        field_name = next(iter(entry))
        width = entry[field_name]

        if pos + width > len(filename):
            msg = (
                f"Filename {filename!r} too short: need {width} chars "
                f"at position {pos} for '{field_name}', "
                f"but only {len(filename) - pos} remain"
            )
            raise ValueError(msg)

        raw = filename[pos : pos + width]
        pos += width

        if field_name == "skip":
            continue
        elif field_name == "hour_letter":
            result["hour_letter"] = raw
        else:
            try:
                result[field_name] = int(raw)
            except ValueError:
                msg = (
                    f"Cannot parse '{field_name}' as integer "
                    f"from {raw!r} in {filename!r}"
                )
                raise ValueError(msg) from None

    return result

to_virtual_file(file_path)

Map a physical file to a VirtualFile using this recipe.

Parameters

file_path
    Path to the physical file.

Returns

VirtualFile
    The mapped virtual file.

Raises

ValueError
    If the filename cannot be parsed.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
def to_virtual_file(self, file_path: Path) -> VirtualFile:
    """Map a physical file to a VirtualFile using this recipe.

    Parameters
    ----------
    file_path
        Path to the physical file.

    Returns
    -------
    VirtualFile
        The mapped virtual file.

    Raises
    ------
    ValueError
        If the filename cannot be parsed.
    """
    parsed = self.parse_filename(file_path.name)

    def _require_int(parsed_key: str) -> int:
        value = parsed[parsed_key]
        if isinstance(value, int):
            return value
        msg = (
            f"Recipe '{self.name}': expected integer field '{parsed_key}' "
            f"for {file_path.name!r}, got {value!r}"
        )
        raise ValueError(msg)

    def _require_str(parsed_key: str) -> str:
        value = parsed[parsed_key]
        if isinstance(value, str):
            return value
        msg = (
            f"Recipe '{self.name}': expected string field '{parsed_key}' "
            f"for {file_path.name!r}, got {value!r}"
        )
        raise ValueError(msg)

    # Resolve year
    if "year" in parsed:
        year = _require_int("year")
    elif "yy" in parsed:
        year = resolve_year_from_yy(_require_int("yy"))
    else:
        raise ValueError(
            f"Recipe '{self.name}': no 'year' or 'yy' field "
            f"in parsed result for {file_path.name!r}"
        )

    # Resolve DOY (from doy directly, or from month+day)
    if "doy" in parsed:
        doy = _require_int("doy")
    elif "month" in parsed and "day" in parsed:
        from datetime import date

        doy = (
            date(year, _require_int("month"), _require_int("day"))
            .timetuple()
            .tm_yday
        )
    else:
        raise ValueError(
            f"Recipe '{self.name}': no 'doy' or 'month'+'day' fields "
            f"in parsed result for {file_path.name!r}"
        )

    # Resolve hour
    if "hour" in parsed:
        hour = _require_int("hour")
    elif "hour_letter" in parsed:
        hour = hour_letter_to_int(_require_str("hour_letter"))
    else:
        hour = 0

    # Resolve minute
    minute = _require_int("minute") if "minute" in parsed else 0

    # Determine period: daily if hour=0 and minute=0 and no hour field
    period = self.period
    if hour == 0 and minute == 0:
        has_hour = any("hour" in e or "hour_letter" in e for e in self.fields)
        if not has_hour:
            period = "01D"

    rx_type = (
        ReceiverType.REFERENCE
        if self.receiver_type == "reference"
        else ReceiverType.ACTIVE
    )

    conventional = CanVODFilename(
        site=self.site,
        receiver_type=rx_type,
        receiver_number=self.receiver_number,
        agency=self.agency,
        year=year,
        doy=doy,
        hour=hour,
        minute=minute,
        period=period,
        sampling=self.sampling,
        content=self.content,
        file_type=FileType(self.file_type),
    )

    return VirtualFile(physical_path=file_path, conventional_name=conventional)
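The `hour_letter_to_int` call above converts a RINEX v2 session letter (a = 00h through x = 23h) into an hour. A minimal sketch of that convention, assuming the standard a–x mapping (the package's own helper may accept additional inputs):

```python
def hour_letter_to_int(letter: str) -> int:
    """Map a RINEX v2 hourly-session letter ('a'..'x') to an hour 0-23."""
    idx = ord(letter.lower()) - ord("a")
    if not 0 <= idx <= 23:
        raise ValueError(f"invalid hour letter: {letter!r}")
    return idx
```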

matches(filename)

Check if a filename can be parsed by this recipe.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
def matches(self, filename: str) -> bool:
    """Check if a filename can be parsed by this recipe."""
    if len(filename) != self.expected_length:
        return False
    try:
        self.parse_filename(filename)
        return True
    except ValueError:
        return False

to_yaml()

Serialize to YAML string.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
def to_yaml(self) -> str:
    """Serialize to YAML string."""
    return yaml.dump(
        self.model_dump(mode="json"),
        default_flow_style=False,
        sort_keys=False,
        allow_unicode=True,
    )

save(path)

Write recipe to a YAML file.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
def save(self, path: Path) -> None:
    """Write recipe to a YAML file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(self.to_yaml(), encoding="utf-8")

from_yaml(text) classmethod

Load from a YAML string.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
@classmethod
def from_yaml(cls, text: str) -> NamingRecipe:
    """Load from a YAML string."""
    data = yaml.safe_load(text)
    return cls.model_validate(data)

load(path) classmethod

Load from a YAML file.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/recipe.py
@classmethod
def load(cls, path: Path) -> NamingRecipe:
    """Load from a YAML file."""
    text = path.read_text(encoding="utf-8")
    return cls.from_yaml(text)

Patterns

Source filename pattern definitions and built-in registry.

Each SourcePattern describes how to discover and parse a particular naming scheme (RINEX v2, RINEX v3, Septentrio SBF, etc.) so the mapping engine can extract date/time metadata from any filename.

BUILTIN_PATTERNS module-attribute

BUILTIN_PATTERNS = {
    'canvod': _build_canvod_pattern(),
    'rinex_v3_long': _build_rinex_v3_long_pattern(),
    'septentrio_rinex_v2': _build_septentrio_rinex_v2_pattern(),
    'rinex_v2_short': _build_rinex_v2_short_pattern(),
    'septentrio_sbf': _build_septentrio_sbf_pattern(),
}

SourcePattern dataclass

A named regex pattern for matching and parsing source filenames.

Parameters

name
    Human-readable pattern identifier (e.g. "rinex_v3_long").
file_globs
    Glob patterns used to discover matching files on disk.
regex
    Compiled regex with named groups for metadata extraction. Expected groups: year (or yy), doy, hour (optional), minute (optional), sampling (optional), period (optional).

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/patterns.py
@dataclass(frozen=True)
class SourcePattern:
    """A named regex pattern for matching and parsing source filenames.

    Parameters
    ----------
    name
        Human-readable pattern identifier (e.g. ``"rinex_v3_long"``).
    file_globs
        Glob patterns used to discover matching files on disk.
    regex
        Compiled regex with named groups for metadata extraction.
        Expected groups: ``year`` (or ``yy``), ``doy``,
        ``hour`` (optional), ``minute`` (optional),
        ``sampling`` (optional), ``period`` (optional).
    """

    name: str
    file_globs: tuple[str, ...]
    regex: re.Pattern[str]
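To illustrate the shape of such a pattern, here is a hypothetical RINEX v2 short-name entry (ssssDDDf.YYo, e.g. rref0420.25o). The regex below is a demonstration assumption, not the package's built-in `rinex_v2_short` pattern:

```python
import re
from dataclasses import dataclass

@dataclass(frozen=True)
class SourcePattern:
    name: str
    file_globs: tuple[str, ...]
    regex: re.Pattern[str]

# Hypothetical RINEX v2 short-name pattern: 4-char station, 3-digit DOY,
# session char ('a'-'x' hourly or '0' daily), 2-digit year, 'o' suffix.
rinex_v2_demo = SourcePattern(
    name="rinex_v2_short_demo",
    file_globs=("*.??o",),
    regex=re.compile(
        r"^(?P<station>\w{4})(?P<doy>\d{3})(?P<hour>[a-x0])\.(?P<yy>\d{2})o$"
    ),
)

m = rinex_v2_demo.regex.match("rref0420.25o")
print(m.groupdict())
# → {'station': 'rref', 'doy': '042', 'hour': '0', 'yy': '25'}
```

Named groups let the mapping engine pull metadata out without caring which scheme matched.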

Validation

Pre-pipeline validation of data directories against naming convention.

The DataDirectoryValidator ensures every file entering the pipeline can be mapped to a CanVODFilename. Validation is a hard gate: if any files are unmatched or temporal overlaps exist, processing is blocked with a clear diagnostic message.

DataDirectoryValidator

Pre-pipeline validation of data directories against naming convention.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/validator.py
class DataDirectoryValidator:
    """Pre-pipeline validation of data directories against naming convention."""

    def validate_receiver(
        self,
        site_naming: SiteNamingConfig,
        receiver_naming: ReceiverNamingConfig,
        receiver_type: Literal["reference", "canopy"],
        receiver_base_dir: Path,
        reader_format: str | None = None,
    ) -> ValidationReport:
        """Validate all files in a receiver directory.

        Parameters
        ----------
        site_naming
            Site-level naming config.
        receiver_naming
            Receiver-level naming config.
        receiver_type
            ``"reference"`` or ``"canopy"``.
        receiver_base_dir
            Absolute path to the receiver's data directory.
        reader_format
            If set (e.g. ``"rinex3"``, ``"sbf"``), only validate files
            matching that format.  Files of other formats are skipped
            (reported in ``skipped_format``).  ``"auto"`` or ``None``
            validates all formats.

        Returns
        -------
        ValidationReport
            Validation results.

        Raises
        ------
        ValueError
            If validation fails (unmatched files or overlaps detected).
        """
        mapper = FilenameMapper(
            site_naming=site_naming,
            receiver_naming=receiver_naming,
            receiver_type=receiver_type,
            receiver_base_dir=receiver_base_dir,
        )

        report = ValidationReport()

        # Discover all physical files
        all_physical = mapper._discover_files()

        # Determine which file types to accept
        accepted_types: set[FileType] | None = None
        if reader_format and reader_format != "auto":
            accepted_types = _READER_FORMAT_FILETYPES.get(reader_format)

        # Try to map each file
        for path in all_physical:
            try:
                vf = mapper.map_single_file(path)
            except (ValueError, KeyError):
                report.unmatched.append(path)
                continue

            # Filter by reader_format
            if accepted_types and vf.conventional_name.file_type not in accepted_types:
                report.skipped_format.append(vf)
                continue

            report.matched.append(vf)

        # Check for duplicate canonical names
        seen_names: dict[str, VirtualFile] = {}
        for vf in report.matched:
            name = vf.canonical_str
            if name in seen_names:
                report.warnings.append(
                    f"Duplicate canonical name '{name}': "
                    f"{seen_names[name].physical_path} and {vf.physical_path}"
                )
            else:
                seen_names[name] = vf

        # Detect temporal overlaps
        report.overlaps = FilenameMapper.detect_overlaps(report.matched)

        if not report.is_valid:
            raise ValueError(_format_validation_error(report, receiver_base_dir))

        return report

validate_receiver(site_naming, receiver_naming, receiver_type, receiver_base_dir, reader_format=None)

Validate all files in a receiver directory.

Parameters

site_naming
    Site-level naming config.
receiver_naming
    Receiver-level naming config.
receiver_type
    "reference" or "canopy".
receiver_base_dir
    Absolute path to the receiver's data directory.
reader_format
    If set (e.g. "rinex3", "sbf"), only validate files matching that format. Files of other formats are skipped (reported in skipped_format). "auto" or None validates all formats.

Returns

ValidationReport
    Validation results.

Raises

ValueError
    If validation fails (unmatched files or overlaps detected).

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/validator.py
def validate_receiver(
    self,
    site_naming: SiteNamingConfig,
    receiver_naming: ReceiverNamingConfig,
    receiver_type: Literal["reference", "canopy"],
    receiver_base_dir: Path,
    reader_format: str | None = None,
) -> ValidationReport:
    """Validate all files in a receiver directory.

    Parameters
    ----------
    site_naming
        Site-level naming config.
    receiver_naming
        Receiver-level naming config.
    receiver_type
        ``"reference"`` or ``"canopy"``.
    receiver_base_dir
        Absolute path to the receiver's data directory.
    reader_format
        If set (e.g. ``"rinex3"``, ``"sbf"``), only validate files
        matching that format.  Files of other formats are skipped
        (reported in ``skipped_format``).  ``"auto"`` or ``None``
        validates all formats.

    Returns
    -------
    ValidationReport
        Validation results.

    Raises
    ------
    ValueError
        If validation fails (unmatched files or overlaps detected).
    """
    mapper = FilenameMapper(
        site_naming=site_naming,
        receiver_naming=receiver_naming,
        receiver_type=receiver_type,
        receiver_base_dir=receiver_base_dir,
    )

    report = ValidationReport()

    # Discover all physical files
    all_physical = mapper._discover_files()

    # Determine which file types to accept
    accepted_types: set[FileType] | None = None
    if reader_format and reader_format != "auto":
        accepted_types = _READER_FORMAT_FILETYPES.get(reader_format)

    # Try to map each file
    for path in all_physical:
        try:
            vf = mapper.map_single_file(path)
        except (ValueError, KeyError):
            report.unmatched.append(path)
            continue

        # Filter by reader_format
        if accepted_types and vf.conventional_name.file_type not in accepted_types:
            report.skipped_format.append(vf)
            continue

        report.matched.append(vf)

    # Check for duplicate canonical names
    seen_names: dict[str, VirtualFile] = {}
    for vf in report.matched:
        name = vf.canonical_str
        if name in seen_names:
            report.warnings.append(
                f"Duplicate canonical name '{name}': "
                f"{seen_names[name].physical_path} and {vf.physical_path}"
            )
        else:
            seen_names[name] = vf

    # Detect temporal overlaps
    report.overlaps = FilenameMapper.detect_overlaps(report.matched)

    if not report.is_valid:
        raise ValueError(_format_validation_error(report, receiver_base_dir))

    return report
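The hard-gate behaviour above (raise rather than return when blocking issues exist) can be exercised with a minimal stand-in for the report type. `Report` and `gate` here are hypothetical names for illustration only:

```python
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class Report:
    matched: list[str] = field(default_factory=list)
    unmatched: list[Path] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        # Mirrors ValidationReport.is_valid: any unmatched file blocks the run.
        return not self.unmatched

def gate(report: Report) -> Report:
    """Mimic validate_receiver's hard gate: raise on any blocking issue."""
    if not report.is_valid:
        raise ValueError(f"{len(report.unmatched)} unmatched file(s)")
    return report
```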

ValidationReport dataclass

Result of validating a receiver's data directory.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/validator.py
@dataclass
class ValidationReport:
    """Result of validating a receiver's data directory."""

    matched: list[VirtualFile] = field(default_factory=list)
    unmatched: list[Path] = field(default_factory=list)
    overlaps: list[tuple[VirtualFile, VirtualFile]] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    skipped_format: list[VirtualFile] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        """True if no blocking issues found."""
        return not self.unmatched and not self.overlaps

is_valid property

True if no blocking issues found.

Catalog

DuckDB-backed metadata catalog for file mappings.

The FilenameCatalog persists the mapping between physical files and their canVOD conventional names, enabling fast lookups and date-range queries without re-scanning the filesystem.

Catalog location: {gnss_site_data_root}/.canvod/filename_catalog.duckdb
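Date-range queries in the catalog (see query_date_range later on this page) order days by the single integer key year * 1000 + doy, which sorts chronologically because a day-of-year never exceeds 366. A small sketch of that encoding:

```python
def day_key(year: int, doy: int) -> int:
    """Encode (year, doy) as one sortable integer, as the catalog's SQL does."""
    return year * 1000 + doy

# A date-range lookup then reduces to a single BETWEEN on this key,
# even when the range crosses a year boundary.
start, end = day_key(2024, 360), day_key(2025, 5)
days = [day_key(2024, 359), day_key(2024, 366), day_key(2025, 1), day_key(2025, 10)]
print([k for k in days if start <= k <= end])
# → [2024366, 2025001]
```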

FilenameCatalog

DuckDB-backed catalog of file name mappings.

Parameters

db_path
    Path to the DuckDB database file. Created if it doesn't exist.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
class FilenameCatalog:
    """DuckDB-backed catalog of file name mappings.

    Parameters
    ----------
    db_path
        Path to the DuckDB database file. Created if it doesn't exist.
    """

    def __init__(self, db_path: Path) -> None:
        self.db_path = db_path
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = duckdb.connect(str(db_path))
        self._conn.execute(_CREATE_SEQUENCE_SQL)
        self._conn.execute(_CREATE_TABLE_SQL)

    def record(self, vf: VirtualFile) -> None:
        """Insert or update a single file mapping."""
        now = datetime.now(tz=UTC)
        cn = vf.conventional_name
        size, mtime = _file_stat(vf.physical_path)
        file_hash = _compute_file_hash(vf.physical_path)
        phys_str = str(vf.physical_path)

        existing = self._conn.execute(
            "SELECT id FROM file_mapping WHERE physical_path = ?", [phys_str]
        ).fetchone()

        if existing:
            self._conn.execute(
                """\
                UPDATE file_mapping SET
                    physical_name = ?, file_size_bytes = ?, file_mtime = ?,
                    conventional_name = ?, site_id = ?, receiver_type = ?,
                    receiver_number = ?, agency = ?, year = ?, doy = ?,
                    hour = ?, minute = ?, period = ?, sampling = ?,
                    content = ?, file_type = ?, compression = ?,
                    file_hash = ?, last_verified_at = ?
                WHERE id = ?""",
                [
                    vf.physical_path.name,
                    size,
                    mtime,
                    cn.name,
                    cn.site,
                    cn.receiver_type.value,
                    cn.receiver_number,
                    cn.agency,
                    cn.year,
                    cn.doy,
                    cn.hour,
                    cn.minute,
                    cn.period,
                    cn.sampling,
                    cn.content,
                    cn.file_type.value,
                    cn.compression,
                    file_hash,
                    now,
                    existing[0],
                ],
            )
        else:
            self._conn.execute(
                """\
                INSERT INTO file_mapping (
                    id, physical_path, physical_name, file_size_bytes, file_mtime,
                    conventional_name, site_id, receiver_type, receiver_number,
                    agency, year, doy, hour, minute, period, sampling,
                    content, file_type, compression,
                    file_hash, first_seen_at, last_verified_at
                ) VALUES (
                    nextval('file_mapping_id_seq'),
                    ?, ?, ?, ?,
                    ?, ?, ?, ?,
                    ?, ?, ?, ?, ?, ?, ?,
                    ?, ?, ?,
                    ?, ?, ?
                )""",
                [
                    phys_str,
                    vf.physical_path.name,
                    size,
                    mtime,
                    cn.name,
                    cn.site,
                    cn.receiver_type.value,
                    cn.receiver_number,
                    cn.agency,
                    cn.year,
                    cn.doy,
                    cn.hour,
                    cn.minute,
                    cn.period,
                    cn.sampling,
                    cn.content,
                    cn.file_type.value,
                    cn.compression,
                    file_hash,
                    now,
                    now,
                ],
            )

    def record_batch(self, vfs: list[VirtualFile]) -> None:
        """Insert or update a batch of file mappings."""
        for vf in vfs:
            self.record(vf)

    def lookup_by_conventional(self, name: str) -> Path | None:
        """Look up a physical path by conventional name.

        Returns None if not found.
        """
        row = self._conn.execute(
            "SELECT physical_path FROM file_mapping WHERE conventional_name = ?",
            [name],
        ).fetchone()
        return Path(row[0]) if row else None

    def lookup_by_physical(self, path: Path) -> CanVODFilename | None:
        """Look up a conventional name by physical path.

        Returns None if not found.
        """
        row = self._conn.execute(
            "SELECT conventional_name FROM file_mapping WHERE physical_path = ?",
            [str(path)],
        ).fetchone()
        if row is None:
            return None
        return CanVODFilename.from_filename(row[0])

    def query_date_range(
        self,
        start_year: int,
        start_doy: int,
        end_year: int,
        end_doy: int,
        *,
        receiver_type: str | None = None,
    ) -> list[VirtualFile]:
        """Query file mappings within a date range.

        Parameters
        ----------
        start_year, start_doy
            Start of range (inclusive).
        end_year, end_doy
            End of range (inclusive).
        receiver_type
            Optional filter: ``"R"`` or ``"A"``.
        """
        sql = """\
            SELECT physical_path, conventional_name
            FROM file_mapping
            WHERE (year * 1000 + doy) BETWEEN ? AND ?
        """
        params: list = [
            start_year * 1000 + start_doy,
            end_year * 1000 + end_doy,
        ]

        if receiver_type is not None:
            sql += " AND receiver_type = ?"
            params.append(receiver_type)

        sql += " ORDER BY year, doy, hour, minute"

        rows = self._conn.execute(sql, params).fetchall()
        results = []
        for phys_str, conv_name in rows:
            cn = CanVODFilename.from_filename(conv_name)
            results.append(
                VirtualFile(physical_path=Path(phys_str), conventional_name=cn)
            )
        return results

    def verify_integrity(self) -> list[str]:
        """Check that all cataloged physical files still exist.

        Returns
        -------
        list[str]
            List of physical paths that no longer exist on disk.
        """
        rows = self._conn.execute("SELECT physical_path FROM file_mapping").fetchall()
        missing = []
        for (phys_str,) in rows:
            if not Path(phys_str).exists():
                missing.append(phys_str)
        return missing

    def count(self) -> int:
        """Return total number of cataloged files."""
        row = self._conn.execute("SELECT COUNT(*) FROM file_mapping").fetchone()
        if row is None:
            return 0
        return row[0]

    def to_polars(self):
        """Export the catalog to a Polars DataFrame via DuckDB-Arrow bridge.

        Returns
        -------
        polars.DataFrame
        """
        import polars as pl

        arrow_table = self._conn.execute(
            "SELECT * FROM file_mapping"
        ).fetch_arrow_table()
        return pl.from_arrow(arrow_table)

    def close(self) -> None:
        """Close the database connection."""
        self._conn.close()

    def __enter__(self) -> FilenameCatalog:
        return self

    def __exit__(self, *args) -> None:
        self.close()

record(vf)

Insert or update a single file mapping.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def record(self, vf: VirtualFile) -> None:
    """Insert or update a single file mapping."""
    now = datetime.now(tz=UTC)
    cn = vf.conventional_name
    size, mtime = _file_stat(vf.physical_path)
    file_hash = _compute_file_hash(vf.physical_path)
    phys_str = str(vf.physical_path)

    existing = self._conn.execute(
        "SELECT id FROM file_mapping WHERE physical_path = ?", [phys_str]
    ).fetchone()

    if existing:
        self._conn.execute(
            """\
            UPDATE file_mapping SET
                physical_name = ?, file_size_bytes = ?, file_mtime = ?,
                conventional_name = ?, site_id = ?, receiver_type = ?,
                receiver_number = ?, agency = ?, year = ?, doy = ?,
                hour = ?, minute = ?, period = ?, sampling = ?,
                content = ?, file_type = ?, compression = ?,
                file_hash = ?, last_verified_at = ?
            WHERE id = ?""",
            [
                vf.physical_path.name,
                size,
                mtime,
                cn.name,
                cn.site,
                cn.receiver_type.value,
                cn.receiver_number,
                cn.agency,
                cn.year,
                cn.doy,
                cn.hour,
                cn.minute,
                cn.period,
                cn.sampling,
                cn.content,
                cn.file_type.value,
                cn.compression,
                file_hash,
                now,
                existing[0],
            ],
        )
    else:
        self._conn.execute(
            """\
            INSERT INTO file_mapping (
                id, physical_path, physical_name, file_size_bytes, file_mtime,
                conventional_name, site_id, receiver_type, receiver_number,
                agency, year, doy, hour, minute, period, sampling,
                content, file_type, compression,
                file_hash, first_seen_at, last_verified_at
            ) VALUES (
                nextval('file_mapping_id_seq'),
                ?, ?, ?, ?,
                ?, ?, ?, ?,
                ?, ?, ?, ?, ?, ?, ?,
                ?, ?, ?,
                ?, ?, ?
            )""",
            [
                phys_str,
                vf.physical_path.name,
                size,
                mtime,
                cn.name,
                cn.site,
                cn.receiver_type.value,
                cn.receiver_number,
                cn.agency,
                cn.year,
                cn.doy,
                cn.hour,
                cn.minute,
                cn.period,
                cn.sampling,
                cn.content,
                cn.file_type.value,
                cn.compression,
                file_hash,
                now,
                now,
            ],
        )
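`record()` implements a check-then-write upsert: probe for an existing row by `physical_path`, then `UPDATE` or `INSERT` accordingly. The same pattern in miniature, using stdlib `sqlite3` in place of DuckDB (illustration only, not the catalog's actual schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE file_mapping ("
    "id INTEGER PRIMARY KEY, physical_path TEXT UNIQUE, conventional_name TEXT)"
)

def record(physical_path: str, conventional_name: str) -> None:
    # Probe first, then update or insert -- mirrors the catalog's flow.
    existing = conn.execute(
        "SELECT id FROM file_mapping WHERE physical_path = ?", [physical_path]
    ).fetchone()
    if existing:
        conn.execute(
            "UPDATE file_mapping SET conventional_name = ? WHERE id = ?",
            [conventional_name, existing[0]],
        )
    else:
        conn.execute(
            "INSERT INTO file_mapping (physical_path, conventional_name) VALUES (?, ?)",
            [physical_path, conventional_name],
        )

record("/data/raw/day1.sbf", "ROSR01TUW_R_20250010000_01D_05S_AA.sbf")
record("/data/raw/day1.sbf", "ROSR01TUW_R_20250010000_01D_01S_AA.sbf")  # second call updates
count = conn.execute("SELECT COUNT(*) FROM file_mapping").fetchone()[0]
assert count == 1  # still one row: the second call updated rather than duplicated
```

Because `physical_path` is the lookup key, re-recording the same file after a rename of its conventional name stays idempotent at the row level.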

record_batch(vfs)

Insert or update a batch of file mappings.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def record_batch(self, vfs: list[VirtualFile]) -> None:
    """Insert or update a batch of file mappings."""
    for vf in vfs:
        self.record(vf)

lookup_by_conventional(name)

Look up a physical path by conventional name.

Returns None if not found.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def lookup_by_conventional(self, name: str) -> Path | None:
    """Look up a physical path by conventional name.

    Returns None if not found.
    """
    row = self._conn.execute(
        "SELECT physical_path FROM file_mapping WHERE conventional_name = ?",
        [name],
    ).fetchone()
    return Path(row[0]) if row else None

lookup_by_physical(path)

Look up a conventional name by physical path.

Returns None if not found.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def lookup_by_physical(self, path: Path) -> CanVODFilename | None:
    """Look up a conventional name by physical path.

    Returns None if not found.
    """
    row = self._conn.execute(
        "SELECT conventional_name FROM file_mapping WHERE physical_path = ?",
        [str(path)],
    ).fetchone()
    if row is None:
        return None
    return CanVODFilename.from_filename(row[0])

query_date_range(start_year, start_doy, end_year, end_doy, *, receiver_type=None)

Query file mappings within a date range.

Parameters

start_year, start_doy: Start of range (inclusive).
end_year, end_doy: End of range (inclusive).
receiver_type: Optional filter: "R" or "A".

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def query_date_range(
    self,
    start_year: int,
    start_doy: int,
    end_year: int,
    end_doy: int,
    *,
    receiver_type: str | None = None,
) -> list[VirtualFile]:
    """Query file mappings within a date range.

    Parameters
    ----------
    start_year, start_doy
        Start of range (inclusive).
    end_year, end_doy
        End of range (inclusive).
    receiver_type
        Optional filter: ``"R"`` or ``"A"``.
    """
    sql = """\
        SELECT physical_path, conventional_name
        FROM file_mapping
        WHERE (year * 1000 + doy) BETWEEN ? AND ?
    """
    params: list = [
        start_year * 1000 + start_doy,
        end_year * 1000 + end_doy,
    ]

    if receiver_type is not None:
        sql += " AND receiver_type = ?"
        params.append(receiver_type)

    sql += " ORDER BY year, doy, hour, minute"

    rows = self._conn.execute(sql, params).fetchall()
    results = []
    for phys_str, conv_name in rows:
        cn = CanVODFilename.from_filename(conv_name)
        results.append(
            VirtualFile(physical_path=Path(phys_str), conventional_name=cn)
        )
    return results
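The `BETWEEN` predicate works because the query packs `(year, doy)` into a single ordinal, `year * 1000 + doy`, which preserves chronological ordering across year boundaries. A minimal sketch of that encoding (plain Python, independent of the catalog):

```python
def date_ordinal(year: int, doy: int) -> int:
    """Pack (year, day-of-year) into a single sortable integer."""
    return year * 1000 + doy

# Ordering is preserved across a year boundary: 2024-366 < 2025-001.
assert date_ordinal(2024, 366) < date_ordinal(2025, 1)

# An inclusive range check mirrors the SQL BETWEEN predicate.
start, end = date_ordinal(2025, 1), date_ordinal(2025, 31)
assert start <= date_ordinal(2025, 15) <= end
assert not (start <= date_ordinal(2025, 32) <= end)
```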

verify_integrity()

Check that all cataloged physical files still exist.

Returns

list[str]: List of physical paths that no longer exist on disk.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def verify_integrity(self) -> list[str]:
    """Check that all cataloged physical files still exist.

    Returns
    -------
    list[str]
        List of physical paths that no longer exist on disk.
    """
    rows = self._conn.execute("SELECT physical_path FROM file_mapping").fetchall()
    missing = []
    for (phys_str,) in rows:
        if not Path(phys_str).exists():
            missing.append(phys_str)
    return missing
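The check itself is a plain `Path.exists()` per cataloged row. The same logic in isolation (stdlib only, no catalog required):

```python
import tempfile
from pathlib import Path

def find_missing(paths: list[str]) -> list[str]:
    """Return the subset of paths that no longer exist on disk."""
    return [p for p in paths if not Path(p).exists()]

with tempfile.TemporaryDirectory() as tmp:
    present = Path(tmp) / "ROSR01TUW_R_20250010000_01D_05S_AA.rnx"
    present.write_bytes(b"")
    absent = Path(tmp) / "deleted.rnx"  # never created
    missing = find_missing([str(present), str(absent)])
    assert missing == [str(absent)]
```

Note that `verify_integrity` reports missing files without deleting their rows, so a catalog can be reconciled manually after, say, an unmounted network share reappears.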

count()

Return total number of cataloged files.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def count(self) -> int:
    """Return total number of cataloged files."""
    row = self._conn.execute("SELECT COUNT(*) FROM file_mapping").fetchone()
    if row is None:
        return 0
    return row[0]

to_polars()

Export the catalog to a Polars DataFrame via DuckDB-Arrow bridge.

Returns

polars.DataFrame

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def to_polars(self):
    """Export the catalog to a Polars DataFrame via DuckDB-Arrow bridge.

    Returns
    -------
    polars.DataFrame
    """
    import polars as pl

    arrow_table = self._conn.execute(
        "SELECT * FROM file_mapping"
    ).fetch_arrow_table()
    return pl.from_arrow(arrow_table)

close()

Close the database connection.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/catalog.py
def close(self) -> None:
    """Close the database connection."""
    self._conn.close()

Configuration

Pydantic models for naming configuration in sites.yaml.

These models validate the naming: sections at site and receiver level. The canvod-utils package stores these as opaque dict | None fields; this package validates them when constructing a FilenameMapper.

SiteNamingConfig

Bases: BaseModel

Site-level naming defaults (sites.<name>.naming in YAML).

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/config_models.py
class SiteNamingConfig(BaseModel):
    """Site-level naming defaults (``sites.<name>.naming`` in YAML)."""

    site_id: SiteId
    agency: AgencyId
    default_sampling: Duration = "05S"
    default_period: Duration = "01D"
    default_content: ContentCode = "AA"

ReceiverNamingConfig

Bases: BaseModel

Receiver-level naming overrides (sites.<name>.receivers.<rx>.naming).

The source_station field specifies the 4-character station code used in RINEX v2 / SBF filenames (e.g. ract, rref). When set, only files whose station code matches are accepted during discovery and validation. When None, any station code is accepted.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/config_models.py
class ReceiverNamingConfig(BaseModel):
    """Receiver-level naming overrides (``sites.<name>.receivers.<rx>.naming``).

    The ``source_station`` field specifies the 4-character station code used
    in RINEX v2 / SBF filenames (e.g. ``ract``, ``rref``).  When set, only
    files whose station code matches are accepted during discovery and
    validation.  When ``None``, any station code is accepted.
    """

    receiver_number: int = Field(ge=1, le=99)
    source_pattern: str = "auto"
    source_station: str | None = Field(
        default=None,
        description="4-char station code in source filenames (e.g. 'ract')",
    )
    directory_layout: DirectoryLayout = DirectoryLayout.YYDDD_SUBDIRS
    agency: AgencyId | None = None
    sampling: Duration | None = None
    period: Duration | None = None
    content: ContentCode | None = None
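Receiver-level fields typed `| None` act as optional overrides of the site-level defaults. A hypothetical sketch of that precedence using plain dicts (the actual resolution happens inside `FilenameMapper`, whose logic is not shown here):

```python
# Site-level defaults, as in SiteNamingConfig.
site_defaults = {"agency": "TUW", "sampling": "05S", "period": "01D", "content": "AA"}

# Receiver-level overrides, as in ReceiverNamingConfig; None means "inherit".
receiver_overrides = {"agency": None, "sampling": "01S", "period": None, "content": None}

# Receiver value wins when set; otherwise the site default applies.
effective = {
    key: override if (override := receiver_overrides.get(key)) is not None else default
    for key, default in site_defaults.items()
}
assert effective == {"agency": "TUW", "sampling": "01S", "period": "01D", "content": "AA"}
```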

DirectoryLayout

Bases: StrEnum

How receiver data files are organised into subdirectories.

Source code in packages/canvod-virtualiconvname/src/canvod/virtualiconvname/config_models.py
class DirectoryLayout(StrEnum):
    """How receiver data files are organised into subdirectories."""

    YYDDD_SUBDIRS = "yyddd_subdirs"  # 25001/, 25002/
    YYYYDDD_SUBDIRS = "yyyyddd_subdirs"  # 2025001/
    FLAT = "flat"  # all files in one directory