From 42df9fd46815b7b5c07234225fc07ca345ea7d9d Mon Sep 17 00:00:00 2001
From: cal <cal@ldas-pcdev6.ligo-wa.caltech.edu>
Date: Wed, 14 Aug 2019 07:21:56 -0700
Subject: [PATCH] calibhandler.py: Updates to monitoring of calibration,
 including ability to keep trying to reach kafka server if unavailable at
 start-up.

---
 gstlal-calibration/python/calibhandler.py | 36 +++++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)

diff --git a/gstlal-calibration/python/calibhandler.py b/gstlal-calibration/python/calibhandler.py
index 3a2ca29658..4daaa187cf 100644
--- a/gstlal-calibration/python/calibhandler.py
+++ b/gstlal-calibration/python/calibhandler.py
@@ -76,12 +76,13 @@ class Handler(simplehandler.Handler):
 			from kafka import KafkaProducer
 			try:
 				self.producer = KafkaProducer(
-						bootstrap_servers = [kafka_server],
+						bootstrap_servers = [self.kafka_server],
 						key_serializer = lambda m: json.dumps(m).encode('utf-8'),
 						value_serializer = lambda m: json.dumps(m).encode('utf-8'),
 					)
-			except errors.NoBrokersAvaialble:
+			except errors.NoBrokersAvailable:
 				self.producer = None
+				print("No brokers available for kafka. Defaulting to not pushing to kafka.")
 
 	def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict):
 		with self.lock:
@@ -97,7 +98,24 @@ class Handler(simplehandler.Handler):
 				monitor_dict = {}
 				monitor_dict['time'] = float(buf_timestamp)
 				for key, bitmask in bitmaskdict.items():
-					monitor_dict[key] = state & bitmask
+					all_bits_on = bitmask & bitmask
+					monitor = state & bitmask
+					if monitor == all_bits_on:
+						monitor_dict[key] = 1
+					else:
+						monitor_dict[key] = 0
+				# Check if kafka server is now available if it's supposed to be used
+				if self.kafka_server is not None and self.producer is None:
+					from kafka import KafkaProducer
+					try:
+						self.producer = KafkaProducer(
+								bootstrap_servers = [self.kafka_server],
+								key_serializer = lambda m: json.dumps(m).encode('utf-8'),
+								value_serializer = lambda m: json.dumps(m).encode('utf-8'),
+							)
+					except errors.NoBrokersAvailable:
+						self.producer = None
+						print("No brokers available for kafka. Defaulting to not pushing to kafka.")
 				if self.kafka_server is not None and self.producer is not None:
 					self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict) 
 			return Gst.FlowReturn.OK	
@@ -107,6 +125,18 @@ class Handler(simplehandler.Handler):
 			latency = elem.get_property("current-latency")
 			name = elem.get_property("name")
 			time = elem.get_property("timestamp")
+			# Check if kafka server is now available if it's supposed to be used
+			if self.kafka_server is not None and self.producer is None:
+				from kafka import KafkaProducer
+				try:
+					self.producer = KafkaProducer(
+							bootstrap_servers = [self.kafka_server],
+							key_serializer = lambda m: json.dumps(m).encode('utf-8'),
+							value_serializer = lambda m: json.dumps(m).encode('utf-8'),
+						)
+				except errors.NoBrokersAvailable:
+					self.producer = None
+					print("No brokers available for kafka. Defaulting to not pushing to kafka.")
 			if self.kafka_server is not None and self.producer is not None:
 				self.producer.send("%s_latency" % (name.split("_")[0]), value = {"time": time, name: latency})
 			return Gst.FlowReturn.OK
-- 
GitLab