# Buffers & Data Flow
The dynamics framework uses a layered buffer architecture to manage data flow between the active simulation batch, inter-rank communication, and persistent storage. Understanding this architecture is essential for optimizing throughput in multi-GPU pipelines and debugging data routing issues.
## The three buffer layers

| Layer | Class / Location | Storage | Purpose |
|---|---|---|---|
| Communication | `send_buffer` / `recv_buffer` | Pre-allocated | Zero-copy inter-rank transfer via `isend` / `irecv` |
| Overflow sinks | `DataSink` implementations | Varies | Staging when the active batch is full |
| Active batch | `Batch` | Live | The working set being integrated |
```text
Dataset/Sampler --> [Active Batch] --> step() --> convergence check
                      ^          |
         _recv_to_batch          _poststep_sync_buffers
                      ^          |
          [Recv Buffer]          [Send Buffer]
                      ^          |
            Batch.irecv          Batch.isend
                      ^          |
      --- prior rank ---         --- next rank ---
                      |
            [Overflow Sinks]
```
Data flows from samplers or upstream ranks into the active batch, through the dynamics step, and out to downstream ranks or sinks.
## Pre-allocated communication buffers

Communication buffers are configured via `BufferConfig`:

```python
from nvalchemi.dynamics import DemoDynamics
from nvalchemi.dynamics.base import BufferConfig

buffer_config = BufferConfig(
    num_systems=64,    # max graphs in buffer
    num_nodes=2000,    # max total atoms
    num_edges=10000,   # max total edges
)

dynamics = DemoDynamics(
    model=model,
    dt=1.0,
    buffer_config=buffer_config,
)
```
**Lazy initialization.** Buffers are created on the first step via `_ensure_buffers()`. The first concrete batch serves as a template for attribute keys, dtypes, and trailing shapes (e.g., hidden dimensions). This lazy approach is necessary because the attribute schema is not known until a real batch appears.
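The lazy pattern can be sketched as follows. `LazyBuffers` and its dict-of-shapes storage are illustrative stand-ins for the real classes, which allocate GPU tensors:

```python
# Hypothetical sketch of lazy buffer creation: allocation is deferred
# until the first concrete batch reveals the attribute schema.
class LazyBuffers:
    def __init__(self, buffer_config):
        self.buffer_config = buffer_config
        self.send_buffer = None          # not allocated yet

    def _ensure_buffers(self, template_batch):
        if self.send_buffer is not None:
            return                       # already allocated; no-op
        # Derive keys, dtypes, and trailing shapes from the template.
        # A dict of (dtype, trailing_shape) pairs stands in for tensors.
        self.send_buffer = {
            key: (dtype, shape)
            for key, (dtype, shape) in template_batch.items()
        }

bufs = LazyBuffers(buffer_config={"num_systems": 64})
bufs._ensure_buffers({"pos": ("float32", (3,)), "h": ("float32", (128,))})
assert bufs.send_buffer["h"] == ("float32", (128,))
```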
**The buffer lifecycle.** Communication buffers (`send_buffer` and `recv_buffer`) follow a `Batch.empty()` → `put()` → `zero()` cycle:

- `Batch.empty()` allocates storage with zero graphs but full capacity.
- `Batch.put()` copies selected graphs from a source batch using Warp GPU kernels.
- `Batch.zero()` resets occupancy while preserving allocated memory.
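A toy model of the cycle, with a plain Python list standing in for GPU storage (`ToyBuffer` is not the real `Batch` API, which copies with Warp kernels):

```python
# Toy stand-in for the empty() -> put() -> zero() lifecycle.
class ToyBuffer:
    @classmethod
    def empty(cls, capacity):
        buf = cls()
        buf.slots = [None] * capacity  # full capacity, zero graphs
        buf.count = 0
        return buf

    def put(self, source, indices):
        # Copy selected graphs from a source batch into free slots.
        for i in indices:
            self.slots[self.count] = source[i]
            self.count += 1

    def zero(self):
        # Reset occupancy; the allocated storage is preserved.
        self.count = 0

buf = ToyBuffer.empty(capacity=4)
buf.put(["g0", "g1", "g2"], indices=[0, 2])   # stage graphs g0 and g2
assert buf.count == 2
buf.zero()                                    # ready for the next cycle
assert buf.count == 0 and len(buf.slots) == 4
```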
After `put()` extracts converged graphs into the send buffer, the active batch (the working simulation state) is rebuilt without those graphs via `Batch.trim()`, which returns a new `Batch` with tight storage: no padding, no trailing buffer slots.
**`trim` vs `defrag`.** `defrag()` compacts data to the front of a pre-allocated buffer in place, preserving the physical capacity for further `put`/`defrag` cycles. `trim()` instead creates a brand-new batch whose tensors are sized to exactly fit the remaining graphs. Use `defrag` for reusable communication buffers; use `trim` for batches that will be consumed directly by a model or integrator and must have self-consistent tensor shapes.
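The difference can be illustrated with lists standing in for pre-allocated tensors, where `None` marks an empty padded slot (an illustrative sketch, not the real implementation):

```python
def defrag(slots):
    """Compact occupied slots to the front, in place; capacity is kept."""
    live = [g for g in slots if g is not None]
    slots[:] = live + [None] * (len(slots) - len(live))
    return slots

def trim(slots):
    """Return a brand-new, tightly sized container; no padding."""
    return [g for g in slots if g is not None]

slots = ["g0", None, "g2", None]                  # fragmented, capacity 4
assert defrag(slots) == ["g0", "g2", None, None]  # capacity preserved
assert trim(slots) == ["g0", "g2"]                # tight copy, capacity 2
```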
> **Warning:** `Batch.put()` uses Warp GPU kernels that only copy float32 attributes. Integer and other dtypes may need separate handling.
> **Note:** Every stage in a `DistributedPipeline` must provide a `BufferConfig`, and adjacent stages must have identical values. Both constraints are validated during `setup()`.
## The communication protocol

`DistributedPipeline` uses a five-phase step to coordinate data flow between ranks:

1. `_prestep_sync_buffers()`: Zeros the send buffer and posts `irecv` from the prior rank (using a pre-computed template for correct buffer shapes). In sync mode the receive completes inline; in async modes the handle is stored for later completion.
2. `_complete_pending_recv()`: Waits on the deferred receive, routes data through the recv buffer into the active batch (stripping buffer padding via `to_data_list`/`from_data_list`), and drains overflow sinks to backfill any available capacity.
3. `step()`: The dynamics integration step (forward pass, pre_update, post_update, convergence check).
4. `_poststep_sync_buffers()`: Extracts converged samples into the send buffer via `_populate_send_buffer` (subject to back-pressure). The send buffer is then unconditionally sent to the next rank. On the final rank, converged samples are instead extracted via `_remove_converged_final_stage` and written to sinks.
5. `_sync_done_flags()`: All ranks synchronize `done` flags via `dist.all_reduce(MAX)`; the loop terminates when every rank reports done.
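The five phases compose into a per-step loop of roughly this shape. The method names follow the phases above, but the loop body and the `all_done` attribute are illustrative, not the actual pipeline code:

```python
# Illustrative per-step loop for one pipeline stage; the real loop also
# handles mode-specific branching, but the phase order is as documented.
def run_stage(stage):
    while not stage.all_done:
        stage._prestep_sync_buffers()    # zero send buffer, post irecv
        stage._complete_pending_recv()   # recv buffer -> active batch
        stage.step()                     # integrate + convergence check
        stage._poststep_sync_buffers()   # extract converged, send downstream
        stage._sync_done_flags()         # all_reduce(MAX) over done flags
```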
**Communication modes:**

| Mode | Behavior |
|---|---|
| `sync` | Blocking receive in `_prestep_sync_buffers`; completes inline. |
| `async` | Deferred receive: completed later in `_complete_pending_recv`. |
| `fully_async` | Both send and receive are deferred. Sends from the previous step are drained at the start of the next step. |
**Template sharing.** During `setup()`, the pipeline computes recv templates for every stage via `_share_templates()`. Since all stages are available on every rank, this is done locally without inter-rank communication. Inflight (first) stages build their initial batch from the sampler; downstream stages derive templates from their upstream neighbour. Templates are used to pre-allocate correctly shaped `irecv` buffers.
**Non-blocking receives.** `_BatchRecvHandle.wait()` uses non-blocking `dist.irecv` for all message components (metadata, segment lengths, and bulk tensor data) and waits on all handles at the end. This enables better overlap with computation than the previous blocking `dist.recv` approach.
**Deadlock prevention.** When no samples converge, an empty send buffer is still sent so the downstream `irecv` completes. This ensures the pipeline does not stall waiting for data that will never arrive.
## Back-pressure

When `send_buffer` capacity is limited:

- Only `min(converged_count, remaining_capacity)` samples are extracted via `_populate_send_buffer`.
- The active batch is replaced by a tight copy (via `trim()`) that excludes the graduated samples.
- Excess converged samples that did not fit in the send buffer remain in the active batch. `step()` treats them as no-ops: their positions and velocities are saved before the integrator and restored after (the `active_mask` logic in `BaseDynamics.step()`).
- The send buffer is sent unconditionally every step for deadlock prevention, even when empty.
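The selection rule reduces to a simple split; `select_for_send` is a hypothetical helper name illustrating what `_populate_send_buffer` does with limited capacity:

```python
# Sketch of the back-pressure rule: only as many converged samples as
# the send buffer can hold graduate this step; the rest stay frozen
# in the active batch until capacity frees up.
def select_for_send(converged_indices, remaining_capacity):
    n = min(len(converged_indices), remaining_capacity)
    send = converged_indices[:n]       # graduate into the send buffer
    held = converged_indices[n:]       # remain in the active batch as no-ops
    return send, held

send, held = select_for_send([3, 7, 11, 12], remaining_capacity=2)
assert send == [3, 7]
assert held == [11, 12]
```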
## Data routing helpers

The `_CommunicationMixin` provides several helper methods for routing data between buffers:

- `_recv_to_batch(incoming)`: Stages data through the recv buffer into the active batch via `_buffer_to_batch`, then zeros the recv buffer.
- `_buffer_to_batch(incoming)`: Routes incoming data into the active batch. All code paths reconstruct the batch via `to_data_list()` + `from_data_list()` to strip buffer padding and produce tight tensors. Three cases:
  - No active batch exists: adopt the incoming data directly.
  - Room available: append to the existing active batch.
  - No room: overflow to sinks.
- `_batch_to_buffer(mask)`: Copies graduated samples from the active batch into the send buffer via `put()`, then replaces the active batch with a tight copy via `trim()` (or sets it to `None` if every graph was graduated).
- `_populate_send_buffer(converged_indices)`: Creates a boolean mask from converged indices and delegates to `_batch_to_buffer`. Does not send; the caller issues `isend` separately.
- `_remove_converged_final_stage(converged_indices)`: On the final stage, extracts converged graphs via `index_select`, removes them from the active batch, and writes them to sinks.
- `_manage_send_handle(handle)`: Stores the send handle for later draining (`fully_async` mode) or waits on it immediately (other modes).
- `_overflow_to_sinks(batch, mask)`: Writes to the first non-full sink in priority order.
- `_drain_sinks_to_batch()`: Pulls samples from sinks back into the active batch when room is available.
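The three-way routing in `_buffer_to_batch` can be sketched with plain lists standing in for batches and sinks (an assumption-laden toy, not the real mixin):

```python
# Illustrative routing logic for the three _buffer_to_batch cases.
def buffer_to_batch(active, incoming, capacity, sinks):
    if active is None:
        return incoming[:]                      # case 1: adopt directly
    room = capacity - len(active)
    accepted, overflow = incoming[:room], incoming[room:]
    active = active + accepted                  # case 2: append what fits
    if overflow:
        sinks.append(overflow)                  # case 3: overflow to sinks
    return active

sinks = []
assert buffer_to_batch(None, ["a", "b"], 4, sinks) == ["a", "b"]
assert buffer_to_batch(["a"], ["b", "c"], 2, sinks) == ["a", "b"]
assert sinks == [["c"]]
```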
> **Note:** `_buffer_to_batch` uses `to_data_list()` + `from_data_list()` for combining batches. This is O(N) Python-level reconstruction. In high-throughput pipelines, it can be a bottleneck compared to the Warp-accelerated `put`/`defrag` path.
## Sample lifecycle

This section traces a sample's journey through three representative workflows.

**Standalone `BaseDynamics.run()`**

```text
Batch passed to run()
        |
        v
loop for n_steps:
        |
        v
pre_update --> compute --> post_update --> convergence check
        |
        v
return batch
```

The simplest workflow: a batch is passed in, stepped for `n_steps` iterations, and returned.
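The loop above can be sketched as a plain function; the hook names mirror the diagram, but the signature is hypothetical and far simpler than the real `BaseDynamics.run()`:

```python
# Minimal sketch of the standalone workflow: step a batch n_steps times.
def run(batch, n_steps, pre_update, compute, post_update, check_convergence):
    for _ in range(n_steps):
        pre_update(batch)
        compute(batch)
        post_update(batch)
        check_convergence(batch)
    return batch

trace = []
batch = run(
    batch={"step": 0},
    n_steps=2,
    pre_update=lambda b: trace.append("pre"),
    compute=lambda b: trace.append("compute"),
    post_update=lambda b: trace.append("post"),
    check_convergence=lambda b: trace.append("check"),
)
assert trace == ["pre", "compute", "post", "check"] * 2
```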
**`FusedStage` with inflight batching**

```text
1. sampler.build_initial_batch()
   creates batch with status=0, fmax=inf
        |
        v
2. Each step:
   compute() --> per-sub-stage masked_update based on batch.status
        |
        v
3. ConvergenceHook updates batch.status (e.g., 0 --> 1 --> 2)
        |
        v
4. Every refill_frequency steps: _refill_check()
   - identifies graduated samples (status >= exit_status)
   - writes them to sinks
   - extracts remaining via index_select
   - requests replacements from sampler
   - appends them, rebuilds status/fmax tensors
        |
        v
5. Terminates when sampler is exhausted and all graduated,
   or all samples reach exit_status
```

Samples migrate through sub-stages based on convergence, and graduated samples are continuously replaced from the sampler.
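The refill step can be sketched with integer status values standing in for per-graph state (`refill_check` is an illustrative stand-in for `_refill_check`, which operates on real batches and tensors):

```python
# Toy refill check: graduate samples at/above exit_status to sinks,
# then top the batch back up from the sampler.
def refill_check(status, exit_status, sampler, capacity, sinks):
    remaining = [s for s in status if s < exit_status]
    sinks.extend(s for s in status if s >= exit_status)   # graduated
    while len(remaining) < capacity and sampler:
        remaining.append(sampler.pop(0))                  # replacements
    return remaining

sinks, sampler = [], [0, 0]
status = refill_check([2, 0, 2, 1], exit_status=2,
                      sampler=sampler, capacity=4, sinks=sinks)
assert sinks == [2, 2]          # two graduated samples written out
assert status == [0, 1, 0, 0]   # survivors plus two fresh samples
assert sampler == []            # sampler drained
```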
**`DistributedPipeline`**

```text
Rank 0 (first, inflight):
  - builds batch from sampler
  - runs step
  - sends converged downstream via _poststep_sync_buffers
  - refills from sampler
        |
        v  (isend/irecv)
Rank 1..N-1 (middle):
  - receives from prior rank via _prestep_sync_buffers
  - _complete_pending_recv routes data to active batch
  - runs step
  - sends converged downstream
        |
        v  (isend/irecv)
Rank N (final):
  - receives from prior rank
  - runs step
  - writes converged to sinks

All ranks:
  - synchronize done flags via all_reduce(MAX)
  - loop terminates when all report done
```

Samples flow from the first rank through intermediate ranks to the final rank, where they are persisted to sinks.
## Data sinks

Three `DataSink` implementations are available:

- `GPUBuffer`: Pre-allocates on first write. Uses `Batch.put()` internally. Known performance limitation: `read()` falls back to `to_data_list()` instead of `index_select()` due to a Warp int32/int64 dtype incompatibility.
- `HostMemory`: CPU-resident; decomposes batches into `AtomicData` lists.
- `ZarrData`: Disk-backed; delegates to `AtomicDataZarrWriter`.

Sinks are tried in priority order; the first non-full sink receives the data.
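The priority-order routing can be sketched as follows; `Sink` is a stand-in with the two operations the routing needs (capacity check and write), not the real `DataSink` interface:

```python
# Sketch of priority-order sink routing: the first sink with room wins.
class Sink:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []
    def is_full(self):
        return len(self.items) >= self.capacity
    def write(self, sample):
        self.items.append(sample)

def overflow_to_sinks(sinks, sample):
    for sink in sinks:                 # sinks are listed in priority order
        if not sink.is_full():
            sink.write(sample)
            return True
    return False                       # every sink is full

gpu, disk = Sink(capacity=1), Sink(capacity=10)
overflow_to_sinks([gpu, disk], "s0")   # lands in the first (GPU) sink
overflow_to_sinks([gpu, disk], "s1")   # first sink full -> spills to disk
assert gpu.items == ["s0"] and disk.items == ["s1"]
```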