Coverage for cuda/core/_memory/_ipc.pyx: 51.45%

138 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-03 01:38 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5cimport cpython 

6  

7from libc.stddef cimport size_t 

8from cuda.bindings cimport cydriver 

9from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle 

10from cuda.core._memory._memory_pool cimport _MemPool 

11from cuda.core._stream cimport Stream, Stream_accept 

12from cuda.core._resource_handles cimport ( 

13 DevicePtrHandle, 

14 create_fd_handle, 

15 create_mempool_handle_ipc, 

16 deviceptr_import_ipc, 

17 get_last_error, 

18 as_cu, 

19 as_intptr, 

20 as_py, 

21) 

22  

23from cuda.core._utils.cuda_utils cimport HANDLE_RETURN 

24from cuda.core._utils.cuda_utils import check_multiprocessing_start_method 

25  

26import multiprocessing 

27import os 

28import platform 

29import uuid 

30import weakref 

31  

32__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle'] 

33  

34  

# Process-wide registry of memory pools, keyed by UUID (see MP_register).
# Values are held weakly, so an entry never keeps its pool alive on its own.
cdef object registry = weakref.WeakValueDictionary()

36  

37  

# Shareable-handle type used for memory pool IPC in this module.
# POSIX file descriptors are selected on Linux; every other platform gets
# CU_MEM_HANDLE_TYPE_NONE, which is_supported() treats as "IPC unavailable".
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)

42  

cdef is_supported():
    """Return whether an IPC handle type is available on this platform.

    The result is true unless ``IPC_HANDLE_TYPE`` is
    ``CU_MEM_HANDLE_TYPE_NONE``.
    """
    cdef bint has_handle_type = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return has_handle_type

45  

46  

cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Bundles the descriptor of a shared buffer with an ``is_mapped`` flag.
    Both values are stored exactly as passed to the constructor.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped) -> None:
        # Plain storage only; no validation is performed here.
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self) -> IPCBufferDescriptor:
        """The :class:`IPCBufferDescriptor` supplied at construction."""
        return self._ipc_descriptor

    @property
    def is_mapped(self) -> bool:
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

60  

61  

cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Bundles the allocation handle of a memory resource with an ``is_mapped``
    flag. ``MP_from_allocation_handle`` passes ``True`` for pools it imports.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped) -> None:
        # Plain storage only; no validation is performed here.
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self) -> IPCAllocationHandle:
        """The :class:`IPCAllocationHandle` supplied at construction."""
        return self._alloc_handle

    @property
    def is_mapped(self) -> bool:
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

    @property
    def uuid(self) -> uuid.UUID | None:
        """UUID of the stored allocation handle, or ``None`` if it has none.

        ``getattr`` with a default is used so that a handle lacking a
        ``uuid`` attribute (including ``None``) yields ``None`` instead of
        raising.
        """
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)

79  

80  

cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes.

    Instances carry an opaque export payload (``bytes``) and the buffer size.
    They pickle through :meth:`__reduce__`, which rebuilds them via
    :meth:`_init`.

    Note
    ----
    The payload and ``size`` fields are controlled by the exporting peer.
    Receivers must treat them as untrusted and import only through
    :meth:`Buffer.from_ipc_descriptor`.
    """

    def __init__(self, *arg, **kwargs) -> None:
        # Direct construction is disallowed; instances are built with _init().
        raise RuntimeError(
            "IPCBufferDescriptor objects cannot be instantiated directly. "
            "Please use MemoryResource APIs."
        )

    @staticmethod
    def _init(reserved: bytes, size: int) -> IPCBufferDescriptor:
        """Build a descriptor from an export payload and a size.

        ``__new__`` is used so that the raising ``__init__`` is bypassed.
        Neither argument is validated here.
        """
        cdef IPCBufferDescriptor descriptor = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        descriptor._payload = reserved
        descriptor._size = size
        return descriptor

    def __reduce__(self) -> tuple[object, ...]:
        """Pickle support: recreate the descriptor with ``_init(payload, size)``."""
        ctor_args = (self._payload, self._size)
        return IPCBufferDescriptor._init, ctor_args

    @property
    def size(self) -> int:
        """The size recorded in this descriptor."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return the payload as a const void* for C API calls."""
        # The pointer refers to the bytes object's own storage, so it is only
        # usable while this descriptor keeps ``_payload`` referenced.
        cdef const char* raw = <const char*>(self._payload)
        return <const void*>raw

111  

112  

cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps a file-descriptor handle (``_h_fd``) together with an optional
    UUID. Instances are created through :meth:`_init`; calling the class
    directly raises ``RuntimeError``.
    """

    def __init__(self, *arg, **kwargs) -> None:
        # Direct construction is disallowed; instances are built with _init().
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid: uuid.UUID | None) -> IPCAllocationHandle:  # no-cython-lint
        """Create a handle object wrapping the file descriptor ``handle``.

        Parameters
        ----------
        handle : int
            File descriptor to wrap; must be non-negative.
        uuid : uuid.UUID or None
            Identifier to associate with the handle, if any.

        Raises
        ------
        ValueError
            If ``handle`` is negative. No fd handle has been created at that
            point, so the caller still owns the descriptor.
        """
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        if handle < 0:
            raise ValueError(f"Invalid allocation handle (fd) {handle}: must be non-negative")
        self._h_fd = create_fd_handle(handle)
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle."""
        # Resets the fd handle; afterwards __int__ reports the handle as closed.
        self._h_fd.reset()

    def __int__(self) -> int:
        """Return the wrapped file descriptor as an ``int``.

        Raises
        ------
        ValueError
            If the fd handle is empty or its integer value is negative,
            which is reported as the handle being closed.
        """
        if not self._h_fd or as_intptr(self._h_fd) < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return as_py(self._h_fd)

    @property
    def handle(self) -> int:
        """The wrapped fd handle converted to a Python object.

        Unlike ``int(self)``, this accessor performs no closed-handle check.
        """
        return as_py(self._h_fd)

    @property
    def uuid(self) -> uuid.UUID | None:
        """The UUID associated with this handle, or ``None`` if none was set.

        ``_init`` accepts ``None`` for the UUID, so callers must be prepared
        for a missing identifier.
        """
        return self._uuid

146  

147  

