DataArray Submodule#

The curator.da submodule provides pipeline components for working with xarray.DataArray objects — the standard data structure for labelled multi-dimensional arrays in the scientific Python ecosystem.

Installation#

# Install with the da dependency group
uv sync --group da

Required packages: xarray, earth2studio, zarr>=3.0, gcsfs.

Components#

ERA5Source#

Fetches ERA5 reanalysis data from Google’s Analysis-Ready, Cloud-Optimized (ARCO) Zarr store via earth2studio.data.ARCO. Each pipeline index maps to a single timestamp.

from datetime import datetime
from physicsnemo_curator.domains.da.sources.era5 import ERA5Source

source = ERA5Source(
    times=[datetime(2020, 6, 1, 0), datetime(2020, 6, 1, 6)],
    variables=["t2m", "u10m", "v10m"],
    cache=True,  # cache downloaded chunks locally
)
print(f"{len(source)} timestamps")  # 2

Each source[i] yields a single xarray.DataArray with dimensions (time, variable, lat, lon): time is length 1 (the requested timestamp), variable spans the requested fields, and the spatial grid is ERA5's native 0.25° resolution (721 lat × 1440 lon).
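To orient yourself, here is a mock array with the same dimension layout and the usual selection pattern. The grid size and values are illustrative stand-ins, not fetched data, and this uses only plain xarray operations, not the library's API:

```python
import numpy as np
import xarray as xr

# Mock of the layout ERA5Source yields: (time, variable, lat, lon),
# with a tiny grid instead of the full 721 x 1440 ERA5 grid.
da = xr.DataArray(
    np.zeros((1, 3, 4, 8)),
    dims=("time", "variable", "lat", "lon"),
    coords={
        "time": [np.datetime64("2020-06-01T00")],
        "variable": ["t2m", "u10m", "v10m"],
    },
)

# Pull one field out, then drop the length-1 time dimension.
t2m = da.sel(variable="t2m").squeeze("time")
print(t2m.dims)  # ('lat', 'lon')
```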

Variable naming follows the earth2studio lexicon:

Category          Examples
Surface           t2m, u10m, v10m, sp, msl, sst, tp
Pressure-level    t500, z500, u850, q925 (37 levels)

Time range: 1940-01-01 through ~2023-11-11, hourly resolution.

MomentsFilter#

Computes running statistical moments (mean, variance, skewness, min, max) along specified dimensions using Welford’s online algorithm. The DataArray is yielded unchanged (pass-through).

from physicsnemo_curator.domains.da.filters.moments import MomentsFilter

filt = MomentsFilter(
    output="stats.zarr",
    dims=("time",),  # reduce over time → per-spatial-point statistics
)

Call flush() after processing to write accumulated statistics to the output Zarr store. Each variable gets its own group with arrays: mean, variance, skewness, min, max, plus a count attribute.
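The running-moment bookkeeping can be illustrated with a minimal NumPy sketch of Welford's update for count, mean, variance, min, and max (skewness needs an analogous third-moment term, omitted for brevity). This is illustrative code, not the filter's actual implementation:

```python
import numpy as np

def welford_update(state, x):
    """One online update: fold a new sample array x into the running
    (count, mean, M2, min, max) state. M2 is the sum of squared
    deviations from the current mean, so variance = M2 / count."""
    count, mean, m2, mn, mx = state
    count += 1
    delta = x - mean
    mean = mean + delta / count
    m2 = m2 + delta * (x - mean)
    return count, mean, m2, np.minimum(mn, x), np.maximum(mx, x)

# Per-spatial-point statistics accumulated over a toy time dimension.
data = np.random.default_rng(0).normal(size=(24, 4, 4))  # (time, lat, lon)
shape = data.shape[1:]
state = (0, np.zeros(shape), np.zeros(shape),
         np.full(shape, np.inf), np.full(shape, -np.inf))
for step in data:
    state = welford_update(state, step)

count, mean, m2, mn, mx = state
variance = m2 / count  # population variance
```

Because the state is updated one timestamp at a time, memory stays constant regardless of how many indices the pipeline processes.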

ZarrSink#

Writes incoming DataArrays to a Zarr v3 store. Each variable is written to its own Zarr group (e.g. output.zarr/t2m/) with dimensions (time, lat, lon). Subsequent calls append along the time dimension.

from physicsnemo_curator.domains.da.sinks.zarr_writer import ZarrSink

sink = ZarrSink(
    output_path="output.zarr",
    chunks={"time": 1, "lat": 721, "lon": 1440},
    shards={"time": 24, "lat": 721, "lon": 1440},  # optional, Zarr v3
)

Chunking controls how data is split into individual Zarr chunks. Sharding (Zarr v3) groups multiple chunks into larger shard files, reducing the number of objects in cloud storage.
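The effect of sharding on object count is simple arithmetic; `n_objects` below is a hypothetical helper for illustration, not part of the library:

```python
import math

def n_objects(total, chunk, shard=None):
    """Number of stored objects along one dimension: without sharding,
    one object per chunk; with sharding, chunks are grouped into shard
    files (as in the Zarr v3 sharding codec)."""
    n_chunks = math.ceil(total / chunk)
    if shard is None:
        return n_chunks
    chunks_per_shard = shard // chunk
    return math.ceil(n_chunks / chunks_per_shard)

# 720 hourly timestamps, chunked at time=1:
print(n_objects(720, 1))      # 720 objects without sharding
print(n_objects(720, 1, 24))  # 30 shard files with shards of time=24
```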

NetCDF4Sink#

Writes incoming DataArrays to NetCDF4 files. Each variable gets its own subdirectory, and files are split along a configurable coordinate dimension (default: time, grouped by year). The output layout is:

<output_dir>/<variable>/<split_key>.nc

For example, with the default settings, data spanning 2020–2021 produces:

output_nc/
    t2m/
        2020.nc       # all 2020 timestamps
        2021.nc       # all 2021 timestamps

Subsequent calls with the same split key append along the time dimension using the unlimited dimension.

import numpy as np

from physicsnemo_curator.domains.da.sinks.netcdf_writer import NetCDF4Sink

# Default: split by year
sink = NetCDF4Sink(
    output_dir="output_nc",
    chunks={"time": 1, "lat": 721, "lon": 1440},
    compression_level=4,       # zlib 0-9, default 4
)

# Split by month instead
sink = NetCDF4Sink(
    output_dir="output_nc",
    split_func=lambda t: str(np.datetime64(t, "M")),
)

# No splitting — one file per variable
sink = NetCDF4Sink(output_dir="output_nc", split_dim=None)

Chunking controls the HDF5 chunk layout inside the NetCDF4 file. Compression uses zlib (level 0 disables it, 9 is maximum compression). The time dimension is unlimited by default, allowing efficient appends.

Full Pipeline Example#

from datetime import datetime
from physicsnemo_curator import run_pipeline
from physicsnemo_curator.domains.da.sources.era5 import ERA5Source
from physicsnemo_curator.domains.da.filters.moments import MomentsFilter
from physicsnemo_curator.domains.da.sinks.zarr_writer import ZarrSink

# Fetch 24 hours of surface weather data
times = [datetime(2020, 6, 1, h) for h in range(24)]

source = ERA5Source(
    times=times,
    variables=["t2m", "u10m", "v10m", "sp"],
)

filt = MomentsFilter(output="era5_stats.zarr", dims=("time",))
sink = ZarrSink(
    output_path="era5_output.zarr",
    chunks={"time": 1, "lat": 721, "lon": 1440},
)

pipeline = source.filter(filt).write(sink)

# Process all 24 timestamps
results = run_pipeline(pipeline)

# Write accumulated statistics
filt.flush()

After running, the output directory contains:

era5_output.zarr/
    t2m/          # Zarr group: (time=24, lat=721, lon=1440)
    u10m/         # Zarr group: (time=24, lat=721, lon=1440)
    v10m/         # ...
    sp/           # ...

era5_stats.zarr/
    t2m/          # mean, variance, skewness, min, max arrays
    u10m/         # with shape (lat=721, lon=1440)
    v10m/
    sp/

Using NetCDF4 output#

Replace the sink to write NetCDF4 files instead of Zarr:

from physicsnemo_curator.domains.da.sinks.netcdf_writer import NetCDF4Sink

sink = NetCDF4Sink(
    output_dir="era5_output_nc",
    chunks={"time": 1, "lat": 721, "lon": 1440},
    compression_level=4,
)

pipeline = source.filter(filt).write(sink)
results = run_pipeline(pipeline)
filt.flush()

This produces one subdirectory per variable, with files split by year:

era5_output_nc/
    t2m/
        2020.nc       # NetCDF4: (time=24, lat=721, lon=1440)
    u10m/
        2020.nc
    v10m/
        2020.nc
    sp/
        2020.nc

Caching#

ERA5Source uses earth2studio’s built-in caching (cache=True by default). Downloaded Zarr chunks are stored in ~/.cache/earth2studio/arco/. Set the EARTH2STUDIO_CACHE environment variable to override the cache location:

export EARTH2STUDIO_CACHE=/fast-scratch/era5-cache

Process Isolation#

When using run_pipeline with n_jobs > 1, each worker process gets its own copy of the pipeline. Stateful filters like MomentsFilter accumulate statistics independently in each process — their results are not merged automatically. For accurate statistics across all indices, use n_jobs=1 (sequential) or post-process the per-worker outputs.
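If you do need multi-worker statistics, per-worker partial moments can be combined afterwards with the standard pairwise merge formula (Chan et al.). `merge_moments` is a sketch assuming each worker's (count, mean, M2) state is available as plain arrays; it is not an API the library provides:

```python
import numpy as np

def merge_moments(a, b):
    """Merge two (count, mean, M2) partial states into one, e.g.
    statistics accumulated independently by two worker processes.
    M2 is the sum of squared deviations from each partial mean."""
    na, ma, m2a = a
    nb, mb, m2b = b
    n = na + nb
    delta = mb - ma
    mean = ma + delta * nb / n
    m2 = m2a + m2b + delta**2 * na * nb / n
    return n, mean, m2

# Two "workers" each see half the data; merging recovers global stats.
x = np.random.default_rng(1).normal(size=100)
partial = lambda v: (len(v), v.mean(), ((v - v.mean()) ** 2).sum())
n, mean, m2 = merge_moments(partial(x[:50]), partial(x[50:]))
```

Min and max merge trivially (elementwise minimum/maximum of the partial results); the third moment needed for skewness has a similar, longer combination formula.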

See the parallel execution guide for details.