Skip to content
Snippets Groups Projects
Commit dfcc7f88 authored by Cody Messick's avatar Cody Messick
Browse files

Restart Kafka consumers when non-fatal errors arise. Closes #675

parent f2063e5f
No related branches found
No related tags found
1 merge request!1278Restart Kafka consumers when non-fatal errors arise.
......@@ -9,6 +9,8 @@ Changelog
- Increase tolerance duration for superevent existence nagios check in
the last hour from 1 min to 10 mins.
- Add error handling to hop-client consumers.
2.1.4 "Mokele-Mbembe" (07-26-2023)
----------------------------------
......
......@@ -115,6 +115,7 @@ class KafkaListener(KafkaBase):
def __init__(self, name, config):
super().__init__(name, config, 'consumer')
self._open_hop_stream = None
self.running = False
# Don't kill worker if listener can't connect
try:
self._open_hop_stream = self._hop_stream.open(config['url'], 'r')
......@@ -126,13 +127,34 @@ class KafkaListener(KafkaBase):
log.exception('Connection to %s failed', self._config["url"])
def listen(self):
for message in self._open_hop_stream:
# Send signal
kafka_record_consumed.send(
None,
name=self.name,
record=self.get_payload_output(message)
)
self.running = True
# Restart the consumer when non-fatal errors come up, similar to
# gwcelery.igwn_alert.IGWNAlertClient
while self.running:
try:
for message in self._open_hop_stream:
# Send signal
kafka_record_consumed.send(
None,
name=self.name,
record=self.get_payload_output(message)
)
except KafkaException as exception:
err = exception.args[0]
if self.running is False:
# The close attempt in the KafkaListener stop method throws
# a KafkaException that's caught by this try except, so we
# just have to catch this case for the worker to shut down
# gracefully
pass
elif err.fatal():
# stop running and close before raising error
self.running = False
self._open_hop_stream.close()
raise
else:
log.warning(
"non-fatal error from kafka: {}".format(err.name))
class KafkaWriter(KafkaBase):
......@@ -232,6 +254,7 @@ class Consumer(KafkaBootStep):
self._listeners.values() if listener._open_hop_stream
is not None))
for s in self._listeners.values():
s.running = False
if s._open_hop_stream is not None:
s._open_hop_stream.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment