pipeline_store#
Unified pipeline store with SQLite-backed metrics, checkpointing, and provenance.
Provides PipelineStore, a single SQLite database that combines
checkpoint tracking (completed/failed indices), per-index and per-stage
wall-clock metrics, and pipeline provenance (config hashing).
Also contains the metrics dataclasses (StageMetrics,
IndexMetrics, PipelineMetrics), the _TimedGenerator
timing utility, and provenance helpers for serializing pipeline configuration.
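The _TimedGenerator utility is described only by name above; its likely behavior, wrapping an iterator and recording the wall-clock cost of producing each item, can be sketched with the stdlib. This is a hypothetical reconstruction, not the real class, and the actual interface may differ:

```python
import time
from typing import Any, Iterator


class TimedGenerator:
    """Wrap an iterator and record nanoseconds spent producing each item.

    Hypothetical sketch of a _TimedGenerator-style helper; the real
    class's name and interface may differ.
    """

    def __init__(self, inner: Iterator[Any]) -> None:
        self._inner = inner
        self.timings_ns: list[int] = []

    def __iter__(self) -> "TimedGenerator":
        return self

    def __next__(self) -> Any:
        start = time.perf_counter_ns()
        item = next(self._inner)  # StopIteration propagates and ends iteration
        self.timings_ns.append(time.perf_counter_ns() - start)
        return item


timed = TimedGenerator(iter(range(3)))
items = list(timed)
assert items == [0, 1, 2]
assert len(timed.timings_ns) == 3
```

Per-item timings collected this way are the natural input for the per-stage nanosecond fields on StageMetrics and IndexMetrics.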
Usage#
>>> from physicsnemo_curator.core.pipeline_store import PipelineStore
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(db_path=Path("run.db"), pipeline_config=config, config_hash=chash)
>>> store.is_completed(0) # None — not yet completed
>>> store.record_success(0, ["/out/0.vtk"], wall_time_ns=1_000_000, ...)
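The `_config_hash` helper used above is not documented here; a plausible stdlib-only sketch is to canonicalize the config as sorted-key JSON and hash it with SHA-256 (the class documentation below confirms the hash is a SHA-256 hex digest, but the exact canonicalization is an assumption):

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Hypothetical sketch of a _config_hash-style helper.

    Sorting keys makes the hash stable under dict insertion order,
    so a resumed run with an identical config matches its checkpoint.
    """
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


h1 = config_hash({"source": "a", "filters": ["f1"]})
h2 = config_hash({"filters": ["f1"], "source": "a"})
assert h1 == h2       # key order does not affect the hash
assert len(h1) == 64  # SHA-256 hex digest length
```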
Attributes#
Classes#
| IndexMetrics | Metrics for one __getitem__ call (one source index). |
| PipelineMetrics | Aggregated metrics across all processed indices. |
| PipelineStore | SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress. |
| StageMetrics | Metrics for a single pipeline stage (source, one filter, or sink). |
Module Contents#
- class physicsnemo_curator.core.pipeline_store.IndexMetrics[source]#
Metrics for one __getitem__ call (one source index).
- Parameters:
index (int) – The source index that was processed.
stages (list[StageMetrics]) – Per-stage timing breakdown.
wall_time_ns (int) – Total wall-clock time for this index in nanoseconds.
peak_memory_bytes (int) – Peak Python memory usage during this index (from tracemalloc).
gpu_memory_bytes (int | None) – Peak GPU memory delta, or None if GPU tracking was disabled.
- classmethod from_dict(data: dict[str, Any]) IndexMetrics[source]#
Reconstruct from a dictionary (e.g. deserialized JSON).
- stages: list[StageMetrics]#
- class physicsnemo_curator.core.pipeline_store.PipelineMetrics[source]#
Aggregated metrics across all processed indices.
- Parameters:
indices (list[IndexMetrics]) – Per-index metrics, one entry per __getitem__ call.
- to_console() None[source]#
Print a formatted summary table to stdout.
Outputs a human-readable table showing per-index and aggregate metrics. Uses only stdlib formatting (no external dependencies).
- to_csv(path: str | pathlib.Path) None[source]#
Write per-index metrics to a CSV file.
Each row represents one index. Stage timings are included as separate columns named stage_<name>_ns.
- Parameters:
path (str | pathlib.Path) – Output file path.
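The stage_<name>_ns column layout described above can be illustrated with the stdlib csv module. The row contents here are hypothetical stand-ins for per-index metrics, not output from the real to_csv():

```python
import csv
from pathlib import Path
from tempfile import TemporaryDirectory

# Hypothetical per-index rows: one row per index, one
# stage_<name>_ns column per pipeline stage.
rows = [
    {"index": 0, "wall_time_ns": 1_000_000,
     "stage_source_ns": 400_000, "stage_sink_ns": 600_000},
    {"index": 1, "wall_time_ns": 1_200_000,
     "stage_source_ns": 500_000, "stage_sink_ns": 700_000},
]

with TemporaryDirectory() as d:
    path = Path(d) / "metrics.csv"
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    lines = path.read_text().splitlines()

assert "stage_source_ns" in lines[0]  # stage timings become columns
assert len(lines) == 3                # header + one row per index
```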
- to_json(path: str | pathlib.Path) None[source]#
Write metrics to a JSON file.
- Parameters:
path (str | pathlib.Path) – Output file path.
- indices: list[IndexMetrics] = []#
- property mean_index_time_ns: float#
Mean wall-clock time per index (nanoseconds).
- Returns:
Average per-index time, or 0.0 if no indices were processed.
- Return type:
float
- class physicsnemo_curator.core.pipeline_store.PipelineStore(
- db_path: pathlib.Path,
- pipeline_config: dict,
- config_hash: str,
- )#
SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.
Manages a single database with six tables: pipeline_runs, index_results, stage_metrics, output_files, filter_artifacts, and workers. Supports checkpoint resumption via config hashing, per-index success/error recording, aggregated metrics queries, and live worker progress tracking.
- Parameters:
db_path (pathlib.Path) – Path to the SQLite database file. Created automatically if it does not exist.
pipeline_config (dict) – Full pipeline configuration dictionary (from _pipeline_config()).
config_hash (str) – SHA-256 hex hash of the pipeline configuration.
Examples
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(Path("run.db"), config, chash)
>>> store.is_completed(0)  # None if not yet done
>>> store.record_success(0, ["/out/0.vtk"], 1_000_000, 4096, None, [])
Initialize the pipeline store.
- Parameters:
db_path (pathlib.Path) – Path to the SQLite database file.
pipeline_config (dict) – Full pipeline configuration dictionary.
config_hash (str) – SHA-256 hex hash of the pipeline configuration.
- all_filter_artifacts() dict[str, list[str]][source]#
Return all filter artifact paths grouped by filter name.
- completed_indices() set[int][source]#
Return the set of successfully completed indices for this run.
- filter_artifacts_for_index(index: int) dict[str, list[str]][source]#
Return filter artifact paths for a given source index.
- classmethod from_db(db_path: str | pathlib.Path) PipelineStore[source]#
Open an existing pipeline database in read-only mode.
This is the entry point for the dashboard and post-hoc analysis tools. It reads the pipeline_runs table to recover config_hash and pipeline_config, so the caller does not need to know them.
- Parameters:
db_path (str or pathlib.Path) – Path to an existing .db file produced by a pipeline run.
- Returns:
A store instance backed by the existing database.
- Return type:
PipelineStore
- Raises:
FileNotFoundError – If db_path does not exist.
ValueError – If the database contains no pipeline run records.
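The read-only behavior of from_db() can be reproduced with a plain sqlite3 URI connection. The table layout here is a minimal stand-in (only a config_hash column), and the mode=ro mechanism is an assumption about how the real method achieves read-only access:

```python
import sqlite3
from pathlib import Path
from tempfile import TemporaryDirectory

with TemporaryDirectory() as d:
    db_path = Path(d) / "run.db"

    # Create a minimal database standing in for a finished pipeline run.
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE pipeline_runs (config_hash TEXT)")
    conn.execute("INSERT INTO pipeline_runs VALUES ('abc')")
    conn.commit()
    conn.close()

    # mode=ro opens the file read-only; any write attempt raises.
    ro = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    (config_hash,) = ro.execute(
        "SELECT config_hash FROM pipeline_runs"
    ).fetchone()
    try:
        ro.execute("INSERT INTO pipeline_runs VALUES ('xyz')")
        writable = True
    except sqlite3.OperationalError:
        writable = False
    ro.close()

assert config_hash == "abc"   # provenance recovered from the database
assert writable is False      # read-only mode rejects writes
```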
- index_metrics(index: int) IndexMetrics | None[source]#
Retrieve metrics for a single index.
- Parameters:
index (int) – Source index to query.
- Returns:
Metrics for the index, or None if not found.
- Return type:
IndexMetrics | None
- is_completed(index: int) list[str] | None[source]#
Check if an index has been completed successfully.
- metrics() PipelineMetrics[source]#
Build aggregated metrics from the database.
- Returns:
Aggregated metrics across all completed indices in this run.
- Return type:
PipelineMetrics
- output_paths_for_index(index: int) list[str][source]#
Return the output file paths produced by a given source index.
- record_error(index: int, error: str, wall_time_ns: int) None[source]#
Record a failed index execution.
- record_filter_artifacts( ) None[source]#
Record file artifacts produced by a filter for a given index.
- record_success(
- index: int,
- output_paths: list[str],
- wall_time_ns: int,
- peak_memory_bytes: int,
- gpu_memory_bytes: int | None,
- stages: list[StageMetrics],
- ) None[source]#
Record a successfully completed index with metrics.
- register_worker(worker_id: str, pid: int, hostname: str) None[source]#
Register a worker or update its heartbeat if already known.
- remaining_indices(total: int) list[int][source]#
Return indices not yet completed or failed for this run.
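The semantics of remaining_indices() can be sketched as a pure function over the completed and failed index sets; the real method runs an equivalent query against the SQLite tables, and treating failed indices as "not remaining" is an assumption based on the description above:

```python
def remaining_indices(total: int, completed: set[int], failed: set[int]) -> list[int]:
    """Hypothetical pure-function sketch of the resumption query.

    An index is remaining if it was neither recorded as a success
    nor as a failure in the current run.
    """
    done = completed | failed
    return [i for i in range(total) if i not in done]


# Indices 0 and 2 succeeded, 4 failed; only 1 and 3 remain.
assert remaining_indices(5, {0, 2}, {4}) == [1, 3]
```

A resuming run would iterate only over this list, which is how config-hash-matched checkpointing skips already-processed work.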
- reset() None[source]#
Clear all records for this run and re-register.
Deletes all index results, stage metrics, and pipeline run metadata from the database. The database file is kept and a fresh run is registered.
- reset_index(index: int) None[source]#
Remove records for a single index from this run.
- Parameters:
index (int) – Source index to remove.
- class physicsnemo_curator.core.pipeline_store.StageMetrics[source]#
Metrics for a single pipeline stage (source, one filter, or sink).
- Parameters:
- physicsnemo_curator.core.pipeline_store.logger#