Skip to content
Snippets Groups Projects
Commit dbc5427d authored by Gregory Ashton's avatar Gregory Ashton
Browse files

Merge branch 'frame_data' into 'master'

Read frame data

See merge request Monash/tupak!54
parents 04f2bf5f 05faef90
No related branches found
No related tags found
1 merge request!54Read frame data
Pipeline #
......@@ -323,8 +323,8 @@ class Interferometer(object):
return self.power_spectral_density.power_spectral_density_interpolated(self.frequency_array)
def set_data(self, sampling_frequency, duration, epoch=0,
from_power_spectral_density=False,
frequency_domain_strain=None):
from_power_spectral_density=False, frame_file=None,
frequency_domain_strain=None, channel_name=None, overwrite_psd=True, **kwargs):
"""
Set the interferometer frequency-domain stain and accompanying PSD values.
......@@ -341,6 +341,15 @@ class Interferometer(object):
from_power_spectral_density: bool
If frequency_domain_strain not given, use IFO's PSD object to
generate noise
frame_file: str
File from which to load data.
channel_name: str
Channel to read from frame.
overwrite_psd: bool
Whether to overwrite the psd in the interferometer with one calculated
from the loaded data, default=True.
kwargs: dict
Additional arguments for loading data.
"""
self.epoch = epoch
......@@ -356,6 +365,14 @@ class Interferometer(object):
frequency_domain_strain, frequencies = \
self.power_spectral_density.get_noise_realisation(
sampling_frequency, duration)
elif frame_file is not None:
logging.info('Reading data from frame, {}.'.format(self.name))
strain = tupak.utils.read_frame_file(
frame_file, t1=epoch, t2=epoch+duration, channel=channel_name, resample=sampling_frequency)
frequency_domain_strain, frequencies = tupak.utils.process_strain_data(strain, **kwargs)
if overwrite_psd:
self.power_spectral_density = PowerSpectralDensity(
frame_file=frame_file, channel_name=channel_name, epoch=epoch, **kwargs)
else:
raise ValueError("No method to set data provided.")
......@@ -407,16 +424,33 @@ class Interferometer(object):
class PowerSpectralDensity:
def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt'):
def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt', frame_file=None, epoch=0,
psd_duration=1024, psd_offset=16, channel_name=None, filter_freq=1024, alpha=0.25, fft_length=4):
"""
Instantiate a new PowerSpectralDensity object.
Only one of the asd_file or psd_file needs to be specified.
If multiple are given, the first will be used.
FIXME: Allow reading a frame and then FFT to get PSD, use gwpy?
:param asd_file: amplitude spectral density, format 'f h_f'
:param psd_file: power spectral density, format 'f h_f'
Parameters:
asd_file: str
File containing amplitude spectral density, format 'f h_f'
psd_file: str
File containing power spectral density, format 'f h_f'
frame_file: str
Frame file to read data from.
epoch: float
Beginning of segment to analyse.
psd_duration: float
Duration of data to generate PSD from.
psd_offset: float
Offset of data from beginning of analysed segment.
channel_name: str
Name of channel to use to generate PSD.
alpha: float
Parameter for Tukey window.
fft_length: float
Number of seconds in a single fft.
"""
self.frequencies = []
......@@ -435,6 +469,24 @@ class PowerSpectralDensity:
logging.warning("The minimum of the provided curve is {:.2e}.".format(
min(self.amplitude_spectral_density)))
logging.warning("You may have intended to provide this as a power spectral density.")
elif frame_file is not None:
strain = tupak.utils.read_frame_file(frame_file, t1=epoch - psd_duration - psd_offset,
t2=epoch - psd_duration, channel=channel_name)
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
# Create and save PSDs
NFFT = int(sampling_frequency * fft_length)
window = signal.windows.tukey(NFFT, alpha=alpha)
psd = strain.psd(fftlength=fft_length, window=window)
self.frequencies = psd.frequencies
self.power_spectral_density = psd.value
self.amplitude_spectral_density = self.power_spectral_density**0.5
self.interpolate_power_spectral_density()
else:
self.power_spectral_density_file = psd_file
self.import_power_spectral_density()
......
......@@ -4,6 +4,8 @@ import os
import numpy as np
from math import fmod
from gwpy.timeseries import TimeSeries
from gwpy.signal import filter_design
from scipy import signal
import argparse
# Constants
......@@ -514,6 +516,112 @@ def get_open_strain_data(
return strain
def read_frame_file(file_name, t1, t2, channel=None, **kwargs):
""" A function which accesses the open strain data
This uses `gwpy` to download the open data and then saves a cached copy for
later use
Parameters
----------
file_name: str
The name of the frame to read
t1, t2: float
The GPS time of the start and end of the data
channel: str
The name of the channel being searched for, some standard channel names are attempted
if channel is not specified or if specified channel is not found.
**kwargs:
Passed to `gwpy.timeseries.TimeSeries.fetch_open_data`
Returns
-----------
strain: gwpy.timeseries.TimeSeries
"""
loaded = False
if channel is not None:
try:
strain = TimeSeries.read(source=file_name, channel=channel, start=t1, end=t2, **kwargs)
loaded = True
logging.info('Successfully loaded {}.'.format(channel))
except RuntimeError:
logging.warning('Channel {} not found. Trying preset channel names'.format(channel))
for channel_type in ['GDS-CALIB_STRAIN', 'DCS-CALIB_STRAIN_C01', 'DCS-CALIB_STRAIN_C02']:
for ifo_name in ['H1', 'L1']:
channel = '{}:{}'.format(ifo_name, channel_type)
if loaded:
continue
try:
strain = TimeSeries.read(source=file_name, channel=channel, start=t1, end=t2, **kwargs)
loaded = True
logging.info('Successfully loaded {}.'.format(channel))
except RuntimeError:
None
if loaded:
return strain
else:
logging.warning('No data loaded.')
return None
def process_strain_data(
strain, alpha=0.25, filter_freq=1024, **kwargs):
"""
Helper function to obtain an Interferometer instance with appropriate
PSD and data, given an center_time.
Parameters
----------
name: str
Detector name, e.g., 'H1'.
center_time: float
GPS time of the center_time about which to perform the analysis.
Note: the analysis data is from `center_time-T/2` to `center_time+T/2`.
T: float
The total time (in seconds) to analyse. Defaults to 4s.
alpha: float
The tukey window shape parameter passed to `scipy.signal.tukey`.
psd_offset, psd_duration: float
The power spectral density (psd) is estimated using data from
`center_time+psd_offset` to `center_time+psd_offset + psd_duration`.
outdir: str
Directory where the psd files are saved
plot: bool
If true, create an ASD + strain plot
filter_freq: float
Low pass filter frequency
**kwargs:
All keyword arguments are passed to
`gwpy.timeseries.TimeSeries.fetch_open_data()`.
Returns
-------
interferometer: `tupak.detector.Interferometer`
An Interferometer instance with a PSD and frequency-domain strain data.
"""
sampling_frequency = int(strain.sample_rate.value)
# Low pass filter
bp = filter_design.lowpass(filter_freq, strain.sample_rate)
strain = strain.filter(bp, filtfilt=True)
strain = strain.crop(*strain.span.contract(1))
time_series = strain.times.value
time_duration = time_series[-1] - time_series[0]
# Apply Tukey window
N = len(time_series)
strain = strain * signal.windows.tukey(N, alpha=alpha)
frequency_domain_strain, frequencies = nfft(strain.value, sampling_frequency)
return frequency_domain_strain, frequencies
def set_up_command_line_arguments():
parser = argparse.ArgumentParser(
description="Command line interface for tupak scripts")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment