physicsnemo_curator.run#
Pipeline execution with pluggable backends.
This module provides run_pipeline(), which processes every index of a Pipeline (optionally in parallel) and returns the collected sink outputs.
Available Backends#
"sequential"— simplefor-loop (default when n_jobs=1)."thread_pool"—concurrent.futures.ThreadPoolExecutor(good for I/O-bound tasks)."process_pool"—concurrent.futures.ProcessPoolExecutor(true parallelism for CPU-bound tasks)."loky"—joblib.Parallelwith thelokybackend (requiresjoblib)."dask"—dask.bagfor parallel/distributed execution (requiresdask)."prefect"— Prefect workflow orchestration with observability (requiresprefect)."auto"— picks the best available backend automatically.
Custom Backends#
You can register custom backends using register_backend():
from physicsnemo_curator.run import register_backend, RunBackend, RunConfig

class MyBackend(RunBackend):
    name = "my_backend"
    description = "My custom execution backend"

    def run(self, pipeline, config):
        # Custom execution logic
        ...

register_backend(MyBackend)
Warning
Stateful filters (e.g. those with flush() methods) automatically
write per-index shard files when using parallel backends. Call
gather_pipeline() after run_pipeline() to merge all
shards into a single output file.
Classes#
- RunBackend – Abstract base class for pipeline execution backends.
- RunConfig – Configuration for pipeline execution.
- Multi-line progress display showing per-worker activity.
- Execute pipeline items using Dask bags.
- Execute pipeline items using joblib with the loky backend.
- Execute pipeline items using Prefect tasks.
- Execute pipeline items using a process pool.
- Execute pipeline items sequentially in a for-loop.
- Execute pipeline items using a thread pool.
Functions#
- gather_pipeline() – Merge per-index shard files produced by stateful filters.
- get_backend() – Get an instance of a registered backend by name.
- list_backends() – List all registered backends and their availability.
- register_backend() – Register a custom execution backend.
- run_pipeline() – Execute a pipeline over all (or selected) source indices.
Package Contents#
- physicsnemo_curator.run.gather_pipeline(pipeline: physicsnemo_curator.core.base.Pipeline[Any])#

Merge per-index shard files produced by stateful filters.

When run_pipeline() runs with a parallel backend, each worker flushes stateful filters to shard files named {stem}_shard_{index:06d}{suffix}. This function discovers those shards, calls the filter's merge() method to combine them into a single output file, and removes the shard files.

Call this after run_pipeline() completes.

- Parameters:
  pipeline (Pipeline) – The same pipeline that was passed to run_pipeline().
- Returns:
  Paths to the merged output files (one per stateful filter).
- Return type:

Examples

>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")
>>> merged = gather_pipeline(pipeline)
>>> print(merged)
['outputs/mean_stats.parquet']
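The documented shard naming scheme can be illustrated with plain pathlib. This is a stdlib sketch of the {stem}_shard_{index:06d}{suffix} pattern described above, not the library's own code:

```python
from pathlib import Path


def shard_path(output: Path, index: int) -> Path:
    # Per the docs, worker `index` flushes its stateful filter to
    # {stem}_shard_{index:06d}{suffix}, alongside the final output file.
    return output.with_name(f"{output.stem}_shard_{index:06d}{output.suffix}")


out = Path("outputs/mean_stats.parquet")
shards = [shard_path(out, i) for i in range(3)]
print([p.name for p in shards])
# gather_pipeline() discovers files matching this pattern, combines them
# via the filter's merge() method, then deletes the shards.
```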
- physicsnemo_curator.run.get_backend(name: str) → base.RunBackend#
Get an instance of a registered backend by name.
- Parameters:
name (str) – The backend name.
- Returns:
An instance of the requested backend.
- Return type:
- Raises:
ValueError – If the backend is not registered.
- physicsnemo_curator.run.list_backends() → dict[str, dict[str, Any]]#
List all registered backends and their availability.
- physicsnemo_curator.run.register_backend(backend_cls: type[base.RunBackend]) → None#
Register a custom execution backend.
- Parameters:
  backend_cls (type[RunBackend]) – The backend class to register. Must have a name class attribute.
- Raises:
  ValueError – If a backend with the same name is already registered.
Examples
>>> from physicsnemo_curator.run import register_backend, RunBackend
>>> class MyBackend(RunBackend):
...     name = "my_backend"
...     description = "Custom backend"
...     def run(self, pipeline, config):
...         return []
>>> register_backend(MyBackend)
- physicsnemo_curator.run.run_pipeline(
      pipeline: physicsnemo_curator.core.base.Pipeline[Any],
      *,
      n_jobs: int = 1,
      backend: str = 'auto',
      indices: collections.abc.Iterable[int] | None = None,
      progress: bool = True,
      **backend_kwargs: Any,
  )#
Execute a pipeline over all (or selected) source indices.
This is the primary entry-point for batch or parallel pipeline execution. It dispatches to the chosen backend and collects results.
- Parameters:
pipeline (Pipeline) – A fully-configured pipeline (source + filters + sink).
n_jobs (int) – Number of parallel workers. 1 forces sequential execution. -1 uses all available CPUs.
backend (str) – Execution backend. One of "auto", "sequential", "thread_pool", "process_pool", "loky", "dask", "prefect", or any custom registered backend.
indices (Iterable[int] | None) – Specific source indices to process. None (default) processes all indices, i.e. range(len(pipeline)).
progress (bool) – Show a progress indicator if the chosen backend supports it.
**backend_kwargs (Any) – Extra keyword arguments forwarded to the backend.
- Returns:
Outer list is ordered by the input indices; each inner list contains the file paths returned by the sink for that index.
- Return type:
- Raises:
ValueError – If backend is not a recognised name.
RuntimeError – If the pipeline has no sink attached.
ImportError – If the selected backend’s optional dependency is missing.
Notes
When using parallel backends, each worker operates on an independent copy of the pipeline. Stateful filters accumulate per-worker state that is not merged back. Use sequential execution when filter side-effects must be aggregated, or implement a custom reduce step.
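The per-worker-state caveat can be made concrete with a small stdlib sketch: each worker operates on its own filter copy, and a manual reduce step combines the partial results afterwards. `CountFilter` and `worker` are hypothetical names for illustration:

```python
from concurrent.futures import ThreadPoolExecutor


class CountFilter:
    # Hypothetical stateful filter: counts the items it has seen.
    def __init__(self) -> None:
        self.count = 0

    def __call__(self, item: int) -> int:
        self.count += 1
        return item


def worker(indices: list[int]) -> int:
    # Each worker builds an independent filter copy, mirroring how
    # parallel backends run independent copies of the pipeline.
    f = CountFilter()
    for i in indices:
        f(i)
    return f.count  # partial state handed back for merging


chunks = [[0, 1, 2], [3, 4], [5, 6, 7, 8]]
with ThreadPoolExecutor(max_workers=3) as ex:
    partials = list(ex.map(worker, chunks))

total = sum(partials)  # the manual reduce step
print(partials, total)
```

Without the final reduce, each worker's count would simply be discarded, which is exactly the pitfall the notes above warn about.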
Examples
Sequential execution (default):
>>> results = run_pipeline(pipeline)
Parallel with 4 processes:
>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")
Using Prefect with retries:
>>> results = run_pipeline(
...     pipeline,
...     n_jobs=4,
...     backend="prefect",
...     retries=3,
...     retry_delay_seconds=10,
... )
Process only a subset of indices:
>>> results = run_pipeline(pipeline, indices=[0, 5, 10])