Commit 03b3ac1b authored by Madeline Wade's avatar Madeline Wade

Fixing what I broke when added monitoring for state vector

parent fb3e0813
Pipeline #72548 passed with stages
in 18 minutes and 1 second
......@@ -174,6 +174,11 @@ SampleRates = ConfigSectionMap("SampleRates")
Bitmasks = ConfigSectionMap("Bitmasks")
PipelineConfigs = ConfigSectionMap("PipelineConfigurations")
DataCleaningConfigs = ConfigSectionMap("DataCleaningConfigurations")
try:
MonitorConfigs = ConfigSectionMap("MonitoringConfigurations")
kafka_server = MonitorConfigs["kafkaserver"]
except:
kafka_server = None
# Sanity checks for command line options
......@@ -1095,7 +1100,7 @@ if compute_calib_statevector and (any(line_witness_channel_list) or any(witness_
pipeline = Gst.Pipeline(name="gstlal_compute_strain")
mainloop = GObject.MainLoop()
handler = calibhandler.Handler(mainloop, pipeline, kafka_server = MonitorConfigs["kafkaserver"])
handler = calibhandler.Handler(mainloop, pipeline, kafka_server = kafka_server)
#
# Turn off debugging tools or verboseness
......@@ -2437,10 +2442,9 @@ if apply_fcc:
TDCFs_valid_bitmask_list.append(pow(2, fcc_smooth_bitnum))
if apply_fs:
TDCFs_valid_bitmask_list.append(pow(2, fs_smooth_bitnum))
if apply_Qinv:
if apply_srcq:
TDCFs_valid_bitmask_list.append(pow(2, Qinv_smooth_bitnum))
TDCFs_valid_bitmask = sum(TDCFs_valid_bitmask_list)
print(TDCFs_valid_bitmask)
monitor_bitmask_dict = {'monitor_on': check_state_bitmask, 'TDCFs_valid': TDCFs_valid_bitmask}
if compute_calib_statevector:
......@@ -2982,9 +2986,9 @@ if compute_calib_statevector:
dqtagstr = "channel-name=%s:GDS-CALIB_STATE_VECTOR, instrument=%s" % (instrument, instrument)
calibstatevector = pipeparts.mktaginject(pipeline, calibstatevector, dqtagstr)
calibstatevector = pipeparts.mktee(pipeline, calibstatevector)
statevector_monitor = pipeparts.generic(pipeline, calibstatevector, "lal_nxydump")
statevector_monitor = pipeparts.mkgeneric(pipeline, calibstatevector, "lal_nxydump")
statevector_monitor = pipeparts.mkappsink(pipeline, statevector_monitor, max_buffers = 1)
statevector_monitor.connect("new-sample", handler.appsink_statevector_new_buffer, ifo, monitor_bitmask_dict)
statevector_monitor.connect("new-sample", handler.appsink_statevector_new_buffer, instrument, monitor_bitmask_dict)
#
# Produce time-dependent correction factors to be recorded in the frames
......
......@@ -5,6 +5,7 @@
pkgpythondir = $(pkgpyexecdir)
pkgpython_PYTHON = \
calibration_parts.py
calibration_parts.py \
calibhandler.py
......@@ -33,7 +33,7 @@ GObject.threads_init()
Gst.init(None)
from gstlal import simplehandler
from kafka import KafkaProducer
from lal import LIGOTimeGPS
#
# =============================================================================
......@@ -71,6 +71,7 @@ class Handler(simplehandler.Handler):
self.verbose = verbose
self.kafka_server = kafka_server
if self.kafka_server is not None:
from kafka import KafkaProducer
self.producer = KafkaProducer(
bootstrap_servers = [kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
......@@ -87,12 +88,11 @@ class Handler(simplehandler.Handler):
s = StringIO.StringIO(mapinfo.data)
time, state = s.getvalue().split('\n')[0].split()
state = int(state)
buf.unmap(mapinfo)
monitor_dict = {}
for keys, bitmask in bitmaskdict.items():
monitor_dict[key] = state & bitmask
if self.kafka_server is not None:
self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict)
buf.unmap(mapinfo)
monitor_dict = {}
for key, bitmask in bitmaskdict.items():
monitor_dict[key] = state & bitmask
if self.kafka_server is not None:
self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict)
return Gst.FlowReturn.OK
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment