Decoders
In quantum error correction, decoders are responsible for interpreting measurement outcomes (syndromes) to identify and correct quantum errors. We measure a set of stabilizers that give us information about what errors might have happened. The pattern of these measurements is called a syndrome, and the decoder’s task is to determine what errors most likely caused that syndrome.
The relationship between errors and syndromes is captured mathematically by the parity check matrix. Each row of this matrix represents a stabilizer measurement, while each column represents a possible error. When we multiply an error pattern by this matrix, we get the syndrome that would result from those errors.
Detector Error Model
Here we introduce the cudaq.qec.detector_error_model type, which allows us to create a detector error model (DEM) from a QEC circuit and noise model.
The DEM can be generated from a QEC circuit and noise model using functions like dem_from_memory_circuit(). For circuit-level noise, the DEM can be put into a
canonical form that’s organized by measurement rounds, making it suitable for multi-round decoding.
For a complete example of using the surface code with DEM to generate parity check matrices and perform decoding, see the circuit level noise example.
Generating a Multi-Round Parity Check Matrix
Below, we demonstrate how to use CUDA-Q QEC to construct a multi-round parity check matrix for an error correction code under a circuit-level noise model in Python:
import cudaq
import cudaq_qec as qec
import numpy as np
# Set target simulator (Stim) for fast stabilizer circuit simulation
cudaq.set_target("stim")
distance = 3 # Code distance (number of physical qubits for repetition code)
nRounds = 6 # Number of syndrome measurement rounds
nShots = 10000 # Number of circuit samples to run
# Set verbosity based on shot count
verbose = nShots <= 10
def vprint(*args, **kwargs):
if verbose:
print(*args, **kwargs)
# Retrieve a 3-qubit repetition code instance
three_qubit_repetition_code = qec.get_code("repetition", distance=distance)
# Z logical observable (for repetition codes, only Z matters)
logical_single_round = three_qubit_repetition_code.get_observables_z()
# Use predefined state preparation (|1⟩ for logical '1')
statePrep = qec.operation.prep1
# Create a noise model instance
noise_model = cudaq.NoiseModel()
# Define physical gate error probability
p = 0.01
# Define measurement error probability (not activated by default)
p_per_mz = 0.001
# Inject depolarizing noise on CX gates
noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)
# noise_model.add_all_qubit_channel("mz", cudaq.BitFlipChannel(p_per_mz)) # Optional: measurement noise
# === Decoder Setup ===
# Generate full detector error model (DEM), tracking all observables
dem_rep_full = qec.dem_from_memory_circuit(three_qubit_repetition_code,
statePrep, nRounds, noise_model)
# Generate Z-only detector error model (sufficient for repetition code)
dem_rep_z = qec.z_dem_from_memory_circuit(three_qubit_repetition_code,
statePrep, nRounds, noise_model)
# Extract multi-round parity check matrix (H matrix)
H_pcm_from_dem_full = dem_rep_full.detector_error_matrix
H_pcm_from_dem_z = dem_rep_z.detector_error_matrix
# Sanity check: for repetition codes, full and Z-only matrices should match
assert (H_pcm_from_dem_z == H_pcm_from_dem_full).all()
# Retrieve observable flips matrix: maps physical errors to logical flips
Lz_observables_flips_matrix = dem_rep_z.observables_flips_matrix
# Instantiate a decoder: single-error lookup table (fast and sufficient for small codes)
decoder = qec.get_decoder("single_error_lut", H_pcm_from_dem_z)
# === Simulation ===
# Sample noisy executions of the code circuit
syndromes, data = qec.sample_memory_circuit(three_qubit_repetition_code,
statePrep, nShots, nRounds,
noise_model)
syndromes = syndromes.reshape((nShots, nRounds, -1))
syndromes = syndromes.reshape((nShots, -1))
# Expected logical measurement (we prepared |1⟩)
expected_value = 1
# Counters for statistics
nLogicalErrorsWithoutDecoding = 0
nLogicalErrorsWDecoding = 0
nCorrections = 0
# === Loop over shots ===
for i in range(nShots):
vprint(f"shot: {i}")
data_i = data[i, :] # Final data measurement
vprint(f"data: {data_i}")
results = decoder.decode(syndromes[i, :])
convergence = results.converged
result = results.result
error_prediction = np.array(result, dtype=np.uint8)
vprint(f"error_prediction: {error_prediction}")
predicted_observable_flip = Lz_observables_flips_matrix @ error_prediction % 2
vprint(f"predicted_observable_flip: {predicted_observable_flip}")
measured_observable = logical_single_round @ data_i % 2
vprint(f"measured_observable: {measured_observable}")
if measured_observable != expected_value:
nLogicalErrorsWithoutDecoding += 1
predicted_observable = predicted_observable_flip ^ measured_observable
vprint(f"predicted_observable: {predicted_observable}")
if predicted_observable != expected_value:
nLogicalErrorsWDecoding += 1
nCorrections += int(predicted_observable_flip[0])
# === Summary statistics ===
print(
f"{nLogicalErrorsWithoutDecoding} logical errors without decoding in {nShots} shots\n"
)
print(
f"{nLogicalErrorsWDecoding} logical errors with decoding in {nShots} shots\n"
)
print(f"{nCorrections} corrections applied in {nShots} shots\n")
This example illustrates how to:
Retrieve and configure an error correction code Load a repetition code using
qec.get_code(...)from the CUDA-Q QEC library, and define a custom circuit-level noise model using.add_all_qubit_channel(...).Generate a multi-round parity check matrix Extend a single-round detector error model (DEM) across multiple rounds using
qec.dem_from_memory_circuit(...). This captures syndrome evolution over time, including measurement noise, and provides:detector_error_matrix– the multi-round parity check matrixobservables_flips_matrix– used to identify logical flips due to physical errors
Simulate circuit-level noise and collect data Run multiple shots of the memory experiment using
qec.sample_memory_circuit(...)to sample both the data and syndrome measurements from noisy executions. The resulting bitstrings can be used for decoding and performance evaluation of the error correction scheme.
Creating New QEC codes
Below, we demonstrate how to use CUDA-Q QEC to define a new QEC code entirely in Python. This powerful feature allows for rapid prototyping and testing of custom error correction schemes.
import cudaq
import cudaq_qec as qec
from cudaq_qec import patch
import numpy as np
# Use Stim for fast stabilizer simulation
cudaq.set_target("stim")
# Repetition code parameters
distance = 3 # Number of physical qubits (also sets number of data qubits)
nRounds = 6 # Number of stabilizer measurement rounds
nShots = 10000 # Number of noisy circuit executions to simulate
# Define logical operations as CUDA-Q kernels
@cudaq.kernel
def x_logical(logicalQubit: patch):
# Apply a logical X: bit-flip all data qubits
for i in range(len(logicalQubit.data)):
x(logicalQubit.data[i])
@cudaq.kernel
def prep0(logicalQubit: patch):
# Initialize all qubits in |0⟩
reset(logicalQubit.data)
reset(logicalQubit.ancz)
@cudaq.kernel
def prep1(logicalQubit: patch):
# Prepare logical |1⟩: apply X to logical |0⟩
prep0(logicalQubit)
x_logical(logicalQubit)
@cudaq.kernel
def stabilizer_round(logicalQubit: patch) -> list[bool]:
# Run one round of stabilizer measurements for the Z-type repetition code
num_ancilla = len(logicalQubit.ancz)
num_data = len(logicalQubit.data)
# Apply depolarizing noise to each data qubit
for i in range(num_data):
cudaq.apply_noise(cudaq.DepolarizationChannel, 0.1,
logicalQubit.data[i])
# It is possible to have even more control over the noise.
# cudaq.apply_noise(cudaq.Pauli1, 0.1, 0.1, 0.1, logicalQubit.data[i]) # in order pX, pY, and pZ errors.
# Measure each ZZ stabilizer using CNOTs from data to ancilla
for i in range(num_ancilla):
x.ctrl(logicalQubit.data[i], logicalQubit.ancz[i])
x.ctrl(logicalQubit.data[i + 1], logicalQubit.ancz[i])
# Measure ancilla qubits to extract the syndrome
measurements = mz(logicalQubit.ancz)
# Reset ancillas for the next round
reset(logicalQubit.ancz)
return measurements
# Define the custom repetition code using the @qec.code decorator
@qec.code('custom_repetition_code')
class MyRepetitionCode:
def __init__(self, **kwargs):
qec.Code.__init__(self) # Without this it won't work
self.distance = kwargs.get("distance", 3)
# Create ZZ stabilizer generators
stabilizers_str = self.__make_stabilizers_strings()
self.stabilizers = [
cudaq.SpinOperator.from_word(s) for s in stabilizers_str
]
# Define logical Z observable
obs_str = self.__make_pauli_observables()
self.pauli_observables = [
cudaq.SpinOperator.from_word(p) for p in obs_str
]
# Register logical operations used by the simulator
self.operation_encodings = {
qec.operation.prep0: prep0,
qec.operation.prep1: prep1,
qec.operation.x: x_logical,
qec.operation.stabilizer_round: stabilizer_round
}
def __make_stabilizers_strings(self):
# Create ZZ stabilizer strings: e.g., "ZZI", "IZZ", etc.
d = self.distance
return ['I' * i + 'ZZ' + 'I' * (d - i - 2) for i in range(d - 1)]
def __make_pauli_observables(self):
# Logical Z is Z on all data qubits
d = self.distance
return ["Z" * d]
# --- Required methods for the QEC backend ---
def get_num_data_qubits(self):
return self.distance
def get_num_ancilla_x_qubits(self):
return 0 # Not needed for Z-type repetition code
def get_num_ancilla_z_qubits(self):
return self.distance - 1
def get_num_ancilla_qubits(self):
return self.get_num_ancilla_z_qubits() + self.get_num_ancilla_x_qubits()
def get_num_x_stabilizers(self):
return 0
def get_num_z_stabilizers(self):
return self.distance - 1
# Instantiate the custom repetition code
my_repetition_code = qec.get_code("custom_repetition_code", distance=distance)
print(f"\n Created custom repetition code with distance {distance}.")
all_codes = qec.get_available_codes()
print("\n Available QEC codes both in the library and in Python:", all_codes)
# Let's check some propreties to verify that code is correctly created
# Display the code's stabilizer generators
stabilizers = my_repetition_code.get_stabilizers()
print(f"\n The code has {len(stabilizers)} stabilizers:")
for s in stabilizers:
print(" ", s)
logical_single_round = my_repetition_code.get_observables_z()
# Define and register a noise model
noise_model = cudaq.NoiseModel()
p = 0.01 # depolarizing noise strength
noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)
# Set initial logical state to |0⟩
statePrep = qec.operation.prep0
# Generate a full detector error model (DEM)
print("\n Generating detector error model...")
dem_rep = qec.dem_from_memory_circuit(my_repetition_code, statePrep, nRounds,
noise_model)
# Extract H matrix (syndrome parity checks)
H_pcm = dem_rep.detector_error_matrix
# Extract observable flips matrix (maps physical errors to logical flips)
Lz_observables_flips_matrix = dem_rep.observables_flips_matrix
# Sample noisy executions of the full memory circuit
print("\n Sampling noisy memory circuit executions...")
syndromes, data = qec.sample_memory_circuit(my_repetition_code, statePrep,
nShots, nRounds, noise_model)
# Reshape syndromes to flatten rounds per shot
syndromes = syndromes.reshape((nShots, -1))
print(f"\n Showing first {min(nShots, 5)} of {nShots} sampled results:")
for i in range(min(nShots, 5)):
print(
f"Shot {i+1:>2}: Logical measurement = {data[i]}, Syndromes = {syndromes[i]}"
)
This example illustrates several key concepts for defining custom codes:
Define a Code Class: A new code is defined by creating a Python class decorated with
@qec.code(...), which registers it with the CUDA-Q QEC runtime. The class must inherit fromqec.Code.Implement Required Methods: The class must implement methods that describe the code’s structure, such as
get_num_data_qubits()andget_num_ancilla_qubits().Define Logical Operations as Kernels: Quantum operations like state preparation (
prep0,prep1), logical gates (x_logical), and stabilizer measurements (stabilizer_round) are implemented as standard CUDA-Q kernels.Map Operations to Kernels: The
operation_encodingsdictionary links abstract QEC operations (e.g.,qec.operation.prep0) to the concrete CUDA-Q kernels that implement them.Provide Stabilizers and Observables: The code’s stabilizer generators and logical observables must be defined. This is typically done by creating lists of
cudaq.SpinOperatorobjects representing the Pauli strings for the stabilizers (e.g., “ZZI”) and logical operators (e.g., “ZZZ”).Specify Fine-Grained Noise: This example demonstrates applying noise at a specific point within a kernel. Inside
stabilizer_round,cudaq.apply_noiseis called on each data qubit, offering precise control over the noise model, in contrast to applying noise globally to all gates of a certain type.
Once defined, the custom code can be instantiated with qec.get_code() and used with all standard CUDA-Q QEC tools, including qec.dem_from_memory_circuit() and qec.sample_memory_circuit().
Getting Started with the NVIDIA QLDPC Decoder
Starting with CUDA-Q QEC v0.2, a GPU-accelerated decoder is included with the
CUDA-Q QEC library. The library follows the CUDA-Q decoder Python and C++ interfaces
(namely cudaq_qec.Decoder for Python and
cudaq::qec::decoder for C++), but as documented in the API sections
(NVIDIA QLDPC Decoder for Python and
NVIDIA QLDPC Decoder for C++), there are many configuration options
that can be passed to the constructor.
Belief Propagation Methods
The nv-qldpc-decoder supports multiple belief propagation (BP) algorithms, each with different trade-offs
between accuracy, convergence, and speed:
Sum-Product BP (
bp_method=0): The standard BP algorithm. Good baseline performance.Min-Sum BP (
bp_method=1): Faster approximation to sum-product. Can be tuned withscale_factor.Memory-based BP (
bp_method=2): Adds uniform memory (gamma0) to help escape local minima. Useful when standard BP fails to converge.Disordered Memory BP (
bp_method=3): Uses per-variable memory strengths for better adaptability to code structure.Sequential Relay BP (
composition=1): Advanced method that runs multiple “relay legs” with different gamma configurations. See examples below for configuration.
Usage Example
The following example shows how to exercise the decoder using non-trivial pre-generated test data. The test data was generated using scripts originating from the GitHub repo for BivariateBicycleCodes [1]; it includes parity check matrices (PCMs) and test syndromes to exercise a decoder.
The example demonstrates:
Basic decoder configuration with OSD post-processing
All BP methods including Sequential Relay BP
Batched decoding for improved performance
import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time
# For fetching data
import requests
import bz2
import os
# Note: running this script will automatically download data if necessary.
### Helper functions ###
def parse_csr_mat(j, dims, mat_name):
"""
Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
"""
assert len(dims) == 2, "dims must be a tuple of two integers"
# Extract indptr and indices from the JSON.
indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
indices = np.array(j[f"{mat_name}_indices"], dtype=int)
# Check that the CSR structure is consistent.
assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
assert np.all(
indices < dims[1]), "All column indices must be less than dims[1]"
# Create a data array of ones.
data = np.ones(indptr[-1], dtype=np.uint8)
# Build the CSR matrix and return it as a dense numpy array.
csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
return csr.toarray()
def parse_H_csr(j, dims):
"""
Parse a CSR-style parity check matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "H")
def parse_obs_csr(j, dims):
"""
Parse a CSR-style observable matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "obs_mat")
### Main decoder loop ###
def run_decoder(filename, num_shots, run_as_batched):
"""
Load a JSON file and decode "num_shots" syndromes.
"""
t_load_begin = time.time()
with open(filename, "r") as f:
j = json.load(f)
dims = j["shape"]
assert len(dims) == 2
# Read the Parity Check Matrix
H = parse_H_csr(j, dims)
syndrome_length, block_length = dims
t_load_end = time.time()
print(f"{filename} parsed in {1e3 * (t_load_end-t_load_begin)} ms")
error_rate_vec = np.array(j["error_rate_vec"])
assert len(error_rate_vec) == block_length
obs_mat_dims = j["obs_mat_shape"]
obs_mat = parse_obs_csr(j, obs_mat_dims)
assert dims[1] == obs_mat_dims[0]
file_num_trials = j["num_trials"]
num_shots = min(num_shots, file_num_trials)
print(
f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')
# osd_method: 0=Off, 1=OSD-0, 2=Exhaustive, 3=Combination Sweep
osd_method = 1
# When osd_method is:
# 2) there are 2^osd_order additional error mechanisms checked.
# 3) there are an additional k + osd_order*(osd_order-1)/2 error
# mechanisms checked.
# Ref: https://arxiv.org/pdf/2005.07016
osd_order = 0
# Maximum number of BP iterations before attempting OSD (if necessary)
max_iter = 50
nv_dec_args = {
"max_iterations": max_iter,
"error_rate_vec": error_rate_vec,
"use_sparsity": True,
"use_osd": osd_method > 0,
"osd_order": osd_order,
"osd_method": osd_method
}
if run_as_batched:
# Perform BP processing for up to 1000 syndromes per batch. If there
# are more than 1000 syndromes, the decoder will chunk them up and
# process each batch sequentially under the hood.
nv_dec_args['bp_batch_size'] = min(1000, num_shots)
try:
nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
**nv_dec_args)
except Exception as e:
print(
'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
'QEC installation.')
exit(0)
decoding_time = 0
bp_converged_flags = []
num_logical_errors = 0
# Batched API
if run_as_batched:
syndrome_list = []
obs_truth_list = []
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
syndrome_list.append(syndrome)
obs_truth_list.append(obs_truth)
t0 = time.time()
results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
t1 = time.time()
decoding_time += t1 - t0
for r, obs_truth in zip(results, obs_truth_list):
bp_converged_flags.append(r.converged)
dec_result = np.array(r.result, dtype=np.uint8)
# See if this prediction flipped the observable
predicted_observable = obs_mat.T @ dec_result % 2
print(f"predicted_observable: {predicted_observable}")
# See if the observable was actually flipped according to the truth
# data
actual_observable = np.array(obs_truth, dtype=np.uint8)
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
# Non-batched API
else:
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
t0 = time.time()
results = nv_dec_gpu_and_cpu.decode(syndrome)
bp_converged = results.converged
dec_result = results.result
t1 = time.time()
trial_diff = t1 - t0
decoding_time += trial_diff
dec_result = np.array(dec_result, dtype=np.uint8)
bp_converged_flags.append(bp_converged)
# See if this prediction flipped the observable
predicted_observable = obs_mat.T @ dec_result % 2
print(f"predicted_observable: {predicted_observable}")
# See if the observable was actually flipped according to the truth
# data
actual_observable = np.array(obs_truth, dtype=np.uint8)
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
# Count how many shots the decoder failed to correct the errors
print(f"{num_logical_errors} logical errors in {num_shots} shots")
print(
f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
)
print(
f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
)
def demonstrate_bp_methods():
"""
Demonstrate different BP methods available in nv-qldpc-decoder.
Shows configurations for: sum-product, min-sum, memory BP,
disordered memory BP, and sequential relay BP.
"""
# Simple 3x7 parity check matrix for demonstration
H_list = [[1, 0, 0, 1, 0, 1, 1], [0, 1, 0, 1, 1, 0, 1],
[0, 0, 1, 0, 1, 1, 1]]
H = np.array(H_list, dtype=np.uint8)
print("=" * 60)
print("Demonstrating BP Methods in nv-qldpc-decoder")
print("=" * 60)
# Method 0: Sum-Product BP (default)
print("\n1. Sum-Product BP (bp_method=0, default):")
try:
decoder_sp = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=0,
max_iterations=30)
except Exception as e:
print(
'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
'QEC installation.')
exit(0)
print(" Created decoder with sum-product BP")
# Method 1: Min-Sum BP
print("\n2. Min-Sum BP (bp_method=1):")
decoder_ms = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=1,
max_iterations=30,
scale_factor=1.0)
print(" Created decoder with min-sum BP")
# Method 2: Min-Sum with uniform Memory (Mem-BP)
print("\n3. Mem-BP (bp_method=2, uniform memory strength):")
decoder_mem = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=2,
max_iterations=30,
gamma0=0.5)
print(" Created decoder with Mem-BP (gamma0=0.5)")
# Method 3: Min-Sum with Disordered Memory (DMem-BP)
print("\n4. DMem-BP (bp_method=3, disordered memory strength):")
# Option A: Using gamma_dist (random gammas in range)
decoder_dmem = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=3,
max_iterations=30,
gamma_dist=[0.1, 0.5],
bp_seed=42)
print(" Created decoder with DMem-BP (gamma_dist=[0.1, 0.5])")
# Option B: Using explicit_gammas (specify exact gamma for each variable)
block_size = H.shape[1]
explicit_gammas = [[0.1 + 0.05 * i for i in range(block_size)]]
decoder_dmem_explicit = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=3,
max_iterations=30,
explicit_gammas=explicit_gammas)
print(" Created decoder with DMem-BP (explicit gammas)")
# Method 4: Sequential Relay BP (composition=1)
print("\n5. Sequential Relay BP (composition=1):")
print(" Requires bp_method=3 and srelay_config")
# Configure relay parameters
srelay_config = {
'pre_iter': 5, # Run 5 iterations with gamma0 before relay legs
'num_sets': 3, # Use 3 relay legs
'stopping_criterion': 'FirstConv' # Stop after first convergence
}
# Option A: Using gamma_dist for relay legs
decoder_relay = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=3,
composition=1,
max_iterations=50,
gamma0=0.3,
gamma_dist=[0.1, 0.5],
srelay_config=srelay_config,
bp_seed=42)
print(" Created decoder with Relay-BP (gamma_dist, FirstConv stopping)")
# Option B: Using explicit gammas for each relay leg
num_relay_legs = 3
explicit_relay_gammas = [
[0.1 + 0.02 * i for i in range(block_size)], # First relay leg
[0.2 + 0.03 * i for i in range(block_size)], # Second relay leg
[0.3 + 0.04 * i for i in range(block_size)] # Third relay leg
]
srelay_config_all = {
'pre_iter': 10,
'num_sets': 3,
'stopping_criterion': 'All' # Run all relay legs
}
decoder_relay_explicit = qec.get_decoder(
"nv-qldpc-decoder",
H,
bp_method=3,
composition=1,
max_iterations=50,
gamma0=0.3,
explicit_gammas=explicit_relay_gammas,
srelay_config=srelay_config_all)
print(" Created decoder with Relay-BP (explicit gammas, All legs)")
# Option C: NConv stopping criterion
srelay_config_nconv = {
'pre_iter': 5,
'num_sets': 10,
'stopping_criterion': 'NConv',
'stop_nconv': 3 # Stop after 3 convergences
}
decoder_relay_nconv = qec.get_decoder("nv-qldpc-decoder",
H,
bp_method=3,
composition=1,
max_iterations=50,
gamma0=0.3,
gamma_dist=[0.1, 0.6],
srelay_config=srelay_config_nconv,
bp_seed=42)
print(" Created decoder with Relay-BP (NConv stopping after 3)")
print("\n" + "=" * 60)
print("All decoder configurations created successfully!")
print("=" * 60)
if __name__ == "__main__":
# Demonstrate different BP methods (introduced in v0.5.0)
print("\n### PART 1: BP Methods Demonstration ###\n")
demonstrate_bp_methods()
# Full decoding with test data
print("\n\n### PART 2: Full Decoding Example with Test Data ###\n")
# See other test data options in https://github.com/NVIDIA/cudaqx/releases/tag/0.2.0
filename = 'osd_1008_8785_0.001.json'
bz2filename = filename + '.bz2'
if not os.path.exists(filename):
url = f"https://github.com/NVIDIA/cudaqx/releases/download/0.2.0/{bz2filename}"
print(f'Downloading data from {url}')
# Download the file
response = requests.get(url, stream=True)
response.raise_for_status() # Raise an error if download fails
with open(bz2filename, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f'Decompressing {bz2filename} into {filename}')
# Decompress the file
with bz2.BZ2File(bz2filename, "rb") as f_in, open(filename,
"wb") as f_out:
f_out.write(f_in.read())
print(f"Decompressed file saved as {filename}")
num_shots = 100
run_as_batched = True
run_decoder(filename, num_shots, run_as_batched)
Footnotes
Exact Maximum Likelihood Decoding with NVIDIA Tensor Network Decoder
Starting with CUDA-Q QEC v0.4.0, a GPU-accelerated Maximum Likelihood Decoder is included with the
CUDA-Q QEC library. The library follows the CUDA-Q decoder Python interface, namely cudaq_qec.Decoder.
At this time, we only support the Python interface for the decoder, which is
available at cudaq_qec.plugins.decoders.tensor_network_decoder.TensorNetworkDecoder.
As documented in the API sections Tensor Network Decoder, there are many configuration options
that can be passed to the constructor. The decoder requires Python 3.11 or higher.
In the following example, we show how to use the TensorNetworkDecoder class from the cudaq_qec library to decode a circuit-level noise problem derived from a Stim surface code circuit.
"""
Example usage of tensor_network_decoder from cudaq-qec.
This script demonstrates how to instantiate and use the tensor network decoder
to decode a circuit level noise problem derived from a Stim surface code experiment.
This example requires the `cudaq-qec` package and the optional tensor-network-decoder dependencies.
To install the required dependencies, run:
pip install cudaq-qec[tensor-network-decoder]
Additionaly, in this example, you will need `stim` and `beliefmatching` packages:
pip install stim beliefmatching
"""
import cudaq_qec as qec
import numpy as np
import platform
if platform.machine().lower() in ("arm64", "aarch64"):
print(
"Warning: stim is not supported on manylinux ARM64/aarch64. Skipping this example..."
)
sys.exit(0)
import stim
from beliefmatching.belief_matching import detector_error_model_to_check_matrices
def parse_detector_error_model(detector_error_model):
matrices = detector_error_model_to_check_matrices(detector_error_model)
out_H = np.zeros(matrices.check_matrix.shape)
matrices.check_matrix.astype(np.float64).toarray(out=out_H)
out_L = np.zeros(matrices.observables_matrix.shape)
matrices.observables_matrix.astype(np.float64).toarray(out=out_L)
return out_H, out_L, [float(p) for p in matrices.priors]
circuit = stim.Circuit.generated("surface_code:rotated_memory_z",
rounds=3,
distance=3,
after_clifford_depolarization=0.001,
after_reset_flip_probability=0.01,
before_measure_flip_probability=0.01,
before_round_data_depolarization=0.01)
detector_error_model = circuit.detector_error_model(decompose_errors=True)
H, logicals, noise_model = parse_detector_error_model(detector_error_model)
decoder = qec.get_decoder(
"tensor_network_decoder",
H,
logical_obs=logicals,
noise_model=noise_model,
contract_noise_model=True,
)
num_shots = 5
sampler = circuit.compile_detector_sampler()
detection_events, observable_flips = sampler.sample(num_shots,
separate_observables=True)
res = decoder.decode_batch(detection_events)
print("Tensor network prediction: ", [r.result[0] > 0.5 for r in res])
print("Actual observable flips: ", [bool(o[0]) for o in observable_flips])
Output:
The decoder returns the probability that the logical observable has flipped for each syndrome. This can be used to assess the performance of the code and the decoder under different error scenarios.
See Also:
cudaq_qec.plugins.decoders.tensor_network_decoder
Deploying AI Decoders with TensorRT
Starting with CUDA-Q QEC v0.5.0, a GPU-accelerated TensorRT-based decoder is included with the
CUDA-Q QEC library. The TensorRT decoder (trt_decoder) enables users to leverage custom AI
models for quantum error correction, providing a flexible framework for deploying trained models
with optimized inference performance on NVIDIA GPUs.
Unlike traditional algorithmic decoders, neural network decoders can be trained on specific error models and code structures, potentially achieving superior performance for certain noise regimes. The TensorRT decoder supports loading models in ONNX format and provides configurable precision modes (fp16, bf16, int8, fp8, tf32) to balance accuracy and inference speed.
This tutorial demonstrates the complete workflow for training a simple multi-layer perceptron (MLP) to decode surface code syndromes using PyTorch and Stim, exporting the model to ONNX format, and deploying it with the TensorRT decoder for accelerated inference.
The workflow consists of three main stages:
Data Generation: Use Stim to generate synthetic quantum error correction data by simulating surface code circuits with realistic noise models. This produces detector measurements (syndromes) and observable flips (logical errors) that serve as training data.
Model Training: Train a neural network (in this case, an MLP) using PyTorch to learn the mapping from syndromes to logical error predictions. The model is trained with standard deep learning techniques including dropout regularization, learning rate scheduling, and validation monitoring.
ONNX Export and Deployment: Export the trained PyTorch model to ONNX format, which can then be loaded by the TensorRT decoder for optimized GPU inference in production QEC workflows.
The following example shows how to generate training data using Stim’s built-in surface code generator, train an MLP decoder with PyTorch, and export the model to ONNX format. For instructions on installing PyTorch, see Installing PyTorch.
import sys
import platform
if platform.machine().lower() in ("arm64", "aarch64"):
print(
"Warning: stim is not supported on manylinux ARM64/aarch64. Skipping this example..."
)
sys.exit(0)
import stim
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
# --------------------------
# Parameters
# --------------------------
distance = 3 # Surface code distance (simpler for demo)
num_rounds = 3 # Rounds of stabilizer measurements
num_train_samples = 5000 # Training samples (more data)
num_val_samples = 1000 # Validation samples
num_test_samples = 1000 # Test samples
hidden_dim = 128 # Larger model capacity
error_prob = 0.005 # Balanced error rate for better learning
# --------------------------
# Build the surface code circuit
# --------------------------
# Use the built-in Stim surface code generator with noise
circuit = stim.Circuit.generated("surface_code:rotated_memory_x",
distance=distance,
rounds=num_rounds,
after_clifford_depolarization=error_prob,
after_reset_flip_probability=error_prob,
before_measure_flip_probability=error_prob,
before_round_data_depolarization=error_prob)
# Convert to detector error model
dem = circuit.detector_error_model()
num_detectors = dem.num_detectors
num_data_qubits = circuit.num_qubits - num_detectors
print(f"Num data qubits: {num_data_qubits}, Num detectors: {num_detectors}")
# --------------------------
# Sample training, validation, and test data
# --------------------------
sampler = circuit.compile_detector_sampler()
def sample_data(num_samples):
"""Sample detector outcomes and observable flips."""
X_data = []
Y_data = []
detector_samples, observable_samples = sampler.sample(
num_samples, separate_observables=True)
for i in range(num_samples):
detectors = torch.tensor(detector_samples[i], dtype=torch.float32)
observable = torch.tensor(observable_samples[i], dtype=torch.float32)
X_data.append(detectors)
Y_data.append(observable)
return torch.stack(X_data), torch.stack(Y_data)
print(f"Sampling {num_train_samples} training samples...")
X_train, Y_train = sample_data(num_train_samples)
print(f"Sampling {num_val_samples} validation samples...")
X_val, Y_val = sample_data(num_val_samples)
print(f"Sampling {num_test_samples} test samples...")
X_test, Y_test = sample_data(num_test_samples)
num_observables = Y_train.shape[1]
print(f"Num observables: {num_observables}")
# --------------------------
# Improved Torch NN decoder with dropout and deeper architecture
# --------------------------
class SurfaceCodeDecoder(nn.Module):
def __init__(self, input_dim, output_dim, hidden_dim=128, dropout=0.3):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, hidden_dim * 2), # 256
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim * 2, hidden_dim), # 128
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim, hidden_dim // 2), # 64
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_dim // 2, output_dim),
nn.Sigmoid())
def forward(self, x):
return self.net(x)
model = SurfaceCodeDecoder(input_dim=num_detectors,
output_dim=num_observables,
hidden_dim=hidden_dim,
dropout=0.3)
optimizer = optim.Adam(model.parameters(), lr=5e-4) # Lower learning rate
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,
mode='min',
factor=0.5,
patience=20)
criterion = nn.BCELoss()
# Create DataLoaders for batch training
train_dataset = TensorDataset(X_train, Y_train)
val_dataset = TensorDataset(X_val, Y_val)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
def compute_accuracy(predictions, targets, threshold=0.5):
"""Compute binary accuracy."""
pred_binary = (predictions > threshold).float()
correct = (pred_binary == targets).float().mean()
return correct.item()
# --------------------------
# Train NN with validation
# --------------------------
epochs = 1000 # Train longer for better convergence
best_val_acc = 0.0
print("\nTraining started...")
print("=" * 70)
for epoch in range(epochs):
# Training step with batches
model.train()
train_loss_total = 0.0
train_correct = 0
train_total = 0
for batch_X, batch_Y in train_loader:
optimizer.zero_grad()
train_output = model(batch_X)
train_loss = criterion(train_output, batch_Y)
train_loss.backward()
optimizer.step()
train_loss_total += train_loss.item() * batch_X.size(0)
train_correct += ((train_output > 0.5).float() == batch_Y).sum().item()
train_total += batch_Y.numel()
train_loss_avg = train_loss_total / len(train_loader.dataset)
train_acc = train_correct / train_total
# Validation step with batches
model.eval()
val_loss_total = 0.0
val_correct = 0
val_total = 0
cum_ler = 0.0
with torch.no_grad():
for batch_X, batch_Y in val_loader:
val_output = model(batch_X)
val_loss = criterion(val_output, batch_Y)
val_output_binary = (val_output > 0.5)
ler = val_output_binary ^ (batch_Y > 0.5)
# print(f"loss: {loss.sum().item() / loss.numel()}")
cum_ler += ler.sum().item()
# print(f"val_output_binary: {val_output_binary} ler: {ler} batch_Y: {batch_Y} ")
# print(f"logical_error_rate (pred): {val_output.sum().item() / val_output.numel()}")
# print(f"logical_error_rate (raw): {batch_Y.sum().item() / batch_Y.numel()}")
val_loss_total += val_loss.item() * batch_X.size(0)
val_correct += ((val_output > 0.5).float() == batch_Y).sum().item()
val_total += batch_Y.numel()
# print(f"logical_error_rate (raw): {batch_Y.sum().item() / batch_Y.numel()}")
# print(f"cum_ler: {cum_ler / len(val_loader.dataset)}")
val_loss_avg = val_loss_total / len(val_loader.dataset)
val_acc = val_correct / val_total
# Learning rate scheduling
scheduler.step(val_loss_avg)
# Save best model
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save(model.state_dict(), "surface_code_decoder_best.pth")
# Print progress every 10 epochs
if (epoch + 1) % 10 == 0:
print(
f"Epoch {epoch+1:3d}/{epochs} | "
f"Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f} | "
f"Val Loss: {val_loss_avg:.4f} | Val Acc: {val_acc:.4f}")
print("=" * 70)
print(f"Training complete! Best validation accuracy: {best_val_acc:.4f}")
# --------------------------
# Load best model and evaluate on test set
# --------------------------
print("\nLoading best model and evaluating on test set...")
model.load_state_dict(torch.load("surface_code_decoder_best.pth"))
model.eval()
with torch.no_grad():
test_output = model(X_test)
test_loss = criterion(test_output, Y_test)
test_acc = compute_accuracy(test_output, Y_test)
# Additional metrics
test_pred_binary = (test_output > 0.5).float()
# Count logical errors
total_actual_errors = Y_test.sum().item()
total_predicted_errors = test_pred_binary.sum().item()
correct_predictions = (test_pred_binary == Y_test).sum().item()
total_predictions = Y_test.numel()
print("=" * 70)
print("TEST SET RESULTS:")
print("=" * 70)
print(f"Test Loss: {test_loss.item():.4f}")
print(f"Test Accuracy: {test_acc:.4f} ({test_acc*100:.2f}%)")
print(
f"Correct predictions: {correct_predictions}/{total_predictions}")
print(f"Actual logical errors: {int(total_actual_errors)}")
print(f"Predicted logical errors: {int(total_predicted_errors)}")
print("=" * 70)
# --------------------------
# Export to ONNX
# --------------------------
print("\nExporting model to ONNX...")
torch.onnx.export(model,
X_train[:1],
"surface_code_decoder.onnx",
input_names=["detectors"],
output_names=["data_qubit_probs"],
opset_version=17)
print("ONNX model saved as surface_code_decoder.onnx")
print("PyTorch weights saved as surface_code_decoder_best.pth")
Once you have a trained ONNX model, you can load it with the TensorRT decoder for accelerated inference. The decoder can be used in both C++ and Python workflows.
Loading from ONNX (with automatic TensorRT optimization):
import cudaq_qec as qec
import numpy as np
# Note: The AI decoder doesn't use the parity check matrix.
# A placeholder matrix is provided here to satisfy the API.
H = np.array([[1, 0, 0, 1, 0, 1, 1],
[0, 1, 0, 1, 1, 0, 1],
[0, 0, 1, 0, 1, 1, 1]], dtype=np.uint8)
# Create TensorRT decoder from ONNX model
decoder = qec.get_decoder("trt_decoder", H,
onnx_load_path="ai_decoder.onnx")
# Decode a syndrome
syndrome = np.array([1.0, 0.0, 1.0], dtype=np.float32)
result = decoder.decode(syndrome)
print(f"Predicted error: {result}")
#include "cudaq/qec/decoder.h"
#include "cuda-qx/core/tensor.h"
#include "cuda-qx/core/heterogeneous_map.h"
int main() {
// Note: The AI decoder doesn't use the parity check matrix.
// A placeholder matrix is provided here to satisfy the API.
std::vector<std::vector<uint8_t>> H_vec = {
{1, 0, 0, 1, 0, 1, 1},
{0, 1, 0, 1, 1, 0, 1},
{0, 0, 1, 0, 1, 1, 1}
};
// Convert to tensor
cudaqx::tensor<uint8_t> H({3, 7});
for (size_t i = 0; i < 3; ++i) {
for (size_t j = 0; j < 7; ++j) {
H.at({i, j}) = H_vec[i][j];
}
}
// Create decoder parameters
cudaqx::heterogeneous_map params;
params.insert("onnx_load_path", "ai_decoder.onnx");
params.insert("precision", "fp16");
// Create TensorRT decoder
auto decoder = cudaq::qec::get_decoder("trt_decoder", H, params);
// Decode syndrome
std::vector<cudaq::qec::float_t> syndrome = {1.0, 0.0, 1.0};
auto result = decoder->decode(syndrome);
return 0;
}
Loading a pre-built TensorRT engine (for fastest initialization):
If you’ve already converted your ONNX model to a TensorRT engine using the provided utility script, you can load it directly:
decoder = qec.get_decoder("trt_decoder", H,
engine_load_path="surface_code_decoder.trt")
For production deployments where initialization time is critical, you can pre-build a TensorRT
engine from your ONNX model using the trtexec command-line tool that comes with TensorRT:
# Build with FP16 precision
trtexec --onnx=surface_code_decoder.onnx \
--saveEngine=surface_code_decoder.trt \
--fp16
# Build with best precision for your GPU
trtexec --onnx=surface_code_decoder.onnx \
--saveEngine=surface_code_decoder.trt \
--best
# Build with specific input shape (optional, for optimization)
trtexec --onnx=surface_code_decoder.onnx \
--saveEngine=surface_code_decoder.trt \
--fp16 \
--shapes=detectors:1x24
Pre-built engines offer several advantages:
Faster initialization: Engine loading is significantly faster than ONNX parsing and optimization
Reproducible optimization: The same optimization decisions are made every time
Version control: Engines can be versioned alongside code for reproducible deployments
The TensorRT decoder requires:
TensorRT: Version 10.13.3.9 or higher
CUDA: Version 12.0 or higher for x86 and 13.0 for ARM.
GPU: NVIDIA GPU with compute capability 6.0+ (Pascal architecture or newer)
For training:
PyTorch: Version 2.0+ recommended
Stim: For quantum circuit simulation and data generation
cudaq_qec.Decoder- Base decoder interfaceONNX - Open Neural Network Exchange format
TensorRT Documentation - NVIDIA TensorRT
Stim Documentation - Fast stabilizer circuit simulator