Coverage for cuda / core / _memory / _ipc.pyx: 57.38%
122 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:07 +0000
1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2#
3# SPDX-License-Identifier: Apache-2.0
5cimport cpython
7from cuda.bindings cimport cydriver
8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
9from cuda.core._memory._memory_pool cimport _MemPool
10from cuda.core._stream cimport Stream
11from cuda.core._resource_handles cimport (
12 DevicePtrHandle,
13 create_mempool_handle_ipc,
14 deviceptr_import_ipc,
15 get_last_error,
16 as_cu,
17)
19from cuda.core._stream cimport default_stream
20from cuda.core._utils.cuda_utils cimport HANDLE_RETURN
21from cuda.core._utils.cuda_utils import check_multiprocessing_start_method
23import multiprocessing
24import os
25import platform
26import uuid
27import weakref
29__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']
# Module-level lookup table keyed by UUID. Values are held weakly
# (WeakValueDictionary), so an entry does not by itself keep its value alive.
cdef object registry = weakref.WeakValueDictionary()


# Shareable-handle type used for IPC in this module: POSIX file descriptors
# when platform.system() reports "Linux", otherwise the NONE handle type.
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)
cdef is_supported():
    """Return True when IPC_HANDLE_TYPE is anything other than the NONE handle type."""
    cdef bint has_handle_type = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return has_handle_type
cdef class IPCDataForBuffer:
    """IPC bookkeeping for a memory buffer.

    Stores the ``IPCBufferDescriptor`` and the ``is_mapped`` flag supplied at
    construction and exposes both as read-only properties.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag given at construction."""
        return self._is_mapped

    @property
    def ipc_descriptor(self):
        """The ``IPCBufferDescriptor`` given at construction."""
        return self._ipc_descriptor
cdef class IPCDataForMR:
    """IPC bookkeeping for a memory resource.

    Stores the ``IPCAllocationHandle`` and the ``is_mapped`` flag supplied at
    construction and exposes them, plus the handle's UUID, as read-only
    properties.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def uuid(self):
        """UUID of the stored allocation handle, or None if it has no ``uuid``."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag given at construction."""
        return self._is_mapped

    @property
    def alloc_handle(self):
        """The ``IPCAllocationHandle`` given at construction."""
        return self._alloc_handle
cdef class IPCBufferDescriptor:
    """Picklable description of a buffer that can be shared between processes.

    An instance carries an opaque ``bytes`` payload and the buffer size.
    Instances are built through ``_init`` only; the public constructor always
    raises.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from its payload bytes and size.

        ``__new__`` is used so that the raising ``__init__`` is bypassed.
        """
        cdef IPCBufferDescriptor descriptor
        descriptor = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        descriptor._payload = reserved
        descriptor._size = size
        return descriptor

    def __reduce__(self):
        # Pickle as "call _init(payload, size)" so unpickling goes through the
        # same factory as normal construction.
        ctor_args = (self._payload, self._size)
        return (IPCBufferDescriptor._init, ctor_args)

    @property
    def size(self):
        """The size recorded when the descriptor was created."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Expose the payload bytes as a ``const void*`` for C API calls."""
        return <const void*><const char*>(self._payload)
cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    The instance owns the integer handle passed to ``_init`` and releases it
    with ``os.close`` in ``close()`` or when the object is deallocated. After
    closing, the stored handle is ``-1``.
    """

    def __cinit__(self):
        # Start in the "closed" state. __dealloc__ calls close(), which closes
        # any handle >= 0. Without this, an instance whose construction is
        # rejected (__init__ raising, or _init failing before it stores the
        # handle) would still carry its initial _handle value.
        # NOTE(review): _handle is presumably a C int declared in the .pxd and
        # therefore zero-initialised, in which case such an instance would
        # close file descriptor 0 on deallocation -- confirm against the .pxd.
        # A __cinit__ that takes no arguments ignores any constructor
        # arguments, so this does not change how __init__ or _init are called.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Create an instance of ``cls`` owning ``handle`` and tagged with ``uuid``.

        ``handle`` must be non-negative (checked with ``assert``). ``__new__``
        is used so that the raising ``__init__`` is bypassed.
        """
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle.

        Does nothing if the handle is already closed. The stored handle is
        reset to ``-1`` even if ``os.close`` raises.
        """
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the raw handle; raise ``ValueError`` if it has been closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """The raw handle value (``-1`` once closed)."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """The UUID stored on this handle (may be None)."""
        return self._uuid
def _reduce_allocation_handle(alloc_handle):
    """Reducer registered with ``multiprocessing.reduction`` for allocation handles.

    Wraps the handle's file descriptor in ``DupFd`` and returns the
    ``(callable, args)`` pair that rebuilds the handle via
    ``_reconstruct_allocation_handle``.
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args
def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild a handle from its reduced form.

    Detaches the file descriptor from ``df`` and passes it, together with
    ``uuid``, to ``cls._init``.
    """
    fd = df.detach()
    return cls._init(fd, uuid)
# Register the reducer so that multiprocessing's pickler serialises
# IPCAllocationHandle objects through _reduce_allocation_handle.
multiprocessing.reduction.register(
    IPCAllocationHandle,
    _reduce_allocation_handle,
)
157# Buffer IPC Implementation
158# -------------------------
cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` as an ``IPCBufferDescriptor``.

    Raises ``RuntimeError`` if the buffer's memory resource is not
    IPC-enabled. Otherwise calls ``cuMemPoolExportPointer`` on the buffer's
    device pointer and wraps the exported ``reserved`` bytes, together with
    the buffer size, in a descriptor.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the opaque export blob into a Python bytes object.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)
cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Parameters
    ----------
    cls
        Not used by this function.
    mr : _MemPool
        IPC-enabled memory pool the buffer is imported into.
    ipc_descriptor : IPCBufferDescriptor
        Descriptor carrying the export payload and the buffer size.
    stream : Stream or None
        Stream handed to the import call; ``None`` selects ``default_stream()``.

    Raises
    ------
    RuntimeError
        If ``mr`` is not IPC-enabled.
    TypeError
        If ``stream`` is neither ``None`` nor a ``Stream`` instance.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    if stream is None:
        # Note: match this behavior to _MemPool.allocate()
        stream = default_stream()
    # Checked cast: `stream` is an untyped argument, and an unchecked
    # <Stream> cast on a non-Stream object would read `_h_stream` from
    # unrelated memory. `<Stream?>` raises TypeError instead.
    cdef Stream s = <Stream?>stream
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        # An empty handle signals failure; surface the recorded error.
        HANDLE_RETURN(get_last_error())
    return Buffer_from_deviceptr_handle(h_ptr, ipc_descriptor.size, mr, ipc_descriptor)
192# _MemPool IPC Implementation
193# ---------------------------
cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a memory pool for ``alloc_handle``, importing it if necessary.

    ``alloc_handle`` may be an ``IPCAllocationHandle`` or a plain ``int`` file
    descriptor. A pool already registered under the handle's UUID is returned
    as-is. Otherwise a new ``cls`` instance is created from the handle and,
    when the handle has a UUID, registered under it.

    Raises ``RuntimeError`` if the pool handle cannot be created.
    """
    cdef _MemPool self
    cdef int ipc_fd

    # Quick exit for registry hits.
    pool_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(pool_uuid)
    if cached is not None:
        return cached

    # Ensure we have an allocation handle. A raw descriptor is duplicated so
    # the handle created here owns its own copy; the duplicate is closed if
    # wrapping it fails.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            os.close(dup_fd)
            raise

    # Construct a new mempool.
    self = <_MemPool>(cls.__new__(cls))
    self._mempool_owned = True
    ipc_fd = int(alloc_handle)
    self._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not self._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    # Register it.
    if pool_uuid is not None:
        registered = self.register(pool_uuid)
        assert registered is self

    return self
cdef _MemPool MP_from_registry(uuid):
    """Look up the memory pool registered under ``uuid``.

    Raises ``RuntimeError`` (with the ``KeyError`` context suppressed) when no
    entry exists for ``uuid``.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found
cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` and return the registered pool.

    If another pool is already registered under ``uuid`` it is returned and
    ``self`` is left untouched. Otherwise ``self`` is stored in the registry,
    its allocation handle is stamped with ``uuid``, and ``self`` is returned.
    """
    prior = registry.get(uuid)
    if prior is None:
        # A pool may only be registered under the UUID it already carries.
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        self._ipc_data._alloc_handle._uuid = uuid
        return self
    return prior
cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export the pool as a new ``IPCAllocationHandle`` with a fresh UUID.

    Calls ``cuMemPoolExportToShareableHandle`` to obtain a file descriptor and
    wraps it in a handle tagged with ``uuid.uuid4()``. If wrapping fails, the
    descriptor is closed before the exception propagates.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int fd
    cdef IPCAllocationHandle exported

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportToShareableHandle(
                &fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0
            )
        )

    try:
        exported = IPCAllocationHandle._init(fd, uuid.uuid4())
    except BaseException:
        os.close(fd)
        raise
    return exported