Quantum Error Correction with Circuit-level Noise Modeling

This example builds upon the previous code-capacity noise model example. In the circuit-level noise modeling experiment, we have many of the same components from the CUDA-Q QEC library: QEC codes, decoders, and noisy data. The primary difference here, is that we can begin to run CUDA-Q kernels to generate noisy data, rather than just generating random bitstring to represent our errors.

Along with the stabilizers, parity check matrices, and logical observables, the QEC code type also has an encoding map. This map allows codes to define logical gates in terms of gates on the underlying physical qubits. These encodings operate on the qec.patch type, which represents three registers of physical qubits making up a logical qubit. A data qubit register, an X-stabilizer ancilla register, and a Z-stabilizer ancilla register.

The most notable encoding stored in the QEC map, is how the qec.operation.stabilizer_round, which encodes a cudaq.kernel which stores the gate-level information for how to do a stabilizer measurement. These stabilizer rounds are the gate-level way to encode the parity check matrix of a QEC code into quantum circuits.

This example walks through how to use the CUDA-Q QEC library to perform a quantum memory experiment simulation. These experiments model how well QEC cycles, or rounds of stabilizer measuments, can protect the information encoded in a logical qubit. If noise is turned off, then the information is protected indefinitely. Here, we will model depolarization noise after each CX gate, and track how many logical errors occur.

CUDA-Q QEC Implementation

Here’s how to use CUDA-Q QEC to perform a circuit-level noise model experiment in both Python and C++:

import numpy as np
import cudaq
import cudaq_qec as qec

# Get a QEC code
cudaq.set_target("stim")
steane = qec.get_code("steane")

# Get the parity check matrix of a code
# Can get the full code, or for CSS codes
# just the X or Z component
H = steane.get_parity()
print(f"H:\n{H}")
observables = steane.get_pauli_observables_matrix()
Lz = steane.get_observables_z()
print(f"observables:\n{observables}")
print(f"Lz:\n{Lz}")

nShots = 3
nRounds = 4

# error probabily
p = 0.01
noise = cudaq.NoiseModel()
noise.add_all_qubit_channel("x", cudaq.Depolarization2(p), 1)

# prepare logical |0> state, tells the sampler to do z-basis experiment
statePrep = qec.operation.prep0
# our expected measurement in this state is 0
expected_value = 0

# sample the steane memory circuit with noise on each cx gate
# reading out the syndromes after each stabilizer round (xor'd against the previous)
# and readout out the data qubits at the end of the experiment
syndromes, data = qec.sample_memory_circuit(steane, statePrep, nShots, nRounds,
                                            noise)
print("From sample function:\n")
print("syndromes:\n", syndromes)
print("data:\n", data)

# Get a decoder
decoder = qec.get_decoder("single_error_lut", H)
nLogicalErrors = 0

# Logical Mz each shot (use Lx if preparing in X-basis)
logical_measurements = (Lz @ data.transpose()) % 2
# only one logical qubit, so do not need the second axis
logical_measurements = logical_measurements.flatten()
print("LMz:\n", logical_measurements)

# organize data by shot and round if desired
syndromes = syndromes.reshape((nShots, nRounds, syndromes.shape[1]))

# initialize a Pauli frame to track logical flips
# through the stabilizer rounds
pauli_frame = np.array([0, 0], dtype=np.uint8)
for shot in range(0, nShots):
    print("shot:", shot)
    for syndrome in syndromes[shot]:
        print("syndrome:", syndrome)
        # decode the syndrome
        convergence, result = decoder.decode(syndrome)
        data_prediction = np.array(result, dtype=np.uint8)

        # see if the decoded result anti-commutes with the observables
        print("decode result:", data_prediction)
        decoded_observables = (observables @ data_prediction) % 2
        print("decoded_observables:", decoded_observables)

        # update pauli frame
        pauli_frame = (pauli_frame + decoded_observables) % 2
        print("pauli frame:", pauli_frame)

    # after pauli frame has tracked corrections through the rounds
    # apply the pauli frame correction to the measurement, and see
    # if this matches the state we intended to prepare
    # We prepared |0>, so we check if logical measurement Mz + Pf_X = 0
    corrected_mz = (logical_measurements[shot] + pauli_frame[0]) % 2
    print("Expected value:", expected_value)
    print("Corrected value:", corrected_mz)
    if (corrected_mz != expected_value):
        nLogicalErrors += 1

# Count how many shots the decoder failed to correct the errors
print("Number of logical errors:", nLogicalErrors)
// Compile and run with:
// nvq++ --enable-mlir --target=stim -lcudaq-qec circuit_level_noise.cpp
// ./a.out

#include "cudaq.h"
#include "cudaq/qec/decoder.h"
#include "cudaq/qec/experiments.h"
#include "cudaq/qec/noise_model.h"

int main() {
  // Choose a QEC code
  auto steane = cudaq::qec::get_code("steane");

  // Access the parity check matrix
  auto H = steane->get_parity();
  std::cout << "H:\n";
  H.dump();

  // Access the logical observables
  auto observables = steane->get_pauli_observables_matrix();
  auto Lz = steane->get_observables_z();

  // Data qubits the logical Z observable is supported on
  std::cout << "Lz:\n";
  Lz.dump();

  // Observables are stacked as Z over X for mat-vec multiplication
  std::cout << "Obs:\n";
  observables.dump();

  // How many shots to run the experiment
  int nShots = 3;
  // For each shot, how many rounds of stabilizer measurements
  int nRounds = 4;

  // can set seed for reproducibility
  // cudaq::set_random_seed(1337);
  cudaq::noise_model noise;

  // Add a depolarization noise channel after each cx gate
  noise.add_all_qubit_channel("x", cudaq::depolarization2(/*probability*/ 0.01),
                              /*numControls*/ 1);

  // Perform a noisy z-basis memory circuit experiment
  auto [syndromes, data] = cudaq::qec::sample_memory_circuit(
      *steane, cudaq::qec::operation::prep0, nShots, nRounds, noise);

  // With noise, many syndromes will flip each QEC cycle, these are the
  // syndrome differences from the previous cycle.
  std::cout << "syndromes:\n";
  syndromes.dump();

  // With noise, Lz will sometimes be flipped
  std::cout << "data:\n";
  data.dump();

  // Use z-measurements on data qubits to determine the logical mz
  // In an x-basis experiment, use Lx.
  auto logical_mz = Lz.dot(data.transpose()) % 2;
  std::cout << "logical_mz each shot:\n";
  logical_mz.dump();

  // Select a decoder
  auto decoder = cudaq::qec::get_decoder("single_error_lut", H);

  // Initialize a pauli_frame to track the logical errors
  cudaqx::tensor<uint8_t> pauli_frame({observables.shape()[0]});

  // Start a loop to count the number of logical errors
  size_t numLerrors = 0;
  for (size_t shot = 0; shot < nShots; ++shot) {
    std::cout << "shot: " << shot << "\n";

    for (size_t round = 0; round < nRounds; ++round) {
      std::cout << "round: " << round << "\n";

      // Access one row of the syndrome tensor
      size_t count = shot * nRounds + round;
      size_t stride = syndromes.shape()[1];
      cudaqx::tensor<uint8_t> syndrome({stride});
      syndrome.borrow(syndromes.data() + stride * count);
      std::cout << "syndrome:\n";
      syndrome.dump();

      // Decode the syndrome
      auto [converged, v_result] = decoder->decode(syndrome);
      cudaqx::tensor<uint8_t> result_tensor;
      cudaq::qec::convert_vec_soft_to_tensor_hard(v_result, result_tensor);
      std::cout << "decode result:\n";
      result_tensor.dump();

      // See if the decoded result anti-commutes with observables
      auto decoded_observables = observables.dot(result_tensor);
      std::cout << "decoded observable:\n";
      decoded_observables.dump();

      // update from previous stabilizer round
      pauli_frame = (pauli_frame + decoded_observables) % 2;
      std::cout << "pauli frame:\n";
      pauli_frame.dump();
    }

    // prep0 means we expected to measure out 0.
    uint8_t expected_mz = 0;
    // Apply the pauli frame correction to our logical measurement
    uint8_t corrected_mz = (logical_mz.at({0, shot}) + pauli_frame.at({0})) % 2;

    // Check if Logical_mz + pauli_frame_X = 0?
    std::cout << "Corrected readout: " << +corrected_mz << "\n";
    std::cout << "Expected readout: " << +expected_mz << "\n";
    if (corrected_mz != expected_mz)
      numLerrors++;
    std::cout << "\n";
  }

  std::cout << "numLogicalErrors: " << numLerrors << "\n";
}

Compile and run with

nvq++ --enable-mlir --target=stim -lcudaq-qec circuit_level_noise.cpp -o circuit_level_noise
./circuit_level_noise
  1. QEC Code and Decoder types:
    • As in the code capacity example, our central objects are the qec.code and qec.decoder types.

  2. Clifford simulation backend:
    • As the size of QEC circuits can grow quite large, Clifford simulation is often the best tool for these simulations.

    • cudaq.set_target("stim") selects the highly performant Stim simulator as the simulation backend.

  3. Noise model:
    • To add noisy gates we use the cudaq.NoiseModel type.

    • CUDA-Q supports the generation of arbitrary noise channels. Here we use a cudaq.Depolarization2 channel to add a depolarization channel.

    • This is added to the CX gate by adding it to the X gate with 1 control.

    • This noisy gate is added to every qubit via that noise.add_all_qubit_channel function.

  4. Getting circuit-level noisy data:
    • The qec.code is the first input parameter here, as the code’s stabilizer_round determines the circuits executed.

    • Each memory circuit runs for an input number of nRounds, which specifies how many stabilizer_round kernels are ran.

    • After nRounds the data qubits are measured and the run is over.

    • This is performed nShots number of times.

    • During a shot, each stabilizer round’s syndrome is xor’d against the preceding syndrome, so that we can track a sparser flow of data showing which round each parity check was violated.

    • The first round returns the syndrome as is, as there is nothing preceding to xor against.

  5. Data qubit measurements:
    • The data qubits are only read out after the end of each shot, so there are nShots worth of data readouts.

    • The basis of the data qubit measurements depends on the state preparation used.

    • Z-basis readout when preparing the logical |0> or logical |1> state with the qec.operation.prep0 or qec.operation.prep1 kernels.

    • X-basis readout when preparing the logical |+> or logical |-> state with the qec.operation.prepp or qec.operation.prepm kernels.

  6. Logical Errors:
    • From here, the decoding procedure is again similar to the code capacity case, expect for we use a pauli frame to track errors that happen each QEC cycle.

    • The final values of the pauli frame tell us how our logical state flipped during the experiment, and what needs to be done to correct it.

    • We compare our known initial state (corrected by the Pauli frame), against our measured data qubits to determine if a logical error occurred.

The CUDA-Q QEC library thus provides a platform for numerical QEC experiments. The qec.code can be used to analyze a variety of QEC codes (both library or user provided), with a variety of decoders (both library or user provided). The CUDA-Q QEC library also provides tools to speed up the automation of generating noisy data and syndromes.

Addtionally, here’s how to use CUDA-Q QEC to construct a multi-round parity check matrix and a custom error correction code for the circuit-level noise model experiment in Python:

import cudaq
import cudaq_qec as qec
import numpy as np

nRounds = 3
nShots = 500
# Physical error rate
p_per_round = 0.01
p_per_mz = 0.01


# Construct the measurement error syndrome matrix based on the distance and number of rounds
def construct_measurement_error_syndrome(distance, nRounds):
    num_stabilizers = distance - 1
    num_mea_q = num_stabilizers * nRounds

    syndrome_rows = []

    # In this scheme, need two rounds for each measurement syndrome
    for i in range(nRounds - 1):
        for j in range(num_stabilizers):
            syndrome = np.zeros((num_mea_q,), dtype=np.uint8)

            # The error on ancilla (j) in round (i) affects stabilizer checks at two positions:
            # First occurrence in round i
            pos1 = i * num_stabilizers + j
            # Second occurrence in round i+1
            pos2 = (i + 1) * num_stabilizers + j

            # Mark the syndrome
            syndrome[pos1] = 1
            syndrome[pos2] = 1

            syndrome_rows.append(syndrome)

    return np.array(syndrome_rows).T


