Skip to content
Snippets Groups Projects

Use adc Consumer stop functionality to terminate IGWNReceiverThread properly

Merged Deep Chatterjee requested to merge deep.chatterjee/gwcelery:fix-listener-regression into main
Files
3
@@ -6,7 +6,6 @@ from adc.errors import KafkaException
from celery import bootsteps
from celery.utils.log import get_logger
from hop.models import JSONBlob
from hop.io import StartPosition
from igwn_alert import client
from .signals import igwn_alert_received
@@ -16,67 +15,55 @@ __all__ = ('Receiver',)
log = get_logger(__name__)
# FIXME: Remove once igwn_alert listener has a sentinel variable to
# indicate a shutdown from the main thread
# Relevant MR in igwn-alert client: https://git.ligo.org/computing/igwn-alert/client/-/merge_requests/12 # noqa: E501
# Implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
# with minor differences
class IGWNAlertClient(client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.batch_size = 1
self.until_eos = False # FIXME: Revert to True when fix for autocommit is found # noqa: E501
self.running = False
self.start_at = StartPosition.LATEST
self.running = False # FIXME: needed to handle timeouts; remove when issue #441 is fixed # noqa: E501
# mostly implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
def listen(self, callback=None, topic=None):
def listen(self, callback, topics):
"""
Set a callback to be executed for each pubsub item received.
Parameters
----------
callback : callable (optional)
callback : callable
A function of two arguments: the topic and the alert payload.
When set to :obj:`None`, print out alert payload.
topic : :obj:`str`, or :obj:`list` of :obj:`str` (optional)
Topic or list of topics to listen to. When set to :obj:`None`,
then listen to all topics connected to user's credential.
topics : :obj:`list` of :obj:`str`
Topic or list of topics to listen to.
"""
self.running = True
if topic:
if isinstance(topic, str):
topic = [topic]
listen_topics = topic
else:
listen_topics = self.get_topics()
s = self.open(self._construct_topic_url(listen_topics), "r")
log.info("Opened connection to scimma")
while self.running:
self.stream_obj = self.open(self._construct_topic_url(topics), "r") # noqa: E501
try:
for payload, metadata in s.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (json.JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
with self.stream_obj as s:
for payload, metadata in s.read(
metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
# Fix in case message is in new format:
if isinstance(payload, JSONBlob):
payload = payload.content
else:
try:
payload = json.loads(payload)
except (json.JSONDecodeError, TypeError) as e:
warnings.warn("Payload is not valid "
"json: {}".format(e))
if not callback:
print("New message from topic {topic}: {msg}"
.format(topic=metadata.topic, msg=payload))
else:
callback(topic=metadata.topic.split('.')[1],
payload=payload)
except KafkaException as err:
if '_TIMED_OUT' in err.name:
log.exception("Timeout from kafka")
else:
raise
s.close()
self.stream_obj.close()
class IGWNAlertBootStep(bootsteps.ConsumerStep):
@@ -120,6 +107,7 @@ class Receiver(IGWNAlertBootStep):
def stop(self, consumer):
super().stop(consumer)
self._client.running = False
self._client.stream_obj._consumer.stop()
self.thread.join()
def info(self, consumer):
Loading