Coverage for cuda / core / _memory / _ipc.pyx: 54.76%
126 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 01:07 +0000
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
5cimport cpython
7from cuda.bindings cimport cydriver
8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
9from cuda.core._memory._memory_pool cimport _MemPool
10from cuda.core._stream cimport Stream
11from cuda.core._resource_handles cimport (
12 DevicePtrHandle,
13 create_mempool_handle_ipc,
14 deviceptr_import_ipc,
15 get_last_error,
16 as_cu,
17)
19from cuda.core._stream cimport default_stream
20from cuda.core._utils.cuda_utils cimport HANDLE_RETURN
21from cuda.core._utils.cuda_utils import check_multiprocessing_start_method
23import multiprocessing
24import os
25import platform
26import uuid
27import weakref
# Public names exported by this module.
__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']

# Registry of imported/registered memory pools keyed by UUID. Values are held
# weakly, so an entry vanishes once nothing else references its pool.
cdef object registry = weakref.WeakValueDictionary()

# Shareable-handle type used to export and import memory pools. Linux uses
# POSIX file descriptors; every other platform gets CU_MEM_HANDLE_TYPE_NONE,
# which is_supported() reports as "IPC unavailable".
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)
cdef is_supported():
    """Return True when this platform has a usable IPC handle type."""
    if IPC_HANDLE_TYPE == cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE:
        return False
    return True
cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Pairs the IPCBufferDescriptor describing a shared allocation with an
    ``is_mapped`` flag supplied by whoever creates this object (presumably
    True for buffers imported from another process -- confirm at call sites).
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor given at construction."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag given at construction."""
        return self._is_mapped
cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Holds the IPCAllocationHandle of a memory pool and an ``is_mapped`` flag.
    Pools imported through MP_from_allocation_handle are created with
    ``is_mapped=True``.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle given at construction (may be None)."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag given at construction."""
        return self._is_mapped

    @property
    def uuid(self):
        """UUID carried by the allocation handle, or None if there is none."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)
cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes.

    Stores the opaque export payload (``bytes``) for the buffer together with
    its size. Instances are built with the internal ``_init`` factory, never
    through the public constructor, and pickle via ``__reduce__``.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Internal factory: build a descriptor from a payload and a size."""
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        desc._size = size
        desc._payload = reserved
        return desc

    def __reduce__(self):
        # Pickle as a call to the factory with the same payload and size.
        state = (self._payload, self._size)
        return IPCBufferDescriptor._init, state

    @property
    def size(self):
        """Size of the described buffer, as recorded at export time."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return a pointer to the payload bytes for use in driver calls.

        The pointer borrows from ``self._payload`` and is valid only while
        this descriptor is alive.
        """
        return <const void*><const char*>(self._payload)
cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps the OS handle exported for a memory pool (MP_export_mempool in this
    module produces a file descriptor) together with the UUID identifying the
    pool. The object owns the descriptor: :meth:`close` closes it, and
    ``__dealloc__`` calls :meth:`close` automatically.
    """

    def __cinit__(self, *args, **kwargs):
        # Cython initializes C-level attributes to 0 before __cinit__ runs.
        # With ``_handle`` used as a C int (see ``cdef int`` conversions in
        # this module), a freshly allocated instance would otherwise appear to
        # own fd 0, and __dealloc__ -> close() would close fd 0 (normally
        # stdin) whenever construction is aborted -- e.g. by the RuntimeError
        # raised in __init__ or a failed check in _init. -1 is the
        # "no handle / already closed" sentinel that close() checks for.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Internal constructor: wrap ``handle`` and take ownership of it.

        Args:
            handle: A non-negative OS handle (file descriptor).
            uuid: UUID of the exported pool, or None when not yet known.
        """
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle.

        Safe to call more than once: the handle is reset to -1 even if
        ``os.close`` raises, so subsequent calls do nothing.
        """
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the raw handle; raise ValueError if it has been closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """The raw OS handle, or -1 once closed."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """UUID identifying the exported pool (may be None)."""
        return self._uuid
def _reduce_allocation_handle(alloc_handle):
    """Pickle reducer for IPCAllocationHandle used by multiprocessing.

    The descriptor is wrapped in ``multiprocessing.reduction.DupFd`` so the
    unpickling side obtains it via ``detach()``; the UUID is sent unchanged.
    """
    check_multiprocessing_start_method()
    fd_wrapper = multiprocessing.reduction.DupFd(alloc_handle.handle)
    ctor_args = (type(alloc_handle), fd_wrapper, alloc_handle.uuid)
    return _reconstruct_allocation_handle, ctor_args


def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Unpickle an allocation handle from a DupFd wrapper and a UUID."""
    received_fd = df.detach()
    return cls._init(received_fd, uuid)


