Skip to content
Snippets Groups Projects

Draft: WIP Positive/negative standard deviation refinement

Open Camilla Compton requested to merge camilla.compton/locklost:pos_neg_std into master
1 unresolved thread
+ 31
14
import logging
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from gwpy.segments import Segment
@@ -65,14 +66,20 @@ def plot_indicators(event, channel, refined_gps, threshold):
linestyle='--',
linewidth=2,
)
if y_ind[0] < threshold:
xyoffset = (8, 10)
valign = 'bottom'
else:
xyoffset = (8, -10)
valign = 'top'
ax1.annotate(
"threshold",
xy=(ax1.get_xlim()[0], threshold),
xycoords='data',
xytext=(8, -10),
xytext=xyoffset,
textcoords='offset points',
horizontalalignment='left',
verticalalignment='top',
verticalalignment=valign,
bbox=dict(
boxstyle="round", fc="w", ec="green", alpha=0.95),
)
@@ -153,8 +160,8 @@ def find_transition(channel, segment, std_threshold, minimum=None):
returns `threshold`, `refined_gps` tuple
"""
refined_gps = None
threshold = None
refined_gps = None
buf = data.fetch(channel, segment)[0]
# calculate mean and std using first 5 seconds of window
@@ -173,25 +180,35 @@ def find_transition(channel, segment, std_threshold, minimum=None):
if minimum and lock_mean < minimum:
logging.info("channel mean below minimum, unable to resolve time")
# look for positive and negative threshold crossings, add them to a list,
# then choose the first crossing if one exists
# look for positive and negative threshold crossings, add them to a
# dictionary, then choose the first crossing if one exists
else:
inds_above = np.where(buf.data > upper_threshold)[0]
inds_below = np.where(buf.data < lower_threshold)[0]
thresh_crossing_list = []
thresh_crossing_dict = defaultdict(lambda: defaultdict(int))
if inds_above.any():
thresh_crossing_list.append([np.min(inds_above), upper_threshold])
thresh_crossing_dict['ind']['upper'] = np.min(inds_above)
thresh_crossing_dict['thresh']['upper'] = upper_threshold
if inds_below.any():
thresh_crossing_list.append([np.min(inds_below), lower_threshold])
if len(thresh_crossing_list) > 0:
thresh_crossing_list.sort()
ind = thresh_crossing_list[0][0]
threshold = thresh_crossing_list[0][1]
thresh_crossing_dict['ind']['lower'] = np.min(inds_below)
thresh_crossing_dict['thresh']['lower'] = lower_threshold
first_crossing = min(
thresh_crossing_dict['ind'],
key = lambda k: thresh_crossing_dict['ind'][k],
default = None
)
if first_crossing is None:
logging.info("no threshold crossings, unable to resolve time")
else:
threshold = thresh_crossing_dict['thresh'][first_crossing]
ind = thresh_crossing_dict['ind'][first_crossing]
refined_gps = buf.tarray[ind]
logging.info("threshold: {}".format(threshold))
logging.info("refined time: {}".format(refined_gps))
else:
logging.info("no threshold crossings, unable to resolve time")
# add a threshold to be diplayed on graphs if refinement failed
if threshold is None:
threshold = lower_threshold
return threshold, refined_gps
Loading