Decoders

In quantum error correction, decoders are responsible for interpreting measurement outcomes to identify and correct quantum errors. We measure a set of stabilizers whose outcomes carry information about which errors may have occurred. The pattern of these outcomes is called a syndrome, and the decoder's task is to determine which errors most likely caused that syndrome.

The relationship between errors and syndromes is captured mathematically by the parity check matrix. Each row of this matrix represents a stabilizer measurement, while each column represents a possible error mechanism. Multiplying an error pattern by this matrix (mod 2) gives the syndrome that would result from those errors.
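
For instance, for the distance-3 repetition code used later in this section, this relationship can be illustrated with plain NumPy (a minimal sketch, independent of CUDA-Q):

import numpy as np

# Parity check matrix of the 3-qubit repetition code:
# row 0 checks data qubits (0, 1), row 1 checks data qubits (1, 2)
H = np.array([[1, 1, 0],
              [0, 1, 1]], dtype=np.uint8)

# A bit-flip error on the middle data qubit
error = np.array([0, 1, 0], dtype=np.uint8)

# The syndrome is the parity check matrix applied to the error pattern (mod 2)
syndrome = (H @ error) % 2
print(syndrome)  # [1 1] -> both stabilizers detect the flip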

Creating a Multi-Round Parity Check Matrix

Below, we show how to use CUDA-Q QEC in Python to construct a multi-round parity check matrix and a custom error correction code for a circuit-level noise model experiment:

import cudaq
import cudaq_qec as qec
import numpy as np

nRounds = 3
nShots = 500
# Physical error rate
p_per_round = 0.01
p_per_mz = 0.01


# Construct the measurement error syndrome matrix based on the distance and number of rounds
def construct_measurement_error_syndrome(distance, nRounds):
    num_stabilizers = distance - 1
    num_mea_q = num_stabilizers * nRounds

    syndrome_rows = []

    # In this scheme, a measurement error on an ancilla flips the syndrome in two consecutive rounds
    for i in range(nRounds - 1):
        for j in range(num_stabilizers):
            syndrome = np.zeros((num_mea_q,), dtype=np.uint8)

            # The error on ancilla (j) in round (i) affects stabilizer checks at two positions:
            # First occurrence in round i
            pos1 = i * num_stabilizers + j
            # Second occurrence in round i+1
            pos2 = (i + 1) * num_stabilizers + j

            # Mark the syndrome
            syndrome[pos1] = 1
            syndrome[pos2] = 1

            syndrome_rows.append(syndrome)

    return np.array(syndrome_rows).T


# Generate the parity check matrix for n-rounds by duplicating the input parity check matrix Hz
# and appending the measurement error syndrome matrix.
def get_circuit_level_pcm(distance, nRounds, Hz):
    if nRounds < 2:
        raise ValueError("nRounds must be greater than or equal to 2")
    if distance < 3:
        raise ValueError("distance must be greater than or equal to 3")

    # Parity check matrix for a single round
    H = np.array(Hz)

    # Extend H to nRounds by placing a copy of H on the block diagonal for each round
    rows, cols = H.shape
    H_nrounds = np.zeros((rows * nRounds, cols * nRounds), dtype=np.uint8)
    for i in range(nRounds):
        H_nrounds[i * rows:(i + 1) * rows, i * cols:(i + 1) * cols] = H
    print("H_nrounds\n", H_nrounds)

    # Construct the measurement error syndrome matrix for Z errors
    H_mz = construct_measurement_error_syndrome(distance, nRounds)
    print("H_mz\n", H_mz)
    assert H_nrounds.shape[0] == H_mz.shape[0], \
        "Dimensions of H_nrounds and H_mz do not match"

    # Append columns for measurement errors to H
    H_pcm = np.concatenate((H_nrounds, H_mz), axis=1)
    print(f"H_pcm:\n{H_pcm}")

    return H_pcm


# Example of how to construct a repetition code with a distance of 3 and random
# bit flip errors applied to the data qubits
@cudaq.kernel
def three_qubit_repetition_code():
    data_qubits = cudaq.qvector(3)
    ancilla_qubits = cudaq.qvector(2)

    # Initialize the logical |1> state as |111>
    x(data_qubits)

    for i in range(nRounds):
        # Random Bit Flip Errors
        for j in range(3):
            cudaq.apply_noise(cudaq.XError, p_per_round, data_qubits[j])

        # Extract Syndromes
        h(ancilla_qubits)

        # First Parity Check
        z.ctrl(ancilla_qubits[0], data_qubits[0])
        z.ctrl(ancilla_qubits[0], data_qubits[1])

        # Second Parity Check
        z.ctrl(ancilla_qubits[1], data_qubits[1])
        z.ctrl(ancilla_qubits[1], data_qubits[2])

        h(ancilla_qubits)

        # Measure the ancilla qubits
        s0 = mz(ancilla_qubits[0])
        s1 = mz(ancilla_qubits[1])
        reset(ancilla_qubits[0])
        reset(ancilla_qubits[1])

    # Final measurement to get the data qubits
    mz(data_qubits)


# Create a noise model
noise_model = cudaq.NoiseModel()
# Add measurement noise
noise_model.add_all_qubit_channel("mz", cudaq.BitFlipChannel(p_per_mz))

# Run the kernel and observe results
# The percent of samples that are 000 corresponds to the logical error rate
cudaq.set_target("stim")
result = cudaq.sample(three_qubit_repetition_code,
                      shots_count=nShots,
                      noise_model=noise_model,
                      explicit_measurements=True)

# The following section will demonstrate how to decode the results
# Get the parity check matrix for n-rounds of the repetition code
Hz = [[1, 1, 0], [0, 1, 1]]  # Parity check matrix for 1 round
H_pcm = get_circuit_level_pcm(3, nRounds, Hz)

# Get observables
observables = np.array([1, 0, 0, 0, 0, 0], dtype=np.uint8)
Lz = np.array([1, 0, 0], dtype=np.uint8)
print(f"observables:\n{observables}")
print(f"Lz:\n{Lz}")
# Pad the observables to be the same dimension as the decoded observable
Lz_nrounds = np.tile(Lz, nRounds)
pad_size = max(0, H_pcm.shape[1] - Lz_nrounds.shape[0])
Lz_nround_mz = np.pad(Lz_nrounds, (0, pad_size), mode='constant')
print(f"Lz_nround_mz\n{Lz_nround_mz}")

# Get a decoder
decoder = qec.get_decoder("single_error_lut", H_pcm)
nLogicalErrors = 0

# initialize a Pauli frame to track logical flips
# through the stabilizer rounds. Only need the Z component for the repetition code.
pauli_frame = np.array([0, 0], dtype=np.uint8)
expected_value = 1
for shot, outcome in enumerate(result.get_sequential_data()):
    outcome_array = np.array([int(bit) for bit in outcome], dtype=np.uint8)
    syndrome = outcome_array[:len(outcome_array) - 3]
    data = outcome_array[len(outcome_array) - 3:]
    print("\nshot:", shot)
    print("syndrome:", syndrome)

    # Decode the syndrome
    decoded = decoder.decode(syndrome)
    convergence = decoded.converged
    data_prediction = np.array(decoded.result, dtype=np.uint8)

    # See if the decoded result anti-commutes with the observables
    print("decode result:", data_prediction)
    decoded_observables = (Lz_nround_mz @ data_prediction) % 2
    print("decoded_observables:", decoded_observables)

    # update pauli frame
    pauli_frame[0] = (pauli_frame[0] + decoded_observables) % 2
    print("pauli frame:", pauli_frame)

    logical_measurements = (Lz @ data.transpose()) % 2
    print("LMz:", logical_measurements)

    corrected_mz = (logical_measurements + pauli_frame[0]) % 2
    print("Expected value:", expected_value)
    print("Corrected value:", corrected_mz)
    if (corrected_mz != expected_value):
        nLogicalErrors += 1

# Count how many shots the decoder failed to correct the errors
print("\nNumber of logical errors:", nLogicalErrors)

This example illustrates how to:

1. Construct a multi-round parity check matrix – Users can extend a single-round parity check matrix across multiple rounds, incorporating measurement errors to track syndrome evolution over time. This enables more accurate circuit-level noise modeling for decoders.

2. Define custom error correction circuits with precise noise injection – Using cudaq.apply_noise, users can introduce specific error channels at targeted locations within the QEC circuit. This fine-grained control allows for precise testing of how different noise sources affect logical error rates.

In the previous example, we demonstrated how to introduce random X errors into each data qubit using cudaq.apply_noise during each round of syndrome extraction. CUDA-Q allows users to inject a variety of error channels at different locations within their circuits, enabling fine-grained noise modeling. The example below showcases additional ways to introduce errors into a quantum kernel:

@cudaq.kernel
def inject_noise_example():
    q = cudaq.qvector(3)

    # Apply depolarization noise to the first qubit
    cudaq.apply_noise(cudaq.DepolarizationChannel, 0.1, q[0])

    # Perform gate operations
    h(q[1])
    x.ctrl(q[1], q[2])

    # Inject a Y error into the second qubit
    cudaq.apply_noise(cudaq.YError, 0.1, q[1])

    # Apply a general Pauli noise channel to the third qubit, where the 3 values indicate the probability of X, Y, and Z errors.
    cudaq.apply_noise(cudaq.Pauli1, 0.1, 0.1, 0.1, q[2])

# Define and apply a noise model
noise = cudaq.NoiseModel()
counts = cudaq.sample(inject_noise_example, noise_model=noise)

For a full list of supported noise models and their parameters, refer to the CUDA-Q documentation.

Detector Error Model

In the previous example, we showed how to create a multi-round parity check matrix manually. Here we introduce the cudaq.qec.detector_error_model type, which allows us to create a detector error model (DEM) from a QEC circuit and noise model.

The DEM can be generated from a QEC circuit and noise model using functions like dem_from_memory_circuit(). For circuit-level noise, the DEM can be put into a canonical form that’s organized by measurement rounds, making it suitable for multi-round decoding.

For a complete example of using the surface code with DEM to generate parity check matrices and perform decoding, see the circuit level noise example.
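
A rough outline of that workflow is sketched below. It assumes a recent CUDA-Q QEC release; the helper names (qec.get_code, qec.operation.prep0) and the DEM member names (detector_error_matrix, observables_flips_matrix) are assumptions based on that circuit-level noise example and may differ in your installed version, so treat this as a sketch rather than a definitive recipe:

import cudaq
import cudaq_qec as qec

cudaq.set_target("stim")

# A built-in code and a simple noise model (assumed API; see the circuit-level noise example)
code = qec.get_code("surface_code", distance=3)
noise = cudaq.NoiseModel()
noise.add_all_qubit_channel("x", cudaq.DepolarizationChannel(0.001))

nRounds = 3
statePrep = qec.operation.prep0

# Build the detector error model from the memory-experiment circuit and noise model
dem = qec.dem_from_memory_circuit(code, statePrep, nRounds, noise)

# The DEM carries a circuit-level parity check matrix and an observables matrix
# that can be handed directly to a decoder (member names assumed).
H_pcm = dem.detector_error_matrix
observables = dem.observables_flips_matrix
decoder = qec.get_decoder("single_error_lut", H_pcm)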

Getting Started with the NVIDIA QLDPC Decoder

Starting with CUDA-Q QEC v0.2, a GPU-accelerated QLDPC decoder is included with the CUDA-Q QEC library. The decoder follows the CUDA-Q decoder Python and C++ interfaces (namely cudaq_qec.Decoder for Python and cudaq::qec::decoder for C++), but as documented in the API sections (NVIDIA QLDPC Decoder for Python and NVIDIA QLDPC Decoder for C++), there are many configuration options that can be passed to the constructor. The following example shows how to exercise the decoder using non-trivial pre-generated test data. The test data was generated using scripts originating from the GitHub repo for BivariateBicycleCodes [1]; it includes parity check matrices (PCMs) and matching test syndromes.
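
Before working through that example, here is a minimal, hypothetical sketch of constructing the decoder on a toy parity check matrix, using a subset of the constructor options that appear in the larger example below (it assumes the nv-qldpc-decoder plugin is available in your installation):

import numpy as np
import cudaq_qec as qec

# Toy parity check matrix: the distance-3 repetition code
H = np.array([[1, 1, 0],
              [0, 1, 1]], dtype=np.uint8)

decoder = qec.get_decoder("nv-qldpc-decoder", H,
                          max_iterations=50,                # BP iteration cap
                          error_rate_vec=np.full(3, 0.01),  # per-mechanism priors
                          use_osd=True,                     # post-process with OSD if BP fails
                          osd_method=1,                     # OSD-0
                          osd_order=0)

# Syndrome produced by a bit flip on the middle data qubit
result = decoder.decode([1, 1])
print(result.converged, np.array(result.result, dtype=np.uint8))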


import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time

# For fetching data
import requests
import bz2
import os

# Note: running this script will automatically download data if necessary.

### Helper functions ###


def parse_csr_mat(j, dims, mat_name):
    """
    Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
    """
    assert len(dims) == 2, "dims must be a tuple of two integers"

    # Extract indptr and indices from the JSON.
    indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
    indices = np.array(j[f"{mat_name}_indices"], dtype=int)

    # Check that the CSR structure is consistent.
    assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
    assert np.all(
        indices < dims[1]), "All column indices must be less than dims[1]"

    # Create a data array of ones.
    data = np.ones(indptr[-1], dtype=np.uint8)

    # Build the CSR matrix and return it as a dense numpy array.
    csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
    return csr.toarray()


def parse_H_csr(j, dims):
    """
    Parse a CSR-style parity check matrix from an input file in JSON format.
    """
    return parse_csr_mat(j, dims, "H")


def parse_obs_csr(j, dims):
    """
    Parse a CSR-style observable matrix from an input file in JSON format.
    """
    return parse_csr_mat(j, dims, "obs_mat")


### Main decoder loop ###


def run_decoder(filename, num_shots, run_as_batched):
    """
    Load a JSON file and decode "num_shots" syndromes.
    """
    t_load_begin = time.time()
    with open(filename, "r") as f:
        j = json.load(f)

    dims = j["shape"]
    assert len(dims) == 2

    # Read the Parity Check Matrix
    H = parse_H_csr(j, dims)
    syndrome_length, block_length = dims
    t_load_end = time.time()

    print(f"{filename} parsed in {1e3 * (t_load_end-t_load_begin)} ms")

    error_rate_vec = np.array(j["error_rate_vec"])
    assert len(error_rate_vec) == block_length
    obs_mat_dims = j["obs_mat_shape"]
    obs_mat = parse_obs_csr(j, obs_mat_dims)
    assert dims[1] == obs_mat_dims[0]
    file_num_trials = j["num_trials"]
    num_shots = min(num_shots, file_num_trials)
    print(
        f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')

    # osd_method: 0=Off, 1=OSD-0, 2=Exhaustive, 3=Combination Sweep
    osd_method = 1

    # When osd_method is:
    #  2) there are 2^osd_order additional error mechanisms checked.
    #  3) there are an additional k + osd_order*(osd_order-1)/2 error
    #     mechanisms checked.
    # Ref: https://arxiv.org/pdf/2005.07016
    osd_order = 0

    # Maximum number of BP iterations before attempting OSD (if necessary)
    max_iter = 50

    nv_dec_args = {
        "max_iterations": max_iter,
        "error_rate_vec": error_rate_vec,
        "use_sparsity": True,
        "use_osd": osd_method > 0,
        "osd_order": osd_order,
        "osd_method": osd_method
    }

    if run_as_batched:
        # Perform BP processing for up to 1000 syndromes per batch. If there
        # are more than 1000 syndromes, the decoder will chunk them up and
        # process each batch sequentially under the hood.
        nv_dec_args['bp_batch_size'] = min(1000, num_shots)

    try:
        nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
                                             **nv_dec_args)
    except Exception as e:
        print(
            'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
            'QEC installation.')
        exit(0)
    decoding_time = 0
    bp_converged_flags = []
    num_logical_errors = 0

    # Batched API
    if run_as_batched:
        syndrome_list = []
        obs_truth_list = []
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]
            syndrome_list.append(syndrome)
            obs_truth_list.append(obs_truth)
        t0 = time.time()
        results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
        t1 = time.time()
        decoding_time += t1 - t0
        for r, obs_truth in zip(results, obs_truth_list):
            bp_converged_flags.append(r.converged)
            dec_result = np.array(r.result, dtype=np.uint8)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Non-batched API
    else:
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]

            t0 = time.time()
            results = nv_dec_gpu_and_cpu.decode(syndrome)
            bp_converged = results.converged
            dec_result = results.result
            t1 = time.time()
            trial_diff = t1 - t0
            decoding_time += trial_diff

            dec_result = np.array(dec_result, dtype=np.uint8)
            bp_converged_flags.append(bp_converged)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Count how many shots the decoder failed to correct the errors
    print(f"{num_logical_errors} logical errors in {num_shots} shots")
    print(
        f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
    )
    print(
        f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
    )


if __name__ == "__main__":
    # See other test data options in https://github.com/NVIDIA/cudaqx/releases/tag/0.2.0
    filename = 'osd_1008_8785_0.001.json'
    bz2filename = filename + '.bz2'
    if not os.path.exists(filename):
        url = f"https://github.com/NVIDIA/cudaqx/releases/download/0.2.0/{bz2filename}"

        print(f'Downloading data from {url}')

        # Download the file
        response = requests.get(url, stream=True)
        response.raise_for_status()  # Raise an error if download fails
        with open(bz2filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f'Decompressing {bz2filename} into {filename}')

        # Decompress the file
        with bz2.BZ2File(bz2filename, "rb") as f_in, open(filename,
                                                          "wb") as f_out:
            f_out.write(f_in.read())

        print(f"Decompressed file saved as {filename}")

    num_shots = 100
    run_as_batched = True
    run_decoder(filename, num_shots, run_as_batched)

Footnotes

Exact Maximum Likelihood Decoding with the NVIDIA Tensor Network Decoder

Starting with CUDA-Q QEC v0.4.0, a GPU-accelerated maximum likelihood decoder is included with the CUDA-Q QEC library. It follows the CUDA-Q decoder Python interface, namely cudaq_qec.Decoder. At this time, only the Python interface is supported; the decoder is available at cudaq_qec.plugins.decoders.tensor_network_decoder.TensorNetworkDecoder. As documented in the Tensor Network Decoder API section, there are many configuration options that can be passed to the constructor.

In the following example, we show how to use the TensorNetworkDecoder class from the cudaq_qec library to decode a circuit-level noise problem derived from a Stim surface code circuit.

"""
Example usage of tensor_network_decoder from cudaq_qec.

This script demonstrates how to instantiate and use the tensor network decoder
to decode a circuit-level noise problem derived from a Stim surface code experiment.

This example requires the `cudaq-qec` package and the optional tensor-network dependencies.
To install the required dependencies, run:

pip install cudaq-qec[tensor_network_decoder]

Additionally, you will need the `stim` and `beliefmatching` packages:
pip install stim beliefmatching

"""
import cudaq_qec as qec
import numpy as np

import stim

from beliefmatching.belief_matching import detector_error_model_to_check_matrices


def parse_detector_error_model(detector_error_model):
    matrices = detector_error_model_to_check_matrices(detector_error_model)

    out_H = np.zeros(matrices.check_matrix.shape)
    matrices.check_matrix.astype(np.float64).toarray(out=out_H)
    out_L = np.zeros(matrices.observables_matrix.shape)
    matrices.observables_matrix.astype(np.float64).toarray(out=out_L)

    return out_H, out_L, [float(p) for p in matrices.priors]


circuit = stim.Circuit.generated("surface_code:rotated_memory_z",
                                 rounds=3,
                                 distance=3,
                                 after_clifford_depolarization=0.001,
                                 after_reset_flip_probability=0.01,
                                 before_measure_flip_probability=0.01,
                                 before_round_data_depolarization=0.01)

detector_error_model = circuit.detector_error_model(decompose_errors=True)

H, logicals, noise_model = parse_detector_error_model(detector_error_model)

decoder = qec.get_decoder(
    "tensor_network_decoder",
    H,
    logical_obs=logicals,
    noise_model=noise_model,
    contract_noise_model=True,
)

num_shots = 5
sampler = circuit.compile_detector_sampler()
detection_events, observable_flips = sampler.sample(num_shots,
                                                    separate_observables=True)

res = decoder.decode_batch(detection_events)

print("Tensor network prediction: ", [r.result[0] > 0.5 for r in res])
print("Actual observable flips: ", [bool(o[0]) for o in observable_flips])

Output:

The decoder returns the probability that the logical observable has flipped for each syndrome. This can be used to assess the performance of the code and the decoder under different error scenarios.
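
For example, the flip probabilities can be thresholded and compared against the sampled observable flips to estimate a logical error rate (a minimal sketch reusing res and observable_flips from the example above):

import numpy as np

# Threshold the flip probabilities at 0.5 to get hard logical predictions
predictions = np.array([r.result[0] > 0.5 for r in res])
actual = np.array([bool(o[0]) for o in observable_flips])

# Fraction of shots where the decoder's prediction disagrees with the truth data
logical_error_rate = np.mean(predictions != actual)
print(f"Logical error rate over {len(predictions)} shots: {logical_error_rate}")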

See Also:

  • cudaq_qec.plugins.decoders.tensor_network_decoder