# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Demo dynamics implementations for testing and debugging.
This module provides ``DemoDynamics`` — a concrete, minimal implementation
of ``BaseDynamics`` for testing and debugging, analogous to how
``nvalchemi.models.demo`` provides ``DemoModelWrapper`` for models.
``DemoDynamics`` implements a standard Velocity Verlet integrator, which
is a symplectic, time-reversible algorithm commonly used in molecular
dynamics. This serves as a simple example of how to implement an integrator
by overriding the ``pre_update`` and ``post_update`` methods. Inter-rank
communication capabilities are inherited from ``BaseDynamics``.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import torch
from nvalchemi._typing import Forces, NodePositions, NodeVelocities
from nvalchemi.data import Batch
from nvalchemi.dynamics.base import BaseDynamics, ConvergenceHook
if TYPE_CHECKING:
from nvalchemi.dynamics.base import Hook
from nvalchemi.models.base import BaseModelMixin
__all__ = ["DemoDynamics"]
class DemoDynamics(BaseDynamics):
    """
    Velocity Verlet integrator for molecular dynamics simulations.

    Implements the standard Velocity Verlet algorithm, a symplectic,
    time-reversible integration scheme commonly used in molecular dynamics.
    The algorithm splits the integration into two half-steps:

    1. ``pre_update``: Update positions using current velocities and
       accelerations: ``x(t+dt) = x(t) + v(t)*dt + 0.5*a(t)*dt^2``
    2. ``post_update``: Update velocities using averaged accelerations:
       ``v(t+dt) = v(t) + 0.5*(a(t) + a(t+dt))*dt``

    On steps where previous accelerations are unavailable (e.g. the first
    step), the velocity update falls back to Euler integration:
    ``v(t+dt) = v(t) + a(t+dt)*dt``.

    Inter-rank communication capabilities for use in pipeline workflows are
    inherited from ``BaseDynamics``. Communication attributes include
    ``prior_rank``, ``next_rank``, ``sinks``, ``active_batch``,
    ``max_batch_size``, and ``done``.

    This class is intended **entirely** for testing and debugging,
    demonstrating how to implement an integrator by overriding
    ``pre_update`` and ``post_update``. Do **NOT** use this class for
    production.

    Attributes
    ----------
    __needs_keys__ : set[str]
        Set of output keys required from the model. Set to ``{"forces"}``.
    __provides_keys__ : set[str]
        Set of keys this dynamics produces beyond model outputs.
        Set to ``{"velocities", "positions"}``.
    model : BaseModelMixin
        The neural network potential model.
    dt : float
        The integration timestep.
    step_count : int
        The current step number.
    hooks : dict[HookStageEnum, list[Hook]]
        Registered hooks organized by stage.
    _prev_accelerations : torch.Tensor | None
        Cached accelerations from the previous step for the velocity
        half-step. ``None`` when no valid cached value exists.
    prior_rank : int | None
        Rank of the previous pipeline stage (inherited from ``BaseDynamics``).
    next_rank : int | None
        Rank of the next pipeline stage (inherited from ``BaseDynamics``).

    Examples
    --------
    >>> model = DemoModelWrapper()
    >>> dynamics = DemoDynamics(model, dt=0.5, n_steps=100)
    >>> dynamics.run(batch)
    >>> # DistributedPipeline composition:
    >>> pipeline = dynamics | other_dynamics
    """

    __needs_keys__: set[str] = {"forces"}
    __provides_keys__: set[str] = {"velocities", "positions"}

    # a(t) cached by pre_update for the velocity half-step in post_update.
    _prev_accelerations: torch.Tensor | None

    def __init__(
        self,
        model: BaseModelMixin,
        n_steps: int,
        dt: float = 1.0,
        hooks: list[Hook] | None = None,
        convergence_hook: ConvergenceHook | dict | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the Velocity Verlet integrator.

        Parameters
        ----------
        model : BaseModelMixin
            The neural network potential model.
        n_steps : int
            Total number of simulation steps for ``run()``.
        dt : float, optional
            The integration timestep in femtoseconds. Default is 1.0.
        hooks : list[Hook] | None, optional
            Initial list of hooks to register. Each hook will be
            organized by its ``stage`` attribute.
        convergence_hook : ConvergenceHook | dict | None, optional
            Convergence hook configuration. Accepts a ``ConvergenceHook``
            instance, a dict (unpacked as ``ConvergenceHook(**dict)``),
            or ``None`` for no convergence checking. Default is ``None``.
        **kwargs : Any
            Additional keyword arguments forwarded to the next class
            in the MRO (for cooperative multiple inheritance).
        """
        super().__init__(
            model=model,
            hooks=hooks,
            convergence_hook=convergence_hook,
            n_steps=n_steps,
            **kwargs,
        )
        self.dt = dt
        self._prev_accelerations = None

    def pre_update(self, batch: Batch) -> None:
        """
        Perform the position update (first half of Velocity Verlet).

        Updates positions according to:
        ``x(t+dt) = x(t) + v(t)*dt + 0.5*a(t)*dt^2``
        where ``a(t) = F(t) / m``.

        If forces are not yet computed (first call before any ``compute``),
        this method falls back to a simple Euler position update:
        ``x(t+dt) = x(t) + v(t)*dt``.

        The update is performed inside a ``torch.no_grad()`` context to
        avoid conflicts with autograd when ``forces_via_autograd=True``
        (which causes ``compute()`` to set ``requires_grad_(True)`` on
        positions).

        Parameters
        ----------
        batch : Batch
            The current batch of atomic data. Positions are modified in-place.
        """
        positions: NodePositions = batch.positions
        velocities: NodeVelocities = batch.velocities
        forces: Forces | None = batch.forces
        masses = batch.atomic_masses.unsqueeze(-1)  # (V,) -> (V, 1) for broadcasting
        dt = self.dt
        with torch.no_grad():
            # NOTE: an all-zero force tensor is treated as "forces not yet
            # computed"; a configuration with genuinely zero net forces is
            # indistinguishable from this and also takes the Euler fallback.
            if forces is not None and not torch.all(forces == 0):
                # Compute current accelerations: a = F / m
                accelerations = forces / masses
                # Store for velocity half-step in post_update
                self._prev_accelerations = accelerations.clone()
                # x(t+dt) = x(t) + v(t)*dt + 0.5*a(t)*dt^2
                positions.add_(velocities * dt + 0.5 * accelerations * dt * dt)
            else:
                # No usable forces this step — invalidate any cached
                # accelerations from an earlier step so post_update takes
                # its Euler fallback instead of combining stale a(t-1)
                # with the freshly computed a(t+dt).
                self._prev_accelerations = None
                # Simple Euler position update: x(t+dt) = x(t) + v(t)*dt
                positions.add_(velocities * dt)

    def post_update(self, batch: Batch) -> None:
        """
        Perform the velocity update (second half of Velocity Verlet).

        Updates velocities according to:
        ``v(t+dt) = v(t) + 0.5*(a(t) + a(t+dt))*dt``
        where ``a(t+dt) = F(t+dt) / m`` are the forces computed after the
        position update.

        If previous accelerations are unavailable (first step, or the
        preceding ``pre_update`` took its fallback path), falls back to an
        Euler velocity update: ``v(t+dt) = v(t) + a(t+dt)*dt``.

        The update is performed inside a ``torch.no_grad()`` context to
        avoid conflicts with autograd when ``forces_via_autograd=True``.

        Parameters
        ----------
        batch : Batch
            The current batch of atomic data. Velocities are modified in-place.
        """
        velocities: NodeVelocities = batch.velocities
        forces: Forces = batch.forces
        masses = batch.atomic_masses.unsqueeze(-1)  # (V,) -> (V, 1) for broadcasting
        dt = self.dt
        with torch.no_grad():
            # Compute new accelerations from updated forces: a(t+dt) = F(t+dt) / m
            new_accelerations = forces / masses
            if self._prev_accelerations is not None:
                # Full Velocity Verlet: v(t+dt) = v(t) + 0.5*(a(t) + a(t+dt))*dt
                velocities.add_(
                    0.5 * (self._prev_accelerations + new_accelerations) * dt,
                )
            else:
                # Fallback — Euler: v(t+dt) = v(t) + a(t+dt)*dt
                velocities.add_(new_accelerations * dt)