def _reduce_allocation_handle(alloc_handle: IPCAllocationHandle) -> tuple[object, ...]:
    """Reduce an allocation handle for transfer through ``multiprocessing``.

    The multiprocessing start method is checked first. The handle's file
    descriptor is then wrapped in ``multiprocessing.reduction.DupFd`` so the
    receiver can obtain its own descriptor, and the handle's concrete type
    and UUID are passed along for ``_reconstruct_allocation_handle``.
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args

152  

153  

def _reconstruct_allocation_handle(cls: type, df: object, uuid: uuid.UUID | None) -> IPCAllocationHandle:  # no-cython-lint
    """Rebuild an allocation handle on the receiving side of a transfer.

    ``df`` is the object produced by ``_reduce_allocation_handle``; its
    ``detach()`` result is handed to ``cls._init`` together with ``uuid``.
    """
    # Look up the constructor before detaching, so a class without ``_init``
    # fails before the descriptor is detached.
    build = cls._init
    return build(df.detach(), uuid)

156  

157  

# Register the reducer at import time so that multiprocessing's pickler
# serializes IPCAllocationHandle objects through _reduce_allocation_handle.
multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle)

159  

160  

161# Buffer IPC Implementation 

162# ------------------------- 

cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` as an :class:`IPCBufferDescriptor`.

    The buffer's pointer is exported with ``cuMemPoolExportPointer`` and the
    resulting ``reserved`` bytes become the descriptor payload, paired with
    the buffer's size.

    Raises
    ------
    RuntimeError
        If the buffer's memory resource is not IPC-enabled.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the opaque export blob into a Python bytes object.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)

175  

cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Parameters
    ----------
    cls
        Not used by this function.
    mr : _MemPool
        IPC-enabled memory pool the pointer is imported into.
    ipc_descriptor : IPCBufferDescriptor
        Descriptor received from the exporting peer. Its payload and size
        are treated as untrusted and validated below.
    stream
        Stream argument, normalized through ``Stream_accept``.

    Raises
    ------
    RuntimeError
        If ``mr`` is not IPC-enabled, or if the import yields an empty
        pointer handle without a recorded CUDA error.
    ValueError
        If the payload is shorter than ``CUmemPoolPtrExportData`` or the
        claimed size exceeds the size of the mapped range.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    # Reject short payloads before their bytes are handed to the import call.
    cdef size_t payload_size = len(ipc_descriptor._payload)
    cdef size_t expected_size = sizeof(cydriver.CUmemPoolPtrExportData)
    if payload_size < expected_size:
        raise ValueError(
            f"IPC buffer descriptor payload is {payload_size} bytes; "
            f"expected at least {expected_size}"
        )
    cdef Stream s = Stream_accept(stream)
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        HANDLE_RETURN(get_last_error())
        # Reaching this point means no CUDA error was recorded. Fail loudly
        # rather than continue with an empty handle, mirroring the handling
        # in MP_from_allocation_handle.
        raise RuntimeError(
            "Failed to import a buffer from an IPC descriptor: "
            "cuda-core returned an empty device pointer handle without recording a CUDA error. "
            "This is an internal cuda-core error; please report it with your CUDA driver, "
            "CUDA Toolkit, and cuda-python versions."
        )
    cdef size_t mapped_size = 0
    cdef size_t claimed_size = ipc_descriptor.size
    with nogil:
        HANDLE_RETURN(cydriver.cuPointerGetAttribute(
            &mapped_size,
            cydriver.CU_POINTER_ATTRIBUTE_RANGE_SIZE,
            as_cu(h_ptr)))
    # The peer-supplied size must not exceed the range that was mapped.
    if claimed_size > mapped_size:
        h_ptr.reset()
        raise ValueError(
            f"IPC buffer descriptor size ({claimed_size}) exceeds "
            f"mapped allocation extent ({mapped_size} bytes)"
        )
    return Buffer_from_deviceptr_handle(h_ptr, claimed_size, mr, ipc_descriptor)

211  

212  

213# _MemPool IPC Implementation 

214# --------------------------- 

215  

cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a memory pool of type ``cls`` imported from ``alloc_handle``.

    ``alloc_handle`` may be an :class:`IPCAllocationHandle` or a plain ``int``
    file descriptor. An ``int`` is duplicated and wrapped, so the caller's
    descriptor is left alone. If the handle carries a UUID that is already
    registered, the registered pool is returned instead of importing again.

    Raises
    ------
    TypeError
        If the registry holds a pool for the UUID that is not a ``cls``.
    RuntimeError
        If the import produces an empty pool handle without a recorded CUDA
        error.
    """
    cdef _MemPool pool
    cdef int ipc_fd

    # Quick exit for registry hits.
    handle_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(handle_uuid)
    if cached is not None:
        if isinstance(cached, cls):
            return cached
        raise TypeError(
            f"Registry contains a {type(cached).__name__} for uuid "
            f"{handle_uuid}, but {cls.__name__} was requested")

    # Ensure we have an allocation handle. Duplicate the file descriptor, if
    # necessary.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except:
            # Wrapping failed, so the duplicate is still ours to close.
            os.close(dup_fd)
            raise

    # Construct a new mempool.
    pool = <_MemPool>(cls.__new__(cls))
    pool._mempool_owned = True
    ipc_fd = int(alloc_handle)
    pool._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not pool._h_pool:
        HANDLE_RETURN(get_last_error())
        # Only reached when no CUDA error was recorded for the failure.
        raise RuntimeError(
            f"Failed to import {cls.__name__} from an allocation handle: "
            "cuda-core returned an empty memory pool handle without recording a CUDA error. "
            "This is an internal cuda-core error; please report it with your CUDA driver, "
            "CUDA Toolkit, and cuda-python versions."
        )
    pool._ipc_data = IPCDataForMR(alloc_handle, True)

    # Register it.
    if handle_uuid is not None:
        registered = pool.register(handle_uuid)
        assert registered is pool

    return pool

258  

259  

cdef _MemPool MP_from_registry(uuid):
    """Return the memory pool registered under ``uuid``.

    Raises
    ------
    RuntimeError
        If no pool is registered for ``uuid``. The underlying ``KeyError``
        is suppressed from the traceback.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found

265  

266  

cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` and return the registered pool.

    If another pool is already registered for ``uuid``, that pool is
    returned and ``self`` is left untouched. Otherwise ``self`` is stored in
    the registry, the UUID is written onto its allocation handle, and
    ``self`` is returned.
    """
    already_registered = registry.get(uuid)
    if already_registered is not None:
        return already_registered

    # A pool that already has a UUID may only be registered under that UUID.
    assert self.uuid is None or self.uuid == uuid
    registry[uuid] = self
    self._ipc_data._alloc_handle._uuid = uuid
    return self

275  

276  

cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export ``self`` as a shareable :class:`IPCAllocationHandle`.

    The pool is exported with ``cuMemPoolExportToShareableHandle`` using
    ``IPC_HANDLE_TYPE``, and the resulting file descriptor is wrapped in a
    handle tagged with a freshly generated UUID.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int exported_fd
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportToShareableHandle(
                &exported_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0
            )
        )
    try:
        return IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except:
        # Wrapping failed, so close the exported descriptor before re-raising.
        os.close(exported_fd)
        raise
288 raise