Coverage for cuda / core / experimental / _event.pyx: 84%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-10 01:19 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5from __future__ import annotations 

6  

7cimport cpython 

8from libc.stdint cimport uintptr_t 

9from libc.string cimport memcpy 

10from cuda.bindings cimport cydriver 

11from cuda.core.experimental._utils.cuda_utils cimport ( 

12 check_or_create_options, 

13 HANDLE_RETURN 

14) 

15  

16import cython 

17from dataclasses import dataclass 

18import multiprocessing 

19from typing import TYPE_CHECKING, Optional 

20  

21from cuda.core.experimental._context import Context 

22from cuda.core.experimental._utils.cuda_utils import ( 

23 CUDAError, 

24 check_multiprocessing_start_method, 

25 driver, 

26) 

27if TYPE_CHECKING: 

28 import cuda.bindings 

29  

30  

@dataclass
cdef class EventOptions:
    """Options accepted when creating an :obj:`~_event.Event`.

    Attributes
    ----------
    enable_timing : bool, optional
        Whether the event records timing data. (Defaults to False)
    busy_waited_sync : bool, optional
        Whether the event uses blocking synchronization. If True, a CPU
        thread that synchronizes on the event blocks until the event has
        actually completed; if False, the CPU thread busy-waits until
        the event has completed. (Defaults to False)
    ipc_enabled : bool, optional
        Whether the event is suitable for interprocess use. Requires
        ``enable_timing`` to be False. (Defaults to False)

    """

    enable_timing: Optional[bool] = False
    busy_waited_sync: Optional[bool] = False
    ipc_enabled: Optional[bool] = False

54  

55  

