Skip to content
Snippets Groups Projects

Modify igwn alert listener error handling to allow graceful shutdown

Closed Cody Messick requested to merge cody.messick/gwcelery:igwn_alert_graceful_shutdown into main
2 files
+ 33
26
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -35,33 +35,38 @@ class IGWNAlertClient(client):
Topic or list of topics to listen to.
"""
self.running = True
self.stream_obj = self.open(self._construct_topic_url(topics), "r") # noqa: E501
while self.running:
self.stream_obj = self.open(self._construct_topic_url(topics), "r") # noqa: E501
try:
with self.stream_obj as s:
for payload, metadata in s.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (json.JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
# FIXME: revisit when https://git.ligo.org/computing/igwn-alert/client/-/issues/19 # noqa: E501
# is addressed
try:
for payload, metadata in self.stream_obj.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (json.JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
except KafkaException as err:
if err.fatal:
if self.running is False:
# The close attempt in the Receiver stop method throws a
# KafkaException that's caught by this try except, so we
# just have to catch this case for the worker to shut down
# gracefully
pass
elif err.fatal:
# stop running and close before raising error
self.running = False
self.stream_obj.close()
@@ -113,9 +118,8 @@ class Receiver(IGWNAlertBootStep):
def stop(self, consumer):
super().stop(consumer)
if self._client.running:
self._client.running = False
self._client.stream_obj._consumer.stop()
self._client.running = False
self._client.stream_obj.close()
self.thread.join()
def info(self, consumer):
Loading