Coverage for cuda / core / _event.pyx: 81.20%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 01:07 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4  

5from __future__ import annotations 

6  

7cimport cpython 

8from libc.string cimport memcpy 

9from cuda.bindings cimport cydriver 

10from cuda.core._context cimport Context 

11from cuda.core._resource_handles cimport ( 

12 ContextHandle, 

13 EventHandle, 

14 create_event_handle, 

15 create_event_handle_ipc, 

16 as_intptr, 

17 as_cu, 

18 as_py, 

19) 

20  

21from cuda.core._utils.cuda_utils cimport ( 

22 check_or_create_options, 

23 HANDLE_RETURN 

24) 

25  

26import cython 

27from dataclasses import dataclass 

28import multiprocessing 

29  

30from cuda.core._utils.cuda_utils import ( 

31 CUDAError, 

32 check_multiprocessing_start_method, 

33) 

34  

35  

@dataclass
cdef class EventOptions:
    """Customizable :obj:`~_event.Event` options.

    Attributes
    ----------
    enable_timing : bool, optional
        Whether the event records timing data. When False the event is
        created with timing disabled. (Default: False)
    busy_waited_sync : bool, optional
        If True, the event uses blocking synchronization: a CPU thread
        that calls synchronize blocks until the event has actually
        completed. If False, the CPU thread busy-waits until the event
        has completed. (Default: False)
    ipc_enabled : bool, optional
        Whether the event is suitable for interprocess use.
        ``enable_timing`` must be False when this is True.
        (Default: False)

    """

    # NOTE: field order defines the positional order of the generated
    # dataclass constructor; do not reorder.
    enable_timing: bool | None = False
    busy_waited_sync: bool | None = False
    ipc_enabled: bool | None = False

59  

60  

