zarr_writer

Zarr writer sink for xarray DataArrays.

Writes incoming xarray.DataArray objects to a Zarr store, creating one Zarr group per variable with dimensions (time, lat, lon). Supports user-specified chunking and Zarr v3 sharding.

When both n_indices and variables are provided, the sink pre-allocates the full Zarr store at construction time and uses region-based writes (mode="r+"). These writes are safe for fully concurrent workers because each write touches only independent chunk data and does not modify shared array metadata.

When n_indices and variables are omitted, the sink falls back to append-based writes (append_dim), which are safe only for sequential execution.

When executed with a parallel backend (process_pool), the sink partitions pipeline indices into chunk-aligned groups so that no two workers write to the same Zarr chunk concurrently. The partitioning dimension defaults to the append_dim ("time").
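As a rough illustration of the region-based write mode described above, the sketch below maps a pipeline index to the slice of the pre-allocated array that a worker would target. The helper name region_for_index is hypothetical and not part of this module, and the sketch assumes that pipeline index i corresponds to position i along the append dimension.

```python
def region_for_index(index: int, append_dim: str = "time") -> dict[str, slice]:
    """Return the one-step region along ``append_dim`` for a pipeline index.

    Hypothetical helper: assumes pipeline index i maps to position i
    along the append dimension of the pre-allocated array.
    """
    if index < 0:
        raise ValueError("pipeline index must be non-negative")
    return {append_dim: slice(index, index + 1)}


# Each index yields a disjoint slice, so workers never overlap along "time".
regions = [region_for_index(i) for i in range(3)]
print(regions)

# With xarray, a region of this shape could be passed along the lines of:
#   ds.to_zarr("output.zarr", group="t2m", region=regions[0], mode="r+")
```

Because every region is disjoint and the array shape is fixed in advance, no worker needs to resize the array or rewrite its metadata, which is the property the pre-allocated mode relies on.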

Classes

ZarrSink

Write xarray.DataArray fields to a Zarr store.

Module Contents

class physicsnemo_curator.domains.da.sinks.zarr_writer.ZarrSink(
output_path: str,
chunks: dict[str, int] | None = None,
shards: dict[str, int] | None = None,
append_dim: str = 'time',
n_indices: int | None = None,
variables: list[str] | None = None,
overwrite: bool = False,
)

Bases: physicsnemo_curator.core.base.Sink[xarray.DataArray]

Write xarray.DataArray fields to a Zarr store.

Each incoming DataArray is expected to carry coordinate metadata (e.g. time, variable, lat, lon). The sink uses these coordinates — not the pipeline index — to organise the output.

DataArrays with a variable dimension are split along it so that each variable gets its own Zarr group, <output_path>/<variable_name>/, with dimensions (time, lat, lon). Subsequent calls append along the append dimension (append_dim), so the sink accumulates data across pipeline indices according to the coordinates carried by the incoming data.

For parallel execution, the sink partitions pipeline indices into chunk-aligned groups via partition_indices() so that no two workers write to the same Zarr chunk concurrently.

Parameters:
  • output_path (str) – Path to the output Zarr store directory.

  • chunks (dict[str, int] | None) – Chunk sizes per dimension for the Zarr arrays. Defaults to {"time": 1, "lat": 721, "lon": 1440} (one time-step per chunk, full spatial extent).

  • shards (dict[str, int] | None) – Shard sizes per dimension (Zarr v3 only). When provided, each shard is a container for multiple chunks. Requires zarr>=3.0. If None, sharding is not used.

  • append_dim (str) – The dimension along which new data is appended on subsequent writes. This is also the dimension used for chunk-aligned partitioning. Defaults to "time".

  • n_indices (int | None) – Total number of pipeline indices (time steps) that will be written. When provided together with variables, the store is pre-allocated at construction time, which enables safe concurrent region writes.

  • variables (list[str] | None) – Variable names for pre-allocation. Each variable becomes a separate Zarr group. Required when n_indices is set.

  • overwrite (bool) – If True, an existing store at output_path will be overwritten during pre-allocation. If False (default) and the store already exists, a FileExistsError is raised to prevent accidental data loss.
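To make the relationship between chunks, shards, and n_indices concrete, the following sketch works through the arithmetic for one variable. The helper names chunk_grid and chunks_per_shard are hypothetical, the shard sizes are illustrative, and the check that each shard size is a multiple of the chunk size reflects how Zarr v3 sharding is generally described rather than anything confirmed about this sink.

```python
import math


def chunk_grid(shape: dict[str, int], chunks: dict[str, int]) -> dict[str, int]:
    """Number of chunks along each dimension (the last chunk may be partial)."""
    return {dim: math.ceil(shape[dim] / chunks[dim]) for dim in shape}


def chunks_per_shard(chunks: dict[str, int], shards: dict[str, int]) -> int:
    """Number of chunks stored inside one shard.

    Assumes each shard size is a whole multiple of the chunk size.
    """
    count = 1
    for dim, chunk in chunks.items():
        shard = shards[dim]
        if shard % chunk != 0:
            raise ValueError(
                f"shard size {shard} is not a multiple of chunk size {chunk} along {dim!r}"
            )
        count *= shard // chunk
    return count


shape = {"time": 72, "lat": 721, "lon": 1440}    # n_indices=72 on an illustrative grid
chunks = {"time": 1, "lat": 721, "lon": 1440}    # the documented default chunking
shards = {"time": 24, "lat": 721, "lon": 1440}   # hypothetical shard sizes

print(chunk_grid(shape, chunks))         # {'time': 72, 'lat': 1, 'lon': 1}
print(chunks_per_shard(chunks, shards))  # 24
```

With these illustrative numbers, each variable holds 72 chunks, and a shard of 24 time steps would group them into 3 shard objects.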

Examples

Sequential (append-based, backward compatible):

>>> sink = ZarrSink(
...     output_path="output.zarr",
...     chunks={"time": 1, "lat": 721, "lon": 1440},
... )

Parallel-safe (pre-allocated with region writes):

>>> sink = ZarrSink(
...     output_path="output.zarr",
...     chunks={"time": 1, "hrrr_y": 1059, "hrrr_x": 1799},
...     n_indices=72,
...     variables=["t2m", "q2m", "tcwv"],
...     overwrite=True,
... )

classmethod params() -> list[physicsnemo_curator.core.base.Param]

Return parameter descriptors for the Zarr sink.

Returns:

Descriptors for output_path, chunks, and append_dim.

Return type:

list[Param]

partition_indices(
indices: collections.abc.Iterable[int],
) -> list[list[int]] | None

Group pipeline indices by chunk along the append dimension.

Each returned group contains indices whose data lands within the same Zarr chunk along append_dim. The runner dispatches each group to a single worker, preventing concurrent writes to the same chunk.

If the store is pre-allocated for region writes, or if the chunk size along the append dimension is 1, every index maps to its own chunk and no partitioning is needed. In that case the method returns None to signal that one-index-per-worker dispatch is safe.

Parameters:

indices (Iterable[int]) – Pipeline indices to partition.

Returns:

Chunk-aligned groups, or None if each index already maps to a unique chunk (chunk size == 1 or pre-allocated store).

Return type:

list[list[int]] | None
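The grouping described above can be sketched in plain Python. This is a minimal illustration, not the sink's actual implementation: the function name partition_by_chunk is hypothetical, and it assumes that pipeline index i lands at position i along append_dim, so that integer division by the chunk size identifies the chunk.

```python
from collections import defaultdict
from collections.abc import Iterable
from typing import Optional


def partition_by_chunk(
    indices: Iterable[int], chunk_size: int
) -> Optional[list[list[int]]]:
    """Group indices that fall into the same chunk along the append dimension.

    Hypothetical sketch: assumes index i is written at position i, so the
    chunk number is simply ``i // chunk_size``.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    if chunk_size == 1:
        # Every index already has its own chunk; no grouping is required.
        return None
    groups: defaultdict[int, list[int]] = defaultdict(list)
    for index in sorted(indices):
        groups[index // chunk_size].append(index)
    # Return the groups ordered by chunk number.
    return [groups[key] for key in sorted(groups)]


print(partition_by_chunk(range(10), 4))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(partition_by_chunk(range(3), 1))   # None
```

Dispatching each returned group to a single worker means that a chunk is only ever written by one process, at the cost of less parallelism when the chunk size along the append dimension is large.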

property append_dim: str

Return the dimension along which data is appended.

description: ClassVar[str] = 'Write DataArrays to a Zarr store with configurable chunking and sharding'

Short description shown in the interactive CLI.

name: ClassVar[str] = 'Zarr Writer'

Human-readable display name for the interactive CLI.

property output_path: pathlib.Path

Return the output Zarr store path.

property zarr_version: int

Return the Zarr format version in use.