Creating a Pipeline
This example shows how to build a data curation pipeline using the Source → Filter → Sink pattern. We read meshes from the Navier-Stokes Cylinder dataset, apply a statistics filter and a precision filter, and write the outputs to disk.
A pipeline is lazy: nothing is executed until you index into it with pipeline[i]. This makes it easy to build, inspect, and compose pipelines before running them.
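To make the laziness concrete, here is a minimal sketch using a toy class. ToyPipeline, its steps argument, and its calls counter are hypothetical stand-ins invented for illustration; they are not part of physicsnemo-curator, and the real pipeline may be implemented quite differently.

```python
from typing import Callable, Sequence


class ToyPipeline:
    """Hypothetical stand-in for a lazy pipeline; not the library's class."""

    def __init__(self, items: Sequence[float], steps: Sequence[Callable[[float], float]]):
        self.items = items
        self.steps = list(steps)
        self.calls = 0  # counts step invocations that have actually run

    def __len__(self) -> int:
        return len(self.items)

    def __getitem__(self, index: int) -> float:
        # Work happens only here, and only for the requested item.
        value = self.items[index]
        for step in self.steps:
            value = step(value)
            self.calls += 1
        return value


toy = ToyPipeline([1.0, 2.0, 3.0], [lambda x: x * 2, lambda x: x + 1])
calls_before = toy.calls  # building the pipeline ran no steps
result = toy[1]           # runs both steps on item 1 only
calls_after = toy.calls
```

Constructing the toy object does no work; the two steps run only when index 1 is requested, and the other items are never touched.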
Note
Install the mesh extras before running:
pip install "physicsnemo-curator[mesh]"
Imports
Every pipeline has three ingredients: a Source (data reader), zero or more Filters (transforms or analytics), and a Sink (writer).
from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.filters.precision import PrecisionFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.ns_cylinder import NavierStokesCylinderSource
Step 1: Create a Source
A Source is an indexed collection of data items. Here we use NavierStokesCylinderSource, which provides 500 Navier-Stokes flow simulations as Mesh objects.
source = NavierStokesCylinderSource()
print(f"Source: {source.name}")
print(f"Items available: {len(source)}")
Step 2: Add Filters
Filters transform or inspect items as they flow through the pipeline.
The fluent .filter() method chains multiple filters together:
- MeanFilter computes spatial means and writes a Parquet summary.
- PrecisionFilter converts floating-point fields to float32.
pipeline = source.filter(MeanFilter(output="outputs/getting_started/stats.parquet")).filter(
    PrecisionFilter(target_dtype="float32")
)
print(f"Filters: {[f.name for f in pipeline.filters]}")
print(f"Sink: {pipeline.sink}") # None — no sink yet
Step 3: Attach a Sink
A Sink persists items to storage. The .write() method attaches a sink and returns a complete pipeline.
pipeline = pipeline.write(MeshSink(output_dir="outputs/getting_started/meshes/"))
assert pipeline.sink is not None
print(f"Sink: {pipeline.sink.name}")
print(f"Pipeline length: {len(pipeline)}")
Step 4: Execute One Index
Indexing into a pipeline runs the full Source → Filters → Sink chain for a single source item and returns the file paths written by the sink.
paths = pipeline[0]
print(f"Index 0 wrote: {paths}")
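Since the pipeline supports len(pipeline) and pipeline[i], processing the whole dataset can be sketched as a plain loop over those two operations. run_all and IndexedPipeline below are hypothetical names introduced for illustration, not library functions; the library may well provide its own batch or parallel runner, which would be preferable if it exists.

```python
from typing import Any, Protocol


class IndexedPipeline(Protocol):
    """Anything that supports len() and integer indexing (assumed interface)."""

    def __len__(self) -> int: ...

    def __getitem__(self, index: int) -> Any: ...


def run_all(pipeline: IndexedPipeline) -> list[Any]:
    """Execute every index in order and collect what each one returns."""
    return [pipeline[i] for i in range(len(pipeline))]
```

With the pipeline built above, run_all(pipeline) would execute each index in turn and return one entry per source item, for example the written paths that pipeline[0] returned.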
The fluent API also supports building in one expression:
pipeline = (
    NavierStokesCylinderSource()
    .filter(MeanFilter(output="stats.parquet"))
    .filter(PrecisionFilter(target_dtype="float32"))
    .write(MeshSink(output_dir="meshes/"))
)
Each call returns a new immutable Pipeline; the original source, filters, and sink are never modified.
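One common way to get this copy-on-chain behavior is a frozen dataclass whose methods return modified copies. The sketch below illustrates the idea under that assumption; ToyBuilder and its string-valued fields are hypothetical and say nothing about how the library's Pipeline is actually implemented.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyBuilder:
    """Hypothetical immutable builder; not the library's Pipeline class."""

    source: str
    filters: Tuple[str, ...] = ()
    sink: Optional[str] = None

    def filter(self, name: str) -> "ToyBuilder":
        # Return a copy with one more filter; self is left untouched.
        return replace(self, filters=self.filters + (name,))

    def write(self, sink: str) -> "ToyBuilder":
        # Return a copy with the sink set; self is left untouched.
        return replace(self, sink=sink)


base = ToyBuilder(source="cylinder")
full = base.filter("mean").filter("precision").write("meshes/")
```

After these calls, base still has no filters and no sink, while full carries both filters and the sink. This is what makes it safe to branch several pipelines from one shared prefix.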