Decoders

In quantum error correction, decoders are responsible for interpreting measurement outcomes (syndromes) to identify and correct quantum errors. We measure a set of stabilizers that give us information about what errors might have happened. The pattern of these measurements is called a syndrome, and the decoder’s task is to determine what errors most likely caused that syndrome.

The relationship between errors and syndromes is captured mathematically by the parity check matrix. Each row of this matrix represents a stabilizer measurement, while each column represents a possible error. When we multiply an error pattern by this matrix, we get the syndrome that would result from those errors.

Detector Error Model

Here we introduce the cudaq.qec.detector_error_model type, which allows us to create a detector error model (DEM) from a QEC circuit and noise model.

The DEM can be generated from a QEC circuit and noise model using functions like dem_from_memory_circuit(). For circuit-level noise, the DEM can be put into a canonical form that’s organized by measurement rounds, making it suitable for multi-round decoding.

For a complete example of using the surface code with DEM to generate parity check matrices and perform decoding, see the circuit level noise example.

Generating a Multi-Round Parity Check Matrix

Below, we demonstrate how to use CUDA-Q QEC to construct a multi-round parity check matrix for an error correction code under a circuit-level noise model in Python:

import cudaq
import cudaq_qec as qec
import numpy as np

# Set target simulator (Stim) for fast stabilizer circuit simulation
cudaq.set_target("stim")

distance = 3  # Code distance (number of physical qubits for repetition code)
nRounds = 6  # Number of syndrome measurement rounds
nShots = 10000  # Number of circuit samples to run

# Set verbosity based on shot count
verbose = nShots <= 10


def vprint(*args, **kwargs):
    if verbose:
        print(*args, **kwargs)


# Retrieve a 3-qubit repetition code instance
three_qubit_repetition_code = qec.get_code("repetition", distance=distance)

# Z logical observable (for repetition codes, only Z matters)
logical_single_round = three_qubit_repetition_code.get_observables_z()

# Use predefined state preparation (|1⟩ for logical '1')
statePrep = qec.operation.prep1

# Create a noise model instance
noise_model = cudaq.NoiseModel()

# Define physical gate error probability
p = 0.01
# Define measurement error probability (not activated by default)
p_per_mz = 0.001

# Inject depolarizing noise on CX gates
noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)
# noise_model.add_all_qubit_channel("mz", cudaq.BitFlipChannel(p_per_mz))  # Optional: measurement noise

# === Decoder Setup ===

# Generate full detector error model (DEM), tracking all observables
dem_rep_full = qec.dem_from_memory_circuit(three_qubit_repetition_code,
                                           statePrep, nRounds, noise_model)

# Generate Z-only detector error model (sufficient for repetition code)
dem_rep_z = qec.z_dem_from_memory_circuit(three_qubit_repetition_code,
                                          statePrep, nRounds, noise_model)

# Extract multi-round parity check matrix (H matrix)
H_pcm_from_dem_full = dem_rep_full.detector_error_matrix
H_pcm_from_dem_z = dem_rep_z.detector_error_matrix

# Sanity check: for repetition codes, full and Z-only matrices should match
assert (H_pcm_from_dem_z == H_pcm_from_dem_full).all()

# Retrieve observable flips matrix: maps physical errors to logical flips
Lz_observables_flips_matrix = dem_rep_z.observables_flips_matrix

# Instantiate a decoder: single-error lookup table (fast and sufficient for small codes)
decoder = qec.get_decoder("single_error_lut", H_pcm_from_dem_z)

# === Simulation ===

# Sample noisy executions of the code circuit
syndromes, data = qec.sample_memory_circuit(three_qubit_repetition_code,
                                            statePrep, nShots, nRounds,
                                            noise_model)

syndromes = syndromes.reshape((nShots, nRounds, -1))
syndromes = syndromes.reshape((nShots, -1))

# Expected logical measurement (we prepared |1⟩)
expected_value = 1

# Counters for statistics
nLogicalErrorsWithoutDecoding = 0
nLogicalErrorsWDecoding = 0
nCorrections = 0

# === Loop over shots ===
for i in range(nShots):
    vprint(f"shot: {i}")

    data_i = data[i, :]  # Final data measurement
    vprint(f"data: {data_i}")

    results = decoder.decode(syndromes[i, :])
    convergence = results.converged
    result = results.result
    error_prediction = np.array(result, dtype=np.uint8)
    vprint(f"error_prediction: {error_prediction}")

    predicted_observable_flip = Lz_observables_flips_matrix @ error_prediction % 2
    vprint(f"predicted_observable_flip: {predicted_observable_flip}")

    measured_observable = logical_single_round @ data_i % 2
    vprint(f"measured_observable: {measured_observable}")

    if measured_observable != expected_value:
        nLogicalErrorsWithoutDecoding += 1

    predicted_observable = predicted_observable_flip ^ measured_observable
    vprint(f"predicted_observable: {predicted_observable}")

    if predicted_observable != expected_value:
        nLogicalErrorsWDecoding += 1

    nCorrections += int(predicted_observable_flip[0])

# === Summary statistics ===
print(
    f"{nLogicalErrorsWithoutDecoding} logical errors without decoding in {nShots} shots\n"
)
print(
    f"{nLogicalErrorsWDecoding} logical errors with decoding in {nShots} shots\n"
)
print(f"{nCorrections} corrections applied in {nShots} shots\n")

This example illustrates how to:

  • Retrieve and configure an error correction code Load a repetition code using qec.get_code(...) from the CUDA-Q QEC library, and define a custom circuit-level noise model using .add_all_qubit_channel(...).

  • Generate a multi-round parity check matrix Extend a single-round detector error model (DEM) across multiple rounds using qec.dem_from_memory_circuit(...). This captures syndrome evolution over time, including measurement noise, and provides:

    • detector_error_matrix – the multi-round parity check matrix

    • observables_flips_matrix – used to identify logical flips due to physical errors

  • Simulate circuit-level noise and collect data Run multiple shots of the memory experiment using qec.sample_memory_circuit(...) to sample both the data and syndrome measurements from noisy executions. The resulting bitstrings can be used for decoding and performance evaluation of the error correction scheme.

Creating New QEC codes

Below, we demonstrate how to use CUDA-Q QEC to define a new QEC code entirely in Python. This powerful feature allows for rapid prototyping and testing of custom error correction schemes.

import cudaq
import cudaq_qec as qec
from cudaq_qec import patch
import numpy as np

# Use Stim for fast stabilizer simulation
cudaq.set_target("stim")

# Repetition code parameters
distance = 3  # Number of physical qubits (also sets number of data qubits)
nRounds = 6  # Number of stabilizer measurement rounds
nShots = 10000  # Number of noisy circuit executions to simulate

# Define logical operations as CUDA-Q kernels


@cudaq.kernel
def x_logical(logicalQubit: patch):
    # Apply a logical X: bit-flip all data qubits
    for i in range(len(logicalQubit.data)):
        x(logicalQubit.data[i])


@cudaq.kernel
def prep0(logicalQubit: patch):
    # Initialize all qubits in |0⟩
    reset(logicalQubit.data)
    reset(logicalQubit.ancz)


@cudaq.kernel
def prep1(logicalQubit: patch):
    # Prepare logical |1⟩: apply X to logical |0⟩
    prep0(logicalQubit)
    x_logical(logicalQubit)


@cudaq.kernel
def stabilizer_round(logicalQubit: patch) -> list[bool]:
    # Run one round of stabilizer measurements for the Z-type repetition code

    num_ancilla = len(logicalQubit.ancz)
    num_data = len(logicalQubit.data)

    # Apply depolarizing noise to each data qubit
    for i in range(num_data):
        cudaq.apply_noise(cudaq.DepolarizationChannel, 0.1,
                          logicalQubit.data[i])
        # It is possible to have even more control over the noise.
        # cudaq.apply_noise(cudaq.Pauli1, 0.1, 0.1, 0.1, logicalQubit.data[i]) # in order pX, pY, and pZ errors.

    # Measure each ZZ stabilizer using CNOTs from data to ancilla
    for i in range(num_ancilla):
        x.ctrl(logicalQubit.data[i], logicalQubit.ancz[i])
        x.ctrl(logicalQubit.data[i + 1], logicalQubit.ancz[i])

    # Measure ancilla qubits to extract the syndrome
    measurements = mz(logicalQubit.ancz)

    # Reset ancillas for the next round
    reset(logicalQubit.ancz)

    return measurements


# Define the custom repetition code using the @qec.code decorator
@qec.code('custom_repetition_code')
class MyRepetitionCode:

    def __init__(self, **kwargs):
        qec.Code.__init__(self)  # Without this it won't work
        self.distance = kwargs.get("distance", 3)

        # Create ZZ stabilizer generators
        stabilizers_str = self.__make_stabilizers_strings()
        self.stabilizers = [
            cudaq.SpinOperator.from_word(s) for s in stabilizers_str
        ]

        # Define logical Z observable
        obs_str = self.__make_pauli_observables()
        self.pauli_observables = [
            cudaq.SpinOperator.from_word(p) for p in obs_str
        ]

        # Register logical operations used by the simulator
        self.operation_encodings = {
            qec.operation.prep0: prep0,
            qec.operation.prep1: prep1,
            qec.operation.x: x_logical,
            qec.operation.stabilizer_round: stabilizer_round
        }

    def __make_stabilizers_strings(self):
        # Create ZZ stabilizer strings: e.g., "ZZI", "IZZ", etc.
        d = self.distance
        return ['I' * i + 'ZZ' + 'I' * (d - i - 2) for i in range(d - 1)]

    def __make_pauli_observables(self):
        # Logical Z is Z on all data qubits
        d = self.distance
        return ["Z" * d]

    # --- Required methods for the QEC backend ---

    def get_num_data_qubits(self):
        return self.distance

    def get_num_ancilla_x_qubits(self):
        return 0  # Not needed for Z-type repetition code

    def get_num_ancilla_z_qubits(self):
        return self.distance - 1

    def get_num_ancilla_qubits(self):
        return self.get_num_ancilla_z_qubits() + self.get_num_ancilla_x_qubits()

    def get_num_x_stabilizers(self):
        return 0

    def get_num_z_stabilizers(self):
        return self.distance - 1


# Instantiate the custom repetition code
my_repetition_code = qec.get_code("custom_repetition_code", distance=distance)
print(f"\n Created custom repetition code with distance {distance}.")

all_codes = qec.get_available_codes()
print("\n  Available QEC codes both in the library and in Python:", all_codes)

# Let's check some propreties to verify that code is correctly created
# Display the code's stabilizer generators
stabilizers = my_repetition_code.get_stabilizers()
print(f"\n The code has {len(stabilizers)} stabilizers:")
for s in stabilizers:
    print(" ", s)

logical_single_round = my_repetition_code.get_observables_z()

# Define and register a noise model
noise_model = cudaq.NoiseModel()
p = 0.01  # depolarizing noise strength

noise_model.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)

# Set initial logical state to |0⟩
statePrep = qec.operation.prep0

# Generate a full detector error model (DEM)
print("\n Generating detector error model...")
dem_rep = qec.dem_from_memory_circuit(my_repetition_code, statePrep, nRounds,
                                      noise_model)

# Extract H matrix (syndrome parity checks)
H_pcm = dem_rep.detector_error_matrix

# Extract observable flips matrix (maps physical errors to logical flips)
Lz_observables_flips_matrix = dem_rep.observables_flips_matrix

# Sample noisy executions of the full memory circuit
print("\n Sampling noisy memory circuit executions...")
syndromes, data = qec.sample_memory_circuit(my_repetition_code, statePrep,
                                            nShots, nRounds, noise_model)

# Reshape syndromes to flatten rounds per shot
syndromes = syndromes.reshape((nShots, -1))

print(f"\n Showing first {min(nShots, 5)} of {nShots} sampled results:")
for i in range(min(nShots, 5)):
    print(
        f"Shot {i+1:>2}: Logical measurement = {data[i]}, Syndromes = {syndromes[i]}"
    )

This example illustrates several key concepts for defining custom codes:

  • Define a Code Class: A new code is defined by creating a Python class decorated with @qec.code(...), which registers it with the CUDA-Q QEC runtime. The class must inherit from qec.Code.

  • Implement Required Methods: The class must implement methods that describe the code’s structure, such as get_num_data_qubits() and get_num_ancilla_qubits().

  • Define Logical Operations as Kernels: Quantum operations like state preparation (prep0, prep1), logical gates (x_logical), and stabilizer measurements (stabilizer_round) are implemented as standard CUDA-Q kernels.

  • Map Operations to Kernels: The operation_encodings dictionary links abstract QEC operations (e.g., qec.operation.prep0) to the concrete CUDA-Q kernels that implement them.

  • Provide Stabilizers and Observables: The code’s stabilizer generators and logical observables must be defined. This is typically done by creating lists of cudaq.SpinOperator objects representing the Pauli strings for the stabilizers (e.g., “ZZI”) and logical operators (e.g., “ZZZ”).

  • Specify Fine-Grained Noise: This example demonstrates applying noise at a specific point within a kernel. Inside stabilizer_round, cudaq.apply_noise is called on each data qubit, offering precise control over the noise model, in contrast to applying noise globally to all gates of a certain type.

Once defined, the custom code can be instantiated with qec.get_code() and used with all standard CUDA-Q QEC tools, including qec.dem_from_memory_circuit() and qec.sample_memory_circuit().

Getting Started with the NVIDIA QLDPC Decoder

Starting with CUDA-Q QEC v0.2, a GPU-accelerated decoder is included with the CUDA-Q QEC library. The library follows the CUDA-Q decoder Python and C++ interfaces (namely cudaq_qec.Decoder for Python and cudaq::qec::decoder for C++), but as documented in the API sections (NVIDIA QLDPC Decoder for Python and NVIDIA QLDPC Decoder for C++), there are many configuration options that can be passed to the constructor. The following example shows how to exercise the decoder using non-trivial pre-generated test data. The test data was generated using scripts originating from the GitHub repo for BivariateBicycleCodes [1]; it includes parity check matrices (PCMs) and test syndromes to exercise a decoder.


import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time

# For fetching data
import requests
import bz2
import os

# Note: running this script will automatically download data if necessary.

### Helper functions ###


def parse_csr_mat(j, dims, mat_name):
    """
    Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
    """
    assert len(dims) == 2, "dims must be a tuple of two integers"

    # Extract indptr and indices from the JSON.
    indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
    indices = np.array(j[f"{mat_name}_indices"], dtype=int)

    # Check that the CSR structure is consistent.
    assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
    assert np.all(
        indices < dims[1]), "All column indices must be less than dims[1]"

    # Create a data array of ones.
    data = np.ones(indptr[-1], dtype=np.uint8)

    # Build the CSR matrix and return it as a dense numpy array.
    csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
    return csr.toarray()


def parse_H_csr(j, dims):
    """
    Parse a CSR-style parity check matrix from an input file in JSON format"
    """
    return parse_csr_mat(j, dims, "H")


def parse_obs_csr(j, dims):
    """
    Parse a CSR-style observable matrix from an input file in JSON format"
    """
    return parse_csr_mat(j, dims, "obs_mat")


### Main decoder loop ###


def run_decoder(filename, num_shots, run_as_batched):
    """
    Load a JSON file and decode "num_shots" syndromes.
    """
    t_load_begin = time.time()
    with open(filename, "r") as f:
        j = json.load(f)

    dims = j["shape"]
    assert len(dims) == 2

    # Read the Parity Check Matrix
    H = parse_H_csr(j, dims)
    syndrome_length, block_length = dims
    t_load_end = time.time()

    print(f"{filename} parsed in {1e3 * (t_load_end-t_load_begin)} ms")

    error_rate_vec = np.array(j["error_rate_vec"])
    assert len(error_rate_vec) == block_length
    obs_mat_dims = j["obs_mat_shape"]
    obs_mat = parse_obs_csr(j, obs_mat_dims)
    assert dims[1] == obs_mat_dims[0]
    file_num_trials = j["num_trials"]
    num_shots = min(num_shots, file_num_trials)
    print(
        f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')

    # osd_method: 0=Off, 1=OSD-0, 2=Exhaustive, 3=Combination Sweep
    osd_method = 1

    # When osd_method is:
    #  2) there are 2^osd_order additional error mechanisms checked.
    #  3) there are an additional k + osd_order*(osd_order-1)/2 error
    #     mechanisms checked.
    # Ref: https://arxiv.org/pdf/2005.07016
    osd_order = 0

    # Maximum number of BP iterations before attempting OSD (if necessary)
    max_iter = 50

    nv_dec_args = {
        "max_iterations": max_iter,
        "error_rate_vec": error_rate_vec,
        "use_sparsity": True,
        "use_osd": osd_method > 0,
        "osd_order": osd_order,
        "osd_method": osd_method
    }

    if run_as_batched:
        # Perform BP processing for up to 1000 syndromes per batch. If there
        # are more than 1000 syndromes, the decoder will chunk them up and
        # process each batch sequentially under the hood.
        nv_dec_args['bp_batch_size'] = min(1000, num_shots)

    try:
        nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
                                             **nv_dec_args)
    except Exception as e:
        print(
            'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
            'QEC installation.')
        exit(0)
    decoding_time = 0
    bp_converged_flags = []
    num_logical_errors = 0

    # Batched API
    if run_as_batched:
        syndrome_list = []
        obs_truth_list = []
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]
            syndrome_list.append(syndrome)
            obs_truth_list.append(obs_truth)
        t0 = time.time()
        results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
        t1 = time.time()
        decoding_time += t1 - t0
        for r, obs_truth in zip(results, obs_truth_list):
            bp_converged_flags.append(r.converged)
            dec_result = np.array(r.result, dtype=np.uint8)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Non-batched API
    else:
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]

            t0 = time.time()
            results = nv_dec_gpu_and_cpu.decode(syndrome)
            bp_converged = results.converged
            dec_result = results.result
            t1 = time.time()
            trial_diff = t1 - t0
            decoding_time += trial_diff

            dec_result = np.array(dec_result, dtype=np.uint8)
            bp_converged_flags.append(bp_converged)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Count how many shots the decoder failed to correct the errors
    print(f"{num_logical_errors} logical errors in {num_shots} shots")
    print(
        f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
    )
    print(
        f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
    )


if __name__ == "__main__":
    # See other test data options in https://github.com/NVIDIA/cudaqx/releases/tag/0.2.0
    filename = 'osd_1008_8785_0.001.json'
    bz2filename = filename + '.bz2'
    if not os.path.exists(filename):
        url = f"https://github.com/NVIDIA/cudaqx/releases/download/0.2.0/{bz2filename}"

        print(f'Downloading data from {url}')

        # Download the file
        response = requests.get(url, stream=True)
        response.raise_for_status()  # Raise an error if download fails
        with open(bz2filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f'Decompressing {bz2filename} into {filename}')

        # Decompress the file
        with bz2.BZ2File(bz2filename, "rb") as f_in, open(filename,
                                                          "wb") as f_out:
            f_out.write(f_in.read())

        print(f"Decompressed file saved as {filename}")

    num_shots = 100
    run_as_batched = True
    run_decoder(filename, num_shots, run_as_batched)

Footnotes

Exact Maximum Likelihood Decoding with NVIDIA Tensor Network Decoder

Starting with CUDA-Q QEC v0.4.0, a GPU-accelerated Maximum Likelihood Decoder is included with the CUDA-Q QEC library. The library follows the CUDA-Q decoder Python interface, namely cudaq_qec.Decoder. At this time, we only support the Python interface for the decoder, which is available at cudaq_qec.plugins.decoders.tensor_network_decoder.TensorNetworkDecoder. As documented in the API sections Tensor Network Decoder, there are many configuration options that can be passed to the constructor. The decoder requires Python 3.11 or higher.

In the following example, we show how to use the TensorNetworkDecoder class from the cudaq_qec library to decode a circuit-level noise problem derived from a Stim surface code circuit.

"""
Example usage of tensor_network_decoder from cudaq_qec.

This script demonstrates how to instantiate and use the tensor network decoder
to decode a circuit level noise problem derived from a Stim surface code experiment.

This example requires the `cudaq-qec` package and the optional tensor-network dependencies.
To install the required dependencies, run:

pip install cudaq-qec[tensor_network_decoder]

Additionaly, you will need `stim` and `beliefmatching` packages:
pip install stim beliefmatching

"""
import cudaq_qec as qec
import numpy as np

import platform
if platform.machine().lower() in ("arm64", "aarch64"):
    print(
        "Warning: stim is not supported on manylinux ARM64/aarch64. Skipping this example..."
    )
    sys.exit(0)

import stim

from beliefmatching.belief_matching import detector_error_model_to_check_matrices


def parse_detector_error_model(detector_error_model):
    matrices = detector_error_model_to_check_matrices(detector_error_model)

    out_H = np.zeros(matrices.check_matrix.shape)
    matrices.check_matrix.astype(np.float64).toarray(out=out_H)
    out_L = np.zeros(matrices.observables_matrix.shape)
    matrices.observables_matrix.astype(np.float64).toarray(out=out_L)

    return out_H, out_L, [float(p) for p in matrices.priors]


circuit = stim.Circuit.generated("surface_code:rotated_memory_z",
                                 rounds=3,
                                 distance=3,
                                 after_clifford_depolarization=0.001,
                                 after_reset_flip_probability=0.01,
                                 before_measure_flip_probability=0.01,
                                 before_round_data_depolarization=0.01)

detector_error_model = circuit.detector_error_model(decompose_errors=True)

H, logicals, noise_model = parse_detector_error_model(detector_error_model)

decoder = qec.get_decoder(
    "tensor_network_decoder",
    H,
    logical_obs=logicals,
    noise_model=noise_model,
    contract_noise_model=True,
)

num_shots = 5
sampler = circuit.compile_detector_sampler()
detection_events, observable_flips = sampler.sample(num_shots,
                                                    separate_observables=True)

res = decoder.decode_batch(detection_events)

print("Tensor network prediction: ", [r.result[0] > 0.5 for r in res])
print("Actual observable flips: ", [bool(o[0]) for o in observable_flips])

Output:

The decoder returns the probability that the logical observable has flipped for each syndrome. This can be used to assess the performance of the code and the decoder under different error scenarios.

See Also:

  • cudaq_qec.plugins.decoders.tensor_network_decoder