Skip to content
Snippets Groups Projects
Commit 4a0f31b5 authored by Nikhil Sarin's avatar Nikhil Sarin :mountain_snow: Committed by Moritz Huebner
Browse files

Calculating SNR for ROQ

parent c86cb568
No related branches found
No related tags found
No related merge requests found
......@@ -954,23 +954,23 @@ def compute_snrs(sample, likelihood):
ifo.matched_filter_snr(signal=signal)
sample['{}_optimal_snr'.format(ifo.name)] = \
ifo.optimal_snr_squared(signal=signal) ** 0.5
else:
logger.info(
'Computing SNRs for every sample, this may take some time.')
all_interferometers = likelihood.interferometers
matched_filter_snrs = {ifo.name: [] for ifo in all_interferometers}
optimal_snrs = {ifo.name: [] for ifo in all_interferometers}
matched_filter_snrs = {ifo.name: [] for ifo in likelihood.interferometers}
optimal_snrs = {ifo.name: [] for ifo in likelihood.interferometers}
for ii in range(len(sample)):
signal_polarizations =\
likelihood.waveform_generator.frequency_domain_strain(
dict(sample.iloc[ii]))
for ifo in all_interferometers:
signal = ifo.get_detector_response(
signal_polarizations, sample.iloc[ii])
matched_filter_snrs[ifo.name].append(
ifo.matched_filter_snr(signal=signal))
optimal_snrs[ifo.name].append(
ifo.optimal_snr_squared(signal=signal) ** 0.5)
likelihood.parameters.update(sample.iloc[ii])
for ifo in likelihood.interferometers:
per_detector_snr = likelihood.calculate_snrs(signal_polarizations, ifo)
matched_filter_snrs[ifo.name].append(per_detector_snr.complex_matched_filter_snr.real)
optimal_snrs[ifo.name].append(per_detector_snr.optimal_snr_squared ** 0.5)
for ifo in likelihood.interferometers:
sample['{}_matched_filter_snr'.format(ifo.name)] =\
......@@ -978,8 +978,6 @@ def compute_snrs(sample, likelihood):
sample['{}_optimal_snr'.format(ifo.name)] =\
optimal_snrs[ifo.name]
likelihood.interferometers = all_interferometers
else:
logger.debug('Not computing SNRs.')
......
......@@ -21,6 +21,7 @@ from .prior import BBHPriorDict
from .source import lal_binary_black_hole
from .utils import noise_weighted_inner_product, build_roq_weights, blockwise_dot_product
from .waveform_generator import WaveformGenerator
from collections import namedtuple
class GravitationalWaveTransient(likelihood.Likelihood):
......@@ -124,6 +125,40 @@ class GravitationalWaveTransient(likelihood.Likelihood):
"waveform_generator.".format(attr))
setattr(self.waveform_generator, attr, ifo_attr)
def calculate_snrs(self, waveform_polarizations, interferometer):
"""
Compute the snrs
Parameters
----------
waveform_polarizations: dict
A dictionary of waveform polarizations and the corresponding array
interferometer: bilby.gw.detector.Interferometer
The bilby interferometer object
"""
signal = interferometer.get_detector_response(
waveform_polarizations, self.parameters)
d_inner_h = interferometer.inner_product(signal=signal)
optimal_snr_squared = interferometer.optimal_snr_squared(signal=signal)
complex_matched_filter_snr = d_inner_h / (optimal_snr_squared**0.5)
CalculatedSNRs = namedtuple(
'CalculatedSNRs', ['d_inner_h', 'optimal_snr_squared', 'complex_matched_filter_snr', 'd_inner_h_squared_tc_array'])
if self.time_marginalization:
d_inner_h_squared_tc_array =\
4 / self.waveform_generator.duration * np.fft.fft(
signal[0:-1] *
interferometer.frequency_domain_strain.conjugate()[0:-1] /
interferometer.power_spectral_density_array[0:-1])
else:
d_inner_h_squared_tc_array = None
return CalculatedSNRs(
d_inner_h=d_inner_h, optimal_snr_squared=optimal_snr_squared,
complex_matched_filter_snr=complex_matched_filter_snr,
d_inner_h_squared_tc_array=d_inner_h_squared_tc_array)
def _check_prior_is_set(self, key):
if key not in self.priors or not isinstance(
self.priors[key], Prior):
......@@ -167,23 +202,23 @@ class GravitationalWaveTransient(likelihood.Likelihood):
if waveform_polarizations is None:
return np.nan_to_num(-np.inf)
d_inner_h = 0
optimal_snr_squared = 0
d_inner_h = 0.
optimal_snr_squared = 0.
complex_matched_filter_snr = 0.
d_inner_h_squared_tc_array = np.zeros(
self.interferometers.frequency_array[0:-1].shape,
dtype=np.complex128)
for interferometer in self.interferometers:
signal_ifo = interferometer.get_detector_response(
waveform_polarizations, self.parameters)
per_detector_snr = self.calculate_snrs(
waveform_polarizations, interferometer)
d_inner_h += per_detector_snr.d_inner_h
optimal_snr_squared += per_detector_snr.optimal_snr_squared
complex_matched_filter_snr += per_detector_snr.complex_matched_filter_snr
d_inner_h += interferometer.inner_product(signal=signal_ifo)
optimal_snr_squared += interferometer.optimal_snr_squared(signal=signal_ifo)
if self.time_marginalization:
d_inner_h_squared_tc_array +=\
4 / self.waveform_generator.duration * np.fft.fft(
signal_ifo[0:-1] *
interferometer.frequency_domain_strain.conjugate()[0:-1] /
interferometer.power_spectral_density_array[0:-1])
d_inner_h_squared_tc_array += per_detector_snr.d_inner_h_squared_tc_array
if self.time_marginalization:
......@@ -448,63 +483,65 @@ class ROQGravitationalWaveTransient(GravitationalWaveTransient):
self.frequency_nodes_linear =\
waveform_generator.waveform_arguments['frequency_nodes_linear']
def log_likelihood_ratio(self):
optimal_snr_squared = 0.
d_inner_h = 0.
waveform = self.waveform_generator.frequency_domain_strain(
self.parameters)
if waveform is None:
return np.nan_to_num(-np.inf)
def calculate_snrs(self, signal, interferometer):
"""
Compute the snrs for ROQ
for ifo in self.interferometers:
Parameters
----------
signal: waveform
f_plus = ifo.antenna_response(
self.parameters['ra'], self.parameters['dec'],
self.parameters['geocent_time'], self.parameters['psi'], 'plus')
f_cross = ifo.antenna_response(
self.parameters['ra'], self.parameters['dec'],
self.parameters['geocent_time'], self.parameters['psi'], 'cross')
dt = ifo.time_delay_from_geocenter(
self.parameters['ra'], self.parameters['dec'],
ifo.strain_data.start_time)
ifo_time = self.parameters['geocent_time'] + dt - \
ifo.strain_data.start_time
h_plus_linear = f_plus * waveform['linear']['plus']
h_cross_linear = f_cross * waveform['linear']['cross']
h_plus_quadratic = f_plus * waveform['quadratic']['plus']
h_cross_quadratic = f_cross * waveform['quadratic']['cross']
indices, in_bounds = self._closest_time_indices(
ifo_time, self.time_samples)
if not in_bounds:
return np.nan_to_num(-np.inf)
d_inner_h_tc_array = np.einsum(
'i,ji->j', np.conjugate(h_plus_linear + h_cross_linear),
self.weights[ifo.name + '_linear'][indices])
d_inner_h += interp1d(
self.time_samples[indices],
d_inner_h_tc_array, kind='cubic')(ifo_time)
optimal_snr_squared += \
np.vdot(np.abs(h_plus_quadratic + h_cross_quadratic)**2,
self.weights[ifo.name + '_quadratic'])
interferometer: interferometer object
if self.distance_marginalization:
rho_mf_ref, rho_opt_ref = self._setup_rho(d_inner_h, optimal_snr_squared)
if self.phase_marginalization:
rho_mf_ref = abs(rho_mf_ref)
log_l = self._interp_dist_margd_loglikelihood(rho_mf_ref.real, rho_opt_ref)[0]
else:
if self.phase_marginalization:
d_inner_h = self._bessel_function_interped(abs(d_inner_h))
log_l = d_inner_h - optimal_snr_squared / 2
"""
return log_l.real
f_plus = interferometer.antenna_response(
self.parameters['ra'], self.parameters['dec'],
self.parameters['geocent_time'], self.parameters['psi'], 'plus')
f_cross = interferometer.antenna_response(
self.parameters['ra'], self.parameters['dec'],
self.parameters['geocent_time'], self.parameters['psi'], 'cross')
dt = interferometer.time_delay_from_geocenter(
self.parameters['ra'], self.parameters['dec'],
interferometer.strain_data.start_time)
ifo_time = self.parameters['geocent_time'] + dt - \
interferometer.strain_data.start_time
h_plus_linear = f_plus * signal['linear']['plus']
h_cross_linear = f_cross * signal['linear']['cross']
h_plus_quadratic = f_plus * signal['quadratic']['plus']
h_cross_quadratic = f_cross * signal['quadratic']['cross']
CalculatedSNRs = namedtuple(
'CalculatedSNRs', ['d_inner_h', 'optimal_snr_squared', 'complex_matched_filter_snr', 'd_inner_h_squared_tc_array'])
indices, in_bounds = self._closest_time_indices(ifo_time, self.time_samples)
if not in_bounds:
return CalculatedSNRs(
d_inner_h=np.nan_to_num(-np.inf), optimal_snr_squared=0,
complex_matched_filter_snr=np.nan_to_num(-np.inf),
d_inner_h_squared_tc_array=None)
d_inner_h_tc_array = np.einsum(
'i,ji->j', np.conjugate(h_plus_linear + h_cross_linear),
self.weights[interferometer.name + '_linear'][indices])
d_inner_h = interp1d(
self.time_samples[indices],
d_inner_h_tc_array, kind='cubic')(ifo_time)
optimal_snr_squared = \
np.vdot(np.abs(h_plus_quadratic + h_cross_quadratic)**2,
self.weights[interferometer.name + '_quadratic'])
complex_matched_filter_snr = d_inner_h / (optimal_snr_squared**0.5)
d_inner_h_squared_tc_array = None
return CalculatedSNRs(
d_inner_h=d_inner_h, optimal_snr_squared=optimal_snr_squared,
complex_matched_filter_snr=complex_matched_filter_snr,
d_inner_h_squared_tc_array=d_inner_h_squared_tc_array)
@staticmethod
def _closest_time_indices(time, samples):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment