Skip to content
Snippets Groups Projects
Commit 68da1c31 authored by Moritz Huebner's avatar Moritz Huebner
Browse files

Merge branch '170-make-optimal_snr-a-method-of-the-interferometer' into 'master'

Resolve "Make optimal_snr_squared and matched_filter_snr_squared a method of the Interferometer"

Closes #170

See merge request Monash/tupak!151
parents b8c89ef0 be59d528
No related branches found
No related tags found
1 merge request!151Resolve "Make optimal_snr_squared and matched_filter_snr_squared a method of the Interferometer"
Pipeline #28744 passed
......@@ -26,7 +26,8 @@ class TestDetector(unittest.TestCase):
self.ifo = tupak.gw.detector.Interferometer(name=self.name, power_spectral_density=self.power_spectral_density,
minimum_frequency=self.minimum_frequency,
maximum_frequency=self.maximum_frequency, length=self.length,
latitude=self.latitude, longitude=self.longitude, elevation=self.elevation,
latitude=self.latitude, longitude=self.longitude,
elevation=self.elevation,
xarm_azimuth=self.xarm_azimuth, yarm_azimuth=self.yarm_azimuth,
xarm_tilt=self.xarm_tilt, yarm_tilt=self.yarm_tilt)
self.ifo.strain_data.set_from_frequency_domain_strain(
......@@ -209,45 +210,46 @@ class TestDetector(unittest.TestCase):
def test_get_detector_response_default_behaviour(self):
self.ifo.antenna_response = MagicMock(return_value=1)
self.ifo.time_delay_from_geocenter = MagicMock(return_value = 0)
self.ifo.time_delay_from_geocenter = MagicMock(return_value=0)
self.ifo.epoch = 0
self.minimum_frequency = 10
self.maximum_frequency = 20
#self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
# self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
plus = np.linspace(0, 4096, 4097)
response = self.ifo.get_detector_response(
waveform_polarizations=dict(plus=plus),
parameters=dict(ra=0, dec=0, geocent_time=0, psi=0))
self.assertTrue(np.array_equal(response, plus*self.ifo.frequency_mask*np.exp(-0j)))
self.assertTrue(np.array_equal(response, plus * self.ifo.frequency_mask * np.exp(-0j)))
def test_get_detector_response_with_dt(self):
self.ifo.antenna_response = MagicMock(return_value=1)
self.ifo.time_delay_from_geocenter = MagicMock(return_value = 0)
self.ifo.time_delay_from_geocenter = MagicMock(return_value=0)
self.ifo.epoch = 1
self.minimum_frequency = 10
self.maximum_frequency = 20
#self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
# self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
plus = np.linspace(0, 4096, 4097)
response = self.ifo.get_detector_response(
waveform_polarizations=dict(plus=plus),
parameters=dict(ra=0, dec=0, geocent_time=0, psi=0))
expected_response = plus*self.ifo.frequency_mask*np.exp(-1j*2*np.pi*self.ifo.frequency_array)
expected_response = plus * self.ifo.frequency_mask * np.exp(-1j * 2 * np.pi * self.ifo.frequency_array)
self.assertTrue(np.allclose(abs(response),
abs(plus*self.ifo.frequency_mask*np.exp(-1j*2*np.pi*self.ifo.frequency_array))))
abs(plus * self.ifo.frequency_mask * np.exp(
-1j * 2 * np.pi * self.ifo.frequency_array))))
def test_get_detector_response_multiple_modes(self):
self.ifo.antenna_response = MagicMock(return_value=1)
self.ifo.time_delay_from_geocenter = MagicMock(return_value = 0)
self.ifo.time_delay_from_geocenter = MagicMock(return_value=0)
self.ifo.epoch = 0
self.minimum_frequency = 10
self.maximum_frequency = 20
#self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
# self.ifo.frequency_array = np.array([8, 12, 16, 20, 24])
plus = np.linspace(0, 4096, 4097)
cross = np.linspace(0, 4096, 4097)
response = self.ifo.get_detector_response(
waveform_polarizations=dict(plus=plus, cross=cross),
parameters=dict(ra=0, dec=0, geocent_time=0, psi=0))
self.assertTrue(np.array_equal(response, (plus+cross)*self.ifo.frequency_mask*np.exp(-0j)))
self.assertTrue(np.array_equal(response, (plus + cross) * self.ifo.frequency_mask * np.exp(-0j)))
def test_inject_signal_no_waveform_polarizations(self):
with self.assertRaises(ValueError):
......@@ -285,6 +287,26 @@ class TestDetector(unittest.TestCase):
m.return_value = 1
self.assertEqual(self.ifo.vertex_position_geocentric(), 1)
def test_optimal_snr_squared(self):
""" Merely checks parameters are given in the right order """
with mock.patch('tupak.gw.utils.noise_weighted_inner_product') as m:
m.side_effect = lambda a, b, c, d: [a, b, c, d]
signal = 1
expected = [signal, signal, self.ifo.power_spectral_density_array, self.ifo.strain_data.duration]
actual = self.ifo.optimal_snr_squared(signal=signal)
self.assertListEqual(expected, actual)
def test_matched_filter_snr_squared(self):
""" Merely checks parameters are given in the right order """
with mock.patch('tupak.gw.utils.noise_weighted_inner_product') as m:
m.side_effect = lambda a, b, c, d: [b, [a, c, d]]
signal = 1
expected = [self.ifo.frequency_domain_strain, [signal, self.ifo.power_spectral_density_array,
self.ifo.strain_data.duration]]
actual = self.ifo.matched_filter_snr_squared(signal=signal)
self.assertTrue(np.array_equal(expected[0], actual[0])) # array-like element has to be evaluated separately
self.assertListEqual(expected[1], actual[1])
class TestInterferometerStrainData(unittest.TestCase):
......
......@@ -522,10 +522,9 @@ def compute_snrs(sample, likelihood):
signal = interferometer.get_detector_response(signal_polarizations,
likelihood.waveform_generator.parameters)
sample['{}_matched_filter_snr'.format(interferometer.name)] = \
tupak.gw.utils.matched_filter_snr_squared(signal, interferometer,
likelihood.waveform_generator.duration) ** 0.5
sample['{}_optimal_snr'.format(interferometer.name)] = tupak.gw.utils.optimal_snr_squared(
signal, interferometer, likelihood.waveform_generator.duration) ** 0.5
interferometer.matched_filter_snr_squared(signal=signal) ** 0.5
sample['{}_optimal_snr'.format(interferometer.name)] = \
interferometer.optimal_snr_squared(signal=signal) ** 0.5
else:
logger.info('Computing SNRs for every sample, this may take some time.')
all_interferometers = likelihood.interferometers
......@@ -541,10 +540,9 @@ def compute_snrs(sample, likelihood):
for interferometer in all_interferometers:
signal = interferometer.get_detector_response(signal_polarizations,
likelihood.waveform_generator.parameters)
matched_filter_snrs[interferometer.name].append(tupak.gw.utils.matched_filter_snr_squared(
signal, interferometer, likelihood.waveform_generator.duration) ** 0.5)
optimal_snrs[interferometer.name].append(tupak.gw.utils.optimal_snr_squared(
signal, interferometer, likelihood.waveform_generator.duration) ** 0.5)
matched_filter_snrs[interferometer.name]\
.append(interferometer.matched_filter_snr_squared(signal=signal) ** 0.5)
optimal_snrs[interferometer.name].append(interferometer.optimal_snr_squared(signal=signal) ** 0.5)
for interferometer in likelihood.interferometers:
sample['{}_matched_filter_snr'.format(interferometer.name)] = matched_filter_snrs[interferometer.name]
......
......@@ -1257,12 +1257,8 @@ class Interferometer(object):
sampling_frequency=self.strain_data.sampling_frequency,
duration=self.strain_data.duration,
start_time=self.strain_data.start_time)
opt_snr = np.sqrt(gwutils.optimal_snr_squared(
signal=signal_ifo, interferometer=self,
duration=self.strain_data.duration).real)
mf_snr = np.sqrt(gwutils.matched_filter_snr_squared(
signal=signal_ifo, interferometer=self,
duration=self.strain_data.duration).real)
opt_snr = np.sqrt(self.optimal_snr_squared(signal=signal_ifo).real)
mf_snr = np.sqrt(self.matched_filter_snr_squared(signal=signal_ifo).real)
logger.info("Injected signal in {}:".format(self.name))
logger.info(" optimal SNR = {:.2f}".format(opt_snr))
......@@ -1382,6 +1378,40 @@ class Interferometer(object):
"""
return gwutils.get_vertex_position_geocentric(self.__latitude, self.__longitude, self.__elevation)
def optimal_snr_squared(self, signal):
"""
Parameters
----------
signal: array_like
Array containing the signal
Returns
-------
float: The optimal signal to noise ratio possible squared
"""
return gwutils.optimal_snr_squared(signal=signal,
power_spectral_density=self.power_spectral_density_array,
duration=self.strain_data.duration)
def matched_filter_snr_squared(self, signal):
"""
Parameters
----------
signal: array_like
Array containing the signal
Returns
-------
float: The matched filter signal to noise ratio squared
"""
return gwutils.matched_filter_snr_squared(signal=signal,
frequency_domain_strain=self.frequency_domain_strain,
power_spectral_density=self.power_spectral_density_array,
duration=self.strain_data.duration)
@property
def whitened_frequency_domain_strain(self):
""" Calculates the whitened data by dividing data by the amplitude spectral density
......
......@@ -166,13 +166,9 @@ class GravitationalWaveTransient(likelihood.Likelihood):
for interferometer in self.interferometers:
signal_ifo = interferometer.get_detector_response(
waveform_polarizations, self.waveform_generator.parameters)
matched_filter_snr_squared +=\
tupak.gw.utils.matched_filter_snr_squared(
signal_ifo, interferometer,
self.waveform_generator.duration)
optimal_snr_squared += tupak.gw.utils.optimal_snr_squared(
signal_ifo, interferometer, self.waveform_generator.duration)
matched_filter_snr_squared += interferometer.matched_filter_snr_squared(signal=signal_ifo)
optimal_snr_squared += interferometer.optimal_snr_squared(signal=signal_ifo)
if self.time_marginalization:
matched_filter_snr_squared_tc_array +=\
4 / self.waveform_generator.duration * np.fft.fft(
......
......@@ -217,15 +217,17 @@ def noise_weighted_inner_product(aa, bb, power_spectral_density, duration):
return 4 / duration * np.sum(integrand)
def matched_filter_snr_squared(signal, interferometer, duration):
def matched_filter_snr_squared(signal, frequency_domain_strain, power_spectral_density, duration):
"""
Parameters
----------
signal: array_like
Array containing the signal
interferometer: tupak.gw.detector.Interferometer
Interferometer which we want to have the data and noise from
frequency_domain_strain: array_like
power_spectral_density: array_like
duration: float
Time duration of the signal
......@@ -234,28 +236,27 @@ def matched_filter_snr_squared(signal, interferometer, duration):
float: The matched filter signal to noise ratio squared
"""
return noise_weighted_inner_product(
signal, interferometer.frequency_domain_strain,
interferometer.power_spectral_density_array, duration)
return noise_weighted_inner_product(signal, frequency_domain_strain, power_spectral_density, duration)
def optimal_snr_squared(signal, interferometer, duration):
def optimal_snr_squared(signal, power_spectral_density, duration):
"""
Parameters
----------
signal: array_like
Array containing the signal
interferometer: tupak.gw.detector.Interferometer
Interferometer which we want to have the data and noise from
power_spectral_density: array_like
duration: float
Time duration of the signal
Returns
-------
float: The optimal signal to noise ratio possible squared
float: The matched filter signal to noise ratio squared
"""
return noise_weighted_inner_product(signal, signal, interferometer.power_spectral_density_array, duration)
return noise_weighted_inner_product(signal, signal, power_spectral_density, duration)
def get_event_time(event):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment