nvalchemi.dynamics.hooks.LoggingHook#

class nvalchemi.dynamics.hooks.LoggingHook(backend, frequency=1, log_path=None, custom_scalars=None, writer_fn=None)[source]#

Log per-sample scalar observables from the simulation.

At each firing step, this hook computes per-graph scalars from the Batch and writes one row per graph to the configured logging backend. Each row includes:

  • step — the current dynamics.step_count.

  • graph_idx — the graph’s index within the batch.

  • status — the sample’s status code (from batch.status), indicating which pipeline stage it belongs to. Always 0 for single-stage dynamics.

  • energy — per-graph potential energy (from batch.energies).

  • fmax — per-graph maximum atomic force norm.

  • temperature — per-graph instantaneous kinetic temperature (from batch.velocities and batch.atomic_masses via the equipartition theorem), if velocities are present.
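Put together, a single logged row is a flat mapping of these scalar names to values. The values below are purely illustrative:

```python
# One logged row (hypothetical values). Field names follow the schema
# described above; "temperature" is only present when velocities exist.
row = {
    "step": 100,           # dynamics.step_count at the firing step
    "graph_idx": 0,        # index of this graph within the batch
    "status": 0,           # pipeline-stage code (0 for single-stage dynamics)
    "energy": -152.73,     # per-graph potential energy
    "fmax": 0.048,         # maximum atomic force norm in this graph
    "temperature": 298.4,  # instantaneous kinetic temperature
}
```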

Users can extend or replace this set by providing a custom_scalars mapping of {name: callable} pairs, where each callable has signature (batch, dynamics) -> Tensor of shape (B,) (one value per graph) or a plain float (broadcast to all graphs).

Asynchronous I/O. Scalar computation and the GPU-to-CPU transfer run on a dedicated CUDA side stream (set up in __enter__(), when a CUDA device is available), so they do not stall the default compute stream. The D2H copy uses non_blocking=True; a single-worker ThreadPoolExecutor synchronizes the stream before calling .item() on the CPU tensor and writing to the backend, so the GPU pipeline is never blocked on logging I/O.
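The write path can be sketched in pure Python, with a threading.Event standing in for the CUDA side-stream synchronization and an in-memory list standing in for the backend (a conceptual sketch only, not the hook's actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor
import threading

executor = ThreadPoolExecutor(max_workers=1)  # single worker keeps rows ordered
written = []                                  # stands in for the backend sink

def log_step(step, rows):
    copy_done = threading.Event()
    copy_done.set()  # real hook: event recorded after the non-blocking D2H copy

    def write():
        copy_done.wait()  # real hook: synchronize the side stream before .item()
        for row in rows:
            written.append((step, row))  # real hook: backend write
    # Submitting returns immediately; the compute stream is never blocked here.
    return executor.submit(write)

future = log_step(100, [{"energy": -1.0}])
future.result()        # only the sketch waits; the hook flushes on exit instead
executor.shutdown(wait=True)
```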

Context manager. Use with to guarantee the CUDA stream is created, all pending writes are flushed, and file handles are closed on exit:

with LoggingHook(backend="csv", log_path="out.csv") as hook:
    dynamics.register_hook(hook)
    dynamics.run(batch)


Parameters:
  • backend ({"csv", "tensorboard", "custom"}) – Logging backend to use.

  • frequency (int, optional) – Log every frequency steps. Default 1.

  • log_path (str | Path | None, optional) – File path for file-based backends ("csv", "tensorboard"). Default None.

  • custom_scalars (dict[str, Callable] | None, optional) – Additional named scalars to compute and log. Each callable receives (batch, dynamics) and returns either a (B,) tensor (one value per graph) or a float (broadcast to all graphs). Names that collide with the default scalars override them. Default None.

  • writer_fn (Callable[[int, list[dict[str, float]]], None] | None, optional) – Custom writer function, required when backend="custom". Receives (step_count, rows). Default None.

Examples

>>> from nvalchemi.dynamics.hooks import LoggingHook
>>> with LoggingHook(backend="csv", log_path="md_log.csv", frequency=100) as hook:
...     dynamics = DemoDynamics(model=model, n_steps=10_000, dt=0.5, hooks=[hook])
...     dynamics.run(batch)

Using custom scalars:

>>> def pressure(batch, dynamics):
...     return compute_pressure(batch.stresses, batch.cell)
>>> hook = LoggingHook(
...     backend="csv",
...     log_path="md_log.csv",
...     frequency=50,
...     custom_scalars={"pressure": pressure},
... )
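Using a custom writer. With backend="custom", each batch of rows is handed to writer_fn as (step_count, rows). A minimal sketch, where an in-memory buffer stands in for a real sink:

```python
# Minimal writer_fn sketch for backend="custom". It receives the current
# step count and a list of per-graph row dicts, and may route them anywhere
# (here: an in-memory list standing in for a database, socket, etc.).
buffer = []

def my_writer(step_count, rows):
    for row in rows:
        buffer.append({"step": step_count, **row})

# The hook calls writer_fn like this on each firing step:
my_writer(50, [{"graph_idx": 0, "energy": -3.2},
               {"graph_idx": 1, "energy": -2.9}])
```

The hook would then be constructed as LoggingHook(backend="custom", writer_fn=my_writer).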

Notes

  • The default temperature calculation assumes an NVT-like system with 3N degrees of freedom (no constraint correction). Override via custom_scalars if constraints remove DOFs.

  • For distributed pipelines, each rank logs independently. Use log_path with rank-specific filenames to avoid file contention.
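For the constraint case in the first note, a DOF-corrected temperature can be supplied via custom_scalars. The sketch below assumes eV-based units and a hypothetical batch.n_constraints attribute (not part of the documented Batch fields); a plain float return is broadcast to all graphs, as described above:

```python
from types import SimpleNamespace

K_B = 8.617333262e-5  # Boltzmann constant in eV/K (assumes eV-based units)

def constrained_temperature(batch, dynamics):
    # Kinetic energy: 0.5 * sum_i m_i |v_i|^2, via the equipartition theorem.
    ke = 0.5 * sum(
        m * sum(c * c for c in v)
        for m, v in zip(batch.atomic_masses, batch.velocities)
    )
    # Remove constrained degrees of freedom from the default 3N count.
    dof = 3 * len(batch.atomic_masses) - batch.n_constraints
    return 2.0 * ke / (dof * K_B)  # float: broadcast to all graphs

# Dummy single-graph batch for illustration (n_constraints is hypothetical):
batch = SimpleNamespace(
    atomic_masses=[1.0, 1.0],
    velocities=[[0.1, 0.0, 0.0], [0.0, 0.1, 0.0]],
    n_constraints=1,
)
t = constrained_temperature(batch, dynamics=None)
```

Passing custom_scalars={"temperature": constrained_temperature} overrides the default temperature scalar, since colliding names take precedence.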

__init__(backend, frequency=1, log_path=None, custom_scalars=None, writer_fn=None)[source]#
Parameters:
  • backend (LogBackend)

  • frequency (int)

  • log_path (str | Path | None)

  • custom_scalars (dict[str, Callable[[Batch, BaseDynamics], float | torch.Tensor]] | None)

  • writer_fn (Callable[[int, list[dict[str, float]]], None] | None)

Return type:

None

Methods

__init__(backend[, frequency, log_path, ...])

close()

Flush pending writes and release stream / file resources.