base
Base classes for pipeline execution backends.
This module defines the abstract interface that all execution backends must implement, along with common utilities.
Classes
- RunBackend: Abstract base class for pipeline execution backends.
- RunConfig: Configuration for pipeline execution.
- WorkerProgressDisplay: Multi-line progress display showing per-worker activity.
Functions
- batch_groups: Merge partition groups into at most n_workers batches.
- intersect_partitions: Intersect source and sink partition constraints.
- Return a tqdm progress bar or None.
- process_index_group: Process a group of pipeline indices sequentially.
- process_single_index: Process a single pipeline index.
- Process a single pipeline index (packed arguments for map functions).
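The helper that returns "a tqdm progress bar or None" treats tqdm as an optional dependency. The sketch below shows one common way to do this. The function name make_progress_bar and its parameters are hypothetical, because the real name is not shown here. The sketch imports tqdm lazily and returns None when progress is disabled or tqdm is missing, so callers only need an `is not None` check.

```python
from typing import Any, Optional


def make_progress_bar(
    total: int, *, enabled: bool = True, desc: str = "run_pipeline"
) -> Optional[Any]:
    """Hypothetical helper: return a tqdm bar, or None if disabled or unavailable."""
    if not enabled:
        return None
    try:
        from tqdm import tqdm  # optional dependency, imported only when needed
    except ImportError:
        return None
    return tqdm(total=total, desc=desc)


bar = make_progress_bar(3, enabled=False)
for _ in range(3):
    # ... process one item ...
    if bar is not None:
        bar.update(1)
if bar is not None:
    bar.close()
```

Returning None instead of a dummy object keeps the fallback explicit at each call site.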
Module Contents
- class physicsnemo_curator.run.base.RunBackend
Bases: abc.ABC
Abstract base class for pipeline execution backends.
Subclasses implement different parallelization strategies (threading, multiprocessing, distributed computing, workflow orchestrators, etc.).
Class Attributes
- name: str
Unique identifier for this backend (e.g., “sequential”, “process_pool”).
- description: str
Human-readable description of the backend.
- requires: tuple[str, ...]
Optional package dependencies required by this backend.
- classmethod is_available() -> bool
Check if this backend’s dependencies are installed.
- Returns:
True if all required packages are available.
- Return type:
bool
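The sketch below shows how the class attributes and is_available() might fit together. It is only a sketch. The classes are hypothetical stand-ins, not the real RunBackend, and the dependency check uses importlib.util.find_spec. That check is an assumption about how "installed" could be tested, and it applies to top-level package names only.

```python
import importlib.util
from abc import ABC


class RunBackendSketch(ABC):
    """Hypothetical stand-in mirroring RunBackend's documented class attributes."""

    name: str = ""
    description: str = ""
    requires: tuple[str, ...] = ()

    @classmethod
    def is_available(cls) -> bool:
        # Assumption: a dependency counts as installed when the import system
        # can locate it. find_spec returns None for a missing top-level module.
        return all(importlib.util.find_spec(pkg) is not None for pkg in cls.requires)


class SequentialSketch(RunBackendSketch):
    name = "sequential"
    description = "Process indices one at a time in the calling process."
    requires = ()  # nothing beyond the standard library


class MissingDepSketch(RunBackendSketch):
    name = "missing_dep"
    description = "Hypothetical backend whose dependency is not installed."
    requires = ("surely_not_an_installed_package_xyz",)


print(SequentialSketch.is_available())  # True: all() of an empty tuple is True
print(MissingDepSketch.is_available())  # False
```

Because the check is a classmethod, a registry can filter backends by availability without instantiating any of them.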
- class physicsnemo_curator.run.base.RunConfig
Configuration for pipeline execution.
- Parameters:
n_jobs (int) – Number of parallel workers. A value of 1 forces sequential execution, and -1 uses all available CPUs. Values <= 0 follow the convention cpu_count + 1 + n_jobs, so -2 uses all CPUs but one.
use_tui (bool) – Whether to show the full-screen Textual TUI for progress (requires an interactive terminal). When False, simple timestamped log lines are printed to the console instead.
indices (list[int] | None) – Specific source indices to process. None processes all indices.
backend_options (dict[str, Any]) – Additional backend-specific options.
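The n_jobs convention above can be turned into a concrete worker count with a few lines of code. The sketch below is a minimal version of that calculation. The helper resolve_n_jobs is hypothetical, and clamping very negative values to one worker is an added assumption, not documented behavior.

```python
import os
from typing import Optional


def resolve_n_jobs(n_jobs: int, cpu_count: Optional[int] = None) -> int:
    """Hypothetical helper: translate an n_jobs setting into a worker count."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    if n_jobs > 0:
        return n_jobs
    # Values <= 0 count back from the CPU total: -1 -> all CPUs, -2 -> all but one.
    # Clamping to at least one worker is an assumption.
    return max(1, cpu_count + 1 + n_jobs)


print(resolve_n_jobs(-1, cpu_count=8))  # 8
print(resolve_n_jobs(-2, cpu_count=8))  # 7
print(resolve_n_jobs(1, cpu_count=8))   # 1 (sequential)
```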
- class physicsnemo_curator.run.base.WorkerProgressDisplay(total: int, n_workers: int, *, enabled: bool = True, desc: str = 'run_pipeline', show_worker_bars: bool = False)
Multi-line progress display showing per-worker activity.
Renders an overall progress bar plus, optionally, one bar per active worker (up to _MAX_WORKER_BARS). Falls back gracefully when tqdm is not installed or progress is disabled.
- Parameters:
total (int) – Total number of items to process.
n_workers (int) – Number of parallel workers.
enabled (bool) – Whether to show progress at all.
desc (str) – Description label for the overall bar.
show_worker_bars (bool) – Whether to show per-worker progress bars. Defaults to False to avoid console conflicts with multiple processes.
- complete_item() -> None
Increment the overall bar without worker tracking.
Use this for backends where individual worker identity is not available.
- worker_done(worker_id: int) -> None
Mark a worker as idle. This does NOT update the overall bar; call complete_item() for that.
- Parameters:
worker_id (int) – Zero-based worker identifier.
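The split between worker_done() and complete_item() means a backend must make two calls per finished item. The sketch below illustrates that contract with a counter-only stand-in that does not need tqdm. ProgressSketch is hypothetical, and so is its worker_start method, because the documentation above does not show how a worker is marked active.

```python
class ProgressSketch:
    """Hypothetical counter-only stand-in for WorkerProgressDisplay."""

    def __init__(self, total: int, n_workers: int) -> None:
        self.total = total
        self.n_workers = n_workers
        self.completed = 0
        self.active: dict[int, int] = {}  # worker_id -> index being processed

    def worker_start(self, worker_id: int, index: int) -> None:
        # Hypothetical: not part of the documented API.
        self.active[worker_id] = index

    def worker_done(self, worker_id: int) -> None:
        # Mark the worker idle; deliberately leaves the overall count alone.
        self.active.pop(worker_id, None)

    def complete_item(self) -> None:
        # The overall count advances only here.
        self.completed += 1


display = ProgressSketch(total=4, n_workers=2)
for index in range(4):
    worker_id = index % 2
    display.worker_start(worker_id, index)
    # ... process the item ...
    display.worker_done(worker_id)
    display.complete_item()

print(display.completed, display.active)  # 4 {}
```

Keeping the two updates separate lets backends that cannot identify workers still call complete_item() alone.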
- physicsnemo_curator.run.base.batch_groups( ) -> list[list[int]]
Merge partition groups into at most n_workers batches.
When there are more groups than workers, groups are distributed across workers using a greedy bin-packing strategy (assign each group to the lightest batch) to balance load.
Each batch is a flat list of indices preserving the constraint that indices from the same original group are always together.
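The greedy strategy described above, which assigns each group to the currently lightest batch, can be sketched with a min-heap keyed on batch size. The function batch_groups_sketch is a hypothetical illustration, not the module's implementation. It assumes that batch "weight" means the number of indices and that groups are placed in the order given.

```python
import heapq


def batch_groups_sketch(groups: list[list[int]], n_workers: int) -> list[list[int]]:
    """Hypothetical: greedily pack index groups into at most n_workers batches."""
    if n_workers < 1:
        raise ValueError("n_workers must be >= 1")
    if len(groups) <= n_workers:
        return [list(g) for g in groups]  # already few enough: one group per batch
    batches: list[list[int]] = [[] for _ in range(n_workers)]
    heap = [(0, b) for b in range(n_workers)]  # (current size, batch id)
    for group in groups:
        size, b = heapq.heappop(heap)  # lightest batch; ties go to the lower id
        batches[b].extend(group)  # a group is never split across batches
        heapq.heappush(heap, (size + len(group), b))
    return batches


print(batch_groups_sketch([[0, 1, 2], [3], [4, 5], [6]], n_workers=2))
# [[0, 1, 2, 6], [3, 4, 5]]
```

Sorting groups by descending size before packing (the longest-processing-time heuristic) usually balances batches better. The sketch omits that step because the description above does not mention it.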
- physicsnemo_curator.run.base.intersect_partitions( ) -> list[list[int]] | None
Intersect source and sink partition constraints.
Both the source and sink may independently declare that certain indices MUST be processed by the same worker. This function computes the finest partition that satisfies both constraints, or raises ValueError if the constraints are incompatible.
- Parameters:
- Returns:
Merged groups satisfying both constraints, or None if neither source nor sink requires partitioning.
- Return type:
list[list[int]] | None
- Raises:
ValueError – If the source and sink constraints are incompatible (one requires indices together that the other requires apart).
- physicsnemo_curator.run.base.process_index_group(pipeline: physicsnemo_curator.core.base.Pipeline[Any], indices: list[int])
Process a group of pipeline indices sequentially.
Used when the sink provides partition_indices() to batch related indices onto the same worker (e.g., for chunk-aligned parallel writes).
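Grouped processing keeps related indices together by submitting each group as a single task. The sketch below shows the idea with concurrent.futures. A plain callable stands in for the pipeline, and the names process_group_sketch and describe are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Callable


def process_group_sketch(process_one: Callable[[int], str], indices: list[int]) -> list[str]:
    # Process every index of one group in order, inside a single worker task.
    return [process_one(i) for i in indices]


def describe(index: int) -> str:
    # Hypothetical per-index work standing in for running the pipeline.
    return f"item-{index}"


groups = [[0, 1], [2, 3], [4]]  # e.g. indices that write into the same chunk
with ThreadPoolExecutor(max_workers=2) as executor:
    # One task per group, so a group is never split across workers.
    results = list(executor.map(partial(process_group_sketch, describe), groups))

print(results)  # [['item-0', 'item-1'], ['item-2', 'item-3'], ['item-4']]
```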
- physicsnemo_curator.run.base.process_single_index(pipeline: physicsnemo_curator.core.base.Pipeline[Any], index: int)
Process a single pipeline index.
This is a module-level function so that it can be pickled for multiprocess backends. After processing, any stateful filters that define a flush method are automatically flushed to shard files.
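The "packed arguments" variant exists because Pool.imap and similar map functions pass exactly one argument per task. The sketch below shows the pattern with module-level functions that would also be picklable. The names run_index and run_index_packed are hypothetical, a plain callable stands in for the pipeline, and a ThreadPool replaces a process pool so the example runs anywhere.

```python
from multiprocessing.pool import ThreadPool
from typing import Any, Callable


def run_index(pipeline: Callable[[int], Any], index: int) -> Any:
    # Stand-in for process_single_index: a "pipeline" here is any callable on an index.
    return pipeline(index)


def run_index_packed(args: tuple[Callable[[int], Any], int]) -> Any:
    # Pool.imap passes exactly one argument per task, so unpack the tuple here.
    pipeline, index = args
    return run_index(pipeline, index)


def square(index: int) -> int:
    return index * index


tasks = [(square, i) for i in range(5)]
with ThreadPool(processes=2) as pool:
    # imap yields results lazily and in input order.
    results = list(pool.imap(run_index_packed, tasks))

print(results)  # [0, 1, 4, 9, 16]
```

Pool.starmap accepts argument tuples directly, but it waits for every result before returning. A packed wrapper lets imap or imap_unordered stream results as they finish, which suits per-item progress updates. To switch to multiprocessing.Pool, keep these functions at module level and guard the pool creation with `if __name__ == "__main__":` on platforms that use the spawn start method.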