Coverage for cuda / core / experimental / _event.pyx: 84%
120 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-10 01:19 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-10 01:19 +0000
1# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2#
3# SPDX-License-Identifier: Apache-2.0
5from __future__ import annotations
7cimport cpython
8from libc.stdint cimport uintptr_t
9from libc.string cimport memcpy
10from cuda.bindings cimport cydriver
11from cuda.core.experimental._utils.cuda_utils cimport (
12 check_or_create_options,
13 HANDLE_RETURN
14)
16import cython
17from dataclasses import dataclass
18import multiprocessing
19from typing import TYPE_CHECKING, Optional
21from cuda.core.experimental._context import Context
22from cuda.core.experimental._utils.cuda_utils import (
23 CUDAError,
24 check_multiprocessing_start_method,
25 driver,
26)
27if TYPE_CHECKING:
28 import cuda.bindings
@dataclass
cdef class EventOptions:
    """Options controlling how an :obj:`~_event.Event` is created.

    Attributes
    ----------
    enable_timing : bool, optional
        Whether the event records timing data. (Default to False)
    busy_waited_sync : bool, optional
        When True, the event uses blocking synchronization: a CPU thread
        that synchronizes on the event blocks until the event has
        actually completed. When False, the CPU thread busy-waits for
        completion instead. (Default to False)
    ipc_enabled : bool, optional
        Whether the event is suitable for interprocess use. Requires
        ``enable_timing`` to be False. (Default to False)

    """

    enable_timing: Optional[bool] = False
    busy_waited_sync: Optional[bool] = False
    ipc_enabled: Optional[bool] = False
cdef class Event:
    """Represent a record at a specific point of execution within a CUDA stream.

    Applications can asynchronously record events at any point in
    the program. An event keeps a record of all previous work within
    the last recorded stream.

    Events can be used to monitor device's progress, query completion
    of work up to event's record, help establish dependencies
    between GPU work submissions, and record the elapsed time (in milliseconds)
    on GPU:

    .. code-block:: python

        # To create events and record the timing:
        s = Device().create_stream()
        e1 = Device().create_event({"enable_timing": True})
        e2 = Device().create_event({"enable_timing": True})
        s.record(e1)
        # ... run some GPU works ...
        s.record(e2)
        e2.sync()
        print(f"time = {e2 - e1} milliseconds")

    Directly creating an :obj:`~_event.Event` is not supported due to ambiguity,
    and they should instead be created through a :obj:`~_stream.Stream` object.

    """

    def __cinit__(self):
        # Start from a null handle so close()/__dealloc__ are safe even when
        # event creation never happened or failed part-way.
        self._handle = <cydriver.CUevent>(NULL)

    def __init__(self, *args, **kwargs):
        raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")

    @classmethod
    def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
        """Create a new event from ``options`` (internal constructor).

        Parameters
        ----------
        device_id : int
            Device ordinal stored on the event and used by :attr:`device`.
        ctx_handle : Context
            Context handle stored on the event and used by :attr:`context`.
        options : optional
            Event options, normalized through ``check_or_create_options``
            into an :obj:`EventOptions`.
        is_free : bool
            If True, requesting an IPC-enabled event is rejected.

        Raises
        ------
        TypeError
            If ``ipc_enabled`` is requested together with ``is_free`` or
            with ``enable_timing``.
        """
        cdef Event self = Event.__new__(cls)
        cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
        cdef unsigned int flags = 0x0
        self._timing_disabled = False
        self._busy_waited = False
        self._ipc_enabled = False
        self._ipc_descriptor = None
        if not opts.enable_timing:
            flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
            self._timing_disabled = True
        if opts.busy_waited_sync:
            flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
            self._busy_waited = True
        if opts.ipc_enabled:
            if is_free:
                raise TypeError(
                    "IPC-enabled events must be bound; use Stream.record for creation."
                )
            flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
            self._ipc_enabled = True
            if not self._timing_disabled:
                raise TypeError("IPC-enabled events cannot use timing.")
        # All option validation is done before the driver call, so a rejected
        # request never creates an event.
        with nogil:
            HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
        self._device_id = device_id
        self._ctx_handle = ctx_handle
        if opts.ipc_enabled:
            # Export eagerly; the descriptor is cached on the event.
            self.get_ipc_descriptor()
        return self

    cpdef close(self):
        """Destroy the event.

        Does nothing if the event has no live handle, so it is safe to call
        more than once.
        """
        if self._handle != NULL:
            with nogil:
                HANDLE_RETURN(cydriver.cuEventDestroy(self._handle))
            self._handle = <cydriver.CUevent>(NULL)

    def __dealloc__(self):
        self.close()

    def __isub__(self, other):
        return NotImplemented

    def __rsub__(self, other):
        return NotImplemented

    def __sub__(self, other: Event):
        """Return the elapsed time, in milliseconds, from ``other`` to ``self``.

        Raises
        ------
        RuntimeError
            If either event has timing disabled, has not been recorded, or
            has not completed yet.
        CUDAError
            For any other driver error.
        """
        # return self - other (in milliseconds)
        cdef float timing
        with nogil:
            err = cydriver.cuEventElapsedTime(&timing, other._handle, self._handle)
        if err == cydriver.CUresult.CUDA_SUCCESS:
            return timing
        else:
            # Translate the most common driver errors into actionable messages.
            if err == cydriver.CUresult.CUDA_ERROR_INVALID_HANDLE:
                if self.is_timing_disabled or other.is_timing_disabled:
                    explanation = (
                        "Both Events must be created with timing enabled in order to subtract them; "
                        "use EventOptions(enable_timing=True) when creating both events."
                    )
                else:
                    explanation = (
                        "Both Events must be recorded before they can be subtracted; "
                        "use Stream.record() to record both events to a stream."
                    )
            elif err == cydriver.CUresult.CUDA_ERROR_NOT_READY:
                explanation = (
                    "One or both events have not completed; "
                    "use Event.sync(), Stream.sync(), or Device.sync() to wait for the events to complete "
                    "before subtracting them."
                )
            else:
                raise CUDAError(err)
            raise RuntimeError(explanation)

    def __hash__(self) -> int:
        return hash((self._ctx_handle, <uintptr_t>(self._handle)))

    def __eq__(self, other) -> bool:
        # Note: using isinstance because `Event` can be subclassed.
        if not isinstance(other, Event):
            return NotImplemented
        cdef Event _other = <Event>other
        return <uintptr_t>(self._handle) == <uintptr_t>(_other._handle)

    def get_ipc_descriptor(self) -> IPCEventDescriptor:
        """Export an event allocated for sharing between processes.

        The descriptor is created on first use and cached on the event.

        Raises
        ------
        RuntimeError
            If the event is not IPC-enabled.
        """
        if self._ipc_descriptor is not None:
            return self._ipc_descriptor
        if not self.is_ipc_enabled:
            raise RuntimeError("Event is not IPC-enabled")
        cdef cydriver.CUipcEventHandle data
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, <cydriver.CUevent>(self._handle)))
        cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
        self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
        return self._ipc_descriptor

    @classmethod
    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
        """Import an event that was exported from another process.

        The returned event has timing disabled, is marked IPC-enabled, and
        carries no device or context information.

        Raises
        ------
        ValueError
            If the descriptor payload is not exactly the size of a
            ``CUipcEventHandle``.
        """
        cdef cydriver.CUipcEventHandle data
        cdef bytes payload = ipc_descriptor._reserved
        # The memcpy below reads a fixed number of bytes; reject payloads of
        # any other length so it can never read past the end of the buffer.
        if payload is None or <size_t>len(payload) != sizeof(data.reserved):
            raise ValueError(
                "Invalid IPC event descriptor: the handle payload does not match "
                "the size of CUipcEventHandle."
            )
        memcpy(data.reserved, <const void*><const char*>(payload), sizeof(data.reserved))
        cdef Event self = Event.__new__(cls)
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
        self._timing_disabled = True
        self._busy_waited = ipc_descriptor._busy_waited
        self._ipc_enabled = True
        self._ipc_descriptor = ipc_descriptor
        self._device_id = -1  # ??
        self._ctx_handle = None  # ??
        return self

    @property
    def is_ipc_enabled(self) -> bool:
        """Return True if the event can be shared across process boundaries, otherwise False."""
        return self._ipc_enabled

    @property
    def is_timing_disabled(self) -> bool:
        """Return True if the event does not record timing data, otherwise False."""
        return self._timing_disabled

    @property
    def is_sync_busy_waited(self) -> bool:
        """Return True if the event was created with ``busy_waited_sync``, otherwise False.

        Despite the name, True means the event was created with the
        ``CU_EVENT_BLOCKING_SYNC`` flag, i.e. a synchronizing CPU thread
        blocks; False means the CPU thread busy-waits.
        """
        return self._busy_waited

    def sync(self):
        """Synchronize until the event completes.

        If the event was created with busy_waited_sync, then the
        calling CPU thread will block until the event has been
        completed by the device.
        Otherwise the CPU thread will busy-wait until the event
        has been completed.

        """
        with nogil:
            HANDLE_RETURN(cydriver.cuEventSynchronize(self._handle))

    @property
    def is_done(self) -> bool:
        """Return True if all captured works have been completed, otherwise False."""
        with nogil:
            result = cydriver.cuEventQuery(self._handle)
        if result == cydriver.CUresult.CUDA_SUCCESS:
            return True
        if result == cydriver.CUresult.CUDA_ERROR_NOT_READY:
            return False
        # Any other status is a genuine error; let HANDLE_RETURN raise it.
        HANDLE_RETURN(result)

    @property
    def handle(self) -> cuda.bindings.driver.CUevent:
        """Return the underlying CUevent object.

        .. caution::

            This handle is a Python object. To get the memory address of the underlying C
            handle, call ``int(Event.handle)``.
        """
        return driver.CUevent(<uintptr_t>(self._handle))

    @property
    def device(self) -> Device:
        """Return the :obj:`~_device.Device` singleton associated with this event.

        Returns None when the event has no device id, which is the case for
        events created by :meth:`from_ipc_descriptor`.

        Note
        ----
        The current context on the device may differ from this
        event's context. This case occurs when a different CUDA
        context is set current after a event is created.

        """
        if self._device_id >= 0:
            from ._device import Device  # avoid circular import
            return Device(self._device_id)
        return None

    @property
    def context(self) -> Context:
        """Return the :obj:`~_context.Context` associated with this event.

        Returns None when the event has no context handle or device id, which
        is the case for events created by :meth:`from_ipc_descriptor`.
        """
        if self._ctx_handle is not None and self._device_id >= 0:
            return Context._from_ctx(self._ctx_handle, self._device_id)
        return None
cdef class IPCEventDescriptor:
    """Serializable object describing an event that can be shared between processes."""

    cdef:
        bytes _reserved      # raw bytes of the exported IPC event handle
        bint _busy_waited    # carried over to the event rebuilt from this descriptor

    def __init__(self, *arg, **kwargs):
        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

    @classmethod
    def _init(cls, reserved: bytes, busy_waited: cython.bint):
        """Build a descriptor from its raw parts (internal constructor, also used for unpickling)."""
        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
        self._reserved = reserved
        self._busy_waited = busy_waited
        return self

    def __eq__(self, rhs):
        # Take an untyped argument and check it here: a typed parameter would
        # raise TypeError for foreign objects and would let None through to a
        # C attribute access. Returning NotImplemented follows the normal
        # comparison protocol.
        if not isinstance(rhs, IPCEventDescriptor):
            return NotImplemented
        # No need to check self._busy_waited.
        return self._reserved == (<IPCEventDescriptor>rhs)._reserved

    def __reduce__(self):
        return self._init, (self._reserved, self._busy_waited)
def _reduce_event(event):
    """Reduce ``event`` for multiprocessing as an IPC descriptor.

    Checks the multiprocessing start method, then returns the callable and
    argument tuple that rebuild the event from its IPC descriptor.
    """
    check_multiprocessing_start_method()
    descriptor = event.get_ipc_descriptor()
    return event.from_ipc_descriptor, (descriptor,)


# Register the reducer so Event objects can be sent through multiprocessing.
multiprocessing.reduction.register(Event, _reduce_event)