diff --git a/test/likelihood_tests.py b/test/likelihood_tests.py
index a3d6da004a6b6690332c957acee07256c6b3d67b..731d14532e3ef56bd2adce5aef7ad17303b7d3f5 100644
--- a/test/likelihood_tests.py
+++ b/test/likelihood_tests.py
@@ -99,6 +99,7 @@ class TestAnalytical1DLikelihood(unittest.TestCase):
     def test_set_func(self):
         def new_func(x):
             return x
+
         with self.assertRaises(AttributeError):
             # noinspection PyPropertyAccess
             self.analytical_1d_likelihood.func = new_func
@@ -443,5 +444,102 @@ class TestExponentialLikelihood(unittest.TestCase):
         self.assertEqual(-3, exponential_likelihood.log_likelihood())
 
 
+class TestJointLikelihood(unittest.TestCase):
+
+    def setUp(self):
+        self.x = np.array([1, 2, 3])
+        self.y = np.array([1, 2, 3])
+        self.first_likelihood = tupak.core.likelihood.GaussianLikelihood(
+            x=self.x,
+            y=self.y,
+            func=lambda x, param1, param2: (param1 + param2) * x,
+            sigma=1)
+        self.second_likelihood = tupak.core.likelihood.PoissonLikelihood(
+            x=self.x,
+            y=self.y,
+            func=lambda x, param2, param3: (param2 + param3) * x)
+        self.third_likelihood = tupak.core.likelihood.ExponentialLikelihood(
+            x=self.x,
+            y=self.y,
+            func=lambda x, param4, param5: (param4 + param5) * x
+        )
+        self.joint_likelihood = tupak.core.likelihood.JointLikelihood(self.first_likelihood,
+                                                                      self.second_likelihood,
+                                                                      self.third_likelihood)
+
+        self.first_likelihood.parameters['param1'] = 1
+        self.first_likelihood.parameters['param2'] = 2
+        self.second_likelihood.parameters['param2'] = 2
+        self.second_likelihood.parameters['param3'] = 3
+        self.third_likelihood.parameters['param4'] = 4
+        self.third_likelihood.parameters['param5'] = 5
+
+        self.joint_likelihood.parameters['param1'] = 1
+        self.joint_likelihood.parameters['param2'] = 2
+        self.joint_likelihood.parameters['param3'] = 3
+        self.joint_likelihood.parameters['param4'] = 4
+        self.joint_likelihood.parameters['param5'] = 5
+
+    def tearDown(self):
+        del self.x
+        del self.y
+        del self.first_likelihood
+        del self.second_likelihood
+        del self.third_likelihood
+        del self.joint_likelihood
+
+    def test_parameters_consistent_from_init(self):
+        expected = dict(param1=1, param2=2, param3=3, param4=4, param5=5)
+        self.assertDictEqual(expected, self.joint_likelihood.parameters)
+
+    def test_log_likelihood_correctly_sums(self):
+        expected = self.first_likelihood.log_likelihood() + \
+            self.second_likelihood.log_likelihood() + \
+            self.third_likelihood.log_likelihood()
+        self.assertEqual(expected, self.joint_likelihood.log_likelihood())
+
+    def test_log_likelihood_checks_parameter_updates(self):
+        self.first_likelihood.parameters['param2'] = 7
+        self.second_likelihood.parameters['param2'] = 7
+        self.joint_likelihood.parameters['param2'] = 7
+        expected = self.first_likelihood.log_likelihood() + \
+            self.second_likelihood.log_likelihood() + \
+            self.third_likelihood.log_likelihood()
+        self.assertEqual(expected, self.joint_likelihood.log_likelihood())
+
+    def test_list_element_parameters_are_updated(self):
+        self.joint_likelihood.parameters['param2'] = 7
+        self.assertEqual(self.joint_likelihood.parameters['param2'],
+                         self.joint_likelihood.likelihoods[0].parameters['param2'])
+        self.assertEqual(self.joint_likelihood.parameters['param2'],
+                         self.joint_likelihood.likelihoods[1].parameters['param2'])
+
+    def test_log_noise_likelihood(self):
+        self.first_likelihood.noise_log_likelihood = MagicMock(return_value=1)
+        self.second_likelihood.noise_log_likelihood = MagicMock(return_value=2)
+        self.third_likelihood.noise_log_likelihood = MagicMock(return_value=3)
+        self.joint_likelihood = tupak.core.likelihood.JointLikelihood(self.first_likelihood,
+                                                                      self.second_likelihood,
+                                                                      self.third_likelihood)
+        expected = self.first_likelihood.noise_log_likelihood() + \
+            self.second_likelihood.noise_log_likelihood() + \
+            self.third_likelihood.noise_log_likelihood()
+        self.assertEqual(expected, self.joint_likelihood.noise_log_likelihood())
+
+    def test_init_with_list_of_likelihoods(self):
+        with self.assertRaises(ValueError):
+            tupak.core.likelihood.JointLikelihood([self.first_likelihood, self.second_likelihood, self.third_likelihood])
+
+    def test_setting_single_likelihood(self):
+        self.joint_likelihood.likelihoods = self.first_likelihood
+        self.assertEqual(self.first_likelihood.log_likelihood(), self.joint_likelihood.log_likelihood())
+
+    # Appending is not supported
+    # def test_appending(self):
+    #     joint_likelihood = tupak.core.likelihood.JointLikelihood(self.first_likelihood, self.second_likelihood)
+    #     joint_likelihood.likelihoods.append(self.third_likelihood)
+    #     self.assertDictEqual(self.joint_likelihood.parameters, joint_likelihood.parameters)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/tupak/core/likelihood.py b/tupak/core/likelihood.py
index eefa05754f0321068bbb939f2e6db546ffed5016..b8109a19077e2991296505fe0b355c50f068a810 100644
--- a/tupak/core/likelihood.py
+++ b/tupak/core/likelihood.py
@@ -3,6 +3,7 @@ from __future__ import division, print_function
 import numpy as np
 from scipy.special import gammaln
 from tupak.core.utils import infer_parameters_from_function
+import copy
 
 
 class Likelihood(object):
@@ -315,3 +316,60 @@ class StudentTLikelihood(Analytical1DLikelihood):
     def __summed_log_likelihood(self, nu):
         return self.n * (gammaln((nu + 1.0) / 2.0) + .5 * np.log(self.lam / (nu * np.pi)) - gammaln(nu / 2.0)) \
             - (nu + 1.0) / 2.0 * np.sum(np.log1p(self.lam * self.residual ** 2 / nu))
+
+
+class JointLikelihood(Likelihood):
+    def __init__(self, *likelihoods):
+        """
+        A likelihood for combining pre-defined likelihoods.
+        The parameters dict is automagically combined from the parameters
+        dicts of the given likelihoods. If parameters initially have
+        different values across different likelihoods, the value of the
+        last given likelihood is chosen. This does not matter when using
+        the JointLikelihood for sampling, because the parameters will be
+        set consistently.
+
+        Parameters
+        ----------
+        *likelihoods: tupak.core.likelihood.Likelihood
+            Likelihoods to be combined, passed as arguments
+        """
+        self.likelihoods = likelihoods
+        Likelihood.__init__(self, parameters={})
+        self.__sync_parameters()
+
+    def __sync_parameters(self):
+        """ Synchronizes parameters between the likelihoods
+        so that all likelihoods share a single parameter dict."""
+        for likelihood in self.likelihoods:
+            self.parameters.update(likelihood.parameters)
+        for likelihood in self.likelihoods:
+            likelihood.parameters = self.parameters
+
+    @property
+    def likelihoods(self):
+        """ The list of likelihoods """
+        return self.__likelihoods
+
+    @likelihoods.setter
+    def likelihoods(self, likelihoods):
+        likelihoods = copy.deepcopy(likelihoods)
+        if isinstance(likelihoods, (tuple, list)):
+            if all(isinstance(likelihood, Likelihood) for likelihood in likelihoods):
+                self.__likelihoods = list(likelihoods)
+            else:
+                raise ValueError('Try setting the JointLikelihood like this\n'
+                                 'JointLikelihood(first_likelihood, second_likelihood, ...)')
+        elif isinstance(likelihoods, Likelihood):
+            self.__likelihoods = [likelihoods]
+        else:
+            raise ValueError('Input likelihood is not a list or tuple. You need to set multiple likelihoods.')
+
+    def log_likelihood(self):
+        """ This is just the sum of the log likelihoods of all parts of the joint likelihood"""
+        return sum([likelihood.log_likelihood() for likelihood in self.likelihoods])
+
+    def noise_log_likelihood(self):
+        """ This is just the sum of the noise log likelihoods of all parts of the joint likelihood"""
+        return sum([likelihood.noise_log_likelihood() for likelihood in self.likelihoods])
+
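For context, here is a minimal usage sketch of the new JointLikelihood, mirroring the pattern in the tests above. It assumes tupak is importable; the data, model functions, and parameter values are illustrative only and are not part of this change.

import numpy as np
import tupak

x = np.array([1, 2, 3])
y = np.array([1, 2, 3])

# Two independent likelihoods that share the parameter 'param2'.
gaussian = tupak.core.likelihood.GaussianLikelihood(
    x=x, y=y, func=lambda x, param1, param2: (param1 + param2) * x, sigma=1)
poisson = tupak.core.likelihood.PoissonLikelihood(
    x=x, y=y, func=lambda x, param2, param3: (param2 + param3) * x)

joint = tupak.core.likelihood.JointLikelihood(gaussian, poisson)
joint.parameters.update(dict(param1=1, param2=2, param3=3))

# All component likelihoods share the joint parameter dict, and the joint
# log likelihood is the sum of the component log likelihoods.
assert joint.log_likelihood() == gaussian.log_likelihood() + poisson.log_likelihood()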