'''
Replica classes which sample from a single PDF
'''
from abc import abstractmethod, abstractproperty
from rexfw import Parcel
from rexfw.replicas.requests import GetStateAndEnergyRequest, StoreStateEnergyRequest
[docs]class Replica(object):
_current_master = None
def __init__(self, name, state, pdf, sampler_class, sampler_params,
proposers, output_folder, comm):
'''
Default replica class
:param name: name of the replica object
:type name: str
:param state: initial state
:type state: up to you
:param pdf: a probability density function exposing the interface
defined in :class:`.AbstractPDF`
:type pdf: :class:`.AbstractPDF`
:param sampler_class: a class (not an instance) exposing the interface
defined in :class:`.AbstractSampler`
:type sampler: :class:`.AbstractSampler`
:param sampler_params: a dictionary holding any additional parameters
needed to instantiate the sampler object
:type sampler_params: dict
:param proposers: a dictionary containing proposers for the replica
with keys being the proposer names
:type proposers: dict
:param output_folder: the folder where simulation output will be stored
:type output_folder: str
:param comm: a communicator object to communicate with the master object
and other replicas
:type comm: :class:`.AbstractCommunicator`
'''
self.name = name
self.samples = []
self._sampler = None
self._state = None
self.state = state
self.pdf = pdf
self.sampler_class = sampler_class
self.sampler_params = sampler_params
self.proposers = proposers
self.output_folder = output_folder
self._comm = comm
self._setup_sampler()
self.energy_trace = []
self._n_samples_drawn = 0
self.sampler_stats = []
self._request_processing_table = {}
self._setup_request_processing_table()
[docs] def _setup_request_processing_table(self):
'''
Sets up a dictionary containing possible incoming request objects as
keys and string templates for the function calls to process the request
as values
'''
self._request_processing_table = dict(
SampleRequest='self._sample({})',
SendStatsRequest='self._send_stats({})',
ProposeRequest='self._propose({})',
AcceptBufferedProposalRequest='self._accept_buffered_proposal({})',
SendGetStateAndEnergyRequest='self._send_get_state_and_energy_request({})',
StoreStateEnergyRequest='self._store_state_energy({})',
GetStateAndEnergyRequest='self._send_state_and_energy({})',
DumpSamplesRequest='self._dump_samples({})',
DoNothingRequest='self._do_nothing({})',
DieRequest='-1')
[docs] def _do_nothing(self, request):
'''
Does nothing :-) Serves to synchronize communication with the master object
:param request: a :class:`.DoNothingRequest`
:type request: :class:`.DoNothingRequest`
'''
pass
[docs] def _setup_sampler(self):
'''
Instantiates the sampler object from the sampler class given in
:meth:`rexfw.replicas.Replica.__init__`
'''
from copy import deepcopy
self._sampler = self.sampler_class(self.pdf, deepcopy(self.state),
**self.sampler_params)
@property
def state(self):
'''
Returns the current state of the replica (and thus the sampler)
:return: current state of replica (and sampler)
:rtype: depends on your application
'''
return self._state
@state.setter
def state(self, value):
'''
Sets the replica and sampler state
:param value: value to set states to
:type value: depends on your application
'''
self._state = value
if not self._sampler is None:
self._sampler._state = value
[docs] def _send_state_and_energy(self, request):
'''
Sends the current state and energy to another replica and makes it store them
:param request: a request object telling this replica which other replica
to send state and energy to
:type request: :class:`.GetStateAndEnergyRequest`
'''
new_request = StoreStateEnergyRequest(self.name, self.state, self.energy)
self._comm.send(Parcel(self.name, request.sender, new_request), request.sender)
[docs] def _send_get_state_and_energy_request(self, request):
'''
Sends a :class:`.GetStateAndEnergyRequest` to another replica given in the request
parameter
:param request: a request object telling this replica which other replica
to ask for its state and energy
:type request: :class:`.SendGetStateAndEnergyRequest`
'''
self._current_master = request.sender
self._comm.send(Parcel(self.name, request.partner, GetStateAndEnergyRequest(self.name)),
request.partner)
[docs] def _store_state_energy(self, request):
'''
Stores state and energy given in request parameter. Also sends a
:class:`.DoNothingRequest` to the master object. This is neccessary to
keep communication in sync
:param request: a request object containing current state and energy of a different
replica
:type request: :class:`.StoreStateEnergyRequest`
'''
self._buffered_partner_state = request.state
self._buffered_partner_energy = request.energy
from rexfw.replicas.requests import DoNothingRequest
parcel = Parcel(self.name, self._current_master, DoNothingRequest(self.name))
self._comm.send(parcel, dest=self._current_master)
[docs] def _sample(self, request):
'''
Makes the sampler draw a sample, stores it, stores sampling statistics,
increases the sample counter and updates the energy trace list
:param request: a request object asking this replica to do all of the above
:type request: :class:`.SampleRequest`
'''
from copy import deepcopy
res = deepcopy(self._sampler.sample())
self.state = res
self.samples.append(deepcopy(res))
self.sampler_stats.append([self._n_samples_drawn, self._sampler.last_draw_stats])
self._update_energy_trace()
self._increase_sample_counter()
[docs] def _send_stats(self, request):
'''
Sends sampling statistics to master object. Also empties sampling stats list
:param request: a request object asking this replica to do the above
:type request: :class:`.SendStatsRequest`
'''
parcel = Parcel(self.name, request.sender, self.sampler_stats)
self._comm.send(parcel, request.sender)
self.sampler_stats = []
[docs] def _dump_samples(self, request):
'''
Writes samples and energies to a file, then empties list of stored samples
in memory
:param request: a request object containing information which samples to write
:type request: :class:`.DumpSamplesRequest`
'''
import numpy, os
filename = '{}samples/samples_{}_{}-{}.pickle'.format(self.output_folder,
self.name,
request.s_min + request.offset,
request.s_max + request.offset)
with open(filename, 'w') as opf:
from cPickle import dump
dump(self.samples[::request.dump_step], opf, 2)
self.samples = []
self._dump_energies()
[docs] def _dump_energies(self):
'''
Updates files with replica energies and empties list of stored energies
'''
import numpy, os
Es_folder = self.output_folder + 'energies/'
Es_filename = Es_folder + self.name + '.npy'
if os.path.exists(Es_filename):
self.energy_trace = list(numpy.load(Es_filename)) + self.energy_trace
numpy.save(Es_filename, numpy.array(self.energy_trace))
self.energy_trace = []
[docs] def process_request(self, request):
'''
Processes a request by looking up the corresponding function in the
request processing table
:param request: request object to process
:type request: depends
'''
return eval(self._request_processing_table[request.__class__.__name__].format('request'))
[docs] def _propose(self, request):
'''
Calculates a swap proposal state and sends works and heats to master object
:param request: a request object containing information needed to calculate
the proposal
:type request: :class:`.ProposeRequest`
'''
proposal = self._calculate_proposal(request)
self._send_works_heats(proposal)
self._buffered_proposal = proposal[-1]
[docs] def _send_works_heats(self, proposal):
'''
Sends works and heats corresponding to a swap proposal to the master object
:param proposal: a state proposed for swapping
:type proposal: depends on your application
'''
self._comm.send(Parcel(self.name, self._current_master,
(float(proposal.work), float(proposal.heat))),
self._current_master)
[docs] def _pick_proposer(self, params):
'''
Picks proposer from this replicas' proposer list
:param params: an object holding parameters defining the exchange to be performed
:type params: :class:`.ExchangeParams`
:return: the proposer object which will be used to calculate the proposal
:rtype: :class:`.AbstractProposer`
'''
return list(set(self.proposers.keys()).intersection(set(params.proposers)))[-1]
[docs] def _calculate_proposal(self, request):
'''
Calculates a proposal for a replica and using information given in request
parameter
:param request: a request object containing the name of the replica to which
a state will be proposed to and other information needed to
calculate the proposal
:type request: :class:`.ProposeRequest`
:return: a swap proposal state
:rtype: depends on your application
'''
partner_name = request.partner
params = request.params
proposer_params = params.proposer_params
self._current_master = request.sender
proposer = self._pick_proposer(params)
self.proposers[proposer].partner_name = partner_name
proposal = self.proposers[proposer].propose(self,
self._buffered_partner_state,
self._buffered_partner_energy,
proposer_params)
return proposal
[docs] def _accept_buffered_proposal(self, request):
'''
If the information in the request object says so, accepts a proposal
by setting the replica state to the buffered proposal and, in any case,
appending the current replica state to the list of stored samples. Also
syncs communication with master object and updates stored replica energies
and the sample counter
:param request: a request containing information whether the proposal
should be accepted or not
:type request: :class:`.AcceptBufferedProposalRequest`
'''
from copy import deepcopy
if request.accept:
self.state = self._buffered_proposal
self.samples.append(deepcopy(self.state))
from rexfw.replicas.requests import DoNothingRequest
self._comm.send(Parcel(self.name, self._current_master, DoNothingRequest(self.name)),
self._current_master)
self._update_energy_trace()
self._increase_sample_counter()
[docs] def _increase_sample_counter(self):
'''
Guess what - increases the sample counter!
'''
self._n_samples_drawn += 1
[docs] def _update_energy_trace(self):
'''
Updates the list of stored replica energies
'''
self.energy_trace.append(self.energy)
@property
def energy(self):
'''
Returns the replica energy, that is, the negative log-probability of the replica's
PDF evaluated at the current state
:return: the current replica energy
:rtype: depends on your application
'''
return self.get_energy(self.state)
[docs] def get_energy(self, state):
'''
Calculates replica energy (negative log-probability) for a given state
:param state: state for which to evaluate the negative log-probability of
the replica's PDF
:type state: depends on your application
'''
return -self.pdf.log_prob(state)