from __future__ import absolute_import import tupak import unittest from mock import Mock import numpy as np import os import shutil from collections import OrderedDict class TestPriorInstantiationWithoutOptionalPriors(unittest.TestCase): def setUp(self): self.prior = tupak.core.prior.Prior() def tearDown(self): del self.prior def test_name(self): self.assertIsNone(self.prior.name) def test_latex_label(self): self.assertIsNone(self.prior.latex_label) def test_is_fixed(self): self.assertFalse(self.prior.is_fixed) def test_class_instance(self): self.assertIsInstance(self.prior, tupak.core.prior.Prior) def test_magic_call_is_the_same_as_sampling(self): self.prior.sample = Mock(return_value=0.5) self.assertEqual(self.prior.sample(), self.prior()) def test_base_rescale_method(self): self.assertIsNone(self.prior.rescale(1)) def test_base_repr(self): self.prior = tupak.core.prior.Prior(name='test_name', latex_label='test_label', minimum=0, maximum=1) expected_string = "Prior(name='test_name', latex_label='test_label', unit=None, minimum=0, maximum=1)" self.assertEqual(expected_string, self.prior.__repr__()) def test_base_prob(self): self.assertTrue(np.isnan(self.prior.prob(5))) def test_base_ln_prob(self): self.prior.prob = lambda val: val self.assertEqual(np.log(5), self.prior.ln_prob(5)) def test_is_in_prior(self): self.prior.minimum = 0 self.prior.maximum = 1 val_below = self.prior.minimum - 0.1 val_at_minimum = self.prior.minimum val_in_prior = (self.prior.minimum + self.prior.maximum)/2. val_at_maximum = self.prior.maximum val_above = self.prior.maximum + 0.1 self.assertTrue(self.prior.is_in_prior_range(val_at_minimum)) self.assertTrue(self.prior.is_in_prior_range(val_at_maximum)) self.assertTrue(self.prior.is_in_prior_range(val_in_prior)) self.assertFalse(self.prior.is_in_prior_range(val_below)) self.assertFalse(self.prior.is_in_prior_range(val_above)) class TestPriorName(unittest.TestCase): def setUp(self): self.test_name = 'test_name' self.prior = tupak.core.prior.Prior(self.test_name) def tearDown(self): del self.prior del self.test_name def test_name_assignment(self): self.prior.name = "other_name" self.assertEqual(self.prior.name, "other_name") class TestPriorLatexLabel(unittest.TestCase): def setUp(self): self.test_name = 'test_name' self.prior = tupak.core.prior.Prior(self.test_name) def tearDown(self): del self.test_name del self.prior def test_label_assignment(self): test_label = 'test_label' self.prior.latex_label = 'test_label' self.assertEqual(test_label, self.prior.latex_label) def test_default_label_assignment(self): self.prior.name = 'chirp_mass' self.prior.latex_label = None self.assertEqual(self.prior.latex_label, '$\mathcal{M}$') def test_default_label_assignment_default(self): self.assertTrue(self.prior.latex_label, self.prior.name) class TestPriorIsFixed(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_is_fixed_parent_class(self): self.prior = tupak.core.prior.Prior() self.assertFalse(self.prior.is_fixed) def test_is_fixed_delta_function_class(self): self.prior = tupak.core.prior.DeltaFunction(peak=0) self.assertTrue(self.prior.is_fixed) def test_is_fixed_uniform_class(self): self.prior = tupak.core.prior.Uniform(minimum=0, maximum=10) self.assertFalse(self.prior.is_fixed) class TestPriorClasses(unittest.TestCase): def setUp(self): self.priors = [ tupak.core.prior.DeltaFunction(name='test', unit='unit', peak=1), tupak.core.prior.Gaussian(name='test', unit='unit', mu=0, sigma=1), tupak.core.prior.Normal(name='test', unit='unit', mu=0, sigma=1), tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1), tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=2, minimum=1, maximum=1e2), tupak.core.prior.Uniform(name='test', unit='unit', minimum=0, maximum=1), tupak.core.prior.LogUniform(name='test', unit='unit', minimum=5e0, maximum=1e2), tupak.gw.prior.UniformComovingVolume(name='test', minimum=2e2, maximum=5e3), tupak.core.prior.Sine(name='test', unit='unit'), tupak.core.prior.Cosine(name='test', unit='unit'), tupak.core.prior.Interped(name='test', unit='unit', xx=np.linspace(0, 10, 1000), yy=np.linspace(0, 10, 1000) ** 4, minimum=3, maximum=5), tupak.core.prior.TruncatedGaussian(name='test', unit='unit', mu=1, sigma=0.4, minimum=-1, maximum=1), tupak.core.prior.TruncatedNormal(name='test', unit='unit', mu=1, sigma=0.4, minimum=-1, maximum=1), tupak.core.prior.HalfGaussian(name='test', unit='unit', sigma=1), tupak.core.prior.HalfNormal(name='test', unit='unit', sigma=1), tupak.core.prior.LogGaussian(name='test', unit='unit', mu=0, sigma=1), tupak.core.prior.LogNormal(name='test', unit='unit', mu=0, sigma=1), tupak.core.prior.Exponential(name='test', unit='unit', mu=1), tupak.core.prior.StudentT(name='test', unit='unit', df=3, mu=0, scale=1), tupak.core.prior.Beta(name='test', unit='unit', alpha=2.0, beta=2.0), tupak.core.prior.Logistic(name='test', unit='unit', mu=0, scale=1), tupak.core.prior.Cauchy(name='test', unit='unit', alpha=0, beta=1), tupak.core.prior.Lorentzian(name='test', unit='unit', alpha=0, beta=1), tupak.core.prior.Gamma(name='test', unit='unit', k=1, theta=1), tupak.core.prior.ChiSquared(name='test', unit='unit', nu=2), tupak.gw.prior.AlignedSpin(name='test', unit='unit') ] def test_minimum_rescaling(self): """Test the the rescaling works as expected.""" for prior in self.priors: minimum_sample = prior.rescale(0) self.assertAlmostEqual(minimum_sample, prior.minimum) def test_maximum_rescaling(self): """Test the the rescaling works as expected.""" for prior in self.priors: maximum_sample = prior.rescale(1) self.assertAlmostEqual(maximum_sample, prior.maximum) def test_many_sample_rescaling(self): """Test the the rescaling works as expected.""" for prior in self.priors: many_samples = prior.rescale(np.random.uniform(0, 1, 1000)) self.assertTrue(all((many_samples >= prior.minimum) & (many_samples <= prior.maximum))) def test_out_of_bounds_rescaling(self): """Test the the rescaling works as expected.""" for prior in self.priors: self.assertRaises(ValueError, lambda: prior.rescale(-1)) def test_sampling_single(self): """Test that sampling from the prior always returns values within its domain.""" for prior in self.priors: single_sample = prior.sample() self.assertTrue((single_sample >= prior.minimum) & (single_sample <= prior.maximum)) def test_sampling_many(self): """Test that sampling from the prior always returns values within its domain.""" for prior in self.priors: many_samples = prior.sample(1000) self.assertTrue(all((many_samples >= prior.minimum) & (many_samples <= prior.maximum))) def test_probability_above_domain(self): """Test that the prior probability is non-negative in domain of validity and zero outside.""" for prior in self.priors: if prior.maximum != np.inf: outside_domain = np.linspace(prior.maximum + 1, prior.maximum + 1e4, 1000) self.assertTrue(all(prior.prob(outside_domain) == 0)) def test_probability_below_domain(self): """Test that the prior probability is non-negative in domain of validity and zero outside.""" for prior in self.priors: if prior.minimum != -np.inf: outside_domain = np.linspace(prior.minimum - 1e4, prior.minimum - 1, 1000) self.assertTrue(all(prior.prob(outside_domain) == 0)) def test_probability_in_domain(self): """Test that the prior probability is non-negative in domain of validity and zero outside.""" for prior in self.priors: if prior.minimum == -np.inf: prior.minimum = -1e5 if prior.maximum == np.inf: prior.maximum = 1e5 domain = np.linspace(prior.minimum, prior.maximum, 1000) self.assertTrue(all(prior.prob(domain) >= 0)) def test_probability_surrounding_domain(self): """Test that the prior probability is non-negative in domain of validity and zero outside.""" for prior in self.priors: # skip delta function prior in this case if isinstance(prior, tupak.core.prior.DeltaFunction): continue surround_domain = np.linspace(prior.minimum - 1, prior.maximum + 1, 1000) prior.prob(surround_domain) def test_normalized(self): """Test that each of the priors are normalised, this needs care for delta function and Gaussian priors""" for prior in self.priors: if isinstance(prior, tupak.core.prior.DeltaFunction): continue if isinstance(prior, tupak.core.prior.Cauchy): continue elif isinstance(prior, tupak.core.prior.Gaussian): domain = np.linspace(-1e2, 1e2, 1000) elif isinstance(prior, tupak.core.prior.Cauchy): domain = np.linspace(-1e2, 1e2, 1000) elif isinstance(prior, tupak.core.prior.StudentT): domain = np.linspace(-1e2, 1e2, 1000) elif isinstance(prior, tupak.core.prior.HalfGaussian): domain = np.linspace(0., 1e2, 1000) elif isinstance(prior, tupak.core.prior.Gamma): domain = np.linspace(0., 1e2, 5000) elif isinstance(prior, tupak.core.prior.LogNormal): domain = np.linspace(0., 1e2, 1000) elif isinstance(prior, tupak.core.prior.Exponential): domain = np.linspace(0., 1e2, 5000) elif isinstance(prior, tupak.core.prior.Logistic): domain = np.linspace(-1e2, 1e2, 1000) else: domain = np.linspace(prior.minimum, prior.maximum, 1000) self.assertAlmostEqual(np.trapz(prior.prob(domain), domain), 1, 3) def test_unit_setting(self): for prior in self.priors: if isinstance(prior, tupak.gw.prior.UniformComovingVolume): self.assertEqual('Mpc', prior.unit) else: self.assertEqual('unit', prior.unit) def test_eq_different_classes(self): for i in range(len(self.priors)): for j in range(len(self.priors)): if i == j: self.assertEqual(self.priors[i], self.priors[j]) else: self.assertNotEqual(self.priors[i], self.priors[j]) def test_eq_other_condition(self): prior_1 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1) prior_2 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1.5) self.assertNotEqual(prior_1, prior_2) def test_eq_different_keys(self): prior_1 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1) prior_2 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1) prior_2.other_key = 5 self.assertNotEqual(prior_1, prior_2) def test_np_array_eq(self): prior_1 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1) prior_2 = tupak.core.prior.PowerLaw(name='test', unit='unit', alpha=0, minimum=0, maximum=1) prior_1.array_attribute = np.array([1, 2, 3]) prior_2.array_attribute = np.array([2, 2, 3]) self.assertNotEqual(prior_1, prior_2) def test_repr(self): for prior in self.priors: if isinstance(prior, tupak.core.prior.Interped): continue # we cannot test this because of the numpy arrays elif isinstance(prior, tupak.gw.prior.UniformComovingVolume): repr_prior_string = 'tupak.gw.prior.' + repr(prior) else: repr_prior_string = 'tupak.core.prior.' + repr(prior) repr_prior = eval(repr_prior_string) self.assertEqual(prior, repr_prior) class TestPriorSet(unittest.TestCase): def setUp(self): self.first_prior = tupak.core.prior.Uniform(name='a', minimum=0, maximum=1, unit='kg') self.second_prior = tupak.core.prior.PowerLaw(name='b', alpha=3, minimum=1, maximum=2, unit='m/s') self.third_prior = tupak.core.prior.DeltaFunction(name='c', peak=42, unit='m') self.prior_dict = dict(mass=self.first_prior, speed=self.second_prior, length=self.third_prior) self.prior_set_from_dict = tupak.core.prior.PriorSet(dictionary=self.prior_dict) self.default_prior_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'prior_files/binary_black_holes.prior') self.prior_set_from_file = tupak.core.prior.PriorSet(filename=self.default_prior_file) def tearDown(self): del self.first_prior del self.second_prior del self.third_prior del self.prior_dict del self.prior_set_from_dict del self.default_prior_file del self.prior_set_from_file def test_prior_set_is_ordered_dict(self): self.assertIsInstance(self.prior_set_from_dict, OrderedDict) def test_prior_set_has_correct_length(self): self.assertEqual(3, len(self.prior_set_from_dict)) def test_prior_set_has_expected_priors(self): self.assertDictEqual(self.prior_dict, dict(self.prior_set_from_dict)) # Removed for now as it does not create the outdir in the Gitlab CI # def test_write_to_file(self): # outdir = 'prior_set_test_outdir' # label = 'test_label' # label_dot_prior = label + '.prior' # current_dir_path = os.path.dirname(os.path.realpath(__file__)) # outdir_path = os.path.join(current_dir_path, outdir) # outfile_path = os.path.join(outdir_path, label_dot_prior) # self.prior_set_from_dict.write_to_file(outdir=outdir, label=label) # self.assertTrue(os.path.isdir(outdir_path)) # self.assertTrue(os.listdir(outdir_path)[0] == label_dot_prior) # with open(outfile_path, 'r') as f: # expected_outfile = [ # 'speed = PowerLaw(alpha=3, minimum=1, maximum=2, name=\'b\', latex_label=\'b\', unit=\'m/s\')\n', # 'mass = Uniform(minimum=0, maximum=1, name=\'a\', latex_label=\'a\', unit=\'kg\')\n', # 'length = DeltaFunction(peak=42, name=\'c\', latex_label=\'c\', unit=\'m\')\n'] # self.assertListEqual(sorted(expected_outfile), sorted(f.readlines())) # shutil.rmtree(outdir_path) def test_read_from_file(self): expected = dict(mass_1=tupak.core.prior.Uniform(name='mass_1', minimum=5, maximum=100), mass_2=tupak.core.prior.Uniform(name='mass_2', minimum=5, maximum=100), a_1=tupak.core.prior.Uniform(name='a_1', minimum=0, maximum=0.8), a_2=tupak.core.prior.Uniform(name='a_2', minimum=0, maximum=0.8), tilt_1=tupak.core.prior.Sine(name='tilt_1'), tilt_2=tupak.core.prior.Sine(name='tilt_2'), phi_12=tupak.core.prior.Uniform(name='phi_12', minimum=0, maximum=2 * np.pi), phi_jl=tupak.core.prior.Uniform(name='phi_jl', minimum=0, maximum=2 * np.pi), luminosity_distance=tupak.gw.prior.UniformComovingVolume(name='luminosity_distance', minimum=1e2, maximum=5e3), dec=tupak.core.prior.Cosine(name='dec'), ra=tupak.core.prior.Uniform(name='ra', minimum=0, maximum=2 * np.pi), iota=tupak.core.prior.Sine(name='iota'), psi=tupak.core.prior.Uniform(name='psi', minimum=0, maximum=np.pi), phase=tupak.core.prior.Uniform(name='phase', minimum=0, maximum=2 * np.pi) ) self.assertDictEqual(expected, self.prior_set_from_file) def test_convert_floats_to_delta_functions(self): self.prior_set_from_dict['d'] = 5 self.prior_set_from_dict['e'] = 7.3 self.prior_set_from_dict['f'] = 'unconvertable' self.prior_set_from_dict.convert_floats_to_delta_functions() expected = dict(mass=tupak.core.prior.Uniform(name='a', minimum=0, maximum=1, unit='kg'), speed=tupak.core.prior.PowerLaw(name='b', alpha=3, minimum=1, maximum=2, unit='m/s'), length=tupak.core.prior.DeltaFunction(name='c', peak=42, unit='m'), d=tupak.core.prior.DeltaFunction(peak=5), e=tupak.core.prior.DeltaFunction(peak=7.3), f='unconvertable') self.assertDictEqual(expected, self.prior_set_from_dict) def test_prior_set_from_dict_but_using_a_string(self): prior_set = tupak.core.prior.PriorSet(dictionary=self.default_prior_file) expected = dict(mass_1=tupak.core.prior.Uniform(name='mass_1', minimum=5, maximum=100), mass_2=tupak.core.prior.Uniform(name='mass_2', minimum=5, maximum=100), a_1=tupak.core.prior.Uniform(name='a_1', minimum=0, maximum=0.8), a_2=tupak.core.prior.Uniform(name='a_2', minimum=0, maximum=0.8), tilt_1=tupak.core.prior.Sine(name='tilt_1'), tilt_2=tupak.core.prior.Sine(name='tilt_2'), phi_12=tupak.core.prior.Uniform(name='phi_12', minimum=0, maximum=2 * np.pi), phi_jl=tupak.core.prior.Uniform(name='phi_jl', minimum=0, maximum=2 * np.pi), luminosity_distance=tupak.gw.prior.UniformComovingVolume(name='luminosity_distance', minimum=1e2, maximum=5e3), dec=tupak.core.prior.Cosine(name='dec'), ra=tupak.core.prior.Uniform(name='ra', minimum=0, maximum=2 * np.pi), iota=tupak.core.prior.Sine(name='iota'), psi=tupak.core.prior.Uniform(name='psi', minimum=0, maximum=np.pi), phase=tupak.core.prior.Uniform(name='phase', minimum=0, maximum=2 * np.pi) ) self.assertDictEqual(expected, prior_set) def test_dict_argument_is_not_string_or_dict(self): with self.assertRaises(ValueError): tupak.core.prior.PriorSet(dictionary=list()) def test_sample_subset_correct_size(self): size = 7 samples = self.prior_set_from_dict.sample_subset(keys=self.prior_set_from_dict.keys(), size=size) self.assertEqual(len(self.prior_set_from_dict), len(samples)) for key in samples: self.assertEqual(size, len(samples[key])) def test_sample_subset_correct_size_when_non_priors_in_dict(self): self.prior_set_from_dict['asdf'] = 'not_a_prior' samples = self.prior_set_from_dict.sample_subset(keys=self.prior_set_from_dict.keys()) self.assertEqual(len(self.prior_set_from_dict) - 1, len(samples)) def test_sample_subset_with_actual_subset(self): size = 3 samples = self.prior_set_from_dict.sample_subset(keys=['length'], size=size) expected = dict(length=np.array([42., 42., 42.])) self.assertTrue(np.array_equal(expected['length'], samples['length'])) def test_sample(self): size = 7 np.random.seed(42) samples1 = self.prior_set_from_dict.sample_subset(keys=self.prior_set_from_dict.keys(), size=size) np.random.seed(42) samples2 = self.prior_set_from_dict.sample(size=size) self.assertEqual(samples1.keys(), samples2.keys()) for key in samples1: self.assertTrue(np.array_equal(samples1[key], samples2[key])) def test_prob(self): samples = self.prior_set_from_dict.sample_subset(keys=['mass', 'speed']) expected = self.first_prior.prob(samples['mass']) * self.second_prior.prob(samples['speed']) self.assertEqual(expected, self.prior_set_from_dict.prob(samples)) def test_ln_prob(self): samples = self.prior_set_from_dict.sample_subset(keys=['mass', 'speed']) expected = self.first_prior.ln_prob(samples['mass']) + self.second_prior.ln_prob(samples['speed']) self.assertEqual(expected, self.prior_set_from_dict.ln_prob(samples)) def test_rescale(self): theta = [0.5, 0.5, 0.5] expected = [self.first_prior.rescale(0.5), self.second_prior.rescale(0.5), self.third_prior.rescale(0.5)] self.assertListEqual(sorted(expected), sorted(self.prior_set_from_dict.rescale( keys=self.prior_set_from_dict.keys(), theta=theta))) def test_redundancy(self): for key in self.prior_set_from_dict.keys(): self.assertFalse(self.prior_set_from_dict.test_redundancy(key=key)) class TestFillPrior(unittest.TestCase): def setUp(self): self.likelihood = Mock() self.likelihood.parameters = dict(a=0, b=0, c=0, d=0, asdf=0, ra=1) self.likelihood.non_standard_sampling_parameter_keys = dict(t=8) self.priors = dict(a=1, b=1.1, c='string', d=tupak.core.prior.Uniform(0, 1)) self.priors = tupak.core.prior.PriorSet(dictionary=self.priors) self.default_prior_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'prior_files/binary_black_holes.prior') self.priors.fill_priors(self.likelihood, self.default_prior_file) def tearDown(self): del self.likelihood del self.priors def test_prior_instances_are_not_changed_by_parsing(self): self.assertIsInstance(self.priors['d'], tupak.core.prior.Uniform) def test_parsing_ints_to_delta_priors_class(self): self.assertIsInstance(self.priors['a'], tupak.core.prior.DeltaFunction) def test_parsing_ints_to_delta_priors_with_right_value(self): self.assertEqual(self.priors['a'].peak, 1) def test_parsing_floats_to_delta_priors_class(self): self.assertIsInstance(self.priors['b'], tupak.core.prior.DeltaFunction) def test_parsing_floats_to_delta_priors_with_right_value(self): self.assertAlmostEqual(self.priors['b'].peak, 1.1, 1e-8) def test_without_available_default_priors_no_prior_is_set(self): with self.assertRaises(KeyError): print(self.priors['asdf']) def test_with_available_default_priors_a_default_prior_is_set(self): self.assertIsInstance(self.priors['ra'], tupak.core.prior.Uniform) class TestCreateDefaultPrior(unittest.TestCase): def test_none_behaviour(self): self.assertIsNone(tupak.core.prior.create_default_prior(name='name', default_priors_file=None)) def test_bbh_params(self): prior_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'prior_files/binary_black_holes.prior') prior_set = tupak.core.prior.PriorSet(filename=prior_file) for prior in prior_set: self.assertEqual(prior_set[prior], tupak.core.prior.create_default_prior(name=prior, default_priors_file=prior_file)) def test_unknown_prior(self): prior_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'prior_files/binary_black_holes.prior') self.assertIsNone(tupak.core.prior.create_default_prior(name='name', default_priors_file=prior_file)) if __name__ == '__main__': unittest.main()