base

Base classes for pipeline execution backends.

This module defines the abstract interface that all execution backends must implement, along with common utilities.

Classes

RunBackend

Abstract base class for pipeline execution backends.

RunConfig

Configuration for pipeline execution.

WorkerProgressDisplay

Multi-line progress display showing per-worker activity.

Functions

batch_groups(→ list[list[int]])

Merge partition groups into at most n_workers batches.

intersect_partitions(→ list[list[int]] | None)

Intersect source and sink partition constraints.

make_progress_bar(→ Any)

Return a tqdm progress bar or None.

process_index_group(→ dict[int, list[str]])

Process a group of pipeline indices sequentially.

process_single_index(→ list[str])

Process a single pipeline index.

process_single_index_packed(→ list[str])

Process a single pipeline index (packed arguments for map functions).

Module Contents

class physicsnemo_curator.run.base.RunBackend

Bases: abc.ABC

Abstract base class for pipeline execution backends.

Subclasses implement different parallelization strategies (threading, multiprocessing, distributed computing, workflow orchestrators, etc.).

Class Attributes

name: str

Unique identifier for this backend (e.g., “sequential”, “process_pool”).

description: str

Human-readable description of the backend.

requires: tuple[str, ...]

Optional package dependencies required by this backend.

classmethod is_available() → bool

Check if this backend’s dependencies are installed.

Returns:

True if all required packages are available.

Return type:

bool

abstractmethod run(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
config: RunConfig,
) → list[list[str]]

Execute the pipeline over the configured indices.

Parameters:
  • pipeline (Pipeline) – A fully-configured pipeline (source + filters + sink).

  • config (RunConfig) – Execution configuration.

Returns:

Outer list is ordered by the input indices; each inner list contains the file paths returned by the sink for that index.

Return type:

list[list[str]]

description: ClassVar[str]
name: ClassVar[str]
requires: ClassVar[tuple[str, ...]] = ()
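
The shape of a backend can be sketched without the package installed. Everything below is a stand-in: `BackendSketch`, `SequentialSketch`, `NeedsMissingSketch`, and `fake_pipeline` are hypothetical names. The pipeline is reduced to a plain callable, `run` takes a list of indices rather than a RunConfig, and the `importlib.util.find_spec` check is only an assumption about how `is_available()` might be implemented.

```python
from __future__ import annotations

import importlib.util
from abc import ABC, abstractmethod
from typing import Callable, ClassVar


class BackendSketch(ABC):
    """Stand-in mirroring the documented class attributes and methods."""

    name: ClassVar[str]
    description: ClassVar[str]
    requires: ClassVar[tuple[str, ...]] = ()

    @classmethod
    def is_available(cls) -> bool:
        # Assumption: a dependency counts as installed if its top-level
        # module can be located on the import path.
        return all(importlib.util.find_spec(pkg) is not None for pkg in cls.requires)

    @abstractmethod
    def run(self, pipeline: Callable[[int], list[str]], indices: list[int]) -> list[list[str]]:
        """Return one list of output paths per input index, in input order."""


class SequentialSketch(BackendSketch):
    name = "sequential"
    description = "Process indices one after another in the calling process."

    def run(self, pipeline, indices):
        return [pipeline(i) for i in indices]


class NeedsMissingSketch(SequentialSketch):
    name = "needs_missing"
    requires = ("surely_missing_pkg_for_sketch",)  # hypothetical package name


def fake_pipeline(index: int) -> list[str]:
    # Hypothetical pipeline: pretends to write one file per index.
    return [f"out/{index}.dat"]


results = SequentialSketch().run(fake_pipeline, [2, 0])
```

Here the outer list follows the order of the requested indices, matching the return contract described for run().
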
class physicsnemo_curator.run.base.RunConfig

Configuration for pipeline execution.

Parameters:
  • n_jobs (int) – Number of parallel workers. 1 forces sequential execution. Values <= 0 resolve to cpu_count + 1 + n_jobs, so -1 uses all available CPUs and -2 uses all but one.

  • use_tui (bool) – Whether to show the full-screen Textual TUI for progress (requires an interactive terminal). When False, prints simple timestamped log lines to the console instead.

  • indices (list[int] | None) – Specific source indices to process. None processes all indices.

  • backend_options (dict[str, Any]) – Additional backend-specific options.

backend_options: dict[str, Any]
indices: list[int] | None = None
n_jobs: int = 1
property resolved_n_jobs: int

Return the concrete positive worker count.

Returns:

Positive integer number of workers.

Return type:

int
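
The n_jobs convention described for RunConfig can be sketched as a small helper. `resolve_n_jobs` and its `cpu_count` parameter are hypothetical names, and clamping the result to at least 1 is an assumption made here so that the value is always positive.

```python
from __future__ import annotations

import os


def resolve_n_jobs(n_jobs: int, cpu_count: int | None = None) -> int:
    """Turn an n_jobs setting into a concrete positive worker count."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    if n_jobs > 0:
        return n_jobs
    # Documented convention for values <= 0: cpu_count + 1 + n_jobs.
    # The max(1, ...) clamp is an assumption of this sketch.
    return max(1, cpus + 1 + n_jobs)
```

With 8 CPUs, -1 resolves to 8 workers and -2 to 7, while a positive value is returned unchanged.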

use_tui: bool = True
class physicsnemo_curator.run.base.WorkerProgressDisplay(
total: int,
n_workers: int,
*,
enabled: bool = True,
desc: str = 'run_pipeline',
show_worker_bars: bool = False,
)

Multi-line progress display showing per-worker activity.

Renders an overall progress bar plus optionally one bar per active worker (up to _MAX_WORKER_BARS). Falls back gracefully when tqdm is not installed or progress is disabled.

Parameters:
  • total (int) – Total number of items to process.

  • n_workers (int) – Number of parallel workers.

  • enabled (bool) – Whether to show progress at all.

  • desc (str) – Description label for the overall bar.

  • show_worker_bars (bool) – Whether to show per-worker progress bars. Defaults to False to avoid console conflicts with multiple processes.

close() → None

Close all bars and clean up terminal lines.

complete_item() → None

Increment the overall bar without worker tracking.

Use this for backends where individual worker identity is not available.

worker_done(worker_id: int) → None

Mark a worker as idle. This does not advance the overall bar; call complete_item() for that.

Parameters:

worker_id (int) – Zero-based worker identifier.

worker_start(worker_id: int, index: int) → None

Mark a worker as starting to process an index.

Parameters:
  • worker_id (int) – Zero-based worker identifier.

  • index (int) – The source index being processed.

property active: bool

Return whether the display is active.

physicsnemo_curator.run.base.batch_groups(
groups: list[list[int]],
n_workers: int,
) → list[list[int]]

Merge partition groups into at most n_workers batches.

When there are more groups than workers, groups are distributed across workers using a greedy bin-packing strategy (assign each group to the lightest batch) to balance load.

Each batch is a flat list of indices preserving the constraint that indices from the same original group are always together.

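Parameters:
  • groups (list[list[int]]) – Partition groups; the indices in each group must stay together.

  • n_workers (int) – Maximum number of batches to produce.

Returns: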

At most n_workers batches, each a list of indices.

Return type:

list[list[int]]
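
The greedy strategy described above can be sketched with a min-heap of batch loads. `batch_groups_sketch` is a hypothetical name, and two details are assumptions rather than documented behaviour: load is measured as the number of indices in a batch, and groups are placed largest first.

```python
from __future__ import annotations

import heapq


def batch_groups_sketch(groups: list[list[int]], n_workers: int) -> list[list[int]]:
    """Merge groups into at most n_workers batches, keeping each group intact."""
    if n_workers < 1:
        raise ValueError("n_workers must be positive")
    non_empty = [list(g) for g in groups if g]
    if len(non_empty) <= n_workers:
        return non_empty  # nothing to merge

    batches: list[list[int]] = [[] for _ in range(n_workers)]
    heap = [(0, slot) for slot in range(n_workers)]  # (current load, batch slot)
    heapq.heapify(heap)
    # Assumption: place the largest groups first, each into the lightest batch.
    for group in sorted(non_empty, key=len, reverse=True):
        load, slot = heapq.heappop(heap)
        batches[slot].extend(group)
        heapq.heappush(heap, (load + len(group), slot))
    return batches


batches = batch_groups_sketch([[0, 1, 2], [3], [4, 5], [6]], n_workers=2)
```

Because a whole group is appended to a single batch, indices from the same original group never end up on different workers.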

physicsnemo_curator.run.base.intersect_partitions(
source_groups: list[list[int]] | None,
sink_groups: list[list[int]] | None,
) → list[list[int]] | None

Intersect source and sink partition constraints.

Both the source and sink may independently declare that certain indices MUST be processed by the same worker. This function computes the finest partition that satisfies both constraints, or raises ValueError if the constraints are incompatible.

Parameters:
  • source_groups (list[list[int]] | None) – Groups from Source.partition_indices(), or None.

  • sink_groups (list[list[int]] | None) – Groups from Sink.partition_indices(), or None.

Returns:

Merged groups satisfying both constraints, or None if neither source nor sink requires partitioning.

Return type:

list[list[int]] | None

Raises:

ValueError – If the source and sink constraints are incompatible (one requires indices together that the other requires apart).
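
One way to picture the merge is as connected components: any two groups that share an index collapse into one. The sketch below uses a union-find structure under the assumption that only "must be together" constraints matter. `intersect_partitions_sketch` is a hypothetical name, and the incompatibility check that raises ValueError is deliberately left out because its exact rule is not spelled out here.

```python
from __future__ import annotations


def intersect_partitions_sketch(
    source_groups: list[list[int]] | None,
    sink_groups: list[list[int]] | None,
) -> list[list[int]] | None:
    """Merge 'must be together' groups from both sides (no conflict detection)."""
    if source_groups is None and sink_groups is None:
        return None

    parent: dict[int, int] = {}

    def find(x: int) -> int:
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a: int, b: int) -> None:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[max(ra, rb)] = min(ra, rb)

    for groups in (source_groups, sink_groups):
        for group in groups or []:
            for idx in group:
                find(idx)  # register every index, including singletons
            for idx in group[1:]:
                union(group[0], idx)

    merged: dict[int, list[int]] = {}
    for idx in sorted(parent):
        merged.setdefault(find(idx), []).append(idx)
    return [merged[root] for root in sorted(merged)]


merged_groups = intersect_partitions_sketch([[0, 1], [2, 3], [4]], [[1, 2], [4, 5]])
```

In this example the sink group [1, 2] bridges the source groups [0, 1] and [2, 3], so all four indices land in one merged group.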

physicsnemo_curator.run.base.make_progress_bar(
total: int,
*,
enabled: bool,
desc: str = 'run_pipeline',
) → Any

Return a tqdm progress bar or None.

Parameters:
  • total (int) – Number of items.

  • enabled (bool) – Whether to attempt tqdm import.

  • desc (str) – Description for the progress bar.

Returns:

A tqdm progress bar, or None if disabled or unavailable.

Return type:

Any
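
The "progress bar or None" behaviour is a common optional-dependency pattern. A minimal sketch, assuming tqdm is imported lazily and that any import failure simply disables progress reporting (`make_progress_bar_sketch` is a hypothetical name):

```python
from __future__ import annotations

from typing import Any


def make_progress_bar_sketch(total: int, *, enabled: bool, desc: str = "run_pipeline") -> Any:
    """Return a tqdm bar, or None when disabled or tqdm is not installed."""
    if not enabled:
        return None
    try:
        from tqdm import tqdm  # optional dependency, imported only on demand
    except ImportError:
        return None
    return tqdm(total=total, desc=desc)


bar = make_progress_bar_sketch(3, enabled=False)
```

Callers then guard each update with a check such as `if bar is not None: bar.update(1)`.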

physicsnemo_curator.run.base.process_index_group(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
indices: list[int],
) → dict[int, list[str]]

Process a group of pipeline indices sequentially.

Used when the sink provides partition_indices() to batch related indices onto the same worker (e.g. for chunk-aligned parallel writes).

Parameters:
  • pipeline (Pipeline) – The pipeline to execute.

  • indices (list[int]) – The indices to process (in order).

Returns:

Mapping of index to sink output paths.

Return type:

dict[int, list[str]]
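
The return shape can be illustrated with a stand-in in which the pipeline is reduced to a plain callable. `process_index_group_sketch` and `fake_pipeline` are hypothetical names, and the real function receives a Pipeline object rather than a callable.

```python
from __future__ import annotations

from typing import Callable


def process_index_group_sketch(
    pipeline: Callable[[int], list[str]],
    indices: list[int],
) -> dict[int, list[str]]:
    """Process indices one after another and map each to its output paths."""
    return {index: pipeline(index) for index in indices}


def fake_pipeline(index: int) -> list[str]:
    # Hypothetical pipeline: pretends to write one file per index.
    return [f"out/chunk_{index}.dat"]


group_result = process_index_group_sketch(fake_pipeline, [4, 5])
```

Since the whole group runs inside one call, a worker that receives the group processes its indices in the given order.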

physicsnemo_curator.run.base.process_single_index(
pipeline: physicsnemo_curator.core.base.Pipeline[Any],
index: int,
) → list[str]

Process a single pipeline index.

This is a module-level function to support pickling for multiprocess backends. After processing, any stateful filters with flush methods are automatically flushed to shard files.

Parameters:
  • pipeline (Pipeline) – The pipeline to execute.

  • index (int) – The index to process.

Returns:

File paths written by the sink.

Return type:

list[str]

physicsnemo_curator.run.base.process_single_index_packed(
args: tuple[physicsnemo_curator.core.base.Pipeline[Any], int],
) → list[str]

Process a single pipeline index (packed arguments for map functions).

Parameters:

args (tuple[Pipeline, int]) – A (pipeline, index) pair.

Returns:

File paths written by the sink.

Return type:

list[str]
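
Packed arguments exist because map-style APIs pass a single item to the worker function. The sketch below shows the pattern with `concurrent.futures.ThreadPoolExecutor.map`. The `*_sketch` functions and `fake_pipeline` are hypothetical stand-ins, with the pipeline again reduced to a plain callable.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable

PipelineLike = Callable[[int], list[str]]  # stand-in for the real Pipeline type


def process_single_index_sketch(pipeline: PipelineLike, index: int) -> list[str]:
    return pipeline(index)


def process_single_index_packed_sketch(args: tuple[PipelineLike, int]) -> list[str]:
    # Unpack the (pipeline, index) pair so map() can pass it as one argument.
    pipeline, index = args
    return process_single_index_sketch(pipeline, index)


def fake_pipeline(index: int) -> list[str]:
    # Hypothetical pipeline: pretends to write one file per index.
    return [f"out/shard_{index}.txt"]


work = [(fake_pipeline, i) for i in [3, 1, 2]]
with ThreadPoolExecutor(max_workers=2) as pool:
    packed_results = list(pool.map(process_single_index_packed_sketch, work))
```

Executor.map yields results in the order of its inputs, so the output order follows the submitted indices even when workers finish out of order.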