diff --git a/bilby/core/utils.py b/bilby/core/utils.py index bf88828d96b1093bed952e6f30017d19e1843e8a..70516cd316a33f1b074466bf84dd5c7a39c5aa41 100644 --- a/bilby/core/utils.py +++ b/bilby/core/utils.py @@ -246,7 +246,7 @@ def nfft(time_domain_strain, sampling_frequency): Returns ------- - frequency_domain_strain, frequency_array: (array, array) + frequency_domain_strain, frequency_array: (array_like, array_like) Single-sided FFT of time domain strain normalised to units of strain / Hz, and the associated frequency_array. diff --git a/bilby/gw/detector.py b/bilby/gw/detector.py index 23eb90fbd16d28231fba60a322307fd172a764e4..5ca78778564a8f90d483157e08de7749b97ba7ef 100644 --- a/bilby/gw/detector.py +++ b/bilby/gw/detector.py @@ -107,7 +107,7 @@ class InterferometerList(list): """ if injection_polarizations is None: if waveform_generator is not None: - injection_polarizations =\ + injection_polarizations = \ waveform_generator.frequency_domain_strain(parameters) else: raise ValueError( @@ -212,10 +212,12 @@ class InterferometerStrainData(object): def __init__(self, minimum_frequency=0, maximum_frequency=np.inf, roll_off=0.2): - """ Initiate an InterferometerStrainData object - The initialised object contains no data, this should be added using one - of the `set_from..` methods. + """ Initiate an InterferometerStrainData object. + + This __init__ is intended for internal use only. Use the static + `InterferometerStrainData.from_...` methods to create instances for + yourself. Parameters ---------- @@ -228,18 +230,32 @@ class InterferometerStrainData(object): This corresponds to alpha * duration / 2 for scipy tukey window. """ + self.minimum_frequency = minimum_frequency self.maximum_frequency = maximum_frequency self.roll_off = roll_off self.window_factor = 1 - self._set_time_and_frequency_array_parameters(None, None, None) - + self._sampling_frequency = None + self._duration = None + self._start_time = 0 self._frequency_domain_strain = None self._frequency_array = None self._time_domain_strain = None self._time_array = None + @property + def sampling_frequency(self): + return self._sampling_frequency + + @property + def duration(self): + return self._duration + + @property + def start_time(self): + return self._start_time + @property def frequency_array(self): """ Frequencies of the data in Hz """ @@ -251,6 +267,8 @@ class InterferometerStrainData(object): @frequency_array.setter def frequency_array(self, frequency_array): + self._sampling_frequency, self._duration = \ + utils.get_sampling_frequency_and_duration_from_frequency_array(frequency_array) self._frequency_array = frequency_array @property @@ -264,6 +282,8 @@ class InterferometerStrainData(object): @time_array.setter def time_array(self, time_array): + self._sampling_frequency, self._duration = utils.get_sampling_frequency_and_duration_from_time_array(time_array) + self._start_time = time_array[0] self._time_array = time_array def _calculate_time_array(self): @@ -272,7 +292,7 @@ class InterferometerStrainData(object): raise ValueError( "You have not specified the sampling_frequency and duration") - self.time_array = utils.create_time_series( + self._time_array = utils.create_time_series( sampling_frequency=self.sampling_frequency, duration=self.duration, starting_time=self.start_time) @@ -281,7 +301,7 @@ class InterferometerStrainData(object): if (self.sampling_frequency is None) or (self.duration is None): raise ValueError( "You have not specified the sampling_frequency and duration") - self.frequency_array = utils.create_frequency_series( + self._frequency_array = utils.create_frequency_series( sampling_frequency=self.sampling_frequency, duration=self.duration) def time_within_data(self, time): @@ -383,6 +403,14 @@ class InterferometerStrainData(object): else: raise ValueError("time domain strain data not yet set") + @time_domain_strain.setter + def time_domain_strain(self, time_domain_strain): + if not len(self.time_array) == len(time_domain_strain): + raise ValueError("Invalid shape, expected {}, got {}" + .format(np.shape(self.time_array), + np.shape(time_domain_strain))) + self._time_domain_strain = time_domain_strain + @property def frequency_domain_strain(self): """ Returns the frequency domain strain @@ -409,13 +437,11 @@ class InterferometerStrainData(object): @frequency_domain_strain.setter def frequency_domain_strain(self, frequency_domain_strain): if not len(self.frequency_array) == len(frequency_domain_strain): - raise ValueError("The frequency_array and the set strain have different lengths") + raise ValueError("Invalid shape, expected {}, got {}" + .format(np.shape(self.frequency_array), + np.shape(frequency_domain_strain))) self._frequency_domain_strain = frequency_domain_strain - def add_to_frequency_domain_strain(self, x): - """Deprecated""" - self._frequency_domain_strain += x - def low_pass_filter(self, filter_freq=None): """ Low pass filter the data """ @@ -476,51 +502,50 @@ class InterferometerStrainData(object): return psd.frequencies.value, psd.value - def _infer_time_domain_dependence( - self, start_time, sampling_frequency, duration, time_array): - """ Helper function to figure out if the time_array, or - sampling_frequency and duration where given - """ - self._infer_dependence(domain='time', array=time_array, duration=duration, - sampling_frequency=sampling_frequency, start_time=start_time) + @classmethod + def from_time_domain_strain_and_time_array( + cls, time_domain_strain, time_array, minimum_frequency=0.0, + maximum_frequency=np.inf, roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a time domain strain array and a time_array - def _infer_frequency_domain_dependence( - self, start_time, sampling_frequency, duration, frequency_array): - """ Helper function to figure out if the frequency_array, or - sampling_frequency and duration where given + This sets the time_domain_strain attribute, the frequency_domain_strain + is automatically calculated after a low-pass filter and Tukey window + is applied. + + Parameters + ---------- + time_domain_strain: array_like + An array of the time domain strain. + time_array: array_like + The array of times, if sampling_frequency and duration not + given. + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._infer_dependence(domain='frequency', array=frequency_array, - duration=duration, sampling_frequency=sampling_frequency, start_time=start_time) + logger.debug('Setting data using provided time_domain_strain') - def _infer_dependence(self, domain, array, duration, sampling_frequency, start_time): - if (sampling_frequency is not None) and (duration is not None): - if array is not None: - raise ValueError( - "You have given the sampling_frequency, duration, and " - "an array") - pass - elif array is not None: - if domain == 'time': - self.time_array = array - sampling_frequency, duration = utils.get_sampling_frequency_and_duration_from_time_array(array) - elif domain == 'frequency': - self.frequency_array = array - sampling_frequency, duration = utils.get_sampling_frequency_and_duration_from_frequency_array(array) - elif sampling_frequency is None or duration is None: - raise ValueError( - "You must provide both sampling_frequency and duration") - else: - raise ValueError( - "Insufficient information given to set arrays") - self._set_time_and_frequency_array_parameters(duration=duration, - sampling_frequency=sampling_frequency, - start_time=start_time) + if not np.shape(time_domain_strain) == np.shape(time_array): + raise ValueError("Data times do not match time array") + + strain = cls(minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, roll_off=roll_off) + strain.time_array = time_array + strain.time_domain_strain = time_domain_strain + return strain - def set_from_time_domain_strain( - self, time_domain_strain, sampling_frequency=None, duration=None, - start_time=0, time_array=None): - """ Set the strain data from a time domain strain array + @classmethod + def from_time_domain_strain(cls, time_domain_strain, sampling_frequency, + duration, start_time=0.0, minimum_frequency=0.0, + maximum_frequency=np.inf, roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a time domain strain array This sets the time_domain_strain attribute, the frequency_domain_strain is automatically calculated after a low-pass filter and Tukey window @@ -531,30 +556,108 @@ class InterferometerStrainData(object): time_domain_strain: array_like An array of the time domain strain. sampling_frequency: float - The sampling frequency (in Hz). - duration: float - The data duration (in s). - start_time: float - The GPS start-time of the data. - time_array: array_like - The array of times, if sampling_frequency and duration not + Sampling frequency for the time_array. + duration: + Duration for the time_array + start_time: + The GPS start-time for the time_array + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. + """ + + time_array = utils.create_time_series( + sampling_frequency=sampling_frequency, duration=duration, + starting_time=start_time) + return cls.from_time_domain_strain_and_time_array( + time_domain_strain=time_domain_strain, time_array=time_array, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) + + @classmethod + def from_frequency_domain_strain_and_frequency_array( + cls, frequency_domain_strain, frequency_array, start_time=0.0, + minimum_frequency=0.0, maximum_frequency=np.inf, roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a frequency domain strain array and a frequency_array + + Parameters + ---------- + frequency_domain_strain: array_like + The data to set. + frequency_array: array_like + The array of frequencies, if sampling_frequency and duration not given. + start_time: float, optional + The GPS start-time of the data. + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. + """ + + logger.debug('Setting data using provided frequency_domain_strain') + strain_data = cls(minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) + strain_data._start_time = start_time + strain_data.window_factor = 1 + strain_data.frequency_array = frequency_array + strain_data.frequency_domain_strain = frequency_domain_strain + return strain_data + + @classmethod + def from_frequency_domain_strain( + cls, frequency_domain_strain, sampling_frequency, duration, + start_time=0.0, minimum_frequency=0, maximum_frequency=np.inf, + roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a frequency domain strain array + + Parameters + ---------- + frequency_domain_strain: array_like + The data to set. + sampling_frequency: float + Sampling frequency for the time_array. + duration: + Duration for the time_array + start_time: + The GPS start-time for the time_array + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._infer_time_domain_dependence(start_time=start_time, - sampling_frequency=sampling_frequency, - duration=duration, - time_array=time_array) - logger.debug('Setting data using provided time_domain_strain') - if np.shape(time_domain_strain) == np.shape(self.time_array): - self._time_domain_strain = time_domain_strain - self._frequency_domain_strain = None - else: - raise ValueError("Data times do not match time array") + frequency_array = utils.create_frequency_series( + sampling_frequency=sampling_frequency, + duration=duration) + return cls.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=frequency_domain_strain, + frequency_array=frequency_array, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + start_time=start_time, + roll_off=roll_off) - def set_from_gwpy_timeseries(self, time_series): - """ Set the strain data from a gwpy TimeSeries + @classmethod + def from_gwpy_timeseries(cls, time_series, minimum_frequency=0.0, + maximum_frequency=np.inf, roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a gwpy.timeseries.TimeSeries object This sets the time_domain_strain attribute, the frequency_domain_strain is automatically calculated after a low-pass filter and Tukey window @@ -562,22 +665,33 @@ class InterferometerStrainData(object): Parameters ---------- - time_series: gwpy.timeseries.timeseries.TimeSeries - + time_series: gwpy.timeseries.TimeSeries + Contains the time_array and time_domain_strain_data information + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ + logger.debug('Setting data using provided gwpy TimeSeries object') if type(time_series) != gwpy.timeseries.TimeSeries: raise ValueError("Input time_series is not a gwpy TimeSeries") - self._set_time_and_frequency_array_parameters(duration=time_series.duration.value, - sampling_frequency=time_series.sample_rate.value, - start_time=time_series.epoch.value) - self._time_domain_strain = time_series.value - self._frequency_domain_strain = None + return cls.from_time_domain_strain_and_time_array( + time_domain_strain=time_series, + time_array=time_series.times, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) - def set_from_open_data( - self, name, start_time, duration=4, outdir='outdir', cache=True, - **kwargs): - """ Set the strain data from open LOSC data + @classmethod + def from_open_data(cls, name, start_time, duration=4.0, outdir='outdir', + cache=True, minimum_frequency=0.0, + maximum_frequency=np.inf, roll_off=0.2, **kwargs): + """ Instantiate an InterferometerStrainData object given + open LOSC data This sets the time_domain_strain attribute, the frequency_domain_strain is automatically calculated after a low-pass filter and Tukey window @@ -595,67 +709,58 @@ class InterferometerStrainData(object): Directory where the psd files are saved cache: bool, optional Whether or not to store/use the acquired data. + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. **kwargs: All keyword arguments are passed to `gwpy.timeseries.TimeSeries.fetch_open_data()`. - """ - timeseries = gwutils.get_open_strain_data( name, start_time, start_time + duration, outdir=outdir, cache=cache, **kwargs) + return cls.from_gwpy_timeseries(time_series=timeseries, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) - self.set_from_gwpy_timeseries(timeseries) - - def set_from_csv(self, filename): - """ Set the strain data from a csv file + @classmethod + def from_csv(cls, filename, minimum_frequency=0, maximum_frequency=np.inf, + roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a csv file Parameters ---------- filename: str The path to the file to read in - - """ - timeseries = gwpy.timeseries.TimeSeries.read(filename, format='csv') - self.set_from_gwpy_timeseries(timeseries) - - def set_from_frequency_domain_strain( - self, frequency_domain_strain, sampling_frequency=None, - duration=None, start_time=0, frequency_array=None): - """ Set the `frequency_domain_strain` from a numpy array - - Parameters - ---------- - frequency_domain_strain: array_like - The data to set. - sampling_frequency: float - The sampling frequency (in Hz). - duration: float - The data duration (in s). - start_time: float - The GPS start-time of the data. - frequency_array: array_like - The array of frequencies, if sampling_frequency and duration not - given. - + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._infer_frequency_domain_dependence(start_time=start_time, - sampling_frequency=sampling_frequency, - duration=duration, - frequency_array=frequency_array) - - logger.debug('Setting data using provided frequency_domain_strain') - if np.shape(frequency_domain_strain) == np.shape(self.frequency_array): - self._frequency_domain_strain = frequency_domain_strain - self.window_factor = 1 - else: - raise ValueError("Data frequencies do not match frequency_array") + time_series = gwpy.timeseries.TimeSeries.read(filename, format='csv') + return cls.from_gwpy_timeseries( + time_series=time_series, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) - def set_from_power_spectral_density( - self, power_spectral_density, sampling_frequency, duration, - start_time=0): - """ Set the `frequency_domain_strain` by generating a noise realisation + @classmethod + def from_power_spectral_density(cls, power_spectral_density, + sampling_frequency, duration, + start_time=0.0, minimum_frequency=0.0, + maximum_frequency=np.inf, roll_off=0.2): + """ Instantiate an InterferometerStrainData object given + a bilby PowerSpectralDensity Parameters ---------- @@ -667,27 +772,34 @@ class InterferometerStrainData(object): The data duration (in s) start_time: float The GPS start-time of the data - + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._set_time_and_frequency_array_parameters(duration=duration, - sampling_frequency=sampling_frequency, - start_time=start_time) - - logger.debug( - 'Setting data using noise realization from provided' - 'power_spectal_density') + logger.debug('Setting data using noise realization from provided ' + 'power_spectal_density') frequency_domain_strain, frequency_array = \ - power_spectral_density.get_noise_realisation( - self.sampling_frequency, self.duration) - - if np.array_equal(frequency_array, self.frequency_array): - self._frequency_domain_strain = frequency_domain_strain - else: - raise ValueError("Data frequencies do not match frequency_array") + power_spectral_density.get_noise_realisation(sampling_frequency, + duration) + return cls.from_frequency_domain_strain_and_frequency_array( + frequency_array=frequency_array, + frequency_domain_strain=frequency_domain_strain, + start_time=start_time, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) - def set_from_zero_noise(self, sampling_frequency, duration, start_time=0): - """ Set the `frequency_domain_strain` to zero noise + @classmethod + def from_zero_noise(cls, sampling_frequency, duration, start_time=0, + minimum_frequency=0, maximum_frequency=np.inf, + roll_off=0.2): + """ Instantiate an InterferometerStrainData object with + zero noise Parameters ---------- @@ -697,21 +809,35 @@ class InterferometerStrainData(object): The data duration (in s) start_time: float The GPS start-time of the data - + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._set_time_and_frequency_array_parameters(duration=duration, - sampling_frequency=sampling_frequency, - start_time=start_time) - logger.debug('Setting zero noise data') - self._frequency_domain_strain = np.zeros_like(self.frequency_array, - dtype=np.complex) + frequency_array = utils.create_frequency_series( + sampling_frequency=sampling_frequency, + duration=duration) + frequency_domain_strain = \ + np.zeros_like(frequency_array, dtype=np.complex) + return cls.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=frequency_domain_strain, + frequency_array=frequency_array, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + start_time=start_time, + roll_off=roll_off) - def set_from_frame_file( - self, frame_file, sampling_frequency, duration, start_time=0, - channel=None, buffer_time=1): - """ Set the `frequency_domain_strain` from a frame fiile + @classmethod + def from_frame_file(cls, frame_file, sampling_frequency, duration, + start_time=0.0, channel=None, buffer_time=1.0, + minimum_frequency=0.0, maximum_frequency=np.inf, + roll_off=0.2): + """ Instantiate an InterferometerStrainData object given a frame file Parameters ---------- @@ -728,25 +854,25 @@ class InterferometerStrainData(object): buffer_time: float Read in data with `start_time-buffer_time` and `start_time+duration+buffer_time` - + minimum_frequency: float, optional + Minimum frequency to analyse for detector. Default is 0. + maximum_frequency: float, optional + Maximum frequency to analyse for detector. Default is infinity. + roll_off: + The roll-off (in seconds) used in the Tukey window, default=0.2s. + This corresponds to alpha * duration / 2 for scipy tukey window. """ - self._set_time_and_frequency_array_parameters(duration=duration, - sampling_frequency=sampling_frequency, - start_time=start_time) - logger.info('Reading data from frame') strain = gwutils.read_frame_file( frame_file, start_time=start_time, end_time=start_time + duration, buffer_time=buffer_time, channel=channel, resample=sampling_frequency) - - self.set_from_gwpy_timeseries(strain) - - def _set_time_and_frequency_array_parameters(self, duration, sampling_frequency, start_time): - self.sampling_frequency = sampling_frequency - self.duration = duration - self.start_time = start_time + return cls.from_gwpy_timeseries( + time_series=strain, + minimum_frequency=minimum_frequency, + maximum_frequency=maximum_frequency, + roll_off=roll_off) class Interferometer(object): @@ -853,9 +979,28 @@ class Interferometer(object): self._strain_data = strain_data + def set_strain_data_from_frequency_domain_strain_and_frequency_array( + self, frequency_domain_strain, frequency_array): + """ Set the `Interferometer.strain_data` from a numpy array + + Parameters + ---------- + frequency_domain_strain: array_like + The data to set. + frequency_array: array_like + The array of frequencies, if sampling_frequency and duration not + given. + + """ + self.strain_data = InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=frequency_domain_strain, + frequency_array=frequency_array, + minimum_frequency=self.minimum_frequency, + maximum_frequency=self.maximum_frequency) + def set_strain_data_from_frequency_domain_strain( self, frequency_domain_strain, sampling_frequency=None, - duration=None, start_time=0, frequency_array=None): + duration=None, start_time=0): """ Set the `Interferometer.strain_data` from a numpy array Parameters @@ -868,15 +1013,14 @@ class Interferometer(object): The data duration (in s). start_time: float The GPS start-time of the data. - frequency_array: array_like - The array of frequencies, if sampling_frequency and duration not - given. - """ - self.strain_data.set_from_frequency_domain_strain( + self.strain_data = InterferometerStrainData.from_frequency_domain_strain( frequency_domain_strain=frequency_domain_strain, - sampling_frequency=sampling_frequency, duration=duration, - start_time=start_time, frequency_array=frequency_array) + sampling_frequency=sampling_frequency, + duration=duration, + start_time=start_time, + minimum_frequency=self.minimum_frequency, + maximum_frequency=self.maximum_frequency) def set_strain_data_from_power_spectral_density( self, sampling_frequency, duration, start_time=0): @@ -896,9 +1040,12 @@ class Interferometer(object): The GPS start-time of the data """ - self.strain_data.set_from_power_spectral_density( - self.power_spectral_density, sampling_frequency=sampling_frequency, - duration=duration, start_time=start_time) + self.strain_data = InterferometerStrainData.from_power_spectral_density( + power_spectral_density=self.power_spectral_density, + sampling_frequency=sampling_frequency, + duration=duration, start_time=start_time, + minimum_frequency=self.minimum_frequency, + maximum_frequency=self.maximum_frequency) def set_strain_data_from_frame_file( self, frame_file, sampling_frequency, duration, start_time=0, @@ -922,10 +1069,15 @@ class Interferometer(object): `start_time+duration+buffer_time` """ - self.strain_data.set_from_frame_file( - frame_file=frame_file, sampling_frequency=sampling_frequency, - duration=duration, start_time=start_time, - channel=channel, buffer_time=buffer_time) + self.strain_data = InterferometerStrainData.from_frame_file( + frame_file=frame_file, + sampling_frequency=sampling_frequency, + duration=duration, + start_time=start_time, + channel=channel, + buffer_time=buffer_time, + minimum_frequency=self.minimum_frequency, + maximum_frequency=self.maximum_frequency) def set_strain_data_from_csv(self, filename): """ Set the `Interferometer.strain_data` from a csv file @@ -936,7 +1088,11 @@ class Interferometer(object): The path to the file to read in """ - self.strain_data.set_from_csv(filename) + self.strain_data = InterferometerStrainData.from_csv( + filename=filename, + minimum_frequency=self.minimum_frequency, + maximum_frequency=self.maximum_frequency + ) def set_strain_data_from_zero_noise( self, sampling_frequency, duration, start_time=0): @@ -1241,7 +1397,7 @@ class Interferometer(object): if injection_polarizations is None: if waveform_generator is not None: - injection_polarizations =\ + injection_polarizations = \ waveform_generator.frequency_domain_strain(parameters) else: raise ValueError( @@ -1954,18 +2110,17 @@ def get_interferometer_with_open_data( if start_time is None: start_time = trigger_time + 2 - duration - strain = InterferometerStrainData(roll_off=roll_off) - strain.set_from_open_data( - name=name, start_time=start_time, duration=duration, - outdir=outdir, cache=cache, **kwargs) + strain = InterferometerStrainData.from_open_data( + name=name, start_time=start_time, duration=duration, outdir=outdir, + cache=cache, roll_off=roll_off, **kwargs) strain.low_pass_filter(filter_freq) - strain_psd = InterferometerStrainData(roll_off=roll_off) - strain_psd.set_from_open_data( - name=name, start_time=start_time + duration + psd_offset, - duration=psd_duration, outdir=outdir, cache=cache, **kwargs) - # Low pass filter + strain_psd = InterferometerStrainData.from_open_data( + name=name, start_time=(start_time + duration + psd_offset), + duration=psd_duration, outdir=outdir, + cache=cache, roll_off=roll_off, **kwargs) strain_psd.low_pass_filter(filter_freq) + # Create and save PSDs psd_frequencies, psd_array = strain_psd.create_power_spectral_density( name=name, outdir=outdir, fft_length=strain.duration) @@ -2039,13 +2194,18 @@ def get_interferometer_with_fake_noise_and_injection( interferometer = get_empty_interferometer(name) interferometer.power_spectral_density = PowerSpectralDensity.from_aligo() if zero_noise: - interferometer.set_strain_data_from_zero_noise( - sampling_frequency=sampling_frequency, duration=duration, - start_time=start_time) + interferometer.strain_data = InterferometerStrainData. \ + from_zero_noise(sampling_frequency=sampling_frequency, + duration=duration, + start_time=start_time) else: - interferometer.set_strain_data_from_power_spectral_density( - sampling_frequency=sampling_frequency, duration=duration, - start_time=start_time) + interferometer.strain_data = InterferometerStrainData.from_power_spectral_density( + power_spectral_density=interferometer.power_spectral_density, + sampling_frequency=sampling_frequency, + duration=duration, + start_time=start_time, + minimum_frequency=interferometer.minimum_frequency, + maximum_frequency=interferometer.maximum_frequency) injection_polarizations = interferometer.inject_signal( parameters=injection_parameters, diff --git a/test/detector_test.py b/test/detector_test.py index 154bacf4a1fb020d2569c689a87bb9c7240e1beb..9e7f747a3791b764df59983cc78c177af9adf301 100644 --- a/test/detector_test.py +++ b/test/detector_test.py @@ -12,7 +12,7 @@ import os import logging -class TestDetector(unittest.TestCase): +class TestInterferometer(unittest.TestCase): def setUp(self): self.name = 'name' @@ -35,8 +35,10 @@ class TestDetector(unittest.TestCase): elevation=self.elevation, xarm_azimuth=self.xarm_azimuth, yarm_azimuth=self.yarm_azimuth, xarm_tilt=self.xarm_tilt, yarm_tilt=self.yarm_tilt) - self.ifo.strain_data.set_from_frequency_domain_strain( - np.linspace(0, 4096, 4097), sampling_frequency=4096, duration=2) + self.ifo.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=np.linspace(0, 4096, 4097), + sampling_frequency=4096, + duration=2) def tearDown(self): del self.name @@ -331,43 +333,35 @@ class TestInterferometerStrainData(unittest.TestCase): del self.ifosd def test_frequency_mask(self): - self.minimum_frequency = 10 - self.maximum_frequency = 20 - with mock.patch('bilby.core.utils.create_frequency_series') as m: - m.return_value = np.array([5, 15, 25]) - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([5, 15, 25])) - self.assertTrue(np.array_equal(self.ifosd.frequency_mask, [False, True, False])) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([5, 15, 25])) + self.ifosd.minimum_frequency = 10 + self.ifosd.maximum_frequency = 20 + self.assertTrue(np.array_equal(self.ifosd.frequency_mask, [False, True, False])) def test_frequency_array_setting_direct(self): - with mock.patch('bilby.core.utils.create_frequency_series') as m: - m.return_value = np.array([5, 15, 25]) - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([5, 15, 25])) - self.assertTrue(np.array_equal(self.ifosd.frequency_array, np.array(np.array([5, 15, 25])))) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([5, 15, 25])) + self.assertTrue(np.array_equal(self.ifosd.frequency_array, np.array(np.array([5, 15, 25])))) def test_duration_setting(self): - with mock.patch('bilby.core.utils.create_frequency_series') as m: - m.return_value = np.array([0, 1, 2]) - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([0, 1, 2])) - self.assertAlmostEqual(self.ifosd.duration, 1) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([0, 1, 2])) + self.assertAlmostEqual(self.ifosd.duration, 1) def test_sampling_frequency_setting(self): - with mock.patch('bilby.core.utils.create_frequency_series') as n: - with mock.patch('bilby.core.utils.get_sampling_frequency_and_duration_from_frequency_array') as m: - m.return_value = 8, 456 - n.return_value = np.array([1, 2, 3]) - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([0, 1, 2])) - self.assertEqual(8, self.ifosd.sampling_frequency) + with mock.patch('bilby.core.utils.get_sampling_frequency_and_duration_from_frequency_array') as m: + m.return_value = 8, 456 + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([0, 1, 2])) + self.assertEqual(8, self.ifosd.sampling_frequency) def test_frequency_array_setting(self): duration = 3 sampling_frequency = 1 with mock.patch('bilby.core.utils.create_frequency_series') as m: m.return_value = [1, 2, 3] - self.ifosd.set_from_frequency_domain_strain( + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( frequency_domain_strain=np.array([0, 1, 2]), duration=duration, sampling_frequency=sampling_frequency) self.assertTrue(np.array_equal( @@ -375,27 +369,12 @@ class TestInterferometerStrainData(unittest.TestCase): bilby.core.utils.create_frequency_series(duration=duration, sampling_frequency=sampling_frequency))) - def test_set_data_fails(self): - with mock.patch('bilby.core.utils.create_frequency_series') as m: - m.return_value = [1, 2, 3] - with self.assertRaises(ValueError): - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2])) - - def test_set_data_fails_too_much(self): - with mock.patch('bilby.core.utils.create_frequency_series') as m: - m.return_value = [1, 2, 3] - with self.assertRaises(ValueError): - self.ifosd.set_from_frequency_domain_strain( - frequency_domain_strain=np.array([0, 1, 2]), frequency_array=np.array([1, 2, 3]), - duration=3, sampling_frequency=1) - def test_start_time_init(self): with mock.patch('bilby.core.utils.create_frequency_series') as m: m.return_value = [1, 2, 3] duration = 3 sampling_frequency = 1 - self.ifosd.set_from_frequency_domain_strain( + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( frequency_domain_strain=np.array([0, 1, 2]), duration=duration, sampling_frequency=sampling_frequency) self.assertTrue(self.ifosd.start_time == 0) @@ -405,119 +384,75 @@ class TestInterferometerStrainData(unittest.TestCase): m.return_value = [1, 2, 3] duration = 3 sampling_frequency = 1 - self.ifosd.set_from_frequency_domain_strain( + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( frequency_domain_strain=np.array([0, 1, 2]), duration=duration, sampling_frequency=sampling_frequency, start_time=10) self.assertTrue(self.ifosd.start_time == 10) - def test_time_array_frequency_array_consistency(self): - duration = 1 - sampling_frequency = 10 - time_array = bilby.core.utils.create_time_series( - sampling_frequency=sampling_frequency, duration=duration) - time_domain_strain = np.random.normal(0, 1, len(time_array)) - self.ifosd.roll_off = 0 - self.ifosd.set_from_time_domain_strain( - time_domain_strain=time_domain_strain, duration=duration, - sampling_frequency=sampling_frequency) - - frequency_domain_strain, freqs = bilby.core.utils.nfft( - time_domain_strain, sampling_frequency) - - self.assertTrue(np.all( - self.ifosd.frequency_domain_strain == frequency_domain_strain * self.ifosd.frequency_mask)) - def test_time_array_when_set(self): - test_array = np.array([1]) + test_array = bilby.utils.create_time_series(sampling_frequency=100, duration=2) self.ifosd.time_array = test_array - self.assertTrue(test_array, self.ifosd.time_array) - - @patch.object(bilby.core.utils, 'create_time_series') - def test_time_array_when_not_set(self, m): - self.ifosd.start_time = 3 - self.ifosd.sampling_frequency = 1000 - self.ifosd.duration = 5 - m.return_value = 4 - self.assertEqual(m.return_value, self.ifosd.time_array) - m.assert_called_with(sampling_frequency=self.ifosd.sampling_frequency, - duration=self.ifosd.duration, - starting_time=self.ifosd.start_time) - - def test_time_array_without_sampling_frequency(self): - self.ifosd.sampling_frequency = None - self.ifosd.duration = 4 - with self.assertRaises(ValueError): - test = self.ifosd.time_array - - def test_time_array_without_duration(self): - self.ifosd.sampling_frequency = 4096 - self.ifosd.duration = None - with self.assertRaises(ValueError): - test = self.ifosd.time_array + self.assertTrue(np.array_equal(test_array, self.ifosd.time_array)) def test_frequency_array_when_set(self): - test_array = np.array([1]) + test_array = np.array([1, 2, 3]) self.ifosd.frequency_array = test_array - self.assertTrue(test_array, self.ifosd.frequency_array) - - @patch.object(bilby.core.utils, 'create_frequency_series') - def test_time_array_when_not_set(self, m): - self.ifosd.sampling_frequency = 1000 - self.ifosd.duration = 5 - m.return_value = 4 - self.assertEqual(m.return_value, self.ifosd.frequency_array) - m.assert_called_with(sampling_frequency=self.ifosd.sampling_frequency, - duration=self.ifosd.duration) - - def test_frequency_array_without_sampling_frequency(self): - self.ifosd.sampling_frequency = None - self.ifosd.duration = 4 - with self.assertRaises(ValueError): - test = self.ifosd.frequency_array - - def test_frequency_array_without_duration(self): - self.ifosd.sampling_frequency = 4096 - self.ifosd.duration = None - with self.assertRaises(ValueError): - test = self.ifosd.frequency_array + self.assertTrue(np.array_equal(test_array, self.ifosd.frequency_array)) def test_time_within_data_before(self): - self.ifosd.start_time = 3 - self.ifosd.duration = 2 + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=2, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) self.assertFalse(self.ifosd.time_within_data(2)) def test_time_within_data_during(self): - self.ifosd.start_time = 3 - self.ifosd.duration = 2 - self.assertTrue(self.ifosd.time_within_data(3)) + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=2, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) + self.assertTrue(self.ifosd.time_within_data(time_array[0])) self.assertTrue(self.ifosd.time_within_data(4)) - self.assertTrue(self.ifosd.time_within_data(5)) + self.assertTrue(self.ifosd.time_within_data(time_array[-1])) def test_time_within_data_after(self): - self.ifosd.start_time = 3 - self.ifosd.duration = 2 + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=2, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) self.assertFalse(self.ifosd.time_within_data(6)) def test_time_domain_window_no_roll_off_no_alpha(self): - self.ifosd._time_domain_strain = np.array([3]) - self.ifosd.duration = 5 + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=2, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) self.ifosd.roll_off = 2 expected_window = scipy.signal.windows.tukey(len(self.ifosd._time_domain_strain), alpha=self.ifosd.alpha) - self.assertEqual(expected_window, - self.ifosd.time_domain_window()) + self.assertTrue(np.array_equal(expected_window, self.ifosd.time_domain_window())) self.assertEqual(np.mean(expected_window ** 2), self.ifosd.window_factor) def test_time_domain_window_sets_roll_off_directly(self): - self.ifosd._time_domain_strain = np.array([3]) - self.ifosd.duration = 5 + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=5, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) self.ifosd.roll_off = 2 expected_roll_off = 6 self.ifosd.time_domain_window(roll_off=expected_roll_off) self.assertEqual(expected_roll_off, self.ifosd.roll_off) def test_time_domain_window_sets_roll_off_indirectly(self): - self.ifosd._time_domain_strain = np.array([3]) - self.ifosd.duration = 5 + time_array = bilby.core.utils.create_time_series(sampling_frequency=100, + duration=5, + starting_time=3) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_time_domain_strain_and_time_array( + time_domain_strain=time_array, time_array=time_array) self.ifosd.roll_off = 2 alpha = 4 expected_roll_off = alpha * self.ifosd.duration / 2 @@ -532,10 +467,13 @@ class TestInterferometerStrainData(unittest.TestCase): @patch('bilby.core.utils.infft') def test_time_domain_strain_from_frequency_domain_strain(self, m): m.return_value = 5 - self.ifosd.sampling_frequency = 200 - self.ifosd.duration = 4 - self.ifosd._frequency_domain_strain = self.ifosd.frequency_array - self.ifosd.sampling_frequency = 123 + frequency_array = bilby.core.utils.create_frequency_series( + sampling_frequency=100, + duration=5) + self.ifosd = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain_and_frequency_array( + frequency_domain_strain=frequency_array, frequency_array=frequency_array) + + self.ifosd.frequency_domain_strain = self.ifosd.frequency_array self.assertEqual(m.return_value, self.ifosd.time_domain_strain) def test_time_domain_strain_not_set(self): @@ -544,44 +482,12 @@ class TestInterferometerStrainData(unittest.TestCase): with self.assertRaises(ValueError): test = self.ifosd.time_domain_strain - def test_frequency_domain_strain_when_set(self): - self.ifosd.sampling_frequency = 200 - self.ifosd.duration = 4 - expected_strain = self.ifosd.frequency_array * self.ifosd.frequency_mask - self.ifosd._frequency_domain_strain = expected_strain - self.assertTrue(np.array_equal(expected_strain, - self.ifosd.frequency_domain_strain)) - - @patch('bilby.core.utils.nfft') - def test_frequency_domain_strain_from_frequency_domain_strain(self, m): - self.ifosd.start_time = 0 - self.ifosd.duration = 4 - self.ifosd.sampling_frequency = 200 - m.return_value = self.ifosd.frequency_array, self.ifosd.frequency_array - self.ifosd._time_domain_strain = self.ifosd.time_array - self.assertTrue(np.array_equal(self.ifosd.frequency_array * self.ifosd.frequency_mask, - self.ifosd.frequency_domain_strain)) - def test_frequency_domain_strain_not_set(self): self.ifosd._time_domain_strain = None self.ifosd._frequency_domain_strain = None with self.assertRaises(ValueError): test = self.ifosd.frequency_domain_strain - def test_set_frequency_domain_strain(self): - self.ifosd.duration = 4 - self.ifosd.sampling_frequency = 200 - self.ifosd.frequency_domain_strain = np.ones(len(self.ifosd.frequency_array)) - self.assertTrue(np.array_equal(np.ones(len(self.ifosd.frequency_array)), - self.ifosd._frequency_domain_strain)) - - def test_set_frequency_domain_strain_wrong_length(self): - self.ifosd.duration = 4 - self.ifosd.sampling_frequency = 200 - with self.assertRaises(ValueError): - self.ifosd.frequency_domain_strain = np.array([1]) - - class TestInterferometerList(unittest.TestCase): def setUp(self): @@ -631,10 +537,10 @@ class TestInterferometerList(unittest.TestCase): elevation=self.elevation2, xarm_azimuth=self.xarm_azimuth2, yarm_azimuth=self.yarm_azimuth2, xarm_tilt=self.xarm_tilt2, yarm_tilt=self.yarm_tilt2) - self.ifo1.strain_data.set_from_frequency_domain_strain( - self.frequency_arrays, sampling_frequency=4096, duration=2) - self.ifo2.strain_data.set_from_frequency_domain_strain( - self.frequency_arrays, sampling_frequency=4096, duration=2) + self.ifo1.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=self.frequency_arrays, sampling_frequency=4096, duration=2) + self.ifo2.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=self.frequency_arrays, sampling_frequency=4096, duration=2) self.ifo_list = bilby.gw.detector.InterferometerList([self.ifo1, self.ifo2]) def tearDown(self): @@ -688,19 +594,20 @@ class TestInterferometerList(unittest.TestCase): self.assertEqual(self.ifo2, ifo_list[1]) def test_init_inconsistent_duration(self): - self.ifo2.strain_data.set_from_frequency_domain_strain( - np.linspace(0, 4096, 4097), sampling_frequency=4096, duration=3) + self.ifo2.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=bilby.utils.create_frequency_series(4096, 3), sampling_frequency=4096, duration=3) with self.assertRaises(ValueError): bilby.gw.detector.InterferometerList([self.ifo1, self.ifo2]) def test_init_inconsistent_sampling_frequency(self): - self.ifo2.strain_data.set_from_frequency_domain_strain( - np.linspace(0, 4096, 4097), sampling_frequency=234, duration=2) + self.ifo2.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=bilby.utils.create_frequency_series(8192, 2), sampling_frequency=8192, duration=2) with self.assertRaises(ValueError): bilby.gw.detector.InterferometerList([self.ifo1, self.ifo2]) def test_init_inconsistent_start_time(self): - self.ifo2.strain_data.start_time = 1 + self.ifo2.strain_data = bilby.gw.detector.InterferometerStrainData.from_frequency_domain_strain( + frequency_domain_strain=self.frequency_arrays, sampling_frequency=4096, duration=2, start_time=1) with self.assertRaises(ValueError): bilby.gw.detector.InterferometerList([self.ifo1, self.ifo2]) @@ -963,18 +870,6 @@ class TestPowerSpectralDensityWithFiles(unittest.TestCase): psd = bilby.gw.detector.PowerSpectralDensity(asd_file=self.asd_file) self.assertEqual(0, m.call_count) - def test_from_frame_file(self): - expected_frequency_array = np.array([1., 2., 3.]) - expected_psd_array = np.array([16., 25., 36.]) - with mock.patch('bilby.gw.detector.InterferometerStrainData.set_from_frame_file') as m: - with mock.patch('bilby.gw.detector.InterferometerStrainData.create_power_spectral_density') as n: - n.return_value = expected_frequency_array, expected_psd_array - psd = bilby.gw.detector.PowerSpectralDensity.from_frame_file(frame_file=self.asd_file, - psd_start_time=0, - psd_duration=4) - self.assertTrue(np.array_equal(expected_frequency_array, psd.frequency_array)) - self.assertTrue(np.array_equal(expected_psd_array, psd.psd_array)) - def test_repr(self): psd = bilby.gw.detector.PowerSpectralDensity(psd_file=self.psd_file) expected = 'PowerSpectralDensity(psd_file=\'{}\', asd_file=\'{}\')'.format(self.psd_file, None)