-
Colm Talbot authoredColm Talbot authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
detector.py 33.67 KiB
from __future__ import division, print_function, absolute_import
import logging
import os
import matplotlib.pyplot as plt
import numpy as np
from gwpy.signal import filter_design
from scipy import signal
from scipy.interpolate import interp1d
import tupak
from . import utils
class Interferometer(object):
"""Class for the Interferometer """
def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency,
length, latitude, longitude, elevation, xarm_azimuth, yarm_azimuth,
xarm_tilt=0., yarm_tilt=0.):
"""
Instantiate an Interferometer object.
Parameters
----------
name: str
Interferometer name, e.g., H1.
power_spectral_density: PowerSpectralDensity
Power spectral density determining the sensitivity of the detector.
minimum_frequency: float
Minimum frequency to analyse for detector.
maximum_frequency: float
Maximum frequency to analyse for detector.
length: float
Length of the interferometer in km.
latitude: float
Latitude North in degrees (South is negative).
longitude: float
Longitude East in degrees (West is negative).
elevation: float
Height above surface in metres.
xarm_azimuth: float
Orientation of the x arm in degrees North of East.
yarm_azimuth: float
Orientation of the y arm in degrees North of East.
xarm_tilt: float
Tilt of the x arm in radians above the horizontal defined by ellipsoid earth model in LIGO-T980044-08.
yarm_tilt: float
Tilt of the y arm in radians above the horizontal.
"""
self.__x_updated = False
self.__y_updated = False
self.__vertex_updated = False
self.__detector_tensor_updated = False
self.name = name
self.minimum_frequency = minimum_frequency
self.maximum_frequency = maximum_frequency
self.length = length
self.latitude = latitude
self.longitude = longitude
self.elevation = elevation
self.xarm_azimuth = xarm_azimuth
self.yarm_azimuth = yarm_azimuth
self.xarm_tilt = xarm_tilt
self.yarm_tilt = yarm_tilt
self.power_spectral_density = power_spectral_density
self.data = np.array([])
self.frequency_array = np.array([])
self.sampling_frequency = None
self.duration = None
self.time_marginalization = False
self.epoch = 0
@property
def minimum_frequency(self):
return self.__minimum_frequency
@minimum_frequency.setter
def minimum_frequency(self, minimum_frequency):
self.__minimum_frequency = minimum_frequency
@property
def maximum_frequency(self):
return self.__maximum_frequency
@maximum_frequency.setter
def maximum_frequency(self, maximum_frequency):
self.__maximum_frequency = maximum_frequency
@property
def frequency_mask(self):
"""Masking array for limiting the frequency band."""
return (self.frequency_array > self.minimum_frequency) & (self.frequency_array < self.maximum_frequency)
@property
def latitude(self):
return self.__latitude * 180 / np.pi
@latitude.setter
def latitude(self, latitude):
self.__latitude = latitude * np.pi / 180
self.__x_updated = False
self.__y_updated = False
self.__vertex_updated = False
@property
def longitude(self):
return self.__longitude * 180 / np.pi
@longitude.setter
def longitude(self, longitude):
self.__longitude = longitude * np.pi / 180
self.__x_updated = False
self.__y_updated = False
self.__vertex_updated = False
@property
def elevation(self):
return self.__elevation
@elevation.setter
def elevation(self, elevation):
self.__elevation = elevation
self.__vertex_updated = False
@property
def xarm_azimuth(self):
return self.__xarm_azimuth * 180 / np.pi
@xarm_azimuth.setter
def xarm_azimuth(self, xarm_azimuth):
self.__xarm_azimuth = xarm_azimuth * np.pi / 180
self.__x_updated = False
@property
def yarm_azimuth(self):
return self.__yarm_azimuth * 180 / np.pi
@yarm_azimuth.setter
def yarm_azimuth(self, yarm_azimuth):
self.__yarm_azimuth = yarm_azimuth * np.pi / 180
self.__y_updated = False
@property
def xarm_tilt(self):
return self.__xarm_tilt
@xarm_tilt.setter
def xarm_tilt(self, xarm_tilt):
self.__xarm_tilt = xarm_tilt
self.__x_updated = False
@property
def yarm_tilt(self):
return self.__yarm_tilt
@yarm_tilt.setter
def yarm_tilt(self, yarm_tilt):
self.__yarm_tilt = yarm_tilt
self.__y_updated = False
@property
def vertex(self):
if self.__vertex_updated is False:
self.__vertex = utils.get_vertex_position_geocentric(self.__latitude, self.__longitude, self.elevation)
self.__vertex_updated = True
return self.__vertex
@property
def x(self):
if self.__x_updated is False:
self.__x = self.unit_vector_along_arm('x')
self.__x_updated = True
self.__detector_tensor_updated = False
return self.__x
@property
def y(self):
if self.__y_updated is False:
self.__y = self.unit_vector_along_arm('y')
self.__y_updated = True
self.__detector_tensor_updated = False
return self.__y
@property
def detector_tensor(self):
"""
Calculate the detector tensor from the unit vectors along each arm of the detector.
See Eq. B6 of arXiv:gr-qc/0008066
"""
if self.__detector_tensor_updated is False:
self.__detector_tensor = 0.5 * (np.einsum('i,j->ij', self.x, self.x) - np.einsum('i,j->ij', self.y, self.y))
self.__detector_tensor_updated = True
return self.__detector_tensor
def antenna_response(self, ra, dec, time, psi, mode):
"""
Calculate the antenna response function for a given sky location
See Nishizawa et al. (2009) arXiv:0903.0528 for definitions of the polarisation tensors.
[u, v, w] represent the Earth-frame
[m, n, omega] represent the wave-frame
Note: there is a typo in the definition of the wave-frame in Nishizawa et al.
:param ra: right ascension in radians
:param dec: declination in radians
:param time: geocentric GPS time
:param psi: binary polarisation angle counter-clockwise about the direction of propagation
:param mode: polarisation mode
:return: detector_response(theta, phi, psi, mode): antenna response for the specified mode.
"""
polarization_tensor = utils.get_polarization_tensor(ra, dec, time, psi, mode)
detector_response = np.einsum('ij,ij->', self.detector_tensor, polarization_tensor)
return detector_response
def get_detector_response(self, waveform_polarizations, parameters):
"""
Get the detector response for a particular waveform
:param waveform_polarizations: dict, polarizations of the waveform
:param parameters: dict, parameters describing position and time of arrival of the signal
:return: detector_response: signal observed in the interferometer
"""
signal = {}
for mode in waveform_polarizations.keys():
det_response = self.antenna_response(
parameters['ra'],
parameters['dec'],
parameters['geocent_time'],
parameters['psi'], mode)
signal[mode] = waveform_polarizations[mode] * det_response
signal_ifo = sum(signal.values())
signal_ifo *= self.frequency_mask
time_shift = self.time_delay_from_geocenter(
parameters['ra'],
parameters['dec'],
self.epoch) # parameters['geocent_time'])
if self.time_marginalization:
dt = time_shift # when marginalizing over time we only care about relative time shifts between detectors and marginalized over
# all candidate coalescence times
else:
dt = self.epoch - (parameters['geocent_time'] - time_shift)
signal_ifo = signal_ifo * np.exp(
-1j * 2 * np.pi * dt * self.frequency_array)
return signal_ifo
def inject_signal(self, waveform_polarizations, parameters):
"""
Inject a signal into noise.
Adds the requested signal to self.data
:param waveform_polarizations: dict, polarizations of the waveform
:param parameters: dict, parameters describing position and time of arrival of the signal
"""
if waveform_polarizations is None:
logging.warning('Trying to inject signal which is None.')
else:
signal_ifo = self.get_detector_response(waveform_polarizations, parameters)
if np.shape(self.data).__eq__(np.shape(signal_ifo)):
self.data += signal_ifo
else:
logging.info('Injecting into zero noise.')
self.data = signal_ifo
opt_snr = np.sqrt(tupak.utils.optimal_snr_squared(signal=signal_ifo, interferometer=self,
time_duration=1 / (self.frequency_array[1]
- self.frequency_array[0])).real)
mf_snr = np.sqrt(tupak.utils.matched_filter_snr_squared(signal=signal_ifo, interferometer=self,
time_duration=1 / (self.frequency_array[1]
- self.frequency_array[0])).real)
logging.info("Injection found with optimal SNR = {:.2f} and matched filter SNR = {:.2f} in {}".format(
opt_snr, mf_snr, self.name))
def unit_vector_along_arm(self, arm):
"""
Calculate the unit vector pointing along the specified arm in cartesian Earth-based coordinates.
See Eqs. B14-B17 in arXiv:gr-qc/0008066
Input:
arm - x or y arm of the detector
Output:
n - unit vector along arm in cartesian Earth-based coordinates
"""
if arm == 'x':
return self.__calculate_arm(self.xarm_tilt, self.xarm_azimuth)
elif arm == 'y':
return self.__calculate_arm(self.yarm_tilt, self.yarm_azimuth)
else:
logging.warning('Not a recognized arm, aborting!')
return
def __calculate_arm(self, arm_tilt, arm_azimuth):
e_long = np.array([-np.sin(self.__longitude), np.cos(self.__longitude), 0])
e_lat = np.array([-np.sin(self.__latitude) * np.cos(self.__longitude),
-np.sin(self.__latitude) * np.sin(self.__longitude), np.cos(self.__latitude)])
e_h = np.array([np.cos(self.__latitude) * np.cos(self.__longitude),
np.cos(self.__latitude) * np.sin(self.__longitude), np.sin(self.__latitude)])
return np.cos(arm_tilt) * np.cos(arm_azimuth) * e_long +\
np.cos(arm_tilt) * np.sin(arm_azimuth) * e_lat + \
np.sin(arm_tilt) * e_h
@property
def amplitude_spectral_density_array(self):
"""
Set the PSD for the interferometer for a user-specified frequency series, this matches the data provided.
"""
return self.power_spectral_density_array ** 0.5
@property
def power_spectral_density_array(self):
return self.power_spectral_density.power_spectral_density_interpolated(self.frequency_array)
def set_data(self, sampling_frequency, duration, epoch=0,
from_power_spectral_density=False, frame_file=None,
frequency_domain_strain=None, channel_name=None, **kwargs):
"""
Set the interferometer frequency-domain stain and accompanying PSD values.
Parameters
----------
sampling_frequency: float
The sampling frequency of the data
duration: float
Duration of data
epoch: float
The GPS time of the start of the data
frequency_domain_strain: array_like
The frequency-domain strain
from_power_spectral_density: bool
If frequency_domain_strain not given, use IFO's PSD object to
generate noise
frame_file: str
File from which to load data.
channel_name: str
Channel to read from frame.
kwargs: dict
Additional arguments for loading data.
"""
self.epoch = epoch
if frequency_domain_strain is not None:
logging.info(
'Setting {} data using provided frequency_domain_strain'.format(self.name))
frequencies = utils.create_frequency_series(sampling_frequency, duration)
elif from_power_spectral_density is True:
logging.info(
'Setting {} data using noise realization from provided'
'power_spectal_density'.format(self.name))
frequency_domain_strain, frequencies = \
self.power_spectral_density.get_noise_realisation(
sampling_frequency, duration)
elif frame_file is not None:
logging.info('Reading data from frame, {}.'.format(self.name))
strain = tupak.utils.read_frame_file(frame_file, t1=epoch, t2=epoch+duration, channel=channel_name,
resample=sampling_frequency)
frequency_domain_strain = tupak.utils.process_strain_data(strain, **kwargs)
frequencies = utils.create_frequency_series(sampling_frequency, duration)
self.power_spectral_density = PowerSpectralDensity(frame_file=frame_file, channel_name=channel_name,
**kwargs)
else:
raise ValueError("No method to set data provided.")
self.sampling_frequency = sampling_frequency
self.duration = duration
self.frequency_array = frequencies
self.data = frequency_domain_strain * self.frequency_mask
return
def time_delay_from_geocenter(self, ra, dec, time):
"""
Calculate the time delay from the geocenter for the interferometer.
Use the time delay function from utils.
Input:
ra - right ascension of source in radians
dec - declination of source in radians
time - GPS time
Output:
delta_t - time delay from geocenter
"""
delta_t = utils.time_delay_geocentric(self.vertex, np.array([0, 0, 0]), ra, dec, time)
return delta_t
def vertex_position_geocentric(self):
"""
Calculate the position of the IFO vertex in geocentric coordinates in meters.
Based on arXiv:gr-qc/0008066 Eqs. B11-B13 except for the typo in the definition of the local radius.
See Section 2.1 of LIGO-T980044-10 for the correct expression
"""
vertex_position = utils.get_vertex_position_geocentric(self.__latitude, self.__longitude, self.__elevation)
return vertex_position
@property
def whitened_data(self):
return self.data / self.amplitude_spectral_density_array
def save_data(self, outdir):
np.savetxt('{}/{}_frequency_domain_data.dat'.format(outdir, self.name),
[self.frequency_array, self.data.real, self.data.imag],
header='f real_h(f) imag_h(f)')
np.savetxt('{}/{}_psd.dat'.format(outdir, self.name),
[self.frequency_array, self.amplitude_spectral_density_array],
header='f h(f)')
class PowerSpectralDensity:
def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt', frame_file=None, epoch=0,
psd_duration=1024, psd_offset=16, channel_name=None, filter_freq=1024, alpha=0.25, fft_length=4):
"""
Instantiate a new PowerSpectralDensity object.
Only one of the asd_file or psd_file needs to be specified.
If multiple are given, the first will be used.
Parameters:
asd_file: str
File containing amplitude spectral density, format 'f h_f'
psd_file: str
File containing power spectral density, format 'f h_f'
frame_file: str
Frame file to read data from.
epoch: float
Beginning of segment to analyse.
psd_duration: float
Duration of data to generate PSD from.
psd_offset: float
Offset of data from beginning of analysed segment.
channel_name: str
Name of channel to use to generate PSD.
alpha: float
Parameter for Tukey window.
fft_length: float
Number of seconds in a single fft.
"""
self.frequencies = []
self.power_spectral_density = []
self.amplitude_spectral_density = []
self.frequency_noise_realization = []
self.interpolated_frequency = []
self.power_spectral_density_interpolated = None
if asd_file is not None:
self.amplitude_spectral_density_file = asd_file
self.import_amplitude_spectral_density()
if min(self.amplitude_spectral_density) < 1e-30:
logging.warning("You specified an amplitude spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.amplitude_spectral_density)))
logging.warning("You may have intended to provide this as a power spectral density.")
elif frame_file is not None:
strain = tupak.utils.read_frame_file(frame_file, t1=epoch - psd_duration - psd_offset,
t2=epoch - psd_duration, channel=channel_name)
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
# Create and save PSDs
NFFT = int(sampling_frequency * fft_length)
window = signal.windows.tukey(NFFT, alpha=alpha)
psd = strain.psd(fftlength=fft_length, window=window)
self.frequencies = psd.frequencies
self.power_spectral_density = psd.value
self.amplitude_spectral_density = self.power_spectral_density**0.5
self.interpolate_power_spectral_density()
else:
self.power_spectral_density_file = psd_file
self.import_power_spectral_density()
if min(self.power_spectral_density) > 1e-30:
logging.warning("You specified a power spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.power_spectral_density)))
logging.warning("You may have intended to provide this as an amplitude spectral density.")
def import_amplitude_spectral_density(self):
"""
Automagically load one of the amplitude spectral density curves contained in the noise_curves directory.
Test if the file contains a path (i.e., contains '/').
If not assume the file is in the default directory.
"""
if '/' not in self.amplitude_spectral_density_file:
self.amplitude_spectral_density_file = os.path.join(os.path.dirname(__file__), 'noise_curves',
self.amplitude_spectral_density_file)
spectral_density = np.genfromtxt(self.amplitude_spectral_density_file)
self.frequencies = spectral_density[:, 0]
self.amplitude_spectral_density = spectral_density[:, 1]
self.power_spectral_density = self.amplitude_spectral_density ** 2
self.interpolate_power_spectral_density()
def import_power_spectral_density(self):
"""
Automagically load one of the power spectral density curves contained in the noise_curves directory.
Test if the file contains a path (i.e., contains '/').
If not assume the file is in the default directory.
"""
if '/' not in self.power_spectral_density_file:
self.power_spectral_density_file = os.path.join(os.path.dirname(__file__), 'noise_curves',
self.power_spectral_density_file)
spectral_density = np.genfromtxt(self.power_spectral_density_file)
self.frequencies = spectral_density[:, 0]
self.power_spectral_density = spectral_density[:, 1]
self.amplitude_spectral_density = np.sqrt(self.power_spectral_density)
self.interpolate_power_spectral_density()
def interpolate_power_spectral_density(self):
"""Interpolate the loaded PSD so it can be resampled for arbitrary frequency arrays."""
self.power_spectral_density_interpolated = interp1d(self.frequencies, self.power_spectral_density,
bounds_error=False,
fill_value=max(self.power_spectral_density))
def get_noise_realisation(self, sampling_frequency, duration):
"""
Generate frequency Gaussian noise scaled to the power spectral density.
:param sampling_frequency: sampling frequency of noise
:param duration: duration of noise
:return: frequency_domain_strain (array), frequency (array)
"""
white_noise, frequency = utils.create_white_noise(sampling_frequency, duration)
interpolated_power_spectral_density = self.power_spectral_density_interpolated(frequency)
frequency_domain_strain = interpolated_power_spectral_density ** 0.5 * white_noise
return frequency_domain_strain, frequency
def get_empty_interferometer(name):
"""
Get an interferometer with standard parameters for known detectors.
These objects do not have any noise instantiated.
The available instruments are:
H1, L1, V1, GEO600
Detector positions taken from LIGO-T980044-10 for L1/H1 and from
arXiv:gr-qc/0008066 [45] for V1/GEO600
Parameters
----------
name: str
Interferometer identifier.
Returns
-------
interferometer: Interferometer
Interferometer instance
"""
known_interferometers = {
'H1': Interferometer(name='H1', power_spectral_density=PowerSpectralDensity(),
minimum_frequency=20, maximum_frequency=2048,
length=4, latitude=46 + 27. / 60 + 18.528 / 3600,
longitude=-(119 + 24. / 60 + 27.5657 / 3600), elevation=142.554, xarm_azimuth=125.9994,
yarm_azimuth=215.994, xarm_tilt=-6.195e-4, yarm_tilt=1.25e-5),
'L1': Interferometer(name='L1', power_spectral_density=PowerSpectralDensity(),
minimum_frequency=20, maximum_frequency=2048,
length=4, latitude=30 + 33. / 60 + 46.4196 / 3600,
longitude=-(90 + 46. / 60 + 27.2654 / 3600), elevation=-6.574, xarm_azimuth=197.7165,
yarm_azimuth=287.7165,
xarm_tilt=-3.121e-4, yarm_tilt=-6.107e-4),
'V1': Interferometer(name='V1', power_spectral_density=PowerSpectralDensity(psd_file='AdV_psd.txt'), length=3,
minimum_frequency=20, maximum_frequency=2048,
latitude=43 + 37. / 60 + 53.0921 / 3600, longitude=10 + 30. / 60 + 16.1878 / 3600,
elevation=51.884, xarm_azimuth=70.5674, yarm_azimuth=160.5674),
'GEO600': Interferometer(name='GEO600',
power_spectral_density=PowerSpectralDensity(asd_file='GEO600_S6e_asd.txt'),
minimum_frequency=40, maximum_frequency=2048, length=0.6,
latitude=52 + 14. / 60 + 42.528 / 3600, longitude=9 + 48. / 60 + 25.894 / 3600,
elevation=114.425, xarm_azimuth=115.9431, yarm_azimuth=21.6117),
'CE': Interferometer(name='CE', power_spectral_density=PowerSpectralDensity(psd_file='CE_psd.txt'),
minimum_frequency=10, maximum_frequency=2048,
length=40, latitude=46 + 27. / 60 + 18.528 / 3600,
longitude=-(119 + 24. / 60 + 27.5657 / 3600), elevation=142.554, xarm_azimuth=125.9994,
yarm_azimuth=215.994, xarm_tilt=-6.195e-4, yarm_tilt=1.25e-5),
}
if name in known_interferometers.keys():
interferometer = known_interferometers[name]
return interferometer
else:
logging.warning('Interferometer {} not implemented'.format(name))
def get_interferometer_with_open_data(
name, center_time, T=4, alpha=0.25, psd_offset=-1024, psd_duration=100,
cache=True, outdir='outdir', plot=True, filter_freq=1024,
raw_data_file=None, **kwargs):
"""
Helper function to obtain an Interferometer instance with appropriate
PSD and data, given an center_time.
Parameters
----------
name: str
Detector name, e.g., 'H1'.
center_time: float
GPS time of the center_time about which to perform the analysis.
Note: the analysis data is from `center_time-T/2` to `center_time+T/2`.
T: float
The total time (in seconds) to analyse. Defaults to 4s.
alpha: float
The tukey window shape parameter passed to `scipy.signal.tukey`.
psd_offset, psd_duration: float
The power spectral density (psd) is estimated using data from
`center_time+psd_offset` to `center_time+psd_offset + psd_duration`.
outdir: str
Directory where the psd files are saved
plot: bool
If true, create an ASD + strain plot
filter_freq: float
Low pass filter frequency
**kwargs:
All keyword arguments are passed to
`gwpy.timeseries.TimeSeries.fetch_open_data()`.
Returns
-------
interferometer: `tupak.detector.Interferometer`
An Interferometer instance with a PSD and frequency-domain strain data.
"""
logging.warning(
"Parameter estimation for real interferometer data in tupak is in "
"alpha testing at the moment: the routines for windowing and filtering"
" have not been reviewed.")
utils.check_directory_exists_and_if_not_mkdir(outdir)
strain = utils.get_open_strain_data(
name, center_time - T / 2, center_time + T / 2, outdir=outdir, cache=cache,
raw_data_file=raw_data_file, **kwargs)
strain_psd = utils.get_open_strain_data(
name, center_time + psd_offset, center_time + psd_offset + psd_duration,
raw_data_file=raw_data_file,
outdir=outdir, cache=cache, **kwargs)
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
strain_psd = strain_psd.filter(bp, filtfilt=True)
strain_psd = strain_psd.crop(*strain_psd.span.contract(1))
# Create and save PSDs
NFFT = int(sampling_frequency * T)
window = signal.windows.tukey(NFFT, alpha=alpha)
psd = strain_psd.psd(fftlength=T, window=window)
psd_file = '{}/{}_PSD_{}_{}.txt'.format(
outdir, name, center_time + psd_offset, psd_duration)
with open('{}'.format(psd_file), 'w+') as file:
for f, p in zip(psd.frequencies.value, psd.value):
file.write('{} {}\n'.format(f, p))
time_series = strain.times.value
time_duration = time_series[-1] - time_series[0]
# Apply Tukey window
N = len(time_series)
strain = strain * signal.windows.tukey(N, alpha=alpha)
interferometer = get_empty_interferometer(name)
interferometer.power_spectral_density = PowerSpectralDensity(
psd_file=psd_file)
interferometer.set_data(
sampling_frequency, time_duration,
frequency_domain_strain=utils.nfft(
strain.value, sampling_frequency)[0],
epoch=strain.epoch.value)
if plot:
fig, ax = plt.subplots()
ax.loglog(interferometer.frequency_array, np.abs(interferometer.data),
'-C0', label=name)
ax.loglog(interferometer.frequency_array,
interferometer.amplitude_spectral_density_array,
'-C1', lw=0.5, label=name + ' ASD')
ax.grid('on')
ax.set_ylabel(r'strain [strain/$\sqrt{\rm Hz}$]')
ax.set_xlabel(r'frequency [Hz]')
ax.set_xlim(20, 2000)
ax.legend(loc='best')
fig.savefig('{}/{}_frequency_domain_data.png'.format(outdir, name))
return interferometer
def get_interferometer_with_fake_noise_and_injection(
name, injection_polarizations, injection_parameters,
sampling_frequency=4096, time_duration=4, outdir='outdir', plot=True,
save=True):
"""
Helper function to obtain an Interferometer instance with appropriate
power spectral density and data, given an center_time.
Parameters
----------
name: str
Detector name, e.g., 'H1'.
injection_polarizations: dict
polarizations of waveform to inject, output of
`waveform_generator.get_frequency_domain_signal`
injection_parameters: dict
injection parameters, needed for sky position and timing
sampling_frequency: float
sampling frequency for data, should match injection signal
time_duration: float
length of data, should be the same as used for signal generation
outdir: str
directory in which to store output
plot: bool
If true, create an ASD + strain plot
save: bool
If true, save frequency domain data and PSD to file
Returns
-------
interferometer: `tupak.detector.Interferometer`
An Interferometer instance with a PSD and frequency-domain strain data.
"""
utils.check_directory_exists_and_if_not_mkdir(outdir)
interferometer = get_empty_interferometer(name)
interferometer.set_data(
sampling_frequency=sampling_frequency, duration=time_duration,
from_power_spectral_density=True, epoch=(injection_parameters['geocent_time']+2)-time_duration)
interferometer.inject_signal(
waveform_polarizations=injection_polarizations,
parameters=injection_parameters)
interferometer_signal = interferometer.get_detector_response(
injection_polarizations, injection_parameters)
if plot:
fig, ax = plt.subplots()
ax.loglog(interferometer.frequency_array, np.abs(interferometer.data),
'-C0', label=name)
ax.loglog(interferometer.frequency_array,
interferometer.amplitude_spectral_density_array,
'-C1', lw=0.5, label=name + ' ASD')
ax.loglog(interferometer.frequency_array, abs(interferometer_signal),
'-C2', label='Signal')
ax.grid('on')
ax.set_ylabel(r'strain [strain/$\sqrt{\rm Hz}$]')
ax.set_xlabel(r'frequency [Hz]')
ax.set_xlim(20, 2000)
ax.legend(loc='best')
fig.savefig('{}/{}_frequency_domain_data.png'.format(outdir, name))
if save:
interferometer.save_data(outdir)
return interferometer
def get_event_data(
event, interferometer_names=None, time_duration=4, alpha=0.25,
psd_offset=-1024, psd_duration=100, cache=True, outdir='outdir',
plot=True, filter_freq=1024, raw_data_file=None, **kwargs):
"""
Get open data for a specified event.
Parameters
----------
event: str
Event descriptor, this can deal with some prefixes, e.g., '150914',
'GW150914', 'LVT151012'
interferometer_names: list, optional
List of interferometer identifiers, e.g., 'H1'.
If None will look for data in 'H1', 'V1', 'L1'
time_duration: float
Time duration to search for.
alpha: float
The tukey window shape parameter passed to `scipy.signal.tukey`.
psd_offset, psd_duration: float
The power spectral density (psd) is estimated using data from
`center_time+psd_offset` to `center_time+psd_offset + psd_duration`.
cache: bool
Whether or not to store the acquired data.
outdir: str
Directory where the psd files are saved
plot: bool
If true, create an ASD + strain plot
filter_freq: float
Low pass filter frequency
**kwargs:
All keyword arguments are passed to
`gwpy.timeseries.TimeSeries.fetch_open_data()`.
Return
------
interferometers: list
A list of tupak.detector.Interferometer objects
"""
event_time = tupak.utils.get_event_time(event)
interferometers = []
if interferometer_names is None:
if utils.command_line_args.detectors:
interferometer_names = utils.command_line_args.detectors
else:
interferometer_names = ['H1', 'L1', 'V1']
for name in interferometer_names:
try:
interferometers.append(get_interferometer_with_open_data(
name, event_time, T=time_duration, alpha=alpha,
psd_offset=psd_offset, psd_duration=psd_duration, cache=cache,
outdir=outdir, plot=plot, filter_freq=filter_freq,
raw_data_file=raw_data_file, **kwargs))
except ValueError:
logging.info('No data found for {}.'.format(name))
return interferometers