import json
import logging
from utils import loss_prob
from netsquid.nodes import Node
from bsm_wrapper import BSMWrapper
from netsquid.qubits.qubitapi import amplitude_dampen
from netsquid.components import QuantumChannel
from netsquid.components.models import (
FibreDelayModel,
FibreLossModel,
)
class FSOSwitch(Node):
    """
    A Free-Space Optical (FSO) switch component for routing quantum signals.

    This routes photonic qubits through lossy fibre channels and connects to
    a Bell-state measurement (BSM) detector. It applies fibre loss models to the
    internal paths and amplitude damping to every incoming qubit.

    The connections to and from the switch should be registered using
    :meth:`register` to allow for proper switching on a component name basis.

    It uses the "fso_logger" logger object, get it with:
    ``logging.getLogger("fso_logger")``

    Parameters
    ----------
    switch_id : int
        Integer ID of the switch, it's used to create the
        :class:`netsquid.nodes.Node` superclass name (``switch_<id>``).
    ctrl_port : netsquid.components.component.Port
        The port object of the control node, all output commands are sent to it.
    dampening_parameter : float
        Amplitude damping parameter for photons passing through the switch.
        Should be between 0 and 1.
    ideal : bool
        Specify whether the switch is ideal, i.e. no photon losses and zero-length
        fibre channels.
    herald_ports : list[str], optional
        The two entanglement heralding output port names which will be connected
        to the BSM device. Defaults to ``["qout0", "qout1"]``.
    visibility : float
        The HOM visibility parameter of the BSM detector attached to the herald
        ports. Should be between 0 and 1.

    Raises
    ------
    ValueError
        If ``herald_ports`` is not a pair of two distinct output port names.

    Examples
    --------
    >>> ctrl_node = ControlNode(id=0, network_type="tree")
    >>> ctrl_port = ctrl_node.ports["switch_herald"]
    >>> fsoswitch_node = FSOSwitch(
    ...     switch_id=1,
    ...     ctrl_port=ctrl_port,
    ...     dampening_parameter=0.15,
    ...     ideal=False,
    ...     herald_ports=["qout0", "qout1"],
    ...     visibility=0.85,
    ... )
    """

    # Quantum input / output port names of the switch.
    _QIN_PORTS = ("qin0", "qin1", "qin2")
    _QOUT_PORTS = ("qout0", "qout1", "qout2")
    # Pass-through configuration (no deflections); never mutated, always copied.
    _DEFAULT_ROUTING = {"qin0": "qout0", "qin1": "qout1", "qin2": "qout2"}

    def __init__(
        self,
        switch_id,
        ctrl_port,
        dampening_parameter,
        ideal=False,
        herald_ports=None,
        visibility=1,
    ):
        # Avoid a shared mutable default and never alias the caller's list.
        if herald_ports is None:
            herald_ports = ["qout0", "qout1"]
        herald_ports = list(herald_ports)
        if (
            len(herald_ports) != 2
            or herald_ports[0] == herald_ports[1]
            or not set(herald_ports) <= set(self._QOUT_PORTS)
        ):
            raise ValueError(
                "herald_ports must name two distinct output ports out of "
                f"{list(self._QOUT_PORTS)}, got {herald_ports}"
            )

        ports = [*self._QIN_PORTS, *self._QOUT_PORTS, "cout"]
        self.id = switch_id
        name = f"switch_{switch_id}"
        super().__init__(name, port_names=ports)

        self.__ctrl_port = ctrl_port
        self.__setup_fibre_channels(ideal)
        self.__setup_bsm_detector(
            herald_ports=herald_ports,
            det_eff=1,  # Ideal detector
            dampening_parameter=dampening_parameter,
            visibility=visibility,
        )
        self.__setup_port_forwarding(ctrl_port)

        # Amplitude damping parameter applied to every received qubit
        self.__amplitude_dampening = dampening_parameter
        # The single output port that is not wired to the BSM detector
        self.__outbound_port = next(
            port for port in self._QOUT_PORTS if port not in herald_ports
        )
        self.__herald_ports = herald_ports
        # Default routing
        self.__routing_table = dict(self._DEFAULT_ROUTING)
        # Connections registry: node name -> switch port name
        self.__registry = {}
        # Fetch logger
        self.__logger = logging.getLogger("fso_logger")

    def __setup_bsm_detector(
        self,
        herald_ports,
        dampening_parameter,
        det_eff,
        visibility,
        p_dark=0,
    ):
        """
        Create a BSM detector component and add it as a subcomponent of the switch.

        Assuming herald_ports of ["qout0", "qout1"], we have these port bindings:
            [FSO] qout0 -> qin0 [BSM]
            [FSO] qout1 -> qin1 [BSM]

        Parameters
        ----------
        herald_ports : list[str]
            The entanglement heralding port names which will be connected to the
            BSM.
        dampening_parameter : float
            Amplitude damping parameter for photons passing through the switch.
            Accepted for interface symmetry; it is not used by this method.
        det_eff : float
            Efficiency per detector, i.e. the probability of detecting an incoming
            photon.
        visibility : float
            The HOM visibility parameter of the BSM detector attached to the herald
            ports. Should be between 0 and 1.
        p_dark : float, optional
            Dark-count probability, i.e. probability of measuring a photon while
            no photon was present.
        """
        bsm_wrapper = BSMWrapper(
            name=f"BSMWrap_{self.id}",
            p_dark=p_dark,
            det_eff=det_eff,
            visibility=visibility,
        )
        self.add_subcomponent(bsm_wrapper)

        # Connect ports from FSO to BSM heralding station wrapper
        for herald_port, bsm_port in zip(herald_ports, ("qin0", "qin1")):
            self.ports[herald_port].bind_output_handler(
                bsm_wrapper.ports[bsm_port].tx_input
            )
        # NOTE(review): this binds an *input* handler on the wrapper's "cout" port;
        # confirm against BSMWrapper whether an output handler was intended.
        bsm_wrapper.ports["cout"].bind_input_handler(self.__ctrl_port.tx_input)

    def __setup_fibre_channels(self, ideal):
        """
        Create the three internal fibre channels, each with its own loss model.

        Only a ``quantum_loss_model`` is attached to each channel; no delay or
        noise model is configured here. The channels are kept in a private list
        indexed by path (0: short, 1: mid, 2: long); they are not registered as
        subcomponents of the node.

        Parameters
        ----------
        ideal : bool
            A boolean parameter controlling whether the switch is an ideal
            component (zero initial loss probability and zero channel length).
        """
        # (channel name, argument passed to loss_prob(), channel length)
        # NOTE(review): the "mid" path has a larger loss argument than the "long"
        # one; presumably these are measured per-path values -- confirm.
        channel_specs = (
            ("qchannel_short", 1.319, 0.005),
            ("qchannel_mid", 2.12, 0.00587),
            ("qchannel_long", 2.005, 0.00756),
        )
        # Model the three different routes qubits can take through the switch
        self.__channels = [
            QuantumChannel(
                name=channel_name,
                models={
                    "quantum_loss_model": FibreLossModel(
                        p_loss_init=0 if ideal else loss_prob(loss_arg),
                        rng=None,
                    ),
                },
                length=0 if ideal else length,
            )
            for channel_name, loss_arg, length in channel_specs
        ]

    def __setup_port_forwarding(self, ctrl_port):
        """
        Set up routing from the input ports through the fibre channels to the
        output ports. The classical output (cout) port is bound to the control
        node's port (ctrl_port) for communicating routing requests.

        Parameters
        ----------
        ctrl_port : netsquid.components.component.Port
            The port object of the control node to which requests from the "cout"
            port are relayed.
        """
        # Bind input handlers; tag_meta is requested so that the receiving port
        # name ("rx_port_name") can be read from the message meta on reception.
        for port_name in self._QIN_PORTS:
            self.ports[port_name].bind_input_handler(
                self.__recv_qubit, tag_meta=True
            )
        # Whatever leaves a fibre channel is relayed to its output port
        for channel in self.__channels:
            channel.ports["recv"].bind_output_handler(self.__relay_qubit)
        # Route COUT messages to CTRL_PORT.tx_input via a lambda
        self.ports["cout"].bind_output_handler(lambda msg: ctrl_port.tx_input(msg))

    def __relay_qubit(self, msg):
        """
        Route a quantum message leaving a fibre channel to its output port.

        This deserializes the message headers, pops the "outport" entry written by
        the receive handler and transmits the message through that port.

        Parameters
        ----------
        msg : netsquid.components.component.Message
            Quantum message containing metadata for routing.
        """
        headers = json.loads(msg.meta.get("header", "{}"))
        outbound_port = headers.pop("outport", None)
        # Debug print
        self.__logger.debug(
            "%s Relaying qubit to port: %s", self.name, outbound_port
        )
        # Serialize headers before sending (dict is unhashable)
        msg.meta["header"] = json.dumps(headers)
        self.ports[outbound_port].tx_output(msg)

    def __recv_qubit(self, msg):
        """
        Process an inbound qubit: determine the routing path, apply amplitude
        damping and forward it through the appropriate lossy channel.

        Parameters
        ----------
        msg : netsquid.components.component.Message
            Quantum message received on a specific input port.
        """
        inbound_port = msg.meta.get("rx_port_name", "missing_port_name")
        outbound_port = self.__routing_table[inbound_port]
        # Use .get() so that a missing "source"/"header" entry cannot make the
        # debug log itself raise; the header is optional below as well.
        self.__logger.debug(
            "[%s] Received (%s sender: %s\nhdr: %s) on port %s",
            self.name,
            msg.items[0],
            msg.meta.get("source"),
            msg.meta.get("header"),
            inbound_port,
        )
        # Deserialize the JSON headers and record where the qubit must exit
        headers = json.loads(msg.meta.get("header", "{}"))
        headers["outport"] = outbound_port
        self.__logger.debug(
            "[%s] Incoming: %s | Outbound: %s",
            self.name,
            inbound_port,
            outbound_port,
        )
        # The channel is selected by the distance between the port indices:
        # 0 -> Short channel
        # 1 -> Medium channel
        # 2 -> Long channel
        channel_idx = abs(int(inbound_port[-1]) - int(outbound_port[-1]))
        channel = self.__channels[channel_idx]
        # Serialize the headers
        msg.meta["header"] = json.dumps(headers)
        # Apply amplitude damping (to the first item of the message only)
        amplitude_dampen(msg.items[0], self.__amplitude_dampening)
        # Relay qubit
        channel.ports["send"].tx_input(msg)

    def __switch(self, routing_table):
        """
        Configure the FSO switch's routing table for input-output port mapping.

        Parameters
        ----------
        routing_table : dict
            Dictionary mapping input ports (qin0, qin1, qin2) to output ports
            (qout0, qout1, qout2). It must be a one-to-one mapping covering all
            three input ports.

        Raises
        ------
        ValueError
            If the provided routing table has invalid keys or values. The current
            routing table is left untouched in that case.
        """
        self.__logger.info("Switching %s: %s", self.name, routing_table)
        # Set comparison (instead of sorting) also copes with None entries that
        # come from unregistered nodes.
        valid_keys = set(routing_table) == set(self._QIN_PORTS)
        valid_vals = len(routing_table) == len(self._QOUT_PORTS) and set(
            routing_table.values()
        ) == set(self._QOUT_PORTS)
        if not (valid_keys and valid_vals):
            self.__logger.error("[FSO] Invalid routing table: %s", routing_table)
            raise ValueError(f"Invalid routing table: {routing_table}")
        self.__routing_table = dict(routing_table)

    def _query_node(self, node_name):
        """
        Return the port name registered for ``node_name``, or None if the node
        has not been registered.
        """
        return self.__registry.get(node_name)

    def register(self, node_name, inbound_port):
        """
        Register a node that is connected to the switch's port for later querying.

        Parameters
        ----------
        node_name : str
            The name of the node object.
        inbound_port : str
            The name of the switch port the node is connected to.
        """
        self.__registry[node_name] = inbound_port

    def herald_switch(self, node_one, node_two):
        """
        Change to a routing configuration which routes the input ports connected
        to the two nodes to the BSM detector node (heralding device).

        Parameters
        ----------
        node_one : str
            The first node which must be routed to the heralding device.
        node_two : str
            The second node which must be routed to the heralding device.

        Raises
        ------
        ValueError
            If the resulting routing table is invalid, e.g. because a node is not
            registered or both nodes are registered on the same port.
        """
        # Fetch port names
        inbound_one = self._query_node(node_one)
        inbound_two = self._query_node(node_two)
        # Sorted so the choice does not depend on set iteration order
        remaining = sorted(set(self._QIN_PORTS) - {inbound_one, inbound_two})[0]
        self.__logger.info(
            "[HERALD] %s (%s) & %s (%s)\nTo herald: %s",
            node_one,
            inbound_one,
            node_two,
            inbound_two,
            self.__herald_ports,
        )
        # Construct routing table
        routing_table = {
            inbound_one: self.__herald_ports[0],
            inbound_two: self.__herald_ports[1],
            remaining: self.__outbound_port,
        }
        self.__logger.info("[HERALD TABLE] %s", routing_table)
        self.__switch(routing_table)

    def relay_switch(self, node_in, node_out):
        """
        Change the switching configuration to connect node_in to node_out,
        assuming they have been registered with the switch. Remaining paths are
        connected in no particular (but deterministic) order.

        Parameters
        ----------
        node_in : str
            The name of the node from which the photon is inbound.
        node_out : str
            The name of the photon's destination node, connected to an output
            port.

        Raises
        ------
        ValueError
            If the resulting routing table is invalid, e.g. because a node is not
            registered or is registered on a port of the wrong direction.
        """
        # Fetch port names
        inbound_port = self._query_node(node_in)
        outbound_port = self._query_node(node_out)
        # Sorted so the pairing of the leftover ports is reproducible across runs
        remaining_in = sorted(set(self._QIN_PORTS) - {inbound_port})
        remaining_out = sorted(set(self._QOUT_PORTS) - {outbound_port})
        # Construct routing table to relay in -> out
        routing_table = {
            inbound_port: outbound_port,
            remaining_in[0]: remaining_out[0],
            remaining_in[1]: remaining_out[1],
        }
        self.__logger.debug("========= RELAY SWITCHING ============")
        self.__logger.debug("REGISTRY: %s", self.__registry)
        self.__logger.debug("INBOUND (%s): %s", node_in, inbound_port)
        self.__logger.debug("OUTBOUND (%s): %s", node_out, outbound_port)
        self.__logger.debug("REMAINING IN: %s", remaining_in)
        self.__logger.debug("REMAINING OUT: %s", remaining_out)
        self.__switch(routing_table)

    def default_switch(self):
        """
        Reset the switch to the initial switching configuration (no deflections)
        default: {"qin0": "qout0", "qin1": "qout1", "qin2": "qout2"}
        """
        # Use the stored default, not the current table, which may have been
        # changed by an earlier herald/relay switch.
        self.__switch(self._DEFAULT_ROUTING)