Skip to content
Snippets Groups Projects

Add alerts task

Merged Cody Messick requested to merge cody.messick/gwcelery:alerts_task into main
Files
10
from confluent_kafka.error import KafkaException
from hop import stream
from hop.models import AvroBlob
@@ -21,6 +22,29 @@ class KafkaWriter:
# Set up flag for failed delivery of messages
self.kafka_delivery_failures = False
def kafka_topic_up(self):
'''Check for problems in broker and topic. Returns True is broker and
topic appear to be up, returns False otherwise.'''
kafka_url = self._config['url']
_, _, broker, topic = kafka_url.split('/')
try:
# FIXME Replace with hop clients list_topics function once
# https://github.com/scimma/hop-client/pull/185 makes it into a
# release (scheduled for v0.7.0)
topics = self._open_hop_stream._producer.list_topics(
topic,
timeout=5
).topics
if topics[topic].error is None:
log.info(f'{kafka_url} appears to be functioning properly')
return True
else:
log.error(f'{topic} at {broker} appears to be down')
return False
except KafkaException:
log.error(f'{broker} appears to be down')
return False
def _delivery_cb(self, kafka_error, message):
# FIXME Get rid of if-else logic once
# https://github.com/scimma/hop-client/pull/190 is merged
@@ -97,7 +121,11 @@ class Producer(KafkaBootStep):
s._open_hop_stream.close()
def info(self, consumer):
return {'kafka_delivery_failures': {
return {'kafka_topic_up': {
brokerhost: writer.kafka_topic_up() for brokerhost, writer
in self._writers.items()
},
'kafka_delivery_failures': {
brokerhost: writer.kafka_delivery_failures for
brokerhost, writer in self._writers.items()
}}
Loading