# Coverage for cuda/core/_memory/_ipc.pyx: 51.64% (122 statements).
# Report generated by coverage.py v7.14.0 at 2026-05-22 01:37 +0000.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
5cimport cpython
7from cuda.bindings cimport cydriver
8from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
9from cuda.core._memory._memory_pool cimport _MemPool
10from cuda.core._stream cimport Stream, Stream_accept
11from cuda.core._resource_handles cimport (
12 DevicePtrHandle,
13 create_fd_handle,
14 create_mempool_handle_ipc,
15 deviceptr_import_ipc,
16 get_last_error,
17 as_cu,
18 as_intptr,
19 as_py,
20)
22from cuda.core._utils.cuda_utils cimport HANDLE_RETURN
23from cuda.core._utils.cuda_utils import check_multiprocessing_start_method
25import multiprocessing
26import os
27import platform
28import uuid
29import weakref
__all__ = ['IPCBufferDescriptor', 'IPCAllocationHandle']

# Process-wide map from pool UUID to the memory pool registered under it
# (see MP_register). Held weakly: an entry disappears once its pool is
# garbage collected.
cdef object registry = weakref.WeakValueDictionary()

# Shareable-handle type used to export/import memory pools. Only POSIX file
# descriptors on Linux are wired up; every other platform gets
# CU_MEM_HANDLE_TYPE_NONE, which is_supported() reports as "no IPC".
cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE
if platform.system() == "Linux":
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR
else:
    IPC_HANDLE_TYPE = cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
cdef is_supported():
    """Return True when this platform has a usable IPC handle type."""
    cdef bint no_ipc = (
        IPC_HANDLE_TYPE == cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE
    )
    return not no_ipc
