Skip to content
Snippets Groups Projects
Commit cc878128 authored by Rebecca Ewing's avatar Rebecca Ewing Committed by Rebecca Ewing
Browse files

gstlal-ugly/bin/gstlal_ifo_stat: add option to bootstrap previous state and duration from influx

parent 387dacbe
No related branches found
No related tags found
1 merge request!522gstlal_ifo_stat updates for gwistat
Pipeline #545349 passed
......@@ -46,6 +46,7 @@ def parse_command_line():
parser.add_option("--output-url", metavar = "path", help = "Write to this url")
parser.add_option("--wait-time", metavar = int, default = 120, help = "wait time")
parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.")
parser.add_option("--bootstrap-from-influx", action="store_true", default = False, help = "If given, upon start up the previous IFO state and duration will be read from the Influx database provided by --scald-config. In this case the duration measurement will pick up from the last stored value. Default is false.")
parser.add_option("--verbose", action="store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
......@@ -59,22 +60,61 @@ class IFOStatusTracker(object):
self.ifo = ifo
self.output_url = options.output_url
self.last_reduce = None
self.scald_config_filename = options.scald_config
# set up influx configuration
with open(options.scald_config, 'r') as f:
config = yaml.safe_load(f)
self.influx_sink = influx.Aggregator(**config["backends"]["default"])
with open(self.scald_config_filename, 'r') as f:
self.config = yaml.safe_load(f)
self.influx_sink = influx.Aggregator(**self.config["backends"]["default"])
self.influx_sink.load(path=options.scald_config)
self.influx_sink.load(path = self.scald_config_filename)
# set up deques for storing data
self.timedeq = defaultdict(lambda: deque(maxlen=10000))
routes = ['state', 'hoft_ok', 'duration']
self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes}
self.previous = defaultdict(lambda: (None, None))
self.duration = defaultdict(lambda: 0)
# bootstrap previous state and duration from influx
if options.bootstrap_from_influx:
prev_time, prev_state, prev_dur = self.bootstrap_from_influx()
else:
prev_time = None
prev_state = None
prev_dur = 0
self.previous = defaultdict(lambda: (prev_time, prev_state))
self.duration = defaultdict(lambda: prev_dur)
def bootstrap_from_influx(self):
backend = self.config["backends"]["default"]
db = backend['db']
hostname = backend['hostname']
auth = backend['auth']
https = backend['https']
check_certs = backend['check_certs']
influx_consumer = influx.Consumer(hostname=hostname, db=db, auth=auth, https=https, check_certs=check_certs)
influx_consumer.load(self.scald_config_filename)
try:
state_times, state_values = influx_consumer.retrieve_timeseries_latest("state", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1)
dur_times, dur_values = influx_consumer.retrieve_timeseries_latest("duration", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1)
previous_time = state_times[0]
previous_state = state_values[0]
previous_dur = dur_values[0]
except IndexError as exception:
logging.warning(f"Failed to bootstrap previous state and duration from influx: {exception}")
previous_time = None
previous_state = None
previous_dur = 0
else:
logging.debug(f"Bootstrapping from influx: previous state \"{previous_state}\" at time: {previous_time} with duration: {previous_dur} sec.")
return previous_time, previous_state, previous_dur
@staticmethod
def LIGO_parse_state(bitmask):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment