Source code for rexfw.slaves

'''
Slave classes responsible for distributing requests to replicas, proposers, ...
'''

from rexfw import Parcel


[docs]class Slave(object): def __init__(self, replicas, comm): ''' Default slave class :param replicas: a dict of replicas with their names as keys :type replicas: dict :param comm: a communicator object to communicate with the master object :type comm: :class:`.AbstractCommunicator` ''' self.replicas = replicas self._comm = comm
[docs] def _listen(self): ''' Runs an infinite loop, polling for messages and passing them on to their destination, which currently is only a single replica ''' while True: parcel = self._receive_parcel() if parcel.receiver in self.replicas.iterkeys(): result = self.replicas[parcel.receiver].process_request(parcel.data) if result == -1: break else: raise ValueError("Replica '{}' not found.".format(parcel.receiver))
[docs] def listen(self): ''' Starts a thread and runs the infinite loop (_listen) ''' from threading import Thread self._thread = Thread(target=self._listen) self._thread.start()
[docs] def _receive_parcel(self): ''' Uses the communicator to receive a :class:`.Parcel` from any source ''' return self._comm.recv(source='all')
[docs]class UDCountsSlave(object): def __init__(self, replicas, comm, sim_path): self.replicas = replicas self._comm = comm self._sim_path = sim_path
[docs] def _listen(self): while True: parcel = self._receive_parcel() if parcel.receiver in self.replicas.iterkeys(): result = self.replicas[parcel.receiver].process_request(parcel.data) if result == -1: import numpy replica_name = self.replicas[self.replicas.keys()[0]].name numpy.save(self._sim_path + 'statistics/up_down_counts_{}.npy'.format(replica_name), self.replicas[replica_name].up_down_counts) break else: raise ValueError("Replica '{}' not found.".format(parcel.receiver))
[docs] def listen(self): from threading import Thread self._thread = Thread(target=self._listen) self._thread.start()
[docs] def _receive_parcel(self): return self._comm.recv(source='all')