diff --git a/bilby/core/result.py b/bilby/core/result.py
index cadd5b68a5f26de680227a7341506e7722aad717..940fa5993d45952cb341b15cce2e81e368bef621 100644
--- a/bilby/core/result.py
+++ b/bilby/core/result.py
@@ -1,5 +1,6 @@
 from __future__ import division
 
+import inspect
 import os
 from collections import OrderedDict, namedtuple
 from copy import copy
@@ -95,6 +96,140 @@ def read_in_result(filename=None, outdir=None, label=None, extension='json', gzi
     return result
 
 
+def get_weights_for_reweighting(
+        result, new_likelihood=None, new_prior=None, old_likelihood=None,
+        old_prior=None):
+    """ Calculate the weights for reweight()
+
+    See bilby.core.result.reweight() for help with the inputs
+
+    Returns
+    -------
+    ln_weights: array
+        An array of the natural-log weights
+    new_log_likelihood_array: array
+        An array of the natural-log likelihoods
+    new_log_prior_array: array
+        An array of the natural-log priors
+
+    """
+    nposterior = len(result.posterior)
+    old_log_likelihood_array = np.zeros(nposterior)
+    old_log_prior_array = np.zeros(nposterior)
+    new_log_likelihood_array = np.zeros(nposterior)
+    new_log_prior_array = np.zeros(nposterior)
+
+    for ii, sample in result.posterior.iterrows():
+        # Convert sample to dictionary
+        par_sample = {key: sample[key] for key in result.search_parameter_keys}
+
+        if old_likelihood is not None:
+            old_likelihood.parameters.update(par_sample)
+            old_log_likelihood_array[ii] = old_likelihood.log_likelihood()
+        else:
+            old_log_likelihood_array[ii] = sample["log_likelihood"]
+
+        if new_likelihood is not None:
+            new_likelihood.parameters.update(par_sample)
+            new_log_likelihood_array[ii] = new_likelihood.log_likelihood()
+        else:
+            # Don't perform likelihood reweighting (i.e. likelihood isn't updated)
+            new_log_likelihood_array[ii] = old_log_likelihood_array[ii]
+
+        if old_prior is not None:
+            old_log_prior_array[ii] = old_prior.ln_prob(par_sample)
+        else:
+            old_log_prior_array[ii] = sample["log_prior"]
+
+        if new_prior is not None:
+            new_log_prior_array[ii] = new_prior.ln_prob(par_sample)
+        else:
+            # Don't perform prior reweighting (i.e. prior isn't updated)
+            new_log_prior_array[ii] = old_log_prior_array[ii]
+
+    ln_weights = (
+        new_log_likelihood_array + new_log_prior_array - old_log_likelihood_array - old_log_prior_array)
+
+    return ln_weights, new_log_likelihood_array, new_log_prior_array
+
+
+def rejection_sample(posterior, weights):
+    """ Perform rejection sampling on a posterior using weights
+
+    Parameters
+    ----------
+    posterior: pd.DataFrame
+        The dataframe containing posterior samples
+    weights: np.ndarray
+        An array of weights
+
+    Returns
+    -------
+    reweighted_posterior: pd.DataFrame
+        The posterior resampled using rejection sampling
+
+    """
+    keep = weights > np.random.uniform(0, max(weights), weights.shape)
+    return posterior.iloc[keep]
+
+
+def reweight(result, label=None, new_likelihood=None, new_prior=None,
+             old_likelihood=None, old_prior=None):
+    """ Reweight a result to a new likelihood/prior using rejection sampling
+
+    Parameters
+    ----------
+    result: bilby.core.result.Result
+        The result to reweight
+    label: str, optional
+        An updated label to apply to the result object
+    new_likelihood: bilby.core.likelihood.Likelihood, (optional)
+        If given, the new likelihood to reweight to. If not given, likelihood
+        reweighting is not applied
+    new_prior: bilby.core.prior.PriorDict, (optional)
+        If given, the new prior to reweight to. If not given, prior
+        reweighting is not applied
+    old_likelihood: bilby.core.likelihood.Likelihood, (optional)
+        If given, calculate the old likelihoods from this object. If not given,
+        the values stored in the posterior are used.
+    old_prior: bilby.core.prior.PriorDict, (optional)
+        If given, calculate the old prior from this object. If not given,
+        the values stored in the posterior are used.
+
+    Returns
+    -------
+    result: bilby.core.result.Result
+        A copy of the result object with a reweighted posterior
+
+    """
+
+    result = copy(result)
+    nposterior = len(result.posterior)
+    logger.info("Reweighting posterior with {} samples".format(nposterior))
+
+    ln_weights, new_log_likelihood_array, new_log_prior_array = get_weights_for_reweighting(
+        result, new_likelihood=new_likelihood, new_prior=new_prior,
+        old_likelihood=old_likelihood, old_prior=old_prior)
+
+    # Overwrite the likelihood and prior evaluations
+    result.posterior["log_likelihood"] = new_log_likelihood_array
+    result.posterior["log_prior"] = new_log_prior_array
+
+    weights = np.exp(ln_weights)
+
+    result.posterior = rejection_sample(result.posterior, weights=weights)
+    logger.info("Rejection sampling resulted in {} samples".format(len(result.posterior)))
+    result.meta_data["reweighted_using_rejection_sampling"] = True
+
+    result.log_evidence += logsumexp(ln_weights) - np.log(nposterior)
+    # Only replace the stored priors if a new prior was supplied
+    if new_prior is not None:
+        result.priors = new_prior
+
+    if label:
+        result.label = label
+    else:
+        result.label += "_reweighted"
+
+    return result
+
+
 class Result(object):
     def __init__(self, label='no_label', outdir='.',
                  sampler=None, search_parameter_keys=None, fixed_parameter_keys=None,
@@ -1092,7 +1227,7 @@ class Result(object):
         return posterior
 
     def samples_to_posterior(self, likelihood=None, priors=None,
-                             conversion_function=None):
+                             conversion_function=None, npool=1):
         """
         Convert array of samples to posterior (a Pandas data frame)
 
@@ -1123,7 +1258,10 @@
         else:
             data_frame['log_prior'] = self.log_prior_evaluations
         if conversion_function is not None:
            if "npool" in inspect.getargspec(conversion_function).args:
                data_frame = conversion_function(data_frame, likelihood, priors, npool=npool)
            else:
                data_frame = conversion_function(data_frame, likelihood, priors)
         self.posterior = data_frame
 
     def calculate_prior_values(self, priors):
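The rejection_sample() helper added above keeps each sample with probability weights / max(weights), so the returned DataFrame is an unweighted draw from the reweighted posterior. A minimal, self-contained sketch of its behaviour on a toy posterior (the data and weights here are illustrative assumptions, not bilby output):

    import numpy as np
    import pandas as pd
    from bilby.core.result import rejection_sample

    # Toy posterior: 1000 standard-normal samples; weights favour x near 0.
    posterior = pd.DataFrame({"x": np.random.normal(0, 1, 1000)})
    weights = np.exp(-0.5 * posterior["x"].values ** 2)

    # Each sample survives with probability weights / max(weights).
    kept = rejection_sample(posterior, weights)
    print("kept {} of {} samples".format(len(kept), len(posterior)))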
diff --git a/bilby/core/sampler/__init__.py b/bilby/core/sampler/__init__.py
index 686c2290bcc5896403b5fd811c6661d268af26ee..548ddb9448111c253d3035718b448be7a60a4e6c 100644
--- a/bilby/core/sampler/__init__.py
+++ b/bilby/core/sampler/__init__.py
@@ -50,7 +50,7 @@ def run_sampler(likelihood, priors=None, label='label', outdir='outdir',
                 sampler='dynesty', use_ratio=None, injection_parameters=None,
                 conversion_function=None, plot=False, default_priors_file=None,
                 clean=None, meta_data=None, save=True, gzip=False,
-                result_class=None, **kwargs):
+                result_class=None, npool=1, **kwargs):
     """
     The primary interface to easy parameter estimation
 
@@ -99,6 +99,9 @@ def run_sampler(likelihood, priors=None, label='label', outdir='outdir',
         The result class to use. By default, `bilby.core.result.Result` is used,
         but objects which inherit from this class can be given providing
         additional methods.
+    npool: int
+        An integer specifying the number of available CPUs used to create
+        pool objects for parallelization.
     **kwargs:
         All kwargs are passed directly to the samplers `run` function
 
@@ -151,7 +154,7 @@ def run_sampler(likelihood, priors=None, label='label', outdir='outdir',
             likelihood, priors=priors, outdir=outdir, label=label,
             injection_parameters=injection_parameters, meta_data=meta_data,
             use_ratio=use_ratio, plot=plot, result_class=result_class,
-            **kwargs)
+            npool=npool, **kwargs)
     else:
         print(IMPLEMENTED_SAMPLERS)
         raise ValueError(
@@ -161,7 +164,7 @@ def run_sampler(likelihood, priors=None, label='label', outdir='outdir',
             likelihood, priors=priors, outdir=outdir, label=label,
             use_ratio=use_ratio, plot=plot,
             injection_parameters=injection_parameters, meta_data=meta_data,
-            **kwargs)
+            npool=npool, **kwargs)
     else:
         raise ValueError(
             "Provided sampler should be a Sampler object or name of a known "
@@ -204,7 +207,8 @@ def run_sampler(likelihood, priors=None, label='label', outdir='outdir',
                 result.injection_parameters)
     result.samples_to_posterior(likelihood=likelihood, priors=result.priors,
-                                conversion_function=conversion_function)
+                                conversion_function=conversion_function,
+                                npool=npool)
 
     if save:
         # The overwrite here ensures we overwrite the initially stored data
diff --git a/bilby/core/sampler/base_sampler.py b/bilby/core/sampler/base_sampler.py
index 6bf70b88c662ef905ac7784f16347e7ca524012f..2ae872b33383f03c13756fe89c62ca79a8372ce1 100644
--- a/bilby/core/sampler/base_sampler.py
+++ b/bilby/core/sampler/base_sampler.py
@@ -88,6 +88,7 @@ class Sampler(object):
     """
     default_kwargs = dict()
+    npool_equiv_kwargs = ['queue_size', 'threads', 'nthreads', 'npool']
 
     def __init__(
             self, likelihood, priors, outdir='outdir', label='label',
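With the plumbing above, parallelisation is requested once through run_sampler and forwarded both to the sampler and to the conversion function. A sketch of the intended call, using a toy Gaussian-likelihood setup (the model, data, and sampler settings are illustrative assumptions, not part of the change):

    import numpy as np
    import bilby

    def model(x, m, c):
        # Toy linear model
        return m * x + c

    x = np.linspace(0, 1, 100)
    y = model(x, m=1.5, c=0.3) + np.random.normal(0, 0.1, 100)

    likelihood = bilby.core.likelihood.GaussianLikelihood(x, y, model, sigma=0.1)
    priors = dict(
        m=bilby.core.prior.Uniform(0, 5, "m"),
        c=bilby.core.prior.Uniform(-1, 1, "c"))

    # npool=4 is translated downstream to each sampler's native keyword.
    result = bilby.run_sampler(
        likelihood=likelihood, priors=priors, sampler="dynesty",
        nlive=250, npool=4, outdir="outdir", label="npool_demo")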
diff --git a/bilby/core/sampler/cpnest.py b/bilby/core/sampler/cpnest.py
index 63f454783ee9410dda379e56ef12aa0e143f4aac..a2b665687cf9aceefd628ba5d4b0dd6bbdd768f3 100644
--- a/bilby/core/sampler/cpnest.py
+++ b/bilby/core/sampler/cpnest.py
@@ -47,6 +47,11 @@ class Cpnest(NestedSampler):
         for equiv in self.npoints_equiv_kwargs:
             if equiv in kwargs:
                 kwargs['nlive'] = kwargs.pop(equiv)
+        if 'nthreads' not in kwargs:
+            for equiv in self.npool_equiv_kwargs:
+                if equiv in kwargs:
+                    kwargs['nthreads'] = kwargs.pop(equiv)
+
         if 'seed' not in kwargs:
             logger.warning('No seed provided, cpnest will use 1234.')
diff --git a/bilby/core/sampler/dynesty.py b/bilby/core/sampler/dynesty.py
index a2d0be342ef0fbbce416b67ce92c79cbade16a00..b2587ecc97d9aff3a0684d42bd93e4a2deaf5508 100644
--- a/bilby/core/sampler/dynesty.py
+++ b/bilby/core/sampler/dynesty.py
@@ -208,6 +208,10 @@ class Dynesty(NestedSampler):
         for equiv in self.walks_equiv_kwargs:
             if equiv in kwargs:
                 kwargs['walks'] = kwargs.pop(equiv)
+        if "queue_size" not in kwargs:
+            for equiv in self.npool_equiv_kwargs:
+                if equiv in kwargs:
+                    kwargs['queue_size'] = kwargs.pop(equiv)
 
     def _verify_kwargs_against_default_kwargs(self):
         if not self.kwargs['walks']:
diff --git a/bilby/core/sampler/ptemcee.py b/bilby/core/sampler/ptemcee.py
index 0e9b71dce449df910c0bfba229f4afea91a145ba..7baa0a9a65ffce828aab4ef19b7b725130787380 100644
--- a/bilby/core/sampler/ptemcee.py
+++ b/bilby/core/sampler/ptemcee.py
@@ -221,6 +221,10 @@ class Ptemcee(MCMCSampler):
         for equiv in self.nwalkers_equiv_kwargs:
             if equiv in kwargs:
                 kwargs["nwalkers"] = kwargs.pop(equiv)
+        if "threads" not in kwargs:
+            for equiv in self.npool_equiv_kwargs:
+                if equiv in kwargs:
+                    kwargs["threads"] = kwargs.pop(equiv)
 
     def get_pos0_from_prior(self):
         """ Draw the initial positions from the prior
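Each sampler wrapper above maps the shared npool_equiv_kwargs aliases onto its own native keyword: nthreads for cpnest, queue_size for dynesty, and threads for ptemcee. The translation pattern, extracted here as a standalone sketch (the translate helper is an illustrative name, not part of the diff):

    # Standalone sketch of the alias handling in each sampler's kwarg translation.
    npool_equiv_kwargs = ['queue_size', 'threads', 'nthreads', 'npool']

    def translate(kwargs, native_key):
        # Rename the first matching alias in kwargs to the sampler's native key.
        if native_key not in kwargs:
            for equiv in npool_equiv_kwargs:
                if equiv in kwargs:
                    kwargs[native_key] = kwargs.pop(equiv)

    kwargs = {"npool": 4, "nlive": 500}
    translate(kwargs, "queue_size")  # dynesty's native keyword
    print(kwargs)  # {'nlive': 500, 'queue_size': 4}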
diff --git a/bilby/core/utils.py b/bilby/core/utils.py
index 272cfdb2915334638064f425b7dee62cc6564eb6..e2843a37961115156cf35ec51c4901604d6e8550 100644
--- a/bilby/core/utils.py
+++ b/bilby/core/utils.py
@@ -1214,6 +1214,26 @@ def safe_save_figure(fig, filename, **kwargs):
         fig.savefig(fname=filename, **kwargs)
 
 
+def kish_log_effective_sample_size(ln_weights):
+    """ Calculate the Kish effective sample size from the natural-log weights
+
+    See https://en.wikipedia.org/wiki/Effective_sample_size for details
+
+    Parameters
+    ----------
+    ln_weights: array
+        An array of the natural-log weights
+
+    Returns
+    -------
+    ln_n_eff: float
+        The natural-log of the effective sample size
+
+    """
+    log_n_eff = 2 * logsumexp(ln_weights) - logsumexp(2 * ln_weights)
+    return log_n_eff
+
+
 class IllegalDurationAndSamplingFrequencyException(Exception):
     pass
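The Kish estimate is n_eff = (sum of w_i)^2 / (sum of w_i^2), evaluated above in log space for numerical stability. A self-contained check (the function body is copied from the diff so the snippet runs on its own):

    import numpy as np
    from scipy.special import logsumexp

    def kish_log_effective_sample_size(ln_weights):
        # ln(n_eff) = 2 * ln(sum w) - ln(sum w^2)
        return 2 * logsumexp(ln_weights) - logsumexp(2 * ln_weights)

    # Equal weights: n_eff equals the number of samples.
    print(np.exp(kish_log_effective_sample_size(np.zeros(100))))  # 100.0

    # One dominant weight: n_eff approaches 1.
    print(np.exp(kish_log_effective_sample_size(np.log([1e6, 1, 1, 1]))))  # ~1.0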
diff --git a/bilby/gw/conversion.py b/bilby/gw/conversion.py
index ced7cc6393d01a14999643711b3bc5638dc1f977..ca7f157ed656d0c8c528e733c8e082e8993f2c85 100644
--- a/bilby/gw/conversion.py
+++ b/bilby/gw/conversion.py
@@ -1,5 +1,6 @@
 from __future__ import division
 import sys
+import multiprocessing
 
 from tqdm import tqdm
 import numpy as np
@@ -747,7 +748,7 @@ def lambda_tilde_to_lambda_1_lambda_2(
 
 
 def _generate_all_cbc_parameters(sample, defaults, base_conversion,
-                                 likelihood=None, priors=None):
+                                 likelihood=None, priors=None, npool=1):
     """Generate all cbc parameters, helper function for BBH/BNS"""
     output_sample = sample.copy()
     waveform_defaults = defaults
@@ -770,7 +771,7 @@ def _generate_all_cbc_parameters(sample, defaults, base_conversion,
     ):
         try:
             generate_posterior_samples_from_marginalized_likelihood(
-                samples=output_sample, likelihood=likelihood)
+                samples=output_sample, likelihood=likelihood, npool=npool)
         except MarginalizedLikelihoodReconstructionError as e:
             logger.warning(
                 "Marginalised parameter reconstruction failed with message "
@@ -811,7 +812,7 @@ def _generate_all_cbc_parameters(sample, defaults, base_conversion,
     return output_sample
 
 
-def generate_all_bbh_parameters(sample, likelihood=None, priors=None):
+def generate_all_bbh_parameters(sample, likelihood=None, priors=None, npool=1):
     """ From either a single sample or a set of samples fill in all missing
     BBH parameters, in place.
 
@@ -833,11 +834,11 @@ def generate_all_bbh_parameters(sample, likelihood=None, priors=None):
     output_sample = _generate_all_cbc_parameters(
         sample, defaults=waveform_defaults,
         base_conversion=convert_to_lal_binary_black_hole_parameters,
-        likelihood=likelihood, priors=priors)
+        likelihood=likelihood, priors=priors, npool=npool)
     return output_sample
 
 
-def generate_all_bns_parameters(sample, likelihood=None, priors=None):
+def generate_all_bns_parameters(sample, likelihood=None, priors=None, npool=1):
     """ From either a single sample or a set of samples fill in all missing
     BNS parameters, in place.
 
@@ -855,6 +856,9 @@ def generate_all_bns_parameters(sample, likelihood=None, priors=None):
         likelihood.interferometers.
     priors: dict, optional
         Dictionary of prior objects, used to fill in non-sampled parameters.
+    npool: int, (default=1)
+        If npool > 1, perform generation (where possible) using a multiprocessing pool
+
     """
     waveform_defaults = {
         'reference_frequency': 50.0, 'waveform_approximant': 'TaylorF2',
@@ -862,7 +866,7 @@ def generate_all_bns_parameters(sample, likelihood=None, priors=None):
     output_sample = _generate_all_cbc_parameters(
         sample, defaults=waveform_defaults,
         base_conversion=convert_to_lal_binary_neutron_star_parameters,
-        likelihood=likelihood, priors=priors)
+        likelihood=likelihood, priors=priors, npool=npool)
     try:
         output_sample = generate_tidal_parameters(output_sample)
     except KeyError as e:
@@ -1151,7 +1155,7 @@ def compute_snrs(sample, likelihood):
 
 
 def generate_posterior_samples_from_marginalized_likelihood(
-        samples, likelihood):
+        samples, likelihood, npool=1):
     """
     Reconstruct the distance posterior from a run which used a likelihood
     which explicitly marginalised over time/distance/phase.
@@ -1164,6 +1168,8 @@ def generate_posterior_samples_from_marginalized_likelihood(
         Posterior from run with a marginalised likelihood.
     likelihood: bilby.gw.likelihood.GravitationalWaveTransient
        Likelihood used during sampling.
+    npool: int, (default=1)
+        If npool > 1, perform generation (where possible) using a multiprocessing pool
 
     Return
     ------
@@ -1174,24 +1180,30 @@ def generate_posterior_samples_from_marginalized_likelihood(
         likelihood.distance_marginalization,
         likelihood.time_marginalization]):
         return samples
-    else:
-        logger.info('Reconstructing marginalised parameters.')
+
+    # Dictionaries of samples are passed through unchanged
     if isinstance(samples, dict):
-        pass
-    elif isinstance(samples, DataFrame):
-        new_time_samples = list()
-        new_distance_samples = list()
-        new_phase_samples = list()
-        for ii in tqdm(range(len(samples)), file=sys.stdout):
-            sample = dict(samples.iloc[ii]).copy()
-            likelihood.parameters.update(sample)
-            new_sample = likelihood.generate_posterior_sample_from_marginalized_likelihood()
-            new_time_samples.append(new_sample['geocent_time'])
-            new_distance_samples.append(new_sample['luminosity_distance'])
-            new_phase_samples.append(new_sample['phase'])
-        samples['geocent_time'] = new_time_samples
-        samples['luminosity_distance'] = new_distance_samples
-        samples['phase'] = new_phase_samples
+        return samples
+    elif not isinstance(samples, DataFrame):
+        raise ValueError("Unable to handle input samples of type {}".format(type(samples)))
+
+    logger.info('Reconstructing marginalised parameters.')
+
+    fill_args = [(ii, row, likelihood) for ii, row in samples.iterrows()]
+    if npool > 1:
+        pool = multiprocessing.Pool(processes=npool)
+        logger.info(
+            "Using a pool with size {} for nsamples={}"
+            .format(npool, len(samples))
+        )
+        new_samples = np.array(pool.map(fill_sample, tqdm(fill_args, file=sys.stdout)))
+        pool.close()
+    else:
+        new_samples = np.array([fill_sample(xx) for xx in tqdm(fill_args, file=sys.stdout)])
+
+    samples['geocent_time'] = new_samples[:, 0]
+    samples['luminosity_distance'] = new_samples[:, 1]
+    samples['phase'] = new_samples[:, 2]
     return samples
 
 
@@ -1212,3 +1224,11 @@ def generate_sky_frame_parameters(samples, likelihood):
     new_samples = DataFrame(new_samples)
     for key in new_samples:
         samples[key] = new_samples[key]
+
+
+def fill_sample(args):
+    ii, sample, likelihood = args
+    sample = dict(sample).copy()
+    likelihood.parameters.update(sample)
+    new_sample = likelihood.generate_posterior_sample_from_marginalized_likelihood()
+    return new_sample["geocent_time"], new_sample["luminosity_distance"], new_sample["phase"]
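Because fill_sample lives at module scope, it can be pickled by multiprocessing.Pool, which is what lets the reconstruction of the geocent_time, luminosity_distance, and phase columns run across processes. A hedged usage sketch, assuming result and likelihood come from an already-completed run that used a marginalised GravitationalWaveTransient likelihood (neither object is defined in this snippet):

    from bilby.gw.conversion import generate_all_bbh_parameters

    # Hypothetical objects from an earlier run; reconstruction uses 4 processes.
    result.posterior = generate_all_bbh_parameters(
        result.posterior, likelihood=likelihood, npool=4)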