Skip to content
Snippets Groups Projects
Commit b1622695 authored by Gregory Ashton's avatar Gregory Ashton
Browse files

Reorganise PowerSpectralDensity

Makes the options to set the PSD methods. Retains the default behaviour
(will use aLIGO if nothing is set). But now, users must call a
`set_from_` method after initialising the PowerSpectalDensity()`.
parent dde16382
No related branches found
No related tags found
1 merge request!94Add functionality to strain data
......@@ -1212,46 +1212,12 @@ class Interferometer(object):
class PowerSpectralDensity(object):
def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt', frame_file=None, asd_array=None,
psd_array=None, frequencies=None, start_time=0,
psd_duration=1024, psd_offset=16, channel_name=None, filter_freq=1024, alpha=0.25, fft_length=4):
def __init__(self, psd_file=None):
"""
Instantiate a new PowerSpectralDensity object.
Only one of the asd_file or psd_file needs to be specified.
If multiple are given, the first will be used.
Parameters
-------
asd_file: str, optional
File containing amplitude spectral density, format 'f h_f'
psd_file: str, optional
File containing power spectral density, format 'f h_f'
frame_file: str, optional
Frame file to read data from.
asd_array: array_like, optional
List of amplitude spectral density values corresponding to frequency_array,
requires frequency_array to be specified.
psd_array: array_like, optional
List of power spectral density values corresponding to frequency_array,
requires frequency_array to be specified.
frequencies: array_like, optional
List of frequency values corresponding to asd_array/psd_array,
start_time: float, optional
Beginning of segment to analyse.
psd_duration: float, optional
Duration of data to generate PSD from.
psd_offset: float, optional
Offset of data from beginning of analysed segment.
channel_name: str, optional
Name of channel to use to generate PSD.
roll_off: float
The roll-off (in seconds) used in the Tukey window.
fft_length: float, optional
Number of seconds in a single fft.
Attributes
-------
----------
amplitude_spectral_density: array_like
Array representation of the ASD
amplitude_spectral_density_file: str
......@@ -1264,6 +1230,7 @@ class PowerSpectralDensity(object):
Name of the PSD file
power_spectral_density_interpolated: scipy.interpolated.interp1d
Interpolated function of the PSD
"""
self.__power_spectral_density = None
self.__amplitude_spectral_density = None
......@@ -1271,52 +1238,113 @@ class PowerSpectralDensity(object):
self.frequencies = []
self.power_spectral_density_interpolated = None
if asd_file is not None:
self.amplitude_spectral_density_file = asd_file
self.import_amplitude_spectral_density()
if min(self.amplitude_spectral_density) < 1e-30:
logging.warning("You specified an amplitude spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.amplitude_spectral_density)))
logging.warning("You may have intended to provide this as a power spectral density.")
elif frame_file is not None:
strain = tupak.gw.utils.read_frame_file(frame_file, t1=start_time - psd_duration - psd_offset,
t2=start_time - psd_offset, channel=channel_name)
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = gwpy.signal.filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
# Create and save PSDs
nfft = int(sampling_frequency * fft_length)
window = signal.windows.tukey(nfft, alpha=alpha)
psd = strain.psd(fftlength=fft_length, window=window)
self.frequencies = psd.frequencies
self.power_spectral_density = psd.value
elif frequencies is not None:
self.frequencies = frequencies
if asd_array is not None:
self.amplitude_spectral_density = asd_array
elif psd_array is not None:
self.power_spectral_density = psd_array
else:
if psd_file is None:
logging.info("No power spectral density provided, using aLIGO, zero detuning, high power.")
self.power_spectral_density_file = psd_file
self.import_power_spectral_density()
if min(self.power_spectral_density) > 1e-30:
logging.warning("You specified a power spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.power_spectral_density)))
logging.warning("You may have intended to provide this as an amplitude spectral density.")
if psd_file:
self.set_from_power_spectral_density_file(psd_file)
def set_from_amplitude_spectral_density_file(self, asd_file):
""" Set the amplitude spectral density from a given file
Parameters
----------
asd_file: str
File containing amplitude spectral density, format 'f h_f'
"""
self.amplitude_spectral_density_file = asd_file
self.import_amplitude_spectral_density()
if min(self.amplitude_spectral_density) < 1e-30:
logging.warning("You specified an amplitude spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.amplitude_spectral_density)))
logging.warning(
"You may have intended to provide this as a power spectral density.")
def set_from_power_spectral_density_file(self, psd_file):
""" Set the power spectral density from a given file
Parameters
----------
psd_file: str, optional
File containing power spectral density, format 'f h_f'
"""
self.power_spectral_density_file = psd_file
self.import_power_spectral_density()
if min(self.power_spectral_density) > 1e-30:
logging.warning("You specified a power spectral density file.")
logging.warning("{} WARNING {}".format("*" * 30, "*" * 30))
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.power_spectral_density)))
logging.warning(
"You may have intended to provide this as an amplitude spectral density.")
def set_from_frame_file(self, frame_file, psd_start_time, psd_duration,
fft_length=4, filter_freq=1024, alpha=0.25,
channel=None):
""" Generate power spectral density from a frame file
Parameters
----------
frame_file: str, optional
Frame file to read data from.
psd_start_time: float
Beginning of segment to analyse.
psd_duration: float, optional
Duration of data (in seconds) to generate PSD from.
fft_length: float, optional
Number of seconds in a single fft.
filter_freq: float
Low pass filter frequency
alpha: float, optional
Parameter for Tukey window.
channel_name: str, optional
Name of channel to use to generate PSD.
"""
strain = tupak.gw.detector.InterferometerStrainData()
strain.set_from_frame_file(
frame_file, t1=psd_start_time, t2=psd_start_time+psd_duration,
channel=channel)
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = gwpy.signal.filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
# Create and save PSDs
nfft = int(sampling_frequency * fft_length)
window = signal.windows.tukey(nfft, alpha=alpha)
psd = strain.psd(fftlength=fft_length, window=window)
self.frequencies = psd.frequencies
self.power_spectral_density = psd.value
def set_from_amplitude_spectral_density_array(self, frequencies,
asd_array):
self.frequencies = frequencies
self.amplitude_spectral_density = asd_array
def set_from_power_spectral_density_array(self, frequencies, psd_array):
self.frequencies = frequencies
self.power_spectral_density = psd_array
def set_to_aLIGO(self):
psd_file = 'aLIGO_ZERO_DET_high_P_psd.txt'
logging.info("No power spectral density provided, using aLIGO,"
"zero detuning, high power.")
self.set_from_power_spectral_density_file(psd_file)
@property
def power_spectral_density(self):
return self.__power_spectral_density
if self.__power_spectral_density is not None:
return self.__power_spectral_density
else:
self.set_to_aLIGO()
return self.__power_spectral_density
@power_spectral_density.setter
def power_spectral_density(self, power_spectral_density):
......@@ -1338,40 +1366,50 @@ class PowerSpectralDensity(object):
def import_amplitude_spectral_density(self):
"""
Automagically load one of the amplitude spectral density curves contained in the noise_curves directory.
Automagically load one of the amplitude spectral density curves
contained in the noise_curves directory.
Test if the file contains a path (i.e., contains '/').
If not assume the file is in the default directory.
"""
if '/' not in self.amplitude_spectral_density_file:
self.amplitude_spectral_density_file = os.path.join(os.path.dirname(__file__), 'noise_curves',
self.amplitude_spectral_density_file)
self.frequencies, self.amplitude_spectral_density = np.genfromtxt(self.amplitude_spectral_density_file).T
self.amplitude_spectral_density_file = os.path.join(
os.path.dirname(__file__), 'noise_curves',
self.amplitude_spectral_density_file)
self.frequencies, self.amplitude_spectral_density = np.genfromtxt(
self.amplitude_spectral_density_file).T
def import_power_spectral_density(self):
"""
Automagically load one of the power spectral density curves contained in the noise_curves directory.
Automagically load one of the power spectral density curves contained
in the noise_curves directory.
Test if the file contains a path (i.e., contains '/').
If not assume the file is in the default directory.
"""
if '/' not in self.power_spectral_density_file:
self.power_spectral_density_file = os.path.join(os.path.dirname(__file__), 'noise_curves',
self.power_spectral_density_file)
self.frequencies, self.power_spectral_density = np.genfromtxt(self.power_spectral_density_file).T
self.power_spectral_density_file = os.path.join(
os.path.dirname(__file__), 'noise_curves',
self.power_spectral_density_file)
self.frequencies, self.power_spectral_density = np.genfromtxt(
self.power_spectral_density_file).T
def _check_frequency_array_matches_density_array(self, density_array):
"""Check if the provided frequency and spectral density arrays match."""
"""Check the provided frequency and spectral density arrays match."""
try:
self.frequencies - density_array
except ValueError as e:
raise(e, 'Provided spectral density does not match frequency array. Not updating.')
def _interpolate_power_spectral_density(self):
"""Interpolate the loaded PSD so it can be resampled for arbitrary frequency arrays."""
self.power_spectral_density_interpolated = interp1d(self.frequencies, self.power_spectral_density,
bounds_error=False,
fill_value=np.inf)
"""Interpolate the loaded power spectral density so it can be resampled
for arbitrary frequency arrays.
"""
self.power_spectral_density_interpolated = interp1d(
self.frequencies, self.power_spectral_density, bounds_error=False,
fill_value=np.inf)
def get_noise_realisation(self, sampling_frequency, duration):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment