physicsnemo_curator.run

Pipeline execution with pluggable backends.

This module provides run_pipeline(), which processes every index of a Pipeline — optionally in parallel — and returns the collected sink outputs.

Available Backends

  • "sequential" – simple for-loop (default).

  • "process_pool" – concurrent.futures.ProcessPoolExecutor (true parallelism for CPU-bound tasks).

  • "loky" – joblib.Parallel with the loky backend (requires joblib).

  • "dask" – dask.bag for parallel/distributed execution (requires dask).

Custom Backends

You can register custom backends using register_backend():

from physicsnemo_curator.run import register_backend, RunBackend, RunConfig

class MyBackend(RunBackend):
    name = "my_backend"
    description = "My custom execution backend"

    def run(self, pipeline, config):
        # Custom execution logic
        ...

register_backend(MyBackend)
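
The registration contract described on this page (a class-level name, a ValueError on duplicate names, a ValueError on unknown names) can be illustrated with a small stand-alone sketch. Everything below is a stand-in written for illustration: the _REGISTRY dictionary, the EchoBackend class, and the empty-name check are assumptions, not the library's actual implementation.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class RunBackend(ABC):
    """Stand-in for the library's abstract base class (layout is assumed)."""

    name: str = ""
    description: str = ""

    @abstractmethod
    def run(self, pipeline: Any, config: Any) -> list[list[str]]:
        """Process the pipeline and return one list of paths per index."""


# Hypothetical module-level registry keyed by backend name.
_REGISTRY: dict[str, type[RunBackend]] = {}


def register_backend(backend_cls: type[RunBackend]) -> None:
    """Store the class under its name; reject duplicates with ValueError."""
    name = getattr(backend_cls, "name", "")
    if not name:
        # Assumption: a missing or empty name is treated as an error here.
        raise ValueError("backend class must define a non-empty 'name'")
    if name in _REGISTRY:
        raise ValueError(f"backend {name!r} is already registered")
    _REGISTRY[name] = backend_cls


def get_backend(name: str) -> RunBackend:
    """Instantiate a registered backend; unknown names raise ValueError."""
    if name not in _REGISTRY:
        raise ValueError(f"unknown backend {name!r}")
    return _REGISTRY[name]()


class EchoBackend(RunBackend):
    """Toy backend that returns one empty result list per index."""

    name = "echo"
    description = "Returns an empty result for every index"

    def run(self, pipeline, config):
        return [[] for _ in pipeline]


register_backend(EchoBackend)
backend = get_backend("echo")
print(backend.run(range(3), None))  # [[], [], []]
```

Keying the registry by a class-level name is what lets a backend be selected with a plain string such as backend="my_backend".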

Warning

Stateful filters (e.g. those with flush() methods) automatically write per-worker shard files when using parallel backends. Call gather_pipeline() after run_pipeline() to merge the shards into a single output file per stateful filter.

Classes

RunBackend

Abstract base class for pipeline execution backends.

RunConfig

Configuration for pipeline execution.

WorkerProgressDisplay

Multi-line progress display showing per-worker activity.

DaskBackend

Execute pipeline items using Dask bags.

LokyBackend

Execute pipeline items using joblib with the loky backend.

ProcessPoolBackend

Execute pipeline items using a process pool.

PipelineProgressApp

Compact full-screen Textual app for pipeline progress monitoring.

LogProgressMonitor

Simple timestamped log-line progress monitor.

ProgressMonitor

Context manager that runs a Textual TUI in a daemon thread.

SequentialBackend

Execute pipeline items sequentially in a for-loop.

Functions

gather_pipeline(→ list[str])

Merge per-worker shard files produced by stateful filters.

get_backend(→ base.RunBackend)

Get an instance of a registered backend by name.

list_backends(→ dict[str, dict[str, Any]])

List all registered backends and their availability.

register_backend(→ None)

Register a custom execution backend.

run_pipeline(→ list[list[str]])

Execute a pipeline over all (or selected) source indices.

Package Contents

physicsnemo_curator.run.gather_pipeline(
    pipeline: physicsnemo_curator.core.base.Pipeline[Any],
) → list[str]

Merge per-worker shard files produced by stateful filters.

When run_pipeline() runs with a parallel backend, each worker flushes stateful filters to shard files named {stem}_worker_{worker_id}{suffix}. This function discovers those shards, calls the filter’s merge() method to combine them into a single output file, and removes the shard files.

Call this after run_pipeline() completes.
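
The shard naming pattern {stem}_worker_{worker_id}{suffix} can be sketched with pathlib. This is only an illustration of the naming scheme: the helper names shard_path and find_shards are hypothetical, integer worker ids are an assumption, and the actual discovery and merge logic lives in the library and in each filter's merge() method.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def shard_path(output: Path, worker_id: int) -> Path:
    """Build {stem}_worker_{worker_id}{suffix} next to the final output."""
    return output.with_name(f"{output.stem}_worker_{worker_id}{output.suffix}")


def find_shards(output: Path) -> list[Path]:
    """Return the existing shard files for an output path, sorted by name."""
    return sorted(output.parent.glob(f"{output.stem}_worker_*{output.suffix}"))


with tempfile.TemporaryDirectory() as tmp:
    output = Path(tmp) / "mean_stats.parquet"
    # Simulate three workers, each flushing an (empty) shard file.
    for worker_id in range(3):
        shard_path(output, worker_id).touch()
    shard_names = [p.name for p in find_shards(output)]

print(shard_names)
# ['mean_stats_worker_0.parquet', 'mean_stats_worker_1.parquet', 'mean_stats_worker_2.parquet']
```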

Parameters:

pipeline (Pipeline) – The same pipeline that was passed to run_pipeline().

Returns:

Paths to the merged output files (one per stateful filter).

Return type:

list[str]

Examples

>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")
>>> merged = gather_pipeline(pipeline)
>>> print(merged)
['outputs/mean_stats.parquet']

physicsnemo_curator.run.get_backend(name: str) → base.RunBackend

Get an instance of a registered backend by name.

Parameters:

name (str) – The backend name.

Returns:

An instance of the requested backend.

Return type:

RunBackend

Raises:

ValueError – If the backend is not registered.

physicsnemo_curator.run.list_backends() → dict[str, dict[str, Any]]

List all registered backends and their availability.

Returns:

Dictionary mapping backend names to info dicts containing:

  • description: Human-readable description

  • available: Whether dependencies are installed

  • requires: Tuple of required packages

Return type:

dict[str, dict[str, Any]]
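
The shape of the returned mapping can be illustrated with a short stand-alone sketch. The backend_info helper and the "needs_missing" entry are hypothetical, and checking availability with importlib.util.find_spec is an assumption about how such a flag could be computed, not a statement of how the library does it.

```python
from __future__ import annotations

from importlib.util import find_spec
from typing import Any


def backend_info(description: str, requires: tuple[str, ...]) -> dict[str, Any]:
    """Build an info dict with the three documented keys."""
    return {
        "description": description,
        # Available only if every required top-level package can be located.
        "available": all(find_spec(pkg) is not None for pkg in requires),
        "requires": requires,
    }


# A mapping with the same shape as the documented return value.
backends = {
    "sequential": backend_info("Simple for-loop", ()),
    "needs_missing": backend_info(
        "Hypothetical backend with an uninstalled dependency",
        ("surely_not_installed_pkg_xyz",),
    ),
}

# Typical use: keep only the backends whose dependencies are present.
usable = [name for name, info in backends.items() if info["available"]]
print(usable)  # ['sequential']
```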

physicsnemo_curator.run.register_backend(backend_cls: type[base.RunBackend]) → None

Register a custom execution backend.

Parameters:

backend_cls (type[RunBackend]) – The backend class to register. Must have a name class attribute.

Raises:

ValueError – If a backend with the same name is already registered.

Examples

>>> from physicsnemo_curator.run import register_backend, RunBackend
>>> class MyBackend(RunBackend):
...     name = "my_backend"
...     description = "Custom backend"
...     def run(self, pipeline, config):
...         return []
>>> register_backend(MyBackend)

physicsnemo_curator.run.run_pipeline(
    pipeline: physicsnemo_curator.core.base.Pipeline[Any],
    *,
    n_jobs: int = 1,
    backend: str = 'sequential',
    indices: collections.abc.Iterable[int] | None = None,
    use_tui: bool = True,
    **backend_kwargs: Any,
) → list[list[str]]

Execute a pipeline over all (or selected) source indices.

This is the primary entry-point for batch or parallel pipeline execution. It dispatches to the chosen backend and collects results.

Parameters:
  • pipeline (Pipeline) – A fully-configured pipeline (source + filters + sink).

  • n_jobs (int) – Number of parallel workers. 1 forces sequential execution. -1 uses all available CPUs.

  • backend (str) – Execution backend. One of "sequential", "process_pool", "loky", "dask", or any custom registered backend.

  • indices (Iterable[int] | None) – Specific source indices to process. None (default) processes all indices, i.e. range(len(pipeline)).

  • use_tui (bool) – Whether to show the full-screen Textual TUI for progress (requires an interactive terminal). When False, prints simple timestamped log lines to the console instead.

  • **backend_kwargs (Any) – Extra keyword arguments forwarded to the backend.
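
The documented conventions for n_jobs and indices can be written out as a small sketch. The helper names resolve_n_jobs and resolve_indices are hypothetical, and rejecting values other than -1 and positive integers is an assumption made for this example; the library may treat such values differently.

```python
from __future__ import annotations

import os
from collections.abc import Iterable


def resolve_n_jobs(n_jobs: int) -> int:
    """Map the documented n_jobs convention to a concrete worker count."""
    if n_jobs == -1:
        # -1 means "all available CPUs"; fall back to 1 if the count is unknown.
        return os.cpu_count() or 1
    if n_jobs < 1:
        # Assumption: this sketch rejects 0 and other negative values.
        raise ValueError(f"n_jobs must be -1 or a positive integer, got {n_jobs}")
    return n_jobs


def resolve_indices(indices: Iterable[int] | None, length: int) -> list[int]:
    """None selects every index in range(length); otherwise keep the given order."""
    if indices is None:
        return list(range(length))
    return list(indices)


print(resolve_n_jobs(1))                 # 1
print(resolve_indices(None, 4))          # [0, 1, 2, 3]
print(resolve_indices([0, 5, 10], 20))   # [0, 5, 10]
```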

Returns:

Outer list is ordered by the input indices; each inner list contains the file paths returned by the sink for that index.

Return type:

list[list[str]]
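
Because the outer list follows the order of the input indices, results can be paired back to their indices or flattened into a single list of paths. The sketch below uses a hand-written results value in place of a real run_pipeline() call, and the file paths in it are made-up placeholders.

```python
from itertools import chain

indices = [0, 5, 10]

# Placeholder for: results = run_pipeline(pipeline, indices=indices)
# One inner list per index, in the same order as `indices`.
results = [
    ["out/index_0.txt"],
    [],  # a sink may return no paths for an index
    ["out/index_10_a.txt", "out/index_10_b.txt"],
]

# Pair each index with the paths its sink returned.
paths_by_index = dict(zip(indices, results))

# Flatten into a single list when the per-index grouping is not needed.
all_paths = list(chain.from_iterable(results))

print(paths_by_index[10])  # ['out/index_10_a.txt', 'out/index_10_b.txt']
print(all_paths)           # ['out/index_0.txt', 'out/index_10_a.txt', 'out/index_10_b.txt']
```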

Raises:
  • ValueError – If backend is not a recognised name.

  • RuntimeError – If the pipeline has no sink attached.

  • ImportError – If the selected backend’s optional dependency is missing.

Notes

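When using parallel backends, each worker operates on an independent copy of the pipeline. Stateful filters accumulate per-worker state that is not merged back automatically: each worker flushes its state to shard files, and gather_pipeline() must be called after run_pipeline() to merge them. Use sequential execution when filter side-effects must be aggregated in-process, or implement a custom reduce step.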

Examples

Sequential execution (default):

>>> results = run_pipeline(pipeline)

Parallel with 4 processes:

>>> results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

Process only a subset of indices:

>>> results = run_pipeline(pipeline, indices=[0, 5, 10])