From cc8781289ab977fbc84bc7e8884fc90f4b8e97d4 Mon Sep 17 00:00:00 2001 From: Rebecca Ewing <ree55@psu.edu> Date: Mon, 17 Jul 2023 10:49:03 -0400 Subject: [PATCH] gstlal-ugly/bin/gstlal_ifo_stat: add option to bootstrap previous state and duration from influx --- gstlal-ugly/bin/gstlal_ifo_stat | 52 +++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/gstlal-ugly/bin/gstlal_ifo_stat b/gstlal-ugly/bin/gstlal_ifo_stat index 1cf1c20f49..f948ba3b2f 100755 --- a/gstlal-ugly/bin/gstlal_ifo_stat +++ b/gstlal-ugly/bin/gstlal_ifo_stat @@ -46,6 +46,7 @@ def parse_command_line(): parser.add_option("--output-url", metavar = "path", help = "Write to this url") parser.add_option("--wait-time", metavar = int, default = 120, help = "wait time") parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.") + parser.add_option("--bootstrap-from-influx", action="store_true", default = False, help = "If given, upon start up the previous IFO state and duration will be read from the Influx database provided by --scald-config. In this case the duration measurement will pick up from the last stored value. Default is false.") parser.add_option("--verbose", action="store_true", help = "Be verbose.") options, filenames = parser.parse_args() @@ -59,22 +60,61 @@ class IFOStatusTracker(object): self.ifo = ifo self.output_url = options.output_url self.last_reduce = None + self.scald_config_filename = options.scald_config # set up influx configuration - with open(options.scald_config, 'r') as f: - config = yaml.safe_load(f) - self.influx_sink = influx.Aggregator(**config["backends"]["default"]) + with open(self.scald_config_filename, 'r') as f: + self.config = yaml.safe_load(f) + self.influx_sink = influx.Aggregator(**self.config["backends"]["default"]) - self.influx_sink.load(path=options.scald_config) + self.influx_sink.load(path = self.scald_config_filename) # set up deques for storing data self.timedeq = defaultdict(lambda: deque(maxlen=10000)) routes = ['state', 'hoft_ok', 'duration'] self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes} - self.previous = defaultdict(lambda: (None, None)) - self.duration = defaultdict(lambda: 0) + # bootstrap previous state and duration from influx + if options.bootstrap_from_influx: + prev_time, prev_state, prev_dur = self.bootstrap_from_influx() + else: + prev_time = None + prev_state = None + prev_dur = 0 + + self.previous = defaultdict(lambda: (prev_time, prev_state)) + self.duration = defaultdict(lambda: prev_dur) + + + def bootstrap_from_influx(self): + backend = self.config["backends"]["default"] + + db = backend['db'] + hostname = backend['hostname'] + auth = backend['auth'] + https = backend['https'] + check_certs = backend['check_certs'] + + influx_consumer = influx.Consumer(hostname=hostname, db=db, auth=auth, https=https, check_certs=check_certs) + influx_consumer.load(self.scald_config_filename) + + try: + state_times, state_values = influx_consumer.retrieve_timeseries_latest("state", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1) + dur_times, dur_values = influx_consumer.retrieve_timeseries_latest("duration", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1) + + previous_time = state_times[0] + previous_state = state_values[0] + previous_dur = dur_values[0] + except IndexError as exception: + logging.warning(f"Failed to bootstrap previous state and duration from influx: {exception}") + + previous_time = None + previous_state = None + previous_dur = 0 + else: + logging.debug(f"Bootstrapping from influx: previous state \"{previous_state}\" at time: {previous_time} with duration: {previous_dur} sec.") + return previous_time, previous_state, previous_dur @staticmethod def LIGO_parse_state(bitmask): -- GitLab