# Generate the parity check matrix for n-rounds by duplicating the input parity check matrix Hz
# and appending the measurement error syndrome matrix.
def get_circuit_level_pcm(distance, nRounds, Hz):
    if nRounds < 2:
        raise ValueError("nRounds must be greater than or equal to 2")
    if distance < 3:
        raise ValueError("distance must be greater than or equal to 3")

    # Parity check matrix for a single round
    H = np.array(Hz)

    # Extends H to nRounds
    rows, cols = H.shape
    H_nrounds = np.zeros((rows * nRounds, cols * nRounds), dtype=np.uint8)
    for i in range(nRounds):
        H_nrounds[i * rows:(i + 1) * rows, i * cols:(i + 1) * cols] = H
    print("H_nrounds\n", H_nrounds)

    # Construct the measurement error syndrome matrix for Z errors
    H_mz = construct_measurement_error_syndrome(distance, nRounds)
    print("H_mz\n", H_mz)
    assert H_nrounds.shape[0] == H_mz.shape[
        0], "Dimensions of H_nrounds and H_mz do not match"

    # Append columns for measurement errors to H
    H_pcm = np.concatenate((H_nrounds, H_mz), axis=1)
    print(f"H_pcm:\n{H_pcm}")

    return H_pcm


# Example of how to construct a repetition code with a distance of 3 and random
# bit flip errors applied to the data qubits
@cudaq.kernel
def three_qubit_repetition_code():
    data_qubits = cudaq.qvector(3)
    ancilla_qubits = cudaq.qvector(2)

    # Initialize the logical |1> state as |111>
    x(data_qubits)

    for i in range(nRounds):
        # Random Bit Flip Errors
        for j in range(3):
            cudaq.apply_noise(cudaq.XError, p_per_round, data_qubits[j])

        # Extract Syndromes
        h(ancilla_qubits)

        # First Parity Check
        z.ctrl(ancilla_qubits[0], data_qubits[0])
        z.ctrl(ancilla_qubits[0], data_qubits[1])

        # Second Parity Check
        z.ctrl(ancilla_qubits[1], data_qubits[1])
        z.ctrl(ancilla_qubits[1], data_qubits[2])

        h(ancilla_qubits)

        # Measure the ancilla qubits
        s0 = mz(ancilla_qubits[0])
        s1 = mz(ancilla_qubits[1])
        reset(ancilla_qubits[0])
        reset(ancilla_qubits[1])

    # Final measurement to get the data qubits
    mz(data_qubits)


# Create a noise model
noise_model = cudaq.NoiseModel()
# Add measurement noise
noise_model.add_all_qubit_channel("mz", cudaq.BitFlipChannel(p_per_mz))

# Run the kernel and observe results
# The percent of samples that are 000 corresponds to the logical error rate
cudaq.set_target("stim")
result = cudaq.sample(three_qubit_repetition_code,
                      shots_count=nShots,
                      noise_model=noise_model,
                      explicit_measurements=True)

# The following section will demonstrate how to decode the results
# Get the parity check matrix for n-rounds of the repetition code
Hz = [[1, 1, 0], [0, 1, 1]]  # Parity check matrix for 1 round
H_pcm = get_circuit_level_pcm(3, nRounds, Hz)

# Get observables
observables = np.array([1, 0, 0, 0, 0, 0], dtype=np.uint8)
Lz = np.array([1, 0, 0], dtype=np.uint8)
print(f"observables:\n{observables}")
print(f"Lz:\n{Lz}")
# Pad the observables to be the same dimension as the decoded observable
Lz_nrounds = np.tile(Lz, nRounds)
pad_size = max(0, H_pcm.shape[1] - Lz_nrounds.shape[0])
Lz_nround_mz = np.pad(Lz_nrounds, (0, pad_size), mode='constant')
print(f"Lz_nround_mz\n{Lz_nround_mz}")

# Get a decoder
decoder = qec.get_decoder("single_error_lut", H_pcm)
nLogicalErrors = 0

# initialize a Pauli frame to track logical flips
# through the stabilizer rounds. Only need the Z component for the repetition code.
pauli_frame = np.array([0, 0], dtype=np.uint8)
expected_value = 1
for shot, outcome in enumerate(result.get_sequential_data()):
    outcome_array = np.array([int(bit) for bit in outcome], dtype=np.uint8)
    syndrome = outcome_array[:len(outcome_array) - 3]
    data = outcome_array[len(outcome_array) - 3:]
    print("\nshot:", shot)
    print("syndrome:", syndrome)

    # Decode the syndrome
    convergence, result = decoder.decode(syndrome)
    data_prediction = np.array(result, dtype=np.uint8)

    # See if the decoded result anti-commutes with the observables
    print("decode result:", data_prediction)
    decoded_observables = (Lz_nround_mz @ data_prediction) % 2
    print("decoded_observables:", decoded_observables)

    # update pauli frame
    pauli_frame[0] = (pauli_frame[0] + decoded_observables) % 2
    print("pauli frame:", pauli_frame)

    logical_measurements = (Lz @ data.transpose()) % 2
    print("LMz:", logical_measurements)

    corrected_mz = (logical_measurements + pauli_frame[0]) % 2
    print("Expected value:", expected_value)
    print("Corrected value:", corrected_mz)
    if (corrected_mz != expected_value):
        nLogicalErrors += 1

# Count how many shots the decoder failed to correct the errors
print("\nNumber of logical errors:", nLogicalErrors)

This example illustrates how to:

1. Construct a multi-round parity check matrix – Users can extend a single-round parity check matrix across multiple rounds, incorporating measurement errors to track syndrome evolution over time. This enables more accurate circuit-level noise modeling for decoders.

2. Define custom error correction circuits with precise noise injection – Using cudaq.apply_noise, users can introduce specific error channels at targeted locations within the QEC circuit. This fine-grained control allows for precise testing of how different noise sources affect logical error rates.

In the previous example, we demonstrated how to introduce random X errors into each data qubit using cudaq.apply_noise during each round of syndrome extraction. CUDA-Q allows users to inject a variety of error channels at different locations within their circuits, enabling fine-grained noise modeling. The example below showcases additional ways to introduce errors into a quantum kernel:

@cudaq.kernel
def inject_noise_example():
    q = cudaq.qvector(3)

    # Apply depolarization noise to the first qubit
    cudaq.apply_noise(cudaq.DepolarizationChannel, 0.1, q[0])

    # Perform gate operations
    h(q[1])
    x.ctrl(q[1], q[2])

    # Inject a Y error into the second qubit
    cudaq.apply_noise(cudaq.YError, 0.1, q[1])

    # Apply a general Pauli noise channel to the third qubit, where the 3 values indicate the probability of X, Y, and Z errors.
    cudaq.apply_noise(cudaq.Pauli1, 0.1, 0.1, 0.1, q[2])

# Define and apply a noise model
noise = cudaq.NoiseModel()
counts = cudaq.sample(inject_noise_example, noise_model=noise)

For a full list of supported noise models and their parameters, refer to the CUDA-Q documentation.

Getting Started with the NVIDIA QLDPC Decoder

Starting with CUDA-Q QEC v0.2, a GPU-accelerated decoder is included with the CUDA-Q QEC library. The library follows the CUDA-Q decoder Python and C++ interfaces (namely cudaq_qec.Decoder for Python and cudaq::qec::decoder for C++), but as documented in the API sections (NVIDIA QLDPC Decoder for Python and NVIDIA QLDPC Decoder for C++), there are many configuration options that can be passed to the constructor. The following example shows how to exercise the decoder using non-trivial pre-generated test data. The test data was generated using scripts originating from the GitHub repo for BivariateBicycleCodes [1]; it includes parity check matrices (PCMs) and test syndromes to exercise a decoder.


import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time

# For fetching data
import requests
import bz2
import os

# Note: running this script will automatically download data if necessary.

### Helper functions ###


def parse_csr_mat(j, dims, mat_name):
    """
    Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
    """
    assert len(dims) == 2, "dims must be a tuple of two integers"

    # Extract indptr and indices from the JSON.
    indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
    indices = np.array(j[f"{mat_name}_indices"], dtype=int)

    # Check that the CSR structure is consistent.
    assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
    assert np.all(
        indices < dims[1]), "All column indices must be less than dims[1]"

    # Create a data array of ones.
    data = np.ones(indptr[-1], dtype=np.uint8)

    # Build the CSR matrix and return it as a dense numpy array.
    csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
    return csr.toarray()


def parse_H_csr(j, dims):
    """
    Parse a CSR-style parity check matrix from an input file in JSON format"
    """
    return parse_csr_mat(j, dims, "H")


def parse_obs_csr(j, dims):
    """
    Parse a CSR-style observable matrix from an input file in JSON format"
    """
    return parse_csr_mat(j, dims, "obs_mat")


### Main decoder loop ###


def run_decoder(filename, num_shots, run_as_batched):
    """
    Load a JSON file and decode "num_shots" syndromes.
    """
    t_load_begin = time.time()
    with open(filename, "r") as f:
        j = json.load(f)

    dims = j["shape"]
    assert len(dims) == 2

    # Read the Parity Check Matrix
    H = parse_H_csr(j, dims)
    syndrome_length, block_length = dims
    t_load_end = time.time()

    print(f"{filename} parsed in {1e3 * (t_load_end-t_load_begin)} ms")

    error_rate_vec = np.array(j["error_rate_vec"])
    assert len(error_rate_vec) == block_length
    obs_mat_dims = j["obs_mat_shape"]
    obs_mat = parse_obs_csr(j, obs_mat_dims)
    assert dims[1] == obs_mat_dims[0]
    file_num_trials = j["num_trials"]
    num_shots = min(num_shots, file_num_trials)
    print(
        f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')

    # osd_method: 0=Off, 1=OSD-0, 2=Exhaustive, 3=Combination Sweep
    osd_method = 1

    # When osd_method is:
    #  2) there are 2^osd_order additional error mechanisms checked.
    #  3) there are an additional k + osd_order*(osd_order-1)/2 error
    #     mechanisms checked.
    # Ref: https://arxiv.org/pdf/2005.07016
    osd_order = 0

    # Maximum number of BP iterations before attempting OSD (if necessary)
    max_iter = 50

    nv_dec_args = {
        "max_iterations": max_iter,
        "error_rate_vec": error_rate_vec,
        "use_sparsity": True,
        "use_osd": osd_method > 0,
        "osd_order": osd_order,
        "osd_method": osd_method
    }

    if run_as_batched:
        # Perform BP processing for up to 1000 syndromes per batch. If there
        # are more than 1000 syndromes, the decoder will chunk them up and
        # process each batch sequentially under the hood.
        nv_dec_args['bp_batch_size'] = min(1000, num_shots)

    try:
        nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
                                             **nv_dec_args)
    except Exception as e:
        print(
            'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
            'QEC installation.')
        exit(0)
    decoding_time = 0
    bp_converged_flags = []
    num_logical_errors = 0

    # Batched API
    if run_as_batched:
        syndrome_list = []
        obs_truth_list = []
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]
            syndrome_list.append(syndrome)
            obs_truth_list.append(obs_truth)
        t0 = time.time()
        results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
        t1 = time.time()
        decoding_time += t1 - t0
        for r, obs_truth in zip(results, obs_truth_list):
            bp_converged_flags.append(r.converged)
            dec_result = np.array(r.result, dtype=np.uint8)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Non-batched API
    else:
        for i in range(num_shots):
            syndrome = j["trials"][i]["syndrome_truth"]
            obs_truth = j["trials"][i]["obs_truth"]

            t0 = time.time()
            bp_converged, dec_result = nv_dec_gpu_and_cpu.decode(syndrome)
            t1 = time.time()
            trial_diff = t1 - t0
            decoding_time += trial_diff

            dec_result = np.array(dec_result, dtype=np.uint8)
            bp_converged_flags.append(bp_converged)

            # See if this prediction flipped the observable
            predicted_observable = obs_mat.T @ dec_result % 2
            print(f"predicted_observable: {predicted_observable}")

            # See if the observable was actually flipped according to the truth
            # data
            actual_observable = np.array(obs_truth, dtype=np.uint8)
            print(f"actual_observable:    {actual_observable}")

            if np.sum(predicted_observable != actual_observable) > 0:
                num_logical_errors += 1

    # Count how many shots the decoder failed to correct the errors
    print(f"{num_logical_errors} logical errors in {num_shots} shots")
    print(
        f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
    )
    print(
        f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
    )


if __name__ == "__main__":
    # See other test data options in https://github.com/NVIDIA/cudaqx/releases/tag/0.2.0
    filename = 'osd_1008_8785_0.001.json'
    bz2filename = filename + '.bz2'
    if not os.path.exists(filename):
        url = f"https://github.com/NVIDIA/cudaqx/releases/download/0.2.0/{bz2filename}"

        print(f'Downloading data from {url}')

        # Download the file
        response = requests.get(url, stream=True)
        response.raise_for_status()  # Raise an error if download fails
        with open(bz2filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f'Decompressing {bz2filename} into {filename}')

        # Decompress the file
        with bz2.BZ2File(bz2filename, "rb") as f_in, open(filename,
                                                          "wb") as f_out:
            f_out.write(f_in.read())

        print(f"Decompressed file saved as {filename}")

    num_shots = 100
    run_as_batched = True
    run_decoder(filename, num_shots, run_as_batched)

Footnotes