physicsnemo_curator.run#

Pipeline execution with pluggable backends.

This module provides run_pipeline(), which processes every index of a Pipeline — optionally in parallel — and returns the collected sink outputs.

Available Backends#

  • "sequential" — simple for-loop (default when n_jobs=1).

  • "thread_pool" — concurrent.futures.ThreadPoolExecutor (good for I/O-bound tasks).

  • "process_pool" — concurrent.futures.ProcessPoolExecutor (true parallelism for CPU-bound tasks).

  • "loky" — joblib.Parallel with the loky backend (requires joblib).

  • "dask" — dask.bag for parallel/distributed execution (requires dask).

  • "prefect" — Prefect workflow orchestration with observability (requires prefect).

  • "auto" — picks the best available backend automatically.
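As a rule of thumb, the backend can be chosen from the workload type. The helper below is a hypothetical sketch (choose_backend is not part of physicsnemo_curator) that encodes the guidance above:

```python
def choose_backend(task_kind: str, n_jobs: int) -> str:
    """Rule-of-thumb backend choice; a hypothetical helper, not library logic."""
    if n_jobs == 1:
        return "sequential"       # no parallelism requested
    if task_kind == "io":
        return "thread_pool"      # I/O-bound work: threads avoid pickling overhead
    return "process_pool"         # CPU-bound work: processes give true parallelism

# Example (assuming a configured pipeline):
# results = run_pipeline(pipeline, n_jobs=4, backend=choose_backend("cpu", 4))
```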

Custom Backends#

You can register custom backends using register_backend():

from physicsnemo_curator.run import register_backend, RunBackend, RunConfig

class MyBackend(RunBackend):
    name = "my_backend"
    description = "My custom execution backend"

    def run(self, pipeline, config):
        # Custom execution logic
        ...

register_backend(MyBackend)

Warning

Stateful filters (e.g. those with flush() methods) automatically write per-index shard files when using parallel backends. Call gather_pipeline() after run_pipeline() to merge all shards into a single output file.

Classes#

RunBackend

Abstract base class for pipeline execution backends.

RunConfig

Configuration for pipeline execution.

WorkerProgressDisplay

Multi-line progress display showing per-worker activity.

DaskBackend

Execute pipeline items using Dask bags.

LokyBackend

Execute pipeline items using joblib with the loky backend.

PrefectBackend

Execute pipeline items using Prefect tasks.

ProcessPoolBackend

Execute pipeline items using a process pool.

SequentialBackend

Execute pipeline items sequentially in a for-loop.

ThreadPoolBackend

Execute pipeline items using a thread pool.

Functions#

gather_pipeline(→ list[str])

Merge per-index shard files produced by stateful filters.

get_backend(→ base.RunBackend)

Get an instance of a registered backend by name.

list_backends(→ dict[str, dict[str, Any]])

List all registered backends and their availability.

register_backend(→ None)

Register a custom execution backend.

run_pipeline(→ list[list[str]])

Execute a pipeline over all (or selected) source indices.

Package Contents#

physicsnemo_curator.run.gather_pipeline(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
) list[str][source]#

Merge per-index shard files produced by stateful filters.

When run_pipeline() runs with a parallel backend, each worker flushes stateful filters to shard files named {stem}_shard_{index:06d}{suffix}. This function discovers those shards, calls the filter’s merge() method to combine them into a single output file, and removes the shard files.

Call this after run_pipeline() completes.

Parameters:

pipeline (Pipeline) – The same pipeline that was passed to run_pipeline().

Returns:

Paths to the merged output files (one per stateful filter).

Return type:

list[str]

Examples

>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")
>>> merged = gather_pipeline(pipeline)
>>> print(merged)
['outputs/mean_stats.parquet']
physicsnemo_curator.run.get_backend(name: str) base.RunBackend[source]#

Get an instance of a registered backend by name.

Parameters:

name (str) – The backend name.

Returns:

An instance of the requested backend.

Return type:

RunBackend

Raises:

ValueError – If the backend is not registered.
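Because get_backend() raises ValueError for unknown names, callers can fall back to the always-available sequential backend. The wrapper below is an illustrative sketch, not library API; the resolver function is passed in so the helper stays self-contained:

```python
def get_backend_or_fallback(get_backend, name, fallback="sequential"):
    """Try the requested backend; fall back if the name is not registered.

    `get_backend` is the resolver (e.g. physicsnemo_curator.run.get_backend);
    it is injected here so this sketch has no hard dependency on the library.
    """
    try:
        return get_backend(name)
    except ValueError:
        return get_backend(fallback)

# Usage (assuming physicsnemo_curator is installed):
# from physicsnemo_curator.run import get_backend
# backend = get_backend_or_fallback(get_backend, "dask")
```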

physicsnemo_curator.run.list_backends() dict[str, dict[str, Any]][source]#

List all registered backends and their availability.

Returns:

Dictionary mapping backend names to info dicts containing:

  • description – Human-readable description

  • available – Whether dependencies are installed

  • requires – Tuple of required packages

Return type:

dict[str, dict[str, Any]]
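Given the mapping returned by list_backends(), one common pattern is to pick the first preferred backend whose dependencies are installed. pick_available below is a hypothetical helper built only on the documented description/available/requires keys:

```python
def pick_available(info, preferred=("process_pool", "loky", "dask", "thread_pool")):
    """Return the first preferred backend marked available, else 'sequential'."""
    for name in preferred:
        entry = info.get(name)
        if entry and entry.get("available"):
            return name
    return "sequential"  # the sequential backend has no optional dependencies

# Usage (assuming physicsnemo_curator is installed):
# from physicsnemo_curator.run import list_backends, run_pipeline
# backend = pick_available(list_backends())
# results = run_pipeline(pipeline, n_jobs=-1, backend=backend)
```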

physicsnemo_curator.run.register_backend(backend_cls: type[base.RunBackend]) None[source]#

Register a custom execution backend.

Parameters:

backend_cls (type[RunBackend]) – The backend class to register. Must have a name class attribute.

Raises:

ValueError – If a backend with the same name is already registered.

Examples

>>> from physicsnemo_curator.run import register_backend, RunBackend
>>> class MyBackend(RunBackend):
...     name = "my_backend"
...     description = "Custom backend"
...     def run(self, pipeline, config):
...         return []
>>> register_backend(MyBackend)
physicsnemo_curator.run.run_pipeline(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
*,
n_jobs: int = 1,
backend: str = 'auto',
indices: collections.abc.Iterable[int] | None = None,
progress: bool = True,
**backend_kwargs: Any,
) list[list[str]][source]#

Execute a pipeline over all (or selected) source indices.

This is the primary entry-point for batch or parallel pipeline execution. It dispatches to the chosen backend and collects results.

Parameters:
  • pipeline (Pipeline) – A fully-configured pipeline (source + filters + sink).

  • n_jobs (int) – Number of parallel workers. 1 forces sequential execution. -1 uses all available CPUs.

  • backend (str) – Execution backend. One of "auto", "sequential", "thread_pool", "process_pool", "loky", "dask", "prefect", or any custom registered backend.

  • indices (Iterable[int] | None) – Specific source indices to process. None (default) processes all indices, i.e. range(len(pipeline)).

  • progress (bool) – Show a progress indicator if the chosen backend supports it.

  • **backend_kwargs (Any) – Extra keyword arguments forwarded to the backend.

Returns:

Outer list is ordered by the input indices; each inner list contains the file paths returned by the sink for that index.

Return type:

list[list[str]]

Raises:
  • ValueError – If backend is not a recognised name.

  • RuntimeError – If the pipeline has no sink attached.

  • ImportError – If the selected backend’s optional dependency is missing.

Notes

When using parallel backends, each worker operates on an independent copy of the pipeline, so stateful filters accumulate per-worker state that is not merged in memory. Instead, that state is flushed to per-index shard files; call gather_pipeline() afterwards to merge the shards, or use sequential execution when filter side-effects must be aggregated within a single process.

Examples

Sequential execution (default):

>>> results = run_pipeline(pipeline)

Parallel with 4 processes:

>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

Using Prefect with retries:

>>> results = run_pipeline(
...     pipeline,
...     n_jobs=4,
...     backend="prefect",
...     retries=3,
...     retry_delay_seconds=10,
... )

Process only a subset of indices:

>>> results = run_pipeline(pipeline, indices=[0, 5, 10])