diff --git a/gstlal-calibration/python/calibhandler.py b/gstlal-calibration/python/calibhandler.py index 3a2ca296584c510c38675868942a7cf81b67bebe..4daaa187cf8311950b1bbe26e98fe661573e4f0b 100644 --- a/gstlal-calibration/python/calibhandler.py +++ b/gstlal-calibration/python/calibhandler.py @@ -76,12 +76,13 @@ class Handler(simplehandler.Handler): from kafka import KafkaProducer try: self.producer = KafkaProducer( - bootstrap_servers = [kafka_server], + bootstrap_servers = [self.kafka_server], key_serializer = lambda m: json.dumps(m).encode('utf-8'), value_serializer = lambda m: json.dumps(m).encode('utf-8'), ) - except errors.NoBrokersAvaialble: + except errors.NoBrokersAvailable: self.producer = None + print("No brokers available for kafka. Defaulting to not pushing to kafka.") def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict): with self.lock: @@ -97,7 +98,24 @@ class Handler(simplehandler.Handler): monitor_dict = {} monitor_dict['time'] = float(buf_timestamp) for key, bitmask in bitmaskdict.items(): - monitor_dict[key] = state & bitmask + all_bits_on = bitmask & bitmask + monitor = state & bitmask + if monitor == all_bits_on: + monitor_dict[key] = 1 + else: + monitor_dict[key] = 0 + # Check if kafka server is now available if it's supposed to be used + if self.kafka_server is not None and self.producer is None: + from kafka import KafkaProducer + try: + self.producer = KafkaProducer( + bootstrap_servers = [self.kafka_server], + key_serializer = lambda m: json.dumps(m).encode('utf-8'), + value_serializer = lambda m: json.dumps(m).encode('utf-8'), + ) + except errors.NoBrokersAvailable: + self.producer = None + print("No brokers available for kafka. Defaulting to not pushing to kafka.") if self.kafka_server is not None and self.producer is not None: self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict) return Gst.FlowReturn.OK @@ -107,6 +125,18 @@ class Handler(simplehandler.Handler): latency = elem.get_property("current-latency") name = elem.get_property("name") time = elem.get_property("timestamp") + # Check if kafka server is now available if it's supposed to be used + if self.kafka_server is not None and self.producer is None: + from kafka import KafkaProducer + try: + self.producer = KafkaProducer( + bootstrap_servers = [self.kafka_server], + key_serializer = lambda m: json.dumps(m).encode('utf-8'), + value_serializer = lambda m: json.dumps(m).encode('utf-8'), + ) + except errors.NoBrokersAvailable: + self.producer = None + print("No brokers available for kafka. Defaulting to not pushing to kafka.") if self.kafka_server is not None and self.producer is not None: self.producer.send("%s_latency" % (name.split("_")[0]), value = {"time": time, name: latency}) return Gst.FlowReturn.OK