nvalchemi.dynamics.hooks.LoggingHook#
- class nvalchemi.dynamics.hooks.LoggingHook(backend, frequency=1, log_path=None, custom_scalars=None, writer_fn=None)[source]#
Log per-sample scalar observables from the simulation.
At each firing step, this hook computes per-graph scalars from the Batch and writes one row per graph to the configured logging backend. Each row includes:
- step — the current dynamics.step_count.
- graph_idx — the graph’s index within the batch.
- status — the sample’s status code (from batch.status), indicating which pipeline stage it belongs to. Always 0 for single-stage dynamics.
- energy — per-graph potential energy (from batch.energies).
- fmax — per-graph maximum atomic force norm.
- temperature — per-graph instantaneous kinetic temperature (from batch.velocities and batch.atomic_masses via the equipartition theorem), if velocities are present.
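The temperature column follows from the equipartition relation T = 2·KE / (3N·k_B). A minimal pure-Python sketch of that calculation (illustrative names only, not the hook's internals):

```python
# Hedged sketch: per-graph kinetic temperature via the equipartition theorem,
# T = 2 * KE / (3 * N * k_B). Function and constant names are illustrative.
K_B = 8.617333262e-5  # Boltzmann constant in eV/K

def kinetic_temperature(velocities, masses):
    """velocities: list of (vx, vy, vz) per atom; masses: matching atomic masses."""
    # Kinetic energy: sum of 1/2 * m * |v|^2 over all atoms in the graph.
    ke = sum(0.5 * m * (vx * vx + vy * vy + vz * vz)
             for m, (vx, vy, vz) in zip(masses, velocities))
    n_atoms = len(masses)
    # 3N degrees of freedom, with no constraint correction (see Notes below).
    return 2.0 * ke / (3.0 * n_atoms * K_B)
```

In the real hook this would be vectorized over the batch, but the physics is the same.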
Users can extend or replace this set by providing a custom_scalars mapping of {name: callable} pairs, where each callable has signature (batch, dynamics) -> Tensor of shape (B,) (one value per graph) or returns a plain float (broadcast to all graphs).
Asynchronous I/O. Scalar computation and the GPU-to-CPU transfer run on a dedicated CUDA side stream (set up in __enter__()) so they do not stall the default compute stream. The D2H copy uses non_blocking=True; the single-worker ThreadPoolExecutor synchronizes the stream before reading the CPU tensor and writing to the backend.
Context manager. Use with to guarantee the CUDA stream is created, all pending writes are flushed, and file handles are closed on exit:

with LoggingHook(backend="csv", log_path="out.csv") as hook:
    dynamics.register_hook(hook)
    dynamics.run(batch)
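The side-stream-plus-executor hand-off described above can be sketched in plain Python. CUDA stream details are omitted, and BackgroundWriter is an illustrative stand-in, not the hook's actual implementation:

```python
# Hedged sketch of the single-worker background-writer pattern described above.
# Only the executor hand-off is shown; the real hook also synchronizes a CUDA
# side stream before touching the CPU tensor.
from concurrent.futures import ThreadPoolExecutor

class BackgroundWriter:
    def __init__(self):
        # A single worker keeps rows ordered while keeping I/O off the hot path.
        self._pool = ThreadPoolExecutor(max_workers=1)
        self.rows = []

    def submit(self, row):
        # In the real hook this callback would first sync the side stream,
        # then read the CPU tensor and write to the backend.
        return self._pool.submit(self.rows.append, row)

    def close(self):
        # Flush: wait for all pending writes before releasing resources.
        self._pool.shutdown(wait=True)

writer = BackgroundWriter()
for step in range(3):
    writer.submit({"step": step})
writer.close()
```

Because the pool has exactly one worker, rows arrive in submission order, which is why the hook can use a plain executor instead of an explicit queue.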
- Parameters:
backend ({"csv", "tensorboard", "custom"}) – Logging backend to use.
frequency (int, optional) – Log every frequency steps. Default 1.
log_path (str | Path | None, optional) – File path for file-based backends ("csv", "tensorboard"). Default None.
custom_scalars (dict[str, Callable] | None, optional) – Additional named scalars to compute and log. Each callable receives (batch, dynamics) and returns either a (B,) tensor (per-graph values) or a float (broadcast to all graphs). Name collisions override defaults. Default None.
writer_fn (Callable[[int, list[dict[str, float]]], None] | None, optional) – Custom writer function, required when backend="custom". Receives (step_count, rows). Default None.
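A writer_fn for backend="custom" just needs to accept (step_count, rows). A hedged sketch, assuming rows is the documented list of per-graph dicts (names here are hypothetical):

```python
# Hedged sketch of a writer_fn-compatible callable for backend="custom".
# It receives (step_count, rows); here we simply collect rows in memory.
collected = []

def my_writer(step_count, rows):
    # Tag each per-graph row with the step it came from, then store it.
    for row in rows:
        collected.append({"step": step_count, **row})

# Mimic what the hook would pass at one firing step for a 2-graph batch:
my_writer(100, [{"graph_idx": 0, "energy": -1.5},
                {"graph_idx": 1, "energy": -2.0}])
```

The same callable could instead push rows to a database or a metrics service; the hook only guarantees the (step_count, rows) calling convention.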
Examples
>>> from nvalchemi.dynamics.hooks import LoggingHook
>>> with LoggingHook(backend="csv", log_path="md_log.csv", frequency=100) as hook:
...     dynamics = DemoDynamics(model=model, n_steps=10_000, dt=0.5, hooks=[hook])
...     dynamics.run(batch)
Using custom scalars:
>>> def pressure(batch, dynamics):
...     return compute_pressure(batch.stresses, batch.cell)
>>> hook = LoggingHook(
...     backend="csv",
...     log_path="md_log.csv",
...     frequency=50,
...     custom_scalars={"pressure": pressure},
... )
Notes
The default temperature calculation assumes an NVT-like system with 3N degrees of freedom (no constraint correction). Override via custom_scalars if constraints remove DOFs.
For distributed pipelines, each rank logs independently. Use log_path with rank-specific filenames to avoid file contention.
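One way to build a rank-specific filename, assuming the launcher exposes the rank via a RANK environment variable (a common but not universal convention):

```python
import os

# Hedged sketch: RANK is assumed to be set by the process launcher;
# falls back to 0 for single-process runs.
rank = int(os.environ.get("RANK", "0"))
log_path = f"md_log_rank{rank}.csv"  # one file per rank avoids contention
```

Pass the resulting log_path to LoggingHook on each rank so no two processes write to the same file.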
- __init__(backend, frequency=1, log_path=None, custom_scalars=None, writer_fn=None)[source]#
- Parameters:
backend (LogBackend)
frequency (int)
log_path (str | Path | None)
custom_scalars (dict[str, Callable[[Batch, BaseDynamics], float | torch.Tensor]] | None)
writer_fn (Callable[[int, list[dict[str, float]]], None] | None)
- Return type:
None
Methods
__init__(backend[, frequency, log_path, ...])
close() — Flush pending writes and release stream / file resources.