cdef class IPCDataForBuffer:
    """Data members related to sharing memory buffers via IPC.

    Bundles a buffer's IPCBufferDescriptor with an ``is_mapped`` flag.
    NOTE(review): the exact meaning of ``is_mapped`` is defined by the callers
    that construct this object (not visible here) — confirm before relying on it.
    """

    def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
        self._is_mapped = is_mapped
        self._ipc_descriptor = ipc_descriptor

    @property
    def ipc_descriptor(self):
        """The IPCBufferDescriptor this object was created with."""
        return self._ipc_descriptor

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag this object was created with."""
        return self._is_mapped
cdef class IPCDataForMR:
    """Data members related to sharing memory resources via IPC.

    Bundles a pool's IPCAllocationHandle with an ``is_mapped`` flag.
    MP_from_allocation_handle creates one with ``is_mapped=True`` when it
    imports a pool from an allocation handle.
    """

    def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
        self._is_mapped = is_mapped
        self._alloc_handle = alloc_handle

    @property
    def alloc_handle(self):
        """The IPCAllocationHandle this object was created with."""
        return self._alloc_handle

    @property
    def is_mapped(self):
        """The ``is_mapped`` flag this object was created with."""
        return self._is_mapped

    @property
    def uuid(self):
        """UUID of the allocation handle, or None if it has none (or no handle)."""
        handle = self._alloc_handle
        return getattr(handle, 'uuid', None)
cdef class IPCBufferDescriptor:
    """Serializable object describing a buffer that can be shared between processes.

    Holds the opaque export payload (a ``bytes`` object) and the buffer size.
    Pickling goes through ``__reduce__``, which rebuilds the descriptor with
    ``_init`` on the receiving side.
    """

    def __init__(self, *arg, **kwargs):
        # Direct construction is forbidden; descriptors come from MemoryResource APIs.
        raise RuntimeError("IPCBufferDescriptor objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @staticmethod
    def _init(reserved: bytes, size: int):
        """Build a descriptor from an export payload and a size, bypassing ``__init__``."""
        cdef IPCBufferDescriptor desc = IPCBufferDescriptor.__new__(IPCBufferDescriptor)
        desc._payload = reserved
        desc._size = size
        return desc

    def __reduce__(self):
        """Pickle as a call to ``_init(payload, size)``."""
        ctor_args = (self._payload, self._size)
        return IPCBufferDescriptor._init, ctor_args

    @property
    def size(self):
        """Size recorded for the described buffer."""
        return self._size

    cdef const void* payload_ptr(self) noexcept:
        """Return the payload as a const void* for C API calls.

        The pointer refers into ``self._payload``'s storage, so it is only valid
        while this descriptor keeps that bytes object alive.
        """
        return <const void*><const char*>(self._payload)
cdef class IPCAllocationHandle:
    """Shareable handle to an IPC-enabled device memory pool.

    Wraps an OS-level shareable handle (a file descriptor, stored in ``_h_fd``)
    together with an optional UUID for the exported pool. Instances are built
    through ``_init``; calling the class directly raises RuntimeError.
    """

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCAllocationHandle objects cannot be instantiated directly. Please use MemoryResource APIs.")

    @classmethod
    def _init(cls, handle: int, uuid):  # no-cython-lint
        """Create an instance of *cls* wrapping descriptor *handle*.

        *handle* must be non-negative and is wrapped with ``create_fd_handle``.
        *uuid* identifies the exported pool and may be None.
        """
        cdef IPCAllocationHandle obj = IPCAllocationHandle.__new__(cls)
        assert handle >= 0
        obj._h_fd = create_fd_handle(handle)
        obj._uuid = uuid
        return obj

    cpdef close(self):
        """Close the handle."""
        self._h_fd.reset()

    def __int__(self) -> int:
        """Return the descriptor as an int; raise ValueError if the handle is closed."""
        cdef bint closed = not self._h_fd or as_intptr(self._h_fd) < 0
        if closed:
            raise ValueError(
                f"Cannot convert IPCAllocationHandle to int: the handle (id={id(self)}) is closed."
            )
        return as_py(self._h_fd)

    @property
    def handle(self) -> int:
        """Raw descriptor value (unlike ``int(self)``, no closed-state check is done)."""
        return as_py(self._h_fd)

    @property
    def uuid(self) -> uuid.UUID:
        """UUID of the exported pool, or None if none was attached."""
        return self._uuid
def _reduce_allocation_handle(alloc_handle):
    """Pickle reducer for IPCAllocationHandle used by multiprocessing.

    The descriptor is shipped with ``multiprocessing.reduction.DupFd``, and the
    handle is rebuilt in the receiving process by
    ``_reconstruct_allocation_handle``.
    """
    check_multiprocessing_start_method()
    fd_carrier = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), fd_carrier, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args
def _reconstruct_allocation_handle(cls, df, uuid):  # no-cython-lint
    """Unpickle counterpart of ``_reduce_allocation_handle``.

    Takes the descriptor out of *df* (``df.detach()``) and wraps it with
    ``cls._init`` together with *uuid*.
    """
    factory = cls._init
    fd = df.detach()
    return factory(fd, uuid)
# Make multiprocessing pickle IPCAllocationHandle objects through
# _reduce_allocation_handle, which ships the descriptor via DupFd.
multiprocessing.reduction.register(
    IPCAllocationHandle,
    _reduce_allocation_handle,
)
# Buffer IPC Implementation
# -------------------------
cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
    """Export *self* as an IPCBufferDescriptor that another process can import.

    The descriptor carries the opaque payload from ``cuMemPoolExportPointer``
    and ``self.size``.

    Raises
    ------
    RuntimeError
        If the buffer has no memory resource, or its memory resource is not
        IPC-enabled.
    """
    # memory_resource may be None, or an object without ``is_ipc_enabled``
    # (NOTE(review): e.g. buffers not allocated from an IPC-capable pool —
    # confirm). Both cases are reported as "not IPC-enabled" rather than as an
    # AttributeError.
    mr = self.memory_resource
    if not getattr(mr, "is_ipc_enabled", False):
        raise RuntimeError("Memory resource is not IPC-enabled")
    cdef cydriver.CUmemPoolPtrExportData data
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportPointer(&data, as_cu(self._h_ptr))
        )
    # Copy the opaque export blob into a bytes object owned by the descriptor
    # (the descriptor pickles this payload in __reduce__).
    cdef bytes data_b = cpython.PyBytes_FromStringAndSize(
        <char*>(data.reserved), sizeof(data.reserved)
    )
    return IPCBufferDescriptor._init(data_b, self.size)
cdef Buffer Buffer_from_ipc_descriptor(
    cls, _MemPool mr, IPCBufferDescriptor ipc_descriptor, stream
):
    """Import a buffer that was exported from another process.

    Parameters
    ----------
    cls
        Not used in this function; kept for call-site compatibility.
    mr : _MemPool
        IPC-enabled pool to import the pointer into (its ``_h_pool`` is passed
        to ``deviceptr_import_ipc``).
    ipc_descriptor : IPCBufferDescriptor
        Descriptor produced by ``Buffer_get_ipc_descriptor`` in the exporter.
    stream
        Anything ``Stream_accept`` accepts; its stream handle is passed to the
        import call.

    Raises
    ------
    RuntimeError
        If *mr* is not IPC-enabled, or the import yields an empty pointer handle
        without recording a CUDA error.
    TypeError
        If *ipc_descriptor* is None.
    Any error raised by HANDLE_RETURN for a recorded CUDA failure.
    """
    if not mr.is_ipc_enabled:
        raise RuntimeError("Memory resource is not IPC-enabled")
    # payload_ptr() is a cdef method; Cython does not None-check the receiver
    # before calling it, so a None descriptor must be rejected explicitly.
    if ipc_descriptor is None:
        raise TypeError("ipc_descriptor must be an IPCBufferDescriptor, not None")
    cdef Stream s = Stream_accept(stream)
    cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
        mr._h_pool,
        ipc_descriptor.payload_ptr(),
        s._h_stream
    )
    if not h_ptr:
        HANDLE_RETURN(get_last_error())
        # HANDLE_RETURN returns normally on a success code; never hand an empty
        # pointer handle to Buffer_from_deviceptr_handle (same safeguard as
        # MP_from_allocation_handle uses for pool handles).
        raise RuntimeError(
            "Failed to import a buffer from an IPC descriptor: "
            "cuda-core returned an empty device pointer handle without recording a CUDA error. "
            "This is an internal cuda-core error; please report it with your CUDA driver, "
            "CUDA Toolkit, and cuda-python versions."
        )
    return Buffer_from_deviceptr_handle(h_ptr, ipc_descriptor.size, mr, ipc_descriptor)
# _MemPool IPC Implementation
# ---------------------------
cdef _MemPool MP_from_allocation_handle(cls, alloc_handle):
    """Import (or reuse) a memory pool of type *cls* from an allocation handle.

    *alloc_handle* is an IPCAllocationHandle or a raw int file descriptor. A raw
    descriptor is duplicated with ``os.dup`` and the duplicate is wrapped in a
    new IPCAllocationHandle; the caller's descriptor is never closed here.
    If the handle's UUID is already in the registry, the registered pool is
    returned instead of importing a new one. A newly imported pool with a UUID
    is registered under it.

    Raises
    ------
    TypeError
        If the registry holds a pool of a different type for the same UUID.
    RuntimeError
        If the import yields an empty pool handle without a recorded CUDA error.
    """
    # Fast path: a pool with this UUID is already known in this process.
    pool_uuid = getattr(alloc_handle, 'uuid', None)
    cached = registry.get(pool_uuid)
    if cached is not None:
        if isinstance(cached, cls):
            return cached
        raise TypeError(
            f"Registry contains a {type(cached).__name__} for uuid "
            f"{pool_uuid}, but {cls.__name__} was requested")

    # Normalize a raw descriptor into an IPCAllocationHandle over a duplicate.
    if isinstance(alloc_handle, int):
        dup_fd = os.dup(alloc_handle)
        try:
            alloc_handle = IPCAllocationHandle._init(dup_fd, None)
        except BaseException:
            # Wrapping failed: do not leak the duplicate we just created.
            os.close(dup_fd)
            raise

    # Import the pool from the shareable handle.
    cdef _MemPool self = <_MemPool>(cls.__new__(cls))
    self._mempool_owned = True
    cdef int ipc_fd = int(alloc_handle)
    self._h_pool = create_mempool_handle_ipc(ipc_fd, IPC_HANDLE_TYPE)
    if not self._h_pool:
        HANDLE_RETURN(get_last_error())
        raise RuntimeError(
            f"Failed to import {cls.__name__} from an allocation handle: "
            "cuda-core returned an empty memory pool handle without recording a CUDA error. "
            "This is an internal cuda-core error; please report it with your CUDA driver, "
            "CUDA Toolkit, and cuda-python versions."
        )
    self._ipc_data = IPCDataForMR(alloc_handle, True)

    # Publish it so later imports of the same UUID hit the fast path.
    if pool_uuid is not None:
        registered = self.register(pool_uuid)
        assert registered is self
    return self
cdef _MemPool MP_from_registry(uuid):
    """Return the pool registered under *uuid*.

    Raises RuntimeError if no live pool is registered under that UUID.
    """
    found = registry.get(uuid)
    if found is None:
        raise RuntimeError(f"Memory resource {uuid} was not found")
    return found
cdef _MemPool MP_register(_MemPool self, uuid):
    """Register *self* in the process-wide registry under *uuid*.

    If a live pool is already registered under *uuid*, that pool is returned
    and *self* is left unregistered. Otherwise *self* is stored, its allocation
    handle's ``_uuid`` is set to *uuid*, and *self* is returned.
    """
    already = registry.get(uuid)
    if already is None:
        assert self.uuid is None or self.uuid == uuid
        registry[uuid] = self
        # NOTE(review): assumes self._ipc_data and its _alloc_handle are set
        # (true for pools built by MP_from_allocation_handle) — confirm for
        # other callers.
        self._ipc_data._alloc_handle._uuid = uuid
        return self
    return already
cdef IPCAllocationHandle MP_export_mempool(_MemPool self):
    """Export this pool as an IPCAllocationHandle tagged with a fresh UUID4.

    Linux only: the shareable handle is a POSIX file descriptor (an int).
    If wrapping the descriptor fails, it is closed before the error propagates.
    """
    cdef int exported_fd
    with nogil:
        HANDLE_RETURN(
            cydriver.cuMemPoolExportToShareableHandle(
                &exported_fd, as_cu(self._h_pool), IPC_HANDLE_TYPE, 0
            )
        )
    try:
        return IPCAllocationHandle._init(exported_fd, uuid.uuid4())
    except BaseException:
        # Do not leak the freshly exported descriptor.
        os.close(exported_fd)
        raise