Commit 8683a7ba authored by Madeline Wade's avatar Madeline Wade

Adding in a separate handler for the calibration pipeline and a state vector monitor

parent 5e6e55cc
Pipeline #72539 passed with stages
in 25 minutes and 5 seconds
......@@ -72,6 +72,7 @@ from gstlal import pipeparts
from gstlal import calibration_parts
from gstlal import simplehandler
from gstlal import datasource
from gstlal import calibhandler
from ligo import segments
......@@ -1095,7 +1096,7 @@ if compute_calib_statevector and (any(line_witness_channel_list) or any(witness_
pipeline = Gst.Pipeline(name="gstlal_compute_strain")
mainloop = GObject.MainLoop()
handler = simplehandler.Handler(mainloop, pipeline)
handler = calibhandler.Handler(mainloop, pipeline)
#
# Turn off debugging tools or verboseness
......@@ -2422,6 +2423,27 @@ line_sub_bitnum = 26
noise_sub_bitnum = 27
noise_sub_gate_bitnum = 28
# We'll only want to check things when the OBSERVATION-INTENT and LOW-NOISE bits are set
check_state_bitmask = pow(2, obs_intent_bitnum) + pow(2, lownoise_bitnum)
TDCFs_valid_bitmask_list = []
if apply_kappatst:
TDCFs_valid_bitmask_list.append(pow(2, ktst_smooth_bitnum))
if apply_kappapum:
TDCFs_valid_bitmask_list.append(pow(2, kpum_smooth_bitnum))
if apply_kappauim:
TDCFs_valid_bitmask_list.append(pow(2, kuim_smooth_bitnum))
if apply_kappac:
TDCFs_valid_bitmask_list.append(pow(2, kc_smooth_bitnum))
if apply_fcc:
TDCFs_valid_bitmask_list.append(pow(2, fcc_smooth_bitnum))
if apply_fs:
TDCFs_valid_bitmask_list.append(pow(2, fs_smooth_bitnum))
if apply_Qinv:
TDCFs_valid_bitmask_list.append(pow(2, Qinv_smooth_bitnum))
TDCFs_valid_bitmask = sum(TDCFs_valid_bitmask_list)
print(TDCFs_valid_bitmask)
monitor_bitmask_dict = {'monitor_on': check_state_bitmask, 'TDCFs_valid': TDCFs_valid_bitmask}
if compute_calib_statevector:
#
......@@ -2960,6 +2982,10 @@ if compute_calib_statevector:
calibstatevector = pipeparts.mklatency(pipeline, calibstatevector, name = "%s_calibstatevec" % OutputConfigs["frametype"])
dqtagstr = "channel-name=%s:GDS-CALIB_STATE_VECTOR, instrument=%s" % (instrument, instrument)
calibstatevector = pipeparts.mktaginject(pipeline, calibstatevector, dqtagstr)
calibstatevector = pipeparts.mktee(pipeline, calibstatevector)
statevector_monitor = pipeparts.generic(pipeline, calibstatevector, "lal_nxydump")
statevector_monitor = pipeparts.mkappsink(pipeline, statevector_monitor, max_buffers = 1)
statevector_monitor.connect("new-sample", handler.appsink_statevector_new_buffer, ifo, monitor_bitmask_dict)
#
# Produce time-dependent correction factors to be recorded in the frames
......
# Copyright (C) 2019 Maddie Wade, Aaron Viets
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import numpy
import StringIO
import threading
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from gstlal import simplehandler
from kafka import KafkaProducer
#
# =============================================================================
#
# Misc
#
# =============================================================================
#
#
# =============================================================================
#
# Pipeline Handler
#
# =============================================================================
#
class Handler(simplehandler.Handler):
"""!
A subclass of simplehandler.Handler to be used with e.g.,
gstlal_calibration
Implements...
"""
def __init__(self, mainloop, pipeline, kafka_server = None, verbose = False):
super(Handler, self).__init__(mainloop, pipeline)
#
# initialize
#
self.lock = threading.Lock()
self.pipeline = pipeline
self.verbose = verbose
self.kafka_server = kafka_server
if self.kafka_server is not None:
self.producer = KafkaProducer(
bootstrap_servers = [kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'),
)
def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict):
with self.lock:
# retrieve data from appsink buffer
buf = elem.emit("pull-sample").get_buffer()
result, mapinfo = buf.map(Gst.MapFlags.READ)
buf_timestamp = LIGOTimeGPS(0, buf.pts)
if mapinfo.data:
s = StringIO.StringIO(mapinfo.data)
time, state = s.getvalue().split('\n')[0].split()
state = int(state)
buf.unmap(mapinfo)
monitor_dict = {}
for keys, bitmask in bitmaskdict.items():
monitor_dict[key] = state & bitmask
if self.kafka_server is not None:
self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment