Skip to content
Snippets Groups Projects
Commit eb4efd0d authored by Deep Chatterjee's avatar Deep Chatterjee
Browse files

override stock listen with a running attribute

This is needed to signal the listener from the main thread to stop.
parent 17b1f3eb
No related branches found
No related tags found
No related merge requests found
Pipeline #443074 passed
import json
from threading import Thread
import warnings
from celery import bootsteps
from celery.utils.log import get_logger
from hop.models import JSONBlob
from hop.io import StartPosition
from igwn_alert import client
from .signals import igwn_alert_received
......@@ -11,6 +15,66 @@ __all__ = ('Receiver',)
log = get_logger(__name__)
# FIXME: Remove once igwn_alert listener has a sentinel variable to
# indicate a shutdown from the main thread
# Relevant MR in igwn-alert client: https://git.ligo.org/computing/igwn-alert/client/-/merge_requests/12 # noqa: E501
class IGWNAlertClient(client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.batch_size = 1
self.until_eos = True
self.running = False
self.start_at = StartPosition.LATEST
# mostly implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
def listen(self, callback=None, topic=None):
"""
Set a callback to be executed for each pubsub item received.
Parameters
----------
callback : callable (optional)
A function of two arguments: the topic and the alert payload.
When set to :obj:`None`, print out alert payload.
topic : :obj:`str`, or :obj:`list` of :obj:`str` (optional)
Topic or list of topics to listen to. When set to :obj:`None`,
then listen to all topics connected to user's credential.
"""
self.running = True
if topic:
if isinstance(topic, str):
topic = [topic]
listen_topics = topic
else:
listen_topics = self.get_topics()
s = self.open(self._construct_topic_url(listen_topics), "r")
log.info("Opened connection to scimma")
while self.running:
try:
for payload, metadata in s.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (json.JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
except (KeyboardInterrupt, SystemExit):
self.running = False
s.close()
class IGWNAlertBootStep(bootsteps.ConsumerStep):
"""Generic boot step to limit us to appropriate kinds of workers.
......@@ -41,7 +105,8 @@ class Receiver(IGWNAlertBootStep):
def start(self, consumer):
super().start(consumer)
self._client = client(group=consumer.app.conf['igwn_alert_group'])
self._client = IGWNAlertClient(
group=consumer.app.conf['igwn_alert_group'])
self.thread = Thread(
target=self._client.listen,
args=(_send_igwn_alert, consumer.app.conf['igwn_alert_topics']),
......@@ -50,8 +115,8 @@ class Receiver(IGWNAlertBootStep):
def stop(self, consumer):
super().stop(consumer)
self._client.running = False
self.thread.join()
self._client.disconnect()
def info(self, consumer):
return {'igwn-alert-topics': consumer.app.conf[
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment