pipeline_store

Unified pipeline store with SQLite-backed metrics, checkpointing, and provenance.

Provides PipelineStore, a single SQLite database that combines checkpoint tracking (completed/failed indices), per-index and per-stage wall-clock metrics, and pipeline provenance (config hashing).

Also contains the metrics dataclasses (StageMetrics, IndexMetrics, PipelineMetrics), the _TimedGenerator timing utility, and provenance helpers for serializing pipeline configuration.

Usage

>>> from pathlib import Path
>>> from physicsnemo_curator.core.pipeline_store import PipelineStore
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(db_path=Path("run.db"), pipeline_config=config, config_hash=chash)
>>> store.is_completed(0)  # returns None until index 0 has completed
>>> store.record_success(0, ["/out/0.vtk"], wall_time_ns=1_000_000, peak_memory_bytes=4096, gpu_memory_bytes=None, stages=[])

Attributes

Classes

IndexMetrics

Metrics for one __getitem__ call (one source index).

PipelineMetrics

Aggregated metrics across all processed indices.

PipelineStore

SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.

StageMetrics

Metrics for a single pipeline stage (source, one filter, or sink).

Module Contents

class physicsnemo_curator.core.pipeline_store.IndexMetrics[source]#

Metrics for one __getitem__ call (one source index).

Parameters:
  • index (int) – The source index that was processed.

  • stages (list[StageMetrics]) – Per-stage timing breakdown.

  • wall_time_ns (int) – Total wall-clock time for this index in nanoseconds.

  • peak_memory_bytes (int) – Peak Python memory usage during this index (from tracemalloc).

  • gpu_memory_bytes (int | None) – Peak GPU memory delta, or None if GPU tracking was disabled.

classmethod from_dict(data: dict[str, Any]) IndexMetrics[source]#

Reconstruct from a dictionary (e.g. deserialized JSON).

Parameters:

data (dict[str, Any]) – Dictionary as produced by to_dict().

Returns:

Reconstructed metrics object.

Return type:

IndexMetrics

to_dict() dict[str, Any][source]#

Convert to a plain dictionary.

Returns:

Nested dictionary with all metric fields.

Return type:

dict[str, Any]

gpu_memory_bytes: int | None#
index: int#
peak_memory_bytes: int#
stages: list[StageMetrics]#
wall_time_ns: int#
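
The to_dict() / from_dict() round trip described above can be illustrated with a minimal sketch. The classes StageMetricsSketch and IndexMetricsSketch are hypothetical stand-ins that mirror the documented fields; the default values and the use of dataclasses.asdict are assumptions, not the library's implementation.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class StageMetricsSketch:
    """Hypothetical stand-in for StageMetrics: a stage name and its wall time."""

    name: str
    wall_time_ns: int


@dataclass
class IndexMetricsSketch:
    """Hypothetical stand-in for IndexMetrics with the documented fields."""

    index: int
    stages: list[StageMetricsSketch] = field(default_factory=list)
    wall_time_ns: int = 0
    peak_memory_bytes: int = 0
    gpu_memory_bytes: int | None = None

    def to_dict(self) -> dict[str, Any]:
        # asdict() recurses into nested dataclasses, giving a JSON-friendly dict.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "IndexMetricsSketch":
        # Rebuild the nested stage objects before constructing the parent.
        stages = [StageMetricsSketch(**s) for s in data.get("stages", [])]
        rest = {k: v for k, v in data.items() if k != "stages"}
        return cls(stages=stages, **rest)


original = IndexMetricsSketch(
    index=0,
    stages=[StageMetricsSketch("source", 400), StageMetricsSketch("sink", 600)],
    wall_time_ns=1_000,
    peak_memory_bytes=4096,
)
restored = IndexMetricsSketch.from_dict(original.to_dict())
```

Because the dictionary contains only plain types, it can be passed through json.dumps and json.loads before being handed back to from_dict.
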
class physicsnemo_curator.core.pipeline_store.PipelineMetrics[source]#

Aggregated metrics across all processed indices.

Parameters:

indices (list[IndexMetrics]) – Per-index metrics, one entry per __getitem__ call.

summary() dict[str, Any][source]#

Return a summary dictionary for programmatic use.

Returns:

Dictionary with total/mean wall time, peak memory, index count, and per-index breakdowns.

Return type:

dict[str, Any]

to_console() None[source]#

Print a formatted summary table to stdout.

Outputs a human-readable table showing per-index and aggregate metrics. Uses only stdlib formatting (no external dependencies).

to_csv(path: str | pathlib.Path) None[source]#

Write per-index metrics to a CSV file.

Each row represents one index. Stage timings are included as separate columns named stage_<name>_ns.

Parameters:

path (str | pathlib.Path) – Output file path.
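
As a rough illustration of the stage_<name>_ns column naming, the sketch below writes two hypothetical per-index records to an in-memory CSV. The record layout, the other column names, and the column order are assumptions; only the stage_<name>_ns pattern comes from the description above.

```python
import csv
import io

# Hypothetical per-index records; the real column set is not documented here.
records = [
    {"index": 0, "wall_time_ns": 1_000, "stages": {"source": 400, "sink": 600}},
    {"index": 1, "wall_time_ns": 1_500, "stages": {"source": 500, "sink": 1_000}},
]

# Collect stage names in first-seen order so every row has the same columns.
stage_names = []
for rec in records:
    for name in rec["stages"]:
        if name not in stage_names:
            stage_names.append(name)

fieldnames = ["index", "wall_time_ns"] + [f"stage_{n}_ns" for n in stage_names]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
writer.writeheader()
for rec in records:
    row = {"index": rec["index"], "wall_time_ns": rec["wall_time_ns"]}
    # A stage missing for this index is written as an empty cell.
    row.update({f"stage_{n}_ns": rec["stages"].get(n, "") for n in stage_names})
    writer.writerow(row)

csv_text = buffer.getvalue()
```
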

to_json(path: str | pathlib.Path) None[source]#

Write metrics to a JSON file.

Parameters:

path (str | pathlib.Path) – Output file path.

indices: list[IndexMetrics] = []#

property mean_index_time_ns: float#

Mean wall-clock time per index (nanoseconds).

Returns:

Average per-index time, or 0.0 if no indices were processed.

Return type:

float

property total_peak_memory_bytes: int#

Maximum peak memory observed across all indices (bytes).

Returns:

Max of per-index peak memory values.

Return type:

int

property total_wall_time_ns: int#

Total wall-clock time across all indices (nanoseconds).

Returns:

Sum of per-index wall times.

Return type:

int
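
The three aggregate properties reduce to a sum, a guarded mean, and a maximum. The sketch below restates them as free functions over plain lists; the function names carry a _sketch suffix to mark them as illustrative, and returning 0 for the peak of an empty run is an assumption (only the 0.0 mean for an empty run is documented).

```python
def total_wall_time_ns_sketch(wall_times_ns):
    """Sum of per-index wall times."""
    return sum(wall_times_ns)


def mean_index_time_ns_sketch(wall_times_ns):
    """Average per-index time, or 0.0 when nothing was processed."""
    if not wall_times_ns:
        return 0.0
    return sum(wall_times_ns) / len(wall_times_ns)


def total_peak_memory_bytes_sketch(peaks_bytes):
    """Largest per-index peak; 0 for an empty run is an assumption."""
    return max(peaks_bytes, default=0)


wall_times = [1_000, 1_500, 2_000]
peaks = [4096, 8192, 2048]

total = total_wall_time_ns_sketch(wall_times)
mean = mean_index_time_ns_sketch(wall_times)
peak = total_peak_memory_bytes_sketch(peaks)
```

Note that the "total" peak memory is a maximum, not a sum: per-index peaks are not additive when indices are processed one after another.
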

class physicsnemo_curator.core.pipeline_store.PipelineStore(
db_path: pathlib.Path,
pipeline_config: dict,
config_hash: str,
*,
_worker: bool = False,
)[source]#

SQLite-backed store combining checkpoint tracking, metrics, provenance, and worker progress.

Manages a single database with six tables: pipeline_runs, index_results, stage_metrics, output_files, filter_artifacts, and workers. Supports checkpoint resumption via config hashing, per-index success/error recording, aggregated metrics queries, and live worker progress tracking.

Parameters:
  • db_path (pathlib.Path) – Path to the SQLite database file. Created automatically if it does not exist.

  • pipeline_config (dict) – Full pipeline configuration dictionary (from _pipeline_config()).

  • config_hash (str) – SHA-256 hex hash of the pipeline configuration.

Examples

>>> from pathlib import Path
>>> config = _pipeline_config(pipeline)
>>> chash = _config_hash(config)
>>> store = PipelineStore(Path("run.db"), config, chash)
>>> store.is_completed(0)  # None if not yet done
>>> store.record_success(0, ["/out/0.vtk"], 1_000_000, 4096, None, [])

Initialize the pipeline store.

Parameters:
  • db_path (pathlib.Path) – Path to the SQLite database file.

  • pipeline_config (dict) – Full pipeline configuration dictionary.

  • config_hash (str) – SHA-256 hex hash of the pipeline configuration.

  • _worker (bool, optional) – If True, skip schema creation and just look up the existing run_id. Used by child processes that reconnect to a database already initialised by the parent.
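
To show why a configuration hash is useful for checkpoint resumption, here is a minimal sketch of a SHA-256 hash over a canonical JSON serialization. The function config_hash_sketch is hypothetical; how _config_hash() actually serializes the configuration is not documented here, so the sorted-key JSON encoding is an assumption.

```python
import hashlib
import json


def config_hash_sketch(config: dict) -> str:
    """Hash a configuration dict so equal configs give equal hex digests."""
    # Sorting keys and fixing separators makes the byte string independent
    # of dictionary insertion order (an assumed canonical form).
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


hash_a = config_hash_sketch({"source": "vtk", "filters": ["double"]})
hash_b = config_hash_sketch({"filters": ["double"], "source": "vtk"})
hash_c = config_hash_sketch({"source": "vtk", "filters": ["triple"]})
```

Under this scheme, hash_a and hash_b match because the configurations differ only in key order, while hash_c differs because a filter changed, which is the signal a store can use to decide whether earlier checkpoints still apply.
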

active_workers(
invocation_id: str | None = None,
) list[dict[str, Any]][source]#

Return all workers registered for this pipeline run.

Parameters:

invocation_id (str | None, optional) – If provided, only return workers from this invocation.

Returns:

List of worker dictionaries with keys: worker_id, pid, hostname, started_at, last_heartbeat, current_index, completed_count, invocation_id.

Return type:

list[dict[str, Any]]

all_filter_artifacts() dict[str, list[str]][source]#

Return all filter artifact paths grouped by filter name.

Returns:

Mapping of filter name to list of all artifact paths.

Return type:

dict[str, list[str]]

checkpoint() None[source]#

Force a WAL checkpoint to flush data to the main database file.

This ensures that all committed writes are transferred from the WAL file into the main .db file, making them visible to readers that open the database in a separate process (e.g. the dashboard).

Uses PASSIVE mode, which checkpoints as many WAL frames as it can without waiting on concurrent readers or writers, so it never blocks them and is safe to call at any time.
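
For reference, a passive WAL checkpoint can be issued from the standard sqlite3 module as sketched below. The temporary database and the demo table are illustrative only; this shows the underlying SQLite pragma, not the body of checkpoint().

```python
import os
import sqlite3
import tempfile

# WAL mode needs a file-backed database, so use a temporary directory.
db_path = os.path.join(tempfile.mkdtemp(), "run.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE demo (value INTEGER)")
conn.execute("INSERT INTO demo VALUES (1)")
conn.commit()

# The pragma returns one row: (busy, frames in the WAL, frames checkpointed).
busy, wal_frames, checkpointed = conn.execute(
    "PRAGMA wal_checkpoint(PASSIVE)"
).fetchone()
conn.close()
```

A busy value of 0 means the checkpoint was not blocked; when wal_frames equals checkpointed, every frame in the WAL was copied into the main database file.
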

completed_indices() set[int][source]#

Return the set of successfully completed indices for this run.

Returns:

Indices with status='completed'.

Return type:

set[int]

failed_indices() dict[int, str][source]#

Return indices that failed with their error messages.

Returns:

Mapping from index to error message string.

Return type:

dict[int, str]

filter_artifacts_for_index(index: int) dict[str, list[str]][source]#

Return filter artifact paths for a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Mapping of filter name to list of artifact paths.

Return type:

dict[str, list[str]]

classmethod from_db(db_path: str | pathlib.Path) PipelineStore[source]#

Open an existing pipeline database in read-only mode.

This is the entry point for the dashboard and post-hoc analysis tools. It reads the pipeline_runs table to recover config_hash and pipeline_config, so the caller does not need to know them.

Parameters:

db_path (str | pathlib.Path) – Path to an existing .db file produced by a pipeline run.

Returns:

A store instance backed by the existing database.

Return type:

PipelineStore

Raises:

get_logs(
since_id: int = 0,
limit: int = 100,
min_level: int = 0,
) list[dict[str, Any]][source]#

Retrieve log entries since a given ID.

Parameters:
  • since_id (int) – Return logs with id > since_id (for polling new entries).

  • limit (int) – Maximum number of entries to return.

  • min_level (int) – Minimum log level (e.g., 20 for INFO, 10 for DEBUG).

Returns:

Log entries with keys: id, timestamp, level, level_name, logger_name, message, worker_id, idx.

Return type:

list[dict[str, Any]]
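
The since_id parameter supports cursor-style polling: each call asks only for entries newer than the last one seen. The sketch below imitates that pattern with an in-memory list; get_logs_sketch and poll_once are hypothetical helpers, and the ascending-id ordering of results is an assumption.

```python
import logging

# Hypothetical log rows standing in for what the store would return.
rows = [
    {"id": 1, "level": logging.DEBUG, "message": "opened source"},
    {"id": 2, "level": logging.INFO, "message": "index 0 done"},
    {"id": 3, "level": logging.WARNING, "message": "slow stage"},
]


def get_logs_sketch(since_id=0, limit=100, min_level=0):
    """Return rows with id > since_id and level >= min_level, oldest first."""
    matching = [r for r in rows if r["id"] > since_id and r["level"] >= min_level]
    return matching[:limit]


def poll_once(cursor, min_level):
    """Fetch new entries and advance the cursor to the last id seen."""
    batch = get_logs_sketch(since_id=cursor, min_level=min_level)
    new_cursor = batch[-1]["id"] if batch else cursor
    return batch, new_cursor


first_batch, cursor = poll_once(0, logging.INFO)
second_batch, cursor_after = poll_once(cursor, logging.INFO)
```

The second poll returns nothing and leaves the cursor unchanged, so a dashboard can call it repeatedly without receiving duplicates.
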

get_total_indices() int | None[source]#

Get the total number of source indices for this run.

Returns:

Total indices if set, otherwise None.

Return type:

int | None

index_for_path(path: str) int | None[source]#

Find which source index produced a given output file.

Parameters:

path (str) – Output file path to look up.

Returns:

Source index that produced the file, or None if not found.

Return type:

int | None

index_metrics(index: int) IndexMetrics | None[source]#

Retrieve metrics for a single index.

Parameters:

index (int) – Source index to query.

Returns:

Metrics for the index, or None if not found.

Return type:

IndexMetrics | None

is_completed(index: int) list[str] | None[source]#

Check if an index has been completed successfully.

Parameters:

index (int) – Source index to check.

Returns:

Cached output paths if completed, None otherwise.

Return type:

list[str] | None

metrics() PipelineMetrics[source]#

Build aggregated metrics from the database.

Returns:

Aggregated metrics across all completed indices in this run.

Return type:

PipelineMetrics

output_paths_for_index(index: int) list[str][source]#

Return the output file paths produced by a given source index.

Parameters:

index (int) – Source index to query.

Returns:

Output file paths ordered by sequence, or empty list if none.

Return type:

list[str]

record_error(index: int, error: str, wall_time_ns: int) None[source]#

Record a failed index execution.

Parameters:
  • index (int) – Source index that failed.

  • error (str) – Error message.

  • wall_time_ns (int) – Wall-clock time before the error in nanoseconds.

record_filter_artifacts(
index: int,
filter_name: str,
filter_order: int,
paths: list[str],
) None[source]#

Record file artifacts produced by a filter for a given index.

Parameters:
  • index (int) – Source index that was processed.

  • filter_name (str) – Human-readable name of the filter.

  • filter_order (int) – Position of the filter in the pipeline (0-indexed).

  • paths (list[str]) – File paths produced by the filter for this index.

record_logs(
logs: list[tuple[str, int, str, str, str, str | None, int | None]],
) None[source]#

Record a batch of log entries.

Uses a single transaction for efficiency and minimal lock time. Each entry is a tuple of the form (timestamp, level, level_name, logger_name, message, worker_id, idx).

Parameters:

logs (list[tuple]) – List of log entry tuples.
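
A batch insert inside one transaction might look like the following sketch. The table name logs_sketch, its column types, and the ISO-style timestamp strings are assumptions made for illustration; only the tuple field order is taken from the description above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE logs_sketch (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT, level INTEGER, level_name TEXT,
        logger_name TEXT, message TEXT, worker_id TEXT, idx INTEGER
    )
    """
)

# Each tuple: (timestamp, level, level_name, logger_name, message, worker_id, idx).
batch = [
    ("2024-01-01T00:00:00", 20, "INFO", "pipeline", "started index", "w1", 0),
    ("2024-01-01T00:00:01", 30, "WARNING", "pipeline", "slow stage", "w1", 0),
    ("2024-01-01T00:00:02", 20, "INFO", "pipeline", "run finished", None, None),
]

# "with conn" commits once on success and rolls back on error, so the
# whole batch is written in a single transaction.
with conn:
    conn.executemany(
        "INSERT INTO logs_sketch "
        "(timestamp, level, level_name, logger_name, message, worker_id, idx) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        batch,
    )

stored = conn.execute("SELECT COUNT(*) FROM logs_sketch").fetchone()[0]
```
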

record_success(
index: int,
output_paths: list[str],
wall_time_ns: int,
peak_memory_bytes: int,
gpu_memory_bytes: int | None,
stages: list[StageMetrics],
) None[source]#

Record a successfully completed index with metrics.

Parameters:
  • index (int) – Source index that completed.

  • output_paths (list[str]) – File paths written by the sink.

  • wall_time_ns (int) – Total wall-clock time in nanoseconds.

  • peak_memory_bytes (int) – Peak memory usage in bytes.

  • gpu_memory_bytes (int | None) – Peak GPU memory delta, or None.

  • stages (list[StageMetrics]) – Per-stage timing breakdown.

register_worker(
worker_id: str,
pid: int,
hostname: str,
invocation_id: str | None = None,
) None[source]#

Register a worker or update its heartbeat if already known.

Parameters:
  • worker_id (str) – Unique identifier for this worker (UUID hex).

  • pid (int) – OS process ID of the worker.

  • hostname (str) – Hostname of the machine running the worker.

  • invocation_id (str | None, optional) – Unique identifier for this run_pipeline invocation. Used to partition workers when the same pipeline is run concurrently with different index sets.
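
"Register or update the heartbeat" is an upsert. One way to express it in SQLite is sketched below; the workers_sketch table, its columns, the numeric timestamps, and the injectable now argument are all hypothetical, so treat this as an illustration of the pattern rather than the store's actual schema or SQL.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE workers_sketch (
        worker_id TEXT PRIMARY KEY, pid INTEGER, hostname TEXT,
        started_at REAL, last_heartbeat REAL, invocation_id TEXT
    )
    """
)


def register_worker_sketch(conn, worker_id, pid, hostname, invocation_id=None, now=None):
    """Insert the worker if unknown, then refresh its heartbeat either way."""
    now = time.time() if now is None else now
    with conn:
        # A known worker_id hits the primary key and the insert is skipped.
        conn.execute(
            "INSERT OR IGNORE INTO workers_sketch VALUES (?, ?, ?, ?, ?, ?)",
            (worker_id, pid, hostname, now, now, invocation_id),
        )
        conn.execute(
            "UPDATE workers_sketch SET last_heartbeat = ? WHERE worker_id = ?",
            (now, worker_id),
        )


register_worker_sketch(conn, "abc123", 4242, "node01", now=100.0)
register_worker_sketch(conn, "abc123", 4242, "node01", now=160.0)
worker_rows = conn.execute(
    "SELECT worker_id, started_at, last_heartbeat FROM workers_sketch"
).fetchall()
```

After the second call there is still one row: started_at keeps the first registration time while last_heartbeat moves forward.
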

remaining_indices(total: int) list[int][source]#

Return indices not yet completed or failed for this run.

Parameters:

total (int) – Total number of source indices.

Returns:

Sorted list of indices still needing processing.

Return type:

list[int]
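
The computation can be pictured as a set difference over range(total). The sketch below assumes that both completed and failed indices are excluded from the result, which is one reading of the description above; remaining_indices_sketch is a hypothetical helper, not the method itself.

```python
def remaining_indices_sketch(total, completed, failed):
    """Return the sorted indices in range(total) with no recorded outcome."""
    # Iterating a dict yields its keys, so failed may be the
    # {index: error message} mapping returned by failed_indices().
    done = set(completed) | set(failed)
    return [i for i in range(total) if i not in done]


remaining = remaining_indices_sketch(
    total=6,
    completed={0, 2},
    failed={3: "boom"},
)
```
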

replace_filter_artifacts(
filter_name: str,
filter_order: int,
old_paths: list[str],
merged_path: str,
) None[source]#

Replace shard artifact paths with a single merged path.

Removes all rows matching old_paths for the given filter and inserts one row pointing to merged_path. This keeps the dashboard pointing at the final merged file after gather_pipeline() completes.

Parameters:
  • filter_name (str) – Human-readable name of the filter.

  • filter_order (int) – Position of the filter in the pipeline (0-indexed).

  • old_paths (list[str]) – Shard file paths to remove from the artifact table.

  • merged_path (str) – Path to the merged output file.
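
The delete-then-insert behaviour can be sketched against a simplified table. The artifacts_sketch table (which omits any per-index column), the filter names, and the file names are hypothetical; the real filter_artifacts schema is not documented here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE artifacts_sketch (filter_name TEXT, filter_order INTEGER, path TEXT)"
)
conn.executemany(
    "INSERT INTO artifacts_sketch VALUES (?, ?, ?)",
    [
        ("StatsFilter", 0, "stats_shard0.parquet"),
        ("StatsFilter", 0, "stats_shard1.parquet"),
        ("OtherFilter", 1, "other.parquet"),
    ],
)
conn.commit()


def replace_artifacts_sketch(conn, filter_name, filter_order, old_paths, merged_path):
    """Swap the shard rows of one filter for a single merged row, atomically."""
    with conn:
        if old_paths:
            # One "?" per path keeps the query fully parameterized.
            placeholders = ",".join("?" for _ in old_paths)
            conn.execute(
                "DELETE FROM artifacts_sketch "
                f"WHERE filter_name = ? AND path IN ({placeholders})",
                [filter_name, *old_paths],
            )
        conn.execute(
            "INSERT INTO artifacts_sketch VALUES (?, ?, ?)",
            (filter_name, filter_order, merged_path),
        )


replace_artifacts_sketch(
    conn,
    "StatsFilter",
    0,
    ["stats_shard0.parquet", "stats_shard1.parquet"],
    "stats_merged.parquet",
)
artifact_rows = sorted(conn.execute("SELECT * FROM artifacts_sketch").fetchall())
```

Running both statements in one transaction means a reader never sees a state where the shards are gone but the merged path is not yet recorded.
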

reset() None[source]#

Clear all records for this run and re-register.

Deletes all index results, stage metrics, and pipeline run metadata from the database. The database file is kept and a fresh run is registered.

reset_index(index: int) None[source]#

Remove records for a single index from this run.

Parameters:

index (int) – Source index to remove.

resolve_artifact(path: str) pathlib.Path[source]#

Resolve a relative artifact path using the stored run directory.

Parameters:

path (str) – Artifact path (may be relative or absolute).

Returns:

Absolute path. If path is already absolute, it is returned as-is. Otherwise it is resolved relative to run_dir (falling back to the current working directory if run_dir is not available).

Return type:

pathlib.Path
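
The resolution rule described above can be written in a few lines of pathlib. The function resolve_artifact_sketch is a hypothetical free-function version that takes run_dir as an argument instead of reading it from the store.

```python
from __future__ import annotations

from pathlib import Path


def resolve_artifact_sketch(path: str, run_dir: str | None) -> Path:
    """Resolve an artifact path following the documented rule."""
    candidate = Path(path)
    if candidate.is_absolute():
        # Absolute paths are returned unchanged.
        return candidate
    # Relative paths are anchored at run_dir, or at the CWD as a fallback.
    base = Path(run_dir) if run_dir is not None else Path.cwd()
    return base / candidate


absolute_input = Path.cwd() / "out.vtk"
run_directory = Path.cwd() / "run"

kept = resolve_artifact_sketch(str(absolute_input), str(run_directory))
anchored = resolve_artifact_sketch("out/0.vtk", str(run_directory))
fallback = resolve_artifact_sketch("a.vtk", None)
```

Anchoring at the recorded run directory is what lets a dashboard started from a different working directory still locate relative artifact paths.
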

set_total_indices(total: int) None[source]#

Store the total number of source indices for this run.

Called by the pipeline runner once the source length is known. This value is persisted so the dashboard can show accurate progress even when the pipeline is still running.

Parameters:

total (int) – Total number of indices the pipeline will process.

summary(total: int) dict[str, Any][source]#

Return a summary of the store state.

Parameters:

total (int) – Total number of source indices.

Returns:

Dictionary with keys: total, completed, failed, remaining, config_hash, db_path, total_elapsed_s, workers.

Return type:

dict[str, Any]

worker_finish_index(worker_id: str) None[source]#

Record that a worker has finished processing its current index.

Parameters:

worker_id (str) – Unique identifier for this worker.

worker_start_index(worker_id: str, index: int) None[source]#

Record that a worker is starting to process an index.

Parameters:
  • worker_id (str) – Unique identifier for this worker.

  • index (int) – Source index being processed.

property run_dir: str | None#

Return the working directory recorded when the pipeline was started.

Returns:

Absolute path of the CWD at pipeline start, or None for databases created before this column was added.

Return type:

str | None

class physicsnemo_curator.core.pipeline_store.StageMetrics[source]#

Metrics for a single pipeline stage (source, one filter, or sink).

Parameters:
  • name (str) – Human-readable name of the stage (e.g. "source", "DoubleFilter", "sink").

  • wall_time_ns (int) – Wall-clock time in nanoseconds spent in this stage.

to_dict() dict[str, Any][source]#

Convert to a plain dictionary.

Returns:

Dictionary with "name" and "wall_time_ns" keys.

Return type:

dict[str, Any]

name: str#
wall_time_ns: int#

physicsnemo_curator.core.pipeline_store.logger#