Coverage for cuda / core / _memory / _ipc.pyx: 51.64%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 01:27 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5cimport cpython 

6  

7from cuda.bindings cimport cydriver 

8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle 

9from cuda.core._memory._memory_pool cimport _MemPool 

10from cuda.core._stream cimport Stream 

11from cuda.core._resource_handles cimport ( 

12 DevicePtrHandle, 

13 create_fd_handle, 

14 create_mempool_handle_ipc, 

15 deviceptr_import_ipc, 

16 get_last_error, 

17 as_cu, 

18 as_intptr, 

19 as_py, 

20) 

21  

22from cuda.core._stream cimport default_stream 

23from cuda.core._utils.cuda_utils cimport HANDLE_RETURN 

24from cuda.core._utils.cuda_utils import check_multiprocessing_start_method 

25  

26import multiprocessing 

27import os 

28import platform 

29import uuid 

30import weakref 

31  

__all__ = ["IPCBufferDescriptor", "IPCAllocationHandle"]


# Imported memory pools keyed by uuid.  Values are held weakly, so an entry
# does not by itself keep its memory resource alive.
cdef object registry = weakref.WeakValueDictionary()


# Shareable-handle type used for every IPC export/import in this module.
# POSIX file descriptors are selected on Linux; any other platform gets
# CU_MEM_HANDLE_TYPE_NONE, which is_supported() reports as "not supported".
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE
if platform.system() == "Linux":
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
else:
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE

42  

cdef is_supported():
    """Return True when IPC_HANDLE_TYPE is anything other than CU_MEM_HANDLE_TYPE_NONE."""
    cdef bint has_handle_type = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return has_handle_type

45  

46  

cdef class IPCDataForBuffer:
    """IPC bookkeeping attached to a buffer: its descriptor and a mapped flag."""

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        # Both values are stored as given; nothing is validated here.
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor supplied at construction."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

60  

61  

cdef class IPCDataForMR:
    """IPC bookkeeping attached to a memory resource: its allocation handle and a mapped flag."""

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        # Both values are stored as given; nothing is validated here.
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle supplied at construction."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag supplied at construction."""
        return self._is_mapped

    @property
    def uuid(self):
        """The allocation handle's ``uuid`` attribute, or None if it has none."""
        # getattr with a default keeps this safe when the stored handle
        # does not expose a ``uuid`` attribute.
        return getattr(self._alloc_handle, 'uuid', None)

79  

80  

cdef class IPCBufferDescriptor:
    """Picklable description of a buffer that can be handed to another process.

    Holds an opaque ``bytes`` payload together with the buffer size.
    Instances are produced through ``_init``; calling the class directly
    raises ``RuntimeError``.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from its payload bytes and size.

        ``__new__`` is used so that the guarding ``__init__`` is not run.
        """
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        desc._payload = reserved
        desc._size = size
        return desc

    def __reduce__(self):
        """Pickle as a call to ``_init`` with the stored payload and size."""
        ctor_args = (self._payload, self._size)
        return (IPCBufferDescriptor._init, ctor_args)

    @property
    def size(self):
        """The size value stored in this descriptor."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return the payload as a const void* for C API calls."""
        cdef const char* raw = <const char*>(self._payload)
        return <const void*>raw

104  

105  

cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps a file-descriptor handle plus an optional uuid.  Instances are
    produced through ``_init``; calling the class directly raises
    ``RuntimeError``.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Build a handle of type ``cls`` around ``handle`` and tag it with ``uuid``.

        ``__new__`` is used so that the guarding ``__init__`` is not run.
        """
        cdef IPCAllocationHandle obj = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        obj._h_fd = create_fd_handle(handle)
        obj._uuid = uuid
        return obj

    cpdef close(self):
        """Close the handle."""
        self._h_fd.reset()

    def __int__(self) -> int:
        """Return the handle as an int; raise ValueError if it is closed."""
        # The handle counts as closed when it is empty or holds a negative value.
        cdef bint closed = not self._h_fd
        if not closed:
            closed = as_intptr(self._h_fd) < 0
        if closed:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return as_py(self._h_fd)

    @property
    def handle(self) -> int:
        """The underlying handle converted with ``as_py``; no closed-check is made."""
        return as_py(self._h_fd)

    @property
    def uuid(self) -> uuid.UUID:
        """The uuid stored on this handle (may be None)."""
        return self._uuid

138  

139  

def _reduce_allocation_handle(alloc_handle):
    """Reduction function for pickling an allocation handle via multiprocessing.

    Returns the ``(callable, args)`` pair consumed by the pickler: the
    reconstruction function plus the handle's type, a ``DupFd`` wrapper
    around its file descriptor, and its uuid.
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    ctor_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return (_reconstruct_allocation_handle, ctor_args)

144  

145  

