Skip to content
Snippets Groups Projects
Commit 018faf3a authored by Rebecca Ewing's avatar Rebecca Ewing
Browse files

add gstlal_ifo_stat: program to track IFO state and output data to influx

parent c5f3bc19
No related branches found
No related tags found
1 merge request!271add gstlal_ifo_stat: program to track IFO state and output data to influx
Pipeline #445891 passed with warnings
......@@ -24,6 +24,7 @@ dist_bin_SCRIPTS = \
gstlal_inspiral_bankviz \
gstlal_segments_trim \
gstlal_glitch_population \
gstlal_ifo_stat \
gstlal_vetoes_from_burst_inj \
gstlal_vetoes_from_burst_triggers \
gstlal_ninja_median_of_psds \
......
#!/usr/bin/env python3
#
# Copyright (C) 2015 Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import io
import logging
import yaml
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from optparse import OptionParser
from collections import deque, defaultdict
from gstlal import simplehandler
from gstlal import pipeparts
from ligo.scald import utils
from ligo.scald.io import influx
def parse_command_line():
	"""
	Parse command-line options.

	Returns:
		(options, filenames): the parsed optparse Values and positional args.
	"""
	parser = OptionParser(description = __doc__)
	parser.add_option("--output-url", metavar = "path", help = "Write to this url")
	parser.add_option("--channel-name", metavar = "channel", type = str, action = 'append', help = "Specify channel names for each ifo as --channel-name=ifo:channel-name. Can be given multiple times.")
	parser.add_option("--shared-memory-directory", metavar = "path", type = str, action = 'append', help = "Specify the shared memory directory to read frame files from as --shared-memory-directory=ifo:dir. Can be given multiple times.")
	# type = int ensures a user-supplied value is converted (previously it stayed a
	# string, while the default was an int); metavar must be a string, not a type,
	# or option help formatting breaks.
	parser.add_option("--wait-time", metavar = "seconds", type = int, default = 120, help = "wait time")
	parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose.")

	options, filenames = parser.parse_args()
	return options, filenames
class IFOStatusTracker(object):
	"""
	Tracks per-IFO h(t) state parsed from state-vector samples and
	periodically reduces the collected timeseries to an InfluxDB backend
	via ligo-scald.
	"""
	def __init__(self, options):
		"""
		Args:
			options: parsed command-line options; must provide
				output_url and scald_config (path to a scald yaml
				config with a backends.default section).
		"""
		self.output_url = options.output_url
		# GPS time of the last influx reduction; set from the first buffer seen
		self.last_reduce = None

		# set up influx configuration from the scald yaml config
		with open(options.scald_config, 'r') as f:
			config = yaml.safe_load(f)
		self.influx_sink = influx.Aggregator(**config["backends"]["default"])
		self.influx_sink.load(path=options.scald_config)

		# set up bounded deques for storing timeseries data per ifo
		self.timedeq = defaultdict(lambda: deque(maxlen=10000))
		routes = ['hoft_ok', 'duration']
		self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes}
		# per-ifo (time, state) of the previous sample; (None, None) before the first
		self.previous = defaultdict(lambda: (None, None))
		# per-ifo seconds spent in the current state
		self.duration = defaultdict(lambda: 0)

	def bufhandler(self, elem, ifo):
		"""
		appsink "new-sample" callback: parse the IFO state bit from the
		nxydump text buffer, update state/duration tracking and, roughly
		every 100 s, reduce the accumulated data to influx.

		Always returns Gst.FlowReturn.OK.
		"""
		buf = elem.emit("pull-sample").get_buffer()
		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
		if mapinfo.data:
			# lal_nxydump emits ascii "time value" rows
			# FIXME just get the first value?
			time, bit = mapinfo.data.decode('utf-8').split('\n')[0].split()
			bit = int(bit)
			time = float(time)
			# bit 0 of the state vector set -> h(t) ok
			state = 1 if (bit & 0b1 == 0b1) else 0

			## track duration spent in the current state
			if state == self.previous[ifo][1]:
				self.duration[ifo] += time - self.previous[ifo][0]
			elif self.previous[ifo][0] is not None:
				# state flipped: restart the clock from the previous sample
				# (explicit None check so a previous time of 0.0 is not skipped)
				self.duration[ifo] = time - self.previous[ifo][0]
			logging.debug('%s h(t) state at %s = %s | duration = %s', ifo, time, state, self.duration[ifo])

			## store data
			self.timedeq[ifo].append(int(time))
			self.datadeq['hoft_ok'][ifo].append(state)
			self.datadeq['duration'][ifo].append(self.duration[ifo])

			if self.last_reduce is None:
				self.last_reduce = time

			## write to influx roughly every 100 s of buffer time
			if time - self.last_reduce >= 100.:
				# NOTE(review): last_reduce is re-armed from wall-clock GPS
				# (utils.gps_now()) while the comparison uses buffer time —
				# confirm these clocks are meant to be interchangeable here
				self.last_reduce = int(utils.gps_now())
				logging.debug('Reducing data to influx...')

				outdata = {}
				for route in self.datadeq.keys():
					outdata[route] = {}
					for instrument in self.datadeq[route]:
						outdata[route][instrument] = {
							'time': list(self.timedeq[instrument]), 'fields': {'data': list(self.datadeq[route][instrument])},
						}

				for route in outdata:
					self.influx_sink.store_columns(route, outdata[route], aggregate = "max")

			self.previous[ifo] = (time, state)
			buf.unmap(mapinfo)

		return Gst.FlowReturn.OK
