Skip to content
Snippets Groups Projects
Commit 03219f50 authored by Colm Talbot's avatar Colm Talbot
Browse files

Merge branch 'restructure_detector' into 'master'

Restructure the detector module

See merge request !470
parents 605b023c 97e5d49c
No related branches found
No related tags found
1 merge request!470Restructure the detector module
Pipeline #60007 passed
Showing
with 1678 additions and 0 deletions
from bilby.gw import conversion
from .interferometer import *
from .networks import *
from .psd import *
from .strain_data import *
try:
import lal
import lalsimulation as lalsim
except ImportError:
logger.warning("You do not have lalsuite installed currently. You will"
" not be able to use some of the prebuilt functions.")
def get_safe_signal_duration(mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, flow=10):
""" Calculate the safe signal duration, given the parameters
Parameters
----------
mass_1, mass_2, a_1, a_2, tilt_1, tilt_2: float
The signal parameters
flow: float
The lower frequency cutoff from which to calculate the signal duration
Returns
-------
safe_signal_duration: float
The shortest safe signal duration (i.e., the signal duration rounded up
to the nearest power of 2)
"""
chirp_time = lalsim.SimInspiralChirpTimeBound(
flow, mass_1 * lal.MSUN_SI, mass_2 * lal.MSUN_SI,
a_1 * np.cos(tilt_1), a_2 * np.cos(tilt_2))
return max(2**(int(np.log2(chirp_time)) + 1), 4)
def inject_signal_into_gwpy_timeseries(
data, waveform_generator, parameters, det, outdir=None, label=None):
""" Inject a signal into a gwpy timeseries
Parameters
----------
data: gwpy.timeseries.TimeSeries
The time-series data into which we want to inject the signal
waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
An initialised waveform_generator
parameters: dict
A dictionary of the signal-parameters to inject
ifo: bilby.gw.detector.Interferometer
The interferometer for which the data refers too
outdir, label: str
If given, the outdir and label used to generate a plot
Returns
-------
data_and_signal: gwpy.timeseries.TimeSeries
The data with the time-domain signal added
meta_data: dict
A dictionary of meta data about the injection
"""
ifo = get_empty_interferometer(det)
ifo.strain_data.set_from_gwpy_timeseries(data)
parameters_check, _ = conversion.convert_to_lal_binary_black_hole_parameters(parameters)
safe_time = get_safe_signal_duration(**parameters_check)
if data.duration.value < safe_time:
ValueError(
"Injecting a signal with safe-duration {} longer than the data {}"
.format(safe_time, data.duration.value))
waveform_polarizations = waveform_generator.time_domain_strain(parameters)
signal = np.zeros(len(data))
for mode in waveform_polarizations.keys():
det_response = ifo.antenna_response(
parameters['ra'], parameters['dec'], parameters['geocent_time'],
parameters['psi'], mode)
signal += waveform_polarizations[mode] * det_response
time_shift = ifo.time_delay_from_geocenter(
parameters['ra'], parameters['dec'], parameters['geocent_time'])
dt = parameters['geocent_time'] + time_shift - data.times[0].value
n_roll = dt * data.sample_rate.value
n_roll = int(np.round(n_roll))
signal_shifted = gwpy.timeseries.TimeSeries(
data=np.roll(signal, n_roll), times=data.times, unit=data.unit)
signal_and_data = data.inject(signal_shifted)
if outdir is not None and label is not None:
fig = gwpy.plot.Plot(signal_shifted)
fig.savefig('{}/{}_{}_time_domain_injected_signal'.format(
outdir, ifo.name, label))
meta_data = dict()
frequency_domain_signal, _ = utils.nfft(signal, waveform_generator.sampling_frequency)
meta_data['optimal_SNR'] = (
np.sqrt(ifo.optimal_snr_squared(signal=frequency_domain_signal)).real)
meta_data['matched_filter_SNR'] = (
ifo.matched_filter_snr(signal=frequency_domain_signal))
meta_data['parameters'] = parameters
logger.info("Injected signal in {}:".format(ifo.name))
logger.info(" optimal SNR = {:.2f}".format(meta_data['optimal_SNR']))
logger.info(" matched filter SNR = {:.2f}".format(meta_data['matched_filter_SNR']))
for key in parameters:
logger.info(' {} = {}'.format(key, parameters[key]))
return signal_and_data, meta_data
def get_interferometer_with_fake_noise_and_injection(
name, injection_parameters, injection_polarizations=None,
waveform_generator=None, sampling_frequency=4096, duration=4,
start_time=None, outdir='outdir', label=None, plot=True, save=True,
zero_noise=False):
"""
Helper function to obtain an Interferometer instance with appropriate
power spectral density and data, given an center_time.
Note: by default this generates an Interferometer with a power spectral
density based on advanced LIGO.
Parameters
----------
name: str
Detector name, e.g., 'H1'.
injection_parameters: dict
injection parameters, needed for sky position and timing
injection_polarizations: dict
Polarizations of waveform to inject, output of
`waveform_generator.frequency_domain_strain()`. If
`waveform_generator` is also given, the injection_polarizations will
be calculated directly and this argument can be ignored.
waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
A WaveformGenerator instance using the source model to inject. If
`injection_polarizations` is given, this will be ignored.
sampling_frequency: float
sampling frequency for data, should match injection signal
duration: float
length of data, should be the same as used for signal generation
start_time: float
Beginning of data segment, if None, injection is placed 2s before
end of segment.
outdir: str
directory in which to store output
label: str
If given, an identifying label used in generating file names.
plot: bool
If true, create an ASD + strain plot
save: bool
If true, save frequency domain data and PSD to file
zero_noise: bool
If true, set noise to zero.
Returns
-------
bilby.gw.detector.Interferometer: An Interferometer instance with a PSD and frequency-domain strain data.
"""
utils.check_directory_exists_and_if_not_mkdir(outdir)
if start_time is None:
start_time = injection_parameters['geocent_time'] + 2 - duration
interferometer = get_empty_interferometer(name)
interferometer.power_spectral_density = PowerSpectralDensity.from_aligo()
if zero_noise:
interferometer.set_strain_data_from_zero_noise(
sampling_frequency=sampling_frequency, duration=duration,
start_time=start_time)
else:
interferometer.set_strain_data_from_power_spectral_density(
sampling_frequency=sampling_frequency, duration=duration,
start_time=start_time)
injection_polarizations = interferometer.inject_signal(
parameters=injection_parameters,
injection_polarizations=injection_polarizations,
waveform_generator=waveform_generator)
signal = interferometer.get_detector_response(
injection_polarizations, injection_parameters)
if plot:
interferometer.plot_data(signal=signal, outdir=outdir, label=label)
if save:
interferometer.save_data(outdir, label=label)
return interferometer
def load_data_from_cache_file(
cache_file, start_time, segment_duration, psd_duration, psd_start_time,
channel_name=None, sampling_frequency=4096, roll_off=0.2,
overlap=0, outdir=None):
""" Helper routine to generate an interferometer from a cache file
Parameters
----------
cache_file: str
Path to the location of the cache file
start_time, psd_start_time: float
GPS start time of the segment and data stretch used for the PSD
segment_duration, psd_duration: float
Segment duration and duration of data to use to generate the PSD (in
seconds).
roll_off: float, optional
Rise time in seconds of tukey window.
overlap: float,
Number of seconds of overlap between FFTs.
channel_name: str
Channel name
sampling_frequency: int
Sampling frequency
outdir: str, optional
The output directory in which the data is saved
Returns
-------
ifo: bilby.gw.detector.Interferometer
An initialised interferometer object with strain data set to the
appropriate data in the cache file and a PSD.
"""
data_set = False
psd_set = False
with open(cache_file, 'r') as ff:
for line in ff:
cache = lal.utils.cache.CacheEntry(line)
data_in_cache = (
(cache.segment[0].gpsSeconds < start_time) &
(cache.segment[1].gpsSeconds > start_time + segment_duration))
psd_in_cache = (
(cache.segment[0].gpsSeconds < psd_start_time) &
(cache.segment[1].gpsSeconds > psd_start_time + psd_duration))
ifo = get_empty_interferometer(
"{}1".format(cache.observatory))
if not data_set & data_in_cache:
ifo.set_strain_data_from_frame_file(
frame_file=cache.path,
sampling_frequency=sampling_frequency,
duration=segment_duration,
start_time=start_time,
channel=channel_name, buffer_time=0)
data_set = True
if not psd_set & psd_in_cache:
ifo.power_spectral_density = \
PowerSpectralDensity.from_frame_file(
cache.path,
psd_start_time=psd_start_time,
psd_duration=psd_duration,
fft_length=segment_duration,
sampling_frequency=sampling_frequency,
roll_off=roll_off,
overlap=overlap,
channel=channel_name,
name=cache.observatory,
outdir=outdir,
analysis_segment_start_time=start_time)
psd_set = True
if data_set and psd_set:
return ifo
elif not data_set:
raise ValueError('Data not loaded for {}'.format(ifo.name))
elif not psd_set:
raise ValueError('PSD not created for {}'.format(ifo.name))
import os
import sys
import numpy as np
from bilby.core import utils
from bilby.core.utils import logger
from .interferometer import Interferometer
from .psd import PowerSpectralDensity
from .strain_data import InterferometerStrainData
class InterferometerList(list):
""" A list of Interferometer objects """
def __init__(self, interferometers):
""" Instantiate a InterferometerList
The InterferometerList is a list of Interferometer objects, each
object has the data used in evaluating the likelihood
Parameters
----------
interferometers: iterable
The list of interferometers
"""
list.__init__(self)
if type(interferometers) == str:
raise TypeError("Input must not be a string")
for ifo in interferometers:
if type(ifo) == str:
ifo = get_empty_interferometer(ifo)
if type(ifo) not in [Interferometer, TriangularInterferometer]:
raise TypeError("Input list of interferometers are not all Interferometer objects")
else:
self.append(ifo)
self._check_interferometers()
def _check_interferometers(self):
""" Check certain aspects of the set are the same """
consistent_attributes = ['duration', 'start_time', 'sampling_frequency']
for attribute in consistent_attributes:
x = [getattr(interferometer.strain_data, attribute)
for interferometer in self]
if not all(y == x[0] for y in x):
raise ValueError("The {} of all interferometers are not the same".format(attribute))
def set_strain_data_from_power_spectral_densities(self, sampling_frequency, duration, start_time=0):
""" Set the `Interferometer.strain_data` from the power spectral densities of the detectors
This uses the `interferometer.power_spectral_density` object to set
the `strain_data` to a noise realization. See
`bilby.gw.detector.InterferometerStrainData` for further information.
Parameters
----------
sampling_frequency: float
The sampling frequency (in Hz)
duration: float
The data duration (in s)
start_time: float
The GPS start-time of the data
"""
for interferometer in self:
interferometer.set_strain_data_from_power_spectral_density(sampling_frequency=sampling_frequency,
duration=duration,
start_time=start_time)
def set_strain_data_from_zero_noise(self, sampling_frequency, duration, start_time=0):
""" Set the `Interferometer.strain_data` from the power spectral densities of the detectors
This uses the `interferometer.power_spectral_density` object to set
the `strain_data` to zero noise. See
`bilby.gw.detector.InterferometerStrainData` for further information.
Parameters
----------
sampling_frequency: float
The sampling frequency (in Hz)
duration: float
The data duration (in s)
start_time: float
The GPS start-time of the data
"""
for interferometer in self:
interferometer.set_strain_data_from_zero_noise(sampling_frequency=sampling_frequency,
duration=duration,
start_time=start_time)
def inject_signal(self, parameters=None, injection_polarizations=None, waveform_generator=None):
""" Inject a signal into noise in each of the three detectors.
Parameters
----------
parameters: dict
Parameters of the injection.
injection_polarizations: dict
Polarizations of waveform to inject, output of
`waveform_generator.frequency_domain_strain()`. If
`waveform_generator` is also given, the injection_polarizations will
be calculated directly and this argument can be ignored.
waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
A WaveformGenerator instance using the source model to inject. If
`injection_polarizations` is given, this will be ignored.
Note
----------
if your signal takes a substantial amount of time to generate, or
you experience buggy behaviour. It is preferable to provide the
injection_polarizations directly.
Returns
-------
injection_polarizations: dict
"""
if injection_polarizations is None:
if waveform_generator is not None:
injection_polarizations = \
waveform_generator.frequency_domain_strain(parameters)
else:
raise ValueError(
"inject_signal needs one of waveform_generator or "
"injection_polarizations.")
all_injection_polarizations = list()
for interferometer in self:
all_injection_polarizations.append(
interferometer.inject_signal(parameters=parameters, injection_polarizations=injection_polarizations))
return all_injection_polarizations
def save_data(self, outdir, label=None):
""" Creates a save file for the data in plain text format
Parameters
----------
outdir: str
The output directory in which the data is supposed to be saved
label: str
The string labelling the data
"""
for interferometer in self:
interferometer.save_data(outdir=outdir, label=label)
def plot_data(self, signal=None, outdir='.', label=None):
if utils.command_line_args.test:
return
for interferometer in self:
interferometer.plot_data(signal=signal, outdir=outdir, label=label)
@property
def number_of_interferometers(self):
return len(self)
@property
def duration(self):
return self[0].strain_data.duration
@property
def start_time(self):
return self[0].strain_data.start_time
@property
def sampling_frequency(self):
return self[0].strain_data.sampling_frequency
@property
def frequency_array(self):
return self[0].strain_data.frequency_array
def append(self, interferometer):
if isinstance(interferometer, InterferometerList):
super(InterferometerList, self).extend(interferometer)
else:
super(InterferometerList, self).append(interferometer)
self._check_interferometers()
def extend(self, interferometers):
super(InterferometerList, self).extend(interferometers)
self._check_interferometers()
def insert(self, index, interferometer):
super(InterferometerList, self).insert(index, interferometer)
self._check_interferometers()
@property
def meta_data(self):
""" Dictionary of the per-interferometer meta_data """
return {interferometer.name: interferometer.meta_data
for interferometer in self}
@staticmethod
def _hdf5_filename_from_outdir_label(outdir, label):
return os.path.join(outdir, label + '.h5')
def to_hdf5(self, outdir='outdir', label='ifo_list'):
""" Saves the object to a hdf5 file
Parameters
----------
outdir: str, optional
Output directory name of the file
label: str, optional
Output file name, is 'ifo_list' if not given otherwise. A list of
the included interferometers will be appended.
"""
import deepdish
if sys.version_info[0] < 3:
raise NotImplementedError('Pickling of InterferometerList is not supported in Python 2.'
'Use Python 3 instead.')
label = label + '_' + ''.join(ifo.name for ifo in self)
utils.check_directory_exists_and_if_not_mkdir(outdir)
deepdish.io.save(self._hdf5_filename_from_outdir_label(outdir, label), self)
@classmethod
def from_hdf5(cls, filename=None):
""" Loads in an InterferometerList object from an hdf5 file
Parameters
----------
filename: str
If given, try to load from this filename
"""
import deepdish
if sys.version_info[0] < 3:
raise NotImplementedError('Pickling of InterferometerList is not supported in Python 2.'
'Use Python 3 instead.')
res = deepdish.io.load(filename)
if res.__class__ == list:
res = cls(res)
if res.__class__ != cls:
raise TypeError('The loaded object is not a InterferometerList')
return res
class TriangularInterferometer(InterferometerList):
def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency,
length, latitude, longitude, elevation, xarm_azimuth, yarm_azimuth,
xarm_tilt=0., yarm_tilt=0.):
InterferometerList.__init__(self, [])
self.name = name
# for attr in ['power_spectral_density', 'minimum_frequency', 'maximum_frequency']:
if isinstance(power_spectral_density, PowerSpectralDensity):
power_spectral_density = [power_spectral_density] * 3
if isinstance(minimum_frequency, float) or isinstance(minimum_frequency, int):
minimum_frequency = [minimum_frequency] * 3
if isinstance(maximum_frequency, float) or isinstance(maximum_frequency, int):
maximum_frequency = [maximum_frequency] * 3
for ii in range(3):
self.append(Interferometer(
'{}{}'.format(name, ii + 1), power_spectral_density[ii], minimum_frequency[ii], maximum_frequency[ii],
length, latitude, longitude, elevation, xarm_azimuth, yarm_azimuth, xarm_tilt, yarm_tilt))
xarm_azimuth += 240
yarm_azimuth += 240
latitude += np.arctan(length * np.sin(xarm_azimuth * np.pi / 180) * 1e3 / utils.radius_of_earth)
longitude += np.arctan(length * np.cos(xarm_azimuth * np.pi / 180) * 1e3 / utils.radius_of_earth)
def get_event_data(
event, interferometer_names=None, duration=4, roll_off=0.2,
psd_offset=-1024, psd_duration=100, cache=True, outdir='outdir',
label=None, plot=True, filter_freq=None, **kwargs):
"""
Get open data for a specified event.
Parameters
----------
event: str
Event descriptor, this can deal with some prefixes, e.g., '150914',
'GW150914', 'LVT151012'
interferometer_names: list, optional
List of interferometer identifiers, e.g., 'H1'.
If None will look for data in 'H1', 'V1', 'L1'
duration: float
Time duration to search for.
roll_off: float
The roll-off (in seconds) used in the Tukey window.
psd_offset, psd_duration: float
The power spectral density (psd) is estimated using data from
`center_time+psd_offset` to `center_time+psd_offset + psd_duration`.
cache: bool
Whether or not to store the acquired data.
outdir: str
Directory where the psd files are saved
label: str
If given, an identifying label used in generating file names.
plot: bool
If true, create an ASD + strain plot
filter_freq: float
Low pass filter frequency
**kwargs:
All keyword arguments are passed to
`gwpy.timeseries.TimeSeries.fetch_open_data()`.
Return
------
list: A list of bilby.gw.detector.Interferometer objects
"""
event_time = utils.get_event_time(event)
interferometers = []
if interferometer_names is None:
interferometer_names = ['H1', 'L1', 'V1']
for name in interferometer_names:
try:
interferometers.append(get_interferometer_with_open_data(
name, trigger_time=event_time, duration=duration, roll_off=roll_off,
psd_offset=psd_offset, psd_duration=psd_duration, cache=cache,
outdir=outdir, label=label, plot=plot, filter_freq=filter_freq,
**kwargs))
except ValueError as e:
logger.debug("Error raised {}".format(e))
logger.warning('No data found for {}.'.format(name))
return InterferometerList(interferometers)
def get_empty_interferometer(name):
"""
Get an interferometer with standard parameters for known detectors.
These objects do not have any noise instantiated.
The available instruments are:
H1, L1, V1, GEO600, CE
Detector positions taken from:
L1/H1: LIGO-T980044-10
V1/GEO600: arXiv:gr-qc/0008066 [45]
CE: located at the site of H1
Detector sensitivities:
H1/L1/V1: https://dcc.ligo.org/LIGO-P1200087-v42/public
GEO600: http://www.geo600.org/1032083/GEO600_Sensitivity_Curves
CE: https://dcc.ligo.org/LIGO-P1600143/public
Parameters
----------
name: str
Interferometer identifier.
Returns
-------
interferometer: Interferometer
Interferometer instance
"""
filename = os.path.join(os.path.dirname(__file__), 'detectors', '{}.interferometer'.format(name))
try:
return load_interferometer(filename)
except OSError:
raise ValueError('Interferometer {} not implemented'.format(name))
def load_interferometer(filename):
"""Load an interferometer from a file."""
parameters = dict()
with open(filename, 'r') as parameter_file:
lines = parameter_file.readlines()
for line in lines:
if line[0] == '#':
continue
split_line = line.split('=')
key = split_line[0].strip()
value = eval('='.join(split_line[1:]))
parameters[key] = value
if 'shape' not in parameters.keys():
ifo = Interferometer(**parameters)
logger.debug('Assuming L shape for {}'.format('name'))
elif parameters['shape'].lower() in ['l', 'ligo']:
parameters.pop('shape')
ifo = Interferometer(**parameters)
elif parameters['shape'].lower() in ['triangular', 'triangle']:
parameters.pop('shape')
ifo = TriangularInterferometer(**parameters)
else:
raise IOError("{} could not be loaded. Invalid parameter 'shape'.".format(filename))
return ifo
def get_interferometer_with_open_data(
name, trigger_time, duration=4, start_time=None, roll_off=0.2,
psd_offset=-1024, psd_duration=100, cache=True, outdir='outdir',
label=None, plot=True, filter_freq=None, **kwargs):
"""
Helper function to obtain an Interferometer instance with appropriate
PSD and data, given an center_time.
Parameters
----------
name: str
Detector name, e.g., 'H1'.
trigger_time: float
Trigger GPS time.
duration: float, optional
The total time (in seconds) to analyse. Defaults to 4s.
start_time: float, optional
Beginning of the segment, if None, the trigger is placed 2s before the end
of the segment.
roll_off: float
The roll-off (in seconds) used in the Tukey window.
psd_offset, psd_duration: float
The power spectral density (psd) is estimated using data from
`center_time+psd_offset` to `center_time+psd_offset + psd_duration`.
cache: bool, optional
Whether or not to store the acquired data
outdir: str
Directory where the psd files are saved
label: str
If given, an identifying label used in generating file names.
plot: bool
If true, create an ASD + strain plot
filter_freq: float
Low pass filter frequency
**kwargs:
All keyword arguments are passed to
`gwpy.timeseries.TimeSeries.fetch_open_data()`.
Returns
-------
bilby.gw.detector.Interferometer: An Interferometer instance with a PSD and frequency-domain strain data.
"""
logger.warning(
"Parameter estimation for real interferometer data in bilby is in "
"alpha testing at the moment: the routines for windowing and filtering"
" have not been reviewed.")
utils.check_directory_exists_and_if_not_mkdir(outdir)
if start_time is None:
start_time = trigger_time + 2 - duration
strain = InterferometerStrainData(roll_off=roll_off)
strain.set_from_open_data(
name=name, start_time=start_time, duration=duration,
outdir=outdir, cache=cache, **kwargs)
strain.low_pass_filter(filter_freq)
strain_psd = InterferometerStrainData(roll_off=roll_off)
strain_psd.set_from_open_data(
name=name, start_time=start_time + duration + psd_offset,
duration=psd_duration, outdir=outdir, cache=cache, **kwargs)
# Low pass filter
strain_psd.low_pass_filter(filter_freq)
# Create and save PSDs
psd_frequencies, psd_array = strain_psd.create_power_spectral_density(
name=name, outdir=outdir, fft_length=strain.duration)
interferometer = get_empty_interferometer(name)
interferometer.power_spectral_density = PowerSpectralDensity(
psd_array=psd_array, frequency_array=psd_frequencies)
interferometer.strain_data = strain
if plot:
interferometer.plot_data(outdir=outdir, label=label)
return interferometer
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment