base

Base classes for pipeline execution backends.

This module defines the abstract interface that all execution backends must implement, along with common utilities.

Classes

RunBackend

Abstract base class for pipeline execution backends.

RunConfig

Configuration for pipeline execution.

WorkerProgressDisplay

Multi-line progress display showing per-worker activity.

Functions

make_progress_bar(→ Any)

Return a tqdm progress bar or None.

process_single_index(→ list[str])

Process a single pipeline index.

process_single_index_packed(→ list[str])

Process a single pipeline index (packed arguments for map functions).

Module Contents

class physicsnemo_curator.run.base.RunBackend

Bases: abc.ABC

Abstract base class for pipeline execution backends.

Subclasses implement different parallelization strategies (threading, multiprocessing, distributed computing, workflow orchestrators, etc.).

Class Attributes

name: str

Unique identifier for this backend (e.g., “sequential”, “thread_pool”).

description: str

Human-readable description of the backend.

requires: tuple[str, ...]

Optional package dependencies required by this backend.

classmethod is_available() → bool

Check if this backend’s dependencies are installed.

Returns:

True if all required packages are available.

Return type:

bool
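One plausible way to perform such a check is to probe each name in `requires` with `importlib.util.find_spec`, which locates a top-level package without importing it. The sketch below is an assumption about the approach, not the library's actual code, and `packages_available` is a hypothetical helper.

```python
import importlib.util


def packages_available(requires: tuple[str, ...]) -> bool:
    """Return True if every top-level package in `requires` can be located.

    Hypothetical helper: shows one way a backend could check its optional
    dependencies without importing them.
    """
    return all(importlib.util.find_spec(name) is not None for name in requires)


print(packages_available(()))                        # empty tuple: nothing to check
print(packages_available(("json",)))                 # standard-library package
print(packages_available(("no_such_package_xyz",)))  # missing package
```

With this approach, a backend whose `requires` is the default empty tuple is always reported as available.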

abstractmethod run(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
config: RunConfig,
) → list[list[str]]

Execute the pipeline over the configured indices.

Parameters:
  • pipeline (Pipeline) – A fully-configured pipeline (source + filters + sink).

  • config (RunConfig) – Execution configuration.

Returns:

Outer list is ordered by the input indices; each inner list contains the file paths returned by the sink for that index.

Return type:

list[list[str]]
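To make the return contract concrete, here is a minimal sketch of a sequential backend. It does not import the library: `BackendStandIn`, `ConfigStandIn`, and `PipelineStandIn` are hypothetical stand-ins that mirror only the documented attributes, and the assumption that a pipeline supports `len()` and integer indexing is made purely for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, ClassVar, Optional


@dataclass
class ConfigStandIn:
    """Stand-in for RunConfig with only the field used here."""
    indices: Optional[list[int]] = None


class PipelineStandIn:
    """Hypothetical pipeline: 3 items, each 'written' to one fake path."""

    def __len__(self) -> int:
        return 3

    def __getitem__(self, index: int) -> list[str]:
        return [f"out/item_{index}.bin"]


class BackendStandIn(ABC):
    """Mirrors the documented RunBackend interface."""
    name: ClassVar[str]
    description: ClassVar[str]
    requires: ClassVar[tuple[str, ...]] = ()

    @abstractmethod
    def run(self, pipeline: Any, config: ConfigStandIn) -> list[list[str]]: ...


class SequentialSketch(BackendStandIn):
    name = "sequential_sketch"
    description = "Processes indices one at a time in the calling thread."

    def run(self, pipeline: Any, config: ConfigStandIn) -> list[list[str]]:
        # None means "process every index", as documented for RunConfig.indices.
        indices = config.indices if config.indices is not None else range(len(pipeline))
        # One inner list per index, in the same order as the input indices.
        return [pipeline[i] for i in indices]


results = SequentialSketch().run(PipelineStandIn(), ConfigStandIn(indices=[2, 0]))
print(results)  # [['out/item_2.bin'], ['out/item_0.bin']]
```

The output follows the order of `indices` ([2, 0]) rather than sorted order, which is the ordering guarantee described above.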

description: ClassVar[str]
name: ClassVar[str]
requires: ClassVar[tuple[str, ...]] = ()

class physicsnemo_curator.run.base.RunConfig

Configuration for pipeline execution.

Parameters:
  • n_jobs (int) – Number of parallel workers. 1 forces sequential execution. -1 uses all available CPUs. Values <= 0 follow the convention cpu_count + 1 + n_jobs, so -2 uses all but one CPU.

  • progress (bool) – Whether to show a progress indicator (if supported by backend).

  • indices (list[int] | None) – Specific source indices to process. None processes all indices.

  • backend_options (dict[str, Any]) – Additional backend-specific options.

backend_options: dict[str, Any]
indices: list[int] | None = None
n_jobs: int = 1
progress: bool = True

property resolved_n_jobs: int

Return the concrete positive worker count.

Returns:

Positive integer number of workers.

Return type:

int
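The n_jobs convention described for RunConfig can be sketched as a small function. This is an illustration under stated assumptions rather than the library's code: `resolve_n_jobs` is a hypothetical name, the `cpu_count` parameter exists only to make the example deterministic, and clamping the result to at least 1 is an assumption made so the value stays positive.

```python
import os
from typing import Optional


def resolve_n_jobs(n_jobs: int, cpu_count: Optional[int] = None) -> int:
    """Turn a possibly non-positive n_jobs into a positive worker count."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    if n_jobs > 0:
        return n_jobs
    # Documented convention for non-positive values: cpu_count + 1 + n_jobs.
    # The max(1, ...) clamp is an assumption, not documented behavior.
    return max(1, cpu_count + 1 + n_jobs)


print(resolve_n_jobs(4, cpu_count=8))     # 4
print(resolve_n_jobs(-1, cpu_count=8))    # 8
print(resolve_n_jobs(-2, cpu_count=8))    # 7
print(resolve_n_jobs(-100, cpu_count=8))  # 1
```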

class physicsnemo_curator.run.base.WorkerProgressDisplay(
total: int,
n_workers: int,
*,
enabled: bool = True,
desc: str = 'run_pipeline',
)

Multi-line progress display showing per-worker activity.

Renders an overall progress bar plus one bar per active worker (up to _MAX_WORKER_BARS). Falls back gracefully when tqdm is not installed or progress is disabled.

Parameters:
  • total (int) – Total number of items to process.

  • n_workers (int) – Number of parallel workers.

  • enabled (bool) – Whether to show progress at all.

  • desc (str) – Description label for the overall bar.

close() → None

Close all bars and clean up terminal lines.

complete_item() → None

Increment the overall bar without worker tracking.

Use this for backends where individual worker identity is not available.

worker_done(worker_id: int) → None

Mark a worker as idle and update the overall bar.

Parameters:

worker_id (int) – Zero-based worker identifier.

worker_start(worker_id: int, index: int) → None

Mark a worker as starting to process an index.

Parameters:
  • worker_id (int) – Zero-based worker identifier.

  • index (int) – The source index being processed.

property active: bool

Return whether the display is active.
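The intended call sequence for the methods above can be illustrated with a simplified stand-in. `ProgressStandIn` is hypothetical: it mirrors the documented method names but records state in plain attributes instead of drawing tqdm bars, so it says nothing about how the real class renders output.

```python
class ProgressStandIn:
    """Simplified stand-in mirroring the documented method names."""

    def __init__(self, total: int, n_workers: int) -> None:
        self.total = total
        self.completed = 0
        # Maps worker_id -> index currently being processed (None when idle).
        self.current = {w: None for w in range(n_workers)}

    def worker_start(self, worker_id: int, index: int) -> None:
        self.current[worker_id] = index

    def worker_done(self, worker_id: int) -> None:
        self.current[worker_id] = None
        self.completed += 1

    def complete_item(self) -> None:
        self.completed += 1

    def close(self) -> None:
        self.current.clear()


display = ProgressStandIn(total=3, n_workers=2)
try:
    display.worker_start(0, index=10)
    display.worker_start(1, index=11)
    display.worker_done(0)   # worker 0 finished index 10
    display.worker_done(1)   # worker 1 finished index 11
    display.complete_item()  # an item finished with no known worker
finally:
    display.close()          # release the display even if processing fails

print(display.completed)  # 3
```

Pairing each `worker_start` with a `worker_done`, and calling `close` in a `finally` block, keeps the overall count consistent and leaves the terminal clean if a worker raises.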

physicsnemo_curator.run.base.make_progress_bar(
total: int,
*,
enabled: bool,
desc: str = 'run_pipeline',
) → Any

Return a tqdm progress bar or None.

Parameters:
  • total (int) – Number of items.

  • enabled (bool) – Whether to attempt the tqdm import. When False, None is returned.

  • desc (str) – Description for the progress bar.

Returns:

A tqdm progress bar, or None if disabled or unavailable.

Return type:

Any
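Because the return value may be None, callers need to guard every use of the bar. The sketch below re-creates the documented behavior with a lazy, optional tqdm import; `optional_progress_bar` is a hypothetical name and the body is an assumption about how such a helper could be written, not the library's source.

```python
from typing import Any


def optional_progress_bar(total: int, *, enabled: bool, desc: str = "run_pipeline") -> Any:
    """Return a tqdm bar, or None when disabled or tqdm is not installed."""
    if not enabled:
        return None
    try:
        from tqdm import tqdm  # imported lazily so tqdm stays optional
    except ImportError:
        return None
    return tqdm(total=total, desc=desc)


bar = optional_progress_bar(3, enabled=True)
for _ in range(3):
    # ... process one item here ...
    if bar is not None:
        bar.update(1)
if bar is not None:
    bar.close()

print(optional_progress_bar(3, enabled=False))  # None
```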

physicsnemo_curator.run.base.process_single_index(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
index: int,
) → list[str]

Process a single pipeline index.

This function is defined at module level so that it can be pickled and sent to worker processes by multiprocess backends. After processing, any stateful filters that define a flush method are automatically flushed to shard files.

Parameters:
  • pipeline (Pipeline) – The pipeline to execute.

  • index (int) – The index to process.

Returns:

File paths written by the sink.

Return type:

list[str]

physicsnemo_curator.run.base.process_single_index_packed(
args: tuple[physicsnemo_curator.core.base.Pipeline[Any], int],
) → list[str]

Process a single pipeline index (packed arguments for map functions).

Parameters:

args (tuple[Pipeline, int]) – A (pipeline, index) pair.

Returns:

File paths written by the sink.

Return type:

list[str]
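The packed form is convenient with map-style APIs that hand each worker a single item. The following sketch shows the pattern with `concurrent.futures.ThreadPoolExecutor`; `process_stand_in` and `process_packed_stand_in` are hypothetical stand-ins for the two functions documented above, and the string `"out"` is a placeholder for a real Pipeline object.

```python
from concurrent.futures import ThreadPoolExecutor


def process_stand_in(pipeline, index):
    """Stand-in for process_single_index: returns fake output paths."""
    return [f"{pipeline}/shard_{index}.bin"]


def process_packed_stand_in(args):
    """Unpack a (pipeline, index) pair and delegate to the two-argument form."""
    pipeline, index = args
    return process_stand_in(pipeline, index)


pipeline = "out"  # placeholder; a real Pipeline object would go here
indices = [0, 1, 2]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Single-iterable map APIs (for example multiprocessing.Pool.map) pass one
    # item per call, so the pair is packed into a tuple. Executor.map yields
    # results in input order.
    results = list(pool.map(process_packed_stand_in, [(pipeline, i) for i in indices]))

print(results)  # [['out/shard_0.bin'], ['out/shard_1.bin'], ['out/shard_2.bin']]
```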