def main():
	"""
	Build and run a GStreamer pipeline that reads per-IFO state-vector
	channels from shared-memory frames and feeds them to an
	IFOStatusTracker for influx reporting.
	"""
	options, filenames = parse_command_line()

	# map ifo -> channel and ifo -> shm directory; split on the first ':'
	# only, so values containing ':' are preserved intact
	channel_dict = {}
	for channel in options.channel_name:
		ifo, _, chan = channel.partition(':')
		channel_dict[ifo] = chan

	dirs_dict = {}
	for shm in options.shared_memory_directory:
		ifo, _, shmdir = shm.partition(':')
		dirs_dict[ifo] = shmdir

	if not channel_dict.keys() == dirs_dict.keys():
		raise Exception('List of IFOs provided with channels does not match list of IFOs provided with SHM directories.')

	# set up logging
	log_level = logging.DEBUG if options.verbose else logging.INFO
	logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_gw_stat : %(levelname)s : %(message)s")

	tracker = IFOStatusTracker(options)

	# building the event loop and pipeline
	mainloop = GObject.MainLoop()
	# GObject constructors only accept keyword arguments in PyGObject
	pipeline = Gst.Pipeline(name="gstlal_dq")
	handler = simplehandler.Handler(mainloop, pipeline)

	for ifo in channel_dict.keys():
		src = pipeparts.mkdevshmsrc(pipeline, shm_dirname = dirs_dict[ifo], wait_time = options.wait_time)
		src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)
		pipeparts.framecpp_channeldemux_set_units(src, {f"{ifo}:{channel_dict[ifo]}": "strain"})

		state = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 1) # 1 minutes of buffering
		pipeparts.src_deferred_link(src, f"{ifo}:{channel_dict[ifo]}", state.get_static_pad("sink"))

		sink = pipeparts.mkgeneric(pipeline, state, "lal_nxydump")
		sink = pipeparts.mkprogressreport(pipeline, sink, name=ifo)
		sink = pipeparts.mkappsink(pipeline, sink, max_buffers = 1)
		sink.connect("new-sample", tracker.bufhandler, ifo)

	# check explicitly against FAILURE: set_state also returns ASYNC /
	# NO-PREROLL, which are not errors
	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter PLAYING state")

	mainloop.run()


if __name__ == '__main__':
	main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment