zarr_writer

AtomicData Zarr writer sink for atomic/molecular pipelines.

Persists AtomicData objects to a structured Zarr store. Supports two modes of operation:

Sequential mode (default): Uses nvalchemi's AtomicDataZarrWriter with write/append semantics. Suitable for single-process pipelines.

Pre-allocated parallel mode: When natoms and schema are provided at construction, the store is allocated at its full size up front. Workers can then write to non-overlapping regions concurrently without synchronization, which enables safe parallel writes via the process_pool backend.

Examples

Sequential (single store):

>>> sink = AtomicDataZarrSink(output_path="./output.zarr")
>>> paths = sink(atomic_data_iterator, index=0)

Parallel (pre-allocated):

>>> from physicsnemo_curator.domains.atm.sinks.zarr_writer import AtomicDataZarrSink
>>> source = ASELMDBSource(data_dir="input/")
>>> sample = next(source[0])
>>> sink = AtomicDataZarrSink(
...     output_path="./output.zarr",
...     natoms=source.metadata["natoms"],
...     schema=sample,
...     chunk_size=1024,
... )

Attributes

logger

Classes

AtomicDataZarrSink

Write AtomicData objects to a Zarr store.

Module Contents

class physicsnemo_curator.domains.atm.sinks.zarr_writer.AtomicDataZarrSink(
    output_path: str,
    naming_template: str | None = None,
    batch_size: int = 1000,
    natoms: numpy.ndarray | collections.abc.Sequence[int] | None = None,
    nedges: numpy.ndarray | collections.abc.Sequence[int] | None = None,
    schema: nvalchemi.data.AtomicData | None = None,
    chunk_size: int = 1024,
)

Bases: physicsnemo_curator.core.base.Sink[nvalchemi.data.AtomicData]

Write AtomicData objects to a Zarr store.

Sequential mode (default): Items are batched and flushed using nvalchemi’s AtomicDataZarrWriter. The first flush creates the store; subsequent flushes append.
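
The write/append behaviour described above can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the sink's implementation: FakeWriter and batched_flush are hypothetical names, and the real sink delegates the actual I/O to nvalchemi's AtomicDataZarrWriter, whose method names are not assumed here.

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import Any


class FakeWriter:
    """Hypothetical stand-in for a Zarr writer; it only records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, int]] = []

    def write(self, batch: list[Any]) -> None:
        self.calls.append(("write", len(batch)))  # would create the store

    def append(self, batch: list[Any]) -> None:
        self.calls.append(("append", len(batch)))  # would extend the existing store


def batched_flush(items: Iterable[Any], writer: FakeWriter, batch_size: int) -> None:
    """Buffer items and flush every batch_size items: first flush writes, later ones append."""
    buffer: list[Any] = []
    created = False

    def flush() -> None:
        nonlocal created
        if not buffer:
            return
        if created:
            writer.append(buffer)
        else:
            writer.write(buffer)
            created = True
        buffer.clear()

    for item in items:
        buffer.append(item)
        if len(buffer) >= batch_size:
            flush()
    flush()  # write any partial final batch
```

With batch_size=1000 and 2500 items, this records one write of 1000 items followed by appends of 1000 and 500.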

Pre-allocated parallel mode: When natoms and schema are provided, the store is fully pre-allocated at construction time. Each call to __call__() writes data at a fixed offset determined by the index. No locking or coordination is needed because different indices map to non-overlapping array regions.
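
One way to picture the fixed offsets is a cumulative sum over natoms. The sketch below assumes that structures are stored contiguously in index order along the leading (atom) dimension; atom_offsets and write_structure are hypothetical helpers, and an in-memory NumPy array stands in for a node-level Zarr array.

```python
import numpy as np


def atom_offsets(natoms) -> np.ndarray:
    """Start offset of every structure plus the total (assumes contiguous, index-ordered storage)."""
    counts = np.asarray(natoms, dtype=np.int64)
    return np.concatenate(([0], np.cumsum(counts)))


def write_structure(out: np.ndarray, offsets: np.ndarray, index: int, values: np.ndarray) -> None:
    """Write one structure's node-level rows into its fixed slice of the pre-allocated array."""
    start, stop = offsets[index], offsets[index + 1]
    if values.shape[0] != stop - start:
        raise ValueError(f"structure {index}: expected {stop - start} rows, got {values.shape[0]}")
    out[start:stop] = values


natoms = [3, 1, 2]
offsets = atom_offsets(natoms)  # -> [0, 3, 4, 6]
positions = np.zeros((offsets[-1], 3), dtype=np.float32)  # allocated once, at full size

# Writes may arrive in any order (e.g. from different workers); each touches only its own rows.
for i in (2, 0, 1):
    write_structure(positions, offsets, i, np.full((natoms[i], 3), i, dtype=np.float32))
```

A real Zarr array stores data in chunks, and concurrent writes that touch the same chunk need coordination, so disjoint row ranges alone are not quite enough; the chunk-aligned groups returned by partition_indices() presumably address this.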

Parameters:
  • output_path (str) – Base directory for output Zarr store(s).

  • naming_template (str or None) – Per-index store naming template (sequential mode only).

  • batch_size (int) – Items per write batch (sequential mode).

  • natoms (array-like or None) – Per-structure atom counts. When provided together with schema, enables pre-allocated parallel mode.

  • nedges (array-like or None) – Per-structure edge counts. Required to pre-allocate edge-level arrays; if None, edge-level arrays are skipped during pre-allocation.

  • schema (AtomicData or None) – A representative sample used to discover field names, dtypes, and shapes. Provide any single AtomicData instance (e.g. next(source[0])).

  • chunk_size (int) – Number of atoms per Zarr chunk along the leading dimension of node-level arrays. Also determines index partitioning for parallel dispatch.
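
The way schema, natoms, and nedges combine into full-store shapes can be sketched as follows. This is an illustration under explicit assumptions: a dict of NumPy arrays stands in for an AtomicData sample, and the levels mapping (node, edge, or system) is a hypothetical classification, since the actual grouping of AtomicData fields is not described here. preallocation_shapes is a hypothetical name.

```python
import numpy as np


def preallocation_shapes(schema, levels, natoms, nedges=None):
    """Hypothetical helper: full-store (shape, dtype) per field, derived from one sample.

    schema: field name -> array from a single sample (stand-in for an AtomicData instance).
    levels: field name -> "node", "edge" or "system" (assumed classification).
    """
    totals = {
        "node": int(np.sum(natoms)),  # one row per atom across all structures
        "system": len(natoms),  # one row per structure
        "edge": None if nedges is None else int(np.sum(nedges)),
    }
    shapes = {}
    for name, sample in schema.items():
        total = totals[levels[name]]
        if total is None:
            continue  # no edge counts supplied: skip edge-level arrays
        arr = np.asarray(sample)
        # Keep the sample's trailing dimensions and dtype; replace the leading dimension.
        shapes[name] = ((total, *arr.shape[1:]), arr.dtype)
    return shapes
```

For example, a 3-atom sample with float32 positions of shape (3, 3) and natoms=[3, 4] yields a positions array of shape (7, 3); without nedges, any field marked as edge-level is omitted.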

Examples

Sequential (backward-compatible):

>>> sink = AtomicDataZarrSink(output_path="./output.zarr")

Parallel with pre-allocation:

>>> source = ASELMDBSource(data_dir="input/")
>>> sink = AtomicDataZarrSink(
...     output_path="./output.zarr",
...     natoms=source.metadata["natoms"],
...     schema=next(source[0]),
...     chunk_size=2048,
... )

classmethod params() → list[physicsnemo_curator.core.base.Param]

Return parameter descriptors for this sink.

Returns:

The configurable parameters.

Return type:

list[Param]

partition_indices(
    indices: list[int] | None = None,
) → list[list[int]] | None

Return chunk-aligned index groups for parallel dispatch.

When the sink is in pre-allocated mode, this returns groups of indices such that each group can be processed by a single worker without its writes overlapping those of any other group. The runner uses these groups to assign work to workers.

Parameters:

indices (list[int] or None) – Specific indices to partition. If None, returns all groups.

Returns:

Groups of indices, or None if not in parallel mode.

Return type:

list[list[int]] or None
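
The exact grouping rule used by partition_indices() is not documented here. One conservative way to build chunk-aligned groups, sketched below, assumes that structures are stored contiguously in index order with chunk_size atoms per node-level chunk, and starts a new group only where a structure begins exactly on a chunk boundary. That guarantees no two groups write to the same node-level chunk. chunk_aligned_groups is a hypothetical name, not the sink's API.

```python
from __future__ import annotations

import numpy as np


def chunk_aligned_groups(natoms, chunk_size: int) -> list[list[int]]:
    """Hypothetical: split structure indices so that no two groups share a node-level chunk."""
    counts = np.asarray(natoms, dtype=np.int64)
    starts = np.cumsum(counts) - counts  # first atom offset of each structure
    groups: list[list[int]] = []
    current: list[int] = []
    for i, start in enumerate(starts):
        # Structures are contiguous, so structure i shares a chunk with structure i-1
        # unless it begins exactly on a chunk boundary.
        if current and start % chunk_size == 0:
            groups.append(current)
            current = []
        current.append(i)
    if current:
        groups.append(current)
    return groups
```

For natoms=[2, 2, 3, 1, 4] and chunk_size=4, the structure starts are 0, 2, 4, 7, 8, giving groups [[0, 1], [2, 3], [4]]. With arbitrary atom counts, chunk-aligned boundaries can be rare, so this rule may produce few, large groups; a production implementation may make a different trade-off.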

set_source(
    source: physicsnemo_curator.core.base.Source[nvalchemi.data.AtomicData],
) → None

Inject the pipeline source so that the {relpath} and {stem} placeholders in naming_template can be resolved.

Called automatically by the Pipeline when the sink is attached via Pipeline.write().

Parameters:

source (Source[AtomicData]) – The pipeline source.
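
Placeholder resolution might look roughly like the sketch below. The meaning of the placeholders is an assumption: here {relpath} is taken to be the source file's parent directory relative to the source's data directory, and {stem} the file name without its suffix. resolve_store_path is a hypothetical helper and the template "{relpath}/{stem}.zarr" is only an example.

```python
from pathlib import Path


def resolve_store_path(output_path, naming_template, data_dir, item_path) -> Path:
    """Hypothetical: fill {relpath} and {stem} from a source file's location.

    Assumes {relpath} is the file's parent directory relative to data_dir and
    {stem} is the file name without its suffix.
    """
    item = Path(item_path).relative_to(data_dir)
    name = naming_template.format(relpath=item.parent.as_posix(), stem=item.stem)
    return Path(output_path) / name
```

Under these assumptions, the input file input/sub/a.lmdb with data directory input/ and template "{relpath}/{stem}.zarr" maps to out/sub/a.zarr under an output path of out/.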

property batch_size: int

Return the configured batch size.

property chunk_size: int

Return the configured chunk size (atoms per chunk).

description: ClassVar[str] = 'Write AtomicData to a Zarr store using nvalchemi'

Short description shown in the interactive CLI.

name: ClassVar[str] = 'AtomicData Zarr'

Human-readable display name for the interactive CLI.

property naming_template: str | None

Return the naming template, or None for single-store mode.

property output_path: pathlib.Path

Return the output Zarr store path.

property parallel: bool

Return whether the sink is in pre-allocated parallel mode.

physicsnemo_curator.domains.atm.sinks.zarr_writer.logger