pipeline_store#
Unified pipeline store with SQLite-backed metrics, checkpointing, and provenance.
Provides PipelineStore, a single SQLite database that combines
checkpoint tracking (completed/failed indices), per-index and per-stage
wall-clock metrics, and pipeline provenance (config hashing).
Also contains the metrics dataclasses (StageMetrics,
IndexMetrics, PipelineMetrics), the _TimedGenerator
timing utility, and provenance helpers for serializing pipeline configuration.
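The `_TimedGenerator` utility is not documented further on this page. As a rough illustration of how time spent producing items from a generator can be measured, here is a minimal sketch. `TimedIterator` and `slow_squares` are hypothetical names, and the real utility may behave differently.

```python
import time
from typing import Any, Iterable


class TimedIterator:
    """Hypothetical timing wrapper; not the library's _TimedGenerator."""

    def __init__(self, inner: Iterable[Any]) -> None:
        self._inner = iter(inner)
        self.elapsed_ns = 0  # total time spent inside next() calls

    def __iter__(self) -> "TimedIterator":
        return self

    def __next__(self) -> Any:
        start = time.perf_counter_ns()
        try:
            return next(self._inner)
        finally:
            # Runs on normal return and on StopIteration, so the final call is counted too.
            self.elapsed_ns += time.perf_counter_ns() - start


def slow_squares(n: int):
    for i in range(n):
        time.sleep(0.001)  # stand-in for real per-item work
        yield i * i


timed = TimedIterator(slow_squares(3))
values = list(timed)
```

Only the time spent inside the wrapped generator is accumulated; time the consumer spends between `next()` calls is excluded, which is what a per-stage breakdown needs.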
Usage#
>>> from pathlib import Path
>>> from physicsnemo_curator.core.pipeline_store import PipelineStore
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(db_path=Path("run.db"), pipeline_config=config, config_hash=chash)
>>> store.is_completed(0)  # None: not yet completed
>>> store.record_success(0, ["/out/0.vtk"], wall_time_ns=1_000_000, ...)
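The usage above relies on `_config_hash`, whose implementation is not shown on this page. One plausible approach, assuming the hash is SHA-256 over a canonical JSON serialization of the configuration, can be sketched as follows. `sketch_config_hash` is a hypothetical stand-in, and the real helper may serialize the configuration differently.

```python
import hashlib
import json


def sketch_config_hash(config: dict) -> str:
    """Hypothetical stand-in for _config_hash; not the library's implementation."""
    # Sorted keys and fixed separators make the serialization independent of
    # dict insertion order, so equal configurations hash identically.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = sketch_config_hash({"source": "vtk", "filters": ["mean"]})
b = sketch_config_hash({"filters": ["mean"], "source": "vtk"})  # same content, different key order
c = sketch_config_hash({"source": "vtk", "filters": ["max"]})   # different content
```

A stable hash of this kind is what lets a rerun with an unchanged configuration find its earlier checkpoint, while any configuration change produces a new hash.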
Attributes#
Classes#
IndexMetrics | Metrics for one __getitem__ call (one source index).
PipelineMetrics | Aggregated metrics across all processed indices.
PipelineStore | SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.
StageMetrics | Metrics for a single pipeline stage (source, one filter, or sink).
Module Contents#
- class physicsnemo_curator.core.pipeline_store.IndexMetrics[source]#
Metrics for one __getitem__ call (one source index).
- Parameters:
index (int) – The source index that was processed.
stages (list[StageMetrics]) – Per-stage timing breakdown.
wall_time_ns (int) – Total wall-clock time for this index in nanoseconds.
peak_memory_bytes (int) – Peak Python memory usage during this index (from tracemalloc).
gpu_memory_bytes (int | None) – Peak GPU memory delta, or None if GPU tracking was disabled.
- classmethod from_dict(data: dict[str, Any]) IndexMetrics[source]#
Reconstruct from a dictionary (e.g. deserialized JSON).
- stages: list[StageMetrics]#
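To make the fields above concrete, the following sketch mirrors them in a hypothetical dataclass, measures wall-clock time with `time.perf_counter_ns` and peak Python memory with `tracemalloc`, and round-trips the result through a dictionary. `StageTiming`, `IndexTiming`, and `measure` are illustrative names, and the `StageTiming` fields are assumptions because the `StageMetrics` parameters are not listed on this page.

```python
import time
import tracemalloc
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StageTiming:
    """Hypothetical stand-in for StageMetrics (fields assumed)."""
    name: str
    wall_time_ns: int


@dataclass
class IndexTiming:
    """Hypothetical mirror of the IndexMetrics fields documented above."""
    index: int
    stages: List[StageTiming] = field(default_factory=list)
    wall_time_ns: int = 0
    peak_memory_bytes: int = 0
    gpu_memory_bytes: Optional[int] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IndexTiming":
        # asdict() turns nested dataclasses into plain dicts, so rebuild them here.
        stages = [StageTiming(**s) for s in data.get("stages", [])]
        return cls(**{**data, "stages": stages})


def measure(index: int) -> IndexTiming:
    tracemalloc.start()
    start = time.perf_counter_ns()
    payload = [0] * 10_000  # stand-in for the real work on this index
    wall = time.perf_counter_ns() - start
    _, peak = tracemalloc.get_traced_memory()  # returns (current, peak) in bytes
    tracemalloc.stop()
    return IndexTiming(
        index=index,
        stages=[StageTiming("source", wall)],
        wall_time_ns=wall,
        peak_memory_bytes=peak,
    )


original = measure(0)
restored = IndexTiming.from_dict(asdict(original))
```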
- class physicsnemo_curator.core.pipeline_store.PipelineMetrics[source]#
Aggregated metrics across all processed indices.
- Parameters:
indices (list[IndexMetrics]) – Per-index metrics, one entry per __getitem__ call.
- to_console() None[source]#
Print a formatted summary table to stdout.
Outputs a human-readable table showing per-index and aggregate metrics. Uses only stdlib formatting (no external dependencies).
- to_csv(path: str | pathlib.Path) None[source]#
Write per-index metrics to a CSV file.
Each row represents one index. Stage timings are included as separate columns named stage_<name>_ns.
- Parameters:
path (str | pathlib.Path) – Output file path.
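The column layout described for `to_csv` can be illustrated with the standard `csv` module. This is a sketch under assumptions: only the `stage_<name>_ns` naming comes from the description above, while the `index` and `wall_time_ns` column names, the sample records, and the in-memory buffer are hypothetical.

```python
import csv
import io

# Hypothetical per-index records: (index, wall_time_ns, {stage name: stage time in ns}).
records = [
    (0, 1_500, {"source": 1_000, "sink": 500}),
    (1, 2_100, {"source": 1_200, "sink": 900}),
]

# Collect stage names in first-seen order so every row gets the same columns.
stage_names = []
for _, _, stages in records:
    for name in stages:
        if name not in stage_names:
            stage_names.append(name)

header = ["index", "wall_time_ns"] + [f"stage_{name}_ns" for name in stage_names]

buffer = io.StringIO()  # a real call would write to the file at `path` instead
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(header)
for index, wall_time_ns, stages in records:
    # A stage missing for an index is written as an empty cell.
    writer.writerow([index, wall_time_ns] + [stages.get(name, "") for name in stage_names])

csv_text = buffer.getvalue()
```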
- to_json(path: str | pathlib.Path) None[source]#
Write metrics to a JSON file.
- Parameters:
path (str | pathlib.Path) – Output file path.
- indices: list[IndexMetrics] = []#
- property mean_index_time_ns: float#
Mean wall-clock time per index (nanoseconds).
- Returns:
Average per-index time, or 0.0 if no indices were processed.
- Return type:
float
- class physicsnemo_curator.core.pipeline_store.PipelineStore(
- db_path: pathlib.Path,
- pipeline_config: dict,
- config_hash: str,
- *,
- _worker: bool = False,
)
SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.
Manages a single database with six tables: pipeline_runs, index_results, stage_metrics, output_files, filter_artifacts, and workers. Supports checkpoint resumption via config hashing, per-index success/error recording, aggregated metrics queries, and live worker progress tracking.
- Parameters:
db_path (pathlib.Path) – Path to the SQLite database file. Created automatically if it does not exist.
pipeline_config (dict) – Full pipeline configuration dictionary (from _pipeline_config()).
config_hash (str) – SHA-256 hex hash of the pipeline configuration.
Examples
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(Path("run.db"), config, chash)
>>> store.is_completed(0)  # None if not yet done
>>> store.record_success(0, ["/out/0.vtk"], 1_000_000, 4096, None, [])
Initialize the pipeline store.
- Parameters:
db_path (pathlib.Path) – Path to the SQLite database file.
pipeline_config (dict) – Full pipeline configuration dictionary.
config_hash (str) – SHA-256 hex hash of the pipeline configuration.
_worker (bool, optional) – If True, skip schema creation and just look up the existing run_id. Used by child processes that reconnect to a database already initialised by the parent.
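The interplay between config hashing, run lookup, and the `_worker` flag can be sketched with plain `sqlite3`. Everything here is an assumption for illustration: `open_run` is a hypothetical function, and the three-column `pipeline_runs` schema is a guess, since the real table layout is not documented on this page.

```python
import json
import sqlite3


def open_run(conn: sqlite3.Connection, config: dict, config_hash: str, worker: bool = False) -> int:
    """Return the run_id for config_hash; create schema and row unless worker=True."""
    if not worker:
        # Hypothetical schema; the real pipeline_runs table likely has more columns.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS pipeline_runs ("
            "run_id INTEGER PRIMARY KEY, config_hash TEXT UNIQUE, pipeline_config TEXT)"
        )
        # The UNIQUE constraint makes this a no-op when the hash is already registered.
        conn.execute(
            "INSERT OR IGNORE INTO pipeline_runs (config_hash, pipeline_config) VALUES (?, ?)",
            (config_hash, json.dumps(config, sort_keys=True)),
        )
        conn.commit()
    row = conn.execute(
        "SELECT run_id FROM pipeline_runs WHERE config_hash = ?", (config_hash,)
    ).fetchone()
    if row is None:
        raise LookupError(f"no run registered for config hash {config_hash}")
    return row[0]


conn = sqlite3.connect(":memory:")
parent_id = open_run(conn, {"source": "vtk"}, "abc123")
resumed_id = open_run(conn, {"source": "vtk"}, "abc123")               # same hash: same run
worker_id = open_run(conn, {"source": "vtk"}, "abc123", worker=True)   # lookup only
other_id = open_run(conn, {"source": "stl"}, "def456")                 # new hash: new run
```

In this sketch a worker never touches the schema, which avoids several child processes racing to create tables in the same file.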
- active_workers( ) list[dict[str, Any]][source]#
Return all workers registered for this pipeline run.
- all_filter_artifacts() dict[str, list[str]][source]#
Return all filter artifact paths grouped by filter name.
- checkpoint() None[source]#
Force a WAL checkpoint to flush data to the main database file.
This ensures that all committed writes are transferred from the WAL file into the main .db file, making them visible to readers that open the database in a separate process (e.g. the dashboard).
Uses PASSIVE mode, which does not block concurrent readers or writers. It is safe to call at any time.
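For reference, a passive WAL checkpoint in SQLite is issued with `PRAGMA wal_checkpoint(PASSIVE)`. The sketch below shows it on a throwaway database; the table and its contents are hypothetical, and whether `checkpoint()` issues exactly this statement is an assumption based on the description above.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "run.db")
conn = sqlite3.connect(db_path)
# Switch the database to write-ahead logging; the pragma reports the resulting mode.
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

# Hypothetical table, only here so there is something in the WAL to checkpoint.
conn.execute("CREATE TABLE index_results (idx INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO index_results VALUES (0, 'ok')")
conn.commit()

# PASSIVE copies as many WAL frames as it can without waiting on readers or writers.
# The pragma returns one row: (busy, wal_frames, checkpointed_frames).
busy, wal_frames, checkpointed = conn.execute("PRAGMA wal_checkpoint(PASSIVE)").fetchone()
conn.close()
```

With other connections active, a passive checkpoint may move only part of the WAL, so `checkpointed` can be smaller than `wal_frames`.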
- completed_indices() set[int][source]#
Return the set of successfully completed indices for this run.
- filter_artifacts_for_index(index: int) dict[str, list[str]][source]#
Return filter artifact paths for a given source index.
- classmethod from_db(db_path: str | pathlib.Path) PipelineStore[source]#
Open an existing pipeline database in read-only mode.
This is the entry point for the dashboard and post-hoc analysis tools. It reads the pipeline_runs table to recover config_hash and pipeline_config, so the caller does not need to know them.
- Parameters:
db_path (str or pathlib.Path) – Path to an existing .db file produced by a pipeline run.
- Returns:
A store instance backed by the existing database.
- Return type:
PipelineStore
- Raises:
FileNotFoundError – If db_path does not exist.
ValueError – If the database contains no pipeline run records.
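One way to open an SQLite file read-only from Python is a `file:` URI with `mode=ro`. The sketch below is an assumption about how a read-only open could work, not a description of what `from_db` does internally. `open_read_only` and the two-column table are hypothetical.

```python
import os
import sqlite3
import tempfile
from pathlib import Path


def open_read_only(db_path) -> sqlite3.Connection:
    path = Path(db_path)
    if not path.exists():
        raise FileNotFoundError(path)
    # mode=ro makes SQLite reject any write attempted through this connection.
    return sqlite3.connect(f"{path.resolve().as_uri()}?mode=ro", uri=True)


# Build a small database to read back (hypothetical schema).
db_path = os.path.join(tempfile.mkdtemp(), "run.db")
writer = sqlite3.connect(db_path)
writer.execute("CREATE TABLE pipeline_runs (run_id INTEGER PRIMARY KEY, config_hash TEXT)")
writer.execute("INSERT INTO pipeline_runs (config_hash) VALUES ('abc123')")
writer.commit()
writer.close()

reader = open_read_only(db_path)
hashes = [row[0] for row in reader.execute("SELECT config_hash FROM pipeline_runs")]
try:
    reader.execute("INSERT INTO pipeline_runs (config_hash) VALUES ('x')")
    write_rejected = False
except sqlite3.OperationalError:
    write_rejected = True
reader.close()
```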
- get_logs( ) list[dict[str, Any]][source]#
Retrieve log entries since a given ID.
- Parameters:
- Returns:
Log entries with keys: id, timestamp, level, level_name, logger_name, message, worker_id, idx.
- Return type:
list[dict[str, Any]]
- get_total_indices() int | None[source]#
Get the total number of source indices for this run.
- Returns:
Total indices if set, otherwise None.
- Return type:
int | None
- index_metrics(index: int) IndexMetrics | None[source]#
Retrieve metrics for a single index.
- Parameters:
index (int) – Source index to query.
- Returns:
Metrics for the index, or None if not found.
- Return type:
IndexMetrics | None
- is_completed(index: int) list[str] | None[source]#
Check if an index has been completed successfully.
- metrics() PipelineMetrics[source]#
Build aggregated metrics from the database.
- Returns:
Aggregated metrics across all completed indices in this run.
- Return type:
PipelineMetrics
- output_paths_for_index(index: int) list[str][source]#
Return the output file paths produced by a given source index.
- record_error(index: int, error: str, wall_time_ns: int) None[source]#
Record a failed index execution.
- record_filter_artifacts( ) None[source]#
Record file artifacts produced by a filter for a given index.
- record_logs( ) None[source]#
Record a batch of log entries.
Uses a single transaction for efficiency and minimal lock time. Each entry is a tuple of:
(timestamp, level, level_name, logger_name, message, worker_id, idx)
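The batch write described above, together with the "since a given ID" read in `get_logs`, can be sketched with `executemany` inside a single transaction. The `logs` table name and schema are assumptions; only the tuple layout and the returned keys come from this page.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
# Hypothetical table; column names follow the keys listed for get_logs.
conn.execute(
    "CREATE TABLE logs (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, level INTEGER, "
    "level_name TEXT, logger_name TEXT, message TEXT, worker_id TEXT, idx INTEGER)"
)

entries = [
    (time.time(), 20, "INFO", "curator.run", "started index 0", "w1", 0),
    (time.time(), 40, "ERROR", "curator.run", "index 1 failed", "w1", 1),
]

# `with conn` wraps the batch in one transaction: commit on success, rollback on error.
with conn:
    conn.executemany(
        "INSERT INTO logs (timestamp, level, level_name, logger_name, message, worker_id, idx) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        entries,
    )

# Reading "since a given ID": only rows with an id greater than the last one seen.
last_seen_id = 1
new_rows = conn.execute(
    "SELECT id, level_name, message FROM logs WHERE id > ? ORDER BY id", (last_seen_id,)
).fetchall()
```

Polling with the last seen ID lets a reader such as a dashboard fetch only new entries on each refresh.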
- record_success(
- index: int,
- output_paths: list[str],
- wall_time_ns: int,
- peak_memory_bytes: int,
- gpu_memory_bytes: int | None,
- stages: list[StageMetrics],
)
Record a successfully completed index with metrics.
- register_worker( ) None[source]#
Register a worker or update its heartbeat if already known.
- Parameters:
worker_id (str) – Unique identifier for this worker (UUID hex).
pid (int) – OS process ID of the worker.
hostname (str) – Hostname of the machine running the worker.
invocation_id (str | None, optional) – Unique identifier for this run_pipeline invocation. Used to partition workers when the same pipeline is run concurrently with different index sets.
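"Register or update heartbeat" maps naturally onto an SQLite upsert. The sketch below assumes a hypothetical `workers` schema with a `last_heartbeat` column, and the `now` parameter exists only to make the example deterministic. Neither is confirmed by this page.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
# Hypothetical schema for the workers table.
conn.execute(
    "CREATE TABLE workers (worker_id TEXT PRIMARY KEY, pid INTEGER, hostname TEXT, "
    "invocation_id TEXT, last_heartbeat REAL)"
)


def register_worker(worker_id, pid, hostname, invocation_id=None, now=None):
    now = time.time() if now is None else now
    with conn:
        # ON CONFLICT ... DO UPDATE needs SQLite 3.24 or newer.
        conn.execute(
            "INSERT INTO workers (worker_id, pid, hostname, invocation_id, last_heartbeat) "
            "VALUES (?, ?, ?, ?, ?) "
            "ON CONFLICT(worker_id) DO UPDATE SET last_heartbeat = excluded.last_heartbeat",
            (worker_id, pid, hostname, invocation_id, now),
        )


register_worker("w1", 4242, "node-a", now=100.0)  # first call inserts the row
register_worker("w1", 4242, "node-a", now=160.0)  # second call only refreshes the heartbeat
rows = conn.execute("SELECT worker_id, pid, last_heartbeat FROM workers").fetchall()
```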
- remaining_indices(total: int) list[int][source]#
Return indices not yet completed or failed for this run.
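The description amounts to a set difference over `range(total)`. A minimal sketch follows, with a hypothetical `remaining` helper that takes the completed and failed sets as arguments. Ascending order of the result is an assumption.

```python
def remaining(total: int, completed: set, failed: set) -> list:
    """Indices in range(total) that are neither completed nor failed."""
    done = completed | failed
    return [i for i in range(total) if i not in done]


todo = remaining(6, completed={0, 1, 4}, failed={2})
```

A resumed run would then iterate over `todo` only, leaving earlier results untouched.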
- replace_filter_artifacts( ) None[source]#
Replace shard artifact paths with a single merged path.
Removes all rows matching old_paths for the given filter and inserts one row pointing to merged_path. This keeps the dashboard pointing at the final merged file after gather_pipeline() completes.
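The delete-then-insert behaviour can be sketched as a single transaction, so readers never see a state with neither the shards nor the merged file. The two-column `filter_artifacts` schema, the `replace_artifacts` helper, and the file names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema; the real table presumably also tracks run and index.
conn.execute("CREATE TABLE filter_artifacts (filter_name TEXT, path TEXT)")
conn.executemany(
    "INSERT INTO filter_artifacts VALUES (?, ?)",
    [
        ("stats", "stats.shard0.parquet"),
        ("stats", "stats.shard1.parquet"),
        ("mesh", "mesh.json"),
    ],
)
conn.commit()


def replace_artifacts(filter_name, old_paths, merged_path):
    # One transaction: either all shard rows are swapped for the merged row, or nothing changes.
    with conn:
        conn.executemany(
            "DELETE FROM filter_artifacts WHERE filter_name = ? AND path = ?",
            [(filter_name, p) for p in old_paths],
        )
        conn.execute(
            "INSERT INTO filter_artifacts VALUES (?, ?)", (filter_name, merged_path)
        )


replace_artifacts("stats", ["stats.shard0.parquet", "stats.shard1.parquet"], "stats.parquet")
rows = sorted(conn.execute("SELECT filter_name, path FROM filter_artifacts").fetchall())
```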
- reset() None[source]#
Clear all records for this run and re-register.
Deletes all index results, stage metrics, and pipeline run metadata from the database. The database file is kept and a fresh run is registered.
- reset_index(index: int) None[source]#
Remove records for a single index from this run.
- Parameters:
index (int) – Source index to remove.
- resolve_artifact(path: str) pathlib.Path[source]#
Resolve a relative artifact path using the stored run directory.
- set_total_indices(total: int) None[source]#
Store the total number of source indices for this run.
Called by the pipeline runner once the source length is known. This value is persisted so the dashboard can show accurate progress even when the pipeline is still running.
- Parameters:
total (int) – Total number of indices the pipeline will process.
- worker_finish_index(worker_id: str) None[source]#
Record that a worker has finished processing its current index.
- Parameters:
worker_id (str) – Unique identifier for this worker.
- class physicsnemo_curator.core.pipeline_store.StageMetrics[source]#
Metrics for a single pipeline stage (source, one filter, or sink).
- Parameters:
- physicsnemo_curator.core.pipeline_store.logger#