PhysicsNeMo Curator

ETL toolkit for deep-learning data curation with Python APIs.

PhysicsNeMo Curator provides a composable Source → Filter → Sink pipeline for reading, transforming, and writing scientific data. Within each domain vertical (meshes, xarray DataArrays, molecular dynamics tensors), sources, filters, and sinks exchange a single shared data structure, and pipelines are executed lazily on a per-item basis.
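
The lazy, per-item behaviour described above can be illustrated with a small, self-contained sketch. ToySource, ToyPipeline, and ToySink are hypothetical stand-ins written for this illustration, not part of physicsnemo_curator. The sketch assumes only that indexing a pipeline loads, filters, and writes a single item.

```python
from typing import Any, Callable, List, Optional


class ToySink:
    """Hypothetical sink: records every item written to it."""

    def __init__(self) -> None:
        self.written: List[Any] = []

    def __call__(self, item: Any) -> Any:
        self.written.append(item)
        return item


class ToyPipeline:
    """Hypothetical lazy pipeline: nothing runs until an index is requested."""

    def __init__(self, load: Callable[[int], Any], n_items: int,
                 filters: Optional[List[Callable[[Any], Any]]] = None,
                 sink: Optional[ToySink] = None) -> None:
        self.load = load
        self.n_items = n_items
        self.filters = filters or []
        self.sink = sink

    def filter(self, fn: Callable[[Any], Any]) -> "ToyPipeline":
        # Return a new pipeline so stages compose fluently without mutation.
        return ToyPipeline(self.load, self.n_items, self.filters + [fn], self.sink)

    def write(self, sink: ToySink) -> "ToyPipeline":
        return ToyPipeline(self.load, self.n_items, self.filters, sink)

    def __len__(self) -> int:
        return self.n_items

    def __getitem__(self, i: int) -> Any:
        # Only item i is loaded, transformed, and written.
        item = self.load(i)
        for fn in self.filters:
            item = fn(item)
        return self.sink(item) if self.sink is not None else item


class ToySource:
    """Hypothetical source over an in-memory list; counts how many items it loads."""

    def __init__(self, data: List[Any]) -> None:
        self.data = data
        self.loads = 0

    def _load(self, i: int) -> Any:
        self.loads += 1
        return self.data[i]

    def filter(self, fn: Callable[[Any], Any]) -> ToyPipeline:
        return ToyPipeline(self._load, len(self.data), [fn])


# Usage: only item 1 is loaded, even though the source holds three items.
source = ToySource([1.0, 2.0, 3.0])
sink = ToySink()
pipeline = source.filter(lambda x: x * 10).write(sink)
value = pipeline[1]  # 20.0
```

Returning a new pipeline from each `.filter(...)` and `.write(...)` call is a design choice of this sketch; the real classes may build their pipelines differently.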

Key Features

  • Fluent pipeline API: build pipelines with Source(...).filter(F()).write(S())

  • Lazy evaluation: pipeline[i] processes only the i-th item

  • Parallel execution: run_pipeline(pipeline, n_jobs=-1) processes all items in parallel, with several selectable backends

  • Generator semantics: sources and filters can yield zero, one, or many items

  • Built-in dataset sources: DrivAerML, AhmedML, NavierStokesCylinder, ERA5, HRRR, OMol25

  • Remote data support: sources read from local directories, S3, and the Hugging Face Hub via fsspec, with transparent caching

  • Pluggable submodules: mesh, da, and atm, each with its own optional dependency group

  • Component registry: automatic discovery of sources, filters, and sinks
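
The generator semantics in the list above amount to a flat-map: each stage may turn one input into zero, one, or many outputs. The following is a minimal sketch, assuming filters are plain generator functions. The helpers `run_item`, `drop_negative`, and `with_double` are hypothetical and are not the library's API.

```python
from typing import Callable, Iterable, Iterator, List

# Hypothetical: a filter takes one item and yields zero, one, or many items.
GenFilter = Callable[[object], Iterator[object]]


def _apply(f: GenFilter, stream: Iterable[object]) -> Iterator[object]:
    # Expand every upstream item with f. Passing f as an argument avoids the
    # late-binding pitfall of building generator expressions inside a loop.
    for x in stream:
        yield from f(x)


def run_item(item: object, filters: List[GenFilter]) -> List[object]:
    """Push one source item through a chain of generator filters."""
    stream: Iterable[object] = [item]
    for f in filters:
        stream = _apply(f, stream)  # still lazy: nothing has run yet
    return list(stream)  # consuming the chain drives every stage


def drop_negative(x: float) -> Iterator[float]:
    # Zero outputs for negative values, one output otherwise.
    if x >= 0:
        yield x


def with_double(x: float) -> Iterator[float]:
    # Many outputs: the value itself followed by its double.
    yield x
    yield 2 * x


# Usage
dropped = run_item(-1, [drop_negative, with_double])   # []
expanded = run_item(3, [drop_negative, with_double])   # [3, 6]
```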

Quick Install

# Core package (no domain-specific dependencies)
pip install physicsnemo-curator

# With mesh support (physicsnemo, pyvista, pyarrow, torch)
pip install "physicsnemo-curator[mesh]"

# With data-array support (xarray, earth2studio, zarr)
pip install "physicsnemo-curator[da]"

# With atomistic/molecular support (ase, lmdb, torch)
pip install "physicsnemo-curator[atm]"

# With parallel backends
pip install "physicsnemo-curator[loky]"   # joblib-based multiprocessing
pip install "physicsnemo-curator[dask]"   # dask distributed
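
After installing, you can probe for each group's modules to see which optional groups are usable in the current environment. The helper below is hypothetical and not part of the package. Its extra-to-module mapping is copied from the comments above, with joblib assumed for the loky extra and dask plus distributed assumed for the dask extra. The package's own dependency metadata remains authoritative.

```python
import importlib.util
from typing import Dict, Mapping, Sequence

# Assumed mapping from extra name to importable top-level modules.
EXTRAS: Dict[str, Sequence[str]] = {
    "mesh": ("physicsnemo", "pyvista", "pyarrow", "torch"),
    "da": ("xarray", "earth2studio", "zarr"),
    "atm": ("ase", "lmdb", "torch"),
    "loky": ("joblib",),
    "dask": ("dask", "distributed"),
}


def available_extras(extras: Mapping[str, Sequence[str]] = EXTRAS) -> Dict[str, bool]:
    """Report whether every module listed for each extra can be found."""
    return {
        name: all(importlib.util.find_spec(mod) is not None for mod in modules)
        for name, modules in extras.items()
    }


if __name__ == "__main__":
    for name, ok in available_extras().items():
        print(f"{name:5s} {'installed' if ok else 'missing'}")
```

`find_spec` locates a module without importing it, so the check stays cheap even for heavy dependencies such as torch.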

Minimal Example

from physicsnemo_curator import run_pipeline
from physicsnemo_curator.domains.mesh.sources.vtk import VTKSource
from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink

# Build the pipeline
pipeline = (
    VTKSource("./cfd_results/")
    .filter(MeanFilter(output="stats.parquet"))
    .write(MeshSink(output_dir="./output/"))
)

# Process all items (sequentially, with progress bar)
results = run_pipeline(pipeline)

# Or, instead of the sequential run above, process in parallel across 8 workers
results = run_pipeline(pipeline, n_jobs=8, backend="process_pool")

# Flush stateful filters (only needed for sequential runs)
pipeline.filters[0].flush()
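
The `flush()` call is needed because a stateful filter, such as one that aggregates statistics across items, can only emit its result after it has seen every item. The sketch below models that pattern with two hypothetical names. `RunningMeanFilter` writes JSON instead of Parquet, and `run_items` stands in for run_pipeline. Neither reflects the library's implementation.

```python
import json
import os
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


class RunningMeanFilter:
    """Hypothetical stateful filter: passes items through unchanged and
    accumulates a running mean that is written only when flush() is called."""

    def __init__(self, output: str) -> None:
        self.output = output
        self.total = 0.0
        self.count = 0
        self._lock = threading.Lock()  # protects state when called from threads

    def __call__(self, item: float) -> float:
        with self._lock:
            self.total += item
            self.count += 1
        return item

    def flush(self) -> None:
        # Emit the aggregate once every item has been seen.
        mean = self.total / self.count if self.count else float("nan")
        with open(self.output, "w") as fh:
            json.dump({"mean": mean, "count": self.count}, fh)


def run_items(process: Callable[[int], T], n_items: int, n_jobs: int = 1) -> List[T]:
    """Toy stand-in for run_pipeline: apply `process` to every item index."""
    if n_jobs == 1:
        return [process(i) for i in range(n_items)]
    # In this toy, n_jobs < 1 lets the executor pick its default worker count.
    workers: Optional[int] = n_jobs if n_jobs > 1 else None
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in input order.
        return list(pool.map(process, range(n_items)))


# Usage: process three items sequentially, then flush the aggregate.
data = [1.0, 2.0, 6.0]
out_path = os.path.join(tempfile.mkdtemp(), "stats.json")
mean_filter = RunningMeanFilter(out_path)
results = run_items(lambda i: mean_filter(data[i]), len(data))
mean_filter.flush()  # stats.json now contains {"mean": 3.0, "count": 3}
```

In this toy, threads share one filter object, so a lock is enough. Workers in a process pool would each operate on their own copy of the filter, so any accumulated state would have to be merged separately.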

User Guide

Domains

Examples

Benchmarks

API