-
Gregory Ashton authored
By putting in the dictionary as an ordered dict, the way priors are entered determines the order they are plotted, which is what most people will expect. - Also adds some catches to help improve error messages for when the input prior is something else.
Gregory Ashton authoredBy putting in the dictionary as an ordered dict, the way priors are entered determines the order they are plotted, which is what most people will expect. - Also adds some catches to help improve error messages for when the input prior is something else.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
prior.py 30.19 KiB
from __future__ import division
import tupak
import numpy as np
from scipy.interpolate import interp1d
from scipy.integrate import cumtrapz
from scipy.special import erf, erfinv
import scipy.stats
import os
from collections import OrderedDict
from tupak.core.utils import logger
from tupak.core import utils
class PriorSet(OrderedDict):

    def __init__(self, dictionary=None, filename=None):
        """ A set of priors

        The insertion order of the dictionary is preserved, so priors are
        plotted in the order in which they were entered.

        Parameters
        ----------
        dictionary: dict, None
            If given, a dictionary to generate the prior set.
        filename: str, None
            If given, a file containing the prior to generate the prior set.

        Raises
        ------
        ValueError
            If `dictionary` is neither a dict, a string file name, nor None.
        """
        OrderedDict.__init__(self)
        if isinstance(dictionary, dict):
            # Accept any dict subclass (OrderedDict, another PriorSet, ...);
            # the previous exact-type check rejected subclasses.
            self.update(dictionary)
        elif type(dictionary) is str:
            logger.debug('Argument "dictionary" is a string.'
                         + ' Assuming it is intended as a file name.')
            self.read_in_file(dictionary)
        elif type(filename) is str:
            self.read_in_file(filename)
        elif dictionary is not None:
            raise ValueError("PriorSet input dictionary not understood")

    def write_to_file(self, outdir, label):
        """ Write the prior distribution to file.

        Parameters
        ----------
        outdir: str
            output directory name
        label: str
            Output file naming scheme
        """
        utils.check_directory_exists_and_if_not_mkdir(outdir)
        prior_file = os.path.join(outdir, "{}.prior".format(label))
        logger.debug("Writing priors to {}".format(prior_file))
        with open(prior_file, "w") as outfile:
            for key in self.keys():
                outfile.write(
                    "{} = {}\n".format(key, self[key]))

    def read_in_file(self, filename):
        """ Reads in a prior from a file specification

        Blank lines and lines starting with '#' are ignored.

        Parameters
        ----------
        filename: str
            Name of the file to be read in
        """
        prior = {}
        with open(filename, 'r') as f:
            for line in f:
                stripped = line.strip()
                # Skip blank lines (previously raised IndexError) and comments.
                if not stripped or stripped[0] == '#':
                    continue
                elements = line.split('=')
                key = elements[0].replace(' ', '')
                val = '='.join(elements[1:])
                # SECURITY: eval executes arbitrary code from the prior file;
                # only read prior files from trusted sources.
                prior[key] = eval(val)
        self.update(prior)

    def convert_floats_to_delta_functions(self):
        """ Convert all float parameters to delta functions """
        for key in self:
            if isinstance(self[key], Prior):
                continue
            elif isinstance(self[key], (int, float)):
                self[key] = DeltaFunction(self[key])
                logger.debug(
                    "{} converted to delta function prior.".format(key))
            else:
                logger.debug(
                    "{} cannot be converted to delta function prior."
                    .format(key))

    def fill_priors(self, likelihood, default_priors_file=None):
        """
        Fill dictionary of priors based on required parameters of likelihood

        Any floats in prior will be converted to delta function prior. Any
        required, non-specified parameters will use the default. The prior
        set is modified in place.

        Note: if `likelihood` has `non_standard_sampling_parameter_keys`, then
        this will set-up default priors for those as well.

        Parameters
        ----------
        likelihood: tupak.likelihood.GravitationalWaveTransient instance
            Used to infer the set of parameters to fill the prior with
        default_priors_file: str, optional
            If given, a file containing the default priors; otherwise defaults
            to the tupak defaults for a binary black hole.
        """
        self.convert_floats_to_delta_functions()
        missing_keys = set(likelihood.parameters) - set(self.keys())
        if getattr(likelihood, 'non_standard_sampling_parameter_keys', None) is not None:
            for parameter in likelihood.non_standard_sampling_parameter_keys:
                if parameter in self:
                    continue
                self[parameter] = create_default_prior(parameter, default_priors_file)
        for missing_key in missing_keys:
            if not self.test_redundancy(missing_key):
                default_prior = create_default_prior(missing_key, default_priors_file)
                if default_prior is None:
                    set_val = likelihood.parameters[missing_key]
                    logger.warning(
                        "Parameter {} has no default prior and is set to {}, this"
                        " will not be sampled and may cause an error."
                        .format(missing_key, set_val))
                else:
                    self[missing_key] = default_prior
        # Re-run redundancy checks on the full set so subclasses can prune.
        for key in self:
            self.test_redundancy(key)

    def sample(self, size=None):
        """Draw samples from the prior set

        Parameters
        ----------
        size: int or tuple of ints, optional
            See numpy.random.uniform docs

        Returns
        -------
        dict: Dictionary of the samples
        """
        return self.sample_subset(keys=self.keys(), size=size)

    def sample_subset(self, keys=tuple(), size=None):
        """Draw samples from the prior set for parameters which are not a DeltaFunction

        Parameters
        ----------
        keys: list
            List of prior keys to draw samples from
        size: int or tuple of ints, optional
            See numpy.random.uniform docs

        Returns
        -------
        dict: Dictionary of the drawn samples
        """
        # Default is an (immutable) empty tuple; the previous iter([]) default
        # was a single shared iterator, exhausted after the first call.
        self.convert_floats_to_delta_functions()
        samples = dict()
        for key in keys:
            if isinstance(self[key], Prior):
                samples[key] = self[key].sample(size=size)
            else:
                logger.debug('{} not a known prior.'.format(key))
        return samples

    def prob(self, sample):
        """
        Parameters
        ----------
        sample: dict
            Dictionary of the samples of which we want to have the probability of

        Returns
        -------
        float: Joint probability of all individual sample probabilities
        """
        # np.prod: np.product is a deprecated alias removed in NumPy 2.0.
        return np.prod([self[key].prob(sample[key]) for key in sample])

    def ln_prob(self, sample):
        """
        Parameters
        ----------
        sample: dict
            Dictionary of the samples of which we want to have the log probability of

        Returns
        -------
        float: Joint log probability of all the individual sample probabilities
        """
        return np.sum([self[key].ln_prob(sample[key]) for key in sample])

    def rescale(self, keys, theta):
        """Rescale samples from unit cube to prior

        Parameters
        ----------
        keys: list
            List of prior keys to be rescaled
        theta: list
            List of randomly drawn values on a unit cube associated with the prior keys

        Returns
        -------
        list: List of floats containing the rescaled sample
        """
        return [self[key].rescale(sample) for key, sample in zip(keys, theta)]

    def test_redundancy(self, key):
        """Empty redundancy test, should be overwritten in subclasses"""
        return False
def create_default_prior(name, default_priors_file=None):
    """Make a default prior for a parameter with a known name.

    Parameters
    ----------
    name: str
        Parameter name
    default_priors_file: str, optional
        If given, a file containing the default priors; otherwise defaults to
        the tupak defaults for a binary black hole.

    Return
    ------
    prior: Prior
        Default prior distribution for that parameter, if unknown None is
        returned.
    """
    # Guard clauses: no file, or no entry for this name, both yield None.
    if default_priors_file is None:
        logger.debug(
            "No prior file given.")
        return None
    defaults = PriorSet(filename=default_priors_file)
    if name not in defaults.keys():
        logger.debug(
            "No default prior found for variable {}.".format(name))
        return None
    return defaults[name]
class Prior(object):
    # Class-level map of parameter name -> latex label; empty in the base
    # class, populated by domain-specific subclasses.
    _default_latex_labels = dict()

    def __init__(self, name=None, latex_label=None, minimum=-np.inf, maximum=np.inf):
        """ Implements a Prior object

        Parameters
        ----------
        name: str, optional
            Name associated with prior.
        latex_label: str, optional
            Latex label associated with prior, used for plotting.
        minimum: float, optional
            Minimum of the domain, default=-np.inf
        maximum: float, optional
            Maximum of the domain, default=np.inf
        """
        self.name = name
        self.latex_label = latex_label
        self.minimum = minimum
        self.maximum = maximum

    def __call__(self):
        """Calling the prior is a shorthand for drawing one sample.

        Returns
        -------
        float: The return value of the sample method.
        """
        return self.sample()

    def sample(self, size=None):
        """Draw a sample by rescaling uniform random draws through the inverse CDF.

        Parameters
        ----------
        size: int or tuple of ints, optional
            See numpy.random.uniform docs

        Returns
        -------
        float: A random number between 0 and 1, rescaled to match the distribution of this Prior
        """
        unit_draw = np.random.uniform(0, 1, size)
        return self.rescale(unit_draw)

    def rescale(self, val):
        """
        'Rescale' a sample from the unit line element to the prior.

        Stub implementation; each subclass should overwrite this with its
        inverse CDF.

        Parameters
        ----------
        val: float
            A random number between 0 and 1

        Returns
        -------
        None
        """
        return None

    def prob(self, val):
        """Return the prior probability of val; subclasses should overwrite this.

        Parameters
        ----------
        val: float

        Returns
        -------
        np.nan
        """
        return np.nan

    def ln_prob(self, val):
        """Return the natural log of the prior probability of val.

        Parameters
        ----------
        val: float

        Returns
        -------
        np.nan
        """
        return np.log(self.prob(val))

    @staticmethod
    def test_valid_for_rescaling(val):
        """Test if 0 <= val <= 1, elementwise for arrays.

        Parameters
        ----------
        val: float

        Raises
        -------
        ValueError: If val is not between 0 and 1
        """
        val = np.atleast_1d(val)
        out_of_bounds = (val < 0) | (val > 1)
        if np.any(out_of_bounds):
            raise ValueError("Number to be rescaled should be in [0, 1]")

    def __repr__(self):
        """Return a string resembling how this instance is instantiated.

        Returns
        -------
        str: A string representation of this instance
        """
        return self._subclass_repr_helper()

    def _subclass_repr_helper(self, subclass_args=iter([])):
        """Build the common repr template shared by all subclasses.

        Parameters
        ----------
        subclass_args: list, optional
            List of attributes in the subclass instance

        Returns
        -------
        str: A string representation for this subclass instance.
        """
        keys = ['name', 'latex_label', 'minimum', 'maximum']
        keys.extend(subclass_args)
        # Properties live on the class, not in __dict__, so merge them in
        # explicitly to make them addressable by name below.
        prop_names = [p for p in dir(self.__class__)
                      if isinstance(getattr(self.__class__, p), property)]
        attributes = self.__dict__.copy()
        for prop in prop_names:
            attributes[prop] = getattr(self, prop)
        arg_string = ', '.join(
            '{}={}'.format(key, repr(attributes[key])) for key in keys)
        return "{}({})".format(self.__class__.__name__, arg_string)

    @property
    def is_fixed(self):
        """
        Returns True if the prior is fixed and should not be used in the sampler.
        Implemented as an isinstance check against DeltaFunction.

        Returns
        -------
        bool: Whether it's fixed or not!
        """
        return isinstance(self, DeltaFunction)

    @property
    def latex_label(self):
        """Latex label that can be used for plots.

        Draws from a set of default labels if no label is given

        Returns
        -------
        str: A latex representation for this prior
        """
        return self.__latex_label

    @latex_label.setter
    def latex_label(self, latex_label=None):
        # Fall back to the class-level default when no label is supplied.
        if latex_label is None:
            self.__latex_label = self.__default_latex_label
        else:
            self.__latex_label = latex_label

    @property
    def minimum(self):
        return self.__minimum

    @minimum.setter
    def minimum(self, minimum):
        self.__minimum = minimum

    @property
    def maximum(self):
        return self.__maximum

    @maximum.setter
    def maximum(self, maximum):
        self.__maximum = maximum

    @property
    def __default_latex_label(self):
        # Known parameter names map to a latex string; anything else just
        # uses the raw name.
        return self._default_latex_labels.get(self.name, self.name)
class DeltaFunction(Prior):

    def __init__(self, peak, name=None, latex_label=None):
        """Dirac delta function prior; sampling always returns `peak`.

        Parameters
        ----------
        peak: float
            Peak value of the delta function
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        # The domain collapses to a single point: minimum == maximum == peak.
        Prior.__init__(self, name, latex_label, minimum=peak, maximum=peak)
        self.peak = peak

    def rescale(self, val):
        """Map any unit-interval sample (scalar or array) onto the peak value.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Rescaled probability, equivalent to peak
        """
        Prior.test_valid_for_rescaling(val)
        # val ** 0 keeps the shape of `val` while replacing its values with 1.
        return self.peak * val ** 0

    def prob(self, val):
        """Return the prior probability of val.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: np.inf if val = peak, 0 otherwise
        """
        return np.inf if val == self.peak else 0

    def __repr__(self):
        """Delegate to the shared repr helper, adding the peak attribute."""
        return Prior._subclass_repr_helper(self, subclass_args=['peak'])
class PowerLaw(Prior):

    def __init__(self, alpha, minimum, maximum, name=None, latex_label=None):
        """Power law with bounds and alpha, spectral index

        Parameters
        ----------
        alpha: float
            Power law exponent parameter
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        Prior.__init__(self, name, latex_label, minimum, maximum)
        self.alpha = alpha

    def rescale(self, val):
        """
        'Rescale' a sample from the unit line element to the power-law prior.

        This maps to the inverse CDF. This has been analytically solved for this case.

        Parameters
        ----------
        val: float
            Uniform probability

        Returns
        -------
        float: Rescaled probability
        """
        Prior.test_valid_for_rescaling(val)
        if self.alpha == -1:
            # alpha == -1 is the log-uniform special case.
            return self.minimum * np.exp(val * np.log(self.maximum / self.minimum))
        else:
            return (self.minimum ** (1 + self.alpha) + val *
                    (self.maximum ** (1 + self.alpha) - self.minimum ** (1 + self.alpha))) ** (1. / (1 + self.alpha))

    def prob(self, val):
        """Return the prior probability of val

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val, zero outside [minimum, maximum]
        """
        in_prior = (val >= self.minimum) & (val <= self.maximum)
        if self.alpha == -1:
            return np.nan_to_num(1 / val / np.log(self.maximum / self.minimum)) * in_prior
        else:
            return np.nan_to_num(val ** self.alpha * (1 + self.alpha) / (self.maximum ** (1 + self.alpha)
                                                                         - self.minimum ** (1 + self.alpha))) * in_prior

    def lnprob(self, val):
        """Return the natural-log prior probability of val

        Fixed: the previous implementation multiplied the two log terms
        (alpha * log(val) * log(normalising)) instead of adding them, and
        divided by zero when alpha == -1. This now agrees with
        np.log(self.prob(val)) inside the domain and is -inf outside it.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: ln of the prior probability of val
        """
        in_prior = (val >= self.minimum) & (val <= self.maximum)
        if self.alpha == -1:
            normalising = 1. / np.log(self.maximum / self.minimum)
        else:
            normalising = (1 + self.alpha) / (self.maximum ** (1 + self.alpha)
                                              - self.minimum ** (1 + self.alpha))
        with np.errstate(divide='ignore'):
            # log(1. * in_prior) is 0 inside the domain, -inf outside.
            return self.alpha * np.log(val) + np.log(normalising) + np.log(1. * in_prior)

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self, subclass_args=['alpha'])
class Uniform(Prior):

    def __init__(self, minimum, maximum, name=None, latex_label=None):
        """Uniform prior with bounds

        Parameters
        ----------
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        Prior.__init__(self, name, latex_label, minimum, maximum)

    def rescale(self, val):
        """Map the unit interval linearly onto [minimum, maximum] (inverse CDF)."""
        Prior.test_valid_for_rescaling(val)
        return self.minimum + val * (self.maximum - self.minimum)

    def prob(self, val):
        """Return the prior probability of val

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val
        """
        width = self.maximum - self.minimum
        return scipy.stats.uniform.pdf(val, loc=self.minimum, scale=width)

    def ln_prob(self, val):
        """Return the log prior probability of val

        Parameters
        ----------
        val: float

        Returns
        -------
        float: log probability of val
        """
        width = self.maximum - self.minimum
        return scipy.stats.uniform.logpdf(val, loc=self.minimum, scale=width)
class LogUniform(PowerLaw):

    def __init__(self, minimum, maximum, name=None, latex_label=None):
        """Log-Uniform prior with bounds; a PowerLaw with exponent alpha = -1.

        Parameters
        ----------
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        PowerLaw.__init__(self, name=name, latex_label=latex_label,
                          minimum=minimum, maximum=maximum, alpha=-1)
        # A non-positive minimum makes the log measure ill-defined.
        if self.minimum <= 0:
            logger.warning('You specified a uniform-in-log prior with minimum={}'.format(self.minimum))

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self)
class Cosine(Prior):

    def __init__(self, name=None, latex_label=None, minimum=-np.pi / 2, maximum=np.pi / 2):
        """Uniform-in-cosine prior with bounds

        Parameters
        ----------
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        Prior.__init__(self, name, latex_label, minimum, maximum)

    def rescale(self, val):
        """Map the unit interval to the cosine prior via the analytic inverse CDF."""
        Prior.test_valid_for_rescaling(val)
        return np.arcsin(2 * val - 1)

    def prob(self, val):
        """Return the prior probability of val. Defined over [-pi/2, pi/2].

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val, zero outside the domain
        """
        inside = (val >= self.minimum) & (val <= self.maximum)
        return np.cos(val) / 2 * inside
class Sine(Prior):

    def __init__(self, name=None, latex_label=None, minimum=0, maximum=np.pi):
        """Uniform-in-sine prior with bounds

        Parameters
        ----------
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        Prior.__init__(self, name, latex_label, minimum, maximum)

    def rescale(self, val):
        """Map the unit interval to the sine prior via the analytic inverse CDF."""
        Prior.test_valid_for_rescaling(val)
        return np.arccos(1 - 2 * val)

    def prob(self, val):
        """Return the prior probability of val. Defined over [0, pi].

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val, zero outside the domain
        """
        inside = (val >= self.minimum) & (val <= self.maximum)
        return np.sin(val) / 2 * inside
class Gaussian(Prior):

    def __init__(self, mu, sigma, name=None, latex_label=None):
        """Gaussian prior with mean mu and width sigma

        Parameters
        ----------
        mu: float
            Mean of the Gaussian prior
        sigma: float
            Width/Standard deviation of the Gaussian prior
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        # Unbounded support: rely on the superclass defaults of +/- infinity.
        Prior.__init__(self, name, latex_label)
        self.mu = mu
        self.sigma = sigma

    def rescale(self, val):
        """
        'Rescale' a sample from the unit line element to the appropriate Gaussian prior.

        This maps to the inverse CDF. This has been analytically solved for this case.
        """
        Prior.test_valid_for_rescaling(val)
        offset = erfinv(2 * val - 1) * 2 ** 0.5 * self.sigma
        return self.mu + offset

    def prob(self, val):
        """Return the prior probability of val.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val
        """
        exponent = -(self.mu - val) ** 2 / (2 * self.sigma ** 2)
        return np.exp(exponent) / (2 * np.pi) ** 0.5 / self.sigma

    def lnprob(self, val):
        """Return the natural-log prior probability of val."""
        return -0.5 * ((self.mu - val) ** 2 / self.sigma ** 2 + np.log(2 * np.pi * self.sigma ** 2))

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self, subclass_args=['mu', 'sigma'])
class TruncatedGaussian(Prior):

    def __init__(self, mu, sigma, minimum, maximum, name=None, latex_label=None):
        """Truncated Gaussian prior with mean mu and width sigma

        https://en.wikipedia.org/wiki/Truncated_normal_distribution

        Parameters
        ----------
        mu: float
            Mean of the Gaussian prior
        sigma: float
            Width/Standard deviation of the Gaussian prior
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass
        """
        Prior.__init__(self, name=name, latex_label=latex_label,
                       minimum=minimum, maximum=maximum)
        self.mu = mu
        self.sigma = sigma

    @property
    def normalisation(self):
        """ Calculates the proper normalisation of the truncated Gaussian

        Returns
        -------
        float: Proper normalisation of the truncated Gaussian
        """
        # Fraction of the full Gaussian's mass lying inside [minimum, maximum].
        upper = erf((self.maximum - self.mu) / 2 ** 0.5 / self.sigma)
        lower = erf((self.minimum - self.mu) / 2 ** 0.5 / self.sigma)
        return (upper - lower) / 2

    def rescale(self, val):
        """
        'Rescale' a sample from the unit line element to the appropriate truncated Gaussian prior.

        This maps to the inverse CDF. This has been analytically solved for this case.
        """
        Prior.test_valid_for_rescaling(val)
        lower_cdf_term = erf((self.minimum - self.mu) / 2 ** 0.5 / self.sigma)
        return erfinv(2 * val * self.normalisation + lower_cdf_term) * 2 ** 0.5 * self.sigma + self.mu

    def prob(self, val):
        """Return the prior probability of val.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val, zero outside [minimum, maximum]
        """
        inside = (val >= self.minimum) & (val <= self.maximum)
        return np.exp(-(self.mu - val) ** 2 / (2 * self.sigma ** 2)) / (
            2 * np.pi) ** 0.5 / self.sigma / self.normalisation * inside

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self, subclass_args=['mu', 'sigma'])
class Interped(Prior):

    def __init__(self, xx, yy, minimum=np.nan, maximum=np.nan, name=None, latex_label=None):
        """Creates an interpolated prior function from arrays of xx and yy=p(xx)

        Parameters
        ----------
        xx: array_like
            x values for the to be interpolated prior function
        yy: array_like
            p(xx) values for the to be interpolated prior function
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass

        Attributes
        -------
        probability_density: scipy.interpolate.interp1d
            Interpolated prior probability distribution
        cumulative_distribution: scipy.interpolate.interp1d
            Interpolated cumulative prior probability distribution
        inverse_cumulative_distribution: scipy.interpolate.interp1d
            Inverted cumulative prior probability distribution
        YY: array_like
            Cumulative prior probability distribution
        """
        self.xx = xx
        self.yy = yy
        # Interpolant of the full user-supplied PDF; kept so the grid can be
        # re-evaluated in __update_instance if minimum/maximum later change.
        self.__all_interpolated = interp1d(x=xx, y=yy, bounds_error=False, fill_value=0)
        # NaN defaults for minimum/maximum mean "use the data range":
        # nanmax/nanmin ignore the NaN so the bound comes from xx.
        Prior.__init__(self, name, latex_label,
                       minimum=np.nanmax(np.array((min(xx), minimum))),
                       maximum=np.nanmin(np.array((max(xx), maximum))))
        self.__initialize_attributes()

    def prob(self, val):
        """Return the prior probability of val.

        Parameters
        ----------
        val: float

        Returns
        -------
        float: Prior probability of val
        """
        return self.probability_density(val)

    def rescale(self, val):
        """
        'Rescale' a sample from the unit line element to the prior.

        This maps to the inverse CDF. This is done using interpolation.
        """
        Prior.test_valid_for_rescaling(val)
        rescaled = self.inverse_cumulative_distribution(val)
        # interp1d returns a 0-d array for scalar input; convert to float.
        if rescaled.shape == ():
            rescaled = float(rescaled)
        return rescaled

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self, subclass_args=['xx', 'yy'])

    @property
    def minimum(self):
        """Return minimum of the prior distribution.

        Updates the prior distribution if minimum is set to a different value.

        Returns
        -------
        float: Minimum of the prior distribution
        """
        return self.__minimum

    @minimum.setter
    def minimum(self, minimum):
        self.__minimum = minimum
        # Only re-grid once both bounds exist: during Prior.__init__ the
        # minimum is assigned before maximum, so skip the first assignment.
        if '_Interped__maximum' in self.__dict__ and self.__maximum < np.inf:
            self.__update_instance()

    @property
    def maximum(self):
        """Return maximum of the prior distribution.

        Updates the prior distribution if maximum is set to a different value.

        Returns
        -------
        float: Maximum of the prior distribution
        """
        return self.__maximum

    @maximum.setter
    def maximum(self, maximum):
        self.__maximum = maximum
        # NOTE(review): this guards on self.__minimum < np.inf, which holds
        # even for minimum = -inf; the symmetric check would be > -np.inf.
        # Confirm intent before changing.
        if '_Interped__minimum' in self.__dict__ and self.__minimum < np.inf:
            self.__update_instance()

    def __update_instance(self):
        # Re-sample the stored PDF interpolant on a fresh grid spanning the
        # (possibly new) bounds, keeping the original number of points.
        self.xx = np.linspace(self.minimum, self.maximum, len(self.xx))
        self.yy = self.__all_interpolated(self.xx)
        self.__initialize_attributes()

    def __initialize_attributes(self):
        # Normalise the PDF if needed, then build the PDF/CDF/inverse-CDF
        # interpolants used by prob() and rescale().
        if np.trapz(self.yy, self.xx) != 1:
            logger.debug('Supplied PDF for {} is not normalised, normalising.'.format(self.name))
        self.yy /= np.trapz(self.yy, self.xx)
        self.YY = cumtrapz(self.yy, self.xx, initial=0)
        # Need last element of cumulative distribution to be exactly one.
        self.YY[-1] = 1
        self.probability_density = interp1d(x=self.xx, y=self.yy, bounds_error=False, fill_value=0)
        self.cumulative_distribution = interp1d(x=self.xx, y=self.YY, bounds_error=False, fill_value=0)
        self.inverse_cumulative_distribution = interp1d(x=self.YY, y=self.xx, bounds_error=True)
class FromFile(Interped):

    def __init__(self, file_name, minimum=None, maximum=None, name=None, latex_label=None):
        """Creates an interpolated prior function from arrays of xx and yy=p(xx) extracted from a file

        Parameters
        ----------
        file_name: str
            Name of the file containing the xx and yy arrays
        minimum: float
            See superclass
        maximum: float
            See superclass
        name: str
            See superclass
        latex_label: str
            See superclass

        Attributes
        -------
        all_interpolated: scipy.interpolate.interp1d
            Interpolated prior function
        """
        # NOTE(review): minimum/maximum default to None here while Interped
        # defaults to np.nan — confirm upstream that None is handled.
        try:
            self.id = file_name
            x_vals, y_vals = np.genfromtxt(self.id).T
            Interped.__init__(self, xx=x_vals, yy=y_vals, minimum=minimum,
                              maximum=maximum, name=name, latex_label=latex_label)
        except IOError:
            # Best-effort: warn rather than raise, leaving the instance
            # uninitialised when the file cannot be read.
            logger.warning("Can't load {}.".format(self.id))
            logger.warning("Format should be:")
            logger.warning(r"x\tp(x)")

    def __repr__(self):
        """Call to helper method in the super class."""
        return Prior._subclass_repr_helper(self, subclass_args=['id'])