diff --git a/tupak/detector.py b/tupak/detector.py index 64b929184e35cc0b1c471c6d26e9ad454f06c90b..66e9dc1c1136a99b1f8c72359815cefa1e55b871 100644 --- a/tupak/detector.py +++ b/tupak/detector.py @@ -323,8 +323,8 @@ class Interferometer(object): return self.power_spectral_density.power_spectral_density_interpolated(self.frequency_array) def set_data(self, sampling_frequency, duration, epoch=0, - from_power_spectral_density=False, - frequency_domain_strain=None): + from_power_spectral_density=False, frame_file=None, + frequency_domain_strain=None, channel_name=None, overwrite_psd=True, **kwargs): """ Set the interferometer frequency-domain stain and accompanying PSD values. @@ -341,6 +341,15 @@ class Interferometer(object): from_power_spectral_density: bool If frequency_domain_strain not given, use IFO's PSD object to generate noise + frame_file: str + File from which to load data. + channel_name: str + Channel to read from frame. + overwrite_psd: bool + Whether to overwrite the psd in the interferometer with one calculated + from the loaded data, default=True. + kwargs: dict + Additional arguments for loading data. """ self.epoch = epoch @@ -356,6 +365,14 @@ class Interferometer(object): frequency_domain_strain, frequencies = \ self.power_spectral_density.get_noise_realisation( sampling_frequency, duration) + elif frame_file is not None: + logging.info('Reading data from frame, {}.'.format(self.name)) + strain = tupak.utils.read_frame_file( + frame_file, t1=epoch, t2=epoch+duration, channel=channel_name, resample=sampling_frequency) + frequency_domain_strain, frequencies = tupak.utils.process_strain_data(strain, **kwargs) + if overwrite_psd: + self.power_spectral_density = PowerSpectralDensity( + frame_file=frame_file, channel_name=channel_name, epoch=epoch, **kwargs) else: raise ValueError("No method to set data provided.") @@ -407,16 +424,33 @@ class Interferometer(object): class PowerSpectralDensity: - def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt'): + def __init__(self, asd_file=None, psd_file='aLIGO_ZERO_DET_high_P_psd.txt', frame_file=None, epoch=0, + psd_duration=1024, psd_offset=16, channel_name=None, filter_freq=1024, alpha=0.25, fft_length=4): """ Instantiate a new PowerSpectralDensity object. Only one of the asd_file or psd_file needs to be specified. If multiple are given, the first will be used. - FIXME: Allow reading a frame and then FFT to get PSD, use gwpy? - :param asd_file: amplitude spectral density, format 'f h_f' - :param psd_file: power spectral density, format 'f h_f' + Parameters: + asd_file: str + File containing amplitude spectral density, format 'f h_f' + psd_file: str + File containing power spectral density, format 'f h_f' + frame_file: str + Frame file to read data from. + epoch: float + Beginning of segment to analyse. + psd_duration: float + Duration of data to generate PSD from. + psd_offset: float + Offset of data from beginning of analysed segment. + channel_name: str + Name of channel to use to generate PSD. + alpha: float + Parameter for Tukey window. + fft_length: float + Number of seconds in a single fft. """ self.frequencies = [] @@ -435,6 +469,24 @@ class PowerSpectralDensity: logging.warning("The minimum of the provided curve is {:.2e}.".format( min(self.amplitude_spectral_density))) logging.warning("You may have intended to provide this as a power spectral density.") + elif frame_file is not None: + strain = tupak.utils.read_frame_file(frame_file, t1=epoch - psd_duration - psd_offset, + t2=epoch - psd_duration, channel=channel_name) + sampling_frequency = int(strain.sample_rate.value) + + # Low pass filter + bp = filter_design.lowpass(filter_freq, strain.sample_rate) + strain = strain.filter(bp, filtfilt=True) + strain = strain.crop(*strain.span.contract(1)) + + # Create and save PSDs + NFFT = int(sampling_frequency * fft_length) + window = signal.windows.tukey(NFFT, alpha=alpha) + psd = strain.psd(fftlength=fft_length, window=window) + self.frequencies = psd.frequencies + self.power_spectral_density = psd.value + self.amplitude_spectral_density = self.power_spectral_density**0.5 + self.interpolate_power_spectral_density() else: self.power_spectral_density_file = psd_file self.import_power_spectral_density() diff --git a/tupak/utils.py b/tupak/utils.py index 6b6f575ae53843ff279d8d6a076d224b1c4b467b..3a32e54e7adfd2dc9b2337bd7194112ed86b4ae5 100644 --- a/tupak/utils.py +++ b/tupak/utils.py @@ -4,6 +4,8 @@ import os import numpy as np from math import fmod from gwpy.timeseries import TimeSeries +from gwpy.signal import filter_design +from scipy import signal import argparse # Constants @@ -514,6 +516,112 @@ def get_open_strain_data( return strain +def read_frame_file(file_name, t1, t2, channel=None, **kwargs): + """ A function which accesses the open strain data + + This uses `gwpy` to download the open data and then saves a cached copy for + later use + + Parameters + ---------- + file_name: str + The name of the frame to read + t1, t2: float + The GPS time of the start and end of the data + channel: str + The name of the channel being searched for, some standard channel names are attempted + if channel is not specified or if specified channel is not found. + **kwargs: + Passed to `gwpy.timeseries.TimeSeries.fetch_open_data` + + Returns + ----------- + strain: gwpy.timeseries.TimeSeries + + """ + loaded = False + if channel is not None: + try: + strain = TimeSeries.read(source=file_name, channel=channel, start=t1, end=t2, **kwargs) + loaded = True + logging.info('Successfully loaded {}.'.format(channel)) + except RuntimeError: + logging.warning('Channel {} not found. Trying preset channel names'.format(channel)) + for channel_type in ['GDS-CALIB_STRAIN', 'DCS-CALIB_STRAIN_C01', 'DCS-CALIB_STRAIN_C02']: + for ifo_name in ['H1', 'L1']: + channel = '{}:{}'.format(ifo_name, channel_type) + if loaded: + continue + try: + strain = TimeSeries.read(source=file_name, channel=channel, start=t1, end=t2, **kwargs) + loaded = True + logging.info('Successfully loaded {}.'.format(channel)) + except RuntimeError: + None + + if loaded: + return strain + else: + logging.warning('No data loaded.') + return None + + +def process_strain_data( + strain, alpha=0.25, filter_freq=1024, **kwargs): + """ + Helper function to obtain an Interferometer instance with appropriate + PSD and data, given an center_time. + + Parameters + ---------- + name: str + Detector name, e.g., 'H1'. + center_time: float + GPS time of the center_time about which to perform the analysis. + Note: the analysis data is from `center_time-T/2` to `center_time+T/2`. + T: float + The total time (in seconds) to analyse. Defaults to 4s. + alpha: float + The tukey window shape parameter passed to `scipy.signal.tukey`. + psd_offset, psd_duration: float + The power spectral density (psd) is estimated using data from + `center_time+psd_offset` to `center_time+psd_offset + psd_duration`. + outdir: str + Directory where the psd files are saved + plot: bool + If true, create an ASD + strain plot + filter_freq: float + Low pass filter frequency + **kwargs: + All keyword arguments are passed to + `gwpy.timeseries.TimeSeries.fetch_open_data()`. + + Returns + ------- + interferometer: `tupak.detector.Interferometer` + An Interferometer instance with a PSD and frequency-domain strain data. + + """ + + sampling_frequency = int(strain.sample_rate.value) + + # Low pass filter + bp = filter_design.lowpass(filter_freq, strain.sample_rate) + strain = strain.filter(bp, filtfilt=True) + strain = strain.crop(*strain.span.contract(1)) + + time_series = strain.times.value + time_duration = time_series[-1] - time_series[0] + + # Apply Tukey window + N = len(time_series) + strain = strain * signal.windows.tukey(N, alpha=alpha) + + frequency_domain_strain, frequencies = nfft(strain.value, sampling_frequency) + + return frequency_domain_strain, frequencies + + def set_up_command_line_arguments(): parser = argparse.ArgumentParser( description="Command line interface for tupak scripts")