Quickstart#

This guide walks through building ETL pipelines with PhysicsNeMo Curator for two common domains: CAE mesh processing and weather/climate reanalysis.

Both examples follow the same pattern: Source → Filter → Sink → run_pipeline.

CAE: DrivAerML Surface Meshes#

Process automotive CFD boundary meshes from the DrivAerML dataset on HuggingFace Hub. DrivAerML contains 500 parametrically morphed variants of the DrivAer notchback vehicle with high-fidelity scale-resolving CFD.

pip install physicsnemo-curator[mesh]
from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.filters.precision import PrecisionFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.drivaerml import DrivAerMLSource
from physicsnemo_curator.run import gather_pipeline, run_pipeline

# 1. Source — reads boundary VTP files from HuggingFace Hub
source = DrivAerMLSource(mesh_type="boundary")
print(f"Runs available: {len(source)}")

# 2. Build the pipeline: Source → MeanFilter → PrecisionFilter → Sink
pipeline = (
    source
    .filter(MeanFilter(output="outputs/drivaerml/mean_stats.parquet"))
    .filter(PrecisionFilter(target_dtype="float32"))
    .write(MeshSink(output_dir="outputs/drivaerml/meshes/"))
)

# 3. Run in parallel (first 3 runs)
results = run_pipeline(
    pipeline,
    n_jobs=4,
    backend="process_pool",
    indices=range(3),
)

# 4. Merge per-worker statistics
merged = gather_pipeline(pipeline)

print(f"Processed {len(results)} runs")
for path in merged:
    print(f"Merged statistics: {path}")

This produces:

outputs/drivaerml/
├── mean_stats.parquet      # Per-field spatial means
└── meshes/
    ├── mesh_0000_0/        # Run 0 (tensordict format)
    ├── mesh_0001_0/        # Run 1
    └── mesh_0002_0/        # Run 2

Weather/Climate: ERA5 Reanalysis#

Download ERA5 reanalysis fields and compute temporal statistics over one month. ERA5 data is accessed via earth2studio backends — no API keys required for the default ARCO backend.

pip install physicsnemo-curator[da]
from datetime import datetime, timedelta

from physicsnemo_curator.domains.da.filters.moments import MomentsFilter
from physicsnemo_curator.domains.da.sinks.zarr_writer import ZarrSink
from physicsnemo_curator.domains.da.sources.era5 import ERA5Source
from physicsnemo_curator.run import gather_pipeline, run_pipeline

# 1. Source — one month of 6-hourly ERA5 snapshots
start = datetime(2020, 1, 1)
times = [start + timedelta(hours=6 * i) for i in range(4 * 31)]  # ~124 steps
variables = ["t2m", "u10m", "v10m"]

source = ERA5Source(times=times, variables=variables, backend="arco")
print(f"Timesteps: {len(source)}")

# 2. Build the pipeline: Source → MomentsFilter → ZarrSink
pipeline = (
    source
    .filter(MomentsFilter(output="outputs/era5/moments.zarr", dims=("time",)))
    .write(ZarrSink(output_path="outputs/era5/data.zarr"))
)

# 3. Run in parallel
results = run_pipeline(
    pipeline,
    n_jobs=4,
    backend="process_pool",
    indices=range(len(source)),
)

# 4. Merge per-worker moment statistics
merged = gather_pipeline(pipeline)

print(f"Processed {len(results)} timesteps")
for path in merged:
    print(f"Merged moments: {path}")

This produces:

outputs/era5/
├── moments.zarr/           # Temporal statistics (mean, variance, skewness, min, max)
│   ├── t2m/
│   ├── u10m/
│   └── v10m/
└── data.zarr/              # Raw fields in Zarr format
    ├── t2m/
    ├── u10m/
    └── v10m/

Next Steps#

  • See the full Examples gallery for crash simulation, external aerodynamics, and more.

  • Read the Parallel Execution guide for details on execution backends and stateful filter handling.

  • Use the interactive wizard (pip install physicsnemo-curator[wiz]) to build pipelines without writing code — see Pipeline Wizard.