cdef class Event:
    """Represent a record at a specific point of execution within a CUDA stream.

    Applications can asynchronously record events at any point in
    the program. An event keeps a record of all previous work within
    the last recorded stream.

    Events can be used to monitor device's progress, query completion
    of work up to event's record, help establish dependencies
    between GPU work submissions, and record the elapsed time (in milliseconds)
    on GPU:

    .. code-block:: python

        # To create events and record the timing:
        s = Device().create_stream()
        e1 = Device().create_event({"enable_timing": True})
        e2 = Device().create_event({"enable_timing": True})
        s.record(e1)
        # ... run some GPU works ...
        s.record(e2)
        e2.sync()
        print(f"time = {e2 - e1} milliseconds")

    Directly creating an :obj:`~_event.Event` is not supported due to ambiguity,
    and they should instead be created through a :obj:`~_stream.Stream` object.

    """

    def __cinit__(self):
        # Start from a NULL handle so that close() (also invoked from
        # __dealloc__) is a no-op if construction fails before the driver
        # event has been created.
        self._handle = <cydriver.CUevent>(NULL)

    def __init__(self, *args, **kwargs):
        raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")

    @classmethod
    def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
        """Create a new event (internal constructor).

        Parameters
        ----------
        device_id : int
            Device ordinal stored on the event and used by :attr:`device`.
        ctx_handle : Context
            Context handle stored on the event and used by :attr:`context`.
        options : optional
            Event options, normalized through ``check_or_create_options``
            into an :obj:`EventOptions`.
        is_free : bool, optional
            If True, requesting ``ipc_enabled`` is rejected.

        Raises
        ------
        TypeError
            If ``ipc_enabled`` is requested together with ``is_free`` or
            together with ``enable_timing``.
        """
        cdef Event self = Event.__new__(cls)
        cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
        cdef unsigned int flags = 0x0
        self._timing_disabled = False
        self._busy_waited = False
        self._ipc_enabled = False
        self._ipc_descriptor = None
        if not opts.enable_timing:
            flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
            self._timing_disabled = True
        if opts.busy_waited_sync:
            flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
            self._busy_waited = True
        if opts.ipc_enabled:
            # All IPC validation happens before the driver call so that no
            # event is created when the option combination is invalid.
            if is_free:
                raise TypeError(
                    "IPC-enabled events must be bound; use Stream.record for creation."
                )
            flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
            self._ipc_enabled = True
            if not self._timing_disabled:
                raise TypeError("IPC-enabled events cannot use timing.")
        with nogil:
            HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
        self._device_id = device_id
        self._ctx_handle = ctx_handle
        if opts.ipc_enabled:
            # Export eagerly; the descriptor is cached on the event.
            self.get_ipc_descriptor()
        return self

    cpdef close(self):
        """Destroy the event.

        Does nothing if the event has already been closed.
        """
        if self._handle == NULL:
            return
        with nogil:
            HANDLE_RETURN(cydriver.cuEventDestroy(self._handle))
        self._handle = <cydriver.CUevent>(NULL)

    def __dealloc__(self):
        self.close()

    def __isub__(self, other):
        return NotImplemented

    def __rsub__(self, other):
        return NotImplemented

    def __sub__(self, other: Event):
        """Return the elapsed time from ``other`` to ``self`` in milliseconds.

        Raises
        ------
        RuntimeError
            If the driver reports an invalid handle (timing disabled on
            either event, or an event not yet recorded) or that the events
            have not completed yet.
        CUDAError
            For any other driver error.
        """
        # return self - other (in milliseconds)
        cdef float timing
        with nogil:
            err = cydriver.cuEventElapsedTime(&timing, other._handle, self._handle)
        if err == 0:
            return timing

        # Translate the most common failures into actionable messages.
        if err == cydriver.CUresult.CUDA_ERROR_INVALID_HANDLE:
            if self.is_timing_disabled or other.is_timing_disabled:
                explanation = (
                    "Both Events must be created with timing enabled in order to subtract them; "
                    "use EventOptions(enable_timing=True) when creating both events."
                )
            else:
                explanation = (
                    "Both Events must be recorded before they can be subtracted; "
                    "use Stream.record() to record both events to a stream."
                )
        elif err == cydriver.CUresult.CUDA_ERROR_NOT_READY:
            explanation = (
                "One or both events have not completed; "
                "use Event.sync(), Stream.sync(), or Device.sync() to wait for the events to complete "
                "before subtracting them."
            )
        else:
            raise CUDAError(err)
        raise RuntimeError(explanation)

    def __hash__(self) -> int:
        # Hash only what __eq__ compares (the raw handle) so that events
        # comparing equal always hash equal, whatever their stored context.
        return hash(<uintptr_t>(self._handle))

    def __eq__(self, other) -> bool:
        # Note: using isinstance because `Event` can be subclassed.
        if not isinstance(other, Event):
            return NotImplemented
        cdef Event _other = <Event>other
        return <uintptr_t>(self._handle) == <uintptr_t>(_other._handle)

    def get_ipc_descriptor(self) -> IPCEventDescriptor:
        """Export an event allocated for sharing between processes.

        The descriptor is created on first use and cached on the event.

        Raises
        ------
        RuntimeError
            If the event is not IPC-enabled.
        """
        if self._ipc_descriptor is not None:
            return self._ipc_descriptor
        if not self.is_ipc_enabled:
            raise RuntimeError("Event is not IPC-enabled")
        cdef cydriver.CUipcEventHandle data
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, <cydriver.CUevent>(self._handle)))
        cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
        self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
        return self._ipc_descriptor

    @classmethod
    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
        """Import an event that was exported from another process.

        Raises
        ------
        ValueError
            If the descriptor payload does not have the size of a driver
            IPC event handle.
        """
        cdef cydriver.CUipcEventHandle data
        cdef Py_ssize_t expected_size = <Py_ssize_t>sizeof(data.reserved)
        cdef bytes reserved = ipc_descriptor._reserved
        # Descriptors can be rebuilt from arbitrary bytes (e.g. by
        # unpickling); validate the length so memcpy never reads past the
        # end of the bytes object.
        if reserved is None or len(reserved) != expected_size:
            raise ValueError(
                f"Invalid IPC event descriptor: expected a payload of {expected_size} bytes."
            )
        memcpy(data.reserved, <const void*><const char*>reserved, sizeof(data.reserved))
        cdef Event self = Event.__new__(cls)
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
        self._timing_disabled = True
        self._busy_waited = ipc_descriptor._busy_waited
        self._ipc_enabled = True
        self._ipc_descriptor = ipc_descriptor
        # An imported event carries no device/context information, so
        # `device` and `context` return None for it.
        self._device_id = -1  # ??
        self._ctx_handle = None  # ??
        return self

    @property
    def is_ipc_enabled(self) -> bool:
        """Return True if the event can be shared across process boundaries, otherwise False."""
        return self._ipc_enabled

    @property
    def is_timing_disabled(self) -> bool:
        """Return True if the event does not record timing data, otherwise False."""
        return self._timing_disabled

    @property
    def is_sync_busy_waited(self) -> bool:
        """Return True if the event was created with ``busy_waited_sync``, otherwise False.

        Despite the name, True means the event carries the
        ``CU_EVENT_BLOCKING_SYNC`` flag, i.e. :meth:`sync` blocks the calling
        CPU thread; False means :meth:`sync` busy-waits.
        """
        return self._busy_waited

    def sync(self):
        """Synchronize until the event completes.

        If the event was created with busy_waited_sync, then the
        calling CPU thread will block until the event has been
        completed by the device.
        Otherwise the CPU thread will busy-wait until the event
        has been completed.

        """
        with nogil:
            HANDLE_RETURN(cydriver.cuEventSynchronize(self._handle))

    @property
    def is_done(self) -> bool:
        """Return True if all captured works have been completed, otherwise False."""
        with nogil:
            result = cydriver.cuEventQuery(self._handle)
        if result == cydriver.CUresult.CUDA_SUCCESS:
            return True
        if result == cydriver.CUresult.CUDA_ERROR_NOT_READY:
            return False
        # Any other status is passed to the shared error handler.
        HANDLE_RETURN(result)

    @property
    def handle(self) -> cuda.bindings.driver.CUevent:
        """Return the underlying CUevent object.

        .. caution::

            This handle is a Python object. To get the memory address of the underlying C
            handle, call ``int(Event.handle)``.
        """
        return driver.CUevent(<uintptr_t>(self._handle))

    @property
    def device(self) -> Device:
        """Return the :obj:`~_device.Device` singleton associated with this event.

        Returns None when the event has no associated device id (as is the
        case for events created by :meth:`from_ipc_descriptor`).

        Note
        ----
        The current context on the device may differ from this
        event's context. This case occurs when a different CUDA
        context is set current after a event is created.

        """
        if self._device_id < 0:
            return None
        from ._device import Device  # avoid circular import
        return Device(self._device_id)

    @property
    def context(self) -> Context:
        """Return the :obj:`~_context.Context` associated with this event.

        Returns None when the event has no stored context handle or no
        associated device id.
        """
        if self._ctx_handle is None or self._device_id < 0:
            return None
        return Context._from_ctx(self._ctx_handle, self._device_id)

