Note
Go to the end to download the full example code.
Profiling a Pipeline#
This example demonstrates how to measure pipeline performance using
ProfiledPipeline.
ProfiledPipeline is a transparent wrapper that records per-index
and per-stage wall-clock timing without changing the pipeline’s
behaviour. It can be passed directly to
run_pipeline() — backends see it as a
regular pipeline.
Note
Install the mesh extras before running:
pip install physicsnemo-curator[mesh]
Imports#
from physicsnemo_curator.core.profiling import ProfiledPipeline
from physicsnemo_curator.domains.mesh.filters.mean import MeanFilter
from physicsnemo_curator.domains.mesh.filters.precision import PrecisionFilter
from physicsnemo_curator.domains.mesh.sinks.mesh_writer import MeshSink
from physicsnemo_curator.domains.mesh.sources.ns_cylinder import NavierStokesCylinderSource
from physicsnemo_curator.run import run_pipeline
Build and Wrap the Pipeline#
First build a normal pipeline, then wrap it with
ProfiledPipeline.
The wrapper records timing for each stage: source, every filter,
and sink.
pipeline = (
NavierStokesCylinderSource()
.filter(MeanFilter(output="outputs/profiling/stats.parquet"))
.filter(PrecisionFilter(target_dtype="float32"))
.write(MeshSink(output_dir="outputs/profiling/meshes/"))
)
profiled = ProfiledPipeline(pipeline)
Run with Profiling#
Pass the ProfiledPipeline to run_pipeline exactly as you
would a normal pipeline. Timing data is collected transparently.
results = run_pipeline(
profiled,
n_jobs=1,
backend="sequential",
indices=range(3),
progress=True,
)
print(f"Processed {len(results)} indices")
View Timing Summary#
The metrics
property returns a
PipelineMetrics object
with aggregated timing data.
metrics = profiled.metrics
print("\n--- Console Summary ---")
metrics.to_console()
Export Metrics#
Metrics can be exported to JSON or CSV for further analysis.
metrics.to_json("outputs/profiling/timing.json")
metrics.to_csv("outputs/profiling/timing.csv")
print("\nExported timing.json and timing.csv")
Inspect Per-Index Breakdown#
Each IndexMetrics entry
contains per-stage durations.
summary = metrics.summary()
print(f"\nTotal wall time: {summary['total_wall_time_ns'] / 1e9:.2f}s")
print(f"Mean time per index: {summary['mean_index_time_ns'] / 1e9:.2f}s")
print(f"Indices profiled: {summary['n_indices']}")
Cleanup#
Remove temporary metrics files created during profiling.
profiled.cleanup()