Buffers & Data Flow

The dynamics framework uses a layered buffer architecture to manage data flow between the active simulation batch, inter-rank communication, and persistent storage. Understanding this architecture is essential for optimizing throughput in multi-GPU pipelines and debugging data routing issues.

The three buffer layers

| Layer | Class / Location | Storage | Purpose |
|---|---|---|---|
| Communication | send_buffer / recv_buffer on _CommunicationMixin | Pre-allocated Batch.empty() | Zero-copy inter-rank transfer via isend / irecv |
| Overflow sinks | DataSink (GPUBuffer, HostMemory, ZarrData) | Varies | Staging when active batch is full |
| Active batch | active_batch on _CommunicationMixin | Live Batch | The working set being integrated |

Dataset/Sampler --> [Active Batch] --> step() --> convergence check
                         ^                             |
                    _recv_to_batch              _poststep_sync_buffers
                         ^                             |
                   [Recv Buffer]                [Send Buffer]
                         ^                             |
                   Batch.irecv                   Batch.isend
                         ^                             |
            --- prior rank --------------- next rank ---
                                                 |
                                          [Overflow Sinks]

Data flows from samplers or upstream ranks into the active batch, through the dynamics step, and out to downstream ranks or sinks.

Pre-allocated communication buffers

Communication buffers are configured via BufferConfig:

from nvalchemi.dynamics import DemoDynamics
from nvalchemi.dynamics.base import BufferConfig

buffer_config = BufferConfig(
    num_systems=64,    # max graphs in buffer
    num_nodes=2000,    # max total atoms
    num_edges=10000,   # max total edges
)

# `model` is assumed to be an already-constructed model instance (not shown here).
dynamics = DemoDynamics(
    model=model,
    dt=1.0,
    buffer_config=buffer_config,
)

Lazy initialization. Buffers are created on the first step via _ensure_buffers(). The first concrete batch serves as a template for attribute keys, dtypes, and trailing shapes (e.g., hidden dimensions). This lazy approach is necessary because the attribute schema is not known until a real batch appears.
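To illustrate the lazy pattern, here is a minimal pure-Python sketch. It does not use the framework: `LazyBuffers` and `ensure_buffers` are hypothetical stand-ins, and batches are modelled as plain dicts of row lists rather than real Batch objects.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class LazyBuffers:
    """Toy stand-in for a stage that allocates its buffers on first use."""

    capacity: int                      # max number of rows the buffer can hold
    schema: Optional[dict] = None      # attribute name -> (type name, trailing width)
    storage: dict = field(default_factory=dict)

    def ensure_buffers(self, first_batch: dict) -> None:
        """Allocate storage once, using the first real batch as a template."""
        if self.schema is not None:
            return  # already initialised; later batches do not change the schema
        self.schema = {}
        for key, rows in first_batch.items():
            element = rows[0][0]
            width = len(rows[0])  # trailing shape comes from the template
            self.schema[key] = (type(element).__name__, width)
            # Pre-allocate full capacity, filled with zeros of the template's type.
            zero = type(element)()
            self.storage[key] = [[zero] * width for _ in range(self.capacity)]
```

The point of the sketch is that neither the attribute keys nor the trailing widths are supplied by the caller; both are read from the first batch that arrives.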

The buffer lifecycle. Communication buffers (send_buffer and recv_buffer) follow a Batch.empty() -> put() -> zero() cycle:

  1. Batch.empty() allocates storage with zero graphs but full capacity.

  2. Batch.put() copies selected graphs from a source batch using Warp GPU kernels.

  3. Batch.zero() resets occupancy while preserving allocated memory.
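The three steps above can be mimicked with a toy fixed-capacity buffer. This is only a sketch under simplifying assumptions: `FixedBuffer` is a hypothetical class, items are plain Python objects, and the copy is an ordinary loop rather than a Warp kernel.

```python
class FixedBuffer:
    """Toy fixed-capacity buffer; slots are allocated once and reused."""

    def __init__(self, capacity: int) -> None:
        self.slots = [None] * capacity  # storage allocated up front
        self.count = 0                  # occupancy: number of valid slots

    @classmethod
    def empty(cls, capacity: int) -> "FixedBuffer":
        """Allocate full capacity with zero occupied slots."""
        return cls(capacity)

    def put(self, source: list, mask: list) -> int:
        """Copy masked items from source into free slots; return how many fit."""
        copied = 0
        for item, selected in zip(source, mask):
            if selected and self.count < len(self.slots):
                self.slots[self.count] = item
                self.count += 1
                copied += 1
        return copied

    def zero(self) -> None:
        """Reset occupancy without releasing the allocated slots."""
        self.count = 0
```

After `zero()` the slot list is the same object with the same length, which is the property that makes the buffer reusable across steps.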

After put() extracts converged graphs into the send buffer, the active batch (the working simulation state) is rebuilt without those graphs via Batch.trim(), which returns a new Batch with tight storage — no padding, no trailing buffer slots.

trim vs defrag. defrag() compacts data to the front of a pre-allocated buffer in-place, preserving the physical capacity for further put / defrag cycles. trim() instead creates a brand-new batch whose tensors are sized to exactly fit the remaining graphs. Use defrag for reusable communication buffers; use trim for batches that will be consumed directly by a model or integrator and must have self-consistent tensor shapes.
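The difference can be seen on a plain list. The two functions below are hypothetical, list-based analogues written for illustration; they share only their names with the Batch methods.

```python
def defrag(slots: list, keep: list) -> int:
    """Compact kept items to the front of `slots` in place; return the new count."""
    write = 0
    for read, item in enumerate(slots):
        if keep[read]:
            slots[write] = item  # write never runs ahead of read, so this is safe
            write += 1
    for i in range(write, len(slots)):
        slots[i] = None  # trailing slots stay allocated but are now free
    return write


def trim(slots: list, keep: list) -> list:
    """Return a new list sized to exactly the kept items."""
    return [item for item, kept in zip(slots, keep) if kept]
```

With four slots and two kept items, `defrag` leaves a list of length four with two free trailing slots, while `trim` returns a list of length two.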

Warning

Batch.put() uses Warp GPU kernels that only copy float32 attributes. Integer and other dtypes may need separate handling.

Note

Every stage in a DistributedPipeline must provide a BufferConfig, and adjacent stages must have identical values. Both constraints are validated during setup().
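A check of this kind might look like the following sketch. `BufferConfigSketch` is a hypothetical mirror of the three fields shown earlier, and `validate_stage_configs` is an illustrative name; the actual validation in setup() may differ in structure and error type.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BufferConfigSketch:
    """Hypothetical mirror of the three BufferConfig fields shown earlier."""

    num_systems: int
    num_nodes: int
    num_edges: int


def validate_stage_configs(configs: list) -> None:
    """Raise ValueError if a stage lacks a config or neighbours disagree."""
    for i, cfg in enumerate(configs):
        if cfg is None:
            raise ValueError(f"stage {i} has no buffer config")
    # Compare each stage with its downstream neighbour.
    for i, (left, right) in enumerate(zip(configs, configs[1:])):
        if left != right:
            raise ValueError(f"stages {i} and {i + 1} have different buffer configs")
```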

The communication protocol

DistributedPipeline uses a five-phase step to coordinate data flow between ranks:

  1. _prestep_sync_buffers(): Zeros the send buffer and posts irecv from the prior rank (using a pre-computed template for correct buffer shapes). In sync mode the receive completes inline; in async modes the handle is stored for later completion.

  2. _complete_pending_recv(): Waits on the deferred receive, routes data through the recv buffer into the active batch (stripping buffer padding via to_data_list / from_data_list), and drains overflow sinks to backfill any available capacity.

  3. step(): The dynamics integration step (forward pass, pre_update, post_update, convergence check).

  4. _poststep_sync_buffers(): Extracts converged samples into the send buffer via _populate_send_buffer (subject to back-pressure). The send buffer is then unconditionally sent to the next rank. On the final rank, converged samples are extracted via _remove_converged_final_stage and written to sinks.

  5. _sync_done_flags(): All ranks synchronize done flags via dist.all_reduce(MAX); the loop terminates when every rank reports done.
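The ordering of the five phases can be summarised with a stub class that only records which phase ran. The method names follow the list above; the driver `run_one_iteration` and the class itself are hypothetical, and every phase body is a placeholder.

```python
class StepOrderSketch:
    """Stub stage that records the order of the five phases."""

    def __init__(self) -> None:
        self.calls = []

    def _prestep_sync_buffers(self) -> None:
        self.calls.append("_prestep_sync_buffers")   # zero send buffer, post irecv

    def _complete_pending_recv(self) -> None:
        self.calls.append("_complete_pending_recv")  # wait, route into active batch

    def step(self) -> None:
        self.calls.append("step")                    # dynamics integration

    def _poststep_sync_buffers(self) -> None:
        self.calls.append("_poststep_sync_buffers")  # fill and send the send buffer

    def _sync_done_flags(self) -> None:
        self.calls.append("_sync_done_flags")        # agree on termination

    def run_one_iteration(self) -> None:
        """Hypothetical driver: call the phases in the documented order."""
        self._prestep_sync_buffers()
        self._complete_pending_recv()
        self.step()
        self._poststep_sync_buffers()
        self._sync_done_flags()
```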

Communication modes:

| Mode | Behavior |
|---|---|
| "sync" | Blocking receive in _prestep_sync_buffers. Simplest and most debuggable. Good for small pipelines. |
| "async_recv" | Deferred receive: irecv is posted in _prestep_sync_buffers but wait() is called later in _complete_pending_recv. Allows compute to overlap with communication. Default mode. |
| "fully_async" | Both send and receive are deferred. Sends from the previous step are drained at the start of the next _prestep_sync_buffers. Maximum overlap, highest throughput. |
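How the send side differs between modes can be sketched as follows. `FakeHandle` and `SendHandleManager` are hypothetical stand-ins written for this example; only the three mode strings come from the table above.

```python
class FakeHandle:
    """Stand-in for an asynchronous send handle."""

    def __init__(self) -> None:
        self.waited = False

    def wait(self) -> None:
        self.waited = True


class SendHandleManager:
    """Waits on send handles immediately, or defers them in fully_async mode."""

    MODES = ("sync", "async_recv", "fully_async")

    def __init__(self, mode: str) -> None:
        if mode not in self.MODES:
            raise ValueError(f"unknown communication mode: {mode!r}")
        self.mode = mode
        self.pending = []

    def manage(self, handle: FakeHandle) -> None:
        if self.mode == "fully_async":
            self.pending.append(handle)  # completion is deferred to the next step
        else:
            handle.wait()                # other modes complete the send right away

    def drain(self) -> None:
        """Complete sends left over from the previous step."""
        for handle in self.pending:
            handle.wait()
        self.pending.clear()
```

In this sketch `drain()` would be called at the start of the next pre-step phase, matching the description of the "fully_async" row.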

Template sharing. During setup(), the pipeline computes recv templates for every stage via _share_templates(). Since all stages are available on every rank, this is done locally without inter-rank communication. Inflight (first) stages build their initial batch from the sampler; downstream stages derive templates from their upstream neighbour. Templates are used to pre-allocate correctly shaped irecv buffers.

Non-blocking receives. _BatchRecvHandle.wait() uses non-blocking dist.irecv for all message components (metadata, segment lengths, and bulk tensor data) and waits on all handles at the end. This enables better overlap with computation compared to the previous blocking dist.recv approach.

Deadlock prevention. When no samples converge, an empty send buffer is still sent so the downstream irecv completes. This ensures the pipeline does not stall waiting for data that will never arrive.
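The reasoning can be reproduced with a standard-library queue standing in for the link between two ranks. This is a single-process analogy, not the distributed implementation; `upstream_step` and `downstream_step` are hypothetical names.

```python
import queue


def upstream_step(channel: queue.Queue, converged: list) -> None:
    """Send exactly one message per step, even when nothing converged."""
    channel.put(list(converged))


def downstream_step(channel: queue.Queue, timeout: float = 1.0) -> list:
    """Receive exactly one message per step; raises queue.Empty on a stall."""
    return channel.get(timeout=timeout)
```

Because the receiver expects one message per step, skipping the send on an empty step would leave `downstream_step` waiting until its timeout. Sending an empty list keeps the two sides in lockstep.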

Back-pressure

When the send_buffer capacity is limited:

  • Only min(converged_count, remaining_capacity) samples are extracted via _populate_send_buffer.

  • The active batch is replaced by a tight copy (via trim()) that excludes the graduated samples.

  • Excess converged samples that did not fit in the send buffer remain in the active batch.

  • step() treats them as no-ops: their positions and velocities are saved before the integrator and restored after (the active_mask logic in BaseDynamics.step()).

  • The send buffer is sent unconditionally every step for deadlock prevention, even when empty.
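A minimal sketch of the two mechanisms in the list above: capping how many samples graduate, and holding back the rest as no-ops. Both functions are hypothetical, and choosing the first samples in index order is an assumption made for the example.

```python
def select_for_send(converged_indices: list, send_count: int, send_capacity: int):
    """Split converged indices into (sent now, held back) by remaining capacity."""
    remaining = max(send_capacity - send_count, 0)
    n = min(len(converged_indices), remaining)
    return converged_indices[:n], converged_indices[n:]


def masked_step(positions: list, active_mask: list, integrate) -> list:
    """Integrate every sample, then restore the saved value of inactive ones."""
    saved = list(positions)          # save state before the integrator runs
    updated = integrate(positions)   # integrator touches all samples
    return [
        new if active else old
        for new, old, active in zip(updated, saved, active_mask)
    ]
```

For example, with three converged samples and room for two, two are sent and the third stays in the active batch with its mask entry set to False until capacity frees up.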

Data routing helpers

The _CommunicationMixin provides several helper methods for routing data between buffers:

  • _recv_to_batch(incoming): Stages data through the recv buffer into the active batch via _buffer_to_batch, then zeros the recv buffer.

  • _buffer_to_batch(incoming): Routes incoming data into the active batch. All code paths reconstruct the batch via to_data_list() + from_data_list() to strip buffer padding and produce tight tensors. Three cases:

    1. No active batch exists: adopt the incoming data directly.

    2. Room available: append to the existing active batch.

    3. No room: overflow to sinks.

  • _batch_to_buffer(mask): Copies graduated samples from the active batch into the send buffer via put(), then replaces the active batch with a tight copy via trim() (or sets it to None if every graph was graduated).

  • _populate_send_buffer(converged_indices): Creates a boolean mask from converged indices and delegates to _batch_to_buffer. Does not send — the caller issues isend separately.

  • _remove_converged_final_stage(converged_indices): On the final stage, extracts converged graphs via index_select, removes them from the active batch, and writes them to sinks.

  • _manage_send_handle(handle): Stores the send handle for later draining (fully_async) or waits on it immediately (other modes).

  • _overflow_to_sinks(batch, mask): Writes to the first non-full sink in priority order.

  • _drain_sinks_to_batch(): Pulls samples from sinks back into the active batch when room is available.
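The three routing cases of _buffer_to_batch can be sketched with lists. `route_incoming` is a hypothetical function; it assumes that "room available" means the whole incoming batch fits, which the description above does not state explicitly.

```python
from typing import Optional


def route_incoming(active: Optional[list], incoming: list,
                   capacity: int, overflow: list) -> list:
    """Return the new active batch after routing `incoming`."""
    if active is None:
        return list(incoming)            # case 1: adopt the incoming data
    room = capacity - len(active)
    if len(incoming) <= room:
        return active + list(incoming)   # case 2: append to the active batch
    overflow.extend(incoming)            # case 3: no room, send to sinks
    return active
```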

Note

_buffer_to_batch uses to_data_list() + from_data_list() for combining batches. This is O(N) Python-level reconstruction. In high-throughput pipelines, this can be a bottleneck compared to the Warp-accelerated put / defrag path.

Sample lifecycle

This section traces a sample’s journey through three representative workflows.

Standalone BaseDynamics.run()

Batch passed to run()
      |
      v
loop for n_steps:
      |
      v
  pre_update --> compute --> post_update --> convergence check
      |
      v
return batch

The simplest workflow: a batch is passed in, stepped for n_steps iterations, and returned.
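As a rough sketch of that loop, with each phase passed in as a callable (the function signature is hypothetical and does not reflect the real run() API):

```python
def run(batch, n_steps, pre_update, compute, post_update, check_convergence):
    """Step `batch` for n_steps iterations and return it."""
    for _ in range(n_steps):
        pre_update(batch)
        compute(batch)
        post_update(batch)
        check_convergence(batch)
    return batch
```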

FusedStage with inflight batching

1. sampler.build_initial_batch()
   creates batch with status=0, fmax=inf
      |
      v
2. Each step:
   compute() --> per-sub-stage masked_update based on batch.status
      |
      v
3. ConvergenceHook updates batch.status (e.g., 0 --> 1 --> 2)
      |
      v
4. Every refill_frequency steps: _refill_check()
   - identifies graduated samples (status >= exit_status)
   - writes them to sinks
   - extracts remaining via index_select
   - requests replacements from sampler
   - appends them, rebuilds status/fmax tensors
      |
      v
5. Terminates when sampler is exhausted and all graduated,
   or all samples reach exit_status

Samples migrate through sub-stages based on convergence, and graduated samples are continuously replaced from the sampler.
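The refill step (item 4 in the diagram) can be sketched on plain lists. `refill_check` here is a hypothetical, simplified analogue: the sampler is any Python iterator, the sink is a list, and new samples start at status 0 as in item 1.

```python
_EXHAUSTED = object()  # sentinel marking an exhausted sampler


def refill_check(batch: list, status: list, exit_status: int,
                 sampler, sink: list, capacity: int):
    """Move graduated samples to the sink and top up from the sampler."""
    kept, kept_status = [], []
    for sample, s in zip(batch, status):
        if s >= exit_status:
            sink.append(sample)       # graduated: write to the sink
        else:
            kept.append(sample)       # still in progress: keep in the batch
            kept_status.append(s)
    while len(kept) < capacity:
        replacement = next(sampler, _EXHAUSTED)
        if replacement is _EXHAUSTED:
            break                     # sampler exhausted; run with fewer samples
        kept.append(replacement)
        kept_status.append(0)         # replacements start at status 0
    return kept, kept_status
```

Once the sampler is exhausted the batch shrinks on each refill, and the run ends when nothing is left to keep.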

DistributedPipeline

Rank 0 (first, inflight):
  - builds batch from sampler
  - runs step
  - sends converged downstream via _poststep_sync_buffers
  - refills from sampler
      |
      v (isend/irecv)
Rank 1..N-1 (middle):
  - receives from prior rank via _prestep_sync_buffers
  - _complete_pending_recv routes data to active batch
  - runs step
  - sends converged downstream
      |
      v (isend/irecv)
Rank N (final):
  - receives from prior rank
  - runs step
  - writes converged to sinks

All ranks:
  - synchronize done flags via all_reduce(MAX)
  - loop terminates when all report done

Samples flow from the first rank through intermediate ranks to the final rank, where they are persisted to sinks.

Data sinks

Three DataSink implementations are available: GPUBuffer, HostMemory, and ZarrData.

Sinks are tried in priority order; the first non-full sink receives the data.
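The priority rule can be sketched with a toy sink type. `ListSink` and `overflow_to_sinks` are hypothetical, and raising an error when every sink is full is an assumption of the example rather than documented behavior.

```python
class ListSink:
    """Toy sink with a fixed capacity, backed by a Python list."""

    def __init__(self, name: str, capacity: int) -> None:
        self.name = name
        self.capacity = capacity
        self.items = []

    @property
    def is_full(self) -> bool:
        return len(self.items) >= self.capacity

    def write(self, item) -> None:
        self.items.append(item)


def overflow_to_sinks(sinks: list, item) -> str:
    """Write `item` to the first non-full sink; return that sink's name."""
    for sink in sinks:  # list order is the priority order
        if not sink.is_full:
            sink.write(item)
            return sink.name
    raise RuntimeError("all sinks are full")  # assumed behavior for this sketch
```

Ordering the list from fastest to slowest storage means the slower sinks are used only once the faster ones fill up.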