Benchmark

BenchmarkConfig dataclass

Configuration for benchmarking.

This dataclass contains all the configuration parameters needed to run a benchmark. It supports both time-based and batch-based limits, as well as warmup phases.

Attributes:

name (str): Name of the benchmark.
num_epochs (int): Number of epochs to run.
max_batches (Optional[int]): Maximum number of batches to process (None for all).
max_time_seconds (Optional[float]): Maximum time to run the benchmark (None for no limit).
warmup_batches (Optional[int]): Number of warmup batches.
warmup_time_seconds (Optional[float]): Warmup duration in seconds (overrides warmup_batches if set).
data_path (Optional[Union[str, Path]]): Path to the data files (used for disk size measurement).
shuffle (bool): Whether to shuffle the data.
num_runs (int): Number of benchmark runs to perform.

Source code in bionemo/scspeedtest/benchmark.py
@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking.

    This dataclass contains all the configuration parameters needed
    to run a benchmark. It supports both time-based and batch-based
    limits, as well as warmup phases.

    Attributes:
        name: Name of the benchmark
        num_epochs: Number of epochs to run
        max_batches: Maximum number of batches to process (None for all)
        max_time_seconds: Maximum time to run benchmark (None for no limit)
        warmup_batches: Number of warmup batches
        warmup_time_seconds: Time to warmup in seconds (overrides warmup_batches if set)
        data_path: Path to data files (for disk size measurement)
        shuffle: Whether to shuffle the data
        num_runs: Number of runs to perform
    """

    name: str = "UnnamedBenchmark"
    num_epochs: int = 1
    max_batches: Optional[int] = None
    max_time_seconds: Optional[float] = None
    warmup_batches: Optional[int] = None
    warmup_time_seconds: Optional[float] = None
    data_path: Optional[Union[str, Path]] = None
    shuffle: bool = True
    num_runs: int = 1
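
Example: a minimal sketch of a time-limited configuration with a short warmup. The import path mirrors the source location shown above (bionemo/scspeedtest/benchmark.py) and may differ if the package re-exports the class elsewhere; the data path is a hypothetical placeholder.

from bionemo.scspeedtest.benchmark import BenchmarkConfig

# Iterate for at most 60 seconds per epoch after a 5-second warmup phase.
config = BenchmarkConfig(
    name="anndata_baseline",
    num_epochs=2,
    max_time_seconds=60.0,
    warmup_time_seconds=5.0,
    data_path="data/sample.h5ad",  # hypothetical path, used only for disk size measurement
    shuffle=False,
)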

benchmark_dataloaders_with_configs(dataloader_configs, shared_dataset_factory=None, output_prefix='consolidated_benchmark_results')

Benchmark multiple dataloader configs with optional shared dataset.

Each config can have its own dataset_factory, use the shared_dataset_factory, or have none (dataloader creates everything).

Parameters:

dataloader_configs (List[Dict[str, Any]], required): List of dicts with keys: name, dataloader_factory, dataset_factory (optional), data_path, etc.
shared_dataset_factory (Optional[Callable[[], Any]], default None): Optional function that creates a dataset once; the dataset is then reused across multiple dataloaders.
output_prefix (str, default 'consolidated_benchmark_results'): Prefix for the output CSV filename.

Returns:

List[BenchmarkResult]: List of BenchmarkResult objects for the benchmarked dataloader configs.

Source code in bionemo/scspeedtest/benchmark.py
def benchmark_dataloaders_with_configs(
    dataloader_configs: List[Dict[str, Any]],
    shared_dataset_factory: Optional[Callable[[], Any]] = None,
    output_prefix: str = "consolidated_benchmark_results",
) -> List[BenchmarkResult]:
    """Benchmark multiple dataloader configs with optional shared dataset.

    Each config can have its own dataset_factory, use the shared_dataset_factory, or have none (dataloader creates everything).

    Args:
        dataloader_configs: List of dicts with keys: name, dataloader_factory, dataset_factory (optional), data_path, etc.
        shared_dataset_factory: Optional function that creates a dataset once, then reused across multiple dataloaders
        output_prefix: Prefix for the output CSV filename

    Returns:
        List[BenchmarkResult] for multiple dataloader configs.
    """
    results = []

    # Ensure every config has a dataloader_factory
    for idx, config in enumerate(dataloader_configs):
        if "dataloader_factory" not in config or config["dataloader_factory"] is None:
            raise ValueError(
                f"Config at index {idx} ('{config.get('name', 'UnnamedBenchmark')}') is missing a 'dataloader_factory'."
            )

    _drop_caches()

    # Create shared dataset if factory is provided
    shared_dataset = None
    shared_dataset_baseline = None
    shared_dataset_time = None
    if shared_dataset_factory is not None:
        shared_dataset, shared_dataset_baseline, _, _, _, _, shared_dataset_time = measure_peak_memory_full(
            shared_dataset_factory
        )
    for dl_config in dataloader_configs:
        # Determine which dataset factory to use
        if "dataset_factory" in dl_config:
            # Config has its own dataset factory
            config_dataset_factory = dl_config["dataset_factory"]
        else:
            # No dataset factory - dataloader factory creates everything
            config_dataset_factory = None

        config_dataloader_factory = dl_config["dataloader_factory"]
        if shared_dataset is not None:

            def config_dataloader_from_dataset():
                return config_dataloader_factory(shared_dataset)

            dataloader_factory = config_dataloader_from_dataset
        else:
            dataloader_factory = config_dataloader_factory

        result = benchmark_single_dataloader(
            dataloader_factory=dataloader_factory,
            data_path=dl_config.get("data_path", None),
            name=dl_config.get("name", "UnnamedBenchmark"),
            dataset_factory=config_dataset_factory,
            num_epochs=dl_config.get("num_epochs", 1),
            max_batches=dl_config.get("max_batches", None),
            max_time_seconds=dl_config.get("max_time_seconds", None),
            warmup_batches=dl_config.get("warmup_batches", 5),
            warmup_time_seconds=dl_config.get("warmup_time_seconds", None),
            shuffle=dl_config.get("shuffle", True),
            num_runs=dl_config.get("num_runs", 1),
            dataset_baseline=shared_dataset_baseline,
            output_prefix=output_prefix,
            dataset_instantiation_time=shared_dataset_time,
        )
        # If no shared baseline was measured, fall back to the first dataloader's
        # memory reading from before its instantiation.
        if not shared_dataset_baseline:
            first_result = result[0] if isinstance(result, list) else result
            shared_dataset_baseline = first_result.memory_before_instantiation_mb

        print_results(result)
        if isinstance(result, list):
            for r in result:
                r.dataset_instantiation_time_seconds = shared_dataset_time
            results.extend(result)
        else:
            result.dataset_instantiation_time_seconds = shared_dataset_time
            results.append(result)
        _drop_caches()
    return results
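
Example: a hedged sketch of comparing two batch sizes against one shared in-memory dataset. The PyTorch TensorDataset/DataLoader stand-ins, the data path, and the helper names make_dataset/make_loader are illustrative assumptions, not part of this module; any dataloader whose batches are recognized by the library's batch-size detection and that exposes num_workers should slot in the same way.

from functools import partial

import torch
from torch.utils.data import DataLoader, TensorDataset

from bionemo.scspeedtest.benchmark import benchmark_dataloaders_with_configs, print_comparison


def make_dataset():
    # Stand-in dataset; a real benchmark would construct e.g. an AnnData-backed dataset here.
    return TensorDataset(torch.randn(10_000, 64))


def make_loader(dataset, batch_size):
    # With shared_dataset_factory set, each dataloader_factory is called with the shared dataset.
    return DataLoader(dataset, batch_size=batch_size, num_workers=0, shuffle=True)


configs = [
    {
        "name": "batch_64",
        "dataloader_factory": partial(make_loader, batch_size=64),
        "max_batches": 200,
        "data_path": "data/sample.h5ad",  # hypothetical; used only for disk size reporting
    },
    {
        "name": "batch_256",
        "dataloader_factory": partial(make_loader, batch_size=256),
        "max_batches": 200,
        "data_path": "data/sample.h5ad",
    },
]

results = benchmark_dataloaders_with_configs(
    configs,
    shared_dataset_factory=make_dataset,
    output_prefix="loader_comparison",
)
print_comparison(results)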

benchmark_single_dataloader(dataloader_factory, data_path, name='UnnamedBenchmark', dataset_factory=None, num_epochs=1, max_batches=None, max_time_seconds=None, warmup_batches=5, warmup_time_seconds=None, shuffle=False, num_runs=1, dataset_baseline=None, output_prefix='consolidated_benchmark_results', dataset_instantiation_time=None)

Benchmark a single dataloader with optional separate dataset factory.

Parameters:

dataloader_factory (Callable[..., Any], required): Factory function that creates a dataloader. If dataset_factory is provided, this should accept a dataset parameter; otherwise it should create everything internally.
data_path (Union[str, Path], required): Path to the data file.
name (str, default 'UnnamedBenchmark'): Name of the benchmark.
dataset_factory (Optional[Callable[[], Any]], default None): Optional factory function that creates the dataset separately.
num_epochs (int, default 1): Number of epochs to run.
max_batches (Optional[int], default None): Maximum number of batches per epoch (None for unlimited).
max_time_seconds (Optional[float], default None): Maximum time to run in seconds (None for unlimited).
warmup_batches (int, default 5): Number of batches for warmup.
warmup_time_seconds (Optional[float], default None): Time in seconds for warmup.
shuffle (bool, default False): Whether to shuffle the data.
num_runs (int, default 1): Number of runs to perform.
dataset_baseline (Optional[float], default None): Optional baseline memory usage for the dataset (for dataset reuse with multiple dataloaders).
output_prefix (str, default 'consolidated_benchmark_results'): Prefix for the output CSV filename.
dataset_instantiation_time (Optional[float], default None): Optional time taken to instantiate the dataset.

Returns:

Union[BenchmarkResult, List[BenchmarkResult]]: A single BenchmarkResult when num_runs is 1, or a list of BenchmarkResult objects for multiple runs.

Source code in bionemo/scspeedtest/benchmark.py
def benchmark_single_dataloader(
    dataloader_factory: Callable[..., Any],
    data_path: Union[str, Path],
    name: str = "UnnamedBenchmark",
    dataset_factory: Optional[Callable[[], Any]] = None,
    num_epochs: int = 1,
    max_batches: Optional[int] = None,
    max_time_seconds: Optional[float] = None,
    warmup_batches: int = 5,
    warmup_time_seconds: Optional[float] = None,
    shuffle: bool = False,
    num_runs: int = 1,
    dataset_baseline: Optional[float] = None,
    output_prefix: str = "consolidated_benchmark_results",
    dataset_instantiation_time: Optional[float] = None,
) -> Union[BenchmarkResult, List[BenchmarkResult]]:
    """Benchmark a single dataloader with optional separate dataset factory.

    Args:
        dataloader_factory: Factory function that creates a dataloader. If dataset_factory is provided,
                           this should accept a dataset parameter. Otherwise, it should create everything internally.
        data_path: Path to the data file
        name: Name of the benchmark
        dataset_factory: Optional factory function that creates the dataset separately
        num_epochs: Number of epochs to run
        max_batches: Maximum number of batches per epoch (None for unlimited)
        max_time_seconds: Maximum time to run in seconds (None for unlimited)
        warmup_batches: Number of batches for warmup
        warmup_time_seconds: Time in seconds for warmup
        shuffle: Whether to shuffle the data
        num_runs: Number of runs to perform
        dataset_baseline: Optional baseline memory usage for the dataset (for dataset reuse with multiple dataloaders)
        output_prefix: Prefix for the output CSV filename
        dataset_instantiation_time: Optional time taken to instantiate the datasets

    Returns:
        Single BenchmarkResult for num_runs=1, or List[BenchmarkResult] for multiple runs
    """
    if dataset_factory is not None:
        # Separate dataset and dataloader creation
        dataset, dataset_baseline_measured, dataset_peak, _, _, dataset_final, dataset_time = measure_peak_memory_full(
            dataset_factory
        )

        def dataloader_from_dataset():
            return dataloader_factory(dataset)

        dataloader, dl_baseline, dl_peak, _, _, dl_final, dl_time = measure_peak_memory_full(dataloader_from_dataset)

        instantiation_metrics = {
            "peak_memory_during_instantiation_mb": max(dl_peak, dataset_peak),
            "memory_after_instantiation_mb": dl_final,
            "memory_before_instantiation_mb": dataset_baseline_measured,
            "dataset_instantiation_time_seconds": dataset_time,
            "dataloader_instantiation_time_seconds": dl_time,
        }

    else:
        # Dataloader factory creates everything internally
        dataloader, dataloader_baseline_measured, peak, _, _, final_mib, setup_time = measure_peak_memory_full(
            dataloader_factory
        )
        instantiation_metrics = {
            "peak_memory_during_instantiation_mb": peak,
            "memory_after_instantiation_mb": final_mib,
            "memory_before_instantiation_mb": dataset_baseline
            if dataset_baseline is not None
            else dataloader_baseline_measured,
            "dataset_instantiation_time_seconds": dataset_instantiation_time
            if dataset_instantiation_time is not None
            else 0,  # Combined time when no separate dataset factory
            "dataloader_instantiation_time_seconds": setup_time,
        }
    disk_size_mb = get_disk_size(data_path)

    results = []
    for run_idx in range(num_runs):
        # For single run, use the provided dataloader; for multiple runs, re-instantiate as needed
        if run_idx == 0:
            current_dataloader = dataloader
            run_name_str = name if num_runs == 1 else f"{name}_run_{run_idx + 1}"
        else:
            if dataset_factory is not None:

                def dataloader_from_dataset():
                    return dataloader_factory(dataset)

                current_dataloader = dataloader_from_dataset()
            else:
                current_dataloader = dataloader_factory()
            run_name_str = f"{name}_run_{run_idx + 1}"

        run_config = BenchmarkConfig(
            name=run_name_str,
            num_epochs=num_epochs,
            max_batches=max_batches,
            max_time_seconds=max_time_seconds,
            warmup_batches=warmup_batches,
            warmup_time_seconds=warmup_time_seconds,
            data_path=data_path,
            shuffle=shuffle,
        )
        run_result = run_benchmark(current_dataloader, run_config, run_name_str, **instantiation_metrics)
        del current_dataloader
        gc.collect()
        run_result.disk_size_mb = disk_size_mb
        results.append(run_result)

        export_benchmark_results(run_result, output_prefix=output_prefix)
        _drop_caches()

    if num_runs == 1:
        return results[0]
    else:
        return results
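
Example: a hedged sketch of benchmarking a single dataloader with a separate dataset factory, so that dataset and dataloader instantiation are measured independently. The PyTorch pieces and the data path are illustrative assumptions.

import torch
from torch.utils.data import DataLoader, TensorDataset

from bionemo.scspeedtest.benchmark import benchmark_single_dataloader, print_results


def dataset_factory():
    # Stand-in for a real dataset (e.g. one backed by the files under data_path).
    return TensorDataset(torch.randn(50_000, 32))


def dataloader_factory(dataset):
    # Because dataset_factory is provided, this factory receives the created dataset.
    return DataLoader(dataset, batch_size=128, num_workers=2)


result = benchmark_single_dataloader(
    dataloader_factory=dataloader_factory,
    data_path="data/sample.h5ad",  # hypothetical; used for disk size reporting
    name="tensor_dataset_demo",
    dataset_factory=dataset_factory,
    num_epochs=1,
    max_batches=100,
)
print_results(result)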

print_comparison(results)

Print comparison of multiple benchmark results.

Source code in bionemo/scspeedtest/benchmark.py
def print_comparison(results: List[BenchmarkResult]) -> None:
    """Print comparison of multiple benchmark results."""
    if not results or len(results) < 2:
        return

    print(f"\nComparison ({len(results)} configurations)")

    # Show individual results
    for result in results:
        print(f"\nResult for {result.name}: {result.samples_per_second:.2f} samples/sec")
        print(f"   Memory: {result.peak_memory_mb:.1f} MB")

    # Find best performers
    best_samples_per_sec = max(results, key=lambda r: r.samples_per_second)
    lowest_memory = min(results, key=lambda r: r.peak_memory_mb)

    print("\nBest Performers:")
    print(f"Best speed: {best_samples_per_sec.name} ({best_samples_per_sec.samples_per_second:.2f} samples/sec)")
    print(f"Lowest memory: {lowest_memory.name} ({lowest_memory.peak_memory_mb:.2f} MB)")

    fastest_instantiation = min(
        results,
        key=lambda r: (r.dataset_instantiation_time_seconds or 0) + (r.dataloader_instantiation_time_seconds or 0),
    )
    fastest_time = (fastest_instantiation.dataset_instantiation_time_seconds or 0) + (
        fastest_instantiation.dataloader_instantiation_time_seconds or 0
    )
    print(f"Fastest instantiation: {fastest_instantiation.name} ({fastest_time:.3f} s)")

print_results(result_or_results)

Print benchmark results in a formatted way. Accepts a single result or a list of results.

Source code in bionemo/scspeedtest/benchmark.py
def print_results(result_or_results: Union[BenchmarkResult, List[BenchmarkResult]]) -> None:
    """Print benchmark results in a formatted way. Accepts a single result or a list of results."""
    results = result_or_results if isinstance(result_or_results, list) else [result_or_results]
    for result in results:
        print("=" * 60)
        print(f"Benchmark: {result.name}")
        print(f"Samples/sec: {result.samples_per_second:.2f}")
        print(f"Total samples: {result.total_samples}")
        print(f"Total time: {result.total_time_seconds:.3f}s")
        print(f"Dataset instantiation: {result.dataset_instantiation_time_seconds:.3f}s")
        print(f"Dataloader instantiation: {result.dataloader_instantiation_time_seconds:.3f}s")
        print(f"Peak memory durint iteration: {result.peak_memory_mb:.1f} MB")
        print(f"Peak memory during instantiation: {result.peak_memory_during_instantiation_mb:.1f} MB")
        print(f"Disk size: {result.disk_size_mb:.1f} MB")
        print("=" * 60 + "\n")

run_benchmark(dataloader, config, run_name=None, **instantiation_kwargs)

Run the actual benchmark and collect metrics.

Parameters:

dataloader (Any, required): The dataloader to benchmark.
config (BenchmarkConfig, required): Configuration for the benchmark run.
run_name (Optional[str], default None): Optional name for this run.
**instantiation_kwargs: Instantiation metrics (dataset_instantiation_time_seconds, dataloader_instantiation_time_seconds, peak_memory_during_instantiation_mb, memory_before_instantiation_mb, memory_after_instantiation_mb).

Returns:

BenchmarkResult: The collected data and calculated metrics for the run.

Source code in bionemo/scspeedtest/benchmark.py
def run_benchmark(
    dataloader: Any,
    config: BenchmarkConfig,
    run_name: Optional[str] = None,
    **instantiation_kwargs,
) -> BenchmarkResult:
    """Run the actual benchmark and collect metrics.

    Args:
        dataloader: The dataloader to benchmark
        config: Configuration for the benchmark run
        run_name: Optional name for this run
        **instantiation_kwargs: Instantiation metrics (dataset_instantiation_time_seconds,
                               dataloader_instantiation_time_seconds, peak_memory_during_instantiation_mb,
                               memory_before_instantiation_mb, memory_after_instantiation_mb)

    Returns:
        BenchmarkResult containing all collected data and calculated metrics
    """
    # Use measure_peak_memory_full to get memory info during benchmark
    gc.collect()

    def benchmark_iteration_single_epoch(epoch_num, do_warmup):
        """Run a single epoch of benchmarking, with optional warmup."""
        gc.collect()

        update_interval = 10
        epoch_samples = 0
        epoch_batches = 0
        warmup_samples = 0
        warmup_batches = 0
        warmup_time = 0.0
        elapsed = 0.0
        start_time = None

        pbar = tqdm(desc=f"{config.name} - Epoch {epoch_num + 1}/{config.num_epochs}")
        warm_up_start = time.perf_counter()
        if not do_warmup or not config.warmup_time_seconds:
            config.warmup_time_seconds = 0
        warm_up_end = warm_up_start + config.warmup_time_seconds
        is_warming_up = True

        for num, batch in enumerate(dataloader):
            batch_size = get_batch_size(batch)

            current_time = time.perf_counter()

            if is_warming_up:
                # We're in warm-up period - count samples and batches
                warmup_samples += batch_size
                warmup_batches += 1

                if current_time >= warm_up_end:
                    # Warm-up complete and start the actual timing
                    warmup_time = current_time - warm_up_start

                    print(f"Warmup completed: {warmup_samples:,} samples, {warmup_batches:,} batches")

                    is_warming_up = False
                    start_time = time.perf_counter()
                    end_time = start_time + config.max_time_seconds if config.max_time_seconds is not None else None
                    pbar.set_description(f"{config.name} - Epoch {epoch_num + 1} (warmup complete)")
                else:
                    if warmup_batches % update_interval == 0:
                        elapsed_warmup = current_time - warm_up_start
                        current_warmup_speed = warmup_samples / elapsed_warmup if elapsed_warmup > 0 else 0
                        pbar.set_description(
                            f"{config.name} - Warmup: {elapsed_warmup:.1f}/{config.warmup_time_seconds}s, {current_warmup_speed:.1f} samples/sec"
                        )
                        pbar.update(update_interval)
                continue

            # Now we're past the warm-up period (or no warmup)
            epoch_samples += batch_size
            epoch_batches += 1
            elapsed = current_time - start_time if start_time else 0
            if epoch_batches % update_interval == 0:
                postfix_dict = {
                    "epoch": f"{epoch_num + 1}/{config.num_epochs}",
                    "samples": epoch_samples,
                    "elapsed": f"{elapsed:.2f}s",
                }

                pbar.set_postfix(**postfix_dict, refresh=False)
                pbar.update(update_interval)
            # Check max_batches limit
            if (config.max_batches and epoch_batches >= config.max_batches) or (end_time and current_time >= end_time):
                break

        # If no samples were processed in the epoch, likely because warmup consumed the entire dataset
        if epoch_samples == 0:
            import warnings

            warnings.warn(
                f"Epoch {epoch_num + 1}: No samples processed after warmup. "
                "Warmup may have consumed the entire dataset. "
                "Consider reducing warmup_batches or warmup_time_seconds.",
                RuntimeWarning,
            )

        # Final progress bar update
        if epoch_samples > 0 and elapsed > 0:
            postfix_dict = {
                "epoch": f"{epoch_num + 1}/{config.num_epochs}",
                "samples": epoch_samples,
                "elapsed": f"{elapsed:.2f}s",
                "samples_per_sec": f"{epoch_samples / elapsed:.2f}",
            }
            pbar.set_postfix(**postfix_dict, refresh=False)

        pbar.close()

        return epoch_samples, epoch_batches, elapsed, warmup_samples, warmup_batches, warmup_time

    epoch_results = []
    for epoch in range(config.num_epochs):
        # Run this epoch under memory monitoring; warmup applies only to the first epoch.

        result_tuple = measure_peak_memory_full(
            lambda: benchmark_iteration_single_epoch(epoch, epoch == 0), multi_worker=dataloader.num_workers > 0
        )
        (
            (epoch_samples, epoch_batches, elapsed, warmup_samples, warmup_batches, warmup_time),
            _,
            peak,
            avg,
            _,
            _,
            iteration_time,
        ) = result_tuple

        epoch_results.append(
            {
                "epoch": epoch + 1,
                "samples": epoch_samples,
                "batches": epoch_batches,
                "warmup_samples": warmup_samples,
                "warmup_batches": warmup_batches,
                "peak_memory": peak,
                "avg_memory": avg,
                "iteration_time": iteration_time,
                "elapsed": elapsed,
                "warmup_time": warmup_time,
            }
        )

        print(f"Epoch {epoch + 1} completed: {epoch_samples:,} samples, {epoch_batches:,} batches")

    result = BenchmarkResult(
        name=config.name,
        data_path=str(config.data_path) if config.data_path else None,
        max_time_seconds=config.max_time_seconds,
        shuffle=config.shuffle,
        num_workers=dataloader.num_workers,
        # Instantiation metrics passed as kwargs
        **instantiation_kwargs,
        epoch_results=epoch_results,
    )
    return result
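
Example: a minimal sketch of calling run_benchmark directly on an already-constructed dataloader. This assumes the instantiation metrics normally passed via **instantiation_kwargs have defaults on BenchmarkResult when omitted; the PyTorch objects are illustrative stand-ins.

import torch
from torch.utils.data import DataLoader, TensorDataset

from bionemo.scspeedtest.benchmark import BenchmarkConfig, run_benchmark

loader = DataLoader(TensorDataset(torch.randn(20_000, 16)), batch_size=64, num_workers=0)
config = BenchmarkConfig(name="direct_run", num_epochs=1, max_batches=50)

result = run_benchmark(loader, config)
print(f"{result.name}: {result.samples_per_second:.2f} samples/sec")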