base#
Abstract base classes for pipeline components and the Pipeline builder.
Attributes#
REQUIRED – Sentinel value indicating a Param has no default and must be provided.
logger
Classes#
Filter – Abstract filter/transform that processes a stream of T items.
Param – Descriptor for a configurable parameter on a pipeline component.
Pipeline – Lazy pipeline that chains a source through filters into a sink.
Sink – Abstract sink that persists items and returns output file paths.
Source – Abstract data source that yields items of type T.
Module Contents#
- class physicsnemo_curator.core.base.Filter[T][source]#
Bases: abc.ABC
Abstract filter/transform that processes a stream of T items.
Filters receive a generator of items and yield zero or more items per input (full generator semantics: a filter can expand, contract, or pass items through).
Subclasses must set name and description and implement params() and __call__().
- artifacts() → list[str][source]#
Return paths of files produced by this filter since the last call.
Stateful filters that write side-effect files (statistics, logs, etc.) should override this to report the paths written during the most recent flush() or __call__() cycle. The framework calls this after each index to record filter artifacts in the pipeline store.
The default implementation returns an empty list, which is correct for stateless (pass-through) filters.
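The generator semantics described for filters (a filter may expand, contract, or pass items through) can be sketched with plain generator functions. `even_only` and `duplicate` below are illustrative stand-ins, not part of the library:

```python
from typing import Generator, Iterable

def even_only(items: Iterable[int]) -> Generator[int, None, None]:
    # Contracting filter: yields zero items for odd inputs.
    for item in items:
        if item % 2 == 0:
            yield item

def duplicate(items: Iterable[int]) -> Generator[int, None, None]:
    # Expanding filter: yields two items per input.
    for item in items:
        yield item
        yield item

# Chaining generators mirrors how a pipeline threads a source through filters.
print(list(duplicate(even_only(range(5)))))  # [0, 0, 2, 2, 4, 4]
```

Because each stage is a generator, nothing is computed until the chained stream is consumed, which is the same laziness Pipeline relies on.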
- class physicsnemo_curator.core.base.Param[source]#
Descriptor for a configurable parameter on a pipeline component.
- Parameters:
name (str) – Parameter name (should match the __init__ keyword argument).
description (str) – Human-readable help text shown in the interactive CLI.
type (type) – Expected Python type (str, int, float, pathlib.Path, …).
default (Any) – Default value. Use REQUIRED (the default) to indicate the parameter must be supplied by the user.
choices (list[str] | None) – If not None, the CLI will present a selection prompt instead of free-text input.
- default: Any#
- type: Param.type#
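As a sketch of the declaration pattern, using only the constructor fields documented above, a simplified stand-in Param (a dataclass, not the real descriptor) might be used like this; the component and its parameters are hypothetical:

```python
from dataclasses import dataclass
from typing import Any, Optional

REQUIRED = object()  # stand-in for the module-level REQUIRED sentinel

@dataclass
class Param:
    # Simplified stand-in mirroring the documented fields.
    name: str
    description: str
    type: type
    default: Any = REQUIRED
    choices: Optional[list] = None

# A hypothetical component declaring its configurable parameters:
def params() -> list:
    return [
        Param(name="path", description="Input directory", type=str),
        Param(name="workers", description="Worker count", type=int, default=4),
        Param(name="mode", description="Processing mode", type=str,
              default="fast", choices=["fast", "thorough"]),
    ]

# The CLI can detect required parameters by comparing against the sentinel.
required = [p.name for p in params() if p.default is REQUIRED]
print(required)  # ['path']
```

Using an identity check against a sentinel object (rather than None) lets None itself remain a legal default value.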
- class physicsnemo_curator.core.base.Pipeline[T][source]#
Lazy pipeline that chains a source through filters into a sink.
The pipeline is built incrementally using the filter() and write() builder methods. Execution is deferred until the pipeline is indexed with pipeline[i], which processes only the i-th source item.
- Parameters:
Examples
>>> pipeline = (
...     MySource(path="/data")
...     .filter(FilterA())
...     .filter(FilterB())
...     .write(MySink(output="/out"))
... )
>>> pipeline[0]  # lazily process source item 0
['/out/item_0']
- all_filter_artifacts() → dict[str, list[str]][source]#
Return all filter artifact paths grouped by filter name.
- Returns:
Mapping of filter name to list of all artifact paths.
- Return type:
dict[str, list[str]]
- Raises:
RuntimeError – If track_metrics is False.
- filter_artifacts_for_index(index: int) → dict[str, list[str]][source]#
Return filter artifact paths for a given source index.
- index_for_path(path: str) → int | None[source]#
Find which source index produced a given output file.
- Parameters:
path (str) – Output file path to look up.
- Returns:
Source index that produced the file, or None if not found.
- Return type:
int | None
- Raises:
RuntimeError – If track_metrics is False.
- classmethod load(path: str | pathlib.Path) → Pipeline[Any][source]#
Load a pipeline from a YAML or JSON configuration file.
- Parameters:
path (str | pathlib.Path) – Path to the pipeline configuration file.
- Returns:
Fully constructed pipeline ready for execution.
- Return type:
Pipeline[Any]
- Raises:
FileNotFoundError – If the file does not exist.
ValueError – If the file extension is not supported.
See also
Pipeline.save – Save a pipeline configuration.
- output_paths_for_index(index: int) → list[str][source]#
Return the output file paths produced by a given source index.
- Parameters:
index (int) – Source index to query.
- Returns:
Output file paths ordered by sequence, or empty list if none.
- Return type:
list[str]
- Raises:
RuntimeError – If track_metrics is False.
- remaining_indices() → list[int][source]#
Return indices not yet completed or failed.
- Returns:
Sorted list of indices still needing processing.
- Return type:
list[int]
- Raises:
RuntimeError – If track_metrics is False.
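remaining_indices() enables resumable driver loops: process only what an earlier run did not finish. The pattern can be sketched with a toy stand-in (the real Pipeline tracks completion in its SQLite store; ToyPipeline is hypothetical):

```python
class ToyPipeline:
    # Toy stand-in: the real Pipeline records completions in a SQLite store.
    def __init__(self, n: int) -> None:
        self._n = n
        self._done: set = set()

    def __getitem__(self, i: int) -> list:
        # Processing index i marks it complete and returns its output paths.
        self._done.add(i)
        return [f"/out/item_{i}"]

    def remaining_indices(self) -> list:
        return sorted(set(range(self._n)) - self._done)

pipeline = ToyPipeline(3)
pipeline[1]                             # pretend an earlier run finished index 1
for i in pipeline.remaining_indices():  # resumes with indices [0, 2]
    pipeline[i]
print(pipeline.remaining_indices())  # []
```

Iterating over the returned list (a snapshot) is safe even though processing mutates the completion set.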
- reset() → None[source]#
Clear all records for this pipeline run.
- Raises:
RuntimeError – If track_metrics is False.
- reset_index(index: int) → None[source]#
Remove records for a single index.
- Parameters:
index (int) – Source index to remove.
- Raises:
RuntimeError – If track_metrics is False.
- save(path: str | pathlib.Path) → None[source]#
Save this pipeline’s configuration to a YAML or JSON file.
The file format is determined by the extension: .yaml/.yml → YAML, .json → JSON.
- Parameters:
path (str | pathlib.Path) – Destination file path.
- Raises:
ValueError – If the file extension is not supported.
See also
Pipeline.load – Restore a pipeline from a saved file.
- summary() → dict[str, Any][source]#
Return a summary of the store state.
- Returns:
Dictionary with keys total, completed, failed, remaining, config_hash, db_path, total_elapsed_s.
- Return type:
dict[str, Any]
- Raises:
RuntimeError – If track_metrics is False.
- write(s: Sink[T]) → Pipeline[T][source]#
Return a new pipeline with the given sink attached.
If the sink exposes a set_source method, the pipeline's source is automatically injected so the sink can resolve naming placeholders (e.g. {relpath}, {stem}).
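The placeholders behave like standard str.format fields. A minimal sketch of how a sink might resolve {relpath} and {stem} once it knows the source root (resolve_name and the template are hypothetical, not library API):

```python
from pathlib import Path

def resolve_name(template: str, source_root: Path, item_path: Path) -> str:
    # Hypothetical helper: fills {relpath} and {stem} from the item's path.
    relpath = item_path.relative_to(source_root).as_posix()
    return template.format(relpath=relpath, stem=item_path.stem)

print(resolve_name("{stem}.parquet", Path("/data"), Path("/data/run1/a.csv")))
# a.parquet
```

This is why the sink needs the source injected: {relpath} is only meaningful relative to the source's root directory.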
- property active_workers: list[dict[str, Any]]#
Return all workers registered for this pipeline run.
- Returns:
List of worker dictionaries with keys worker_id, pid, hostname, started_at, last_heartbeat, current_index.
- Return type:
list[dict[str, Any]]
- Raises:
RuntimeError – If track_metrics is False.
- property completed_indices: set[int]#
Return the set of successfully completed indices.
- Returns:
Indices with recorded successful completions.
- Return type:
set[int]
- Raises:
RuntimeError – If track_metrics is False.
- db_dir: pathlib.Path | None = None#
- property db_path: pathlib.Path | None#
Return the resolved database path, or None if metrics are disabled.
- Returns:
Absolute path to the SQLite database file, or None when track_metrics is False.
- Return type:
pathlib.Path | None
- property failed_indices: dict[int, str]#
Return indices that failed with their error messages.
- Returns:
Mapping from index to error message string.
- Return type:
dict[int, str]
- Raises:
RuntimeError – If track_metrics is False.
- property metrics: physicsnemo_curator.core.pipeline_store.PipelineMetrics#
Return aggregated metrics from the store.
- Returns:
Aggregated metrics across all completed indices.
- Return type:
physicsnemo_curator.core.pipeline_store.PipelineMetrics
- Raises:
RuntimeError – If track_metrics is False.
- class physicsnemo_curator.core.base.Sink[T][source]#
Bases: abc.ABC
Abstract sink that persists items and returns output file paths.
The sink consumes a generator of items and writes each one to storage, returning the file paths of the written outputs.
Subclasses must set name and description and implement params() and __call__().
- class physicsnemo_curator.core.base.Source[T][source]#
Bases: abc.ABC
Abstract data source that yields items of type T.
A source represents a collection of data items (e.g. files on disk). Each item is accessed by integer index and may yield one or more T objects (generator semantics allow a single source item to expand into multiple outputs).
Subclasses must set the class-level name and description attributes and implement params(), __len__(), and __getitem__().
Examples
>>> pipeline = MySource(path="/data").filter(MyFilter()).write(MySink())
>>> pipeline[0]  # process first source item lazily
- classmethod params() → list[Param][source]#
- Abstractmethod:
Declare the configurable parameters for this source.
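Putting the Source contract together, a stand-in class mirroring the documented requirements (class-level name and description, plus params(), __len__(), and __getitem__() with generator semantics); ListSource is illustrative only and does not inherit from the real base class:

```python
from typing import Generator, List

class ListSource:
    # Stand-in mirroring the documented Source contract; the real base
    # class is physicsnemo_curator.core.base.Source.
    name = "list_source"
    description = "Yields lines from an in-memory list of strings."

    def __init__(self, items: List[str]) -> None:
        self._items = items

    @classmethod
    def params(cls) -> list:
        return []  # this toy source takes no configurable parameters

    def __len__(self) -> int:
        return len(self._items)

    def __getitem__(self, index: int) -> Generator[str, None, None]:
        # Generator semantics: one source item may expand into many outputs.
        for line in self._items[index].splitlines():
            yield line

src = ListSource(["a\nb", "c"])
print(len(src), list(src[0]))  # 2 ['a', 'b']
```

Note that __getitem__ returns a generator per index, which is what lets pipeline[i] process a single source item lazily.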
- physicsnemo_curator.core.base.REQUIRED: Any#
Sentinel value indicating a Param has no default and must be provided.
- physicsnemo_curator.core.base.logger#