Source code for nvalchemi.dynamics.hooks.snapshot

# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Snapshot hook for saving batch state to a data sink.

Provides :class:`SnapshotHook`, which periodically writes the full batch
state to a :class:`~nvalchemi.dynamics.sinks.DataSink` (GPU buffer, host
memory, or Zarr store), and :class:`ConvergedSnapshotHook`, which writes
only newly converged samples to a sink.
"""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING

import torch

from nvalchemi.data import Batch
from nvalchemi.dynamics.base import DynamicsStage
from nvalchemi.hooks._context import HookContext

if TYPE_CHECKING:
    from nvalchemi.dynamics.sinks import DataSink

__all__ = ["SnapshotHook", "ConvergedSnapshotHook"]


class SnapshotHook:
    """Persist the active batch to a :class:`DataSink` each time the hook runs.

    Every invocation hands the **complete** batch — positions, velocities,
    forces, energy, and whatever other tensors the
    :class:`~nvalchemi.data.Batch` carries — to the configured sink. This is
    the main tool for recording trajectories and producing restart
    checkpoints during a dynamics run.

    Serialization is left entirely to the
    :class:`~nvalchemi.dynamics.sinks.DataSink` interface, so one hook
    implementation serves every backend:

    * :class:`~nvalchemi.dynamics.sinks.GPUBuffer` — pre-allocated device
      memory for high-speed, in-simulation buffering.
    * :class:`~nvalchemi.dynamics.sinks.HostMemory` — CPU-resident
      list-of-:class:`AtomicData` storage, handy for staging before disk I/O.
    * :class:`~nvalchemi.dynamics.sinks.ZarrData` — persistent, Zarr-backed
      storage with a CSR-style layout for variable-length graph data;
      supports local, in-memory, and remote (S3/GCS) stores.

    By default the hook is registered for
    :attr:`~DynamicsStage.AFTER_STEP` — after all integrator updates, force
    clamping, and convergence checks have completed — so each snapshot
    reflects the fully resolved state of the recorded step.

    Parameters
    ----------
    sink : DataSink
        The storage backend that receives the snapshots.
    frequency : int, optional
        Intended snapshot interval in steps. Default ``1`` (every step).
    stage : Enum, optional
        The stage at which this hook should run. Default is
        ``DynamicsStage.AFTER_STEP``.

    Attributes
    ----------
    sink : DataSink
        The storage backend.
    frequency : int
        Snapshot frequency in steps.
    stage : Enum
        Stage the hook is associated with (``AFTER_STEP`` unless overridden).

    Examples
    --------
    >>> from nvalchemi.dynamics.hooks import SnapshotHook
    >>> from nvalchemi.dynamics.sinks import HostMemory
    >>> sink = HostMemory(capacity=10_000)
    >>> hook = SnapshotHook(sink=sink, frequency=10)
    >>> dynamics = DemoDynamics(model=model, n_steps=1000, dt=0.5, hooks=[hook])
    >>> dynamics.run(batch)  # 100 snapshots written
    >>> trajectory = sink.read()

    Notes
    -----
    * ``__call__`` writes unconditionally; ``frequency`` and ``stage`` are
      only stored here. The every-``frequency``-steps cadence is presumably
      applied by whatever dispatches the hooks (not visible in this module).
    * The batch is **not** cloned before it is written. Whether the data is
      copied depends on the sink (e.g. :class:`HostMemory` moves to CPU;
      :class:`GPUBuffer` copies into pre-allocated slots).
    * For long simulations, prefer :class:`ZarrData` so the full trajectory
      does not accumulate in memory.
    * Inside a :class:`FusedStage`, a single write contains samples at all
      status codes.
    """

    def __init__(
        self,
        sink: DataSink,
        frequency: int = 1,
        stage: Enum = DynamicsStage.AFTER_STEP,
    ) -> None:
        """Store the sink, the snapshot interval, and the target stage."""
        self.sink = sink
        self.frequency = frequency
        self.stage = stage

    @torch.compiler.disable
    def _write_snapshot(self, batch: Batch) -> None:
        """Forward ``batch`` unchanged to ``self.sink.write``.

        Kept as a separate method so it can carry
        ``torch.compiler.disable``.
        """
        target_sink = self.sink
        target_sink.write(batch)

    def __call__(self, ctx: HookContext, stage: Enum) -> None:
        """Write the batch held by ``ctx`` to the sink.

        Parameters
        ----------
        ctx : HookContext
            Hook context; only ``ctx.batch`` is read.
        stage : Enum
            Stage that triggered the call. Not used by this hook.
        """
        current_batch = ctx.batch
        self._write_snapshot(current_batch)
class ConvergedSnapshotHook:
    """Write only newly converged samples to a :class:`DataSink`.

    By default the hook is registered for
    :attr:`~DynamicsStage.ON_CONVERGE`. On each call it reads the boolean
    mask ``ctx.converged_mask``, selects the flagged samples into a
    sub-batch, and passes that sub-batch to :meth:`DataSink.write`. Only
    samples that just satisfied the convergence criterion are meant to be
    written; samples that converged on earlier steps are not re-written.
    How the context fills in that mask is outside this module.

    This is the recommended hook for persisting optimized structures to
    Zarr in a :class:`FusedStage` pipeline, where several relaxations run
    concurrently and structures converge at different steps.

    Parameters
    ----------
    sink : DataSink
        The storage backend that receives converged samples.
        :class:`~nvalchemi.dynamics.sinks.ZarrData` is the typical choice
        for persistent storage.
    frequency : int, optional
        Execute every ``frequency`` steps. Default ``1`` (check every step
        on which convergence fires).
    stage : Enum, optional
        The stage at which to run this hook. Default is
        ``DynamicsStage.ON_CONVERGE``.

    Examples
    --------
    >>> from nvalchemi.dynamics.hooks import ConvergedSnapshotHook
    >>> from nvalchemi.dynamics.sinks import ZarrData
    >>> sink = ZarrData(store="converged.zarr", capacity=100_000)
    >>> hook = ConvergedSnapshotHook(sink=sink)
    >>> dynamics.register_hook(hook)
    """

    # Keys holding neighbor data, removed from the sub-batch before it is
    # written. Neighbor tensors are ephemeral (NeighborListHook rebuilds them
    # each step) and their K-dimension may change between rebuilds because of
    # adaptive sizing, which would cause shape mismatches when the sink
    # concatenates snapshots.
    _NEIGHBOR_KEYS = frozenset(
        {
            "neighbor_matrix",
            "num_neighbors",
            "neighbor_matrix_shifts",
            "neighbor_list",
            "neighbor_list_shifts",
        }
    )

    def __init__(
        self,
        sink: DataSink,
        frequency: int = 1,
        stage: Enum = DynamicsStage.ON_CONVERGE,
    ) -> None:
        """Store the sink, the execution interval, and the target stage."""
        self.sink = sink
        self.frequency = frequency
        self.stage = stage

    @torch.compiler.disable
    def _write_converged(self, batch: Batch, mask: torch.Tensor | None) -> None:
        """Send the samples flagged by ``mask`` to the sink.

        The flagged samples are extracted with :meth:`Batch.index_select`,
        so key removal happens on the extracted sub-batch rather than on
        the live ``batch``; hooks that read neighbor data after
        ``ON_CONVERGE`` still see it. Neighbor keys are dropped from the
        sub-batch because their K-dimension can differ between adaptive
        rebuilds, which breaks later concatenation of snapshots inside the
        sink.

        Parameters
        ----------
        batch : Batch
            The current batch of atomic data.
        mask : torch.Tensor | None
            Boolean mask of converged samples, or ``None`` if none
            converged.
        """
        # Nothing to persist: no mask at all, or a mask with no True entry.
        if mask is None:
            return
        if not mask.any():
            return

        nonzero_axes = torch.nonzero(mask, as_tuple=True)
        converged_rows = nonzero_axes[0]
        # Touch batch_ptr first: this triggers lazy initialization for
        # SegmentedLevelStorage before the selection below.
        _ = batch.batch_ptr
        converged_only = batch.index_select(converged_rows)

        for neighbor_key in self._NEIGHBOR_KEYS:
            try:
                del converged_only[neighbor_key]
            except (KeyError, IndexError):
                # Best-effort removal: a key that cannot be deleted is
                # simply skipped.
                continue

        self.sink.write(converged_only)

    def __call__(self, ctx: HookContext, stage: Enum) -> None:
        """Write the converged samples described by ``ctx`` to the sink.

        Parameters
        ----------
        ctx : HookContext
            Hook context; ``ctx.batch`` and ``ctx.converged_mask`` are read.
        stage : Enum
            Stage that triggered the call. Not used by this hook.
        """
        live_batch = ctx.batch
        converged_mask = ctx.converged_mask
        self._write_converged(live_batch, converged_mask)