diff --git a/qunetsim/components/network.py b/qunetsim/components/network.py index 0a028a67..f8de64a5 100644 --- a/qunetsim/components/network.py +++ b/qunetsim/components/network.py @@ -2,12 +2,14 @@ import time from inspect import signature from queue import Queue +from threading import Thread import matplotlib.pyplot as plt import networkx as nx from qunetsim.backends import EQSNBackend from qunetsim.objects import Qubit, RoutingPacket, Logger, DaemonThread +from qunetsim.objects.daemon_thread import is_thread_alive from qunetsim.utils.constants import Constants @@ -38,8 +40,7 @@ def __init__(self): self._quantum_routing_algo = nx.shortest_path self._classical_routing_algo = nx.shortest_path self._use_hop_by_hop = True - self._packet_queue = Queue() - self._stop_thread = False + self._packet_queues = {} self._use_ent_swap = False self._queue_processor_thread = None self._delay = 0.1 @@ -178,6 +179,12 @@ def packet_drop_rate(self, drop_rate): def arp(self): return self.ARP + def add_queue(self, host_id): + self._packet_queues[host_id] = Queue() + + def remove_queue(self, host_id): + del self._packet_queues[host_id] + @property def num_hosts(self): return len(self.arp.keys()) @@ -191,6 +198,7 @@ def add_host(self, host): """ Logger.get_instance().debug('host added: ' + host.host_id) + self.add_queue(host.host_id) self.ARP[host.host_id] = host self._update_network_graph(host) @@ -214,6 +222,7 @@ def remove_host(self, host): if host.host_id in self.ARP: del self.ARP[host.host_id] + self.remove_queue(host.host_id) if self.quantum_network.has_node(host.host_id): self.quantum_network.remove_node(host.host_id) if self.classical_network.has_node(host.host_id): @@ -498,19 +507,21 @@ def transfer_qubits(r, s, original_sender=None): i += 1 return True - def _process_queue(self): + def _process_queues(self): """ - Runs a thread for processing the packets in the packet queue. + Runs multiple threads for processing the packets in the packet queues. """ - while True: + def process_queue(packet_queue): + """ + A single thread processes the packet in a single queue. + Each host has it's own queue and thread for processing the queue. + """ + packet = packet_queue.get() - packet = self._packet_queue.get() - - if not packet: - # Stop the network - self._stop_thread = True - break + # If None packet is received, then stop thread + if not packet.payload: + return # Artificially delay the network if self.delay > 0: @@ -522,14 +533,14 @@ def _process_queue(self): Logger.get_instance().log("PACKET DROPPED") if packet.payload_type == Constants.QUANTUM: packet.payload.release() - continue + return sender, receiver = packet.sender, packet.receiver if packet.payload_type == Constants.QUANTUM: if not self._route_quantum_info(sender, receiver, [packet.payload]): - continue + return try: if packet.protocol == Constants.RELAY and not self.use_hop_by_hop: @@ -586,6 +597,12 @@ def _process_queue(self): except Exception as e: Logger.get_instance().error('Error in network: ' + str(e)) + while True: + for host_name, queue in self._packet_queues.items(): + if not queue.empty() and not is_thread_alive(host_name): + thread = Thread(target=process_queue, args=[queue], daemon=False, name=host_name) + thread.start() + def send(self, packet): """ Puts the packet to the packet queue of the network. @@ -593,8 +610,7 @@ def send(self, packet): Args: packet (Packet): Packet to be sent """ - - self._packet_queue.put(packet) + self._packet_queues.get(packet.sender).put(packet) def stop(self, stop_hosts=False): """ @@ -606,8 +622,10 @@ def stop(self, stop_hosts=False): if stop_hosts: for host in self.ARP: self.ARP[host].stop(release_qubits=True) + # Send None to queue to stop the queue + self.send(RoutingPacket(sender=host, receiver=None, protocol=None, payload_type=None, payload=None, + ttl=None, route=None)) - self.send(None) # Send None to queue to stop the queue if self._backend is not None: self._backend.stop() except Exception as e: @@ -624,7 +642,7 @@ def start(self, nodes=None, backend=None): self._backend = backend if nodes is not None: self._backend.start(nodes=nodes) - self._queue_processor_thread = DaemonThread(target=self._process_queue) + self._queue_processor_thread = DaemonThread(target=self._process_queues) def draw_classical_network(self): """ diff --git a/qunetsim/objects/daemon_thread.py b/qunetsim/objects/daemon_thread.py index e1f67df4..852cdea9 100644 --- a/qunetsim/objects/daemon_thread.py +++ b/qunetsim/objects/daemon_thread.py @@ -1,6 +1,12 @@ import threading +def is_thread_alive(thread_name): + for thread in threading.enumerate(): + if thread_name == thread: + return thread.isAlive() + + class DaemonThread(threading.Thread): """ A Daemon thread that runs a task until completion and then exits. """ diff --git a/qunetsim/objects/packets/routing_packet.py b/qunetsim/objects/packets/routing_packet.py index 0f077709..eeecf937 100644 --- a/qunetsim/objects/packets/routing_packet.py +++ b/qunetsim/objects/packets/routing_packet.py @@ -20,7 +20,7 @@ def __init__(self, sender, receiver, protocol, payload_type, payload, ttl, route ttl(int): Time-to-Live parameter route (List): Route the packet takes to its target host. """ - if not isinstance(payload, Packet): + if not isinstance(payload, Packet) and payload is not None: raise ValueError("For the routing packet the payload has to be a packet.") self._ttl = ttl