zarr_writer#
AtomicData Zarr writer sink for atomic/molecular pipelines.
Persists AtomicData objects to a structured Zarr
store using AtomicDataZarrWriter.
Items are collected into batches of configurable size before being flushed
to the store for efficient I/O. The first batch creates the store via
write(), and subsequent batches extend it via append().
When a naming_template is provided and the pipeline’s source exposes a
relative_path(index) method, the sink can mirror the input directory
structure — each source index writes to a separate Zarr store whose path
is derived from the source file layout.
Examples
>>> sink = AtomicDataZarrSink(output_path="./output.zarr")
>>> paths = sink(atomic_data_iterator, index=0)
Attributes#
Classes#
AtomicDataZarrSink | Write AtomicData objects to a Zarr store.
Module Contents#
- class physicsnemo_curator.domains.atm.sinks.zarr_writer.AtomicDataZarrSink( )#
Bases: physicsnemo_curator.core.base.Sink[nvalchemi.data.AtomicData]
Write AtomicData objects to a Zarr store.
Items are batched in memory (up to batch_size) and flushed to the Zarr store using AtomicDataZarrWriter. The first flush creates the store; all subsequent flushes append to it.
Default mode (no naming_template): all pipeline indices write to the same store via append semantics, producing a single consolidated output.
Directory-mirroring mode (naming_template provided): each pipeline index writes to a separate Zarr store whose name is derived from the template. When the pipeline’s source exposes a relative_path(index) method (e.g. ASELMDBSource), the {relpath} and {stem} placeholders resolve to the source’s directory structure, enabling output layouts that mirror the input.
- Parameters:
output_path (str) – Base directory for output Zarr store(s).
naming_template (str or None) – Python format string for per-index store naming. The {index} placeholder (source index) is always available. When the source supports it, {relpath} (parent directory relative to source root) and {stem} (filename stem without extension) are also available. When None (default), all indices write to a single store at output_path.
batch_size (int) – Number of AtomicData items to accumulate before flushing to the store. Larger batches reduce I/O overhead.
Examples
Default (single store):
>>> sink = AtomicDataZarrSink(output_path="./output.zarr")
>>> paths = sink(atomic_data_iterator, index=0)
>>> paths
['./output.zarr']
Directory mirroring:
>>> sink = AtomicDataZarrSink(
...     output_path="./output/",
...     naming_template="{relpath}/{stem}.zarr",
... )
>>> # Input:  ./data/split_a/run_01.aselmdb
>>> # Output: ./output/split_a/run_01.zarr
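How a naming template might map a source file onto an output store can be sketched with `str.format` and `pathlib`. The helper `resolve_store_path` and its `relative_path` argument are hypothetical and only illustrate the documented placeholder semantics; the sink's real resolution logic may differ.

```python
from pathlib import Path
from typing import Dict, Optional


def resolve_store_path(
    output_path: str,
    naming_template: Optional[str],
    index: int,
    relative_path: Optional[str] = None,
) -> Path:
    """Return the store path for one source index (illustrative only)."""
    base = Path(output_path)
    if naming_template is None:
        # Default mode: every index shares the single store at output_path.
        return base
    fields: Dict[str, object] = {"index": index}
    if relative_path is not None:
        rel = Path(relative_path)
        # {relpath}: parent directory relative to the source root.
        fields["relpath"] = rel.parent.as_posix()
        # {stem}: filename without its extension.
        fields["stem"] = rel.stem
    # Assumption: a template that uses {relpath} or {stem} without a
    # relative path fails here with KeyError rather than falling back.
    return base / naming_template.format(**fields)
```

Under these assumptions, the template `"{relpath}/{stem}.zarr"` with a relative path of `split_a/run_01.aselmdb` and a base of `./output/` yields `output/split_a/run_01.zarr`, matching the directory-mirroring example above.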
- classmethod params() list[physicsnemo_curator.core.base.Param]#
Return parameter descriptors for this sink.
- set_source(source: physicsnemo_curator.core.base.Source[nvalchemi.data.AtomicData])
Inject the pipeline source for {relpath}/{stem} resolution.
Called automatically by the Pipeline when the sink is attached via Pipeline.write().
- Parameters:
source (Source[AtomicData]) – The pipeline source. If it exposes a relative_path(index) method, the sink will use it to resolve naming placeholders.
- description: ClassVar[str] = 'Write AtomicData to a Zarr store using nvalchemi'#
Short description shown in the interactive CLI.
- property output_path: pathlib.Path#
Return the output Zarr store path.
- physicsnemo_curator.domains.atm.sinks.zarr_writer.logger#