pipeline_store#

Unified pipeline store with SQLite-backed metrics, checkpointing, and provenance.

Provides PipelineStore, a single SQLite database that combines checkpoint tracking (completed/failed indices), per-index and per-stage wall-clock metrics, and pipeline provenance (config hashing).

Also contains the metrics dataclasses (StageMetrics, IndexMetrics, PipelineMetrics), the _TimedGenerator timing utility, and provenance helpers for serializing pipeline configuration.
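The `_TimedGenerator` utility is internal, but the idea it implements is a generator wrapper that charges wall-clock time to the stage producing each item. A minimal sketch of such a wrapper, using `time.perf_counter_ns` (the class name and fields here are illustrative assumptions, not the real implementation):

```python
import time
from typing import Any, Iterable, Iterator


class TimedGenerator:
    """Hypothetical sketch: wrap an iterable and accumulate the wall-clock
    time spent inside its __next__ calls."""

    def __init__(self, inner: Iterable[Any]) -> None:
        self._inner: Iterator[Any] = iter(inner)
        self.wall_time_ns = 0  # total nanoseconds spent producing items

    def __iter__(self) -> "TimedGenerator":
        return self

    def __next__(self) -> Any:
        start = time.perf_counter_ns()
        try:
            return next(self._inner)
        finally:
            # Charge only the time spent inside the wrapped iterable,
            # not time the consumer spends between items.
            self.wall_time_ns += time.perf_counter_ns() - start


def slow_range(n: int):
    """Toy stage that does a little work per item."""
    for i in range(n):
        time.sleep(0.001)
        yield i
```

Wrapping each stage's iterator this way is what makes a per-stage `wall_time_ns` breakdown possible without instrumenting the stages themselves.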

Usage#

>>> from physicsnemo_curator.core.pipeline_store import PipelineStore
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(db_path=Path("run.db"), pipeline_config=config, config_hash=chash)
>>> store.is_completed(0)  # None — not yet completed
>>> store.record_success(0, ["/out/0.vtk"], wall_time_ns=1_000_000, ...)
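`_config_hash` is a private helper, but a plausible sketch of deterministic config hashing, assuming a canonical JSON serialization fed to SHA-256 (function name and details here are assumptions):

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Hypothetical sketch: hash a pipeline config dict deterministically.

    sort_keys=True makes the serialization independent of dict insertion
    order, so semantically identical configs hash to the same value,
    which is what checkpoint resumption needs.
    """
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Any stable serialization works; the key property is that re-running the same pipeline produces the same hash, so the store can recognize the run and resume it.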

Attributes#

logger

Classes#

IndexMetrics

Metrics for one __getitem__ call (one source index).

PipelineMetrics

Aggregated metrics across all processed indices.

PipelineStore

SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.

StageMetrics

Metrics for a single pipeline stage (source, one filter, or sink).

Module Contents#

class physicsnemo_curator.core.pipeline_store.IndexMetrics[source]#

Metrics for one __getitem__ call (one source index).

Parameters:
  • index (int) – The source index that was processed.

  • stages (list[StageMetrics]) – Per-stage timing breakdown.

  • wall_time_ns (int) – Total wall-clock time for this index in nanoseconds.

  • peak_memory_bytes (int) – Peak Python memory usage during this index (from tracemalloc).

  • gpu_memory_bytes (int | None) – Peak GPU memory delta, or None if GPU tracking was disabled.

classmethod from_dict(data: dict[str, Any]) → IndexMetrics[source]#

Reconstruct from a dictionary (e.g. deserialized JSON).

Parameters:

data (dict[str, Any]) – Dictionary as produced by to_dict().

Returns:

Reconstructed metrics object.

Return type:

IndexMetrics

to_dict() → dict[str, Any][source]#

Convert to a plain dictionary.

Returns:

Nested dictionary with all metric fields.

Return type:

dict[str, Any]

gpu_memory_bytes: int | None#
index: int#
peak_memory_bytes: int#
stages: list[StageMetrics]#
wall_time_ns: int#
class physicsnemo_curator.core.pipeline_store.PipelineMetrics[source]#

Aggregated metrics across all processed indices.

Parameters:

indices (list[IndexMetrics]) – Per-index metrics, one entry per __getitem__ call.

summary() → dict[str, Any][source]#

Return a summary dictionary for programmatic use.

Returns:

Dictionary with total/mean wall time, peak memory, index count, and per-index breakdowns.

Return type:

dict[str, Any]

to_console() → None[source]#

Print a formatted summary table to stdout.

Outputs a human-readable table showing per-index and aggregate metrics. Uses only stdlib formatting (no external dependencies).

to_csv(path: str | pathlib.Path) → None[source]#

Write per-index metrics to a CSV file.

Each row represents one index. Stage timings are included as separate columns named stage_<name>_ns.

Parameters:

path (str | pathlib.Path) – Output file path.
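The `stage_<name>_ns` column convention can be sketched with a simplified stand-in (not the library implementation; the row shape here is an assumption):

```python
import csv
import io


def metrics_to_csv(rows: list[dict]) -> str:
    """Write one CSV row per index, with a stage_<name>_ns column for
    every stage name seen across the rows (first-seen order)."""
    stage_names: list[str] = []
    for row in rows:
        for name in row["stages"]:
            if name not in stage_names:
                stage_names.append(name)
    header = ["index", "wall_time_ns"] + [f"stage_{n}_ns" for n in stage_names]
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            [row["index"], row["wall_time_ns"]]
            # Leave the cell empty if an index skipped a stage.
            + [row["stages"].get(n, "") for n in stage_names]
        )
    return buf.getvalue()
```

Deriving the column set from the union of stage names keeps the CSV stable even if different indices ran through different filters.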

to_json(path: str | pathlib.Path) → None[source]#

Write metrics to a JSON file.

Parameters:

path (str | pathlib.Path) – Output file path.

indices: list[IndexMetrics] = []#
property mean_index_time_ns: float#

Mean wall-clock time per index (nanoseconds).

Returns:

Average per-index time, or 0.0 if no indices were processed.

Return type:

float

property total_peak_memory_bytes: int#

Maximum peak memory observed across all indices (bytes).

Returns:

Max of per-index peak memory values.

Return type:

int

property total_wall_time_ns: int#

Total wall-clock time across all indices (nanoseconds).

Returns:

Sum of per-index wall times.

Return type:

int
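The three aggregate properties above are simple reductions over per-index values. A sketch, assuming plain lists of per-index numbers (the `default=0` fallback for the empty-peak case is an assumption; only the mean's 0.0 fallback is documented):

```python
def total_wall_time_ns(wall_times: list[int]) -> int:
    """Sum of per-index wall times."""
    return sum(wall_times)


def mean_index_time_ns(wall_times: list[int]) -> float:
    """Average per-index time; 0.0 when no indices were processed."""
    return sum(wall_times) / len(wall_times) if wall_times else 0.0


def total_peak_memory_bytes(peaks: list[int]) -> int:
    """Max of per-index peak memory values (0 for empty is an assumption)."""
    return max(peaks, default=0)
```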

class physicsnemo_curator.core.pipeline_store.PipelineStore(
db_path: pathlib.Path,
pipeline_config: dict,
config_hash: str,
)[source]#

SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.

Manages a single database with six tables: pipeline_runs, index_results, stage_metrics, output_files, filter_artifacts, and workers. Supports checkpoint resumption via config hashing, per-index success/error recording, aggregated metrics queries, and live worker progress tracking.
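The table names come from the paragraph above, but the column layout is internal. A minimal sketch of what a six-table schema might look like, with hypothetical columns chosen to match the documented query methods:

```python
import sqlite3

# Hypothetical schema sketch: table names match the docs, columns are
# illustrative assumptions, not the real layout.
SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_runs (
    config_hash TEXT PRIMARY KEY,
    pipeline_config TEXT,        -- JSON blob
    started_at REAL
);
CREATE TABLE IF NOT EXISTS index_results (
    idx INTEGER PRIMARY KEY,
    status TEXT,                 -- 'completed' or 'failed'
    error TEXT,
    wall_time_ns INTEGER
);
CREATE TABLE IF NOT EXISTS stage_metrics (
    idx INTEGER,
    name TEXT,
    wall_time_ns INTEGER
);
CREATE TABLE IF NOT EXISTS output_files (
    idx INTEGER,
    seq INTEGER,                 -- ordering of outputs per index
    path TEXT
);
CREATE TABLE IF NOT EXISTS filter_artifacts (
    idx INTEGER,
    filter_name TEXT,
    filter_order INTEGER,
    path TEXT
);
CREATE TABLE IF NOT EXISTS workers (
    worker_id TEXT PRIMARY KEY,
    pid INTEGER,
    hostname TEXT,
    last_heartbeat REAL,
    current_index INTEGER
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
```

Keeping everything in one SQLite file means checkpoint state, metrics, and provenance stay transactionally consistent and travel together with the run.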

Parameters:
  • db_path (pathlib.Path) – Path to the SQLite database file. Created automatically if it does not exist.

  • pipeline_config (dict) – Full pipeline configuration dictionary (from _pipeline_config()).

  • config_hash (str) – SHA-256 hex hash of the pipeline configuration.

Examples

>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(Path("run.db"), config, chash)
>>> store.is_completed(0)  # None if not yet done
>>> store.record_success(0, ["/out/0.vtk"], 1_000_000, 4096, None, [])

Initialize the pipeline store.

Parameters:
  • db_path (pathlib.Path) – Path to the SQLite database file.

  • pipeline_config (dict) – Full pipeline configuration dictionary.

  • config_hash (str) – SHA-256 hex hash of the pipeline configuration.

active_workers() → list[dict[str, Any]][source]#

Return all workers registered for this pipeline run.

Returns:

List of worker dictionaries with keys: worker_id, pid, hostname, started_at, last_heartbeat, current_index.

Return type:

list[dict[str, Any]]

all_filter_artifacts() → dict[str, list[str]][source]#

Return all filter artifact paths grouped by filter name.

Returns:

Mapping of filter name to list of all artifact paths.

Return type:

dict[str, list[str]]

completed_indices() → set[int][source]#

Return the set of successfully completed indices for this run.

Returns:

Indices with status='completed'.

Return type:

set[int]

failed_indices() → dict[int, str][source]#

Return indices that failed with their error messages.

Returns:

Mapping from index to error message string.

Return type:

dict[int, str]
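Both checkpoint queries reduce to simple filters on a status column. A self-contained sketch against a hypothetical minimal `index_results` table (the real column layout is internal):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal table for illustration only.
conn.execute(
    "CREATE TABLE index_results (idx INTEGER PRIMARY KEY, status TEXT, error TEXT)"
)
conn.executemany(
    "INSERT INTO index_results VALUES (?, ?, ?)",
    [(0, "completed", None), (1, "failed", "boom"), (2, "completed", None)],
)


def completed_indices(conn: sqlite3.Connection) -> set[int]:
    """Indices recorded with status='completed'."""
    rows = conn.execute("SELECT idx FROM index_results WHERE status = 'completed'")
    return {r[0] for r in rows}


def failed_indices(conn: sqlite3.Connection) -> dict[int, str]:
    """Failed indices mapped to their error messages."""
    rows = conn.execute("SELECT idx, error FROM index_results WHERE status = 'failed'")
    return dict(rows)
```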

filter_artifacts_for_index(index: int) → dict[str, list[str]][source]#

Return filter artifact paths for a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Mapping of filter name to list of artifact paths.

Return type:

dict[str, list[str]]

classmethod from_db(db_path: str | pathlib.Path) → PipelineStore[source]#

Open an existing pipeline database in read-only mode.

This is the entry point for the dashboard and post-hoc analysis tools. It reads the pipeline_runs table to recover config_hash and pipeline_config, so the caller does not need to know them.

Parameters:

db_path (str or pathlib.Path) – Path to an existing .db file produced by a pipeline run.

Returns:

A store instance backed by the existing database.

Return type:

PipelineStore

Raises:

index_for_path(path: str) → int | None[source]#

Find which source index produced a given output file.

Parameters:

path (str) – Output file path to look up.

Returns:

Source index that produced the file, or None if not found.

Return type:

int | None

index_metrics(index: int) → IndexMetrics | None[source]#

Retrieve metrics for a single index.

Parameters:

index (int) – Source index to query.

Returns:

Metrics for the index, or None if not found.

Return type:

IndexMetrics | None

is_completed(index: int) → list[str] | None[source]#

Check if an index has been completed successfully.

Parameters:

index (int) – Source index to check.

Returns:

Cached output paths if completed, None otherwise.

Return type:

list[str] | None

metrics() → PipelineMetrics[source]#

Build aggregated metrics from the database.

Returns:

Aggregated metrics across all completed indices in this run.

Return type:

PipelineMetrics

output_paths_for_index(index: int) → list[str][source]#

Return the output file paths produced by a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Output file paths ordered by sequence, or empty list if none.

Return type:

list[str]

record_error(index: int, error: str, wall_time_ns: int) → None[source]#

Record a failed index execution.

Parameters:
  • index (int) – Source index that failed.

  • error (str) – Error message.

  • wall_time_ns (int) – Wall-clock time before the error in nanoseconds.

record_filter_artifacts(
index: int,
filter_name: str,
filter_order: int,
paths: list[str],
) → None[source]#

Record file artifacts produced by a filter for a given index.

Parameters:
  • index (int) – Source index that was processed.

  • filter_name (str) – Human-readable name of the filter.

  • filter_order (int) – Position of the filter in the pipeline (0-indexed).

  • paths (list[str]) – File paths produced by the filter for this index.

record_success(
index: int,
output_paths: list[str],
wall_time_ns: int,
peak_memory_bytes: int,
gpu_memory_bytes: int | None,
stages: list[StageMetrics],
) → None[source]#

Record a successfully completed index with metrics.

Parameters:
  • index (int) – Source index that completed.

  • output_paths (list[str]) – File paths written by the sink.

  • wall_time_ns (int) – Total wall-clock time in nanoseconds.

  • peak_memory_bytes (int) – Peak memory usage in bytes.

  • gpu_memory_bytes (int | None) – Peak GPU memory delta, or None.

  • stages (list[StageMetrics]) – Per-stage timing breakdown.

register_worker(worker_id: str, pid: int, hostname: str) → None[source]#

Register a worker or update its heartbeat if already known.

Parameters:
  • worker_id (str) – Unique identifier for this worker (UUID hex).

  • pid (int) – OS process ID of the worker.

  • hostname (str) – Hostname of the machine running the worker.

remaining_indices(total: int) → list[int][source]#

Return indices not yet completed or failed for this run.

Parameters:

total (int) – Total number of source indices.

Returns:

Sorted list of indices still needing processing.

Return type:

list[int]
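The resumption arithmetic behind this method is plain set subtraction; a sketch assuming the completed and failed index sets are already known:

```python
def remaining_indices(total: int, completed: set[int], failed: set[int]) -> list[int]:
    """Sketch: everything in range(total) not yet recorded as done.

    Failed indices are excluded too, matching the docs: a failed index
    is not retried automatically (reset_index would clear it first).
    """
    done = completed | failed
    return sorted(i for i in range(total) if i not in done)
```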

reset() → None[source]#

Clear all records for this run and re-register.

Deletes all index results, stage metrics, and pipeline run metadata from the database. The database file is kept and a fresh run is registered.

reset_index(index: int) → None[source]#

Remove records for a single index from this run.

Parameters:

index (int) – Source index to remove.

summary(total: int) → dict[str, Any][source]#

Return a summary of the store state.

Parameters:

total (int) – Total number of source indices.

Returns:

Dictionary with keys: total, completed, failed, remaining, config_hash, db_path, total_elapsed_s, workers.

Return type:

dict[str, Any]

worker_finish_index(worker_id: str) → None[source]#

Record that a worker has finished processing its current index.

Parameters:

worker_id (str) – Unique identifier for this worker.

worker_start_index(worker_id: str, index: int) → None[source]#

Record that a worker is starting to process an index.

Parameters:
  • worker_id (str) – Unique identifier for this worker.

  • index (int) – Source index being processed.
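The register-or-refresh behavior described for register_worker maps naturally onto a SQLite UPSERT. A self-contained sketch against a hypothetical minimal `workers` table (the real columns are internal):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
# Hypothetical minimal workers table for illustration only.
conn.execute(
    "CREATE TABLE workers (worker_id TEXT PRIMARY KEY, pid INTEGER,"
    " hostname TEXT, last_heartbeat REAL, current_index INTEGER)"
)


def register_worker(conn, worker_id: str, pid: int, hostname: str) -> None:
    # INSERT .. ON CONFLICT makes registration idempotent: re-registering
    # an existing worker just refreshes its heartbeat.
    conn.execute(
        "INSERT INTO workers (worker_id, pid, hostname, last_heartbeat)"
        " VALUES (?, ?, ?, ?)"
        " ON CONFLICT(worker_id) DO UPDATE SET"
        "   last_heartbeat = excluded.last_heartbeat",
        (worker_id, pid, hostname, time.time()),
    )


def worker_start_index(conn, worker_id: str, index: int) -> None:
    # Record the in-flight index and bump the heartbeat in one statement.
    conn.execute(
        "UPDATE workers SET current_index = ?, last_heartbeat = ?"
        " WHERE worker_id = ?",
        (index, time.time(), worker_id),
    )
```

The heartbeat column is what lets a dashboard distinguish a live worker from a stalled one without any extra coordination channel.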

class physicsnemo_curator.core.pipeline_store.StageMetrics[source]#

Metrics for a single pipeline stage (source, one filter, or sink).

Parameters:
  • name (str) – Human-readable name of the stage (e.g. "source", "DoubleFilter", "sink").

  • wall_time_ns (int) – Wall-clock time in nanoseconds spent in this stage.

to_dict() → dict[str, Any][source]#

Convert to a plain dictionary.

Returns:

Dictionary with "name" and "wall_time_ns" keys.

Return type:

dict[str, Any]

name: str#
wall_time_ns: int#
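The fields and to_dict shape documented above can be sketched with a plain dataclass (a simplified stand-in, not the library's class):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class StageMetricsSketch:
    """Illustrative stand-in for StageMetrics: one stage, one timing."""

    name: str
    wall_time_ns: int

    def to_dict(self) -> dict[str, Any]:
        # Matches the documented to_dict shape: "name" and "wall_time_ns".
        return {"name": self.name, "wall_time_ns": self.wall_time_ns}
```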
physicsnemo_curator.core.pipeline_store.logger#