Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
bilby
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
lscsoft
bilby
Commits
a73b555c
Commit
a73b555c
authored
2 years ago
by
John Douglas Veitch
Browse files
Options
Downloads
Patches
Plain Diff
Revert "Reformat strain_data.py to satisfy pre-commit"
This reverts commit
bc15e3b0
.
parent
bc15e3b0
No related branches found
Branches containing commit
No related tags found
Tags containing commit
1 merge request
!1233
Print fewer info statements when setting strain data
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
bilby/gw/detector/strain_data.py
+165
-273
165 additions, 273 deletions
bilby/gw/detector/strain_data.py
with
165 additions
and
273 deletions
bilby/gw/detector/strain_data.py
+
165
−
273
View file @
a73b555c
...
...
@@ -7,24 +7,17 @@ from .. import utils as gwutils
class
InterferometerStrainData
(
object
):
"""
Strain data for an interferometer
"""
duration
=
PropertyAccessor
(
"
_times_and_frequencies
"
,
"
duration
"
)
sampling_frequency
=
PropertyAccessor
(
"
_times_and_frequencies
"
,
"
sampling_frequency
"
)
start_time
=
PropertyAccessor
(
"
_times_and_frequencies
"
,
"
start_time
"
)
frequency_array
=
PropertyAccessor
(
"
_times_and_frequencies
"
,
"
frequency_array
"
)
time_array
=
PropertyAccessor
(
"
_times_and_frequencies
"
,
"
time_array
"
)
def
__init__
(
self
,
minimum_frequency
=
0
,
maximum_frequency
=
np
.
inf
,
roll_off
=
0.2
,
notch_list
=
None
,
):
"""
Initiate an InterferometerStrainData object
"""
Strain data for an interferometer
"""
duration
=
PropertyAccessor
(
'
_times_and_frequencies
'
,
'
duration
'
)
sampling_frequency
=
PropertyAccessor
(
'
_times_and_frequencies
'
,
'
sampling_frequency
'
)
start_time
=
PropertyAccessor
(
'
_times_and_frequencies
'
,
'
start_time
'
)
frequency_array
=
PropertyAccessor
(
'
_times_and_frequencies
'
,
'
frequency_array
'
)
time_array
=
PropertyAccessor
(
'
_times_and_frequencies
'
,
'
time_array
'
)
def
__init__
(
self
,
minimum_frequency
=
0
,
maximum_frequency
=
np
.
inf
,
roll_off
=
0.2
,
notch_list
=
None
):
"""
Initiate an InterferometerStrainData object
The initialised object contains no data, this should be added using one
of the `set_from..` methods.
...
...
@@ -58,26 +51,22 @@ class InterferometerStrainData(object):
self
.
_channel
=
None
def
__eq__
(
self
,
other
):
if
(
self
.
minimum_frequency
==
other
.
minimum_frequency
and
self
.
maximum_frequency
==
other
.
maximum_frequency
and
self
.
roll_off
==
other
.
roll_off
and
self
.
window_factor
==
other
.
window_factor
and
self
.
sampling_frequency
==
other
.
sampling_frequency
and
self
.
duration
==
other
.
duration
and
self
.
start_time
==
other
.
start_time
and
np
.
array_equal
(
self
.
time_array
,
other
.
time_array
)
and
np
.
array_equal
(
self
.
frequency_array
,
other
.
frequency_array
)
and
np
.
array_equal
(
self
.
frequency_domain_strain
,
other
.
frequency_domain_strain
)
and
np
.
array_equal
(
self
.
time_domain_strain
,
other
.
time_domain_strain
)
):
if
self
.
minimum_frequency
==
other
.
minimum_frequency
\
and
self
.
maximum_frequency
==
other
.
maximum_frequency
\
and
self
.
roll_off
==
other
.
roll_off
\
and
self
.
window_factor
==
other
.
window_factor
\
and
self
.
sampling_frequency
==
other
.
sampling_frequency
\
and
self
.
duration
==
other
.
duration
\
and
self
.
start_time
==
other
.
start_time
\
and
np
.
array_equal
(
self
.
time_array
,
other
.
time_array
)
\
and
np
.
array_equal
(
self
.
frequency_array
,
other
.
frequency_array
)
\
and
np
.
array_equal
(
self
.
frequency_domain_strain
,
other
.
frequency_domain_strain
)
\
and
np
.
array_equal
(
self
.
time_domain_strain
,
other
.
time_domain_strain
):
return
True
return
False
def
time_within_data
(
self
,
time
):
"""
Check if time is within the data span
"""
Check if time is within the data span
Parameters
==========
...
...
@@ -110,10 +99,10 @@ class InterferometerStrainData(object):
@property
def
maximum_frequency
(
self
):
"""
Force the maximum frequency be less than the Nyquist frequency
"""
"""
Force the maximum frequency be less than the Nyquist frequency
"""
if
self
.
sampling_frequency
is
not
None
:
if
2
*
self
.
_maximum_frequency
>
self
.
sampling_frequency
:
self
.
_maximum_frequency
=
self
.
sampling_frequency
/
2.
0
self
.
_maximum_frequency
=
self
.
sampling_frequency
/
2.
return
self
.
_maximum_frequency
@maximum_frequency.setter
...
...
@@ -127,7 +116,7 @@ class InterferometerStrainData(object):
@notch_list.setter
def
notch_list
(
self
,
notch_list
):
"""
Set the notch_list
"""
Set the notch_list
Parameters
==========
...
...
@@ -148,7 +137,7 @@ class InterferometerStrainData(object):
@property
def
frequency_mask
(
self
):
"""
Masking array for limiting the frequency band.
"""
Masking array for limiting the frequency band.
Returns
=======
...
...
@@ -157,9 +146,8 @@ class InterferometerStrainData(object):
"""
if
not
self
.
_frequency_mask_updated
:
frequency_array
=
self
.
_times_and_frequencies
.
frequency_array
mask
=
(
frequency_array
>=
self
.
minimum_frequency
)
&
(
frequency_array
<=
self
.
maximum_frequency
)
mask
=
((
frequency_array
>=
self
.
minimum_frequency
)
&
(
frequency_array
<=
self
.
maximum_frequency
))
for
notch
in
self
.
notch_list
:
mask
[
notch
.
get_idxs
(
frequency_array
)]
=
False
self
.
_frequency_mask
=
mask
...
...
@@ -196,24 +184,22 @@ class InterferometerStrainData(object):
Window function over time array
"""
from
scipy.signal.windows
import
tukey
if
roll_off
is
not
None
:
self
.
roll_off
=
roll_off
elif
alpha
is
not
None
:
self
.
roll_off
=
alpha
*
self
.
duration
/
2
window
=
tukey
(
len
(
self
.
_time_domain_strain
),
alpha
=
self
.
alpha
)
self
.
window_factor
=
np
.
mean
(
window
**
2
)
self
.
window_factor
=
np
.
mean
(
window
**
2
)
return
window
@property
def
time_domain_strain
(
self
):
"""
The time domain strain, in units of strain
"""
"""
The time domain strain, in units of strain
"""
if
self
.
_time_domain_strain
is
not
None
:
return
self
.
_time_domain_strain
elif
self
.
_frequency_domain_strain
is
not
None
:
self
.
_time_domain_strain
=
utils
.
infft
(
self
.
frequency_domain_strain
,
self
.
sampling_frequency
)
self
.
frequency_domain_strain
,
self
.
sampling_frequency
)
return
self
.
_time_domain_strain
else
:
...
...
@@ -221,7 +207,7 @@ class InterferometerStrainData(object):
@property
def
frequency_domain_strain
(
self
):
"""
Returns the frequency domain strain
"""
Returns the frequency domain strain
This is the frequency domain strain normalised to units of
strain / Hz, obtained by a one-sided Fourier transform of the
...
...
@@ -230,19 +216,14 @@ class InterferometerStrainData(object):
if
self
.
_frequency_domain_strain
is
not
None
:
return
self
.
_frequency_domain_strain
*
self
.
frequency_mask
elif
self
.
_time_domain_strain
is
not
None
:
logger
.
debug
(
"
Generating frequency domain strain from given time
"
"
domain strain.
"
)
logger
.
debug
(
"
Applying a tukey window with alpha={}, roll off={}
"
.
format
(
self
.
alpha
,
self
.
roll_off
)
)
logger
.
debug
(
"
Generating frequency domain strain from given time
"
"
domain strain.
"
)
logger
.
debug
(
"
Applying a tukey window with alpha={}, roll off={}
"
.
format
(
self
.
alpha
,
self
.
roll_off
))
# self.low_pass_filter()
window
=
self
.
time_domain_window
()
self
.
_frequency_domain_strain
,
self
.
frequency_array
=
utils
.
nfft
(
self
.
_time_domain_strain
*
window
,
self
.
sampling_frequency
)
self
.
_time_domain_strain
*
window
,
self
.
sampling_frequency
)
return
self
.
_frequency_domain_strain
*
self
.
frequency_mask
else
:
raise
ValueError
(
"
frequency domain strain data not yet set
"
)
...
...
@@ -250,9 +231,7 @@ class InterferometerStrainData(object):
@frequency_domain_strain.setter
def
frequency_domain_strain
(
self
,
frequency_domain_strain
):
if
not
len
(
self
.
frequency_array
)
==
len
(
frequency_domain_strain
):
raise
ValueError
(
"
The frequency_array and the set strain have different lengths
"
)
raise
ValueError
(
"
The frequency_array and the set strain have different lengths
"
)
self
.
_frequency_domain_strain
=
frequency_domain_strain
self
.
_time_domain_strain
=
None
...
...
@@ -266,10 +245,8 @@ class InterferometerStrainData(object):
raise
ModuleNotFoundError
(
"
Cannot output strain data as gwpy TimeSeries
"
)
return
TimeSeries
(
self
.
time_domain_strain
,
sample_rate
=
self
.
sampling_frequency
,
t0
=
self
.
start_time
,
channel
=
self
.
channel
,
self
.
time_domain_strain
,
sample_rate
=
self
.
sampling_frequency
,
t0
=
self
.
start_time
,
channel
=
self
.
channel
)
def
to_pycbc_timeseries
(
self
):
...
...
@@ -284,9 +261,8 @@ class InterferometerStrainData(object):
raise
ModuleNotFoundError
(
"
Cannot output strain data as PyCBC TimeSeries
"
)
return
TimeSeries
(
self
.
time_domain_strain
,
delta_t
=
(
1.0
/
self
.
sampling_frequency
),
epoch
=
LIGOTimeGPS
(
self
.
start_time
),
self
.
time_domain_strain
,
delta_t
=
(
1.
/
self
.
sampling_frequency
),
epoch
=
LIGOTimeGPS
(
self
.
start_time
)
)
def
to_lal_timeseries
(
self
):
...
...
@@ -299,12 +275,8 @@ class InterferometerStrainData(object):
raise
ModuleNotFoundError
(
"
Cannot output strain data as PyCBC TimeSeries
"
)
lal_data
=
CreateREAL8TimeSeries
(
""
,
LIGOTimeGPS
(
self
.
start_time
),
0
,
1
/
self
.
sampling_frequency
,
SecondUnit
,
len
(
self
.
time_domain_strain
),
""
,
LIGOTimeGPS
(
self
.
start_time
),
0
,
1
/
self
.
sampling_frequency
,
SecondUnit
,
len
(
self
.
time_domain_strain
)
)
lal_data
.
data
.
data
[:]
=
self
.
time_domain_strain
...
...
@@ -317,15 +289,13 @@ class InterferometerStrainData(object):
try
:
from
gwpy.frequencyseries
import
FrequencySeries
except
ModuleNotFoundError
:
raise
ModuleNotFoundError
(
"
Cannot output strain data as gwpy FrequencySeries
"
)
raise
ModuleNotFoundError
(
"
Cannot output strain data as gwpy FrequencySeries
"
)
return
FrequencySeries
(
self
.
frequency_domain_strain
,
frequencies
=
self
.
frequency_array
,
epoch
=
self
.
start_time
,
channel
=
self
.
channel
,
channel
=
self
.
channel
)
def
to_pycbc_frequencyseries
(
self
):
...
...
@@ -342,7 +312,7 @@ class InterferometerStrainData(object):
return
FrequencySeries
(
self
.
frequency_domain_strain
,
delta_f
=
1
/
self
.
duration
,
epoch
=
LIGOTimeGPS
(
self
.
start_time
)
,
epoch
=
LIGOTimeGPS
(
self
.
start_time
)
)
def
to_lal_frequencyseries
(
self
):
...
...
@@ -360,49 +330,39 @@ class InterferometerStrainData(object):
self
.
frequency_array
[
0
],
1
/
self
.
duration
,
SecondUnit
,
len
(
self
.
frequency_domain_strain
)
,
len
(
self
.
frequency_domain_strain
)
)
lal_data
.
data
.
data
[:]
=
self
.
frequency_domain_strain
return
lal_data
def
low_pass_filter
(
self
,
filter_freq
=
None
):
"""
Low pass filter the data
"""
"""
Low pass filter the data
"""
from
gwpy.signal.filter_design
import
lowpass
from
gwpy.timeseries
import
TimeSeries
if
filter_freq
is
None
:
logger
.
debug
(
"
Setting low pass filter_freq using given maximum frequency
"
)
logger
.
debug
(
"
Setting low pass filter_freq using given maximum frequency
"
)
filter_freq
=
self
.
maximum_frequency
if
2
*
filter_freq
>=
self
.
sampling_frequency
:
logger
.
info
(
"
Low pass filter frequency of {}Hz requested, this is equal
"
"
or greater than the Nyquist frequency so no filter applied
"
.
format
(
filter_freq
)
)
"
or greater than the Nyquist frequency so no filter applied
"
.
format
(
filter_freq
))
return
logger
.
debug
(
"
Applying low pass filter with filter frequency {}
"
.
format
(
filter_freq
)
)
logger
.
debug
(
"
Applying low pass filter with filter frequency {}
"
.
format
(
filter_freq
))
bp
=
lowpass
(
filter_freq
,
self
.
sampling_frequency
)
strain
=
TimeSeries
(
self
.
time_domain_strain
,
sample_rate
=
self
.
sampling_frequency
)
strain
=
TimeSeries
(
self
.
time_domain_strain
,
sample_rate
=
self
.
sampling_frequency
)
strain
=
strain
.
filter
(
bp
,
filtfilt
=
True
)
self
.
_time_domain_strain
=
strain
.
value
def
create_power_spectral_density
(
self
,
fft_length
,
overlap
=
0
,
name
=
"
unknown
"
,
outdir
=
None
,
analysis_segment_start_time
=
None
,
):
"""
Use the time domain strain to generate a power spectral density
self
,
fft_length
,
overlap
=
0
,
name
=
'
unknown
'
,
outdir
=
None
,
analysis_segment_start_time
=
None
):
"""
Use the time domain strain to generate a power spectral density
This create a Tukey-windowed power spectral density and writes it to a
PSD file.
...
...
@@ -435,16 +395,13 @@ class InterferometerStrainData(object):
if
analysis_segment_start_time
is
not
None
:
analysis_segment_end_time
=
analysis_segment_start_time
+
fft_length
inside
=
(
analysis_segment_start_time
>
self
.
time_array
[
0
]
+
analysis_segment_end_time
<
self
.
time_array
[
-
1
]
)
inside
=
(
analysis_segment_start_time
>
self
.
time_array
[
0
]
+
analysis_segment_end_time
<
self
.
time_array
[
-
1
])
if
inside
:
logger
.
info
(
"
Removing analysis segment data from the PSD data
"
)
idxs
=
(
self
.
time_array
<
analysis_segment_start_time
)
+
(
self
.
time_array
>
analysis_segment_
end
_time
)
idxs
=
(
(
self
.
time_array
<
analysis_segment_
start
_time
)
+
(
self
.
time_array
>
analysis_segment_end_time
)
)
data
=
data
[
idxs
]
# WARNING this line can cause issues if the data is non-contiguous
...
...
@@ -452,87 +409,63 @@ class InterferometerStrainData(object):
psd_alpha
=
2
*
self
.
roll_off
/
fft_length
logger
.
info
(
"
Tukey window PSD data with alpha={}, roll off={}
"
.
format
(
psd_alpha
,
self
.
roll_off
)
)
psd_alpha
,
self
.
roll_off
))
psd
=
strain
.
psd
(
fftlength
=
fft_length
,
overlap
=
overlap
,
window
=
(
"
tukey
"
,
psd_alpha
)
)
fftlength
=
fft_length
,
overlap
=
overlap
,
window
=
(
'
tukey
'
,
psd_alpha
))
if
outdir
:
psd_file
=
"
{}/{}_PSD_{}_{}.txt
"
.
format
(
outdir
,
name
,
self
.
start_time
,
self
.
duration
)
with
open
(
"
{}
"
.
format
(
psd_file
),
"
w+
"
)
as
opened_file
:
psd_file
=
'
{}/{}_PSD_{}_{}.txt
'
.
format
(
outdir
,
name
,
self
.
start_time
,
self
.
duration
)
with
open
(
'
{}
'
.
format
(
psd_file
),
'
w+
'
)
as
opened_file
:
for
f
,
p
in
zip
(
psd
.
frequencies
.
value
,
psd
.
value
):
opened_file
.
write
(
"
{} {}
\n
"
.
format
(
f
,
p
))
opened_file
.
write
(
'
{} {}
\n
'
.
format
(
f
,
p
))
return
psd
.
frequencies
.
value
,
psd
.
value
def
_infer_time_domain_dependence
(
self
,
start_time
,
sampling_frequency
,
duration
,
time_array
):
"""
Helper function to figure out if the time_array, or
sampling_frequency and duration where given
"""
self
.
_infer_dependence
(
domain
=
"
time
"
,
array
=
time_array
,
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
self
,
start_time
,
sampling_frequency
,
duration
,
time_array
):
"""
Helper function to figure out if the time_array, or
sampling_frequency and duration where given
"""
self
.
_infer_dependence
(
domain
=
'
time
'
,
array
=
time_array
,
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
def
_infer_frequency_domain_dependence
(
self
,
start_time
,
sampling_frequency
,
duration
,
frequency_array
):
"""
Helper function to figure out if the frequency_array, or
sampling_frequency and duration where given
"""
self
.
_infer_dependence
(
domain
=
"
frequency
"
,
array
=
frequency_array
,
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
self
,
start_time
,
sampling_frequency
,
duration
,
frequency_array
):
"""
Helper function to figure out if the frequency_array, or
sampling_frequency and duration where given
"""
def
_infer_dependence
(
self
,
domain
,
array
,
duration
,
sampling_frequency
,
start_time
):
self
.
_infer_dependence
(
domain
=
'
frequency
'
,
array
=
frequency_array
,
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
def
_infer_dependence
(
self
,
domain
,
array
,
duration
,
sampling_frequency
,
start_time
):
if
(
sampling_frequency
is
not
None
)
and
(
duration
is
not
None
):
if
array
is
not
None
:
raise
ValueError
(
"
You have given the sampling_frequency, duration, and
"
"
an array
"
)
"
You have given the sampling_frequency, duration, and
"
"
an array
"
)
pass
elif
array
is
not
None
:
if
domain
==
"
time
"
:
if
domain
==
'
time
'
:
self
.
time_array
=
array
elif
domain
==
"
frequency
"
:
elif
domain
==
'
frequency
'
:
self
.
frequency_array
=
array
self
.
start_time
=
start_time
return
elif
sampling_frequency
is
None
or
duration
is
None
:
raise
ValueError
(
"
You must provide both sampling_frequency and duration
"
)
raise
ValueError
(
"
You must provide both sampling_frequency and duration
"
)
else
:
raise
ValueError
(
"
Insufficient information given to set arrays
"
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
raise
ValueError
(
"
Insufficient information given to set arrays
"
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
def
set_from_time_domain_strain
(
self
,
time_domain_strain
,
sampling_frequency
=
None
,
duration
=
None
,
start_time
=
0
,
time_array
=
None
,
):
"""
Set the strain data from a time domain strain array
self
,
time_domain_strain
,
sampling_frequency
=
None
,
duration
=
None
,
start_time
=
0
,
time_array
=
None
):
"""
Set the strain data from a time domain strain array
This sets the time_domain_strain attribute, the frequency_domain_strain
is automatically calculated after a low-pass filter and Tukey window
...
...
@@ -553,14 +486,12 @@ class InterferometerStrainData(object):
given.
"""
self
.
_infer_time_domain_dependence
(
start_time
=
start_time
,
sampling_frequency
=
sampling_frequency
,
duration
=
duration
,
time_array
=
time_array
,
)
self
.
_infer_time_domain_dependence
(
start_time
=
start_time
,
sampling_frequency
=
sampling_frequency
,
duration
=
duration
,
time_array
=
time_array
)
logger
.
debug
(
"
Setting data using provided time_domain_strain
"
)
logger
.
debug
(
'
Setting data using provided time_domain_strain
'
)
if
np
.
shape
(
time_domain_strain
)
==
np
.
shape
(
self
.
time_array
):
self
.
_time_domain_strain
=
time_domain_strain
self
.
_frequency_domain_strain
=
None
...
...
@@ -568,7 +499,7 @@ class InterferometerStrainData(object):
raise
ValueError
(
"
Data times do not match time array
"
)
def
set_from_gwpy_timeseries
(
self
,
time_series
):
"""
Set the strain data from a gwpy TimeSeries
"""
Set the strain data from a gwpy TimeSeries
This sets the time_domain_strain attribute, the frequency_domain_strain
is automatically calculated after a low-pass filter and Tukey window
...
...
@@ -581,15 +512,13 @@ class InterferometerStrainData(object):
"""
from
gwpy.timeseries
import
TimeSeries
logger
.
debug
(
"
Setting data using provided gwpy TimeSeries object
"
)
logger
.
debug
(
'
Setting data using provided gwpy TimeSeries object
'
)
if
not
isinstance
(
time_series
,
TimeSeries
):
raise
ValueError
(
"
Input time_series is not a gwpy TimeSeries
"
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
time_series
.
duration
.
value
,
sampling_frequency
=
time_series
.
sample_rate
.
value
,
start_time
=
time_series
.
epoch
.
value
,
)
self
.
_times_and_frequencies
=
\
CoupledTimeAndFrequencySeries
(
duration
=
time_series
.
duration
.
value
,
sampling_frequency
=
time_series
.
sample_rate
.
value
,
start_time
=
time_series
.
epoch
.
value
)
self
.
_time_domain_strain
=
time_series
.
value
self
.
_frequency_domain_strain
=
None
self
.
_channel
=
time_series
.
channel
...
...
@@ -599,9 +528,9 @@ class InterferometerStrainData(object):
return
self
.
_channel
def
set_from_open_data
(
self
,
name
,
start_time
,
duration
=
4
,
outdir
=
"
outdir
"
,
cache
=
True
,
**
kwargs
):
"""
Set the strain data from open LOSC data
self
,
name
,
start_time
,
duration
=
4
,
outdir
=
'
outdir
'
,
cache
=
True
,
**
kwargs
):
"""
Set the strain data from open LOSC data
This sets the time_domain_strain attribute, the frequency_domain_strain
is automatically calculated after a low-pass filter and Tukey window
...
...
@@ -626,18 +555,13 @@ class InterferometerStrainData(object):
"""
timeseries
=
gwutils
.
get_open_strain_data
(
name
,
start_time
,
start_time
+
duration
,
outdir
=
outdir
,
cache
=
cache
,
**
kwargs
)
name
,
start_time
,
start_time
+
duration
,
outdir
=
outdir
,
cache
=
cache
,
**
kwargs
)
self
.
set_from_gwpy_timeseries
(
timeseries
)
def
set_from_csv
(
self
,
filename
):
"""
Set the strain data from a csv file
"""
Set the strain data from a csv file
Parameters
==========
...
...
@@ -646,19 +570,13 @@ class InterferometerStrainData(object):
"""
from
gwpy.timeseries
import
TimeSeries
timeseries
=
TimeSeries
.
read
(
filename
,
format
=
"
csv
"
)
timeseries
=
TimeSeries
.
read
(
filename
,
format
=
'
csv
'
)
self
.
set_from_gwpy_timeseries
(
timeseries
)
def
set_from_frequency_domain_strain
(
self
,
frequency_domain_strain
,
sampling_frequency
=
None
,
duration
=
None
,
start_time
=
0
,
frequency_array
=
None
,
):
"""
Set the `frequency_domain_strain` from a numpy array
self
,
frequency_domain_strain
,
sampling_frequency
=
None
,
duration
=
None
,
start_time
=
0
,
frequency_array
=
None
):
"""
Set the `frequency_domain_strain` from a numpy array
Parameters
==========
...
...
@@ -676,14 +594,12 @@ class InterferometerStrainData(object):
"""
self
.
_infer_frequency_domain_dependence
(
start_time
=
start_time
,
sampling_frequency
=
sampling_frequency
,
duration
=
duration
,
frequency_array
=
frequency_array
,
)
self
.
_infer_frequency_domain_dependence
(
start_time
=
start_time
,
sampling_frequency
=
sampling_frequency
,
duration
=
duration
,
frequency_array
=
frequency_array
)
logger
.
debug
(
"
Setting data using provided frequency_domain_strain
"
)
logger
.
debug
(
'
Setting data using provided frequency_domain_strain
'
)
if
np
.
shape
(
frequency_domain_strain
)
==
np
.
shape
(
self
.
frequency_array
):
self
.
_frequency_domain_strain
=
frequency_domain_strain
self
.
window_factor
=
1
...
...
@@ -691,9 +607,9 @@ class InterferometerStrainData(object):
raise
ValueError
(
"
Data frequencies do not match frequency_array
"
)
def
set_from_power_spectral_density
(
self
,
power_spectral_density
,
sampling_frequency
,
duration
,
start_time
=
0
):
"""
Set the `frequency_domain_strain` by generating a noise realisation
self
,
power_spectral_density
,
sampling_frequency
,
duration
,
start_time
=
0
):
"""
Set the `frequency_domain_strain` by generating a noise realisation
Parameters
==========
...
...
@@ -708,20 +624,15 @@ class InterferometerStrainData(object):
"""
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
logger
.
debug
(
"
Setting data using noise realization from provided
"
"
power_spectal_density
"
)
(
frequency_domain_strain
,
frequency_array
,
)
=
power_spectral_density
.
get_noise_realisation
(
self
.
sampling_frequency
,
self
.
duration
)
'
Setting data using noise realization from provided
'
'
power_spectal_density
'
)
frequency_domain_strain
,
frequency_array
=
\
power_spectral_density
.
get_noise_realisation
(
self
.
sampling_frequency
,
self
.
duration
)
if
np
.
array_equal
(
frequency_array
,
self
.
frequency_array
):
self
.
_frequency_domain_strain
=
frequency_domain_strain
...
...
@@ -729,7 +640,7 @@ class InterferometerStrainData(object):
raise
ValueError
(
"
Data frequencies do not match frequency_array
"
)
def
set_from_zero_noise
(
self
,
sampling_frequency
,
duration
,
start_time
=
0
):
"""
Set the `frequency_domain_strain` to zero noise
"""
Set the `frequency_domain_strain` to zero noise
Parameters
==========
...
...
@@ -742,26 +653,17 @@ class InterferometerStrainData(object):
"""
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
logger
.
debug
(
"
Setting zero noise data
"
)
self
.
_frequency_domain_strain
=
np
.
zeros_like
(
self
.
frequency_array
,
dtype
=
complex
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
logger
.
debug
(
'
Setting zero noise data
'
)
self
.
_frequency_domain_strain
=
np
.
zeros_like
(
self
.
frequency_array
,
dtype
=
complex
)
def
set_from_frame_file
(
self
,
frame_file
,
sampling_frequency
,
duration
,
start_time
=
0
,
channel
=
None
,
buffer_time
=
1
,
):
"""
Set the `frequency_domain_strain` from a frame fiile
self
,
frame_file
,
sampling_frequency
,
duration
,
start_time
=
0
,
channel
=
None
,
buffer_time
=
1
):
"""
Set the `frequency_domain_strain` from a frame fiile
Parameters
==========
...
...
@@ -782,25 +684,19 @@ class InterferometerStrainData(object):
"""
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
logger
.
info
(
"
Reading data from frame file {}
"
.
format
(
frame_file
))
logger
.
info
(
'
Reading data from frame file {}
'
.
format
(
frame_file
))
strain
=
gwutils
.
read_frame_file
(
frame_file
,
start_time
=
start_time
,
end_time
=
start_time
+
duration
,
buffer_time
=
buffer_time
,
channel
=
channel
,
resample
=
sampling_frequency
,
)
frame_file
,
start_time
=
start_time
,
end_time
=
start_time
+
duration
,
buffer_time
=
buffer_time
,
channel
=
channel
,
resample
=
sampling_frequency
)
self
.
set_from_gwpy_timeseries
(
strain
)
def
set_from_channel_name
(
self
,
channel
,
duration
,
start_time
,
sampling_frequency
):
"""
Set the `frequency_domain_strain` by fetching from given channel
"""
Set the `frequency_domain_strain` by fetching from given channel
using gwpy.TimesSeries.get(), which dynamically accesses either frames
on disk, or a remote NDS2 server to find and return data. This function
also verifies that the specified channel is given in the correct format.
...
...
@@ -818,18 +714,15 @@ class InterferometerStrainData(object):
"""
from
gwpy.timeseries
import
TimeSeries
channel_comp
=
channel
.
split
(
"
:
"
)
channel_comp
=
channel
.
split
(
'
:
'
)
if
len
(
channel_comp
)
!=
2
:
raise
IndexError
(
"
Channel name must have format `IFO:Channel`
"
)
raise
IndexError
(
'
Channel name must have format `IFO:Channel`
'
)
self
.
_times_and_frequencies
=
CoupledTimeAndFrequencySeries
(
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
,
)
duration
=
duration
,
sampling_frequency
=
sampling_frequency
,
start_time
=
start_time
)
logger
.
info
(
"
Fetching data using channel {}
"
.
format
(
channel
))
logger
.
info
(
'
Fetching data using channel {}
'
.
format
(
channel
))
strain
=
TimeSeries
.
get
(
channel
,
start_time
,
start_time
+
duration
)
strain
=
strain
.
resample
(
sampling_frequency
)
...
...
@@ -838,7 +731,7 @@ class InterferometerStrainData(object):
class
Notch
(
object
):
def
__init__
(
self
,
minimum_frequency
,
maximum_frequency
):
"""
A notch object storing the maximum and minimum frequency of the notch
"""
A notch object storing the maximum and minimum frequency of the notch
Parameters
==========
...
...
@@ -851,13 +744,12 @@ class Notch(object):
self
.
minimum_frequency
=
minimum_frequency
self
.
maximum_frequency
=
maximum_frequency
else
:
msg
=
"
Your notch minimum_frequency {} and maximum_frequency {} are invalid
"
.
format
(
minimum_frequency
,
maximum_frequency
)
msg
=
(
"
Your notch minimum_frequency {} and maximum_frequency {} are invalid
"
.
format
(
minimum_frequency
,
maximum_frequency
))
raise
ValueError
(
msg
)
def
get_idxs
(
self
,
frequency_array
):
"""
Get a boolean mask for the frequencies in frequency_array in the notch
"""
Get a boolean mask for the frequencies in frequency_array in the notch
Parameters
==========
...
...
@@ -870,12 +762,12 @@ class Notch(object):
An array of booleans which are True for frequencies in the notch
"""
lower
=
frequency_array
>
self
.
minimum_frequency
upper
=
frequency_array
<
self
.
maximum_frequency
lower
=
(
frequency_array
>
self
.
minimum_frequency
)
upper
=
(
frequency_array
<
self
.
maximum_frequency
)
return
lower
&
upper
def
check_frequency
(
self
,
freq
):
"""
Check if freq is inside the notch
"""
Check if freq is inside the notch
Parameters
==========
...
...
@@ -896,7 +788,7 @@ class Notch(object):
class
NotchList
(
list
):
def
__init__
(
self
,
notch_list
):
"""
A list of notches
"""
A list of notches
Parameters
==========
...
...
@@ -919,7 +811,7 @@ class NotchList(list):
raise
ValueError
(
msg
)
def
check_frequency
(
self
,
freq
):
"""
Check if freq is inside the notch list
"""
Check if freq is inside the notch list
Parameters
==========
...
...
This diff is collapsed.
Click to expand it.
Gregory Ashton
@gregory.ashton
mentioned in commit
600bb94d
·
2 years ago
mentioned in commit
600bb94d
mentioned in commit 600bb94dc54c9659b113f09fbaac7bdd85709471
Toggle commit list
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment