Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
likelihood.py 12.01 KiB
from __future__ import division, print_function
import numpy as np

try:
    from scipy.special import logsumexp
except ImportError:
    from scipy.misc import logsumexp
from scipy.special import i0e
from scipy.interpolate import interp1d
import tupak
import logging


class Likelihood(object):
    """ Empty likelihood class to be subclassed by other likelihoods """

    def __init__(self, parameters=None):
        self.parameters = parameters

    def log_likelihood(self):
        return np.nan

    def noise_log_likelihood(self):
        return np.nan

    def log_likelihood_ratio(self):
        return self.log_likelihood() - self.noise_log_likelihood()


class GravitationalWaveTransient(Likelihood):
    """ A gravitational-wave transient likelihood object

    This is the usual likelihood object to use for transient gravitational
    wave parameter estimation. It computes the log-likelihood in the frequency
    domain assuming a colored Gaussian noise model described by a power
    spectral density


    Parameters
    ----------
    interferometers: list
        A list of `tupak.detector.Interferometer` instances - contains the
        detector data and power spectral densities
    waveform_generator: `tupak.waveform_generator.WaveformGenerator`
        An object which computes the frequency-domain strain of the signal,
        given some set of parameters
    distance_marginalization: bool
        If true, analytic distance marginalization
    phase_marginalization: bool
        If true, analytic phase marginalization
    prior: dict
        If given, used in the distance and phase marginalization.

    Returns
    -------
    Likelihood: `tupak.likelihood.Likelihood`
        A likelihood object, able to compute the likelihood of the data given
        some model parameters

    """
    def __init__(self, interferometers, waveform_generator, distance_marginalization=False, phase_marginalization=False,
                 prior=None):
        Likelihood.__init__(self, waveform_generator.parameters)
        self.interferometers = interferometers
        self.waveform_generator = waveform_generator
        self.non_standard_sampling_parameter_keys = self.waveform_generator.non_standard_sampling_parameter_keys
        self.distance_marginalization = distance_marginalization
        self.phase_marginalization = phase_marginalization
        self.prior = prior

        if self.distance_marginalization:
            self.distance_array = np.array([])
            self.delta_distance = 0
            self.distance_prior_array = np.array([])
            self.setup_distance_marginalization()
            self.prior['luminosity_distance'] = 1

        if self.phase_marginalization:
            self.bessel_function_interped = None
            self.setup_phase_marginalization()
            self.prior['psi'] = 0

    @property
    def prior(self):
        return self.__prior

    @prior.setter
    def prior(self, prior):
        if prior is not None:
            self.__prior = prior
        else:
            self.__prior = dict()

    def noise_log_likelihood(self):
        log_l = 0
        for interferometer in self.interferometers:
            log_l -= tupak.utils.noise_weighted_inner_product(interferometer.data, interferometer.data,
                                                              interferometer.power_spectral_density_array,
                                                              self.waveform_generator.time_duration) / 2
        return log_l.real

    def log_likelihood_ratio(self):
        waveform_polarizations = self.waveform_generator.frequency_domain_strain()

        if waveform_polarizations is None:
            return np.nan_to_num(-np.inf)

        matched_filter_snr_squared = 0
        optimal_snr_squared = 0

        for interferometer in self.interferometers:
            signal_ifo = interferometer.get_detector_response(waveform_polarizations,
                                                              self.waveform_generator.parameters)
            matched_filter_snr_squared += tupak.utils.matched_filter_snr_squared(
                signal_ifo, interferometer, self.waveform_generator.time_duration)

            optimal_snr_squared += tupak.utils.optimal_snr_squared(
                signal_ifo, interferometer, self.waveform_generator.time_duration)

        if self.distance_marginalization:

            optimal_snr_squared_array = optimal_snr_squared \
                                        * self.waveform_generator.parameters['luminosity_distance'] ** 2 \
                                        / self.distance_array ** 2

            matched_filter_snr_squared_array = matched_filter_snr_squared * \
                self.waveform_generator.parameters['luminosity_distance'] / self.distance_array

            if self.phase_marginalization:
                matched_filter_snr_squared_array = self.bessel_function_interped(abs(matched_filter_snr_squared_array))
            else:
                matched_filter_snr_squared_array = np.real(matched_filter_snr_squared_array)

            log_l = logsumexp(matched_filter_snr_squared_array - optimal_snr_squared_array / 2,
                              b=self.distance_prior_array * self.delta_distance)
        elif self.phase_marginalization:
            matched_filter_snr_squared = self.bessel_function_interped(abs(matched_filter_snr_squared))
            log_l = matched_filter_snr_squared - optimal_snr_squared / 2

        else:
            log_l = matched_filter_snr_squared.real - optimal_snr_squared / 2

        return log_l.real

    def log_likelihood(self):
        return self.log_likelihood_ratio() + self.noise_log_likelihood()

    def setup_distance_marginalization(self):
        if 'luminosity_distance' not in self.prior.keys():
            logging.info('No prior provided for distance, using default prior.')
            self.prior['luminosity_distance'] = tupak.prior.create_default_prior('luminosity_distance')
        self.distance_array = np.linspace(self.prior['luminosity_distance'].minimum,
                                          self.prior['luminosity_distance'].maximum, int(1e4))
        self.delta_distance = self.distance_array[1] - self.distance_array[0]
        self.distance_prior_array = np.array([self.prior['luminosity_distance'].prob(distance)
                                              for distance in self.distance_array])

    def setup_phase_marginalization(self):
        if 'psi' not in self.prior.keys() or not isinstance(self.prior['psi'], tupak.prior.Prior):
            logging.info('No prior provided for polarization, using default prior.')
            self.prior['psi'] = tupak.prior.create_default_prior('psi')
        self.bessel_function_interped = interp1d(np.linspace(0, 1e6, int(1e5)),
                                                 np.log([i0e(snr) for snr in np.linspace(0, 1e6, int(1e5))])
                                                 + np.linspace(0, 1e6, int(1e5)),
                                                 bounds_error=False, fill_value=-np.inf)


class BasicGravitationalWaveTransient(Likelihood):
    """ A basic gravitational wave transient likelihood

    The simplest frequency-domain gravitational wave transient likelihood. Does
    not include distance/phase marginalization.

    Parameters
    ----------
    interferometers: list
        A list of `tupak.detector.Interferometer` instances - contains the
        detector data and power spectral densities
    waveform_generator: `tupak.waveform_generator.WaveformGenerator`
        An object which computes the frequency-domain strain of the signal,
        given some set of parameters

    Returns
    -------
    Likelihood: `tupak.likelihood.Likelihood`
        A likelihood object, able to compute the likelihood of the data given
        some model parameters

    """
    def __init__(self, interferometers, waveform_generator):
        Likelihood.__init__(self, waveform_generator.parameters)
        self.interferometers = interferometers
        self.waveform_generator = waveform_generator

    def noise_log_likelihood(self):
        log_l = 0
        for interferometer in self.interferometers:
            log_l -= 2. / self.waveform_generator.time_duration * np.sum(
                abs(interferometer.data) ** 2 /
                interferometer.power_spectral_density_array)
        return log_l.real

    def log_likelihood(self):
        log_l = 0
        waveform_polarizations = self.waveform_generator.frequency_domain_strain()
        if waveform_polarizations is None:
            return np.nan_to_num(-np.inf)
        for interferometer in self.interferometers:
            log_l += self.log_likelihood_interferometer(
                waveform_polarizations, interferometer)
        return log_l.real

    def log_likelihood_interferometer(self, waveform_polarizations,
                                      interferometer):
        signal_ifo = interferometer.get_detector_response(
            waveform_polarizations, self.waveform_generator.parameters)

        log_l = - 2. / self.waveform_generator.time_duration * np.vdot(
            interferometer.data - signal_ifo,
            (interferometer.data - signal_ifo)
            / interferometer.power_spectral_density_array)
        return log_l.real


def get_binary_black_hole_likelihood(interferometers):
    """ A rapper to quickly set up a likelihood for BBH parameter estimation

    Parameters
    ----------
    interferometers: list
        A list of `tupak.detector.Interferometer` instances, typically the
        output of either `tupak.detector.get_interferometer_with_open_data`
        or `tupak.detector.get_interferometer_with_fake_noise_and_injection`

    Returns
    -------
    likelihood: tupak.likelihood.GravitationalWaveTransient
        The likelihood to pass to `run_sampler`
    """
    waveform_generator = tupak.waveform_generator.WaveformGenerator(
        time_duration=interferometers[0].duration, sampling_frequency=interferometers[0].sampling_frequency,
        frequency_domain_source_model=tupak.source.lal_binary_black_hole,
        parameters={'waveform_approximant': 'IMRPhenomPv2', 'reference_frequency': 50})
    likelihood = tupak.likelihood.GravitationalWaveTransient(interferometers, waveform_generator)
    return likelihood


class HyperparameterLikelihood(Likelihood):
    """ A likelihood for infering hyperparameter posterior distributions

    See Eq. (1) of https://arxiv.org/abs/1801.02699 for a definition.

    Parameters
    ----------
    samples: list
        An N-dimensional list of individual sets of samples. Each set may have
        a different size.
    hyper_prior: `tupak.prior.Prior`
        A prior distribution with a `parameters` argument pointing to the
        hyperparameters to infer from the samples. These may need to be
        initialized to any arbitrary value, but this will not effect the
        result.
    run_prior: `tupak.prior.Prior`
        The prior distribution used in the inidivudal inferences which resulted
        in the set of samples.

    """

    def __init__(self, samples, hyper_prior, run_prior):
        Likelihood.__init__(self, parameters=hyper_prior.__dict__)
        self.samples = samples
        self.hyper_prior = hyper_prior
        self.run_prior = run_prior
        if hasattr(hyper_prior, 'lnprob') and hasattr(run_prior, 'lnprob'):
            logging.info("Using log-probabilities in likelihood")
            self.log_likelihood = self.log_likelihood_using_lnprob
        else:
            logging.info("Using probabilities in likelihood")
            self.log_likelihood = self.log_likelihood_using_prob

    def log_likelihood_using_lnprob(self):
        L = []
        self.hyper_prior.__dict__.update(self.parameters)
        for samp in self.samples:
            f = self.hyper_prior.lnprob(samp) - self.run_prior.lnprob(samp)
            L.append(logsumexp(f))
        return np.sum(L)

    def log_likelihood_using_prob(self):
        L = []
        self.hyper_prior.__dict__.update(self.parameters)
        for samp in self.samples:
            L.append(
                np.sum(self.hyper_prior.prob(samp) /
                       self.run_prior.prob(samp)))
        return np.sum(np.log(L))