Decoders
In quantum error correction, decoders are responsible for interpreting measurement outcomes (syndromes) to identify and correct quantum errors. We measure a set of stabilizers that give us information about what errors might have happened. The pattern of these measurements is called a syndrome, and the decoder’s task is to determine what errors most likely caused that syndrome.
The relationship between errors and syndromes is captured mathematically by the parity check matrix. Each row of this matrix represents a stabilizer measurement, while each column represents a possible error. When we multiply an error pattern by this matrix, we get the syndrome that would result from those errors.
Detector Error Model
Here we introduce the cudaq.qec.detector_error_model
type, which allows us to create a detector error model (DEM) from a QEC circuit and noise model.
The DEM can be generated from a QEC circuit and noise model using functions like dem_from_memory_circuit()
. For circuit-level noise, the DEM can be put into a
canonical form that’s organized by measurement rounds, making it suitable for multi-round decoding.
For a complete example of using the surface code with DEM to generate parity check matrices and perform decoding, see the circuit level noise example.
Generating a Multi-Round Parity Check Matrix
Below, we demonstrate how to use CUDA-Q QEC to construct a multi-round parity check matrix for an error correction code under a circuit-level noise model in Python:
import cudaq
import cudaq_qec as qec
import numpy as np
# Set target simulator (Stim) for fast stabilizer circuit simulation
cudaq.set_target("stim")
distance = 3 # Code distance (number of physical qubits for repetition code)
nRounds = 6 # Number of syndrome measurement rounds
nShots = 10000 # Number of circuit samples to run
# Set verbosity based on shot count
verbose = nShots <= 10
def vprint(*args, **kwargs):
if verbose:
print(*args, **kwargs)
# Retrieve a 3-qubit repetition code instance
three_qubit_repetition_code = qec.get_code("repetition", distance=distance)
# Z logical observable (for repetition codes, only Z matters)
logical_single_round = three_qubit_repetition_code.get_observables_z()
# Use predefined state preparation (|1⟩ for logical '1')
statePrep = qec.operation.prep1
# Create a noise model instance
noise_model = cudaq.NoiseModel()
# Define physical gate error probability
p = 0.01
# Define measurement error probability (not activated by default)
p_per_mz = 0.001
# Inject depolarizing noise on CX gates
noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)
# noise_model.add_all_qubit_channel("mz", cudaq.BitFlipChannel(p_per_mz)) # Optional: measurement noise
# === Decoder Setup ===
# Generate full detector error model (DEM), tracking all observables
dem_rep_full = qec.dem_from_memory_circuit(three_qubit_repetition_code,
statePrep, nRounds, noise_model)
# Generate Z-only detector error model (sufficient for repetition code)
dem_rep_z = qec.z_dem_from_memory_circuit(three_qubit_repetition_code,
statePrep, nRounds, noise_model)
# Extract multi-round parity check matrix (H matrix)
H_pcm_from_dem_full = dem_rep_full.detector_error_matrix
H_pcm_from_dem_z = dem_rep_z.detector_error_matrix
# Sanity check: for repetition codes, full and Z-only matrices should match
assert (H_pcm_from_dem_z == H_pcm_from_dem_full).all()
# Retrieve observable flips matrix: maps physical errors to logical flips
Lz_observables_flips_matrix = dem_rep_z.observables_flips_matrix
# Instantiate a decoder: single-error lookup table (fast and sufficient for small codes)
decoder = qec.get_decoder("single_error_lut", H_pcm_from_dem_z)
# === Simulation ===
# Sample noisy executions of the code circuit
syndromes, data = qec.sample_memory_circuit(three_qubit_repetition_code,
statePrep, nShots, nRounds,
noise_model)
syndromes = syndromes.reshape((nShots, nRounds, -1))
syndromes = syndromes.reshape((nShots, -1))
# Expected logical measurement (we prepared |1⟩)
expected_value = 1
# Counters for statistics
nLogicalErrorsWithoutDecoding = 0
nLogicalErrorsWDecoding = 0
nCorrections = 0
# === Loop over shots ===
for i in range(nShots):
vprint(f"shot: {i}")
data_i = data[i, :] # Final data measurement
vprint(f"data: {data_i}")
results = decoder.decode(syndromes[i, :])
convergence = results.converged
result = results.result
error_prediction = np.array(result, dtype=np.uint8)
vprint(f"error_prediction: {error_prediction}")
predicted_observable_flip = Lz_observables_flips_matrix @ error_prediction % 2
vprint(f"predicted_observable_flip: {predicted_observable_flip}")
measured_observable = logical_single_round @ data_i % 2
vprint(f"measured_observable: {measured_observable}")
if measured_observable != expected_value:
nLogicalErrorsWithoutDecoding += 1
predicted_observable = predicted_observable_flip ^ measured_observable
vprint(f"predicted_observable: {predicted_observable}")
if predicted_observable != expected_value:
nLogicalErrorsWDecoding += 1
nCorrections += int(predicted_observable_flip[0])
# === Summary statistics ===
print(
f"{nLogicalErrorsWithoutDecoding} logical errors without decoding in {nShots} shots\n"
)
print(
f"{nLogicalErrorsWDecoding} logical errors with decoding in {nShots} shots\n"
)
print(f"{nCorrections} corrections applied in {nShots} shots\n")
This example illustrates how to:
Retrieve and configure an error correction code Load a repetition code using
qec.get_code(...)
from the CUDA-Q QEC library, and define a custom circuit-level noise model using.add_all_qubit_channel(...)
.Generate a multi-round parity check matrix Extend a single-round detector error model (DEM) across multiple rounds using
qec.dem_from_memory_circuit(...)
. This captures syndrome evolution over time, including measurement noise, and provides:detector_error_matrix
– the multi-round parity check matrixobservables_flips_matrix
– used to identify logical flips due to physical errors
Simulate circuit-level noise and collect data Run multiple shots of the memory experiment using
qec.sample_memory_circuit(...)
to sample both the data and syndrome measurements from noisy executions. The resulting bitstrings can be used for decoding and performance evaluation of the error correction scheme.
Creating New QEC codes
Below, we demonstrate how to use CUDA-Q QEC to define a new QEC code entirely in Python. This powerful feature allows for rapid prototyping and testing of custom error correction schemes.
import cudaq
import cudaq_qec as qec
from cudaq_qec import patch
import numpy as np
# Use Stim for fast stabilizer simulation
cudaq.set_target("stim")
# Repetition code parameters
distance = 3 # Number of physical qubits (also sets number of data qubits)
nRounds = 6 # Number of stabilizer measurement rounds
nShots = 10000 # Number of noisy circuit executions to simulate
# Define logical operations as CUDA-Q kernels
@cudaq.kernel
def x_logical(logicalQubit: patch):
# Apply a logical X: bit-flip all data qubits
for i in range(len(logicalQubit.data)):
x(logicalQubit.data[i])
@cudaq.kernel
def prep0(logicalQubit: patch):
# Initialize all qubits in |0⟩
reset(logicalQubit.data)
reset(logicalQubit.ancz)
@cudaq.kernel
def prep1(logicalQubit: patch):
# Prepare logical |1⟩: apply X to logical |0⟩
prep0(logicalQubit)
x_logical(logicalQubit)
@cudaq.kernel
def stabilizer_round(logicalQubit: patch) -> list[bool]:
# Run one round of stabilizer measurements for the Z-type repetition code
num_ancilla = len(logicalQubit.ancz)
num_data = len(logicalQubit.data)
# Apply depolarizing noise to each data qubit
for i in range(num_data):
cudaq.apply_noise(cudaq.DepolarizationChannel, 0.1,
logicalQubit.data[i])
# It is possible to have even more control over the noise.
# cudaq.apply_noise(cudaq.Pauli1, 0.1, 0.1, 0.1, logicalQubit.data[i]) # in order pX, pY, and pZ errors.
# Measure each ZZ stabilizer using CNOTs from data to ancilla
for i in range(num_ancilla):
x.ctrl(logicalQubit.data[i], logicalQubit.ancz[i])
x.ctrl(logicalQubit.data[i + 1], logicalQubit.ancz[i])
# Measure ancilla qubits to extract the syndrome
measurements = mz(logicalQubit.ancz)
# Reset ancillas for the next round
reset(logicalQubit.ancz)
return measurements
# Define the custom repetition code using the @qec.code decorator
@qec.code('custom_repetition_code')
class MyRepetitionCode:
def __init__(self, **kwargs):
qec.Code.__init__(self) # Without this it won't work
self.distance = kwargs.get("distance", 3)
# Create ZZ stabilizer generators
stabilizers_str = self.__make_stabilizers_strings()
self.stabilizers = [
cudaq.SpinOperator.from_word(s) for s in stabilizers_str
]
# Define logical Z observable
obs_str = self.__make_pauli_observables()
self.pauli_observables = [
cudaq.SpinOperator.from_word(p) for p in obs_str
]
# Register logical operations used by the simulator
self.operation_encodings = {
qec.operation.prep0: prep0,
qec.operation.prep1: prep1,
qec.operation.x: x_logical,
qec.operation.stabilizer_round: stabilizer_round
}
def __make_stabilizers_strings(self):
# Create ZZ stabilizer strings: e.g., "ZZI", "IZZ", etc.
d = self.distance
return ['I' * i + 'ZZ' + 'I' * (d - i - 2) for i in range(d - 1)]
def __make_pauli_observables(self):
# Logical Z is Z on all data qubits
d = self.distance
return ["Z" * d]
# --- Required methods for the QEC backend ---
def get_num_data_qubits(self):
return self.distance
def get_num_ancilla_x_qubits(self):
return 0 # Not needed for Z-type repetition code
def get_num_ancilla_z_qubits(self):
return self.distance - 1
def get_num_ancilla_qubits(self):
return self.get_num_ancilla_z_qubits() + self.get_num_ancilla_x_qubits()
def get_num_x_stabilizers(self):
return 0
def get_num_z_stabilizers(self):
return self.distance - 1
# Instantiate the custom repetition code
my_repetition_code = qec.get_code("custom_repetition_code", distance=distance)
print(f"\n Created custom repetition code with distance {distance}.")
all_codes = qec.get_available_codes()
print("\n Available QEC codes both in the library and in Python:", all_codes)
# Let's check some propreties to verify that code is correctly created
# Display the code's stabilizer generators
stabilizers = my_repetition_code.get_stabilizers()
print(f"\n The code has {len(stabilizers)} stabilizers:")
for s in stabilizers:
print(" ", s)
logical_single_round = my_repetition_code.get_observables_z()
# Define and register a noise model
noise_model = cudaq.NoiseModel()
p = 0.01 # depolarizing noise strength
noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)
# Set initial logical state to |0⟩
statePrep = qec.operation.prep0
# Generate a full detector error model (DEM)
print("\n Generating detector error model...")
dem_rep = qec.dem_from_memory_circuit(my_repetition_code, statePrep, nRounds,
noise_model)
# Extract H matrix (syndrome parity checks)
H_pcm = dem_rep.detector_error_matrix
# Extract observable flips matrix (maps physical errors to logical flips)
Lz_observables_flips_matrix = dem_rep.observables_flips_matrix
# Sample noisy executions of the full memory circuit
print("\n Sampling noisy memory circuit executions...")
syndromes, data = qec.sample_memory_circuit(my_repetition_code, statePrep,
nShots, nRounds, noise_model)
# Reshape syndromes to flatten rounds per shot
syndromes = syndromes.reshape((nShots, -1))
print(f"\n Showing first {min(nShots, 5)} of {nShots} sampled results:")
for i in range(min(nShots, 5)):
print(
f"Shot {i+1:>2}: Logical measurement = {data[i]}, Syndromes = {syndromes[i]}"
)
This example illustrates several key concepts for defining custom codes:
Define a Code Class: A new code is defined by creating a Python class decorated with
@qec.code(...)
, which registers it with the CUDA-Q QEC runtime. The class must inherit fromqec.Code
.Implement Required Methods: The class must implement methods that describe the code’s structure, such as
get_num_data_qubits()
andget_num_ancilla_qubits()
.Define Logical Operations as Kernels: Quantum operations like state preparation (
prep0
,prep1
), logical gates (x_logical
), and stabilizer measurements (stabilizer_round
) are implemented as standard CUDA-Q kernels.Map Operations to Kernels: The
operation_encodings
dictionary links abstract QEC operations (e.g.,qec.operation.prep0
) to the concrete CUDA-Q kernels that implement them.Provide Stabilizers and Observables: The code’s stabilizer generators and logical observables must be defined. This is typically done by creating lists of
cudaq.SpinOperator
objects representing the Pauli strings for the stabilizers (e.g., “ZZI”) and logical operators (e.g., “ZZZ”).Specify Fine-Grained Noise: This example demonstrates applying noise at a specific point within a kernel. Inside
stabilizer_round
,cudaq.apply_noise
is called on each data qubit, offering precise control over the noise model, in contrast to applying noise globally to all gates of a certain type.
Once defined, the custom code can be instantiated with qec.get_code()
and used with all standard CUDA-Q QEC tools, including qec.dem_from_memory_circuit()
and qec.sample_memory_circuit()
.
Getting Started with the NVIDIA QLDPC Decoder
Starting with CUDA-Q QEC v0.2, a GPU-accelerated decoder is included with the
CUDA-Q QEC library. The library follows the CUDA-Q decoder Python and C++ interfaces
(namely cudaq_qec.Decoder
for Python and
cudaq::qec::decoder
for C++), but as documented in the API sections
(NVIDIA QLDPC Decoder for Python and
NVIDIA QLDPC Decoder for C++), there are many configuration options
that can be passed to the constructor. The following example shows how to
exercise the decoder using non-trivial pre-generated test data. The test data
was generated using scripts originating from the GitHub repo for
BivariateBicycleCodes [1]; it includes parity
check matrices (PCMs) and test syndromes to exercise a decoder.
import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time
# For fetching data
import requests
import bz2
import os
# Note: running this script will automatically download data if necessary.
### Helper functions ###
def parse_csr_mat(j, dims, mat_name):
"""
Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
"""
assert len(dims) == 2, "dims must be a tuple of two integers"
# Extract indptr and indices from the JSON.
indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
indices = np.array(j[f"{mat_name}_indices"], dtype=int)
# Check that the CSR structure is consistent.
assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
assert np.all(
indices < dims[1]), "All column indices must be less than dims[1]"
# Create a data array of ones.
data = np.ones(indptr[-1], dtype=np.uint8)
# Build the CSR matrix and return it as a dense numpy array.
csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
return csr.toarray()
def parse_H_csr(j, dims):
"""
Parse a CSR-style parity check matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "H")
def parse_obs_csr(j, dims):
"""
Parse a CSR-style observable matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "obs_mat")
### Main decoder loop ###
def run_decoder(filename, num_shots, run_as_batched):
"""
Load a JSON file and decode "num_shots" syndromes.
"""
t_load_begin = time.time()
with open(filename, "r") as f:
j = json.load(f)
dims = j["shape"]
assert len(dims) == 2
# Read the Parity Check Matrix
H = parse_H_csr(j, dims)
syndrome_length, block_length = dims
t_load_end = time.time()
print(f"{filename} parsed in {1e3 * (t_load_end-t_load_begin)} ms")
error_rate_vec = np.array(j["error_rate_vec"])
assert len(error_rate_vec) == block_length
obs_mat_dims = j["obs_mat_shape"]
obs_mat = parse_obs_csr(j, obs_mat_dims)
assert dims[1] == obs_mat_dims[0]
file_num_trials = j["num_trials"]
num_shots = min(num_shots, file_num_trials)
print(
f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')
# osd_method: 0=Off, 1=OSD-0, 2=Exhaustive, 3=Combination Sweep
osd_method = 1
# When osd_method is:
# 2) there are 2^osd_order additional error mechanisms checked.
# 3) there are an additional k + osd_order*(osd_order-1)/2 error
# mechanisms checked.
# Ref: https://arxiv.org/pdf/2005.07016
osd_order = 0
# Maximum number of BP iterations before attempting OSD (if necessary)
max_iter = 50
nv_dec_args = {
"max_iterations": max_iter,
"error_rate_vec": error_rate_vec,
"use_sparsity": True,
"use_osd": osd_method > 0,
"osd_order": osd_order,
"osd_method": osd_method
}
if run_as_batched:
# Perform BP processing for up to 1000 syndromes per batch. If there
# are more than 1000 syndromes, the decoder will chunk them up and
# process each batch sequentially under the hood.
nv_dec_args['bp_batch_size'] = min(1000, num_shots)
try:
nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
**nv_dec_args)
except Exception as e:
print(
'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
'QEC installation.')
exit(0)
decoding_time = 0
bp_converged_flags = []
num_logical_errors = 0
# Batched API
if run_as_batched:
syndrome_list = []
obs_truth_list = []
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
syndrome_list.append(syndrome)
obs_truth_list.append(obs_truth)
t0 = time.time()
results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
t1 = time.time()
decoding_time += t1 - t0
for r, obs_truth in zip(results, obs_truth_list):
bp_converged_flags.append(r.converged)
dec_result = np.array(r.result, dtype=np.uint8)
# See if this prediction flipped the observable
predicted_observable = obs_mat.T @ dec_result % 2
print(f"predicted_observable: {predicted_observable}")
# See if the observable was actually flipped according to the truth
# data
actual_observable = np.array(obs_truth, dtype=np.uint8)
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
# Non-batched API
else:
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
t0 = time.time()
results = nv_dec_gpu_and_cpu.decode(syndrome)
bp_converged = results.converged
dec_result = results.result
t1 = time.time()
trial_diff = t1 - t0
decoding_time += trial_diff
dec_result = np.array(dec_result, dtype=np.uint8)
bp_converged_flags.append(bp_converged)
# See if this prediction flipped the observable
predicted_observable = obs_mat.T @ dec_result % 2
print(f"predicted_observable: {predicted_observable}")
# See if the observable was actually flipped according to the truth
# data
actual_observable = np.array(obs_truth, dtype=np.uint8)
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
# Count how many shots the decoder failed to correct the errors
print(f"{num_logical_errors} logical errors in {num_shots} shots")
print(
f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
)
print(
f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
)
if __name__ == "__main__":
# See other test data options in https://github.com/NVIDIA/cudaqx/releases/tag/0.2.0
filename = 'osd_1008_8785_0.001.json'
bz2filename = filename + '.bz2'
if not os.path.exists(filename):
url = f"https://github.com/NVIDIA/cudaqx/releases/download/0.2.0/{bz2filename}"
print(f'Downloading data from {url}')
# Download the file
response = requests.get(url, stream=True)
response.raise_for_status() # Raise an error if download fails
with open(bz2filename, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f'Decompressing {bz2filename} into {filename}')
# Decompress the file
with bz2.BZ2File(bz2filename, "rb") as f_in, open(filename,
"wb") as f_out:
f_out.write(f_in.read())
print(f"Decompressed file saved as {filename}")
num_shots = 100
run_as_batched = True
run_decoder(filename, num_shots, run_as_batched)
Footnotes
Exact Maximum Likelihood Decoding with NVIDIA Tensor Network Decoder
Starting with CUDA-Q QEC v0.4.0, a GPU-accelerated Maximum Likelihood Decoder is included with the
CUDA-Q QEC library. The library follows the CUDA-Q decoder Python interface, namely cudaq_qec.Decoder
.
At this time, we only support the Python interface for the decoder, which is
available at cudaq_qec.plugins.decoders.tensor_network_decoder.TensorNetworkDecoder
.
As documented in the API sections Tensor Network Decoder, there are many configuration options
that can be passed to the constructor. The decoder requires Python 3.11 or higher.
In the following example, we show how to use the TensorNetworkDecoder
class from the cudaq_qec
library to decode a circuit-level noise problem derived from a Stim surface code circuit.
"""
Example usage of tensor_network_decoder from cudaq_qec.
This script demonstrates how to instantiate and use the tensor network decoder
to decode a circuit level noise problem derived from a Stim surface code experiment.
This example requires the `cudaq-qec` package and the optional tensor-network dependencies.
To install the required dependencies, run:
pip install cudaq-qec[tensor_network_decoder]
Additionaly, you will need `stim` and `beliefmatching` packages:
pip install stim beliefmatching
"""
import cudaq_qec as qec
import numpy as np
import platform
if platform.machine().lower() in ("arm64", "aarch64"):
print(
"Warning: stim is not supported on manylinux ARM64/aarch64. Skipping this example..."
)
sys.exit(0)
import stim
from beliefmatching.belief_matching import detector_error_model_to_check_matrices
def parse_detector_error_model(detector_error_model):
matrices = detector_error_model_to_check_matrices(detector_error_model)
out_H = np.zeros(matrices.check_matrix.shape)
matrices.check_matrix.astype(np.float64).toarray(out=out_H)
out_L = np.zeros(matrices.observables_matrix.shape)
matrices.observables_matrix.astype(np.float64).toarray(out=out_L)
return out_H, out_L, [float(p) for p in matrices.priors]
circuit = stim.Circuit.generated("surface_code:rotated_memory_z",
rounds=3,
distance=3,
after_clifford_depolarization=0.001,
after_reset_flip_probability=0.01,
before_measure_flip_probability=0.01,
before_round_data_depolarization=0.01)
detector_error_model = circuit.detector_error_model(decompose_errors=True)
H, logicals, noise_model = parse_detector_error_model(detector_error_model)
decoder = qec.get_decoder(
"tensor_network_decoder",
H,
logical_obs=logicals,
noise_model=noise_model,
contract_noise_model=True,
)
num_shots = 5
sampler = circuit.compile_detector_sampler()
detection_events, observable_flips = sampler.sample(num_shots,
separate_observables=True)
res = decoder.decode_batch(detection_events)
print("Tensor network prediction: ", [r.result[0] > 0.5 for r in res])
print("Actual observable flips: ", [bool(o[0]) for o in observable_flips])
Output:
The decoder returns the probability that the logical observable has flipped for each syndrome. This can be used to assess the performance of the code and the decoder under different error scenarios.
See Also:
cudaq_qec.plugins.decoders.tensor_network_decoder