cdef class Event:
    """Represent a record at a specific point of execution within a CUDA stream.

    Applications can asynchronously record events at any point in
    the program. An event keeps a record of all previous work within
    the last recorded stream.

    Events can be used to monitor a device's progress, query completion
    of work up to the event's record, help establish dependencies
    between GPU work submissions, and record the elapsed time (in milliseconds)
    on GPU:

    .. code-block:: python

        # To create events and record the timing:
        s = Device().create_stream()
        e1 = Device().create_event({"enable_timing": True})
        e2 = Device().create_event({"enable_timing": True})
        s.record(e1)
        # ... run some GPU work ...
        s.record(e2)
        e2.sync()
        print(f"time = {e2 - e1} milliseconds")

    Directly creating an :obj:`~_event.Event` is not supported due to ambiguity,
    and they should instead be created through a :obj:`~_stream.Stream` object.

    """

    def __init__(self, *args, **kwargs):
        # Construction always goes through Event._init / from_ipc_descriptor.
        raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")

    @staticmethod
    cdef Event _init(type cls, int device_id, ContextHandle h_context, options, bint is_free):
        """Create an event in the given context.

        Parameters
        ----------
        cls : type
            Concrete class to instantiate.
        device_id : int
            Device ordinal stored on the event and used by :attr:`device`.
        h_context : ContextHandle
            Context the event is created in; stored on the event.
        options : EventOptions or compatible
            Normalized through ``check_or_create_options``.
        is_free : bint
            When true, IPC-enabled events are rejected (per the error message
            below, such events must be created via ``Stream.record``).

        Raises
        ------
        TypeError
            If ``ipc_enabled`` is requested together with ``is_free`` or with
            timing enabled.
        RuntimeError
            If the event handle could not be created.
        """
        cdef Event self = cls.__new__(cls)
        cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
        cdef unsigned int flags = 0x0
        self._timing_disabled = False
        self._busy_waited = False
        self._ipc_enabled = False
        self._ipc_descriptor = None
        if not opts.enable_timing:
            flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
            self._timing_disabled = True
        if opts.busy_waited_sync:
            # Despite the option name, this selects *blocking* synchronization.
            flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
            self._busy_waited = True
        if opts.ipc_enabled:
            if is_free:
                raise TypeError(
                    "IPC-enabled events must be bound; use Stream.record for creation."
                )
            flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
            self._ipc_enabled = True
            if not self._timing_disabled:
                raise TypeError("IPC-enabled events cannot use timing.")
        # C++ creates the event and returns owning handle with context dependency
        cdef EventHandle h_event = create_event_handle(h_context, flags)
        if not h_event:
            raise RuntimeError("Failed to create CUDA event")
        self._h_event = h_event
        self._h_context = h_context
        self._device_id = device_id
        if opts.ipc_enabled:
            # Export eagerly so the descriptor is cached on the event.
            self.get_ipc_descriptor()
        return self

    cpdef close(self):
        """Destroy the event.

        Releases the event handle. The underlying CUDA event is destroyed
        when the last reference is released.
        """
        self._h_event.reset()

    def __isub__(self, other):
        return NotImplemented

    def __rsub__(self, other):
        return NotImplemented

    def __sub__(self, other: Event):
        # return self - other (in milliseconds)
        cdef float timing
        with nogil:
            # ``other`` is passed as the start event and ``self`` as the end event.
            err = cydriver.cuEventElapsedTime(&timing, as_cu((<Event>other)._h_event), as_cu(self._h_event))
        if err == 0:
            return timing
        else:
            # Translate the common driver errors into actionable messages.
            if err == cydriver.CUresult.CUDA_ERROR_INVALID_HANDLE:
                if self.is_timing_disabled or other.is_timing_disabled:
                    explanation = (
                        "Both Events must be created with timing enabled in order to subtract them; "
                        "use EventOptions(enable_timing=True) when creating both events."
                    )
                else:
                    explanation = (
                        "Both Events must be recorded before they can be subtracted; "
                        "use Stream.record() to record both events to a stream."
                    )
            elif err == cydriver.CUresult.CUDA_ERROR_NOT_READY:
                explanation = (
                    "One or both events have not completed; "
                    "use Event.sync(), Stream.sync(), or Device.sync() to wait for the events to complete "
                    "before subtracting them."
                )
            else:
                raise CUDAError(err)
            raise RuntimeError(explanation)

    def __hash__(self) -> int:
        # Hash and equality are both based on the underlying handle address.
        return hash(as_intptr(self._h_event))

    def __eq__(self, other) -> bool:
        # Note: using isinstance because `Event` can be subclassed.
        if not isinstance(other, Event):
            return NotImplemented
        cdef Event _other = <Event>other
        return as_intptr(self._h_event) == as_intptr(_other._h_event)

    def __repr__(self) -> str:
        return f"<Event handle={as_intptr(self._h_event):#x}>"

    def get_ipc_descriptor(self) -> IPCEventDescriptor:
        """Export an event allocated for sharing between processes.

        The descriptor is created on first use and cached on the event;
        later calls return the cached object.

        Raises
        ------
        RuntimeError
            If the event is not IPC-enabled.
        """
        if self._ipc_descriptor is not None:
            return self._ipc_descriptor
        if not self.is_ipc_enabled:
            raise RuntimeError("Event is not IPC-enabled")
        cdef cydriver.CUipcEventHandle data
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, as_cu(self._h_event)))
        # Copy the opaque driver payload into an immutable bytes object.
        cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
        self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
        return self._ipc_descriptor

    @classmethod
    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
        """Import an event that was exported from another process.

        The imported event has timing disabled, carries no context handle
        and has no associated device id, so :attr:`device` and
        :attr:`context` return None for it.

        Raises
        ------
        ValueError
            If the descriptor payload does not have the size of a
            ``CUipcEventHandle``.
        RuntimeError
            If the IPC event handle could not be opened.
        """
        cdef cydriver.CUipcEventHandle data
        cdef bytes reserved = ipc_descriptor._reserved
        cdef Py_ssize_t expected = <Py_ssize_t>sizeof(data.reserved)
        # The payload may originate from unpickled data; never let memcpy
        # read past the end of a short bytes object.
        if len(reserved) != expected:
            raise ValueError(
                f"Invalid IPC event descriptor: expected {expected} bytes, got {len(reserved)}"
            )
        memcpy(data.reserved, <const void*><const char*>reserved, sizeof(data.reserved))
        cdef Event self = Event.__new__(cls)
        # IPC events: the originating process owns the event and its context
        cdef EventHandle h_event = create_event_handle_ipc(data)
        if not h_event:
            raise RuntimeError("Failed to open IPC event handle")
        self._h_event = h_event
        self._h_context = ContextHandle()
        self._timing_disabled = True
        self._busy_waited = ipc_descriptor._busy_waited
        self._ipc_enabled = True
        self._ipc_descriptor = ipc_descriptor
        self._device_id = -1
        return self

    @property
    def is_ipc_enabled(self) -> bool:
        """Return True if the event can be shared across process boundaries, otherwise False."""
        return self._ipc_enabled

    @property
    def is_timing_disabled(self) -> bool:
        """Return True if the event does not record timing data, otherwise False."""
        return self._timing_disabled

    @property
    def is_sync_busy_waited(self) -> bool:
        """Return True if the event was created with ``busy_waited_sync``, otherwise False.

        When True the event was created with ``CU_EVENT_BLOCKING_SYNC``, i.e.
        synchronizing on it blocks the calling CPU thread; when False the
        CPU thread busy-waits instead.
        """
        return self._busy_waited

    def sync(self):
        """Synchronize until the event completes.

        If the event was created with busy_waited_sync, then the
        calling CPU thread will block until the event has been
        completed by the device.
        Otherwise the CPU thread will busy-wait until the event
        has been completed.

        """
        with nogil:
            HANDLE_RETURN(cydriver.cuEventSynchronize(as_cu(self._h_event)))

    @property
    def is_done(self) -> bool:
        """Return True if all captured works have been completed, otherwise False."""
        with nogil:
            result = cydriver.cuEventQuery(as_cu(self._h_event))
        if result == cydriver.CUresult.CUDA_SUCCESS:
            return True
        if result == cydriver.CUresult.CUDA_ERROR_NOT_READY:
            return False
        # Any other status is handed to the shared error handler.
        HANDLE_RETURN(result)

    @property
    def handle(self) -> cuda.bindings.driver.CUevent:
        """Return the underlying CUevent object.

        .. caution::

            This handle is a Python object. To get the memory address of the underlying C
            handle, call ``int(Event.handle)``.
        """
        return as_py(self._h_event)

    @property
    def device(self) -> Device:
        """Return the :obj:`~_device.Device` singleton associated with this event.

        Returns None when the event has no associated device id, which is
        the case for events imported through :meth:`from_ipc_descriptor`.

        Note
        ----
        The current context on the device may differ from this
        event's context. This case occurs when a different CUDA
        context is set current after an event is created.

        """
        if self._device_id >= 0:
            from ._device import Device  # avoid circular import
            return Device(self._device_id)
        return None

    @property
    def context(self) -> Context:
        """Return the :obj:`~_context.Context` associated with this event.

        Returns None when the event carries no context handle or no device
        id, which is the case for events imported through
        :meth:`from_ipc_descriptor`.
        """
        if self._h_context and self._device_id >= 0:
            return Context._from_handle(Context, self._h_context, self._device_id)
        return None

