#!/usr/bin/env python3
#
# Copyright (C) 2015 Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
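"""Track the observing status of an interferometer from its low-latency
state-vector channel and publish the state, an h(t)-OK flag, and the time
spent in the current state to InfluxDB via ligo-scald."""
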
import logging
import sys

import yaml

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

from optparse import OptionParser
from collections import deque, defaultdict

from gstlal import datasource
from gstlal import simplehandler
from gstlal import pipeparts

from ligo.scald import utils
from ligo.scald.io import influx


def parse_command_line():
parser = OptionParser(description = __doc__)
# generic "source" options
datasource.append_options(parser)
parser.add_option("--output-url", metavar = "path", help = "Write to this url")
parser.add_option("--wait-time", metavar = int, default = 120, help = "wait time")
parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.")
parser.add_option("--verbose", action="store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
return options, filenames
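

# An illustrative invocation (hypothetical values; the data-source
# options are appended by datasource.append_options(), so their exact
# names are defined by gstlal's datasource module, not by this script):
#
#   gstlal_ifo_stat \
#       --channel-name H1=GDS-CALIB_STATE_VECTOR \
#       --shared-memory-dir H1=/dev/shm/kafka/H1 \
#       --scald-config scald_config.yml \
#       --verbose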


class IFOStatusTracker(object):
def __init__(self, options, ifo):
logging.info('setting up ifo status tracker...')
self.ifo = ifo
self.output_url = options.output_url
self.last_reduce = None
# set up influx configuration
with open(options.scald_config, 'r') as f:
config = yaml.safe_load(f)
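		# the scald Aggregator manages aggregation and the connection to
		# the InfluxDB backend named "default" in the scald configuration;
		# load() reads the measurement schemas from the same file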
self.influx_sink = influx.Aggregator(**config["backends"]["default"])
self.influx_sink.load(path=options.scald_config)
# set up deques for storing data
self.timedeq = defaultdict(lambda: deque(maxlen=10000))
routes = ['state', 'hoft_ok', 'duration']
self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes}
		# most recent (time, state) sample per instrument, used to detect
		# state changes
		self.previous = defaultdict(lambda: (None, None))
		# running time (in seconds) spent in the current state
		self.duration = defaultdict(lambda: 0)

	@staticmethod
	def LIGO_parse_state(bitmask):
		# bitmask is the state vector rendered as a 32-character binary
		# string, most significant bit first, so index 31 is bit 0:
		#   bit 0 (index 31): h(t) OK
		#   bit 1 (index 30): observation intent
		#   bits 8-6 (indices 23:26): hardware-injection flags, where
		#   "111" (read below as the decimal integer 111) means no
		#   injections are active
		hoftbit = int(bitmask[31])
		intentbit = int(bitmask[30])
		injbits = int(bitmask[23:26])
		logging.debug(f'hoftbit: {hoftbit} | intentbit: {intentbit} | injbits: {injbits}')
if hoftbit:
if intentbit:
if injbits == 111:
state = "Observing"
else:
state = "Injection"
elif injbits == 111:
state = "Ready"
else:
state = "Injection study"
elif injbits == 111:
if intentbit:
state = "Calib not ready"
else:
state = "Down"
else:
state = "Injection study"
return state
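
	# example (hypothetical state-vector value): bits 0, 1 and 6-8 set
	# means h(t) OK, observation intent on, and no hardware injections:
	#
	#   >>> IFOStatusTracker.LIGO_parse_state(format(0b111000011, "032b"))
	#   'Observing'
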
	def bufhandler(self, elem):
		buf = elem.emit("pull-sample").get_buffer()
		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
		if mapinfo.data:
			# each nxydump buffer is ASCII rows of "time value"
			# FIXME just get the first value?
			time, bit = mapinfo.data.decode('utf-8').split('\n')[0].split()
			bit = int(bit)
			time = float(time)
if self.ifo == "H1" or self.ifo == "L1":
bitmask = bin(bit)
npad = 32 - len(bitmask)
bitmask = "0" * npad + bitmask
state = self.LIGO_parse_state(bitmask)
elif self.ifo == "K1":
# only check 0th bit
state = "Observing" if (bit & 0b1 == 0b1) else "Down"
else:
# for Virgo check 0th and 1st bit
state = "Observing" if (bit & 0b11 == 0b11) else "Down"
			# keep this metric for backwards compatibility
			hoft_ok = 1 if state == "Observing" else 0
			statechange = False
			## track duration: accumulate while the state is unchanged,
			## otherwise restart the clock at the transition
			if state == self.previous[self.ifo][1]:
				self.duration[self.ifo] += time - self.previous[self.ifo][0]
			elif self.previous[self.ifo][0]:
				statechange = True
				self.duration[self.ifo] = time - self.previous[self.ifo][0]
logging.debug(f'{self.ifo} h(t) state at {time} = {state} | duration = {self.duration[self.ifo]}')
## store data
self.timedeq[self.ifo].append(int(time))
self.datadeq['state'][self.ifo].append(state)
self.datadeq['hoft_ok'][self.ifo].append(hoft_ok)
self.datadeq['duration'][self.ifo].append(self.duration[self.ifo])
if self.last_reduce is None:
self.last_reduce = time
			## write to influx at most every 50 seconds, or immediately
			## on a state change
			if time - self.last_reduce >= 50. or statechange:
				if statechange:
					logging.info(f'{self.ifo} h(t) state changed to {state} | time since last reduced = {time - self.last_reduce}')
				self.last_reduce = int(utils.gps_now())
				logging.debug('reducing data to influx...')
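				# build the payload shape consumed by scald's
				# store_columns(): per route, a mapping of
				# {instrument: {'time': [...], 'fields': {'data': [...]}}}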
outdata = {}
for route in self.datadeq.keys():
outdata[route] = {}
for instrument in self.datadeq[route]:
outdata[route][instrument] = {
'time': list(self.timedeq[instrument]), 'fields': {'data': list(self.datadeq[route][instrument])},
}
for route in outdata:
self.influx_sink.store_columns(route, outdata[route], aggregate = "max")
self.previous[self.ifo] = (time, state)
buf.unmap(mapinfo)
return Gst.FlowReturn.OK


def main():
options, filenames = parse_command_line()
# set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
	logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_ifo_stat : %(levelname)s : %(message)s")
gw_data_source_info = datasource.DataSourceInfo.from_optparse(options)
# only support one channel
	if len(gw_data_source_info.channel_dict) > 1:
		logging.warning("More than one instrument provided, but only one is supported. Defaulting to the first one given.")
	ifo = list(gw_data_source_info.channel_dict.keys())[0]
tracker = IFOStatusTracker(options, ifo)
# building the event loop and pipeline
mainloop = GObject.MainLoop()
	pipeline = Gst.Pipeline(name = "gstlal_ifo_stat")
handler = simplehandler.Handler(mainloop, pipeline)
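	# pipeline topology, as assembled below:
	#   devshmsrc -> framecpp_channeldemux -> queue -> lal_nxydump
	#   -> progressreport -> appsink (new-sample -> tracker.bufhandler)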
src = pipeparts.mkdevshmsrc(pipeline, shm_dirname = gw_data_source_info.shared_memory_dir[ifo], wait_time = options.wait_time)
src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)
pipeparts.framecpp_channeldemux_set_units(src, {f"{ifo}:{gw_data_source_info.channel_name[ifo]}": "strain"})
	state = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 1) # 1 minute of buffering
pipeparts.src_deferred_link(src, f"{ifo}:{gw_data_source_info.channel_name[ifo]}", state.get_static_pad("sink"))
sink = pipeparts.mkgeneric(pipeline, state, "lal_nxydump")
sink = pipeparts.mkprogressreport(pipeline, sink, name=ifo)
sink = pipeparts.mkappsink(pipeline, sink, max_buffers = 1)
sink.connect("new-sample", tracker.bufhandler)
	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter PLAYING state")
logging.info('starting...')
mainloop.run()
	# done. always end with an error code so that dagman does not mark
	# the job "done" and the job will be restarted when the dag is
	# restarted.
	sys.exit(1)


if __name__ == '__main__':
main()