# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import torch
import numpy as np

from . import _ext


@dataclass
class AsyncCopyHandle:
    """Handle to an in-progress asynchronous copy started by :func:`start_copy`.

    Use :meth:`ready` to poll for completion without blocking, or :meth:`get`
    to block until the result is available.

    The handle must be kept alive until the copy is consumed or goes out of
    scope; dropping it early triggers a synchronous wait in the destructor to
    ensure staging buffers are not freed while transfers are in flight.
    """

    _h: Any

    def ready(self) -> bool:
        """Return whether the copy has completed.

        Returns:
            ``True`` if the copy has completed, ``False`` otherwise.
        """
        return bool(self._h.ready())

    def get(self) -> list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor:
        """Block until the copy is done and return the result.

        The returned structure mirrors the input to :func:`start_copy`, with
        all tensors copied to the target device.

        Returns:
            The structure with the contained tensors copied to the target
            device (and NumPy arrays replaced by PyTorch tensors).

        Raises:
            RuntimeError: If the copy fails.
        """
        return self._h.get()
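

# A minimal usage sketch (not executed at import time): poll ``ready()`` to
# overlap caller-side work with the in-flight copy, then call ``get()`` for
# the result. ``do_other_work`` is a hypothetical stand-in, not part of this
# module.
#
#     handle = start_copy(batch, "cuda:0")
#     while not handle.ready():
#         do_other_work()           # copy proceeds in the background
#     batch_on_gpu = handle.get()   # returns immediately once complete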


def start_copy(
    data: list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor | np.ndarray,
    device: str | torch.device,
    *,
    use_pinned_staging: bool = True,
    pack_cpu_tensors: bool = True,
    min_packed_alignment_bytes: int = 16,
    max_packed_chunk_bytes: int = 32 * 1024 * 1024,
    use_background_thread: bool = True,
) -> AsyncCopyHandle:
    """Asynchronously copy tensors in a nested structure to ``device``.

    Traverses an arbitrarily nested combination of :class:`list`,
    :class:`tuple`, and :class:`dict` containers, copies every
    :class:`torch.Tensor` and :class:`numpy.ndarray` leaf to ``device``
    (NumPy arrays are converted to PyTorch tensors), and returns an
    :class:`AsyncCopyHandle` whose :meth:`~AsyncCopyHandle.get` method yields
    the copied structure. The output preserves container types and passes
    non-tensor, non-container leaves (e.g. strings) through unchanged.

    The primary optimization target is **CPU → GPU** transfers of many small
    tensors in non-pinned memory. Other copy directions (GPU → CPU, GPU → GPU,
    CPU → CPU) are supported and benefit from some optimizations (e.g.
    background-thread scheduling for all directions, parallel pinned staging
    for D2H), but are not the main focus.

    .. note::
        The input tensors do not all need to be on the same device; copying
        tensors from different devices is supported. Tensors already on the
        target device are reused as-is.

    .. important::
        Packing of small tensors (see the ``pack_cpu_tensors`` parameter
        below) is a major contributor to the performance advantage over
        standard PyTorch ``.to()`` calls on the individual tensors. For this
        optimization to apply, the input CPU tensors must be contiguous.

    .. warning::
        The caller must not **free** or **modify in-place** any input tensors
        until the copy has completed (i.e. until :meth:`~AsyncCopyHandle.get`
        returns or :meth:`~AsyncCopyHandle.ready` returns ``True``). Because
        copies are submitted asynchronously, potentially on a background
        thread, input tensor memory may still be read by the GPU after this
        function returns. Violating this contract leads to undefined behavior
        (silent data corruption, stale reads, or CUDA errors).

    .. important::
        Only :class:`list`, :class:`tuple`, and :class:`dict` are recognized
        as container types. Other container-like objects (e.g. custom classes,
        named tuples) are treated as opaque leaves and returned unchanged; any
        tensors nested inside them will **not** be copied.

    Args:
        data: The structure to copy. May be a single :class:`torch.Tensor` or
            :class:`numpy.ndarray`, or nested
            :class:`list`/:class:`tuple`/:class:`dict` containers holding
            tensors and/or NumPy arrays.
        device: Target PyTorch device (e.g. ``"cuda:0"``, ``"cpu"``).
        use_pinned_staging: When ``True``, allocate pinned (page-locked) host
            buffers as intermediate staging for CPU → CUDA and CUDA → CPU
            transfers. For H2D this enables ``non_blocking`` copies; for D2H
            the pinned buffer **is** the returned output tensor. Has no effect
            on CPU → CPU or GPU → GPU copies.
        pack_cpu_tensors: When ``True``, pack multiple small contiguous CPU
            tensors (≤ 256 KB each, mixed dtypes supported) into one or more
            staging buffers (each at most ``max_packed_chunk_bytes``) and
            issue one H2D transfer per chunk instead of per tensor. Only
            applies to CPU → CUDA copies.
        min_packed_alignment_bytes: Minimum byte alignment of each tensor's
            start offset within the packed buffer. The effective alignment for
            each tensor that participates in packing is
            ``max(min_packed_alignment_bytes, tensor.element_size())``.
        max_packed_chunk_bytes: Maximum payload size in bytes of each packed
            staging chunk (tensor data plus inter-tensor alignment padding;
            the actual allocation may be slightly larger to satisfy
            buffer-start alignment). When the total packed data exceeds this
            limit, multiple packed chunks are allocated and transferred.
            Defaults to 32 MiB.
        use_background_thread: When ``True``, the copy orchestration (buffer
            allocation, staging, and CUDA copy submission) runs on a C++
            background thread (from a shared pool) so that this function
            returns before the copies complete. Note that CPU staging is
            parallelized regardless of this setting. Benefits all copy
            directions.

    Returns:
        Handle to the in-progress copy. Call :meth:`~AsyncCopyHandle.get` to
        block until completion and retrieve the result, or
        :meth:`~AsyncCopyHandle.ready` to poll without blocking.

    Raises:
        RuntimeError: If the copy fails (propagated on
            :meth:`~AsyncCopyHandle.get`).

    Examples:
        Copy a nested structure of tensors to the GPU::

            data = [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6])]
            handle = start_copy(data, "cuda:0")
            # ... do other work ...
            result = handle.get()  # [tensor([1, 2, 3], device='cuda:0'), ...]

        Convert NumPy arrays to CPU tensors (NumPy inputs are accepted
        directly, and the conversion benefits from background-thread
        scheduling)::

            data = [np.array([1, 2, 3]), np.array([4, 5, 6])]
            handle = start_copy(data, "cpu")
            result = handle.get()  # [tensor([1, 2, 3]), tensor([4, 5, 6])]
    """
    dev = torch.device(device)
    h = _ext.start_copy(
        data,
        str(dev),
        bool(use_pinned_staging),
        bool(use_background_thread),
        bool(pack_cpu_tensors),
        int(min_packed_alignment_bytes),
        int(max_packed_chunk_bytes),
    )
    return AsyncCopyHandle(h)
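

# The demo below is a minimal sketch of the intended usage, runnable as
# ``python -m accvlab.multi_tensor_copier.async_copy`` (the relative import
# of ``_ext`` requires package context). Names such as ``payload`` and
# ``target`` are illustrative assumptions, not part of the API.
if __name__ == "__main__":
    target = "cuda:0" if torch.cuda.is_available() else "cpu"
    payload = {
        "ids": torch.arange(8, dtype=torch.int64),
        "feats": [np.ones((4, 4), dtype=np.float32), torch.rand(16)],
        "tag": "unchanged",  # non-tensor leaf, passed through as-is
    }
    handle = start_copy(
        payload,
        target,
        pack_cpu_tensors=True,  # pack the small CPU tensors into fewer H2D chunks
        max_packed_chunk_bytes=8 * 1024 * 1024,  # smaller chunks, for illustration
    )
    while not handle.ready():  # a real caller would do useful work here
        pass
    result = handle.get()
    print(result["ids"].device, result["feats"][0].device, result["tag"])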