286  

287  

cdef class IPCEventDescriptor:
    """Serializable object describing an event that can be shared between processes.

    Instances are produced by ``Event.get_ipc_descriptor`` and consumed by
    ``Event.from_ipc_descriptor``; pickling round-trips through
    :meth:`_init`.
    """

    cdef:
        # Opaque payload copied from the driver's IPC event handle.
        bytes _reserved
        # Mirrors the exporting event's busy_waited_sync setting.
        bint _busy_waited

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

    @staticmethod
    def _init(reserved: bytes, busy_waited: cython.bint):
        """Build a descriptor from its raw payload and sync-mode flag.

        Also serves as the reconstruction callable returned by
        :meth:`__reduce__`.
        """
        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(IPCEventDescriptor)
        self._reserved = reserved
        self._busy_waited = busy_waited
        return self

    def __eq__(self, other):
        # Accept any operand and defer to Python for foreign types (including
        # None) instead of relying on a typed argument, which would raise
        # TypeError for other types and would let None through to a C-level
        # attribute read.
        if not isinstance(other, IPCEventDescriptor):
            return NotImplemented
        cdef IPCEventDescriptor rhs = <IPCEventDescriptor>other
        # No need to check self._busy_waited.
        return self._reserved == rhs._reserved

    def __reduce__(self):
        return IPCEventDescriptor._init, (self._reserved, self._busy_waited)

311  

312  

def _reduce_event(event):
    """Reduce ``event`` to a ``(callable, args)`` pair for pickling.

    ``check_multiprocessing_start_method`` is called first. The event is then
    represented by its IPC descriptor, and ``from_ipc_descriptor`` is the
    callable that rebuilds it on the receiving side.
    """
    check_multiprocessing_start_method()
    ipc_descriptor = event.get_ipc_descriptor()
    rebuild = event.from_ipc_descriptor
    return rebuild, (ipc_descriptor,)

316  

317multiprocessing.reduction.register(Event, _reduce_event)