# Dask & Resource Management

## What is Dask
Dask is a Python library for parallel computing. It distributes work across multiple CPU cores automatically, so many GNSS files are processed simultaneously instead of one at a time.

canVODpy uses Dask's Distributed scheduler, which creates a local cluster of worker processes on your machine. Each worker handles one GNSS file at a time, and the scheduler coordinates the work.
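As a minimal sketch of this model (assuming `dask.distributed` is installed; the worker count here is arbitrary, not canVODpy's default):

```python
from dask.distributed import Client, LocalCluster

# Local cluster of single-threaded worker processes, coordinated
# by one scheduler -- the same shape canVODpy sets up internally.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Each entry corresponds to one worker process.
print(len(client.scheduler_info()["workers"]))  # 2

client.close()
cluster.close()
```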
## Why canVODpy uses Dask
GNSS-Transmissometry processing involves reading hundreds or thousands of observation files per site. On shared machines (university servers, HPC nodes), it's important to:
- Use available cores without manually tuning parallelism
- Respect memory limits so other users aren't affected
- Handle failures gracefully -- if a worker runs out of memory, Dask restarts it and retries the task
- Monitor progress through a web dashboard
Dask provides all of this out of the box.
## How it works in canVODpy

When you run the processing pipeline, canVODpy creates a `DaskClusterManager` that starts a `LocalCluster`:

```text
LocalCluster
    Worker 0 ── 1 process, 1 core
    Worker 1 ── 1 process, 1 core
    Worker 2 ── 1 process, 1 core
    ...
```
Each worker process reads and preprocesses one GNSS file at a time. The scheduler distributes files across workers and collects results.
## Resource modes

| Mode | Use case | Behaviour |
|---|---|---|
| `auto` | Local machine, single user | Dask detects available cores and memory automatically |
| `manual` | Shared server, HPC node | You set explicit limits on workers, memory, and CPU cores |
Configure the mode in `processing.yaml`:

```yaml
processing:
  resource_mode: auto
```
For shared machines:

```yaml
processing:
  resource_mode: manual
  n_max_threads: 4            # use 4 worker processes
  max_memory_gb: 16           # soft RAM limit across all workers
  threads_per_worker: 1       # threads per worker (1 is usually best)
  cpu_affinity: [0, 1, 2, 3]  # pin to specific CPU cores (Linux only)
  nice_priority: 10           # lower process priority (0 = normal, 19 = lowest)
```
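How these manual-mode fields might map onto `LocalCluster` arguments is sketched below. The exact mapping inside canVODpy is an assumption, but Dask's `memory_limit` applies per worker, so a soft total would be divided by the worker count:

```python
# Values mirror the processing.yaml example above.
n_max_threads = 4
max_memory_gb = 16
threads_per_worker = 1

# Dask's memory_limit is per worker, so split the soft total.
memory_per_worker = f"{max_memory_gb / n_max_threads:.1f}GB"

cluster_kwargs = dict(
    n_workers=n_max_threads,
    threads_per_worker=threads_per_worker,
    memory_limit=memory_per_worker,  # "4.0GB" per worker
)
```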
## Cluster lifecycle

`DaskClusterManager` owns the full lifetime of the local Dask cluster. It starts
the cluster when the pipeline begins and shuts it down when the pipeline ends,
regardless of whether the run completes normally, raises an exception, or is
interrupted by the user.
Shutdown happens automatically through two complementary mechanisms. When the
manager is used as a context manager (which is how the pipeline always runs it),
the cluster is stopped in `__exit__` as soon as the `with` block exits.
Additionally, a handler is registered with Python's `atexit` module at creation
time, so the cluster is also stopped if the Python process exits without
executing the `with` block's cleanup (for example, if `sys.exit()` is called
from a Dask worker or a signal handler). Both paths call the same `close()`
method, which is guarded against double execution: if `close()` is called a
second time, for instance because `atexit` fires after `__exit__` has already
run, it returns immediately rather than attempting to stop an already-stopped
cluster.

You never call `close()` directly. The manager is designed to be used
exclusively as a context manager, and the pipeline infrastructure handles
startup and teardown transparently.
```python
# The pipeline does this internally -- you do not call these methods yourself
with DaskClusterManager(config) as manager:
    client = manager.client
    # ... work happens here ...
# cluster is stopped here, or by atexit if the process exits first
```
## Worker plugins

### ResourceInitPlugin

When `cpu_affinity` or `nice_priority` is set, canVODpy registers a Dask
`WorkerPlugin` that configures each worker process at startup:

- **CPU affinity** (Linux only): pins the worker to specific CPU cores using
  `os.sched_setaffinity`, preventing it from migrating across all cores
- **Nice priority**: lowers the process priority using `os.setpriority`, so
  interactive users on the same machine get CPU time first
If a worker is restarted (e.g., after an out-of-memory kill), the plugin re-applies these settings automatically.
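A minimal sketch of such a plugin is shown below. The real `ResourceInitPlugin` may differ in its details; the key point is that `setup()` runs on every worker start, including restarts:

```python
import os

from dask.distributed import WorkerPlugin

class ResourceInitSketch(WorkerPlugin):
    """Illustrative worker plugin, not canVODpy's actual implementation."""

    def __init__(self, cpu_affinity=None, nice_priority=0):
        self.cpu_affinity = cpu_affinity
        self.nice_priority = nice_priority

    def setup(self, worker):
        # Runs in the worker process on every start *and* restart,
        # so the settings survive an out-of-memory kill.
        if self.cpu_affinity and hasattr(os, "sched_setaffinity"):
            os.sched_setaffinity(0, self.cpu_affinity)   # Linux only
        if self.nice_priority:
            os.setpriority(os.PRIO_PROCESS, 0, self.nice_priority)

# Registered once on the client; Dask applies it to all current and
# future workers, e.g.:
# client.register_plugin(ResourceInitSketch(cpu_affinity=[0, 1]))
```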
### MemoryMonitor

The `MemoryMonitor` logs system memory usage at key points during processing.
It does not enforce limits -- Dask's built-in nanny process handles actual
memory enforcement by killing and restarting workers that exceed their allocation.
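A logging-only monitor of this kind might look like the sketch below, using `psutil` (a Dask dependency); the real `MemoryMonitor` API is not documented here, so names and format are assumptions:

```python
import logging

import psutil

logger = logging.getLogger("memory_monitor")

def log_memory(label: str) -> None:
    """Record current system memory use -- observation only, no enforcement."""
    mem = psutil.virtual_memory()
    logger.info(
        "%s: %.1f / %.1f GB used (%.0f%%)",
        label, mem.used / 1e9, mem.total / 1e9, mem.percent,
    )

log_memory("before preprocessing")
```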
## Dask arrays in Icechunk stores
canVODpy stores processed GNSS data in Icechunk/Zarr stores using chunked arrays. When you read data back from a store, Dask can load these chunks lazily -- only the chunks you actually access are read into memory.
This means the same Dask cluster that processes RINEX files can also read from the store without loading the entire dataset:
```python
import xarray as xr

# Opens lazily -- no data loaded yet
ds = xr.open_zarr(store, group="rosalia/reference_01")

# Only loads the chunks needed for this slice
snr_day1 = ds.SNR.sel(epoch="2025-01-01")
```
## Dashboard

When a Dask cluster is running, a monitoring dashboard is available at
`http://localhost:8787`.
The dashboard shows:
- Active tasks and worker utilisation
- Memory usage per worker
- Task stream (timeline of completed work)
- Progress bars for running computations
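If you have a `Client` handle, you can also ask it for the dashboard URL directly; Dask falls back to a random port when 8787 is already taken, so the printed link is authoritative:

```python
from dask.distributed import Client

client = Client(n_workers=1, threads_per_worker=1)
print(client.dashboard_link)   # usually http://127.0.0.1:8787/status
client.close()
```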
## Configuration reference

| Field | Default | Description |
|---|---|---|
| `resource_mode` | `auto` | `auto` or `manual` |
| `n_max_threads` | -- | Number of worker processes (required in `manual` mode) |
| `max_memory_gb` | -- | Soft RAM limit in GB (`manual` mode) |
| `threads_per_worker` | `1` | Threads per worker process |
| `cpu_affinity` | -- | List of CPU core IDs to pin workers to (Linux only) |
| `nice_priority` | `0` | Process priority: 0 = normal, 19 = lowest |
## Fallback

If Dask is not installed, canVODpy falls back to `ProcessPoolExecutor` from
the Python standard library. All resource management features require Dask.
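That fallback path is roughly equivalent to the stdlib sketch below; `run_parallel` and `square` are illustrative names, not canVODpy's API:

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    # Stand-in for per-file preprocessing work.
    return x * x

def run_parallel(func, items, max_workers=None):
    """Plain process pool: no memory limits, no retries, no dashboard."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(func, items))

if __name__ == "__main__":
    print(run_parallel(square, range(4)))  # [0, 1, 4, 9]
```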