Coverage for cuda / core / experimental / _memory / _ipc.pyx: 56%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-10 01:19 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5cimport cpython 

6from libc.stdint cimport uintptr_t 

7from libc.string cimport memcpy 

8  

9from cuda.bindings cimport cydriver 

10from cuda.core.experimental._memory._buffer cimport Buffer 

11from cuda.core.experimental._stream cimport default_stream 

12from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN 

13from cuda.core.experimental._utils.cuda_utils import check_multiprocessing_start_method 

14  

15import multiprocessing 

16import os 

17import platform 

18import uuid 

19import weakref 

20  

# Names re-exported as this module's public API.
__all__ = [
    'IPCBufferDescriptor',
    'IPCAllocationHandle',
]


# UUID -> DeviceMemoryResource lookup table (populated by DMR_register).
# Values are held weakly, so an entry disappears once its resource is
# garbage collected.
cdef object registry = weakref.WeakValueDictionary()

25  

26  

# Shareable-handle type used when exporting/importing memory pools: POSIX
# file descriptors on Linux, and CU_MEM_HANDLE_TYPE_NONE (IPC unavailable)
# on every other platform.
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE
if platform.system() == "Linux":
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
else:
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE

31  

cdef is_supported():
    """Return whether IPC_HANDLE_TYPE is anything other than CU_MEM_HANDLE_TYPE_NONE."""
    cdef bint has_handle_type = (
        IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return has_handle_type

34  

35  

cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Stores the IPCBufferDescriptor and the ``is_mapped`` flag supplied at
    construction; both are exposed read-only through properties.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor this object was constructed with."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag this object was constructed with."""
        return self._is_mapped

49  

50  

cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Stores the pool's IPCAllocationHandle and the ``is_mapped`` flag supplied
    at construction; both are exposed read-only through properties.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle this object was constructed with."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag this object was constructed with."""
        return self._is_mapped

    @property
    def uuid(self):
        """The allocation handle's ``uuid``, or None when it has none."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)

68  

69  

cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes.

    Carries the opaque driver export payload (``bytes``) and the buffer size.
    Instances are built through ``_init``; calling the class directly raises.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError(
            "IPCBufferDescriptor objects cannot be instantiated directly. "
            "Please use MemoryResource APIs."
        )

    @classmethod
    def _init(cls, reserved: bytes, size: int):
        # __new__ skips __init__, which would otherwise refuse construction.
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(cls)
        desc._size = size
        desc._payload = reserved
        return desc

    def __reduce__(self):
        # Pickle as a call to the _init factory with the stored fields.
        factory = type(self)._init
        return factory, (self._payload, self._size)

    @property
    def size(self):
        """Size of the described buffer, as passed to ``_init``."""
        return self._size

89  

90  

cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps an OS file descriptor together with an optional UUID. The
    descriptor is owned by this object: it is closed by :meth:`close`, or
    when the object is deallocated.
    """

    def __cinit__(self, *args, **kwargs):
        # Start in the closed state. _init() creates instances via __new__,
        # and __dealloc__ closes any handle >= 0. Cython zero-fills C fields
        # before __cinit__ runs, so without this a half-initialized instance
        # (e.g. one dropped because _init raised) would close fd 0 (stdin).
        # Extra arguments are accepted and ignored so __init__ can still raise
        # its own explanatory error.
        self._handle = -1

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Wrap the open file descriptor *handle*, taking ownership of it.

        Args:
            handle: A non-negative OS file descriptor.
            uuid: Identifier of the exported memory resource, or None.

        Raises:
            ValueError: If *handle* is negative.
        """
        # Explicit check instead of ``assert``: it stays active in optimized
        # builds, and runs before any object that could close an fd exists.
        if handle < 0:
            raise ValueError(
                f"IPCAllocationHandle requires a non-negative file descriptor, got {handle}"
            )
        cdef IPCAllocationHandle self = IPCAllocationHandle.__new__(cls)
        self._handle = handle
        self._uuid = uuid
        return self

    cpdef close(self):
        """Close the handle.

        Safe to call repeatedly: after the first call the handle is -1 and
        later calls do nothing. The handle is marked closed even if
        ``os.close`` raises.
        """
        if self._handle >= 0:
            try:
                os.close(self._handle)
            finally:
                self._handle = -1

    def __dealloc__(self):
        self.close()

    def __int__(self) -> int:
        """Return the file descriptor; raises ValueError if the handle is closed."""
        if self._handle < 0:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return self._handle

    @property
    def handle(self) -> int:
        """The raw file descriptor, or -1 once closed."""
        return self._handle

    @property
    def uuid(self) -> uuid.UUID:
        """The UUID given to ``_init`` (may be None)."""
        return self._uuid

130  

131  

def _reduce_allocation_handle(alloc_handle):
    """Reduce an IPCAllocationHandle for multiprocessing pickling.

    The descriptor is wrapped in ``multiprocessing.reduction.DupFd`` so it can
    be transferred to the receiving process; the UUID is sent unchanged.
    """
    check_multiprocessing_start_method()
    handle_cls = type(alloc_handle)
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    return _reconstruct_allocation_handle, (handle_cls, dup_fd, alloc_handle.uuid)


def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Rebuild an allocation handle from the DupFd produced by the reducer."""
    received_fd = df.detach()
    return cls._init(received_fd, uuid)


# Route IPCAllocationHandle through the reducer above when multiprocessing pickles it.
multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle)

143  

144  

def _deep_reduce_device_memory_resource(mr):
    """Reduce a DeviceMemoryResource for multiprocessing pickling.

    Sends the resource's Device and its allocation handle; the receiver
    rebuilds the resource by calling ``mr.from_allocation_handle`` with them.
    """
    check_multiprocessing_start_method()
    from .._device import Device
    target_device = Device(mr.device_id)
    exported_handle = mr.get_allocation_handle()
    rebuild = mr.from_allocation_handle
    return rebuild, (target_device, exported_handle)


# Route DeviceMemoryResource through the reducer above when multiprocessing pickles it.
multiprocessing.reduction.register(DeviceMemoryResource, _deep_reduce_device_memory_resource)

154  

155  

156# Buffer IPC Implementation 

157# ------------------------- 

cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export *self* as an IPCBufferDescriptor.

    Raises RuntimeError when the buffer's memory resource is not IPC-enabled.
    """
    cdef cydriver.CUmemPoolPtrExportData export_data
    cdef cydriver.CUdeviceptr dptr
    if not self.memory_resource.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    dptr = <cydriver.CUdeviceptr>(self._ptr)
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportPointer(&export_data, dptr))
    # The driver's opaque export record is copied verbatim into the payload.
    cdef bytes payload = cpython.PyBytes_FromStringAndSize(
        <char*>(export_data.reserved), sizeof(export_data.reserved)
    )
    return IPCBufferDescriptor._init(payload, self.size)

170  

cdef Buffer Buffer_from_ipc_descriptor(
    cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Args:
        cls: Unused here (kept for interface compatibility).
        mr: The IPC-enabled memory resource the buffer's pool was mapped into.
        ipc_descriptor: Descriptor produced by the exporting process.
        stream: Stream recorded on the new Buffer; ``default_stream()`` if None.

    Raises:
        RuntimeError: If *mr* is not IPC-enabled.
        TypeError: If *ipc_descriptor* is None.
        ValueError: If the descriptor's payload is not exactly the size of
            the driver's export record.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    if ipc_descriptor is None:
        # Typed cdef arguments accept None; reading _payload from it would crash.
        raise TypeError("ipc_descriptor must be an IPCBufferDescriptor, not None")
    if stream is None:
        # Note: match this behavior to DeviceMemoryResource.allocate()
        stream = default_stream()
    cdef cydriver.CUmemPoolPtrExportData data
    cdef Py_ssize_t expected = sizeof(data.reserved)
    cdef bytes payload = ipc_descriptor._payload
    # memcpy below reads exactly `expected` bytes. IPCBufferDescriptor._init
    # (also reached through unpickling) accepts arbitrary bytes, so a short or
    # missing payload would otherwise be an out-of-bounds read.
    if payload is None or len(payload) != expected:
        raise ValueError(
            f"Invalid IPC buffer descriptor: expected a {expected}-byte payload, "
            f"got {'None' if payload is None else len(payload)}"
        )
    memcpy(
        data.reserved,
        <const void*><const char*>(payload),
        sizeof(data.reserved)
    )
    cdef cydriver.CUdeviceptr ptr
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._handle, &data))
    return Buffer._init(<uintptr_t>ptr, ipc_descriptor.size, mr, stream, ipc_descriptor)

190  

191  

192# DeviceMemoryResource IPC Implementation 

193# --------------------------------------- 

194  

cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handle):
    """Return a DeviceMemoryResource for a memory pool shared by another process.

    Args:
        cls: Class to instantiate (via ``DeviceMemoryResource.__new__``).
        device_id: Anything ``Device()`` accepts; the resolved ``device_id``
            is stored on the new resource.
        alloc_handle: An IPCAllocationHandle, or an ``int`` file descriptor.
            An int is duplicated with ``os.dup``, so the caller keeps
            ownership of the original descriptor. An IPCAllocationHandle is
            stored on the new resource (inside IPCDataForMR) as-is.

    Returns:
        The resource already registered under the handle's ``uuid``, if any.
        Otherwise a newly imported resource, which is registered when the
        handle has a non-None ``uuid``.
    """
    # Quick exit for registry hits.
    # (The local name `uuid` shadows the uuid module within this function.)
    uuid = getattr(alloc_handle, 'uuid', None)  # no-cython-lint
    mr = registry.get(uuid)
    if mr is not None:
        return mr

    # Ensure we have an allocation handle. Duplicate the file descriptor, if
    # necessary.
    if isinstance(alloc_handle, int):
        fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(fd, None)
        except:
            # The dup'd fd isn't owned by anything yet; don't leak it.
            os.close(fd)
            raise

    # Construct a new DMR.
    cdef DeviceMemoryResource self = DeviceMemoryResource.__new__(cls)
    from .._device import Device
    self._dev_id = Device(device_id).device_id
    self._mempool_owned = True
    # IPCDataForMR's constructor is typed to IPCAllocationHandle, so any
    # other kind of alloc_handle is rejected here.
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    # Map the mempool into this process.
    # int() raises ValueError if the IPCAllocationHandle is already closed.
    cdef int handle = int(alloc_handle)
    # NOTE(review): if this import fails, `self` already has
    # _mempool_owned = True with whatever _handle holds; confirm that
    # DeviceMemoryResource's cleanup tolerates that state.
    with nogil:
        # The fd value itself (not its address) is passed as the opaque
        # shareable handle.
        HANDLE_RETURN(cydriver.cuMemPoolImportFromShareableHandle(
            &(self._handle), <void*><uintptr_t>(handle), IPC_HANDLE_TYPE, 0)
        )

    # Register it.
    if uuid is not None:
        registered = self.register(uuid)
        # NOTE(review): register() returns an existing entry if one appeared
        # under this uuid after the lookup above, which would trip this assert.
        assert registered is self

    return self

232  

233  

cdef DeviceMemoryResource DMR_from_registry(uuid):
    """Return the DeviceMemoryResource registered under *uuid*.

    Raises RuntimeError when no live resource is registered under that key.
    """
    found = registry.get(uuid)
    if found is None:
        raise RuntimeError(f"Memory resource {uuid} was not found")
    return found

239  

240  

cdef DeviceMemoryResource DMR_register(DeviceMemoryResource self, uuid):
    """Register *self* in the module registry under *uuid*.

    If a resource is already registered under *uuid*, that resource is
    returned and nothing changes. Otherwise *self* is registered, its
    allocation handle is stamped with *uuid*, and *self* is returned.
    """
    already_there = registry.get(uuid)
    if already_there is None:
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        self._ipc_data._alloc_handle._uuid = uuid
        return self
    return already_there

249  

250  

cdef IPCAllocationHandle DMR_export_mempool(DeviceMemoryResource self):
    """Export the memory pool of *self* as a new IPCAllocationHandle.

    The handle is tagged with a freshly generated ``uuid.uuid4()``.
    Linux only: the shareable handle is a file descriptor held in a C int.
    """
    cdef int exported_fd
    with nogil:
        HANDLE_RETURN(cydriver.cuMemPoolExportToShareableHandle(
            &exported_fd, self._handle, IPC_HANDLE_TYPE, 0)
        )
    try:
        return IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except BaseException:
        # Nothing owns the descriptor yet, so close it before propagating.
        os.close(exported_fd)
        raise