Coverage for cuda / core / _memory / _ipc.pyx: 54.76%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 01:07 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5cimport cpython 

6  

7from cuda.bindings cimport cydriver 

8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle 

9from cuda.core._memory._memory_pool cimport _MemPool 

10from cuda.core._stream cimport Stream 

11from cuda.core._resource_handles cimport ( 

12 DevicePtrHandle, 

13 create_mempool_handle_ipc, 

14 deviceptr_import_ipc, 

15 get_last_error, 

16 as_cu, 

17) 

18  

19from cuda.core._stream cimport default_stream 

20from cuda.core._utils.cuda_utils cimport HANDLE_RETURN 

21from cuda.core._utils.cuda_utils import check_multiprocessing_start_method 

22  

23import multiprocessing 

24import os 

25import platform 

26import uuid 

27import weakref 

28  

# Names exported as the public surface of this module.
__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']


# Memory resources keyed by UUID (filled by MP_register, read by
# MP_from_registry / MP_from_allocation_handle).  Values are held weakly, so
# an entry disappears once nothing else references the resource.
cdef object registry = weakref.WeakValueDictionary()


# Shareable-handle type used when exporting/importing memory pools: POSIX
# file descriptors on Linux, "none" on every other platform.
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)

39  

cdef is_supported():
    """Return True when this platform has a usable IPC handle type.

    The answer is derived solely from the module-level ``IPC_HANDLE_TYPE``:
    anything other than ``CU_MEM_HANDLE_TYPE_NONE`` counts as supported.
    """
    cdef cydriver.CUmemAllocationHandleType unsupported = \
        cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    return IPC_HANDLE_TYPE != unsupported

42  

43  

cdef class IPCDataForBuffer:
    """IPC bookkeeping attached to a memory buffer.

    Stores the :class:`IPCBufferDescriptor` associated with the buffer and an
    ``is_mapped`` flag supplied by whoever creates this object.  The backing
    attributes (``_ipc_descriptor``, ``_is_mapped``) are declared outside this
    file section.
    NOTE(review): ``is_mapped`` presumably marks buffers imported from
    another process -- confirm against the callers.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The descriptor passed at construction time."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag passed at construction time."""
        return self._is_mapped

57  

58  

cdef class IPCDataForMR:
    """IPC bookkeeping attached to a memory resource.

    Stores the :class:`IPCAllocationHandle` of the resource together with an
    ``is_mapped`` flag.  ``MP_from_allocation_handle`` passes ``True`` for the
    pools it builds from an allocation handle.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The allocation handle passed at construction time."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag passed at construction time."""
        return self._is_mapped

    @property
    def uuid(self):
        """UUID of the allocation handle, or ``None`` if it has no ``uuid``."""
        # getattr with a default also covers a missing (None) handle.
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)

76  

77  

cdef class IPCBufferDescriptor:
    """Picklable description of a buffer that can be shared across processes.

    Instances carry an opaque ``bytes`` payload and the buffer size.  They are
    created through the private ``_init`` factory (used by
    ``Buffer_get_ipc_descriptor`` and by unpickling); calling the class
    directly raises ``RuntimeError``.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from an export payload and a buffer size."""
        # __new__ bypasses __init__, which unconditionally raises.
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        desc._size = size
        desc._payload = reserved
        return desc

    def __reduce__(self):
        """Pickle as a call to ``_init`` with the payload and size."""
        ctor_args = (self._payload, self._size)
        return IPCBufferDescriptor._init, ctor_args

    @property
    def size(self):
        """Size recorded in the descriptor."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return the payload as a const void* for C API calls."""
        # The pointer aliases the bytes object's storage; it is only valid
        # while this descriptor (and thus the payload) stays alive.
        return <const void*><const char*>(self._payload)

101  

102  

cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps an OS-level handle that ``close`` releases with ``os.close`` (i.e.
    it is treated as a file descriptor) plus an optional UUID.  A negative
    ``_handle`` means "closed".  Instances are created through the private
    ``_init`` factory; calling the class directly raises ``RuntimeError``.
    """

    def __cinit__(self):
        # Start every instance in the "closed" state.  Without this, an
        # object that never reaches the assignment in _init (for example a
        # direct IPCAllocationHandle() call, whose __init__ raises) would be
        # deallocated with the default value of _handle, and __dealloc__
        # would hand that value to os.close().
        # NOTE(review): _handle is presumably a C int declared in the .pxd
        # and therefore zero-initialised, which would make that stray close
        # hit file descriptor 0 -- confirm.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Create a handle object that takes ownership of ``handle``.

        Parameters
        ----------
        handle : int
            Non-negative OS handle; it is closed by ``close``/deallocation.
        uuid
            Identifier stored on the handle; may be ``None``.

        Raises
        ------
        ValueError
            If ``handle`` is negative.  The check runs before the object is
            allocated and is a real ``raise`` (not an ``assert``) so it is
            still enforced when assertions are disabled.
        """
        cdef IPCAllocationHandle self
        if handle < 0:
            raise ValueError(
                f"IPCAllocationHandle requires a non-negative handle, got {handle}"
            )
        self = IPCAllocationHandle.__new__(cls)
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle."""
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                # Mark as closed even if os.close() raised, so the handle is
                # never closed twice.
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the raw handle; raise ``ValueError`` once it is closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """Raw handle value; negative after ``close`` (no exception raised)."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """Identifier given to ``_init`` (``None`` if none was supplied)."""
        return self._uuid

142  

143  

def _reduce_allocation_handle(alloc_handle):
    """Reducer used to send an IPCAllocationHandle through multiprocessing.

    Validates the multiprocessing start method, wraps the handle's file
    descriptor in ``multiprocessing.reduction.DupFd`` and returns the
    ``(callable, args)`` pair that rebuilds the handle on the receiving side
    via ``_reconstruct_allocation_handle``.
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args

