import sys import unittest import lal import lalsimulation import pytest from packaging import version from shutil import rmtree import deepdish as dd import mock import numpy as np from mock import MagicMock, patch import pandas import bilby class TestInterferometer(unittest.TestCase): def setUp(self): self.name = "name" self.power_spectral_density = ( bilby.gw.detector.PowerSpectralDensity.from_aligo() ) self.minimum_frequency = 10 self.maximum_frequency = 20 self.length = 30 self.latitude = 1 self.longitude = 2 self.elevation = 3 self.xarm_azimuth = 4 self.yarm_azimuth = 5 self.xarm_tilt = 0.0 self.yarm_tilt = 0.0 # noinspection PyTypeChecker self.ifo = bilby.gw.detector.Interferometer( name=self.name, power_spectral_density=self.power_spectral_density, minimum_frequency=self.minimum_frequency, maximum_frequency=self.maximum_frequency, length=self.length, latitude=self.latitude, longitude=self.longitude, elevation=self.elevation, xarm_azimuth=self.xarm_azimuth, yarm_azimuth=self.yarm_azimuth, xarm_tilt=self.xarm_tilt, yarm_tilt=self.yarm_tilt, ) self.ifo.strain_data.set_from_frequency_domain_strain( np.linspace(0, 4096, 4097), sampling_frequency=4096, duration=2 ) self.outdir = "outdir" self.injection_polarizations = dict() np.random.seed(42) self.injection_polarizations["plus"] = np.random.random(4097) self.injection_polarizations["cross"] = np.random.random(4097) self.waveform_generator = MagicMock() self.wg_polarizations = dict( plus=np.random.random(4097), cross=np.random.random(4097) ) self.waveform_generator.frequency_domain_strain = ( lambda _: self.wg_polarizations ) self.parameters = dict( ra=0.0, dec=0.0, geocent_time=0.0, psi=0.0, mass_1=100, mass_2=100 ) bilby.core.utils.check_directory_exists_and_if_not_mkdir(self.outdir) def tearDown(self): del self.name del self.power_spectral_density del self.minimum_frequency del self.maximum_frequency del self.length del self.latitude del self.longitude del self.elevation del self.xarm_azimuth del self.yarm_azimuth del self.xarm_tilt del self.yarm_tilt del self.ifo del self.injection_polarizations del self.wg_polarizations del self.waveform_generator del self.parameters rmtree(self.outdir) def test_name_setting(self): self.assertEqual(self.ifo.name, self.name) def test_psd_setting(self): self.assertEqual(self.ifo.power_spectral_density, self.power_spectral_density) def test_min_freq_setting(self): self.assertEqual(self.ifo.strain_data.minimum_frequency, self.minimum_frequency) def test_max_freq_setting(self): self.assertEqual(self.ifo.strain_data.maximum_frequency, self.maximum_frequency) def test_antenna_response_default(self): with mock.patch("bilby.gw.utils.get_polarization_tensor") as m: with mock.patch("numpy.einsum") as n: m.return_value = 0 n.return_value = 1 self.assertEqual(self.ifo.antenna_response(234, 52, 54, 76, "plus"), 1) def test_antenna_response_einsum(self): with mock.patch("bilby.gw.utils.get_polarization_tensor") as m: m.return_value = np.ones((3, 3)) self.assertAlmostEqual( self.ifo.antenna_response(234, 52, 54, 76, "plus"), self.ifo.detector_tensor.sum(), ) def test_get_detector_response_default_behaviour(self): self.ifo.antenna_response = MagicMock(return_value=1) self.ifo.time_delay_from_geocenter = MagicMock(return_value=0) self.ifo.epoch = 0 self.minimum_frequency = 10 self.maximum_frequency = 20 # self.ifo.frequency_array = np.array([8, 12, 16, 20, 24]) plus = np.linspace(0, 4096, 4097) response = self.ifo.get_detector_response( waveform_polarizations=dict(plus=plus), parameters=dict(ra=0, dec=0, geocent_time=0, psi=0), ) self.assertTrue( np.array_equal(response, plus * self.ifo.frequency_mask * np.exp(-0j)) ) def test_get_detector_response_with_dt(self): self.ifo.antenna_response = MagicMock(return_value=1) self.ifo.time_delay_from_geocenter = MagicMock(return_value=0) self.ifo.epoch = 1 self.minimum_frequency = 10 self.maximum_frequency = 20 plus = np.linspace(0, 4096, 4097) response = self.ifo.get_detector_response( waveform_polarizations=dict(plus=plus), parameters=dict(ra=0, dec=0, geocent_time=0, psi=0), ) expected_response = ( plus * self.ifo.frequency_mask * np.exp(-1j * 2 * np.pi * self.ifo.frequency_array) ) self.assertTrue(np.allclose(abs(expected_response), abs(response))) def test_get_detector_response_multiple_modes(self): self.ifo.antenna_response = MagicMock(return_value=1) self.ifo.time_delay_from_geocenter = MagicMock(return_value=0) self.ifo.epoch = 0 self.minimum_frequency = 10 self.maximum_frequency = 20 plus = np.linspace(0, 4096, 4097) cross = np.linspace(0, 4096, 4097) response = self.ifo.get_detector_response( waveform_polarizations=dict(plus=plus, cross=cross), parameters=dict(ra=0, dec=0, geocent_time=0, psi=0), ) self.assertTrue( np.array_equal( response, (plus + cross) * self.ifo.frequency_mask * np.exp(-0j) ) ) def test_inject_signal_from_waveform_polarizations_correct_injection(self): original_strain = self.ifo.strain_data.frequency_domain_strain self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] self.ifo.inject_signal_from_waveform_polarizations( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) expected = ( self.injection_polarizations["plus"] + self.injection_polarizations["cross"] + original_strain ) self.assertTrue( np.array_equal(expected, self.ifo.strain_data._frequency_domain_strain) ) def test_inject_signal_from_waveform_polarizations_update_time_domain_strain(self): original_td_strain = self.ifo.strain_data.time_domain_strain self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] self.ifo.inject_signal_from_waveform_polarizations( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) self.assertFalse( np.array_equal(original_td_strain, self.ifo.strain_data.time_domain_strain) ) def test_inject_signal_from_waveform_polarizations_meta_data(self): self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] self.ifo.inject_signal_from_waveform_polarizations( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) signal_ifo_expected = ( self.injection_polarizations["plus"] + self.injection_polarizations["cross"] ) self.assertAlmostEqual( self.ifo.optimal_snr_squared(signal=signal_ifo_expected).real, self.ifo.meta_data["optimal_SNR"] ** 2, 10, ) self.assertAlmostEqual( self.ifo.matched_filter_snr(signal=signal_ifo_expected), self.ifo.meta_data["matched_filter_SNR"], 10, ) self.assertDictEqual(self.parameters, self.ifo.meta_data["parameters"]) def test_inject_signal_from_waveform_polarizations_incorrect_length(self): self.injection_polarizations["plus"] = np.random.random(1000) self.injection_polarizations["cross"] = np.random.random(1000) self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] with self.assertRaises(ValueError): self.ifo.inject_signal_from_waveform_polarizations( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) @patch.object(bilby.core.utils.logger, "warning") def test_inject_signal_outside_segment_logs_warning(self, m): self.parameters["geocent_time"] = 24345.0 self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] self.ifo.inject_signal_from_waveform_polarizations( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) self.assertTrue(m.called) def test_inject_signal_from_waveform_generator_correct_return_value(self): self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] returned_polarizations = self.ifo.inject_signal_from_waveform_generator( parameters=self.parameters, waveform_generator=self.waveform_generator ) self.assertTrue( np.array_equal( self.wg_polarizations["plus"], returned_polarizations["plus"] ) ) self.assertTrue( np.array_equal( self.wg_polarizations["cross"], returned_polarizations["cross"] ) ) @patch.object( bilby.gw.detector.Interferometer, "inject_signal_from_waveform_generator" ) def test_inject_signal_with_waveform_generator_correct_call(self, m): self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] _ = self.ifo.inject_signal( parameters=self.parameters, waveform_generator=self.waveform_generator ) m.assert_called_with( parameters=self.parameters, waveform_generator=self.waveform_generator ) def test_inject_signal_from_waveform_generator_correct_injection(self): original_strain = self.ifo.strain_data.frequency_domain_strain self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] injection_polarizations = self.ifo.inject_signal_from_waveform_generator( parameters=self.parameters, waveform_generator=self.waveform_generator ) expected = ( injection_polarizations["plus"] + injection_polarizations["cross"] + original_strain ) self.assertTrue( np.array_equal(expected, self.ifo.strain_data._frequency_domain_strain) ) def test_inject_signal_with_injection_polarizations(self): original_strain = self.ifo.strain_data.frequency_domain_strain self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] self.ifo.inject_signal( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) expected = ( self.injection_polarizations["plus"] + self.injection_polarizations["cross"] + original_strain ) self.assertTrue( np.array_equal(expected, self.ifo.strain_data._frequency_domain_strain) ) @patch.object( bilby.gw.detector.Interferometer, "inject_signal_from_waveform_polarizations" ) def test_inject_signal_with_injection_polarizations_and_waveform_generator(self, m): self.ifo.get_detector_response = lambda x, params: x["plus"] + x["cross"] _ = self.ifo.inject_signal( parameters=self.parameters, waveform_generator=self.waveform_generator, injection_polarizations=self.injection_polarizations, ) m.assert_called_with( parameters=self.parameters, injection_polarizations=self.injection_polarizations, ) with self.assertRaises(ValueError): m.assert_called_with( parameters=self.parameters, injection_polarizations=self.wg_polarizations, ) def test_inject_signal_raises_value_error(self): with self.assertRaises(ValueError): self.ifo.inject_signal(injection_polarizations=None, parameters=None) def test_time_delay_from_geocenter(self): with mock.patch("bilby.gw.utils.time_delay_geocentric") as m: m.return_value = 1 self.assertEqual(self.ifo.time_delay_from_geocenter(1, 2, 3), 1) def test_vertex_position_geocentric(self): with mock.patch("bilby.gw.utils.get_vertex_position_geocentric") as m: m.return_value = 1 self.assertEqual(self.ifo.vertex_position_geocentric(), 1) def test_optimal_snr_squared(self): """ Merely checks parameters are given in the right order and the frequency mask is applied. """ with mock.patch("bilby.gw.utils.noise_weighted_inner_product") as m: m.side_effect = lambda a, b, c, d: [a, b, c, d] signal = np.ones_like(self.ifo.power_spectral_density_array) mask = self.ifo.frequency_mask expected = [ signal[mask], signal[mask], self.ifo.power_spectral_density_array[mask], self.ifo.strain_data.duration, ] actual = self.ifo.optimal_snr_squared(signal=signal) self.assertTrue(np.array_equal(expected[0], actual[0])) self.assertTrue(np.array_equal(expected[1], actual[1])) self.assertTrue(np.array_equal(expected[2], actual[2])) self.assertEqual(expected[3], actual[3]) def test_repr(self): expected = ( "Interferometer(name='{}', power_spectral_density={}, minimum_frequency={}, " "maximum_frequency={}, length={}, latitude={}, longitude={}, elevation={}, xarm_azimuth={}, " "yarm_azimuth={}, xarm_tilt={}, yarm_tilt={})".format( self.name, self.power_spectral_density, float(self.minimum_frequency), float(self.maximum_frequency), float(self.length), float(self.latitude), float(self.longitude), float(self.elevation), float(self.xarm_azimuth), float(self.yarm_azimuth), float(self.xarm_tilt), float(self.yarm_tilt), ) ) self.assertEqual(expected, repr(self.ifo)) pandas_version_test = version.parse(pandas.__version__) >= version.parse("1.2.0") skip_reason = "Deepdish requires pandas < 1.2" @pytest.mark.skipif(pandas_version_test, reason=skip_reason) def test_to_and_from_hdf5_loading(self): if sys.version_info[0] < 3: with self.assertRaises(NotImplementedError): self.ifo.to_hdf5(outdir="outdir", label="test") else: self.ifo.to_hdf5(outdir="outdir", label="test") filename = self.ifo._filename_from_outdir_label_extension( outdir="outdir", label="test", extension="h5" ) recovered_ifo = bilby.gw.detector.Interferometer.from_hdf5(filename) self.assertEqual(self.ifo, recovered_ifo) @pytest.mark.skipif(pandas_version_test or sys.version_info[0] < 3, reason=skip_reason) def test_to_and_from_hdf5_wrong_class(self): bilby.core.utils.check_directory_exists_and_if_not_mkdir("outdir") dd.io.save("./outdir/psd.h5", self.power_spectral_density) filename = self.ifo._filename_from_outdir_label_extension( outdir="outdir", label="psd", extension="h5" ) with self.assertRaises(TypeError): bilby.gw.detector.Interferometer.from_hdf5(filename) def test_to_and_from_pkl_loading(self): self.ifo.to_pickle(outdir="outdir", label="test") filename = "outdir/test.pkl" recovered_ifo = bilby.gw.detector.Interferometer.from_pickle(filename) self.assertEqual(self.ifo, recovered_ifo) def test_to_and_from_pkl_wrong_class(self): import dill with open("./outdir/psd.pkl", "wb") as ff: dill.dump(self.ifo.power_spectral_density, ff) filename = self.ifo._filename_from_outdir_label_extension( outdir="outdir", label="psd", extension="pkl" ) with self.assertRaises(TypeError): bilby.gw.detector.Interferometer.from_pickle(filename) class TestInterferometerEquals(unittest.TestCase): def setUp(self): self.name = "name" self.power_spectral_density_1 = ( bilby.gw.detector.PowerSpectralDensity.from_aligo() ) self.power_spectral_density_2 = ( bilby.gw.detector.PowerSpectralDensity.from_aligo() ) self.minimum_frequency = 10 self.maximum_frequency = 20 self.length = 30 self.latitude = 1 self.longitude = 2 self.elevation = 3 self.xarm_azimuth = 4 self.yarm_azimuth = 5 self.xarm_tilt = 0.0 self.yarm_tilt = 0.0 # noinspection PyTypeChecker self.duration = 1 self.sampling_frequency = 200 self.frequency_array = bilby.utils.create_frequency_series( sampling_frequency=self.sampling_frequency, duration=self.duration ) self.strain = self.frequency_array self.ifo_1 = bilby.gw.detector.Interferometer( name=self.name, power_spectral_density=self.power_spectral_density_1, minimum_frequency=self.minimum_frequency, maximum_frequency=self.maximum_frequency, length=self.length, latitude=self.latitude, longitude=self.longitude, elevation=self.elevation, xarm_azimuth=self.xarm_azimuth, yarm_azimuth=self.yarm_azimuth, xarm_tilt=self.xarm_tilt, yarm_tilt=self.yarm_tilt, ) self.ifo_2 = bilby.gw.detector.Interferometer( name=self.name, power_spectral_density=self.power_spectral_density_2, minimum_frequency=self.minimum_frequency, maximum_frequency=self.maximum_frequency, length=self.length, latitude=self.latitude, longitude=self.longitude, elevation=self.elevation, xarm_azimuth=self.xarm_azimuth, yarm_azimuth=self.yarm_azimuth, xarm_tilt=self.xarm_tilt, yarm_tilt=self.yarm_tilt, ) self.ifo_1.set_strain_data_from_frequency_domain_strain( frequency_array=self.frequency_array, frequency_domain_strain=self.strain ) self.ifo_2.set_strain_data_from_frequency_domain_strain( frequency_array=self.frequency_array, frequency_domain_strain=self.strain ) def tearDown(self): del self.name del self.power_spectral_density_1 del self.power_spectral_density_2 del self.minimum_frequency del self.maximum_frequency del self.length del self.latitude del self.longitude del self.elevation del self.xarm_azimuth del self.yarm_azimuth del self.xarm_tilt del self.yarm_tilt del self.ifo_1 del self.ifo_2 del self.sampling_frequency del self.duration del self.frequency_array del self.strain def test_eq_true(self): self.assertEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_psd(self): self.ifo_1.power_spectral_density.psd_array[0] = 1234 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_minimum_frequency(self): self.ifo_1.minimum_frequency -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_maximum_frequency(self): self.ifo_1.minimum_frequency -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_length(self): self.ifo_1.length -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_latitude(self): self.ifo_1.latitude -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_longitude(self): self.ifo_1.longitude -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_elevation(self): self.ifo_1.elevation -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_xarm_azimuth(self): self.ifo_1.xarm_azimuth -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_xarmtilt(self): self.ifo_1.xarm_tilt -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_yarm_azimuth(self): self.ifo_1.yarm_azimuth -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_yarm_tilt(self): self.ifo_1.yarm_tilt -= 1 self.assertNotEqual(self.ifo_1, self.ifo_2) def test_eq_false_different_ifo_strain_data(self): self.strain = bilby.utils.create_frequency_series( sampling_frequency=self.sampling_frequency / 2, duration=self.duration * 2 ) self.ifo_1.set_strain_data_from_frequency_domain_strain( frequency_array=self.frequency_array, frequency_domain_strain=self.strain ) self.assertNotEqual(self.ifo_1, self.ifo_2) class TestInterferometerAntennaPatternAgainstLAL(unittest.TestCase): def setUp(self): self.name = "name" self.ifo_names = ['H1', 'L1', 'V1', 'K1', 'GEO600', 'ET'] self.lal_prefixes = {'H1': 'H1', 'L1': 'L1', 'V1': 'V1', 'K1': 'K1', 'GEO600': 'G1', 'ET': 'E1'} self.polarizations = ['plus', 'cross', 'breathing', 'longitudinal', 'x', 'y'] self.ifos = bilby.gw.detector.InterferometerList(self.ifo_names) self.gpstime = 1305303144 self.trial = 100 def tearDown(self): del self.name del self.ifo_names del self.lal_prefixes del self.polarizations del self.ifos del self.gpstime del self.trial def test_antenna_pattern_vs_lal(self): gmst = lal.GreenwichMeanSiderealTime(self.gpstime) f_bilby = np.zeros((self.trial, 6)) f_lal = np.zeros((self.trial, 6)) for n, ifo_name in enumerate(self.ifo_names): response = lalsimulation.DetectorPrefixToLALDetector(self.lal_prefixes[ifo_name]).response ifo = self.ifos[n] for i in range(self.trial): ra = 2. * np.pi * np.random.uniform() dec = np.pi * np.random.uniform() - np.pi / 2. psi = np.pi * np.random.uniform() f_lal[i] = lal.ComputeDetAMResponseExtraModes(response, ra, dec, psi, gmst) for m, pol in enumerate(self.polarizations): f_bilby[i, m] = ifo.antenna_response(ra, dec, self.gpstime, psi, pol) std = np.std(f_bilby - f_lal, axis=0) for m, pol in enumerate(self.polarizations): with self.subTest(':'.join((ifo_name, pol))): self.assertAlmostEqual(std[m], 0.0, places=7) if __name__ == "__main__": unittest.main()