Realtime Pipeline API

The realtime pipeline API provides the reusable host-side runtime for low-latency QEC pipelines that combine GPU inference with optional CPU post-processing. The published reference is generated from cudaq/qec/realtime/pipeline.h.

Note

This API is experimental and subject to change.

Configuration

struct core_pinning

CPU core affinity settings for pipeline threads.

Public Members

int dispatcher = -1

Core for the host dispatcher thread. -1 disables pinning.

int consumer = -1

Core for the consumer (completion) thread. -1 disables pinning.

int worker_base = -1

Base core for worker threads. Workers pin to base, base+1, etc. -1 disables pinning.

struct pipeline_stage_config

Configuration for a single pipeline stage.

Public Members

int num_workers = 8

Number of GPU worker threads (max 64).

int num_slots = 32

Number of ring buffer slots.

size_t slot_size = 16384

Size of each ring buffer slot in bytes.

core_pinning cores

CPU core affinity settings.

void *external_ringbuffer = nullptr

When non-null, the pipeline uses this caller-owned ring buffer (cudaq_ringbuffer_t*) instead of allocating its own. The caller is responsible for lifetime. ring_buffer_injector is unavailable in this mode (the FPGA/emulator owns the producer side).

GPU Stage

struct gpu_worker_resources

Per-worker GPU resources returned by the gpu_stage_factory.

Each worker owns a captured CUDA graph, a dedicated stream, and optional pre/post launch callbacks for DMA staging or result extraction.

Public Members

cudaGraphExec_t graph_exec = nullptr

Instantiated CUDA graph for this worker.

cudaStream_t stream = nullptr

Dedicated CUDA stream for graph launches.

void (*pre_launch_fn)(void *user_data, void *slot_dev, cudaStream_t stream) = nullptr

Optional callback invoked before graph launch (e.g. DMA copy).

void *pre_launch_data = nullptr

Opaque user data passed to pre_launch_fn.

void (*post_launch_fn)(void *user_data, void *slot_dev, cudaStream_t stream) = nullptr

Optional callback invoked after graph launch.

void *post_launch_data = nullptr

Opaque user data passed to post_launch_fn.

uint32_t function_id = 0

RPC function ID that this worker handles.

void *user_context = nullptr

Opaque user context passed to cpu_stage_callback.

using cudaq::qec::realtime::experimental::gpu_stage_factory = std::function<gpu_worker_resources(int worker_id)>

Factory called once per worker during start().

Param worker_id:

Zero-based worker index assigned by the pipeline.

Return:

GPU resources for the given worker. Any handles, callbacks, and user data returned here must remain valid until the pipeline stops.

CPU Stage

struct cpu_stage_context

Context passed to the CPU stage callback for each completed GPU workload.

The callback reads gpu_output, performs post-processing (e.g. MWPM decoding), and writes the result into response_buffer.

Public Members

int worker_id

Index of the worker thread invoking this callback.

int origin_slot

Ring buffer slot that originated this request.

const void *gpu_output

Pointer to GPU inference output (nullptr in poll mode).

size_t gpu_output_size

Size of GPU output in bytes.

void *response_buffer

Destination buffer for the RPC response.

size_t max_response_size

Maximum number of bytes that can be written to response_buffer.

void *user_context

Opaque user context from gpu_worker_resources::user_context.

using cudaq::qec::realtime::experimental::cpu_stage_callback = std::function<size_t(const cpu_stage_context &ctx)>

CPU stage callback type.

Param ctx:

Poll-mode view of the current worker state and response buffer.

Return:

Number of bytes written into ctx.response_buffer. Return 0 if no GPU result is ready yet (poll again). Return DEFERRED_COMPLETION to release the worker immediately while deferring slot completion to a later complete_deferred() call.

static constexpr size_t cudaq::qec::realtime::experimental::DEFERRED_COMPLETION = SIZE_MAX

Sentinel return value from cpu_stage_callback: release the worker (idle_mask) but do NOT signal slot completion (tx_flags). The caller is responsible for calling realtime_pipeline::complete_deferred(slot) once the deferred work (e.g. a separate decode thread) finishes.

Completion

struct completion

Metadata for a completed (or errored) pipeline request.

Public Members

uint64_t request_id

Original request ID from the RPC header.

int slot

Ring buffer slot that held this request.

bool success

True if the request completed without CUDA errors.

int cuda_error

CUDA error code (0 on success).

using cudaq::qec::realtime::experimental::completion_callback = std::function<void(const completion &c)>

Callback invoked by the consumer thread for each completed request.

Param c:

Metadata for the completed or errored request.

Ring Buffer Injector

class ring_buffer_injector

Writes RPC-framed requests into the pipeline’s ring buffer, simulating FPGA DMA deposits.

Created via realtime_pipeline::create_injector(). The parent realtime_pipeline must outlive the injector. Not available when the pipeline is configured with an external ring buffer.

Public Functions

~ring_buffer_injector()

