OMol25 Atomic Data ETL Pipeline#
This example demonstrates a complete Source → Filter → Sink pipeline for curating atomic/molecular data from the Open Molecules 2025 (OMol25) dataset.
OMol25 contains over 100 million DFT calculations at the
ωB97M-V/def2-TZVPD level of theory, covering 83 elements and ~83 million
unique molecular systems. The dataset is stored as ASE LMDB
(.aselmdb) files — each file holds thousands of atomic structures
with positions, forces, energies, and other computed properties.
The pipeline reads the raw LMDB files, computes per-field statistics (mean, std, min, max, skewness, kurtosis) using numerically stable Welford accumulators, and writes the processed structures to a Zarr store in the nvalchemi format — a common preprocessing step for training machine-learned interatomic potentials (MLIPs).
We process only the first 2 LMDB files to keep the example fast.
References#
OMol25 dataset: https://huggingface.co/facebook/OMol25
OMol25 paper: Levine et al., arXiv:2505.08762 (2025)
nvalchemi toolkit: https://nvidia.github.io/nvalchemi-toolkit/
Imports#
Import the core pipeline components: a Source to read ASE LMDB
files, a Filter to compute field statistics, a Sink to write
AtomicData to a Zarr store, and
run_pipeline() for parallel execution.
import pyarrow.parquet as pq
from physicsnemo_curator.domains.atm.filters.stats import AtomicStatsFilter
from physicsnemo_curator.domains.atm.sinks.zarr_writer import AtomicDataZarrSink
from physicsnemo_curator.domains.atm.sources.aselmdb import ASELMDBSource
from physicsnemo_curator.run import gather_pipeline, run_pipeline
Configure the Source#
ASELMDBSource
discovers all .aselmdb files in a directory, sorted
lexicographically. Each file corresponds to one source index and
may contain thousands of atomic structures.
The optional metadata_path parameter points to a NumPy .npz
file containing natoms and data_ids arrays, which the source
loads eagerly for downstream reference.
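The metadata file is a plain NumPy archive. A minimal sketch of writing and reading one follows; the array names come from the description above, while the values are invented for illustration (the real file ships alongside the dataset):

```python
import numpy as np

# Build a metadata file of the shape ASELMDBSource expects:
# "natoms" (atoms per structure) and "data_ids" (structure identifiers).
natoms = np.array([12, 30, 7], dtype=np.int64)
data_ids = np.array(["mol_0", "mol_1", "mol_2"])
np.savez("metadata.npz", natoms=natoms, data_ids=data_ids)

# Reading it back gives dict-like access to the arrays.
meta = np.load("metadata.npz")
total_atoms = int(meta["natoms"].sum())
print(total_atoms)  # 49
```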
source = ASELMDBSource(
data_dir="./val/",
metadata_path="./val/metadata.npz",
)
print(f"LMDB files discovered: {len(source)}")
if source.metadata is not None:
    natoms = source.metadata.get("natoms")
    if natoms is not None:
        print(f"Total structures in metadata: {len(natoms):,}")
Build the Pipeline#
The fluent API chains Source → Filter → Sink into a lazy
Pipeline. Nothing is
executed until we explicitly process indices.
AtomicStatsFilter examines each AtomicData and accumulates per-field, per-component statistics using Welford's online algorithm. Fields are grouped by level (node, edge, system) and include positions, forces, energies, atomic_numbers, and any extra data attached to the structures. The filter is pass-through: each item is yielded unchanged.
AtomicDataZarrSink collects items into batches (default 1000) and writes them to a structured Zarr store using AtomicDataZarrWriter. Multiple pipeline indices append to the same store.
pipeline = source.filter(AtomicStatsFilter(output="outputs/omol25/stats.parquet")).write(
AtomicDataZarrSink(output_path="outputs/omol25/dataset.zarr", batch_size=500)
)
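The per-field accumulation the filter performs can be sketched with Welford's online update. This is an illustrative single-scalar version under our own class name, not the filter's actual implementation:

```python
class Welford:
    """Numerically stable running mean/variance (Welford's algorithm)."""

    def __init__(self):
        self.n = 0       # count of observations
        self.mean = 0.0  # running mean
        self.m2 = 0.0    # sum of squared deviations from the mean

    def update(self, x):
        # One pass per observation; no need to store the data.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    @property
    def variance(self):
        # Population variance; use m2 / (n - 1) for the sample variance.
        return self.m2 / self.n if self.n else float("nan")
```

The same update runs independently for every tracked field and component, which is what makes the filter cheap enough to sit inline in the pipeline.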
Run in Parallel#
run_pipeline() dispatches work to a
process_pool backend. We pass indices=range(2) to process
only the first 2 LMDB files (each containing many structures).
Each worker gets an independent copy of the pipeline, so LMDB files are read, statistics are accumulated, and structures are written concurrently.
results = run_pipeline(
pipeline,
n_jobs=2,
backend="process_pool",
indices=range(2),
progress=True,
)
Inspect Results#
results is a list of lists — one entry per processed index,
each containing the file paths written by the sink.
print(f"\nProcessed {len(results)} LMDB files")
for i, paths in enumerate(results):
print(f" File {i}: {paths}")
Gather Statistics#
When running in parallel, each worker writes per-index shard files for
the stateful statistics filter.
gather_pipeline() discovers those shards,
merges them using the parallel Welford algorithm (Chan et al., 1979),
and writes a single consolidated Parquet file.
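The shard merge can be sketched with the pairwise combination rule from Chan et al.; the code below is an illustrative sketch of the math, not the library's implementation:

```python
def accumulate(xs):
    """Single-pass Welford accumulation over one shard."""
    n, mean, m2 = 0, 0.0, 0.0
    for x in xs:
        n += 1
        delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)
    return n, mean, m2

def merge_welford(n_a, mean_a, m2_a, n_b, mean_b, m2_b):
    """Combine two accumulators (Chan et al. pairwise update)."""
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return n, mean, m2

# Two workers accumulate independently, then their states are merged.
a = accumulate([1.0, 2.0, 3.0])   # shard from worker 0
b = accumulate([10.0, 20.0])      # shard from worker 1
n, mean, m2 = merge_welford(*a, *b)
```

The merged state is identical to accumulating over all the data in one pass, which is why the per-index shards can be combined in any order.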
merged = gather_pipeline(pipeline)
for path in merged:
print(f"Merged statistics: {path}")
Explore the Statistics#
The merged Parquet file contains one row per (field, component) pair with columns for mean, std, variance, min, max, median, skewness, kurtosis, and the full Welford accumulator state.
table = pq.read_table("outputs/omol25/stats.parquet")
print(f"\nStatistics table: {table.num_rows} rows, {table.num_columns} columns")
print(f"Fields tracked: {table.column('field_key').to_pylist()[:10]}...")
print(f"Levels: {set(table.column('level').to_pylist())}")
The outputs/omol25/ directory now contains:
outputs/omol25/
├── stats.parquet # Per-field statistics (merged)
└── dataset.zarr/ # AtomicData Zarr store
├── meta/ # Pointer arrays (atoms_ptr, edges_ptr)
├── core/ # Core fields (positions, forces, ...)
├── custom/ # User-defined fields
└── .zattrs # Root metadata (num_samples, fields)
The Zarr store follows the nvalchemi
AtomicDataZarrWriter
layout with CSR-style pointer arrays for variable-size systems,
enabling efficient random access for training loops.
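The CSR-style layout can be illustrated with plain NumPy arrays (shapes invented for illustration; a real store would be opened through zarr or the nvalchemi reader). The pointer array holds cumulative atom counts, so sample i's rows live between consecutive pointers:

```python
import numpy as np

# Three variable-size samples with 3, 4, and 2 atoms, stored as one
# flat node-level array plus a cumulative pointer array.
atoms_ptr = np.array([0, 3, 7, 9])
positions = np.arange(9 * 3, dtype=float).reshape(9, 3)

def sample_positions(i):
    # Slice sample i's atoms out of the flat array in O(1).
    start, end = atoms_ptr[i], atoms_ptr[i + 1]
    return positions[start:end]

print(sample_positions(1).shape)  # (4, 3)
```

This is why random access stays cheap during training: fetching a sample is two pointer lookups and one contiguous slice.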
Adding Checkpointing#
For large-scale runs (all 80 LMDB files), wrap the pipeline with
CheckpointedPipeline
to enable restart from where you left off. Create a checkpoint with
CheckpointedPipeline(pipeline, db_path="outputs/omol25/etl.db"),
then pass it to run_pipeline as usual. On restart, completed
LMDB files are skipped automatically.
See Checkpointing Pipelines for the full guide.