Coverage for cuda / core / _memory / _ipc.pyx: 57.38%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 01:07 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5cimport cpython 

6  

7from cuda.bindings cimport cydriver 

8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle 

9from cuda.core._memory._memory_pool cimport _MemPool 

10from cuda.core._stream cimport Stream 

11from cuda.core._resource_handles cimport ( 

12 DevicePtrHandle, 

13 create_mempool_handle_ipc, 

14 deviceptr_import_ipc, 

15 get_last_error, 

16 as_cu, 

17) 

18  

19from cuda.core._stream cimport default_stream 

20from cuda.core._utils.cuda_utils cimport HANDLE_RETURN 

21from cuda.core._utils.cuda_utils import check_multiprocessing_start_method 

22  

23import multiprocessing 

24import os 

25import platform 

26import uuid 

27import weakref 

28  

# Names exported from this module.
__all__ = [
    'IPCBufferDescriptor',
    'IPCAllocationHandle',
]


# Weak-valued table of memory resources keyed by UUID; entries are looked up
# and inserted by the registry helpers defined in this module.
cdef object registry = weakref.WeakValueDictionary()


# Shareable-handle type used for IPC: POSIX file descriptors on Linux,
# CU_MEM_HANDLE_TYPE_NONE on every other platform.
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = (
    cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
    if platform.system() == "Linux"
    else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
)

39  

cdef is_supported():
    """Report whether an IPC handle type was selected for this platform.

    Returns a true value when IPC_HANDLE_TYPE is anything other than
    CU_MEM_HANDLE_TYPE_NONE.
    """
    cdef bint handle_type_selected = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return handle_type_selected

42  

43  

cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Holds the buffer's IPC descriptor together with the ``is_mapped`` flag
    that was supplied at construction.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag passed to the constructor."""
        return self._is_mapped

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor passed to the constructor."""
        return self._ipc_descriptor

57  

58  

cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Holds the allocation handle of a memory resource together with the
    ``is_mapped`` flag that was supplied at construction.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def uuid(self):
        """UUID of the stored allocation handle, or None if it has none."""
        return getattr(self._alloc_handle, 'uuid', None)

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag passed to the constructor."""
        return self._is_mapped

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle passed to the constructor."""
        return self._alloc_handle

76  

77  

cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes."""

    def __init__(self, *arg, **kwargs):
        # Direct construction is rejected; instances are built by _init().
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from an exported payload and a buffer size.

        Uses ``__new__`` so that the raising ``__init__`` is not invoked.
        """
        cdef IPCBufferDescriptor descriptor = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        descriptor._size = size
        descriptor._payload = reserved
        return descriptor

    def __reduce__(self):
        """Pickle support: rebuild through _init() from the payload and size."""
        rebuild_args = (self._payload, self._size)
        return IPCBufferDescriptor._init, rebuild_args

    @property
    def size(self):
        """The size value stored in this descriptor."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return the payload as a const void* for C API calls."""
        return <const void*><const char*>(self._payload)

101  

102  

cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Instances wrap an OS-level handle (closed with ``os.close``) and an
    optional UUID. They cannot be created directly; use ``_init`` internally
    or the MemoryResource APIs.
    """

    def __cinit__(self):
        # Start out in the "closed" state. Without this, an instance that is
        # destroyed before _init() assigns a real handle (e.g. because
        # __init__ raised, or the assertion in _init() failed) would reach
        # close() with a default-initialized _handle and could close an
        # unrelated descriptor.
        # NOTE(review): assumes _handle is declared as a C int in the .pxd,
        # whose default value would be 0 -- confirm.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Create an instance of ``cls`` that takes over ``handle``.

        Parameters: ``handle`` is a non-negative handle value; ``uuid`` is the
        identifier to associate with it (may be None).
        """
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle.

        Does nothing when the handle is already closed. The stored handle is
        reset to -1 even if ``os.close`` raises, so a later call is a no-op.
        """
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the handle value; raise ValueError if it has been closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """The stored handle value (-1 once the handle has been closed)."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """The UUID associated with this handle (may be None)."""
        return self._uuid

142  

143  

def _reduce_allocation_handle(alloc_handle):
    """Reducer used when an IPCAllocationHandle is sent to another process.

    Verifies the multiprocessing start method, wraps the handle's descriptor
    in a ``DupFd`` object, and returns the reconstruction callable together
    with its arguments (concrete type, DupFd object, UUID).
    """
    check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args

148  

149  

def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle from the arguments produced by the reducer.

    ``df.detach()`` supplies the handle value, which is passed together with
    ``uuid`` to ``cls._init``.
    """
    received_fd = df.detach()
    return cls._init(received_fd, uuid)

152  

153  

# Register the reducer above with multiprocessing for IPCAllocationHandle
# objects.
multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle)

155  

156  

157# Buffer IPC Implementation 

158# ------------------------- 

cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export ``self`` for IPC and return the resulting descriptor.

    Raises RuntimeError when the buffer's memory resource is not IPC-enabled.
    The export data returned by ``cuMemPoolExportPointer`` is copied into a
    bytes object and stored in the descriptor together with the buffer size.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef bytes payload

    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&export_data, as_cu(self._h_ptr))
        )

    # Copy the whole ``reserved`` field of the export struct.
    payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)

171  

cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Raises RuntimeError when ``mr`` is not IPC-enabled. When ``stream`` is
    None the default stream is used. If the import yields an empty handle,
    the last recorded error is passed to HANDLE_RETURN.
    """
    cdef Stream import_stream
    cdef DevicePtrHandle imported_ptr

    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")

    # Note: match this behavior to _MemPool.allocate()
    if stream is None:
        stream = default_stream()
    import_stream = <Stream>stream

    imported_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        import_stream._h_stream
    )
    if not imported_ptr:
        HANDLE_RETURN(get_last_error())

    return Buffer_from_deviceptr_handle(
        imported_ptr, ipc_descriptor.size, mr, ipc_descriptor
    )

190  

191  

192# _MemPool IPC Implementation 

193# --------------------------- 

194  

cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Return a memory pool for ``alloc_handle``, importing it if necessary.

    ``alloc_handle`` may be an IPCAllocationHandle or a plain integer file
    descriptor; an integer is duplicated and wrapped first. If the handle
    carries a UUID that is already in the registry, the registered pool is
    returned instead of importing again. Raises RuntimeError when the pool
    cannot be imported.
    """
    cdef _MemPool pool
    cdef int ipc_fd

    # Fast path: a pool for this UUID is already registered.
    handle_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(handle_uuid)
    if cached is not None:
        return cached

    # Wrap a raw descriptor in an allocation handle, working on a duplicate
    # of the descriptor that was passed in.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            # Wrapping failed; close the duplicate before propagating.
            os.close(dup_fd)
            raise

    # Build the new pool around the imported handle.
    pool = <_MemPool>(cls.__new__(cls))
    pool._mempool_owned = True
    ipc_fd = int(alloc_handle)
    pool._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not pool._h_pool:
        raise RuntimeError("Failed to import memory pool from IPC handle")
    pool._ipc_data = IPCDataForMR(alloc_handle, True)

    # Record the pool under its UUID, when it has one.
    if handle_uuid is not None:
        registered = pool.register(handle_uuid)
        assert registered is pool

    return pool

227  

228  

cdef _MemPool MP_from_registry(uuid):
    """Return the memory resource registered under ``uuid``.

    Raises RuntimeError (with the KeyError context suppressed) when no entry
    exists for ``uuid``.
    """
    try:
        found = registry[uuid]
    except KeyError:
        raise RuntimeError(f"Memory resource {uuid} was not found") from None
    return found

234  

235  

cdef _MemPool MP_register(_MemPool self, uuid):
    """Register ``self`` under ``uuid`` and return the registered resource.

    If another resource is already registered under ``uuid``, that one is
    returned and ``self`` is left untouched. Otherwise ``self`` is stored in
    the registry, its allocation handle is stamped with ``uuid``, and
    ``self`` is returned.
    """
    already_registered = registry.get(uuid)
    if already_registered is not None:
        return already_registered

    # A resource may only be registered under its own UUID (or while it has
    # none yet).
    assert self.uuid is None or self.uuid == uuid
    registry[uuid] = self
    self._ipc_data._alloc_handle._uuid = uuid
    return self

244  

245  

cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export the pool to a shareable handle tagged with a fresh UUID.

    The descriptor returned by ``cuMemPoolExportToShareableHandle`` is wrapped
    in an IPCAllocationHandle; if wrapping fails the descriptor is closed
    before the exception propagates.
    """
    # Note: This is Linux only (int for file descriptor)
    cdef int exported_fd
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportToShareableHandle(
                &exported_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0
            )
        )
    try:
        wrapped = IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except BaseException:
        os.close(exported_fd)
        raise
    return wrapped

257 raise