diff --git a/gstlal-ugly/bin/gstlal_ifo_stat b/gstlal-ugly/bin/gstlal_ifo_stat
index 540c2decc0ccfee3f0822bfd517a70627ab43d36..d320f609277a51b9cae48c262f917c8bf7fa722f 100755
--- a/gstlal-ugly/bin/gstlal_ifo_stat
+++ b/gstlal-ugly/bin/gstlal_ifo_stat
@@ -30,6 +30,7 @@ Gst.init(None)
 from optparse import OptionParser
 from collections import deque, defaultdict
 
+from gstlal import datasource
 from gstlal import simplehandler
 from gstlal import pipeparts
 
@@ -39,11 +40,13 @@ from ligo.scald.io import influx
 
 def parse_command_line():
	parser = OptionParser(description = __doc__)
+	# generic "source" options
+	datasource.append_options(parser)
+
	parser.add_option("--output-url", metavar = "path", help = "Write to this url")
-	parser.add_option("--channel-name", metavar="channel", type = str, action = 'append', help = "Specify channel names for each ifo as --channel-name=ifo:channel-name. Can be given multiple times.")
-	parser.add_option("--shared-memory-directory", metavar="path", type = str, action = 'append', help = "Specify the shared memory directory to read frame files from as --shared-memory-directory=ifo:dir. Can be given multiple times.")
	parser.add_option("--wait-time", metavar = int, default = 120, help = "wait time")
	parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.")
+	parser.add_option("--bootstrap-from-influx", action="store_true", default = False, help = "If given, upon start-up the previous IFO state and duration will be read from the Influx database provided by --scald-config. In this case the duration measurement will pick up from the last stored value. Default is false.")
	parser.add_option("--verbose", action="store_true", help = "Be verbose.")
 
	options, filenames = parser.parse_args()
@@ -52,27 +55,96 @@ def parse_command_line():
 
 
 class IFOStatusTracker(object):
-	def __init__(self, options):
+	def __init__(self, options, ifo):
		logging.info('setting up ifo status tracker...')
+		self.ifo = ifo
		self.output_url = options.output_url
		self.last_reduce = None
+		self.scald_config_filename = options.scald_config
 
		# set up influx configuration
-		with open(options.scald_config, 'r') as f:
-			config = yaml.safe_load(f)
-		self.influx_sink = influx.Aggregator(**config["backends"]["default"])
+		with open(self.scald_config_filename, 'r') as f:
+			self.config = yaml.safe_load(f)
+		self.influx_sink = influx.Aggregator(**self.config["backends"]["default"])
 
-		self.influx_sink.load(path=options.scald_config)
+		self.influx_sink.load(path = self.scald_config_filename)
 
		# set up deques for storing data
		self.timedeq = defaultdict(lambda: deque(maxlen=10000))
-		routes = ['hoft_ok', 'duration']
+		routes = ['state', 'hoft_ok', 'duration']
		self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes}
 
-		self.previous = defaultdict(lambda: (None, None))
-		self.duration = defaultdict(lambda: 0)
+		# bootstrap previous state and duration from influx
+		if options.bootstrap_from_influx:
+			prev_time, prev_state, prev_dur = self.bootstrap_from_influx()
+		else:
+			prev_time = None
+			prev_state = None
+			prev_dur = 0
+
+		self.previous = defaultdict(lambda: (prev_time, prev_state))
+		self.duration = defaultdict(lambda: prev_dur)
+
+
+	def bootstrap_from_influx(self):
+		backend = self.config["backends"]["default"]
+
+		db = backend['db']
+		hostname = backend['hostname']
+		auth = backend['auth']
+		https = backend['https']
+		check_certs = backend['check_certs']
+
+		influx_consumer = influx.Consumer(hostname=hostname, db=db, auth=auth, https=https, check_certs=check_certs)
+		influx_consumer.load(self.scald_config_filename)
+
+		try:
+			state_times, state_values = influx_consumer.retrieve_timeseries_latest("state", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1)
+			dur_times, dur_values = influx_consumer.retrieve_timeseries_latest("duration", "data", tags=[("ifo", self.ifo)], dt = 1, num_latest = 1)
+
+			previous_time = state_times[0]
+			previous_state = state_values[0]
+			previous_dur = dur_values[0]
+		except IndexError as exception:
+			logging.warning(f"Failed to bootstrap previous state and duration from influx: {exception}")
+
+			previous_time = None
+			previous_state = None
+			previous_dur = 0
+		else:
+			logging.debug(f"Bootstrapping from influx: previous state \"{previous_state}\" at time: {previous_time} with duration: {previous_dur} sec.")
+
+		return previous_time, previous_state, previous_dur
+
+	@staticmethod
+	def LIGO_parse_state(bitmask):
+		hoftbit = int(bitmask[31])
+		intentbit = int(bitmask[30])
+		injbits = int(bitmask[23:26])
+
+		logging.debug(f'hoftbit: {hoftbit} | intentbit: {intentbit} | injbits: {injbits}')
+
+		if hoftbit:
+			if intentbit:
+				if injbits == 111:
+					state = "Observing"
+				else:
+					state = "Injection"
+			elif injbits == 111:
+				state = "Ready"
+			else:
+				state = "Injection study"
+		elif injbits == 111:
+			if intentbit:
+				state = "Calib not ready"
+			else:
+				state = "Down"
+		else:
+			state = "Injection study"
+
+		return state
 
-	def bufhandler(self, elem, ifo):
+	def bufhandler(self, elem):
		buf = elem.emit("pull-sample").get_buffer()
		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
		if mapinfo.data:
@@ -82,29 +154,43 @@ class IFOStatusTracker(object):
			bit = int(bit)
			time = float(time)
 
-			## FIXME hacky
-			if ifo == "K1":
+			if self.ifo == "H1" or self.ifo == "L1":
+				bitmask = bin(bit)
+				npad = 32 - len(bitmask)
+				bitmask = "0" * npad + bitmask
+				state = self.LIGO_parse_state(bitmask)
+
+			elif self.ifo == "K1":
				# only check 0th bit
-				state = 1 if (bit & 0b1 == 0b1) else 0
+				state = "Observing" if (bit & 0b1 == 0b1) else "Down"
+			elif self.ifo == "V1":
+				# for Virgo check 0th and 1st bit
+				state = "Observing" if (bit & 0b11 == 0b11) else "Down"
+			else:
+				raise ValueError(f"Unsupported interferometer: {self.ifo}")
+
+			# keep this metric for backwards compatibility
+			if state == "Observing":
+				hoft_ok = 1
			else:
-				# for HLV check 0th and 1st bit
-				state = 1 if (bit & 0b11 == 0b11) else 0
+				hoft_ok = 0
 
			statechange = False
			## track duration
-			if state == self.previous[ifo][1]:
-				self.duration[ifo] += time - self.previous[ifo][0]
-			elif self.previous[ifo][0]:
+			if state == self.previous[self.ifo][1]:
+				self.duration[self.ifo] += time - self.previous[self.ifo][0]
+			elif self.previous[self.ifo][0]:
				statechange = True
-				self.duration[ifo] = time - self.previous[ifo][0]
+				self.duration[self.ifo] = time - self.previous[self.ifo][0]
 
-			logging.debug(f'{ifo} h(t) state at {time} = {state} | duration = {self.duration[ifo]}')
+			logging.debug(f'{self.ifo} h(t) state at {time} = {state} | duration = {self.duration[self.ifo]}')
 
			## store data
-			self.timedeq[ifo].append(int(time))
-			self.datadeq['hoft_ok'][ifo].append(state)
-			self.datadeq['duration'][ifo].append(self.duration[ifo])
+			self.timedeq[self.ifo].append(int(time))
+			self.datadeq['state'][self.ifo].append(state)
+			self.datadeq['hoft_ok'][self.ifo].append(hoft_ok)
+			self.datadeq['duration'][self.ifo].append(self.duration[self.ifo])
 
			if self.last_reduce is None:
				self.last_reduce = time
 
@@ -112,7 +198,7 @@ class IFOStatusTracker(object):
			## write to influx
			if time - self.last_reduce >= 50. or statechange:
				if statechange:
-					logging.info(f'{ifo} h(t) state changed to {state} | time since last reduced = {time-self.last_reduce}')
+					logging.info(f'{self.ifo} h(t) state changed to {state} | time since last reduced = {time-self.last_reduce}')
				self.last_reduce = int(utils.gps_now())
 
				logging.debug(f'Reducing data to influx...')
@@ -127,7 +213,7 @@ class IFOStatusTracker(object):
			for route in outdata:
				self.influx_sink.store_columns(route, outdata[route], aggregate = "max")
 
-			self.previous[ifo] = (time, state)
+			self.previous[self.ifo] = (time, state)
		buf.unmap(mapinfo)
 
		return Gst.FlowReturn.OK
@@ -135,44 +221,33 @@ class IFOStatusTracker(object):
 def main():
	options, filenames = parse_command_line()
 
-	channel_dict = {}
-	dirs_dict = {}
-	for channel in options.channel_name:
-		ifo, channel_name = channel.split('=')
-		channel_dict.update({
-			ifo: channel_name
-		})
-
-	for dir in options.shared_memory_directory:
-		ifo, shm_dir = dir.split('=')
-		dirs_dict.update({
-			ifo: shm_dir
-		})
-
-	if not channel_dict.keys() == dirs_dict.keys():
-		raise Exception(f'List of IFOs provided with channels does not match list of IFOs provided with SHM directories.')
-
	# set up logging
	log_level = logging.DEBUG if options.verbose else logging.INFO
	logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_gw_stat : %(levelname)s : %(message)s")
 
-	tracker = IFOStatusTracker(options)
+	gw_data_source_info = datasource.DataSourceInfo.from_optparse(options)
+
+	# only support one channel
+	if len(gw_data_source_info.channel_dict.keys()) > 1:
+		logging.warning("Provided more than one instrument, but only one is supported. Defaulting to the first instrument provided.")
+	ifo = list(gw_data_source_info.channel_dict.keys())[0]
+
+	tracker = IFOStatusTracker(options, ifo)
 
	# building the event loop and pipeline
	mainloop = GObject.MainLoop()
	pipeline = Gst.Pipeline("gstlal_dq")
	handler = simplehandler.Handler(mainloop, pipeline)
 
-	for ifo in channel_dict.keys():
-		src = pipeparts.mkdevshmsrc(pipeline, shm_dirname = dirs_dict[ifo], wait_time = options.wait_time)
-		src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)
-		pipeparts.framecpp_channeldemux_set_units(src, {f"{ifo}:{channel_dict[ifo]}": "strain"})
-		state = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 1) # 1 minutes of buffering
-		pipeparts.src_deferred_link(src, f"{ifo}:{channel_dict[ifo]}", state.get_static_pad("sink"))
-		sink = pipeparts.mkgeneric(pipeline, state, "lal_nxydump")
-		sink = pipeparts.mkprogressreport(pipeline, sink, name=ifo)
-		sink = pipeparts.mkappsink(pipeline, sink, max_buffers = 1)
-		sink.connect("new-sample", tracker.bufhandler, ifo)
+	src = pipeparts.mkdevshmsrc(pipeline, shm_dirname = gw_data_source_info.shared_memory_dir[ifo], wait_time = options.wait_time)
+	src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)
+	pipeparts.framecpp_channeldemux_set_units(src, {f"{ifo}:{gw_data_source_info.channel_name[ifo]}": "strain"})
+	state = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 1) # 1 minute of buffering
+	pipeparts.src_deferred_link(src, f"{ifo}:{gw_data_source_info.channel_name[ifo]}", state.get_static_pad("sink"))
+	sink = pipeparts.mkgeneric(pipeline, state, "lal_nxydump")
+	sink = pipeparts.mkprogressreport(pipeline, sink, name=ifo)
+	sink = pipeparts.mkappsink(pipeline, sink, max_buffers = 1)
+	sink.connect("new-sample", tracker.bufhandler)
 
	if not pipeline.set_state(Gst.State.PLAYING):
		raise RuntimeError("pipeline failed to enter PLAYING state")
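
Note (reviewer sketch, not part of the patch): the new LIGO_parse_state helper indexes a zero-padded bin() string, so character 31 holds bit 0 of the state vector (the h(t)-OK bit) and character 30 holds bit 1 (the observation-intent bit), while injbits parses the three characters at 23:26 as a decimal integer and compares against 111. A minimal standalone illustration of that convention, using a hypothetical state-vector sample:

    # Sketch of the bitmask convention assumed by LIGO_parse_state;
    # the example value below is hypothetical.
    bit = 0b11                                     # bits 0 and 1 set
    bitmask = bin(bit)                             # '0b11'
    bitmask = "0" * (32 - len(bitmask)) + bitmask  # left-pad to 32 characters
    assert int(bitmask[31]) == 1                   # index 31 <-> bit 0 (hoftbit)
    assert int(bitmask[30]) == 1                   # index 30 <-> bit 1 (intentbit)
    assert int(bitmask[23:26]) == 0                # indices 23:26 <-> injection bits
    # bin()'s '0b' prefix survives inside the padded string (indices 28:30
    # here); it falls outside the slices read above, but int() would fail
    # if the 'b' landed inside bitmask[23:26] (values with 6-8 significant
    # bits), which may be worth guarding against.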
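
Note (reviewer sketch, not part of the patch): the duration bookkeeping in bufhandler extends the running stretch by time - previous_time while the state is unchanged and restarts the counter on a state change; with --bootstrap-from-influx the counter simply starts from the last stored duration instead of 0. The same accumulate/reset logic reduced to plain variables, with hypothetical GPS times:

    prev_time, prev_state = 1000.0, "Observing"
    duration = 120.0                       # e.g. bootstrapped from influx
    for time, state in [(1001.0, "Observing"), (1002.0, "Down")]:
        if state == prev_state:
            duration += time - prev_time   # same state: extend the stretch
        else:
            duration = time - prev_time    # state change: restart the counter
        prev_time, prev_state = time, state
    assert duration == 1.0                 # the "Down" stretch just began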