Skip to content
Snippets Groups Projects

Shutdown igwn-alert listener properly

Merged Deep Chatterjee requested to merge deep.chatterjee/gwcelery:stop-listener-properly into main
All threads resolved!
1 file
+ 49
2
Compare changes
  • Side-by-side
  • Inline
import json
from json import JSONDecodeError
from threading import Thread
import warnings
from celery import bootsteps
from celery.utils.log import get_logger
from hop.models import JSONBlob
from igwn_alert import client
from .signals import igwn_alert_received
@@ -11,6 +15,48 @@ __all__ = ('Receiver',)
log = get_logger(__name__)
class IGWNAlertClient(client):
def __init__(self, *args, **kwargs):
self.shutdown = False
super().__init__(*args, **kwargs)
# mostly implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
def listen(self, callback=None, topic=None):
if topic:
if isinstance(topic, str):
topic = [topic]
listen_topics = topic
else:
listen_topics = self.get_topics()
try:
with self.open(self._construct_topic_url(listen_topics), "r") as s:
for payload, metadata in s.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
if self.shutdown:
s.close()
break
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
except (KeyboardInterrupt, SystemExit):
pass
class IGWNAlertBootStep(bootsteps.ConsumerStep):
"""Generic boot step to limit us to appropriate kinds of workers.
@@ -41,7 +87,8 @@ class Receiver(IGWNAlertBootStep):
def start(self, consumer):
super().start(consumer)
self._client = client(group=consumer.app.conf['igwn_alert_group'])
self._client = IGWNAlertClient(
group=consumer.app.conf['igwn_alert_group'])
self.thread = Thread(
target=self._client.listen,
args=(_send_igwn_alert, consumer.app.conf['igwn_alert_topics']),
@@ -50,8 +97,8 @@ class Receiver(IGWNAlertBootStep):
def stop(self, consumer):
super().stop(consumer)
self._client.shutdown = True
self.thread.join()
self._client.disconnect()
def info(self, consumer):
return {'igwn-alert-topics': consumer.app.conf[
Loading