import json
import logging
import numpy as np
from netsquid.nodes import Node
import netsquid.components.instructions as instr
from netsquid.components.models.qerrormodels import T1T2NoiseModel
from netsquid.components.qprocessor import QuantumProcessor, PhysicalInstruction
[docs]
class QPUNode(Node):
"""
Represents an entity (i.e. the legendary Alice and Bob) with a quantum processing
unit (QPU) with a program queue and callback functionality. If a program needs to be
executed it should be added to the queue using the add_program method.
Qubit mapping:
0 - Emission qubit
1 - Communications qubit, entangled with emission qubit
2 - Shielded qubit, used for storing & distilling entanglement
Parameters
----------
qbit_count : int, optional
Number of qubits in the processor, by default 2.
depolar_rate : float, optional
Depolarization rate for the noise model, by default 0.
"""
def __init__(self, qnode_id, ideal_qpu, qbit_count=3):
# Changed: generate name from ID
self.id = qnode_id
name = f"qnode_{qnode_id}"
super().__init__(name, port_names=["corrections"])
# The last qubit slot is used for photon emission into fibre
self.processor = self.__create_processor(qbit_count, ideal_qpu)
self.__setup_callbacks()
# Keep track of qubit mappings
self.emit_idx = 0
self.comm_idx = 1
self.shielded_idx = 2
# Fetch logger
self.__logger = logging.getLogger("qpu_logger")
# ======== PRIVATE METHODS ========
# Helper function to create a simple QPU with a few useful instructions
def __create_processor(self, qbit_count, ideal_qpu):
"""
Private helper method used to initialize the quantum processor for the entity.
We have nonphysical instructions as we use an abstract QPU architecture.
Communication qubit coherence times:
T1: 2.68ms
T2: 1ms
Memory (shielded) qubit coherence times:
T1: inf
T2: 3.5ms
Parameters
----------
ideal_qpu : bool
Whether the QPU has noise and delay models applied.
qbit_count : int
Number of qubits in the processor.
depolar_rate : float
Depolarization rate for the noise model.
Returns
-------
QuantumProcessor
A configured quantum processor with fallback to nonphysical instructions.
"""
qproc_name = f"qproc_{self.id}"
if ideal_qpu:
# Build ideal QPU
processor = QuantumProcessor(
qproc_name,
num_positions=qbit_count,
fallback_to_nonphysical=True,
)
processor.add_ports(["qout_hdr", "qout0_hdr"])
else:
# Build a noisy QPU
memory_noise_models = [
None, # Utility qubit: no noise applied
T1T2NoiseModel(
T1=2_680_000, T2=1_000_000
), # Communication qubit: high error (short coherence times)
T1T2NoiseModel(
T1=np.inf, T2=3_500_000
), # Shielded qubit: low error (long coherence times)
]
processor = QuantumProcessor(
qproc_name,
num_positions=qbit_count,
memory_noise_models=memory_noise_models,
phys_instructions=None,
fallback_to_nonphysical=False,
)
processor.add_ports(["qout_hdr", "qout0_hdr"])
processor.add_physical_instruction(
PhysicalInstruction(instruction=instr.INSTR_INIT, duration=200.0)
)
processor.add_physical_instruction(
PhysicalInstruction(
instruction=instr.INSTR_X,
duration=50.0,
# quantum_noise_model=T1T2NoiseModel(T1=0.5, T2=0.3),
)
)
processor.add_physical_instruction(
PhysicalInstruction(
instruction=instr.INSTR_Y,
duration=50.0,
# quantum_noise_model=T1T2NoiseModel(T1=0.5, T2=0.3),
)
)
processor.add_physical_instruction(
PhysicalInstruction(instruction=instr.INSTR_EMIT, duration=20.0)
)
processor.add_physical_instruction(
PhysicalInstruction(instruction=instr.INSTR_SWAP, duration=900.0)
)
processor.add_physical_instruction(
PhysicalInstruction(
instruction=instr.INSTR_CNOT,
duration=300.0,
# quantum_noise_model=DepolarNoiseModel(
# depolar_rate=1e6, time_independent=False
# ),
)
)
processor.add_physical_instruction(
PhysicalInstruction(
instruction=instr.INSTR_MEASURE, duration=400.0, parallel=True
)
)
return processor
# Helper for setting up the callbacks and handlers
def __setup_callbacks(self):
"""Set up callback handling for when programs complete."""
self.processor.ports["qout"].bind_output_handler(
self.__setup_header_wrapper, tag_meta=True
)
self.processor.ports["qout0"].bind_output_handler(
self.__setup_header_wrapper, tag_meta=True
)
def __debug(self, msg):
"""Callback for debugging message contents."""
port = msg.meta.get("rx_port_name", "missing_port_metadata")
self.__logger.debug(f"Received message on port: {port}, MSG: {msg}")
def __setup_header_wrapper(self, msg):
"""
Callback to add metadata headers to outbound messages for routing and
identification.
Parameters
----------
msg : object
The message object containing metadata and payload to be transmitted.
"""
port = msg.meta.get("rx_port_name", "missing_port_metadata")
event_id = msg.meta["put_event"].id
if self.request_uuid is None:
self.__logger.error(
f"""[Emission header callback]: {self.name}
request_uuid not set for port {port}, event_id: {event_id}"""
)
# TODO consider throwing an error
else:
header = {"event_id": event_id, "request_uuid": self.request_uuid}
msg.meta["header"] = json.dumps(header)
self.processor.ports[f"{port}_hdr"].tx_output(msg)
self.request_uuid = None # Remove old UUID once request is transmitted
[docs]
def set_emit_uuid(self, request_uuid):
"""
Set the UUID for the next message coming out of the photon emission port.
Parameters
----------
request_uuid : str
The UUID in question
"""
self.request_uuid = request_uuid