zarr_writer
Zarr writer sink for xarray DataArrays.
Writes incoming xarray.DataArray objects to a Zarr store,
creating one Zarr group per variable with dimensions
(time, lat, lon). Supports user-specified chunking and Zarr v3
sharding.
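The one-group-per-variable layout described above can be sketched in plain Python. `plan_layout` is a hypothetical helper, not part of physicsnemo_curator. It assumes each variable group holds a single array with dimensions (time, lat, lon), and it only computes group paths, array shapes, and chunk-grid counts without touching a real Zarr store.

```python
from __future__ import annotations

import math
from pathlib import PurePosixPath


def plan_layout(
    output_path: str,
    variables: list[str],
    shape: dict[str, int],
    chunks: dict[str, int],
) -> dict[str, dict[str, object]]:
    """Hypothetical helper: describe one Zarr group per variable.

    Assumes every group holds one (time, lat, lon) array; the chunk grid
    counts how many chunks exist along each dimension.
    """
    dims = ("time", "lat", "lon")
    layout: dict[str, dict[str, object]] = {}
    for name in variables:
        # <output_path>/<variable_name>
        group = str(PurePosixPath(output_path) / name)
        layout[group] = {
            "dims": dims,
            "shape": tuple(shape[d] for d in dims),
            # Ceiling division: a partial final chunk still counts as a chunk.
            "chunk_grid": tuple(math.ceil(shape[d] / chunks[d]) for d in dims),
        }
    return layout
```

For example, 72 time steps on a 721 × 1440 grid with one time step per chunk give each variable a (72, 721, 1440) array split into a (72, 1, 1) chunk grid.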
When n_indices and variables are provided, the sink pre-allocates
the full Zarr store at construction time and uses region-based writes
(mode="r+") which are safe for fully concurrent workers — each
write touches only independent chunk data without modifying shared
array metadata.
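Whether two region writes are independent depends on which chunks their slices cover along the write dimension. The following is a minimal sketch, assuming pipeline index i writes the slice time[i:i+1] of a pre-allocated array. This is an illustrative mapping, not necessarily how ZarrSink resolves positions from coordinates, and `chunks_touched` and `regions_conflict` are hypothetical helpers.

```python
from __future__ import annotations


def chunks_touched(start: int, stop: int, chunk_size: int) -> set[int]:
    """Chunk indices along one dimension covered by the half-open slice [start, stop)."""
    if stop <= start:
        return set()
    first = start // chunk_size
    last = (stop - 1) // chunk_size
    return set(range(first, last + 1))


def regions_conflict(a: slice, b: slice, chunk_size: int) -> bool:
    """True if two region writes share at least one chunk along this dimension."""
    return bool(
        chunks_touched(a.start, a.stop, chunk_size)
        & chunks_touched(b.start, b.stop, chunk_size)
    )
```

With one time step per chunk (the default), `regions_conflict(slice(3, 4), slice(4, 5), 1)` is False. With four steps per chunk, `regions_conflict(slice(0, 1), slice(1, 2), 4)` is True because both slices fall in chunk 0, which is the situation chunk-aligned partitioning is meant to avoid.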
When n_indices and variables are omitted, the sink falls back to
append-based writes (append_dim) which are only safe for
sequential execution.
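Append-based writes must read the current length along append_dim before they can extend it. The toy model below is a deliberate simplification, with plain Python lists standing in for a Zarr array rather than how Zarr or xarray actually implement appends. It shows why that read-then-extend step is only safe when writes happen one after another. Both functions are hypothetical.

```python
def sequential_appends(values):
    """Each writer reads the length and immediately extends the array."""
    store, offsets = [], []
    for v in values:
        offsets.append(len(store))  # read current length ...
        store.append(v)             # ... then extend by one
    return offsets, store


def interleaved_appends(values):
    """Every writer reads the length before any of them writes."""
    store = []
    offsets = [len(store) for _ in values]  # all workers observe length 0
    for offset, v in zip(offsets, values):
        if offset < len(store):
            store[offset] = v  # slot already written: overwrite (lost update)
        else:
            store.append(v)
    return offsets, store
```

`sequential_appends([10, 20, 30])` returns offsets `[0, 1, 2]` and a store of `[10, 20, 30]`, while `interleaved_appends([10, 20, 30])` returns offsets `[0, 0, 0]` and a store of `[30]`, so two of the three writes are lost.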
When executed with a parallel backend (process_pool), the sink
partitions pipeline indices into chunk-aligned groups so that no two
workers write to the same Zarr chunk concurrently. The partitioning
dimension defaults to the append_dim ("time").
Classes
ZarrSink | Write xarray.DataArray fields to a Zarr store.
Module Contents
- class physicsnemo_curator.domains.da.sinks.zarr_writer.ZarrSink(
      output_path: str,
      chunks: dict[str, int] | None = None,
      shards: dict[str, int] | None = None,
      append_dim: str = 'time',
      n_indices: int | None = None,
      variables: list[str] | None = None,
      overwrite: bool = False,
  )
Bases: physicsnemo_curator.core.base.Sink[xarray.DataArray]

Write xarray.DataArray fields to a Zarr store.

Each incoming DataArray is expected to carry coordinate metadata (e.g. time, variable, lat, lon). The sink uses these coordinates, not the pipeline index, to organise the output. DataArrays with a variable dimension are split along it so that each variable gets its own Zarr group, <output_path>/<variable_name>/, with dimensions (time, lat, lon). In append mode (no pre-allocation), subsequent calls append along the append dimension, so the sink accumulates data across pipeline indices according to the coordinates in the incoming data; in pre-allocated mode, each call instead writes into its own region of the existing arrays.

For parallel execution, the sink partitions pipeline indices into chunk-aligned groups via partition_indices() so that no two workers write to the same Zarr chunk concurrently.

Parameters:
output_path (str) – Path to the output Zarr store directory.
chunks (dict[str, int] | None) – Chunk sizes per dimension for the Zarr arrays. Defaults to {"time": 1, "lat": 721, "lon": 1440} (one time step per chunk, full spatial extent).
shards (dict[str, int] | None) – Shard sizes per dimension (Zarr v3 only). When provided, each shard is a container for multiple chunks. Requires zarr>=3.0. If None, sharding is not used.
append_dim (str) – The dimension along which new data is appended on subsequent writes. This is also the dimension used for chunk-aligned partitioning. Defaults to "time".
n_indices (int | None) – Total number of pipeline indices (time steps) that will be written. When provided together with variables, the store is pre-allocated at construction time, enabling safe concurrent region writes.
variables (list[str] | None) – Variable names for pre-allocation. Each variable becomes a separate Zarr group. Required when n_indices is set.
overwrite (bool) – If True, an existing store at output_path is overwritten during pre-allocation. If False (default) and the store already exists, a FileExistsError is raised to prevent accidental data loss.
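The parameter rules above can be checked up front. The sketch below is hypothetical (`validate_sink_args` and `chunk_nbytes` are not part of physicsnemo_curator, and the real ZarrSink may perform different or additional checks). It enforces that variables accompany n_indices and that, as Zarr v3 sharding expects, each shard size is a whole multiple of the chunk size along the same dimension. `chunk_nbytes` assumes float32 data (4 bytes per value) unless told otherwise.

```python
from __future__ import annotations

import math


def validate_sink_args(
    chunks: dict[str, int],
    shards: dict[str, int] | None = None,
    n_indices: int | None = None,
    variables: list[str] | None = None,
) -> None:
    """Hypothetical checks mirroring the parameter descriptions above."""
    if n_indices is not None and not variables:
        raise ValueError("variables is required when n_indices is set")
    if shards is not None:
        for dim, shard in shards.items():
            chunk = chunks.get(dim)
            # A shard must hold a whole number of chunks along each dimension.
            if chunk is None or shard % chunk != 0:
                raise ValueError(
                    f"shard size {shard} along {dim!r} must be a multiple "
                    f"of the chunk size {chunk}"
                )


def chunk_nbytes(chunks: dict[str, int], itemsize: int = 4) -> int:
    """Uncompressed bytes in one chunk; itemsize=4 assumes float32 values."""
    return math.prod(chunks.values()) * itemsize
```

With the default chunks, `chunk_nbytes({"time": 1, "lat": 721, "lon": 1440})` is 4,152,960 bytes (about 4 MiB) per uncompressed chunk, and `shards={"time": 24, "lat": 721, "lon": 1440}` would pack 24 such chunks into each shard.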
Examples
Sequential (append-based, backward compatible):
>>> from physicsnemo_curator.domains.da.sinks.zarr_writer import ZarrSink
>>> sink = ZarrSink(
...     output_path="output.zarr",
...     chunks={"time": 1, "lat": 721, "lon": 1440},
... )
Parallel-safe (pre-allocated with region writes):
>>> sink = ZarrSink(
...     output_path="output.zarr",
...     chunks={"time": 1, "hrrr_y": 1059, "hrrr_x": 1799},
...     n_indices=72,
...     variables=["t2m", "q2m", "tcwv"],
...     overwrite=True,
... )
- classmethod params() -> list[physicsnemo_curator.core.base.Param]
Return parameter descriptors for the Zarr sink.
- partition_indices(indices: collections.abc.Iterable[int])
Group pipeline indices by chunk along the append dimension.
Each returned group contains the indices whose data lands in the same Zarr chunk along append_dim. The runner dispatches each group to a single worker, preventing concurrent writes to the same chunk.
Returns None, signalling that one-index-per-worker dispatch is fine, when no partitioning is needed: either the store is pre-allocated for region writes, or the chunk size along the append dimension is 1, so every index occupies its own chunk.
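The grouping rule described above can be sketched as follows. `partition_by_chunk` is a hypothetical stand-in, not the library's partition_indices(). It assumes pipeline index i lands at position i along the append dimension, so its chunk id is i // chunk_size, and it returns None in the two no-partitioning cases.

```python
from __future__ import annotations

from collections.abc import Iterable


def partition_by_chunk(
    indices: Iterable[int],
    chunk_size: int,
    preallocated: bool = False,
) -> list[list[int]] | None:
    """Group indices that fall in the same chunk along the append dimension.

    Returns None when no grouping is needed (pre-allocated store, or one
    index per chunk).
    """
    if preallocated or chunk_size == 1:
        return None
    groups: dict[int, list[int]] = {}
    for i in indices:
        # Assumed mapping: index i sits at position i, so chunk id = i // chunk_size.
        groups.setdefault(i // chunk_size, []).append(i)
    # One group per chunk, ordered by chunk id; input order is kept within a group.
    return [groups[c] for c in sorted(groups)]
```

`partition_by_chunk(range(10), 4)` returns `[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]`, so three workers can run without ever sharing a chunk, while `partition_by_chunk(range(10), 1)` returns None.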
- description: ClassVar[str] = 'Write DataArrays to a Zarr store with configurable chunking and sharding'
Short description shown in the interactive CLI.
- property output_path: pathlib.Path
Return the output Zarr store path.