Source code for nvalchemi.dynamics.hooks.snapshot
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Snapshot hook for saving batch state to a data sink.
Provides :class:`SnapshotHook`, which periodically writes the full batch
state to a :class:`~nvalchemi.dynamics.sinks.DataSink` (GPU buffer, host
memory, or Zarr store).
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import torch
from nvalchemi.dynamics.hooks._base import _ObserverHook
if TYPE_CHECKING:
from nvalchemi.data import Batch
from nvalchemi.dynamics.base import BaseDynamics
from nvalchemi.dynamics.sinks import DataSink
__all__ = ["SnapshotHook", "ConvergedSnapshotHook"]
[docs]
class SnapshotHook(_ObserverHook):
    """Periodically persist the active batch to a :class:`DataSink`.

    Every ``frequency`` steps the hook hands the **complete** batch —
    positions, velocities, forces, energies, and whatever other tensors
    live on the :class:`~nvalchemi.data.Batch` — to the configured sink.
    This makes it the main tool for recording trajectories and for
    producing restart checkpoints while a dynamics run is in progress.

    Serialization is left entirely to the
    :class:`~nvalchemi.dynamics.sinks.DataSink` interface, so one
    ``SnapshotHook`` works unchanged with every backend:

    * :class:`~nvalchemi.dynamics.sinks.GPUBuffer` — pre-allocated
      device memory for high-speed, in-simulation buffering.
    * :class:`~nvalchemi.dynamics.sinks.HostMemory` — CPU-resident
      list-of-:class:`AtomicData` storage, handy as a staging area
      before disk I/O.
    * :class:`~nvalchemi.dynamics.sinks.ZarrData` — persistent,
      Zarr-backed storage using a CSR-style layout for variable-length
      graph data; local, in-memory, and remote (S3/GCS) stores are
      supported.

    As a subclass of :class:`_ObserverHook`, the hook runs at
    :attr:`~HookStageEnum.AFTER_STEP`, i.e. once integrator updates,
    force clamping, and convergence checks are all done. Each recorded
    snapshot therefore reflects the fully resolved state of its step.

    Parameters
    ----------
    sink : DataSink
        Storage backend that receives the snapshots.
    frequency : int, optional
        Number of steps between consecutive snapshots. Default ``1``
        (write on every step).

    Attributes
    ----------
    sink : DataSink
        The storage backend.
    frequency : int
        Snapshot frequency in steps.
    stage : HookStageEnum
        Fixed to ``AFTER_STEP``.

    Examples
    --------
    >>> from nvalchemi.dynamics.hooks import SnapshotHook
    >>> from nvalchemi.dynamics.sinks import HostMemory
    >>> sink = HostMemory(capacity=10_000)
    >>> hook = SnapshotHook(sink=sink, frequency=10)
    >>> dynamics = DemoDynamics(model=model, n_steps=1000, dt=0.5, hooks=[hook])
    >>> dynamics.run(batch)  # 100 snapshots written
    >>> trajectory = sink.read()

    Notes
    -----
    * The batch is **not** cloned by the hook before it is written.
      Whether a copy is made is up to the sink (e.g.
      :class:`HostMemory` moves data to the CPU, :class:`GPUBuffer`
      copies into pre-allocated slots).
    * For long simulations, prefer :class:`ZarrData` so the whole
      trajectory does not pile up in memory.
    * Inside a :class:`FusedStage`, a single write contains the samples
      at every status code.
    """

    def __init__(self, sink: DataSink, frequency: int = 1) -> None:
        """Configure the firing frequency and remember the target sink.

        Parameters
        ----------
        sink : DataSink
            Storage backend that receives the snapshots.
        frequency : int, optional
            Number of steps between consecutive snapshots. Default ``1``.
        """
        # The stage is left to the base class; only the frequency is forwarded.
        super().__init__(frequency=frequency)
        self.sink = sink

    # Decorated so that torch.compile does not attempt to compile the sink call.
    @torch.compiler.disable
    def __call__(self, batch: Batch, dynamics: BaseDynamics) -> None:
        """Hand the current batch to the sink as-is.

        Parameters
        ----------
        batch : Batch
            The active batch; passed to the sink without cloning.
        dynamics : BaseDynamics
            The running dynamics object. Unused here; accepted to match
            the hook call signature.
        """
        destination = self.sink
        destination.write(batch)
[docs]
class ConvergedSnapshotHook(_ObserverHook):
    """Persist only the samples that have just converged.

    The hook is registered for :attr:`~HookStageEnum.ON_CONVERGE`. When
    it runs, it reads the converged sample indices from
    ``dynamics._last_converged``, turns them into a boolean mask over
    the batch, and forwards that mask to :meth:`DataSink.write`. Only
    samples that satisfied the convergence criterion on this firing are
    written; samples that converged on an earlier step are not written
    again.

    This is the recommended way to store optimized structures in Zarr
    from a :class:`FusedStage` pipeline, where many relaxations run
    side by side and finish at different steps.

    Parameters
    ----------
    sink : DataSink
        Storage backend that receives the converged samples.
        :class:`~nvalchemi.dynamics.sinks.ZarrData` is the usual choice
        for persistent storage.
    frequency : int, optional
        Execute every ``frequency`` steps. Default ``1`` (check on
        every step at which convergence fires).

    Examples
    --------
    >>> from nvalchemi.dynamics.hooks import ConvergedSnapshotHook
    >>> from nvalchemi.dynamics.sinks import ZarrData
    >>> sink = ZarrData(store="converged.zarr", capacity=100_000)
    >>> hook = ConvergedSnapshotHook(sink=sink)
    >>> dynamics.register_hook(hook)
    """

    def __init__(self, sink: DataSink, frequency: int = 1) -> None:
        """Register the hook for the ``ON_CONVERGE`` stage and store the sink.

        Parameters
        ----------
        sink : DataSink
            Storage backend that receives the converged samples.
        frequency : int, optional
            Execute every ``frequency`` steps. Default ``1``.
        """
        # Imported at call time rather than at module level; the module only
        # imports from nvalchemi.dynamics.base under TYPE_CHECKING.
        from nvalchemi.dynamics.base import HookStageEnum

        super().__init__(frequency=frequency, stage=HookStageEnum.ON_CONVERGE)
        self.sink = sink

    # Decorated so that torch.compile does not attempt to compile this method.
    @torch.compiler.disable
    def __call__(self, batch: Batch, dynamics: BaseDynamics) -> None:
        """Write the newly converged samples of ``batch`` to the sink.

        Nothing is written when ``dynamics._last_converged`` is ``None``
        or holds no elements.

        Parameters
        ----------
        batch : Batch
            The active batch; ``num_graphs`` sets the mask length and
            ``positions.device`` the mask device.
        dynamics : BaseDynamics
            Source of ``_last_converged``, used to index the mask.
        """
        # TODO: align last converged with PR #4
        newly_converged = dynamics._last_converged

        # Nothing to persist: no convergence info, or an empty selection.
        if newly_converged is None:
            return
        if newly_converged.numel() == 0:
            return

        # One boolean flag per sample, allocated on the positions' device.
        n_samples = batch.num_graphs
        target_device = batch.positions.device
        write_mask = torch.zeros(n_samples, dtype=torch.bool, device=target_device)
        write_mask[newly_converged] = True

        self.sink.write(batch, mask=write_mask)