def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle of type ``cls`` from a detached descriptor and uuid."""
    # Resolve the factory before detaching so a missing ``_init`` fails
    # without first consuming the descriptor held by ``df``.
    factory = cls._init
    received_fd = df.detach()
    return factory(received_fd, uuid)

148  

149  

# Make multiprocessing use _reduce_allocation_handle whenever it pickles an
# IPCAllocationHandle.
multiprocessing.reduction.register(
    IPCAllocationHandle,
    _reduce_allocation_handle,
)

151  

152  

153# Buffer IPC Implementation 

154# ------------------------- 

cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` and wrap the export data in an IPCBufferDescriptor.

    Raises RuntimeError when the buffer's memory resource is not
    IPC-enabled.  A driver failure is reported through HANDLE_RETURN.
    """
    cdef cydriver.CUmemPoolExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the whole fixed-size ``reserved`` field into a Python bytes
    # object so the descriptor can be pickled.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)

167  

cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Parameters
    ----------
    cls
        Accepted for signature compatibility; not used in the body.
    mr
        Memory pool to import into.  Must be IPC-enabled.
    ipc_descriptor
        Descriptor whose payload and size describe the exported buffer.
    stream
        Stream handed to the import call.  ``None`` selects the default
        stream.

    Returns
    -------
    Buffer
        Built by ``Buffer_from_deviceptr_handle`` from the imported pointer
        handle, the descriptor's size, ``mr`` and the descriptor itself.

    Raises
    ------
    RuntimeError
        If ``mr`` is not IPC-enabled.
    TypeError
        If ``stream`` is neither ``None`` nor a ``Stream`` instance.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    if stream is None:
        # Note: match this behavior to _MemPool.allocate()
        stream = default_stream()
    # Checked cast: an unchecked <Stream> cast would reinterpret any object
    # as a Stream and read ``_h_stream`` from unrelated memory.  The ``?``
    # form raises TypeError for a non-Stream argument instead.
    cdef Stream s = <Stream?>stream
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        # An empty handle signals failure; surface the recorded error.
        HANDLE_RETURN(get_last_error())
    return Buffer_from_deviceptr_handle(h_ptr, ipc_descriptor.size, mr, ipc_descriptor)

186  

187  

188# _MemPool IPC Implementation 

189# --------------------------- 

190  

cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return the memory pool of type ``cls`` for ``alloc_handle``.

    A pool already registered under the handle's uuid is returned directly
    (TypeError if it is not an instance of ``cls``).  Otherwise a new pool
    is imported from the handle and, when the handle carries a uuid,
    registered under it.  ``alloc_handle`` may be an IPCAllocationHandle or
    a plain int file descriptor; an int is duplicated before being wrapped.
    """
    cdef _MemPool pool
    cdef int ipc_fd

    # Quick exit for registry hits.
    handle_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(handle_uuid)
    if cached is not None:
        if isinstance(cached, cls):
            return cached
        raise TypeError(
            f"Registry contains a {type(cached).__name__} for uuid "
            f"{handle_uuid}, but {cls.__name__} was requested")

    # Ensure we have an allocation handle. Duplicate the file descriptor, if
    # necessary.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except:
            # Wrapping failed, so close the duplicate here rather than leak it.
            os.close(dup_fd)
            raise

    # Construct a new mempool.
    pool = <_MemPool>(cls.__new__(cls))
    pool._mempool_owned = True
    ipc_fd = int(alloc_handle)
    pool._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not pool._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    pool._ipc_data = IPCDataForMR(alloc_handle, True)

    # Register it.
    if handle_uuid is not None:
        registered = pool.register(handle_uuid)
        assert registered is pool

    return pool

227  

228  

cdef _MemPool MP_from_registry(uuid):
    """Return the memory pool registered under ``uuid``.

    Raises RuntimeError (with the KeyError context suppressed) when no pool
    is registered under that uuid.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found

234  

235  

cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` unless that uuid is already taken.

    Returns the pool that ends up registered under ``uuid``: the existing
    entry if there is one, otherwise ``self``.  On a fresh registration the
    uuid is also written onto the pool's allocation handle.
    """
    current = registry.get(uuid)
    if current is None:
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        self._ipc_data._alloc_handle._uuid = uuid
        return self
    # Another pool already owns this uuid; leave the registry untouched.
    return current

244  

245  

cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export the pool as a shareable handle wrapped in an IPCAllocationHandle.

    The returned handle is tagged with a freshly generated uuid4.  If
    wrapping the exported file descriptor fails, the descriptor is closed
    and the exception is re-raised.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int exported_fd
    cdef IPCAllocationHandle wrapped

    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportToShareableHandle(
            &exported_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0)
        )

    try:
        wrapped = IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except:
        os.close(exported_fd)
        raise
    return wrapped