# Profiling Pipelines
Pipeline includes built-in profiling that collects wall-clock time,
memory, and (optionally) GPU metrics at whole-pipeline, per-index, and
per-stage granularity, without requiring any wrappers or separate
configuration.

Profiling is enabled by default via the `track_metrics` field
(which also enables checkpointing).
## Quick Start

```python
from physicsnemo_curator import Pipeline, run_pipeline

# Pipeline with default profiling (track_metrics=True)
pipeline = (
    MySource(data_dir="/data/")
    .filter(MyFilter())
    .write(MySink(output_dir="/output/"))
)

# Run exactly as before — works with all backends
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

# Inspect metrics
metrics = pipeline.metrics
metrics.to_console()
```
## Metrics Granularity
Profiling collects data at three levels:
| Level | What's measured |
|---|---|
| Whole-pipeline | Total wall time, peak memory across all indices |
| Per-index | Wall time, peak memory, GPU memory for each source index |
| Per-stage | Wall time for source, each filter, and sink |
## Per-Stage Timing
The pipeline chain source → filter₁ → filter₂ → … → sink uses lazy
generators. Profiling wraps each stage’s generator with an internal
timer to attribute time accurately using chain subtraction:
- **Source time** = time spent yielding items from the source
- **Filter N time** = time spent in filter N's own logic (excluding upstream)
- **Sink time** = time spent in the sink (excluding all upstream generators)
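The mechanism can be sketched with plain Python generators. Everything below is illustrative — the `TimedStage` wrapper and the stage functions are invented for this example, not the library's internals. Each stage's iterator is timed *inclusively* (its own work plus the upstream it pulls from), and exclusive time falls out by subtracting the upstream stage's inclusive total:

```python
import time


class TimedStage:
    """Wraps one stage's iterable and accumulates inclusive time spent
    pulling items from it. Exclusive ("own") time is recovered by chain
    subtraction against the upstream stage's inclusive total."""

    def __init__(self, name, gen, upstream=None):
        self.name = name
        self.gen = gen
        self.upstream = upstream  # the TimedStage feeding this one, if any
        self.total_ns = 0         # inclusive time: this stage + all upstream

    def __iter__(self):
        it = iter(self.gen)
        while True:
            start = time.perf_counter_ns()
            try:
                item = next(it)
            except StopIteration:
                self.total_ns += time.perf_counter_ns() - start
                return
            self.total_ns += time.perf_counter_ns() - start
            yield item

    @property
    def own_time_ns(self):
        # Chain subtraction: exclusive = inclusive - upstream's inclusive
        upstream_ns = self.upstream.total_ns if self.upstream else 0
        return self.total_ns - upstream_ns


def source():
    for i in range(3):
        time.sleep(0.01)  # simulate I/O per item
        yield i


def slow_filter(items):
    for x in items:
        time.sleep(0.02)  # simulate per-item filter work
        yield x * 2


src = TimedStage("source", source())
flt = TimedStage("filter", slow_filter(src), upstream=src)

results = list(flt)  # the sink simply drains the chain
assert results == [0, 2, 4]
assert flt.own_time_ns > src.total_ns  # filter sleeps longer per item
```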
## Memory Tracking
Peak Python memory per index is tracked via `tracemalloc`. This captures
Python-level allocations accurately but does not cover C-extension or
Rust-extension memory. Per-stage memory is not tracked because chained
lazy generators make per-stage attribution unreliable.
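A minimal sketch of per-index peak tracking with the standard library's `tracemalloc` module (`process_index` here is a stand-in for real per-index work, not a library function):

```python
import tracemalloc


def process_index(index):
    # Simulate an index whose Python-level allocations grow with its number
    data = [bytes(1024) for _ in range(100 * (index + 1))]
    return len(data)


peaks = {}
for index in range(3):
    tracemalloc.start()
    process_index(index)
    _, peak = tracemalloc.get_traced_memory()  # returns (current, peak) in bytes
    tracemalloc.stop()                         # stop also clears the traces
    peaks[index] = peak

# Peak memory grows with the amount allocated per index
assert peaks[0] < peaks[2]
```

Starting and stopping `tracemalloc` around each index is what gives a per-index peak rather than a running global one; it is also the overhead that `track_memory=False` avoids.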
Disable memory tracking (to avoid `tracemalloc` overhead):

```python
pipeline = Pipeline(
    source=MySource(),
    sink=MySink(),
    track_memory=False,  # disable tracemalloc
)
```
## GPU Memory Tracking
To track GPU memory, set `track_gpu=True`:

```python
pipeline = Pipeline(
    source=MySource(),
    sink=MySink(),
    track_gpu=True,
)
```
This uses `torch.cuda.max_memory_allocated()` to capture peak GPU memory
per index. It requires PyTorch with CUDA support; if `torch` is not installed
or CUDA is unavailable, GPU fields will be `None`.
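That graceful degradation can be sketched as follows. `peak_gpu_bytes_for` is a hypothetical helper written for this example, not part of the library:

```python
def peak_gpu_bytes_for(fn, *args, **kwargs):
    """Run fn and return its peak GPU allocation in bytes, or None when
    PyTorch or CUDA is unavailable (mirroring the documented behavior)."""
    try:
        import torch
        if not torch.cuda.is_available():
            return None
    except ImportError:
        return None
    torch.cuda.reset_peak_memory_stats()  # start the peak counter fresh
    fn(*args, **kwargs)
    return torch.cuda.max_memory_allocated()


# With no GPU work (or no torch/CUDA at all), this is None or a small int
gpu_peak = peak_gpu_bytes_for(lambda: None)
assert gpu_peak is None or isinstance(gpu_peak, int)
```

Resetting the peak counter before each index is what makes the reading *per index* rather than a high-water mark for the whole process.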
## Output Formats
### Console Table

```python
metrics.to_console()
```

Prints a human-readable summary with per-index breakdown and stage averages.
### JSON File

```python
metrics.to_json("profile.json")
```

Writes full metrics (including per-stage breakdowns) as structured JSON.
### CSV File

```python
metrics.to_csv("profile.csv")
```

Writes one row per index with columns for wall time, memory, GPU memory, and per-stage timing.
### Programmatic Access

```python
info = metrics.summary()

info["total_wall_time_ns"]    # int
info["mean_index_time_ns"]    # float
info["indices"][0]["stages"]  # list of stage dicts
```
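For instance, the aggregate fields can be recomputed from the per-index records. The literal values below are made up purely to show the shape of the data:

```python
# Hypothetical per-index records, shaped like metrics.summary()["indices"]
indices = [
    {"index": 0, "wall_time_ns": 1_200_000,
     "stages": [{"name": "source", "wall_time_ns": 400_000}]},
    {"index": 1, "wall_time_ns": 1_800_000,
     "stages": [{"name": "source", "wall_time_ns": 600_000}]},
]

# The aggregates are simple folds over the per-index records
summary = {
    "num_indices": len(indices),
    "total_wall_time_ns": sum(i["wall_time_ns"] for i in indices),
    "mean_index_time_ns": sum(i["wall_time_ns"] for i in indices) / len(indices),
    "indices": indices,
}

assert summary["total_wall_time_ns"] == 3_000_000
assert summary["mean_index_time_ns"] == 1_500_000.0
```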
## Using with Parallel Backends

Profiling works with all backends (`sequential`, `thread_pool`, `process_pool`, `loky`, `dask`, and `prefect`) without any backend modifications.
Metrics are stored in a SQLite database using WAL (write-ahead logging) mode, which supports
safe concurrent writes from multiple threads and processes. Each worker
records its metrics independently, and the `pipeline.metrics` property
aggregates them on demand.
```python
results = run_pipeline(pipeline, n_jobs=8, backend="process_pool")

# Aggregates metrics from all workers
metrics = pipeline.metrics
metrics.to_console()
```
## Disabling Profiling

Set `track_metrics=False` to disable all profiling and checkpointing:

```python
pipeline = Pipeline(
    source=MySource(),
    sink=MySink(),
    track_metrics=False,
)
```
## Full Example

```python
from physicsnemo_curator import Pipeline, run_pipeline

# Build a pipeline (profiling is on by default)
pipeline = (
    MySource(path="/data/cfd/")
    .filter(NormalizeFilter())
    .filter(ResampleFilter(target_resolution=0.01))
    .write(MeshSink(output_dir="/output/"))
)

# Enable GPU tracking
pipeline.track_gpu = True

# Run
results = run_pipeline(pipeline, n_jobs=4, backend="process_pool")

# Analyze
metrics = pipeline.metrics
metrics.to_console()             # Quick visual summary
metrics.to_json("profile.json")  # Detailed JSON for analysis
metrics.to_csv("profile.csv")    # CSV for spreadsheet

# Programmatic access
summary = metrics.summary()
print(f"Processed {summary['num_indices']} indices")
print(f"Total time: {summary['total_wall_time_ns'] / 1e9:.2f}s")
print(f"Peak memory: {summary['total_peak_memory_bytes'] / 1e6:.1f} MB")
```
## Pipeline Fields

| Field | Type | Default | Description |
|---|---|---|---|
| `track_metrics` | `bool` | `True` | Enable timing, checkpointing, and metrics |
| `track_memory` | `bool` | `True` | Enable per-index memory tracking via `tracemalloc` |
| `track_gpu` | `bool` | `False` | Enable GPU memory tracking via PyTorch |
| | | | Override database directory |
## API Reference
### PipelineMetrics

```python
class PipelineMetrics:
    indices: list[IndexMetrics]

    # Properties
    total_wall_time_ns: int
    mean_index_time_ns: float
    total_peak_memory_bytes: int

    # Output
    def to_console(self) -> None: ...
    def to_json(self, path: str | Path) -> None: ...
    def to_csv(self, path: str | Path) -> None: ...
    def summary(self) -> dict: ...
```
### IndexMetrics

```python
class IndexMetrics:
    index: int
    stages: list[StageMetrics]
    wall_time_ns: int
    peak_memory_bytes: int
    gpu_memory_bytes: int | None
```
### StageMetrics

```python
class StageMetrics:
    name: str
    wall_time_ns: int
```