Checkpointing a Pipeline

This example demonstrates how to checkpoint pipeline execution using CheckpointedPipeline.

CheckpointedPipeline wraps a pipeline and records completed indices in a SQLite database. If the pipeline is interrupted and restarted, already-completed indices are skipped and their cached output paths are returned immediately.

This is especially useful for long-running pipelines over large datasets where you want crash resilience without re-processing.
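
The skip-and-return behaviour described above can be sketched with the standard-library sqlite3 module. This is only an illustration of the idea, not the CheckpointedPipeline implementation: the class name TinyCheckpoint, the table layout, and the output path format are all hypothetical.

```python
import sqlite3


class TinyCheckpoint:
    """Hypothetical stand-in that records finished indices in SQLite."""

    def __init__(self, process, db_path=":memory:"):
        self.process = process  # callable: index -> output path
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS completed "
            "(idx INTEGER PRIMARY KEY, output TEXT)"
        )
        self.executed = []  # indices that were actually processed

    def __call__(self, index):
        row = self.conn.execute(
            "SELECT output FROM completed WHERE idx = ?", (index,)
        ).fetchone()
        if row is not None:
            return row[0]  # already done: return the cached output path
        output = self.process(index)
        self.executed.append(index)
        self.conn.execute(
            "INSERT INTO completed (idx, output) VALUES (?, ?)", (index, output)
        )
        self.conn.commit()  # persist progress after every index
        return output


ckpt = TinyCheckpoint(lambda i: f"outputs/mesh_{i}.bin")
first = [ckpt(i) for i in range(5)]   # all five indices are processed
second = [ckpt(i) for i in range(8)]  # 0-4 come from the table, 5-7 are new
print(ckpt.executed)
```

Each index is processed once across both passes, while the second pass still returns a result for every requested index.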

Note

Install the mesh extras before running:

pip install "physicsnemo-curator[mesh]"

Imports

from physicsnemo_curator.core.checkpoint import CheckpointedPipeline

from physicsnemo_curator.domains.mesh.filters.precision import PrecisionFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.ns_cylinder import NavierStokesCylinderSource
from physicsnemo_curator.run import run_pipeline

Build and Wrap the Pipeline

First build a normal pipeline, then wrap it with CheckpointedPipeline. The db_path argument specifies where the SQLite checkpoint file is stored.

pipeline = (
    NavierStokesCylinderSource()
    .filter(PrecisionFilter(target_dtype="float32"))
    .write(MeshSink(output_dir="outputs/checkpoint/meshes/"))
)

checkpointed = CheckpointedPipeline(
    pipeline,
    db_path="outputs/checkpoint/pipeline.db",
)

First Run: Process 5 Indices

On the first run, all indices are new and will be fully executed.

results = run_pipeline(
    checkpointed,
    n_jobs=1,
    backend="sequential",
    indices=range(5),
    progress=True,
)

print(f"First run processed {len(results)} indices")
print(f"Checkpoint summary: {checkpointed.summary()}")

Second Run: Resume from Checkpoint

Running the same checkpointed pipeline again, even over a range that overlaps the first run, skips the indices that are already complete. Their cached output paths are returned from the database without re-executing the pipeline, so only indices 5-7 are executed below.

results_resumed = run_pipeline(
    checkpointed,
    n_jobs=1,
    backend="sequential",
    indices=range(8),  # 0-4 cached, 5-7 new
    progress=True,
)

print(f"\nSecond run returned {len(results_resumed)} results")
print(f"Checkpoint summary: {checkpointed.summary()}")
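
To see why an interrupted run can resume cleanly, the following sketch simulates a crash partway through a run and then restarts against the same database file. It assumes progress is committed after each index; the function run, its fail_at parameter, and the table layout are hypothetical and are not part of physicsnemo_curator.

```python
import os
import sqlite3
import tempfile


def run(db_path, indices, fail_at=None):
    """Process indices, recording each one as soon as it finishes."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS completed (idx INTEGER PRIMARY KEY)")
    executed = []
    try:
        for i in indices:
            seen = conn.execute(
                "SELECT 1 FROM completed WHERE idx = ?", (i,)
            ).fetchone()
            if seen:
                continue  # finished in an earlier run, so skip it
            if i == fail_at:
                raise RuntimeError(f"simulated crash at index {i}")
            executed.append(i)  # stand-in for the real work
            conn.execute("INSERT INTO completed (idx) VALUES (?)", (i,))
            conn.commit()  # progress survives a later crash
    finally:
        conn.close()
    return executed


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "pipeline.db")
    try:
        run(db, range(8), fail_at=3)  # indices 0-2 finish, then the run dies
    except RuntimeError:
        pass
    resumed = run(db, range(8))  # restart over the same range

print(resumed)
```

Because each index is committed individually, the restart only has to process the index that failed and those after it.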

Query Checkpoint State

The checkpoint database tracks which indices have been completed, which failed, and which remain.

print(f"\nCompleted indices: {checkpointed.completed_indices}")
print(f"Remaining (of 500): {len(checkpointed.remaining_indices)}")
print(f"Config hash: {checkpointed.config_hash[:16]}...")
print(f"Database: {checkpointed.db_path}")
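
The relationship between completed and remaining indices can be illustrated with plain Python. The page does not say how config_hash is derived, so the fingerprint function below is only one plausible approach (hashing a canonical JSON dump of a configuration dictionary). Both helper names and the example configuration are hypothetical.

```python
import hashlib
import json


def remaining(total, completed):
    """Return the indices in range(total) that are not yet completed."""
    done = set(completed)
    return [i for i in range(total) if i not in done]


def fingerprint(config):
    """Hash a configuration dict so that key order does not matter."""
    payload = json.dumps(config, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


completed = list(range(8))          # indices 0-7, as after the second run
left = remaining(500, completed)    # 500 matches the total used above
print(len(left))

a = fingerprint({"filter": "PrecisionFilter", "target_dtype": "float32"})
b = fingerprint({"target_dtype": "float32", "filter": "PrecisionFilter"})
print(a[:16], a == b)
```

Sorting the keys before hashing means two configurations with the same content produce the same fingerprint regardless of how they were written down.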

Composing with ProfiledPipeline

CheckpointedPipeline composes with ProfiledPipeline, so you can profile and checkpoint in the same run:

from physicsnemo_curator.core.profiling import ProfiledPipeline

profiled = ProfiledPipeline(pipeline)
checkpointed = CheckpointedPipeline(profiled, db_path="ckpt.db")
run_pipeline(checkpointed, n_jobs=4)

The checkpoint wraps the profiled pipeline, so skipped indices bypass both profiling and execution.
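
The effect of the wrapping order can be sketched with two toy wrappers. These classes are hypothetical stand-ins, not the ProfiledPipeline or CheckpointedPipeline implementations. The inner wrapper counts how often it is invoked, and the outer wrapper skips indices it has already seen.

```python
class CountingProfiler:
    """Inner wrapper: counts how many times each index is executed."""

    def __init__(self, inner):
        self.inner = inner
        self.calls = {}

    def __call__(self, index):
        self.calls[index] = self.calls.get(index, 0) + 1
        return self.inner(index)


class SkipCompleted:
    """Outer wrapper: returns the stored result for finished indices."""

    def __init__(self, inner):
        self.inner = inner
        self.done = {}

    def __call__(self, index):
        if index not in self.done:
            self.done[index] = self.inner(index)  # reaches the profiler once
        return self.done[index]


profiler = CountingProfiler(lambda i: f"mesh_{i}")
outer = SkipCompleted(profiler)

# Two overlapping passes: indices 0-2, then indices 0-4.
for i in list(range(3)) + list(range(5)):
    outer(i)

print(profiler.calls)
```

Indices 0-2 are requested twice but reach the inner wrapper only once, which mirrors the statement that skipped indices bypass both profiling and execution.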

Reset Checkpoint

To re-process all indices from scratch, call reset():

checkpointed.reset()
print(f"\nAfter reset: {checkpointed.summary()}")
