Parallel Execution#
run_pipeline() processes every index of a pipeline (optionally in parallel)
and returns the collected sink outputs. It replaces the manual
`for i in range(len(pipeline))` loop.
Quick Start#
from physicsnemo_curator import run_pipeline
# Sequential (default)
results = run_pipeline(pipeline)
# Parallel with 4 worker processes
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")
# Use all CPUs
results = run_pipeline(pipeline, n_jobs=-1)
# Process a subset of indices
results = run_pipeline(pipeline, indices=[0, 5, 10])
Backends#
run_pipeline supports multiple execution backends, selected via the
backend parameter:
| Backend | Dependency | Description |
|---|---|---|
| `sequential` | None | Simple for-loop. Always used when `n_jobs=1`. |
| `thread_pool` | None | Standard-library thread pool; all workers run within a single process. |
| `process_pool` | None | Standard-library process pool; each index runs in a separate process. |
| `loky` | `joblib` | joblib's robust process pool. Better memory handling for large arrays. |
| `dask` | `dask` | Dask-based execution on local or distributed workers. |
| `prefect` | `prefect` | Prefect workflow orchestration with observability, retries, and scheduling. |
| `auto` | Varies | Picks the best available: `dask` → `loky` → `process_pool`. |
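The `auto` fallback chain can be illustrated with a small sketch. This is a hypothetical reimplementation of the selection logic, not the library's actual code: it probes each optional dependency with `importlib.util.find_spec` and returns the first backend whose dependency is importable.

```python
import importlib.util

# Fallback chain for "auto": prefer dask, then loky (shipped with joblib),
# then the always-available stdlib process pool.
FALLBACK_CHAIN = [
    ("dask", "dask"),        # (backend name, module that must be importable)
    ("loky", "joblib"),
    ("process_pool", None),  # stdlib only, no optional dependency
]

def resolve_auto_backend():
    """Return the first backend in the chain whose dependency is installed."""
    for backend_name, required_module in FALLBACK_CHAIN:
        if required_module is None or importlib.util.find_spec(required_module):
            return backend_name
    return "sequential"

print(resolve_auto_backend())
```

On a machine without `dask` or `joblib` installed, this resolves to `process_pool`.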
Installing optional backends#
# Install individual backend extras
pip install 'physicsnemo-curator[loky]'
pip install 'physicsnemo-curator[dask]'
pip install 'physicsnemo-curator[prefect]'
# Or install multiple
pip install 'physicsnemo-curator[loky,dask]'
Custom backends#
You can register your own backends:
from physicsnemo_curator.run import register_backend, RunBackend, RunConfig
class MyBackend(RunBackend):
    name = "my_backend"
    description = "My custom execution backend"
    requires = ("my_package",)  # Optional dependencies

    def run(self, pipeline, config):
        # Your execution logic here
        results = []
        for idx in config.indices or range(len(pipeline)):
            results.append(pipeline[idx])
        return results
register_backend(MyBackend)
# Now use it:
results = run_pipeline(pipeline, n_jobs=4, backend="my_backend")
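A custom backend can also parallelize. The sketch below shows the same `run(pipeline, config)` contract driven by a standard-library thread pool; the `pipeline` and `config` stand-ins (a plain list and a `SimpleNamespace` with `indices` and `n_jobs` fields) are illustrative assumptions so the example runs without `physicsnemo_curator` installed.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace

class ThreadedBackend:
    name = "threaded"

    def run(self, pipeline, config):
        indices = config.indices or range(len(pipeline))
        with ThreadPoolExecutor(max_workers=config.n_jobs) as pool:
            # executor.map preserves input order, matching run_pipeline's
            # documented ordering guarantee
            return list(pool.map(lambda i: pipeline[i], indices))

pipeline = ["a", "b", "c", "d"]  # stand-in: anything indexable works here
config = SimpleNamespace(indices=[0, 2], n_jobs=2)
print(ThreadedBackend().run(pipeline, config))  # ['a', 'c']
```

Note that threads share memory, so a thread-based backend does not give the process isolation described below.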
Parameters#
run_pipeline(
    pipeline,          # Pipeline with source + filters + sink
    *,
    n_jobs=1,          # Workers. -1 = all CPUs.
    backend="auto",    # "auto", "sequential", "thread_pool", "process_pool", "loky", "dask", "prefect"
    indices=None,      # Subset of source indices, or None for all
    progress=True,     # Show progress bar (tqdm / dask diagnostics)
    **backend_kwargs,  # Extra args forwarded to the backend executor
)
Returns: list[list[str]] — outer list ordered by input indices, inner
list contains file paths returned by the sink for that index.
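Since the return value is nested one level per index, a flat list of all written paths is a one-line comprehension. The paths below are illustrative sample data, not real output:

```python
# run_pipeline returns list[list[str]]: one inner list of file paths
# per input index, in index order.
results = [
    ["output/run_000.vtp"],
    ["output/run_001.vtp", "output/run_001_meta.json"],
    [],  # an index whose sink wrote nothing
]

# Flatten to a single list of every path written
all_paths = [path for per_index in results for path in per_index]
print(len(all_paths))  # 3
```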
Process Isolation#
All multiprocess backends ("process_pool", "loky", "dask", "prefect") execute each
index in a separate process. This means:
Each worker gets an independent copy of the pipeline, source, filters, and sink.
Stateful filters are not merged back. For example,
`MeanFilter` accumulates rows in `self._rows`; those rows exist only in the child process and are discarded when the worker exits.
If you need filter side-effects (like MeanFilter.flush()), use sequential
execution:
# Sequential — filter state is preserved
results = run_pipeline(pipeline, n_jobs=1)
pipeline.filters[0].flush()
Or implement a post-hoc merge step that combines per-worker outputs.
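Such a merge step can be sketched as follows. Assuming each worker returns a partial `(row_count, row_sum)` pair (a hypothetical convention, not something `MeanFilter` does out of the box), the combiner reproduces the global mean that a sequential `MeanFilter.flush()` would compute:

```python
# Per-worker partial results: (rows seen, sum of those rows).
# Sample data for illustration.
per_worker = [
    (3, 30.0),  # worker A saw 3 rows summing to 30.0
    (2, 14.0),  # worker B saw 2 rows summing to 14.0
]

# Combine partials into the global mean: (30.0 + 14.0) / (3 + 2)
total_count = sum(count for count, _ in per_worker)
total_sum = sum(partial_sum for _, partial_sum in per_worker)
global_mean = total_sum / total_count
print(global_mean)  # 8.8
```

This works for any reduction that decomposes into associative partials (counts, sums, min/max); order-dependent state needs a different strategy.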
Profiling#
To measure wall-clock time, memory, and GPU usage across parallel backends,
ensure track_metrics=True (the default). See Profiling
for details.
Examples#
Basic parallel ETL#
from physicsnemo_curator.domains.mesh.sources.vtk import VTKSource
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator import run_pipeline
pipeline = (
    VTKSource("./cfd_results/")
    .write(MeshSink(output_dir="./output/"))
)
# Process all items with 8 workers
results = run_pipeline(pipeline, n_jobs=8, backend="process_pool")
print(f"Wrote {sum(len(r) for r in results)} files")
With HuggingFace dataset sources#
from physicsnemo_curator.domains.mesh.sources.drivaerml import DrivAerMLSource
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator import run_pipeline
pipeline = (
    DrivAerMLSource(mesh_type="boundary")
    .write(MeshSink(output_dir="./drivaer_output/"))
)
# Process first 10 runs in parallel
results = run_pipeline(pipeline, n_jobs=4, indices=list(range(10)))
Using Prefect with retries#
from physicsnemo_curator import run_pipeline
# Prefect provides automatic retries, logging, and observability
results = run_pipeline(
    pipeline,
    n_jobs=4,
    backend="prefect",
    retries=3,
    retry_delay_seconds=10,
)
Choosing a backend at runtime#
import os
from physicsnemo_curator import run_pipeline
# CI uses sequential; production uses all CPUs
n = 1 if os.getenv("CI") else -1
results = run_pipeline(pipeline, n_jobs=n)
Listing available backends#
from physicsnemo_curator.run import list_backends
backends = list_backends()
for name, info in backends.items():
    status = "available" if info["available"] else f"requires {info['requires']}"
    print(f"{name}: {info['description']} ({status})")