Coverage for cuda / core / _memory / _ipc.pyx: 51.64%
122 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 01:27 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 01:27 +0000
1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2#
3# SPDX-License-Identifier: Apache-2.0
5cimport cpython
7from cuda.bindings cimport cydriver
8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
9from cuda.core._memory._memory_pool cimport _MemPool
10from cuda.core._stream cimport Stream
11from cuda.core._resource_handles cimport (
12 DevicePtrHandle,
13 create_fd_handle,
14 create_mempool_handle_ipc,
15 deviceptr_import_ipc,
16 get_last_error,
17 as_cu,
18 as_intptr,
19 as_py,
20)
22from cuda.core._stream cimport default_stream
23from cuda.core._utils.cuda_utils cimport HANDLE_RETURN
24from cuda.core._utils.cuda_utils import check_multiprocessing_start_method
26import multiprocessing
27import os
28import platform
29import uuid
30import weakref
# Public names exported by this module.
__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']

# Module-level registry of memory resources, keyed by UUID (see MP_register).
# Values are held weakly, so an entry disappears once its memory resource is
# garbage collected.
cdef object registry = weakref.WeakValueDictionary()

# Handle type used for IPC sharing: POSIX file descriptors on Linux. On every
# other platform the type is NONE, which is_supported() reports as unsupported.
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = \
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR \
    if platform.system() == "Linux" else \
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
cdef is_supported():
    """Report whether an IPC handle type is available on this platform.

    Returns True when IPC_HANDLE_TYPE is anything other than
    CU_MEM_HANDLE_TYPE_NONE.
    """
    cdef cydriver.CUmemAllocationHandleType no_handle_type = \
        cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    return not (IPC_HANDLE_TYPE == no_handle_type)
cdef class IPCDataForBuffer:
    """IPC bookkeeping attached to a buffer.

    Stores the descriptor used to share the buffer and a flag telling
    whether the buffer is mapped.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        # The two fields are independent; both are set once at construction.
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor supplied at construction."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped
cdef class IPCDataForMR:
    """IPC bookkeeping attached to a memory resource.

    Stores the allocation handle used to share the resource and a flag
    telling whether the resource is mapped.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        # The two fields are independent; both are set once at construction.
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle supplied at construction."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

    @property
    def uuid(self):
        """The allocation handle's ``uuid``, or None if it has no such attribute."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)
cdef class IPCBufferDescriptor:
    """Picklable description of a buffer shared across processes.

    Holds the opaque export payload and the buffer size. Instances are
    created through ``_init``; calling the class directly raises.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from an export payload and a buffer size."""
        # __new__ bypasses __init__, which unconditionally raises.
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        desc._size = size
        desc._payload = reserved
        return desc

    def __reduce__(self):
        """Pickle support: rebuild through ``_init`` with the stored payload and size."""
        state = (self._payload, self._size)
        return IPCBufferDescriptor._init, state

    @property
    def size(self):
        """The size value stored in this descriptor."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Expose the payload bytes as a const void* for C API calls."""
        return <const void*><const char*>(self._payload)
cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Instances are created through ``_init``; calling the class directly
    raises.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Wrap the integer ``handle`` and associate it with ``uuid``."""
        # __new__ bypasses __init__, which unconditionally raises.
        cdef IPCAllocationHandle obj = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        obj._h_fd = create_fd_handle(handle)
        obj._uuid = uuid
        return obj

    cpdef close(self):
        """Close the handle by resetting the underlying fd handle."""
        self._h_fd.reset()

    def __int__(self) -> int:
        """Return the handle as an int; raise ValueError if it is closed."""
        cdef bint closed = not self._h_fd or as_intptr(self._h_fd) < 0
        if closed:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return as_py(self._h_fd)

    @property
    def handle(self) -> int:
        """The underlying handle converted to a Python object (no closed check)."""
        return as_py(self._h_fd)

    @property
    def uuid(self) -> uuid.UUID:
        """The UUID associated with this handle."""
        return self._uuid
def _reduce_allocation_handle(alloc_handle):
    """Reducer for IPCAllocationHandle used by multiprocessing's pickler.

    Returns the reconstruction callable and its arguments: the handle's
    concrete type, a ``DupFd`` wrapping its file descriptor, and its UUID.
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args
def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle from reducer output.

    ``df.detach()`` yields the file descriptor, which is passed together
    with ``uuid`` to ``cls._init``.
    """
    detached_fd = df.detach()
    return cls._init(detached_fd, uuid)
# Register the reducer so multiprocessing's pickler serializes
# IPCAllocationHandle objects through _reduce_allocation_handle.
multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle)
153# Buffer IPC Implementation
154# -------------------------
cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` from its memory pool and wrap the result in a descriptor.

    Raises RuntimeError when the buffer's memory resource is not
    IPC-enabled. Driver errors are surfaced through HANDLE_RETURN.
    """
    cdef cydriver.CUmemPoolExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    # The driver call does not touch Python objects, so run it without the GIL.
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the opaque export blob into a Python bytes object.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)
cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer described by ``ipc_descriptor`` into the pool ``mr``.

    Raises RuntimeError when ``mr`` is not IPC-enabled. When ``stream`` is
    None the default stream is used.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    # Note: the None fallback mirrors _MemPool.allocate().
    cdef Stream s = <Stream>(default_stream() if stream is None else stream)

    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool, ipc_descriptor.payload_ptr(), s._h_stream
    )
    if not h_ptr:
        # An empty handle signals failure; raise from the recorded error code.
        HANDLE_RETURN(get_last_error())

    return Buffer_from_deviceptr_handle(
        h_ptr, ipc_descriptor.size, mr, ipc_descriptor
    )
188# _MemPool IPC Implementation
189# ---------------------------
cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a memory pool of type ``cls`` imported from ``alloc_handle``.

    ``alloc_handle`` is either an IPCAllocationHandle or a plain int file
    descriptor. If the handle carries a UUID already present in the
    registry, the registered pool is returned (TypeError if it is not an
    instance of ``cls``). Otherwise a new pool is imported and, when a UUID
    is available, registered under it.
    """
    # Fast path: a pool with this UUID is already registered.
    handle_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(handle_uuid)
    if cached is not None:
        if isinstance(cached, cls):
            return cached
        raise TypeError(
            f"Registry contains a {type(cached).__name__} for uuid "
            f"{handle_uuid}, but {cls.__name__} was requested")

    # A raw int is treated as a file descriptor: duplicate it and wrap the
    # duplicate in an allocation handle, closing the duplicate if wrapping fails.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            os.close(dup_fd)
            raise

    # Import the pool into a freshly allocated instance.
    cdef _MemPool pool = <_MemPool>(cls.__new__(cls))
    pool._mempool_owned = True
    cdef int ipc_fd = int(alloc_handle)
    pool._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not pool._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    pool._ipc_data = IPCDataForMR(alloc_handle, True)

    # Record the new pool in the registry when a UUID is known.
    if handle_uuid is not None:
        registered = pool.register(handle_uuid)
        assert registered is pool

    return pool
cdef _MemPool MP_from_registry(uuid):
    """Look up the memory pool registered under ``uuid``.

    Raises RuntimeError (with the KeyError context suppressed) when no pool
    is registered under that key.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    else:
        return found
cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` unless that key is already taken.

    Returns the previously registered pool when one exists; otherwise stores
    ``self`` in the registry, stamps ``uuid`` on its allocation handle and
    returns ``self``.
    """
    already_registered = registry.get(uuid)
    if already_registered is not None:
        return already_registered

    # A pool may only be registered under the UUID it already has, if any.
    assert self.uuid is None or self.uuid == uuid
    registry[uuid] = self
    self._ipc_data._alloc_handle._uuid = uuid
    return self
cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export the pool as a shareable handle tagged with a fresh random UUID.

    Driver errors are surfaced through HANDLE_RETURN. If wrapping the
    exported file descriptor fails, the descriptor is closed before the
    exception propagates.
    """
    # Note: Linux only -- the shareable handle is an int file descriptor.
    cdef int exported_fd
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportToShareableHandle(
            &exported_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0)
        )
    try:
        return IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except BaseException:
        os.close(exported_fd)
        raise