Skip to content
Snippets Groups Projects
gstlal_ifo_stat 7.17 KiB
#!/usr/bin/env python3
#
# Copyright (C) 2015  Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


import io
import logging
import sys
import yaml

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

from optparse import OptionParser
from collections import deque, defaultdict

from gstlal import datasource
from gstlal import simplehandler
from gstlal import pipeparts

from ligo.scald import utils
from ligo.scald.io import influx 

def parse_command_line():
	"""Build the option parser and parse the command line.

	The generic data source options are added by
	datasource.append_options(); the remaining options are specific to
	this program.

	Returns the (options, filenames) pair from OptionParser.parse_args().
	"""
	parser = OptionParser(description = __doc__)

	# generic "source" options
	datasource.append_options(parser)

	parser.add_option("--output-url", metavar = "path", help = "Write to this url")
	# type = "int" makes a value given on the command line an int, like
	# the default; without it optparse would hand back a string.
	parser.add_option("--wait-time", metavar = "int", type = "int", default = 120, help = "wait time")
	parser.add_option("--scald-config", metavar = "file", help = "sets ligo-scald options based on yaml configuration.")
	parser.add_option("--verbose", action="store_true", help = "Be verbose.")

	options, filenames = parser.parse_args()

	return options, filenames


class IFOStatusTracker(object):
	"""Follow the state of a single instrument and store it in influx.

	bufhandler() is meant to be connected to the "new-sample" signal of
	an appsink.  Each sample is expected to carry text whose first line
	is "<time> <integer state word>".  The state word is turned into a
	state name, the time spent in the current state is accumulated, and
	the collected series are periodically written through the ligo-scald
	aggregator configured by options.scald_config.
	"""
	def __init__(self, options, ifo):
		"""
		options -- parsed command line options; output_url and
			scald_config are read from it
		ifo -- name of the instrument to track, e.g. "H1"
		"""
		logging.info('setting up ifo status tracker...')
		self.ifo = ifo
		self.output_url = options.output_url
		# reference time of the last write to influx; None until the
		# first sample has been seen
		self.last_reduce = None

		# set up influx configuration
		with open(options.scald_config, 'r') as f:
			config = yaml.safe_load(f)
		self.influx_sink = influx.Aggregator(**config["backends"]["default"])

		self.influx_sink.load(path=options.scald_config)

		# set up deques for storing data
		self.timedeq = defaultdict(lambda: deque(maxlen=10000))
		routes = ['state', 'hoft_ok', 'duration']
		self.datadeq = {route: defaultdict(lambda: deque(maxlen=10000)) for route in routes}

		# (time, state) of the previous sample, per instrument
		self.previous = defaultdict(lambda: (None, None))
		# time spent so far in the current state, per instrument
		self.duration = defaultdict(lambda: 0)


	@staticmethod
	def LIGO_parse_state(bitmask):
		"""Map a state word, given as a bit string, to a state name.

		bitmask -- string of '0'/'1' characters at least 32 long, most
			significant bit first, so that index 31 is the least
			significant bit (see _bitmask())

		Index 31 is read as the h(t) bit, index 30 as the intent bit
		and indices 23-25 as the three injection bits.  The injection
		bits are converted with int(), so the value 111 means that all
		three are set.

		Returns one of "Observing", "Injection", "Ready",
		"Injection study", "Calib not ready" or "Down".
		"""
		hoftbit = int(bitmask[31])
		intentbit = int(bitmask[30])
		injbits = int(bitmask[23:26])

		logging.debug(f'hoftbit: {hoftbit} | intentbit: {intentbit} | injbits: {injbits}')

		if hoftbit:
			if intentbit:
				if injbits == 111:
					state = "Observing"
				else:
					state = "Injection"
			elif injbits == 111:
				state = "Ready"
			else:
				state = "Injection study"
		elif injbits == 111:
			if intentbit:
				state = "Calib not ready"
			else:
				state = "Down"
		else:
			state = "Injection study"

		return state

	@staticmethod
	def _bitmask(bit):
		"""Return the low 32 bits of the integer bit as a 32 character
		string of '0'/'1', most significant bit first."""
		# format() emits no "0b" prefix and zero-pads to the full width,
		# so the fixed indices used by LIGO_parse_state() always land
		# on binary digits.
		return format(bit & 0xFFFFFFFF, '032b')

	def _parse_state(self, bit):
		"""Return the state name for the integer state word bit of self.ifo."""
		if self.ifo == "H1" or self.ifo == "L1":
			return self.LIGO_parse_state(self._bitmask(bit))
		if self.ifo == "K1":
			# only check 0th bit
			return "Observing" if (bit & 0b1 == 0b1) else "Down"
		# for Virgo check 0th and 1st bit
		return "Observing" if (bit & 0b11 == 0b11) else "Down"

	def _handle_sample(self, text):
		"""Process the decoded text of one buffer.

		Classifies the state, updates the time spent in it, appends to
		the stored series and writes them to influx when at least 50
		have elapsed since the last write or the state has changed.
		"""
		# FIXME just get the first value?
		time, bit = text.split('\n')[0].split()
		bit = int(bit)
		time = float(time)

		state = self._parse_state(bit)

		# keep this metric for backwards compatibility
		hoft_ok = 1 if state == "Observing" else 0

		statechange = False
		previous_time, previous_state = self.previous[self.ifo]

		## track duration
		if state == previous_state:
			self.duration[self.ifo] += time - previous_time
		elif previous_time:
			# a different state after at least one earlier sample
			statechange = True
			self.duration[self.ifo] = time - previous_time

		logging.debug(f'{self.ifo} h(t) state at {time} = {state} | duration = {self.duration[self.ifo]}')

		## store data
		self.timedeq[self.ifo].append(int(time))
		self.datadeq['state'][self.ifo].append(state)
		self.datadeq['hoft_ok'][self.ifo].append(hoft_ok)
		self.datadeq['duration'][self.ifo].append(self.duration[self.ifo])

		if self.last_reduce is None:
			self.last_reduce = time

		## write to influx
		if time - self.last_reduce >= 50. or statechange:
			if statechange:
				logging.info(f'{self.ifo} h(t) state changed to {state} | time since last reduced = {time-self.last_reduce}')
			self.last_reduce = int(utils.gps_now())
			logging.debug('Reducing data to influx...')

			outdata = {}
			for route, series in self.datadeq.items():
				outdata[route] = {}
				for instrument in series:
					outdata[route][instrument] = {
						'time': list(self.timedeq[instrument]), 'fields': {'data': list(series[instrument])},
					}

			for route in outdata:
				self.influx_sink.store_columns(route, outdata[route], aggregate = "max")

		self.previous[self.ifo] = (time, state)

	def bufhandler(self, elem):
		"""Handler for the appsink "new-sample" signal.

		Pulls the sample from elem, processes its contents if the
		buffer is not empty and returns Gst.FlowReturn.OK.
		"""
		buf = elem.emit("pull-sample").get_buffer()
		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
		try:
			if mapinfo.data:
				self._handle_sample(mapinfo.data.decode('utf-8'))
		finally:
			# always release the mapping, also when parsing fails
			buf.unmap(mapinfo)
		return Gst.FlowReturn.OK

def main():
	"""Run the status tracker.

	Parses the command line, builds a pipeline that reads frames from
	shared memory, demultiplexes the configured channel, dumps it as
	text with lal_nxydump and feeds each sample to
	IFOStatusTracker.bufhandler(), then runs the main loop.

	Always terminates through sys.exit(1) once the main loop returns.
	Raises RuntimeError if the pipeline cannot be set to PLAYING.
	"""
	options, filenames = parse_command_line()

	# set up logging
	log_level = logging.DEBUG if options.verbose else logging.INFO
	logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_gw_stat : %(levelname)s : %(message)s")

	gw_data_source_info = datasource.DataSourceInfo.from_optparse(options)

	# only support one channel
	instruments = list(gw_data_source_info.channel_dict.keys())
	if len(instruments) > 1:
		logging.warning("Provided more than one instrument, but only one is supporting. Defaulting to use the first provided.")
	ifo = instruments[0]

	tracker = IFOStatusTracker(options, ifo)

	# building the event loop and pipeline
	mainloop = GObject.MainLoop()
	pipeline = Gst.Pipeline("gstlal_dq")
	handler = simplehandler.Handler(mainloop, pipeline)

	channel = f"{ifo}:{gw_data_source_info.channel_name[ifo]}"

	src = pipeparts.mkdevshmsrc(pipeline, shm_dirname = gw_data_source_info.shared_memory_dir[ifo], wait_time = options.wait_time)
	src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True)
	pipeparts.framecpp_channeldemux_set_units(src, {channel: "strain"})
	state = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 60 * 1) # 1 minutes of buffering
	# the demuxer's source pad for the channel is linked to the queue
	# through src_deferred_link()
	pipeparts.src_deferred_link(src, channel, state.get_static_pad("sink"))
	sink = pipeparts.mkgeneric(pipeline, state, "lal_nxydump")
	sink = pipeparts.mkprogressreport(pipeline, sink, name=ifo)
	sink = pipeparts.mkappsink(pipeline, sink, max_buffers = 1)
	sink.connect("new-sample", tracker.bufhandler)

	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter PLAYING state")
	logging.info('starting...')
	mainloop.run()

	# done.   always end with an error code so that dagman does
	# not mark the job "done" and the job will be restarted when the dag is
	# restarted.
	#
	sys.exit(1)


# start the tracker only when this file is executed as a script
if __name__ == '__main__':
	main()