Realtime Pipeline API
The realtime pipeline API provides the reusable host-side runtime for
low-latency QEC pipelines that combine GPU inference with optional CPU
post-processing. The published reference is generated from
cudaq/qec/realtime/pipeline.h.
Note
This API is experimental and subject to change.
Configuration
-
struct core_pinning
CPU core affinity settings for pipeline threads.
Public Members
-
int dispatcher = -1
Core for the host dispatcher thread. -1 disables pinning.
-
int consumer = -1
Core for the consumer (completion) thread. -1 disables pinning.
-
int worker_base = -1
Base core for worker threads. Workers pin to base, base+1, etc. -1 disables pinning.
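The worker pinning rule above (base, base+1, and so on, with -1 disabling pinning) can be sketched as a pure function. The `core_pinning_sketch` struct is a hypothetical stand-in that mirrors the documented fields; it does not include the real header and does not call any thread-affinity API.

```cpp
#include <vector>

// Hypothetical stand-in mirroring the documented core_pinning fields.
struct core_pinning_sketch {
  int dispatcher = -1;
  int consumer = -1;
  int worker_base = -1;
};

// Returns the core each worker thread would pin to under the documented rule
// (worker_base, worker_base + 1, ...), or -1 for every worker when disabled.
std::vector<int> worker_cores(const core_pinning_sketch &cores, int num_workers) {
  std::vector<int> result(num_workers, -1);
  if (cores.worker_base < 0)
    return result; // -1 disables pinning
  for (int i = 0; i < num_workers; ++i)
    result[i] = cores.worker_base + i;
  return result;
}
```

For example, with `worker_base = 4` and three workers, the workers land on cores 4, 5, and 6, so a dispatcher and consumer pinned below 4 do not share a core with any worker.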
-
struct pipeline_stage_config
Configuration for a single pipeline stage.
Public Members
-
int num_workers = 8
Number of GPU worker threads (max 64).
-
int num_slots = 32
Number of ring buffer slots.
-
size_t slot_size = 16384
Size of each ring buffer slot in bytes.
-
core_pinning cores
CPU core affinity settings.
-
void *external_ringbuffer = nullptr
When non-null, the pipeline uses this caller-owned ring buffer (cudaq_ringbuffer_t*) instead of allocating its own. The caller is responsible for its lifetime. ring_buffer_injector is unavailable in this mode (the FPGA/emulator owns the producer side).
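A minimal sketch of the sizing arithmetic implied by these fields, using a hypothetical `stage_config_sketch` that mirrors the documented defaults. It assumes the RX data area is exactly `num_slots * slot_size` bytes and ignores any flag or header storage the real implementation may add; the positivity checks are this sketch's own assumptions, while the 64-worker limit comes from the field description.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in mirroring the documented pipeline_stage_config defaults.
struct stage_config_sketch {
  int num_workers = 8;
  int num_slots = 32;
  std::size_t slot_size = 16384;
  void *external_ringbuffer = nullptr;
};

// Data bytes implied by num_slots * slot_size (32 * 16384 = 524288 by default).
std::size_t ring_bytes(const stage_config_sketch &cfg) {
  return static_cast<std::size_t>(cfg.num_slots) * cfg.slot_size;
}

// Enforces the documented worker limit (max 64) plus basic positivity checks.
void check_config(const stage_config_sketch &cfg) {
  if (cfg.num_workers < 1 || cfg.num_workers > 64)
    throw std::invalid_argument("num_workers must be in [1, 64]");
  if (cfg.num_slots < 1 || cfg.slot_size == 0)
    throw std::invalid_argument("ring buffer needs at least one non-empty slot");
}
```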
GPU Stage
-
struct gpu_worker_resources
Per-worker GPU resources returned by the gpu_stage_factory.
Each worker owns a captured CUDA graph, a dedicated stream, and optional pre/post launch callbacks for DMA staging or result extraction.
Public Members
-
cudaGraphExec_t graph_exec = nullptr
Instantiated CUDA graph for this worker.
-
cudaStream_t stream = nullptr
Dedicated CUDA stream for graph launches.
-
void (*pre_launch_fn)(void *user_data, void *slot_dev, cudaStream_t stream) = nullptr
Optional callback invoked before graph launch (e.g. DMA copy).
-
void *pre_launch_data = nullptr
Opaque user data passed to pre_launch_fn.
-
void (*post_launch_fn)(void *user_data, void *slot_dev, cudaStream_t stream) = nullptr
Optional callback invoked after graph launch.
-
void *post_launch_data = nullptr
Opaque user data passed to post_launch_fn.
-
uint32_t function_id = 0
RPC function ID that this worker handles.
-
void *user_context = nullptr
Opaque user context passed to cpu_stage_callback.
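The pre_launch_fn/pre_launch_data pair follows the usual C callback pattern: the pipeline hands back the opaque pointer, and the callback casts it to its own state. The sketch below is hypothetical: `staging_state` is an illustrative struct, `stream_sketch_t` stands in for `cudaStream_t` so it compiles without the CUDA toolkit, and `std::memcpy` on host memory replaces the asynchronous device copy (for example `cudaMemcpyAsync` on `stream`) that a real DMA staging callback would enqueue.

```cpp
#include <cstddef>
#include <cstring>

// Stand-in for cudaStream_t so the sketch builds without CUDA.
using stream_sketch_t = void *;

// Hypothetical per-worker staging state passed through pre_launch_data.
struct staging_state {
  void *graph_input = nullptr; // buffer the captured graph reads from
  std::size_t bytes = 0;       // bytes to stage per launch
  int launches = 0;            // bookkeeping for this sketch only
};

// Same shape as the documented callback: (user_data, slot_dev, stream).
// A real implementation would enqueue an async copy on `stream` instead.
void stage_slot(void *user_data, void *slot_dev, stream_sketch_t /*stream*/) {
  auto *state = static_cast<staging_state *>(user_data);
  std::memcpy(state->graph_input, slot_dev, state->bytes);
  ++state->launches;
}
```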
-
using cudaq::qec::realtime::experimental::gpu_stage_factory = std::function<gpu_worker_resources(int worker_id)>
Factory called once per worker during start().
- Param worker_id:
Zero-based worker index assigned by the pipeline.
- Return:
GPU resources for the given worker. Any handles, callbacks, and user data returned here must remain valid until the pipeline stops.
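A sketch of a factory that gives each worker its own long-lived context through user_context. `worker_resources_sketch` mirrors only a subset of gpu_worker_resources, with `void *` in place of the CUDA graph and stream handles, and `decoder_state` is a hypothetical per-worker type. Because the pipeline keeps the returned pointers until it stops, the backing vector is preallocated and must not be resized while the pipeline runs.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Stand-in mirroring part of gpu_worker_resources (CUDA handles as void*).
struct worker_resources_sketch {
  void *graph_exec = nullptr;
  void *stream = nullptr;
  std::uint32_t function_id = 0;
  void *user_context = nullptr;
};

using factory_sketch = std::function<worker_resources_sketch(int worker_id)>;

// Hypothetical per-worker state; it must outlive the pipeline.
struct decoder_state { int worker_id = -1; };

// `states` is preallocated to num_workers; user_context points into it,
// so it must not be resized while the pipeline is running.
factory_sketch make_factory(std::vector<decoder_state> &states, std::uint32_t fid) {
  return [&states, fid](int worker_id) {
    worker_resources_sketch r;
    r.function_id = fid;
    states[worker_id].worker_id = worker_id;
    r.user_context = &states[worker_id];
    return r;
  };
}
```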
CPU Stage
-
struct cpu_stage_context
Context passed to the CPU stage callback for each completed GPU workload.
The callback reads gpu_output, performs post-processing (e.g. MWPM decoding), and writes the result into response_buffer.
Public Members
-
int worker_id
Index of the worker thread invoking this callback.
-
int origin_slot
Ring buffer slot that originated this request.
-
const void *gpu_output
Pointer to GPU inference output (nullptr in poll mode).
-
size_t gpu_output_size
Size of GPU output in bytes.
-
void *response_buffer
Destination buffer for the RPC response.
-
size_t max_response_size
Maximum number of bytes that can be written to response_buffer.
-
void *user_context
Opaque user context from gpu_worker_resources::user_context.
-
using cudaq::qec::realtime::experimental::cpu_stage_callback = std::function<size_t(const cpu_stage_context &ctx)>
CPU stage callback type.
- Param ctx:
Poll-mode view of the current worker state and response buffer.
- Return:
Number of bytes written into ctx.response_buffer. Return 0 if no GPU result is ready yet (poll again). Return DEFERRED_COMPLETION to release the worker immediately while deferring slot completion to a later complete_deferred() call.
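A sketch of a poll-mode callback that follows the documented return convention: 0 while nothing is ready, otherwise the number of bytes written. `cpu_ctx_sketch` mirrors cpu_stage_context; `worker_output`, reached through user_context, is a hypothetical per-worker struct whose `ready` flag is assumed to be set by the GPU side, and copying the result stands in for real post-processing.

```cpp
#include <atomic>
#include <cstddef>
#include <cstring>
#include <functional>

// Stand-in mirroring the documented cpu_stage_context fields.
struct cpu_ctx_sketch {
  int worker_id = 0;
  int origin_slot = 0;
  const void *gpu_output = nullptr;
  std::size_t gpu_output_size = 0;
  void *response_buffer = nullptr;
  std::size_t max_response_size = 0;
  void *user_context = nullptr;
};

using cpu_callback_sketch = std::function<std::size_t(const cpu_ctx_sketch &)>;

// Hypothetical per-worker state; the GPU side sets `ready` after `result`.
struct worker_output {
  std::atomic<bool> ready{false};
  unsigned char result[64] = {};
  std::size_t result_size = 0;
};

std::size_t poll_stage(const cpu_ctx_sketch &ctx) {
  auto *out = static_cast<worker_output *>(ctx.user_context);
  if (!out->ready.load(std::memory_order_acquire))
    return 0; // nothing ready: the worker polls again
  std::size_t n = out->result_size < ctx.max_response_size
                      ? out->result_size : ctx.max_response_size;
  std::memcpy(ctx.response_buffer, out->result, n);
  out->ready.store(false, std::memory_order_release);
  return n; // bytes written into response_buffer
}
```

Because 0 means "poll again", a stage written this way cannot report an empty response; any real response must be at least one byte long.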
-
static constexpr size_t cudaq::qec::realtime::experimental::DEFERRED_COMPLETION = SIZE_MAX
Sentinel return value from cpu_stage_callback: release the worker (idle_mask) but do NOT signal slot completion (tx_flags). The caller is responsible for calling realtime_pipeline::complete_deferred(slot) once the deferred work (e.g. a separate decode thread) finishes.
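The deferred path splits the work in two: the CPU stage enqueues the slot and returns the sentinel, and a separate thread later finishes the response and signals the slot. In the sketch below, `deferred_queue`, `defer_stage`, and `drain` are hypothetical helpers, and the `complete` function object stands in for realtime_pipeline::complete_deferred so the code runs without the library.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>

// Same value as the documented DEFERRED_COMPLETION sentinel.
constexpr std::size_t DEFERRED_COMPLETION_SKETCH = SIZE_MAX;

// Hypothetical hand-off queue between the CPU stage and a decode thread.
class deferred_queue {
public:
  void push(int slot) {
    std::lock_guard<std::mutex> lk(m_);
    slots_.push_back(slot);
  }
  bool pop(int &slot) {
    std::lock_guard<std::mutex> lk(m_);
    if (slots_.empty()) return false;
    slot = slots_.front();
    slots_.pop_front();
    return true;
  }
private:
  std::mutex m_;
  std::deque<int> slots_;
};

// CPU stage body: hand off the slot and release the worker immediately.
std::size_t defer_stage(deferred_queue &q, int origin_slot) {
  q.push(origin_slot);
  return DEFERRED_COMPLETION_SKETCH;
}

// Decode-thread body: finish each slot, then signal completion.
void drain(deferred_queue &q, const std::function<void(int)> &complete) {
  int slot;
  while (q.pop(slot)) {
    // ... write the response into the slot's ring buffer area here ...
    complete(slot);
  }
}
```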
Completion
-
struct completion
Metadata for a completed (or errored) pipeline request.
Public Members
-
uint64_t request_id
Original request ID from the RPC header.
-
int slot
Ring buffer slot that held this request.
-
bool success
True if the request completed without CUDA errors.
-
int cuda_error
CUDA error code (0 on success).
-
using cudaq::qec::realtime::experimental::completion_callback = std::function<void(const completion &c)>
Callback invoked by the consumer thread for each completed request.
- Param c:
Metadata for the completed or errored request.
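A sketch of a completion handler that only updates counters, which keeps the work done on the consumer thread small (a design choice of this sketch, not a documented requirement). `completion_sketch` mirrors the documented completion struct, and `completion_tally` is a hypothetical type whose atomics can be read from any thread.

```cpp
#include <atomic>
#include <cstdint>
#include <functional>

// Stand-in mirroring the documented completion fields.
struct completion_sketch {
  std::uint64_t request_id = 0;
  int slot = -1;
  bool success = false;
  int cuda_error = 0;
};

// Counters written by the consumer thread, readable from other threads.
struct completion_tally {
  std::atomic<std::uint64_t> ok{0};
  std::atomic<std::uint64_t> failed{0};
  std::atomic<int> last_error{0};
};

std::function<void(const completion_sketch &)> make_handler(completion_tally &t) {
  return [&t](const completion_sketch &c) {
    if (c.success) {
      t.ok.fetch_add(1, std::memory_order_relaxed);
    } else {
      t.failed.fetch_add(1, std::memory_order_relaxed);
      t.last_error.store(c.cuda_error, std::memory_order_relaxed);
    }
  };
}
```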
Ring Buffer Injector
-
class ring_buffer_injector
Writes RPC-framed requests into the pipeline’s ring buffer, simulating FPGA DMA deposits.
Created via realtime_pipeline::create_injector(). The parent realtime_pipeline must outlive the injector. Not available when the pipeline is configured with an external ring buffer.
Public Functions
-
~ring_buffer_injector()
Destroy the injector state.
-
ring_buffer_injector(ring_buffer_injector&&) noexcept
Move-construct an injector.
-
ring_buffer_injector &operator=(ring_buffer_injector&&) noexcept
Move-assign an injector.
-
bool try_submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id)
Try to submit a request without blocking.
- Parameters:
function_id – RPC function identifier.
payload – Pointer to the payload data.
payload_size – Size of the payload in bytes.
request_id – Caller-assigned request identifier.
- Returns:
True if accepted, false if all slots are busy (backpressure).
-
void submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id)
Submit a request, spinning until a slot becomes available.
- Parameters:
function_id – RPC function identifier.
payload – Pointer to the payload data.
payload_size – Size of the payload in bytes.
request_id – Caller-assigned request identifier.
-
uint64_t backpressure_stalls() const
Return the cumulative number of backpressure stalls.
- Returns:
Number of times submit() had to spin-wait for a free slot.
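submit() spins until a slot frees up; a producer that must not block indefinitely can instead bound its retries around try_submit(). The sketch below models only slot occupancy: `injector_sketch` and `submit_with_retries` are hypothetical, function ID 1 is arbitrary, and counting one stall per failed attempt is this sketch's definition, which may differ from how backpressure_stalls() counts.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical fixed-capacity model of the injector's slot accounting.
// It does not frame or copy payloads.
class injector_sketch {
public:
  explicit injector_sketch(int slots) : free_(slots) {}
  bool try_submit(std::uint32_t, const void *, std::size_t, std::uint64_t) {
    if (free_ == 0) return false; // all slots busy: backpressure
    --free_;
    return true;
  }
  void release_one() { ++free_; } // a completed request frees its slot
  int free_slots() const { return free_; }
private:
  int free_;
};

// Retry up to max_tries times, counting each rejected attempt as a stall.
bool submit_with_retries(injector_sketch &inj, std::uint64_t id, int max_tries,
                         std::uint64_t &stalls) {
  for (int i = 0; i < max_tries; ++i) {
    if (inj.try_submit(/*function_id=*/1, nullptr, 0, id)) return true;
    ++stalls;
  }
  return false;
}
```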
Pipeline
-
class realtime_pipeline
Orchestrates GPU inference and CPU post-processing for low-latency realtime QEC decoding.
The pipeline manages a ring buffer, a host dispatcher thread, per-worker GPU streams with captured CUDA graphs, optional CPU worker threads for post-processing (e.g. PyMatching), and a consumer thread for completion signaling. It supports both an internal ring buffer (for software testing via ring_buffer_injector) and an external ring buffer (for FPGA RDMA).
Public Functions
-
explicit realtime_pipeline(const pipeline_stage_config &config)
Construct a pipeline and allocate ring buffer resources.
Note
Construction allocates the backing ring buffer, or binds the caller-provided external ring, so that ringbuffer_bases() can be queried before start().
- Parameters:
config – Stage configuration (slots, slot size, workers, etc.).
-
~realtime_pipeline()
Stop the pipeline if needed and release owned resources.
-
void set_gpu_stage(gpu_stage_factory factory)
Register the GPU stage factory. Must be called before start().
- Parameters:
factory – Callback that returns gpu_worker_resources per worker.
-
void set_cpu_stage(cpu_stage_callback callback)
Register the CPU worker callback. Must be called before start().
- Parameters:
callback – Function invoked by each worker thread to poll for and process completed GPU workloads. If not set, the pipeline operates in GPU-only mode with completion signaled via cudaLaunchHostFunc.
-
void set_completion_handler(completion_callback handler)
Register the completion callback. Must be called before start().
- Parameters:
handler – Function invoked by the consumer thread for each completed or errored request.
-
void start()
Allocate resources, build dispatcher config, and spawn all threads.
- Throws:
std::logic_error – If the GPU stage factory was not registered.
std::logic_error – If GPU-only mode is requested with an external ring buffer.
-
void stop()
Signal shutdown, join all threads, free resources.
Note
Safe to call multiple times. Subsequent calls are no-ops once the pipeline has fully stopped.
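The lifecycle rules above can be modeled as a small state machine: stages are registered before start(), start() throws std::logic_error without a GPU stage factory or when GPU-only mode meets an external ring, and stop() is idempotent. `pipeline_lifecycle_sketch` is hypothetical; it spawns no threads, and throwing when a stage is set after start() is this sketch's choice, since the reference only says "must be called before start()".

```cpp
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical model of the documented lifecycle rules only.
class pipeline_lifecycle_sketch {
public:
  explicit pipeline_lifecycle_sketch(bool external_ring = false)
      : external_(external_ring) {}
  void set_gpu_stage(std::function<void(int)> f) { require_not_started(); gpu_ = std::move(f); }
  void set_cpu_stage(std::function<void()> f) { require_not_started(); cpu_ = std::move(f); }
  void start() {
    if (!gpu_) throw std::logic_error("GPU stage factory not registered");
    if (!cpu_ && external_) // no CPU stage means GPU-only mode
      throw std::logic_error("GPU-only mode requires the internal ring buffer");
    started_ = true;
  }
  void stop() { started_ = false; } // repeated calls are no-ops
  bool running() const { return started_; }
  bool gpu_only() const { return !cpu_; }
private:
  void require_not_started() const {
    if (started_) throw std::logic_error("stages must be set before start()");
  }
  bool external_;
  std::function<void(int)> gpu_;
  std::function<void()> cpu_;
  bool started_ = false;
};
```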
-
ring_buffer_injector create_injector()
Create a software injector for testing without FPGA hardware.
- Throws:
std::logic_error – If the pipeline uses an external ring buffer.
- Returns:
A ring_buffer_injector bound to this pipeline’s ring buffer.
-
Stats stats() const
Thread-safe, lock-free stats snapshot.
- Returns:
Current pipeline statistics.
-
void complete_deferred(int slot)
Signal that deferred processing for a slot is complete.
Call from any thread once the CPU stage callback has returned DEFERRED_COMPLETION for this slot and the deferred work has finished writing the response into the slot’s ring buffer area.
- Parameters:
slot – Ring buffer slot index to complete.
-
ring_buffer_bases ringbuffer_bases() const
Return the host and device base addresses of the RX data ring.
Note
In external-ring mode these pointers are the caller-provided ring addresses. In internal mode they refer to the owned mapped ring buffer.
- Returns:
Struct containing both base pointers.
-
struct ring_buffer_bases
Host and device base addresses of the RX data ring.
Public Members
-
uint8_t *rx_data_host
Host-mapped base pointer for the RX data ring.
-
uint8_t *rx_data_dev
Device-mapped base pointer for the RX data ring.
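Deferred or external code that writes into a slot needs that slot's address. The sketch below assumes, without confirmation from this reference, that slot i starts at base + i * slot_size in both the host and device mappings; verify the real ring layout before relying on it. `ring_bases_sketch` mirrors ring_buffer_bases, and `slot_view`/`slot_address` are hypothetical helpers.

```cpp
#include <cstddef>
#include <cstdint>

// Stand-in mirroring realtime_pipeline::ring_buffer_bases.
struct ring_bases_sketch {
  std::uint8_t *rx_data_host = nullptr;
  std::uint8_t *rx_data_dev = nullptr;
};

struct slot_view {
  std::uint8_t *host;
  std::uint8_t *dev;
};

// ASSUMPTION: contiguous slots, slot i at offset i * slot_size.
slot_view slot_address(const ring_bases_sketch &b, int slot, std::size_t slot_size) {
  const std::size_t offset = static_cast<std::size_t>(slot) * slot_size;
  return {b.rx_data_host + offset, b.rx_data_dev + offset};
}
```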
-
struct Stats
Pipeline throughput and backpressure statistics.
Public Members
-
uint64_t submitted
Total requests submitted to the ring buffer.
-
uint64_t completed
Total requests that completed (success or error).
-
uint64_t dispatched
Total packets dispatched by the host dispatcher.
-
uint64_t backpressure_stalls
Cumulative producer backpressure stalls.
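Two derived metrics can be computed from Stats snapshots: requests in flight and completion rate between snapshots. `stats_sketch` mirrors the documented fields. If the snapshot's fields are loaded independently (an assumption about the lock-free implementation), `completed` could momentarily exceed `submitted`, so the in-flight count is clamped at zero.

```cpp
#include <cstdint>

// Stand-in mirroring realtime_pipeline::Stats.
struct stats_sketch {
  std::uint64_t submitted = 0;
  std::uint64_t completed = 0;
  std::uint64_t dispatched = 0;
  std::uint64_t backpressure_stalls = 0;
};

// Submitted but not yet completed, clamped at zero.
std::uint64_t in_flight(const stats_sketch &s) {
  return s.submitted > s.completed ? s.submitted - s.completed : 0;
}

// Completions per second between two snapshots taken `seconds` apart.
double completion_rate(const stats_sketch &before, const stats_sketch &after,
                       double seconds) {
  return seconds > 0
             ? static_cast<double>(after.completed - before.completed) / seconds
             : 0.0;
}
```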