276  

277  

cdef class IPCEventDescriptor:
    """Serializable object describing an event that can be shared between processes."""

    cdef:
        bytes _reserved    # raw IPC handle payload exported by the driver
        bint _busy_waited  # value of the exporting event's ``_busy_waited`` flag

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

    @classmethod
    def _init(cls, reserved: bytes, busy_waited: cython.bint):
        """Build a descriptor from a handle payload (internal constructor).

        Also used as the reconstruction callable returned by ``__reduce__``.
        """
        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
        self._reserved = reserved
        self._busy_waited = busy_waited
        return self

    def __eq__(self, other):
        # Accept any object: a C-typed parameter would admit None and then
        # read a struct field from it, and would raise TypeError for
        # unrelated types instead of deferring to the other operand.
        if not isinstance(other, IPCEventDescriptor):
            return NotImplemented
        cdef IPCEventDescriptor rhs = <IPCEventDescriptor>other
        # No need to check self._busy_waited.
        return self._reserved == rhs._reserved

    def __reduce__(self):
        return self._init, (self._reserved, self._busy_waited)

301  

302  

def _reduce_event(event):
    """Reducer that serializes an ``Event`` through its IPC descriptor.

    Returns the ``(callable, args)`` pair expected by the reduction
    machinery: the event's ``from_ipc_descriptor`` constructor and a
    one-element tuple holding the exported descriptor.
    """
    # Presumably validates the active multiprocessing start method; see the
    # helper in ``_utils.cuda_utils`` for the exact policy.
    check_multiprocessing_start_method()
    rebuild = event.from_ipc_descriptor
    descriptor = event.get_ipc_descriptor()
    return rebuild, (descriptor,)


# Make Event objects transferable through multiprocessing's pickling layer.
multiprocessing.reduction.register(Event, _reduce_event)