Commit 42df9fd4 authored by cal's avatar cal

calibhandler.py: Updates to monitoring of calibration, including ability to...

calibhandler.py: Updates to monitoring of calibration, including ability to keep trying to reach kafka server if unavailable at start-up.
parent b921e4a6
Pipeline #74922 passed with stages
in 32 minutes and 30 seconds
...@@ -76,12 +76,13 @@ class Handler(simplehandler.Handler): ...@@ -76,12 +76,13 @@ class Handler(simplehandler.Handler):
from kafka import KafkaProducer from kafka import KafkaProducer
try: try:
self.producer = KafkaProducer( self.producer = KafkaProducer(
bootstrap_servers = [kafka_server], bootstrap_servers = [self.kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'), key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'), value_serializer = lambda m: json.dumps(m).encode('utf-8'),
) )
except errors.NoBrokersAvaialble: except errors.NoBrokersAvailable:
self.producer = None self.producer = None
print("No brokers available for kafka. Defaulting to not pushing to kafka.")
def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict): def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict):
with self.lock: with self.lock:
...@@ -97,7 +98,24 @@ class Handler(simplehandler.Handler): ...@@ -97,7 +98,24 @@ class Handler(simplehandler.Handler):
monitor_dict = {} monitor_dict = {}
monitor_dict['time'] = float(buf_timestamp) monitor_dict['time'] = float(buf_timestamp)
for key, bitmask in bitmaskdict.items(): for key, bitmask in bitmaskdict.items():
monitor_dict[key] = state & bitmask all_bits_on = bitmask & bitmask
monitor = state & bitmask
if monitor == all_bits_on:
monitor_dict[key] = 1
else:
monitor_dict[key] = 0
# Check if kafka server is now available if it's supposed to be used
if self.kafka_server is not None and self.producer is None:
from kafka import KafkaProducer
try:
self.producer = KafkaProducer(
bootstrap_servers = [self.kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'),
)
except errors.NoBrokersAvailable:
self.producer = None
print("No brokers available for kafka. Defaulting to not pushing to kafka.")
if self.kafka_server is not None and self.producer is not None: if self.kafka_server is not None and self.producer is not None:
self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict) self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict)
return Gst.FlowReturn.OK return Gst.FlowReturn.OK
...@@ -107,6 +125,18 @@ class Handler(simplehandler.Handler): ...@@ -107,6 +125,18 @@ class Handler(simplehandler.Handler):
latency = elem.get_property("current-latency") latency = elem.get_property("current-latency")
name = elem.get_property("name") name = elem.get_property("name")
time = elem.get_property("timestamp") time = elem.get_property("timestamp")
# Check if kafka server is now available if it's supposed to be used
if self.kafka_server is not None and self.producer is None:
from kafka import KafkaProducer
try:
self.producer = KafkaProducer(
bootstrap_servers = [self.kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'),
)
except errors.NoBrokersAvailable:
self.producer = None
print("No brokers available for kafka. Defaulting to not pushing to kafka.")
if self.kafka_server is not None and self.producer is not None: if self.kafka_server is not None and self.producer is not None:
self.producer.send("%s_latency" % (name.split("_")[0]), value = {"time": time, name: latency}) self.producer.send("%s_latency" % (name.split("_")[0]), value = {"time": time, name: latency})
return Gst.FlowReturn.OK return Gst.FlowReturn.OK
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment