Coverage for cuda / core / experimental / _memory / _ipc.pyx: 56%
128 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-10 01:19 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-10 01:19 +0000
1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2#
3# SPDX-License-Identifier: Apache-2.0
5cimport cpython
6from libc.stdint cimport uintptr_t
7from libc.string cimport memcpy
9from cuda.bindings cimport cydriver
10from cuda.core.experimental._memory._buffer cimport Buffer
11from cuda.core.experimental._stream cimport default_stream
12from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN
13from cuda.core.experimental._utils.cuda_utils import check_multiprocessing_start_method
15import multiprocessing
16import os
17import platform
18import uuid
19import weakref
__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']


# Weak-valued cache of memory resources keyed by UUID. Entries are added by
# DMR_register and looked up by DMR_from_registry / DMR_from_allocation_handle.
# Because values are held weakly, an entry does not keep its resource alive.
cdef object registry = weakref.WeakValueDictionary()


# Handle type used when exporting/importing memory pools. Only Linux gets a
# real handle type (a POSIX file descriptor); every other platform is mapped
# to CU_MEM_HANDLE_TYPE_NONE, which is_supported() reports as "unsupported".
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)
cdef is_supported():
    """Return whether IPC_HANDLE_TYPE is something other than CU_MEM_HANDLE_TYPE_NONE."""
    cdef bint has_handle_type = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return has_handle_type
cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Stores the two values given at construction time and exposes them
    through read-only properties.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

    @property
    def ipc_descriptor(self):
        """The :class:`IPCBufferDescriptor` supplied at construction."""
        return self._ipc_descriptor
cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Stores the allocation handle and the ``is_mapped`` flag given at
    construction time and exposes them through read-only properties.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

    @property
    def alloc_handle(self):
        """The :class:`IPCAllocationHandle` supplied at construction."""
        return self._alloc_handle

    @property
    def uuid(self):
        """The allocation handle's ``uuid``, or ``None`` if it has none."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)
cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes.

    Instances carry an opaque byte payload plus the buffer size. They are
    created through :meth:`_init`; calling the class directly raises.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, reserved: bytes, size: int):
        """Build a descriptor from its payload bytes and buffer size."""
        cdef IPCBufferDescriptor descriptor = IPCBufferDescriptor.__new__(cls)
        descriptor._size = size
        descriptor._payload = reserved
        return descriptor

    def __reduce__(self):
        # Pickle as "call _init again with the same payload and size".
        init_args = (self._payload, self._size)
        return self._init, init_args

    @property
    def size(self):
        """The size value stored in this descriptor."""
        return self._size
cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps an integer OS handle (released with ``os.close``) together with
    an optional UUID. A negative ``_handle`` means "closed / owns nothing".
    Instances are created through :meth:`_init`; calling the class directly
    raises ``RuntimeError``.
    """

    def __cinit__(self):
        # Start every instance in the "closed" state. __dealloc__ calls
        # close() unconditionally, and close() treats any value >= 0 as an
        # owned descriptor. Without this, an instance that never reaches the
        # assignment in _init (e.g. __init__ raising on direct instantiation,
        # or the assert in _init failing) would be torn down with the
        # attribute's default value. NOTE(review): if `_handle` is declared
        # as a C int in the .pxd that default is 0, so os.close(0) would be
        # attempted -- confirm against the declaration.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Create a handle object that takes ownership of ``handle``.

        ``handle`` must be non-negative; ``uuid`` is stored as-is and may be
        ``None``.
        """
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle."""
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                # Mark as closed even if os.close raised, so that the
                # descriptor is never closed a second time.
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the raw handle; raise ``ValueError`` if it is closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """The raw handle value (negative once the handle is closed)."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """The UUID stored with this handle (may be ``None``)."""
        return self._uuid
def _reduce_allocation_handle(alloc_handle):
    """Reduction hook used by ``multiprocessing`` to pickle an allocation handle.

    The underlying descriptor is wrapped with
    ``multiprocessing.reduction.DupFd`` so it can be transferred to another
    process; the receiving side rebuilds the object with
    ``_reconstruct_allocation_handle``.

    Raises:
        ValueError: if ``alloc_handle`` has already been closed.
    """
    check_multiprocessing_start_method()
    fd = alloc_handle.handle
    if fd < 0:
        # A closed handle reports a negative descriptor. Passing that on to
        # DupFd would only fail later and less clearly, so reject it here.
        raise ValueError("Cannot pickle IPCAllocationHandle: the handle is closed.")
    df = multiprocessing.reduction.DupFd(fd)
    return _reconstruct_allocation_handle, (type(alloc_handle), df, alloc_handle.uuid)
def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle on the receiving side of a pickle.

    ``df`` is the object produced by ``DupFd`` on the sending side; its
    ``detach()`` result is handed to ``cls._init`` together with ``uuid``.
    """
    received_fd = df.detach()
    return cls._init(received_fd, uuid)
# Register the reduction hook so multiprocessing's pickler uses
# _reduce_allocation_handle for IPCAllocationHandle objects.
multiprocessing.reduction.register(
    IPCAllocationHandle,
    _reduce_allocation_handle,
)
def _deep_reduce_device_memory_resource(mr):
    """Reduction hook used by ``multiprocessing`` to pickle a memory resource.

    The resource is reduced to a call of ``mr.from_allocation_handle`` with
    a ``Device`` built from ``mr.device_id`` and the handle returned by
    ``mr.get_allocation_handle()``.
    """
    check_multiprocessing_start_method()
    from .._device import Device
    rebuild_args = (Device(mr.device_id), mr.get_allocation_handle())
    return mr.from_allocation_handle, rebuild_args
# Register the reduction hook so multiprocessing's pickler uses
# _deep_reduce_device_memory_resource for DeviceMemoryResource objects.
multiprocessing.reduction.register(
    DeviceMemoryResource,
    _deep_reduce_device_memory_resource,
)
156# Buffer IPC Implementation
157# -------------------------
cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` and return a descriptor for it.

    Calls ``cuMemPoolExportPointer`` on the buffer's pointer and wraps the
    resulting opaque ``reserved`` bytes, together with the buffer size, in
    an :class:`IPCBufferDescriptor`.

    Raises ``RuntimeError`` if the buffer's memory resource is not
    IPC-enabled.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, <cydriver.CUdeviceptr>(self._ptr))
        )

    # Copy the opaque export blob into a Python bytes object.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)
cdef Buffer Buffer_from_ipc_descriptor(
    cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    The descriptor's payload is copied into a ``CUmemPoolPtrExportData``
    struct and passed to ``cuMemPoolImportPointer`` with ``mr``'s pool
    handle. When ``stream`` is ``None`` the default stream is used.

    Raises:
        RuntimeError: if ``mr`` is not IPC-enabled.
        ValueError: if the descriptor payload does not have exactly the
            size of ``CUmemPoolPtrExportData.reserved``.
    """
    cdef cydriver.CUmemPoolPtrExportData data
    cdef cydriver.CUdeviceptr ptr
    cdef bytes payload

    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    if stream is None:
        # Note: match this behavior to DeviceMemoryResource.allocate()
        stream = default_stream()

    # The memcpy below always reads sizeof(data.reserved) bytes, so a shorter
    # payload (e.g. from a hand-built or corrupted pickle) would read past the
    # end of the bytes object. Validate the length before touching the memory.
    payload = ipc_descriptor._payload
    if payload is None or <size_t>len(payload) != sizeof(data.reserved):
        raise ValueError(
            f"Invalid IPC buffer descriptor payload: expected {sizeof(data.reserved)} bytes"
        )

    memcpy(
        data.reserved,
        <const void*><const char*>(payload),
        sizeof(data.reserved)
    )
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._handle, &data))
    return Buffer._init(<uintptr_t>ptr, ipc_descriptor.size, mr, stream, ipc_descriptor)
192# DeviceMemoryResource IPC Implementation
193# ---------------------------------------
cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handle):
    """Return a memory resource for a pool exported as ``alloc_handle``.

    ``alloc_handle`` may be an :class:`IPCAllocationHandle` or a plain
    ``int`` file descriptor (which is duplicated, so the caller keeps
    ownership of the original). If the handle carries a UUID that is already
    in the registry, the registered resource is returned; otherwise a new
    resource is constructed, the pool is imported with
    ``cuMemPoolImportFromShareableHandle`` and, when a UUID is available,
    the new resource is registered under it.
    """
    cdef DeviceMemoryResource self
    cdef int raw_fd

    # Fast path: a resource with this UUID has already been registered.
    handle_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(handle_uuid)
    if cached is not None:
        return cached

    # Normalize to an IPCAllocationHandle. A bare descriptor is duplicated
    # first; the duplicate is closed again if wrapping it fails.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except:
            os.close(dup_fd)
            raise

    # Build the new resource object and fill in its fields.
    self = DeviceMemoryResource.__new__(cls)
    from .._device import Device
    self._dev_id = Device(device_id).device_id
    self._mempool_owned = True
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    # Import the pool into this process through the shareable handle.
    raw_fd = int(alloc_handle)
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolImportFromShareableHandle(
            &(self._handle), <void*><uintptr_t>(raw_fd), IPC_HANDLE_TYPE, 0)
        )

    # Record the resource in the registry when it has a UUID.
    if handle_uuid is not None:
        registered = self.register(handle_uuid)
        assert registered is self

    return self
cdef DeviceMemoryResource DMR_from_registry(uuid):
    """Look up the memory resource registered under ``uuid``.

    Raises ``RuntimeError`` (with the ``KeyError`` context suppressed) when
    no resource is registered under that UUID.
    """
    cdef DeviceMemoryResource found
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found
cdef DeviceMemoryResource DMR_register(DeviceMemoryResource self, uuid):
    """Register ``self`` under ``uuid`` unless that UUID is already taken.

    Returns the resource that ends up registered for ``uuid``: the existing
    one if there is one, otherwise ``self`` (whose allocation handle is then
    stamped with ``uuid``).
    """
    existing = registry.get(uuid)
    if existing is None:
        # First registration for this UUID: it must not contradict a UUID
        # the resource already carries.
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        self._ipc_data._alloc_handle._uuid = uuid
        return self
    return existing
cdef IPCAllocationHandle DMR_export_mempool(DeviceMemoryResource self):
    """Export ``self``'s memory pool and return a handle owning the result.

    Calls ``cuMemPoolExportToShareableHandle`` and wraps the returned file
    descriptor, together with a freshly generated UUID, in an
    :class:`IPCAllocationHandle`. If wrapping fails the descriptor is closed
    before the exception propagates.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int exported_fd
    cdef IPCAllocationHandle exported

    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportToShareableHandle(
            &exported_fd, self._handle, IPC_HANDLE_TYPE, 0)
        )

    try:
        exported = IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except:
        # Do not leak the descriptor if the wrapper could not be created.
        os.close(exported_fd)
        raise
    return exported