Running a Pipeline in Parallel

This example demonstrates using run_pipeline() to execute a pipeline across multiple source indices with parallel workers. Building on the Creating a Pipeline example, we process several DrivAerML CFD meshes concurrently with the process_pool backend and then merge the per-worker statistics using gather_pipeline().
Note
Install the mesh extras before running:
pip install physicsnemo-curator[mesh]
Imports
from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.drivaerml import DrivAerMLSource
from physicsnemo_curator.run import gather_pipeline, run_pipeline
Build the Pipeline

DrivAerMLSource provides 500 DrivAerML automotive CFD meshes from the HuggingFace Hub. We attach a MeanFilter for spatial statistics and a MeshSink for output.
pipeline = (
    DrivAerMLSource(mesh_type="boundary")
    .filter(MeanFilter(output="outputs/parallel/stats.parquet"))
    .write(MeshSink(output_dir="outputs/parallel/meshes/"))
)
print(f"Total runs available: {len(pipeline)}")
Run in Parallel
run_pipeline() dispatches indices to
parallel workers. Key parameters:
- n_jobs — number of workers (-1 = all CPUs)
- backend — "process_pool", "thread_pool", "loky", "dask", "prefect", or "auto"
- indices — which source indices to process (default: all)
- progress — show a progress bar
Each worker receives an independent copy of the pipeline, so data is read, filtered, and written concurrently.
results = run_pipeline(
    pipeline,
    n_jobs=4,
    backend="process_pool",
    indices=range(3),
    progress=True,
)
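Conceptually, the process_pool dispatch above behaves like the following minimal sketch built on Python's concurrent.futures. Here process_index is a hypothetical stand-in for running one pipeline index end to end; it is not part of the physicsnemo-curator API.

```python
# Minimal sketch of process-pool dispatch: each index is handed to a
# worker process, and results come back ordered by index.
# NOTE: process_index is a hypothetical stand-in for "read, filter,
# write source index i" -- it is not part of physicsnemo-curator.
from concurrent.futures import ProcessPoolExecutor


def process_index(i: int) -> list[str]:
    # Pretend each index produces one output file path.
    return [f"outputs/parallel/meshes/run_{i}.vtp"]


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(process_index, range(3)))
    print(f"Processed {len(results)} runs")
```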
Inspect Results
results is a list of lists — one entry per processed index, each
containing the file paths returned by the sink.
print(f"\nProcessed {len(results)} runs")
for i, paths in enumerate(results):
    print(f"  Run {i}: {paths}")
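If a single flat list of output paths is more convenient than the nested structure, the result can be flattened with itertools. The results data below is illustrative, showing only the shape of run_pipeline()'s return value.

```python
from itertools import chain

# Example data in the shape run_pipeline() returns: one sub-list of
# sink output paths per processed index (the paths are illustrative).
results = [
    ["outputs/parallel/meshes/run_0.vtp"],
    ["outputs/parallel/meshes/run_1.vtp"],
    ["outputs/parallel/meshes/run_2.vtp"],
]

# Flatten the list of lists into one list of paths, in index order.
all_paths = list(chain.from_iterable(results))
print(all_paths)
```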
Gather Statistics
When running in parallel, stateful filters (like
MeanFilter) produce
per-index shard files.
gather_pipeline() discovers those
shards, calls the filter’s merge() method to combine them into
a single output file, and cleans up the temporaries.
merged = gather_pipeline(pipeline)
for path in merged:
    print(f"Merged statistics: {path}")
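The idea behind a filter's merge() step can be illustrated with a toy statistic: if each shard stores a (count, mean) pair, the merged mean is the count-weighted average of the shard means. This helper is a conceptual sketch only; MeanFilter's actual shard format and merge logic may differ.

```python
# Toy version of a stateful filter's merge step: combine per-index
# (count, mean) shards into one global (count, mean). Illustrative
# only -- not MeanFilter's real implementation.
def merge(shards: list[tuple[int, float]]) -> tuple[int, float]:
    total = sum(count for count, _ in shards)
    mean = sum(count * m for count, m in shards) / total
    return total, mean


# Two shards: 100 points averaging 2.0 and 300 points averaging 4.0.
print(merge([(100, 2.0), (300, 4.0)]))  # -> (400, 3.5)
```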
Available Backends#
Backend |
Install extra |
Best for |
|---|---|---|
|
(built-in) |
Debugging, small datasets |
|
(built-in) |
I/O-bound tasks |
|
(built-in) |
CPU-bound tasks (default) |
|
|
|
|
|
|
|
|
|
Use backend="auto" to let the framework pick the best available
backend for your system.
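One plausible way such an automatic choice could work is to probe for optional backend packages and fall back to the built-in process pool. The preference order and helper below are assumptions for illustration, not the framework's actual selection logic.

```python
import importlib.util


# Hypothetical "auto" selection: prefer optional backends when their
# packages are importable, otherwise fall back to the built-in process
# pool. The preference order here is an assumption for illustration.
def pick_backend(preferred: tuple[str, ...] = ("loky", "dask", "prefect")) -> str:
    for name in preferred:
        if importlib.util.find_spec(name) is not None:
            return name
    return "process_pool"


print(pick_backend())
```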