base#

Abstract base classes for pipeline components and the Pipeline builder.

Attributes#

REQUIRED

Sentinel value indicating a Param has no default and must be provided.

logger

Classes#

Filter

Abstract filter/transform that processes a stream of T items.

Param

Descriptor for a configurable parameter on a pipeline component.

Pipeline

Lazy pipeline that chains a source through filters into a sink.

Sink

Abstract sink that persists items and returns output file paths.

Source

Abstract data source that yields items of type T.

Module Contents#

class physicsnemo_curator.core.base.Filter[T][source]#

Bases: abc.ABC

Abstract filter/transform that processes a stream of T items.

Filters receive a generator of items and yield zero or more items per input. These are full generator semantics: a filter can expand, contract, or pass through the stream.

Subclasses must set name and description and implement params() and __call__().
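The generator semantics described above can be sketched without importing the library. FilterSketch, DropOdd, and Duplicate are hypothetical stand-ins, and the __call__ signature (one iterator in, one iterator out) is an assumption drawn from the description rather than the confirmed library signature. params() is omitted for brevity.

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import ClassVar


class FilterSketch(ABC):
    """Hypothetical stand-in for Filter; not the library class."""

    name: ClassVar[str]
    description: ClassVar[str]

    @abstractmethod
    def __call__(self, items: Iterator[int]) -> Iterator[int]:
        """Consume a stream of items and yield zero or more items per input."""


class DropOdd(FilterSketch):
    """Contracts the stream: odd inputs yield nothing."""

    name = "Drop odd"
    description = "Keep only even integers."

    def __call__(self, items):
        for item in items:
            if item % 2 == 0:
                yield item


class Duplicate(FilterSketch):
    """Expands the stream: every input yields two outputs."""

    name = "Duplicate"
    description = "Emit each item twice."

    def __call__(self, items):
        for item in items:
            yield item
            yield item


# Filters compose by wrapping one generator in the next.
result = list(Duplicate()(DropOdd()(iter([1, 2, 3, 4]))))
```

Because each filter is a generator, no item is processed until the outermost stream is consumed.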

artifacts() list[str][source]#

Return paths of files produced by this filter since the last call.

Stateful filters that write side-effect files (statistics, logs, etc.) should override this to report the paths written during the most recent flush() or __call__() cycle. The framework calls this after processing each source index and records the returned paths as filter artifacts in the pipeline store.

The default implementation returns an empty list, which is correct for stateless (pass-through) filters.

Returns:

Paths of files written, or [] if none.

Return type:

list[str]
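A minimal sketch of a stateful filter that reports its side-effect files. CountingFilter is a hypothetical class that does not inherit from the library's Filter. Draining the pending list on each artifacts() call is an assumption based on the phrase "since the last call".

```python
import tempfile
from collections.abc import Iterator
from pathlib import Path


class CountingFilter:
    """Hypothetical stateful filter that writes one count file per call."""

    name = "Counting filter"
    description = "Pass items through and record how many were seen."

    def __init__(self, out_dir: str) -> None:
        self._out_dir = Path(out_dir)
        self._calls = 0
        self._pending: list[str] = []  # paths written since the last artifacts() call

    def __call__(self, items: Iterator[str]) -> Iterator[str]:
        count = 0
        for item in items:
            count += 1
            yield item  # pass-through: the stream itself is unchanged
        # Side-effect file, written once the input stream is exhausted.
        path = self._out_dir / f"count_{self._calls}.txt"
        path.write_text(str(count))
        self._pending.append(str(path))
        self._calls += 1

    def artifacts(self) -> list[str]:
        # Assumption: report the pending paths once, then clear them.
        written, self._pending = self._pending, []
        return written


with tempfile.TemporaryDirectory() as tmp:
    f = CountingFilter(tmp)
    passed = list(f(iter(["a", "b", "c"])))
    first = f.artifacts()   # one path, written during the call above
    second = f.artifacts()  # nothing new since the previous call
    recorded = Path(first[0]).read_text()
```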

abstract classmethod params() list[Param][source]#

Declare the configurable parameters for this filter.

Returns:

Ordered list of parameter descriptors.

Return type:

list[Param]

description: ClassVar[str]#

Short description shown in the interactive CLI.

name: ClassVar[str]#

Human-readable display name for the interactive CLI.

class physicsnemo_curator.core.base.Param[source]#

Descriptor for a configurable parameter on a pipeline component.

Parameters:
  • name (str) – Parameter name (should match the __init__ keyword argument).

  • description (str) – Human-readable help text shown in the interactive CLI.

  • type (type) – Expected Python type (str, int, float, pathlib.Path, …).

  • default (Any) – Default value. Use REQUIRED (the default) to indicate the parameter must be supplied by the user.

  • choices (list[str] | None) – If not None, the CLI will present a selection prompt instead of free-text input.

choices: list[str] | None = None#
default: Any#
description: str#
name: str#
property required: bool#

Return True if this parameter has no default value.

type: type#
class physicsnemo_curator.core.base.Pipeline[T][source]#

Lazy pipeline that chains a source through filters into a sink.

The pipeline is built incrementally using the filter() and write() builder methods. Execution is deferred until the pipeline is indexed with pipeline[i], which processes only the i-th source item.

Parameters:
  • source (Source[T]) – The data source.

  • filters (list[Filter[T]]) – Ordered list of filters to apply.

  • sink (Sink[T] | None) – Optional sink for writing output.

Examples

>>> pipeline = (
...     MySource(path="/data")
...     .filter(FilterA())
...     .filter(FilterB())
...     .write(MySink(output="/out"))
... )
>>> pipeline[0]   # lazily process source item 0
['/out/item_0']
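The builder and deferred execution described above can be sketched as follows. PipelineSketch, Numbers, double, and collect are hypothetical stand-ins. Two details are assumptions: that filters are applied by nesting generators, and that the sink receives the stream together with the source index.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class PipelineSketch:
    """Hypothetical stand-in for Pipeline: immutable builder, lazy execution."""

    source: Any
    filters: tuple = ()  # a tuple here, so copies never share mutable state
    sink: Any = None

    def filter(self, f):
        # Return a copy with one more filter; self is left untouched.
        return replace(self, filters=self.filters + (f,))

    def write(self, s):
        return replace(self, sink=s)

    def __getitem__(self, index: int):
        # Nothing runs until this point; only source item `index` is read.
        stream = self.source[index]
        for f in self.filters:
            stream = f(stream)
        return self.sink(stream, index)


calls = []  # records which source items were actually read


class Numbers:
    def __getitem__(self, index):
        calls.append(index)
        yield index
        yield index + 100


def double(items):
    for item in items:
        yield item * 2


def collect(items, index):
    # Stand-in sink: returns fake output paths instead of writing files.
    return [f"/out/item_{index}_{item}" for item in items]


base = PipelineSketch(Numbers())
full = base.filter(double).write(collect)
paths = full[1]
```

Building the pipeline reads nothing from the source; indexing with full[1] touches only source item 1, and base keeps its empty filter list.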
all_filter_artifacts() dict[str, list[str]][source]#

Return all filter artifact paths grouped by filter name.

Returns:

Mapping of filter name to list of all artifact paths.

Return type:

dict[str, list[str]]

Raises:

RuntimeError – If track_metrics is False.

filter(f: Filter[T]) Pipeline[T][source]#

Return a new pipeline with an additional filter appended.

Parameters:

f (Filter[T]) – The filter to append.

Returns:

A new pipeline instance (the original is unchanged).

Return type:

Pipeline[T]

filter_artifacts_for_index(index: int) dict[str, list[str]][source]#

Return filter artifact paths for a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Mapping of filter name to list of artifact paths.

Return type:

dict[str, list[str]]

Raises:

RuntimeError – If track_metrics is False.

index_for_path(path: str) int | None[source]#

Find which source index produced a given output file.

Parameters:

path (str) – Output file path to look up.

Returns:

Source index that produced the file, or None if not found.

Return type:

int | None

Raises:

RuntimeError – If track_metrics is False.

classmethod load(path: str | pathlib.Path) Pipeline[Any][source]#

Load a pipeline from a YAML or JSON configuration file.

Parameters:

path (str | pathlib.Path) – Path to the pipeline configuration file.

Returns:

Fully constructed pipeline ready for execution.

Return type:

Pipeline[Any]

Raises:

See also

Pipeline.save

Save a pipeline configuration.

output_paths_for_index(index: int) list[str][source]#

Return the output file paths produced by a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Output file paths ordered by sequence, or empty list if none.

Return type:

list[str]

Raises:

RuntimeError – If track_metrics is False.

remaining_indices() list[int][source]#

Return the indices that have neither completed nor failed.

Returns:

Sorted list of indices still needing processing.

Return type:

list[int]

Raises:

RuntimeError – If track_metrics is False.
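A resumable processing loop built on this bookkeeping might look like the sketch below. RunRecord and process are hypothetical, and the sketch assumes that "remaining" means every index that is neither completed nor failed, which is one reading of the description above.

```python
class RunRecord:
    """Hypothetical in-memory stand-in for the pipeline store."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.completed: set[int] = set()
        self.failed: dict[int, str] = {}

    def remaining_indices(self) -> list[int]:
        # Assumption: failed indices are not retried automatically.
        done = self.completed | set(self.failed)
        return sorted(set(range(self.total)) - done)


def process(index: int) -> list[str]:
    # Stand-in for pipeline[index]; index 2 simulates a failure.
    if index == 2:
        raise ValueError("bad item")
    return [f"/out/item_{index}"]


record = RunRecord(total=4)
record.completed.add(0)  # pretend an earlier run already finished index 0

for index in record.remaining_indices():
    try:
        process(index)
    except Exception as exc:
        record.failed[index] = str(exc)
    else:
        record.completed.add(index)
```

Under this reading, rerunning the loop after an interruption skips index 0 and leaves the failed index 2 for an explicit retry, for example after reset_index(2).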

reset() None[source]#

Clear all records for this pipeline run.

Raises:

RuntimeError – If track_metrics is False.

reset_index(index: int) None[source]#

Remove records for a single index.

Parameters:

index (int) – Source index to remove.

Raises:

RuntimeError – If track_metrics is False.

save(path: str | pathlib.Path) None[source]#

Save this pipeline’s configuration to a YAML or JSON file.

The file format is determined by the extension: .yaml / .yml → YAML, .json → JSON.

Parameters:

path (str | pathlib.Path) – Destination file path.

Raises:

ValueError – If the file extension is not supported.

See also

Pipeline.load

Restore a pipeline from a saved file.

summary() dict[str, Any][source]#

Return a summary of the store state.

Returns:

Dictionary with the keys total, completed, failed, remaining, config_hash, db_path, and total_elapsed_s.

Return type:

dict[str, Any]

Raises:

RuntimeError – If track_metrics is False.

write(s: Sink[T]) Pipeline[T][source]#

Return a new pipeline with the given sink attached.

If the sink exposes a set_source method, the pipeline’s source is automatically injected so the sink can resolve naming placeholders (e.g. {relpath}, {stem}).

Parameters:

s (Sink[T]) – The sink to attach.

Returns:

A new pipeline instance (the original is unchanged).

Return type:

Pipeline[T]
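The optional set_source hook is a duck-typing pattern: the source is injected only into sinks that opt in. In the sketch below, attach, NamingSink, PlainSink, and FileSource are hypothetical, and the stems attribute and path_for method are invented for illustration. How the library's sinks actually resolve {stem} or {relpath} is not shown in this reference.

```python
class FileSource:
    """Hypothetical source exposing one stem per item."""

    stems = ["run_a", "run_b"]


class PlainSink:
    """Hypothetical sink without set_source; nothing is injected."""


class NamingSink:
    """Hypothetical sink that needs the source to expand {stem}."""

    def __init__(self, pattern: str) -> None:
        self.pattern = pattern
        self.source = None

    def set_source(self, source) -> None:
        self.source = source

    def path_for(self, index: int) -> str:
        return self.pattern.format(stem=self.source.stems[index])


def attach(source, sink):
    # Duck typing: inject only when the sink opts in via set_source.
    set_source = getattr(sink, "set_source", None)
    if callable(set_source):
        set_source(source)
    return sink


source = FileSource()
naming = attach(source, NamingSink("/out/{stem}.txt"))
plain = attach(source, PlainSink())
```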

property active_workers: list[dict[str, Any]]#

Return all workers registered for this pipeline run.

Returns:

List of worker dictionaries with keys: worker_id, pid, hostname, started_at, last_heartbeat, current_index.

Return type:

list[dict[str, Any]]

Raises:

RuntimeError – If track_metrics is False.

property completed_indices: set[int]#

Return the set of successfully completed indices.

Returns:

Indices with recorded successful completions.

Return type:

set[int]

Raises:

RuntimeError – If track_metrics is False.

db_dir: pathlib.Path | None = None#
property db_path: pathlib.Path | None#

Return the resolved database path, or None if metrics are disabled.

Returns:

Absolute path to the SQLite database file, or None when track_metrics is False.

Return type:

pathlib.Path | None

property failed_indices: dict[int, str]#

Return indices that failed with their error messages.

Returns:

Mapping from index to error message string.

Return type:

dict[int, str]

Raises:

RuntimeError – If track_metrics is False.

filters: list[Filter[T]] = []#
property metrics: physicsnemo_curator.core.pipeline_store.PipelineMetrics#

Return aggregated metrics from the store.

Returns:

Aggregated metrics across all completed indices.

Return type:

PipelineMetrics

Raises:

RuntimeError – If track_metrics is False.

sink: Sink[T] | None = None#
source: Source[T]#
track_gpu: bool = False#
track_memory: bool = True#
track_metrics: bool = True#
class physicsnemo_curator.core.base.Sink[T][source]#

Bases: abc.ABC

Abstract sink that persists items and returns output file paths.

The sink consumes a generator of items and writes each one to storage, returning the file paths of the written outputs.

Subclasses must set name and description and implement params() and __call__().
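A minimal sketch of the sink contract: consume a stream, write each item, return the paths. TextSink is a hypothetical class that does not inherit from the library's Sink, and passing the source index to __call__ is an assumption used here only to build file names.

```python
import tempfile
from collections.abc import Iterator
from pathlib import Path


class TextSink:
    """Hypothetical sink that writes each item to its own text file."""

    name = "Text sink"
    description = "Write each item to its own text file."

    def __init__(self, output: str) -> None:
        self._output = Path(output)

    def __call__(self, items: Iterator[str], index: int) -> list[str]:
        paths = []
        for seq, item in enumerate(items):
            path = self._output / f"item_{index}_{seq}.txt"
            path.write_text(item)
            paths.append(str(path))
        return paths  # one path per item written, in stream order


with tempfile.TemporaryDirectory() as tmp:
    written = TextSink(tmp)(iter(["alpha", "beta"]), 0)
    contents = [Path(p).read_text() for p in written]
```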

abstract classmethod params() list[Param][source]#

Declare the configurable parameters for this sink.

Returns:

Ordered list of parameter descriptors.

Return type:

list[Param]

description: ClassVar[str]#

Short description shown in the interactive CLI.

name: ClassVar[str]#

Human-readable display name for the interactive CLI.

class physicsnemo_curator.core.base.Source[T][source]#

Bases: abc.ABC

Abstract data source that yields items of type T.

A source represents a collection of data items (e.g. files on disk). Each item is accessed by integer index and may yield one or more T objects (generator semantics allow a single source item to expand into multiple outputs).

Subclasses must set the class-level name and description attributes and implement params(), __len__(), and __getitem__().
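The indexing contract described above can be sketched with a small in-memory source. LineSource is a hypothetical class that does not inherit from the library's Source, and params() is omitted for brevity.

```python
from collections.abc import Iterator


class LineSource:
    """Hypothetical source: each text blob is one item that yields its lines."""

    name = "Line source"
    description = "Treat each text blob as one source item and yield its lines."

    def __init__(self, blobs: list[str]) -> None:
        self._blobs = blobs

    def __len__(self) -> int:
        # Number of source items, not the number of yielded objects.
        return len(self._blobs)

    def __getitem__(self, index: int) -> Iterator[str]:
        # Generator: one source item may expand into several T objects.
        yield from self._blobs[index].splitlines()


source = LineSource(["a\nb", "c"])
first = list(source[0])
second = list(source[1])
```

Here len(source) counts two source items even though item 0 expands into two lines.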

Examples

>>> pipeline = MySource(path="/data").filter(MyFilter()).write(MySink())
>>> pipeline[0]  # process first source item lazily
filter(f: Filter[T]) Pipeline[T][source]#

Create a Pipeline with this source and a single filter.

Parameters:

f (Filter[T]) – The filter to append.

Returns:

A new pipeline containing this source and the given filter.

Return type:

Pipeline[T]

abstract classmethod params() list[Param][source]#

Declare the configurable parameters for this source.

Returns:

Ordered list of parameter descriptors.

Return type:

list[Param]

write(s: Sink[T]) Pipeline[T][source]#

Create a Pipeline with this source and a sink (no filters).

If the sink exposes a set_source method, the source is automatically injected so the sink can resolve naming placeholders (e.g. {relpath}, {stem}) from the source.

Parameters:

s (Sink[T]) – The sink to attach.

Returns:

A new pipeline containing this source and the given sink.

Return type:

Pipeline[T]

description: ClassVar[str]#

Short description shown in the interactive CLI.

name: ClassVar[str]#

Human-readable display name for the interactive CLI.

physicsnemo_curator.core.base.REQUIRED: Any#

Sentinel value indicating a Param has no default and must be provided.

physicsnemo_curator.core.base.logger#