# Route pickling of IPCAllocationHandle through the reducer above whenever
# multiprocessing's ForkingPickler serializes one.
multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle)
# Buffer IPC Implementation
# -------------------------
cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` so that another process can import it.

    The driver's export record for the buffer's device pointer is copied into
    a ``bytes`` payload and paired with the buffer size.

    Raises:
        RuntimeError: the buffer's memory resource is not IPC-enabled.
    """
    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    cdef cydriver.CUmemPoolPtrExportData export_data
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr)))

    # Copy the struct's raw ``reserved`` bytes into a Python bytes object.
    cdef bytes payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)
cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Args:
        cls: Not used by this function; kept for the calling convention.
        mr: Pool to import into; must be IPC-enabled.
        ipc_descriptor: Descriptor produced by the exporting process.
        stream: Stream passed to the import; None selects ``default_stream()``.

    Returns:
        A Buffer wrapping the imported pointer, sized from the descriptor.

    Raises:
        RuntimeError: ``mr`` is not IPC-enabled.
        TypeError: ``ipc_descriptor`` is None or ``stream`` is not a Stream.
        Any error raised by HANDLE_RETURN when the import fails.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    if ipc_descriptor is None:
        # The typed argument accepts None, and payload_ptr() below is a
        # C-level call that would dereference it without a None check.
        raise TypeError("ipc_descriptor must be an IPCBufferDescriptor, not None")
    if stream is None:
        # Note: match this behavior to _MemPool.allocate()
        stream = default_stream()
    # Checked cast: anything that is not a Stream raises TypeError instead of
    # being reinterpreted as a Stream and having its fields read.
    cdef Stream s = <Stream?>stream
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        HANDLE_RETURN(get_last_error())
    return Buffer_from_deviceptr_handle(h_ptr, ipc_descriptor.size, mr, ipc_descriptor)
# _MemPool IPC Implementation
# ---------------------------
cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a ``cls`` memory pool imported from ``alloc_handle``.

    ``alloc_handle`` is an IPCAllocationHandle or a raw integer file
    descriptor. A raw descriptor is duplicated with ``os.dup`` first, so the
    caller keeps ownership of the one it passed in.

    If the handle carries a UUID already present in the registry, the
    registered pool is returned instead of importing again. Otherwise a new
    pool is imported and, when a UUID is available, registered.

    Raises:
        TypeError: the registry holds a pool of another type for this UUID.
        RuntimeError: create_mempool_handle_ipc returned an empty handle.
    """
    cdef _MemPool self
    cdef int ipc_fd

    # Fast path: reuse a pool already registered under this UUID.
    pool_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(pool_uuid)
    if cached is not None:
        if isinstance(cached, cls):
            return cached
        raise TypeError(
            f"Registry contains a {type(cached).__name__} for uuid "
            f"{pool_uuid}, but {cls.__name__} was requested")

    # Wrap a bare fd in a handle that owns a private duplicate; if wrapping
    # fails, release the duplicate before propagating the error.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            os.close(dup_fd)
            raise

    # Import the pool from the shareable handle.
    self = <_MemPool>(cls.__new__(cls))
    self._mempool_owned = True
    ipc_fd = int(alloc_handle)
    self._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not self._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    if pool_uuid is not None:
        registered = self.register(pool_uuid)
        assert registered is self
    return self
cdef _MemPool MP_from_registry(uuid):
    """Look up the live pool registered under ``uuid``.

    Raises:
        RuntimeError: no live pool is registered under ``uuid``.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    else:
        return found
cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` in the module registry under ``uuid``.

    If a live pool is already registered under ``uuid``, that pool is
    returned and ``self`` is left unregistered. Otherwise ``self`` is stored
    and its allocation handle is stamped with ``uuid``.

    Raises:
        RuntimeError: ``self`` carries no IPC allocation handle to stamp.
    """
    existing = registry.get(uuid)
    if existing is not None:
        return existing
    # Validate before touching the registry: ``_ipc_data`` and its handle may
    # be None, and the C-level attribute write below is not None-checked.
    # Checking first also avoids leaving a half-registered pool behind.
    cdef IPCDataForMR ipc_data = self._ipc_data
    if ipc_data is None or ipc_data._alloc_handle is None:
        raise RuntimeError("Memory resource is not IPC-enabled")
    assert self.uuid is None or self.uuid == uuid
    registry[uuid] = self
    ipc_data._alloc_handle._uuid = uuid
    return self
cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export ``self`` as an IPCAllocationHandle tagged with a fresh UUID.

    The driver writes the shareable handle into a C ``int`` (a file
    descriptor), so this path is Linux-only. The returned handle owns the
    descriptor; if wrapping it fails, the descriptor is closed before the
    error propagates.
    """
    cdef int shareable_fd
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportToShareableHandle(
                &shareable_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0
            )
        )
    try:
        return IPCAllocationHandle._init(shareable_fd, uuid.uuid4())
    except BaseException:
        os.close(shareable_fd)
        raise