148  

149  

def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle from the pieces produced by the reducer.

    ``df.detach()`` yields the file descriptor received by this process; it is
    handed, together with ``uuid``, to ``cls._init``.
    """
    received_fd = df.detach()
    return cls._init(received_fd, uuid)

152  

153  

# Teach multiprocessing's pickler how to transfer IPCAllocationHandle objects
# (their file descriptor has to be duplicated into the receiving process).
multiprocessing.reduction.register(
    IPCAllocationHandle,
    _reduce_allocation_handle,
)

155  

156  

157# Buffer IPC Implementation 

158# ------------------------- 

cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` as an :class:`IPCBufferDescriptor`.

    Calls ``cuMemPoolExportPointer`` on the buffer's device pointer and packs
    the resulting ``reserved`` bytes, together with the buffer size, into a
    descriptor.

    Raises
    ------
    RuntimeError
        If the buffer's memory resource is not IPC-enabled.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the whole fixed-size ``reserved`` field into a Python bytes object.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)

171  

cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Parameters
    ----------
    cls
        Unused here; kept for the calling convention.
    mr : _MemPool
        IPC-enabled memory resource the buffer is imported into.
    ipc_descriptor : IPCBufferDescriptor
        Descriptor produced by the exporting side.
    stream : Stream or None
        Stream handed to the import call; ``None`` selects
        ``default_stream()``.

    Raises
    ------
    RuntimeError
        If ``mr`` is not IPC-enabled.
    TypeError
        If ``ipc_descriptor`` is ``None`` or ``stream`` is not a ``Stream``.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    # Calling a cdef method on None is not checked by Cython; reject it here.
    if ipc_descriptor is None:
        raise TypeError("ipc_descriptor must be an IPCBufferDescriptor, not None")
    if stream is None:
        # Note: match this behavior to _MemPool.allocate()
        stream = default_stream()
    # Checked cast: the C-level field access below is only safe on a real
    # Stream instance, so a foreign object must raise TypeError instead.
    cdef Stream s = <Stream?>stream
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        # A null handle signals failure; surface the recorded error.
        HANDLE_RETURN(get_last_error())
    return Buffer_from_deviceptr_handle(h_ptr, ipc_descriptor.size, mr, ipc_descriptor)

190  

191  

192# _MemPool IPC Implementation 

193# --------------------------- 

194  

cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a memory pool of type ``cls`` for ``alloc_handle``.

    ``alloc_handle`` is either an ``IPCAllocationHandle`` or a plain ``int``
    file descriptor.  If the handle carries a UUID that is already in the
    registry, the registered pool is returned; otherwise a new pool is
    imported from the handle and, when a UUID is available, registered.

    Raises
    ------
    TypeError
        If the registry holds a pool for the UUID that is not a ``cls``.
    RuntimeError
        If the pool could not be imported from the handle.
    """
    cdef _MemPool self
    cdef int ipc_fd

    # Quick exit for registry hits.
    uuid = getattr(alloc_handle, 'uuid', None)  # no-cython-lint
    mr = registry.get(uuid)
    if mr is not None:
        if isinstance(mr, cls):
            return mr
        raise TypeError(
            f"Registry contains a {type(mr).__name__} for uuid "
            f"{uuid}, but {cls.__name__} was requested")

    # A raw descriptor is duplicated first so the wrapper created here owns
    # its own descriptor and the caller's stays open.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            # Nobody owns the duplicate yet; close it before propagating.
            os.close(dup_fd)
            raise

    # Construct a new mempool.
    self = <_MemPool>(cls.__new__(cls))
    self._mempool_owned = True
    ipc_fd = int(alloc_handle)
    self._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not self._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    # Register it.
    if uuid is not None:
        registered = self.register(uuid)
        assert registered is self

    return self

231  

232  

cdef _MemPool MP_from_registry(uuid):
    """Return the memory pool registered under ``uuid``.

    Raises
    ------
    RuntimeError
        If no pool is registered under ``uuid``.
    """
    try:
        found = registry[uuid]
    except KeyError:
        # ``from None`` hides the internal KeyError from the traceback.
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found

238  

239  

cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` and return the registered pool.

    If another pool is already registered under ``uuid`` it is returned
    unchanged and ``self`` is not registered.  Otherwise ``self`` is stored
    in the registry, the UUID is written onto its allocation handle, and
    ``self`` is returned.
    """
    registered = registry.get(uuid)
    if registered is None:
        # A pool may only be registered under the UUID it already carries.
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        self._ipc_data._alloc_handle._uuid = uuid
        registered = self
    return registered

248  

249  

cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export the pool as a new :class:`IPCAllocationHandle`.

    Calls ``cuMemPoolExportToShareableHandle`` with the module's
    ``IPC_HANDLE_TYPE`` and wraps the resulting file descriptor, together
    with a freshly generated UUID, in an allocation handle.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int shareable_fd
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportToShareableHandle(
            &shareable_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0)
        )
    try:
        return IPCAllocationHandle._init(shareable_fd, uuid.uuid4())
    except BaseException:
        # The descriptor has no owner if wrapping failed; close it here.
        os.close(shareable_fd)
        raise