Destroy the injector state.

ring_buffer_injector(ring_buffer_injector&&) noexcept

Move-construct an injector.

ring_buffer_injector &operator=(ring_buffer_injector&&) noexcept

Move-assign an injector.

bool try_submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id)

Try to submit a request without blocking.

Parameters:
  • function_id – RPC function identifier.

  • payload – Pointer to the payload data.

  • payload_size – Size of the payload in bytes.

  • request_id – Caller-assigned request identifier.

Returns:

True if accepted, false if all slots are busy (backpressure).

void submit(uint32_t function_id, const void *payload, size_t payload_size, uint64_t request_id)

Submit a request, spinning until a slot becomes available.

Parameters:
  • function_id – RPC function identifier.

  • payload – Pointer to the payload data.

  • payload_size – Size of the payload in bytes.

  • request_id – Caller-assigned request identifier.

uint64_t backpressure_stalls() const

Return the cumulative number of backpressure stalls.

Returns:

Number of times submit() had to spin-wait for a free slot.

Pipeline

class realtime_pipeline

Orchestrates GPU inference and CPU post-processing for low-latency realtime QEC decoding.

The pipeline manages a ring buffer, a host dispatcher thread, per-worker GPU streams with captured CUDA graphs, optional CPU worker threads for post-processing (e.g. PyMatching), and a consumer thread for completion signaling. It supports both an internal ring buffer (for software testing via ring_buffer_injector) and an external ring buffer (for FPGA RDMA).

Public Functions

explicit realtime_pipeline(const pipeline_stage_config &config)

Construct a pipeline and allocate ring buffer resources.

Note

Construction allocates the backing ring buffer or binds the caller-provided external ring so ringbuffer_bases can be queried before start.

Parameters:

config – Stage configuration (slots, slot size, workers, etc.).

~realtime_pipeline()

Stop the pipeline if needed and release owned resources.

void set_gpu_stage(gpu_stage_factory factory)

Register the GPU stage factory. Must be called before start().

Parameters:

factory – Callback that returns gpu_worker_resources per worker.

void set_cpu_stage(cpu_stage_callback callback)

Register the CPU worker callback. Must be called before start().

Parameters:

callback – Function invoked by each worker thread to poll for and process completed GPU workloads. If not set, the pipeline operates in GPU-only mode with completion signaled via cudaLaunchHostFunc.

void set_completion_handler(completion_callback handler)

Register the completion callback. Must be called before start().

Parameters:

handler – Function invoked by the consumer thread for each completed or errored request.

void start()

Allocate resources, build dispatcher config, and spawn all threads.

Throws:
  • std::logic_error – If the GPU stage factory was not registered.

  • std::logic_error – If GPU-only mode is requested with an external ring buffer.

void stop()

Signal shutdown, join all threads, free resources.

Note

Safe to call multiple times. Subsequent calls are no-ops once the pipeline has fully stopped.

ring_buffer_injector create_injector()

Create a software injector for testing without FPGA hardware.

Throws:

std::logic_error – if the pipeline uses an external ring buffer.

Returns:

A ring_buffer_injector bound to this pipeline’s ring buffer.

Stats stats() const

Thread-safe, lock-free stats snapshot.

Returns:

Current pipeline statistics.

void complete_deferred(int slot)

Signal that deferred processing for a slot is complete.

Call from any thread after the cpu_stage callback returned DEFERRED_COMPLETION and the deferred work has finished writing the response into the slot’s ring buffer area.

Parameters:

slot – Ring buffer slot index to complete.

ring_buffer_bases ringbuffer_bases() const

Return the host and device base addresses of the RX data ring.

Note

In external-ring mode these pointers are the caller-provided ring addresses. In internal mode they refer to the owned mapped ring buffer.

Returns:

Struct containing both base pointers.

struct ring_buffer_bases

Host and device base addresses of the RX data ring.

Public Members

uint8_t *rx_data_host

Host-mapped base pointer for the RX data ring.

uint8_t *rx_data_dev

Device-mapped base pointer for the RX data ring.

struct Stats

Pipeline throughput and backpressure statistics.

Public Members

uint64_t submitted

Total requests submitted to the ring buffer.

uint64_t completed

Total requests that completed (success or error).

uint64_t dispatched

Total packets dispatched by the host dispatcher.

uint64_t backpressure_stalls

Cumulative producer backpressure stalls.

struct Stats

Pipeline throughput and backpressure statistics.

Public Members

uint64_t submitted

Total requests submitted to the ring buffer.

uint64_t completed

Total requests that completed (success or error).

uint64_t dispatched

Total packets dispatched by the host dispatcher.

uint64_t backpressure_stalls

Cumulative producer backpressure stalls.

struct ring_buffer_bases

Host and device base addresses of the RX data ring.

Public Members

uint8_t *rx_data_host

Host-mapped base pointer for the RX data ring.

uint8_t *rx_data_dev

Device-mapped base pointer for the RX data ring.