Creating a Pipeline

This example shows how to build a data curation pipeline using the Source → Filter → Sink pattern. We read meshes from the Navier-Stokes Cylinder dataset, apply a statistics filter and a precision filter, and write the outputs to disk.

A pipeline is lazy: nothing executes until you index into it with pipeline[i]. You can therefore build, inspect, and compose pipelines before running them.
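To make the lazy behaviour concrete, here is a minimal toy sketch in plain Python. LazyToyPipeline and double are hypothetical names invented for this illustration; they are not part of physicsnemo-curator, and the real Pipeline class may be implemented quite differently. The sketch only shows the general idea that building records steps, while indexing does the work.

```python
class LazyToyPipeline:
    """Toy stand-in for a lazy pipeline (hypothetical, not the library class)."""

    def __init__(self, items, steps=()):
        self._items = list(items)
        self._steps = tuple(steps)

    def filter(self, step):
        # Building only records the step; no item is processed here.
        return LazyToyPipeline(self._items, self._steps + (step,))

    def __len__(self):
        return len(self._items)

    def __getitem__(self, i):
        # All work happens here, and only for item i.
        value = self._items[i]
        for step in self._steps:
            value = step(value)
        return value


calls = []  # records every item the step actually sees


def double(x):
    calls.append(x)
    return 2 * x


toy = LazyToyPipeline([1.0, 2.0, 3.0]).filter(double)
calls_after_build = list(calls)  # still empty: nothing has run yet
result = toy[1]                  # runs the chain for index 1 only
print(calls_after_build, result, calls)
```

Only the indexed item passes through the step, which is why a pipeline can be built and inspected cheaply before any data is processed.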

Note

Install the mesh extras before running:

pip install "physicsnemo-curator[mesh]"

Imports

Every pipeline has three ingredients: a Source (data reader), zero or more Filters (transforms or analytics), and a Sink (writer).

from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.filters.precision import PrecisionFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.ns_cylinder import NavierStokesCylinderSource

Step 1: Create a Source

A Source is an indexed collection of data items. Here we use NavierStokesCylinderSource, which provides 500 Navier-Stokes flow simulations as Mesh objects.

source = NavierStokesCylinderSource()

print(f"Source: {source.name}")
print(f"Items available: {len(source)}")

Step 2: Add Filters

Filters transform or inspect items as they flow through the pipeline. The fluent .filter() method chains multiple filters together:

pipeline = source.filter(MeanFilter(output="outputs/getting_started/stats.parquet")).filter(
    PrecisionFilter(target_dtype="float32")
)

print(f"Filters: {[f.name for f in pipeline.filters]}")
print(f"Sink: {pipeline.sink}")  # None — no sink yet
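The difference between a filter that inspects and one that transforms can be sketched with plain Python callables. Everything in this block (RecordMean, to_float32, apply_in_order) is a hypothetical stand-in written for illustration. It makes no claim about how MeanFilter or PrecisionFilter are implemented, and it works on lists of floats rather than Mesh objects.

```python
import struct
from statistics import fmean


class RecordMean:
    """Inspecting step: records a statistic and returns the item unchanged."""

    def __init__(self):
        self.means = []

    def __call__(self, values):
        self.means.append(fmean(values))
        return values


def to_float32(values):
    """Transforming step: round every value to the nearest float32."""
    return [struct.unpack("f", struct.pack("f", v))[0] for v in values]


def apply_in_order(item, steps):
    # Steps run in the order they were chained.
    for step in steps:
        item = step(item)
    return item


record_mean = RecordMean()
field = [0.1, 0.2, 0.3]
out = apply_in_order(field, [record_mean, to_float32])
print(record_mean.means, out)
```

Because the steps run in chaining order, the recorded mean in this toy is computed on the original values, before the precision reduction is applied.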

Step 3: Attach a Sink

A Sink persists items to storage. The .write() method attaches a sink and returns a complete pipeline.

pipeline = pipeline.write(MeshSink(output_dir="outputs/getting_started/meshes/"))

assert pipeline.sink is not None
print(f"Sink: {pipeline.sink.name}")
print(f"Pipeline length: {len(pipeline)}")

Step 4: Execute One Index

Indexing into a pipeline runs the full Source → Filters → Sink chain for a single source item and returns the file paths written by the sink.

paths = pipeline[0]
print(f"Index 0 wrote: {paths}")
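Since each index runs independently, processing the whole dataset can be sketched as a loop over every index. The helper below relies only on len(pipeline) and pipeline[i], both of which appear above. run_all and FakePipeline are hypothetical names introduced for this sketch, and the file names FakePipeline returns are made up; the library may offer its own way to run a pipeline in bulk.

```python
def run_all(pipeline):
    """Execute every index in order and collect what each index returns."""
    return [pipeline[i] for i in range(len(pipeline))]


class FakePipeline:
    """Stand-in so the sketch runs without the dataset (hypothetical)."""

    def __len__(self):
        return 3

    def __getitem__(self, i):
        if not 0 <= i < len(self):
            raise IndexError(i)
        # Pretend the sink wrote one file for this item (made-up path).
        return [f"meshes/item_{i}.out"]


all_paths = run_all(FakePipeline())
print(all_paths)
```

With the real pipeline from this example, the same loop would execute the full chain once for each of the 500 source items, so it may take considerably longer than a single index.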

The fluent API also supports building the whole pipeline in one expression:

pipeline = (
    NavierStokesCylinderSource()
    .filter(MeanFilter(output="stats.parquet"))
    .filter(PrecisionFilter(target_dtype="float32"))
    .write(MeshSink(output_dir="meshes/"))
)

Each call returns a new immutable Pipeline; the original source, filters, and sink are never modified.
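One practical consequence of immutability is that a partially built pipeline can be reused as a base for several variants. The sketch below imitates this with a frozen dataclass. ToyPipeline is a hypothetical stand-in that stores plain strings instead of real sources, filters, and sinks, and it is not the library's Pipeline implementation.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyPipeline:
    """Immutable toy pipeline holding names only (hypothetical)."""

    source: str
    filters: Tuple[str, ...] = ()
    sink: Optional[str] = None

    def filter(self, name: str) -> "ToyPipeline":
        # Return a modified copy; self is left untouched.
        return replace(self, filters=self.filters + (name,))

    def write(self, sink: str) -> "ToyPipeline":
        return replace(self, sink=sink)


base = ToyPipeline("source").filter("mean")
as_float32 = base.filter("precision").write("meshes/")
stats_only = base.write("stats/")

print(base)        # unchanged by the two branches built from it
print(as_float32)
print(stats_only)

try:
    base.sink = "elsewhere/"  # direct mutation is rejected
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

Both branches share base without affecting each other, which is the property that makes composing pipelines safe.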
