Checkpointing Pipelines#
Pipeline includes built-in checkpointing that records completed indices
in a SQLite database. On restart, indices that already finished are
skipped — their cached output paths are returned immediately without
re-executing the source, filters, or sink.
Checkpointing is enabled by default via the track_metrics field
(which also enables timing and memory profiling).
Quick Start#
```python
from physicsnemo_curator import Pipeline, run_pipeline

# Pipeline with default checkpointing (track_metrics=True)
pipeline = (
    MySource(data_dir="/data/")
    .filter(MyFilter())
    .write(MySink(output_dir="/output/"))
)

# Run — works with all backends
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

# Interrupt and restart — completed indices are skipped
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

# Inspect progress
print(pipeline.summary())
```
How It Works#
When track_metrics=True (the default), each pipeline[index] call:

1. Checks the database for a prior completion record for this index.
2. If found, returns the cached paths immediately (no computation).
3. If not found, runs the pipeline and records timing, memory, and output paths to SQLite.
4. If the pipeline raises an exception, records the error and re-raises.
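The check-then-run flow above can be sketched roughly as follows. This is a simplified illustration, not the library's actual schema or implementation; the table layout and function names are assumptions:

```python
import sqlite3


def get_item_with_checkpoint(db: sqlite3.Connection, index: int, run):
    """Return cached output paths if `index` already completed, else run and record.

    A simplified sketch of the checkpoint logic; the real table layout differs.
    """
    row = db.execute(
        "SELECT paths FROM results WHERE idx = ? AND error IS NULL", (index,)
    ).fetchone()
    if row is not None:
        return row[0].split(",")  # cached paths — no computation
    try:
        paths = run(index)  # source -> filters -> sink
    except Exception as exc:
        # Record the failure so it can be inspected, then re-raise.
        db.execute(
            "INSERT INTO results (idx, paths, error) VALUES (?, ?, ?)",
            (index, "", str(exc)),
        )
        db.commit()
        raise
    db.execute(
        "INSERT INTO results (idx, paths, error) VALUES (?, ?, NULL)",
        (index, ",".join(paths)),
    )
    db.commit()
    return paths
```

Because failed rows carry a non-NULL error, they are not treated as completions, which is what makes automatic retry on the next run possible.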
Controlling the Database Location#
By default, the database is stored at ~/.cache/psnc/{config_hash[:16]}.db.
Each unique pipeline configuration gets its own database file (based on its
SHA-256 hash). The cache directory follows XDG conventions and can be
controlled with environment variables:
| Priority | Method | Example |
|---|---|---|
| 1 (highest) | | |
| 2 | | |
| 3 (lowest) | XDG default | |
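The default location can be reproduced with a few lines. This is a sketch: `XDG_CACHE_HOME` is the standard XDG override for the cache root, and `config_hash` stands in for the pipeline's SHA-256 hash; the helper name is illustrative:

```python
import os
from pathlib import Path


def default_db_path(config_hash: str) -> Path:
    """Resolve the checkpoint DB path following XDG conventions (sketch)."""
    cache_root = Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache"))
    return cache_root / "psnc" / f"{config_hash[:16]}.db"
```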
Override the directory with db_dir:

```python
from pathlib import Path

pipeline = Pipeline(
    source=MySource(data_dir="/data/"),
    filters=[MyFilter()],
    sink=MySink(output_dir="/output/"),
    db_dir=Path("/output/checkpoints"),
)
```
Disabling Checkpointing#
Set track_metrics=False to disable all checkpointing and metrics:

```python
pipeline = Pipeline(
    source=MySource(data_dir="/data/"),
    filters=[MyFilter()],
    sink=MySink(output_dir="/output/"),
    track_metrics=False,
)
```
Provenance Tracking#
The checkpoint stores full pipeline provenance — source class, filter parameters, sink configuration — as a JSON document with a SHA-256 hash. Each unique configuration gets its own database file, so different pipelines never collide.
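A config-hashing scheme like the one described can be sketched as follows. The exact JSON layout the library uses is an assumption; the key point is that a canonical serialization (sorted keys, fixed separators) makes the hash stable across dict orderings:

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """SHA-256 of a canonical JSON serialization of a pipeline config (sketch)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Two pipelines with identical configuration hash to the same value and share a database; any parameter change produces a new hash and hence a new file.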
Error Handling#
Failed indices are recorded with their error message but are not marked as completed. On the next run they will be retried automatically:

```python
# First run — index 42 fails
results = run_pipeline(pipeline, n_jobs=4)

# Check what failed
print(pipeline.failed_indices)
# {42: "RuntimeError: corrupt file at /data/sample_42.lmdb"}

# Fix the underlying issue, then retry — only index 42 runs
results = run_pipeline(pipeline, n_jobs=4)
```
Query API#
| Property / Method | Returns | Description |
|---|---|---|
| | | Successfully completed indices |
| `failed_indices` | dict | Failed indices with error messages |
| | | Indices not yet completed (sorted) |
| `summary()` | dict | Total, completed, failed, remaining counts + elapsed time + worker count |
| | | Full timing and memory metrics |
| `active_workers` | | Workers registered for this run |
| `reset()` | | Clear all records and start fresh |
| | | Re-run a single index |
Summary Example#
```python
>>> pipeline.summary()
{'total': 80, 'completed': 65, 'failed': 2, 'remaining': 13,
 'errors': 2, 'avg_time_ms': 48.2}
```
Combining with Profiling#
Checkpointing and profiling are unified — both are controlled by
track_metrics. When enabled, you get both checkpoint/resume and
per-index timing and memory metrics automatically. See
Profiling for details on accessing metrics.
SQLite Database#
The checkpoint uses a SQLite database in WAL (Write-Ahead Logging) mode for safe concurrent writes from multiple threads or processes. The database contains four tables:
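Enabling WAL mode takes a single pragma. The sketch below shows the idea with the standard library's `sqlite3` module; the helper name is illustrative, and WAL requires a file-backed database (it is not available for `:memory:`):

```python
import sqlite3
from pathlib import Path


def open_checkpoint_db(path: Path) -> sqlite3.Connection:
    """Open a SQLite DB in WAL mode so writers don't block readers (sketch)."""
    conn = sqlite3.connect(path)
    # The pragma returns the active journal mode; WAL persists in the file,
    # so this only needs to succeed once per database.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode != "wal":
        raise RuntimeError(f"WAL not available, got journal mode {mode!r}")
    return conn
```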
pipeline_runs — one row per unique pipeline configuration:

| Column | Type | Description |
|---|---|---|
| | INTEGER | Auto-incrementing primary key |
| | TEXT | SHA-256 of the pipeline config JSON |
| | TEXT | Full pipeline configuration |
| | TEXT | ISO-8601 timestamp |
index_results — one row per processed index:

| Column | Type | Description |
|---|---|---|
| | INTEGER | Source index |
| | INTEGER | Foreign key to |
| | TEXT | |
| | TEXT | JSON array of output file paths |
| | TEXT | ISO-8601 timestamp |
| | INTEGER | Wall-clock time in nanoseconds |
| | INTEGER | Peak Python memory (bytes) |
| | INTEGER | Peak GPU memory (bytes, or NULL) |
| | TEXT | Error message (NULL for success) |
stage_metrics — per-stage timing for each index:

| Column | Type | Description |
|---|---|---|
| | INTEGER | Source index |
| | INTEGER | Foreign key |
| | INTEGER | 0 = source, 1..N = filters, N+1 = sink |
| | TEXT | Stage class name |
| | INTEGER | Wall-clock time in nanoseconds |
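The stage-index convention (0 = source, 1..N = filters, N+1 = sink) falls out of enumerating the stage list in order. A minimal sketch, with an illustrative helper name:

```python
def stage_indices(source, filters, sink):
    """Map each stage to its stage_index: 0 = source, 1..N = filters, N+1 = sink."""
    stages = [source, *filters, sink]
    return [(i, type(stage).__name__) for i, stage in enumerate(stages)]
```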
workers — one row per worker thread/process:

| Column | Type | Description |
|---|---|---|
| | TEXT | UUID4 hex string (primary key) |
| | INTEGER | Foreign key to |
| | INTEGER | OS process ID |
| | TEXT | Machine hostname |
| | TEXT | ISO-8601 timestamp |
| | TEXT | ISO-8601 timestamp (updated on index start/finish) |
| | INTEGER | Index currently being processed (NULL if idle) |
Worker Progress Tracking#
Pipeline execution automatically registers workers in the database. Each thread (or process) gets a stable UUID identifier. The worker record is updated when an index starts and when it finishes.
```python
# Run a multi-worker pipeline
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

# Check which workers participated
for w in pipeline.active_workers:
    print(f"Worker {w['worker_id'][:8]} (PID {w['pid']}) on {w['hostname']}")
    if w["current_index"] is not None:
        print(f"  Currently processing index {w['current_index']}")
```
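A worker record like the one iterated above could be assembled as follows. The field names mirror the dictionary keys used in the example, but this is a sketch, not the library's actual registration code:

```python
import datetime
import os
import socket
import uuid


def make_worker_record() -> dict:
    """Build a worker registration record: stable UUID, PID, hostname, timestamp (sketch)."""
    return {
        "worker_id": uuid.uuid4().hex,          # stable per-worker identifier
        "pid": os.getpid(),                     # OS process ID
        "hostname": socket.gethostname(),       # machine hostname
        "started_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "current_index": None,                  # updated on index start/finish
    }
```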
Worker tracking works with all backends — the instrumentation is in
Pipeline.__getitem__, not in the backends themselves.
Full Example#
```python
from pathlib import Path

from physicsnemo_curator import Pipeline, run_pipeline
from physicsnemo_curator.domains.atm import ASELMDBSource, AtomicDataZarrSink

# Build pipeline — checkpointing is on by default
pipeline = (
    ASELMDBSource(data_dir="/data/val/")
    .write(AtomicDataZarrSink(output_path="/output/dataset.zarr"))
)

# Optionally control DB location
pipeline.db_dir = Path("/output/checkpoints")

# Run — can be interrupted and resumed
results = run_pipeline(pipeline, n_jobs=8, backend="process_pool")

# Check progress
s = pipeline.summary()
print(f"Completed: {s['completed']}/{s['total']}")
if s['failed'] > 0:
    print(f"Failed indices: {list(pipeline.failed_indices.keys())}")

# Start fresh if needed
# pipeline.reset()
```