Commit 092953a2 authored by Leo Pound Singer's avatar Leo Pound Singer

Update sleek-lvalert to 2.0.0

This switches the XMPP client from the unmaintained SleekXMPP
project to the currently maintained fork Slixmpp.
parent 43ee463c
......@@ -20,6 +20,9 @@ Changelog
coincident event. Update both time and spatial FAR within superevent
when publishable.
- Update to sleek-lvalert 2.0.0, which switches the XMPP client from the
unmaintained SleekXMPP project to the currently maintained fork Slixmpp.
0.12.1 (2020-03-12)
-------------------
......
This diff is collapsed.
import asyncio
from threading import Thread
from celery import bootsteps
from celery.utils.log import get_logger
......@@ -41,16 +44,24 @@ class Receiver(LVAlertBootStep):
self._client = client.LVAlertClient(
server=consumer.app.conf['lvalert_host'],
nodes=consumer.app.conf['lvalert_nodes'])
self._client.listen(_send_lvalert_received)
self._thread = Thread(target=self._run)
def _run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._client.disconnected = loop.create_future()
self._client.loop = loop
self._client.start()
def start(self, consumer):
super().start(consumer)
self._client.connect()
self._client.process()
self._client.listen(_send_lvalert_received)
self._thread.start()
def stop(self, consumer):
super().stop(consumer)
self._client.disconnect()
self._client.stop()
self._thread.join()
def info(self, consumer):
return {'lvalert-nodes': self._client.get_subscriptions()}
return {'lvalert-nodes': list(self._client.subscriptions)}
import asyncio
from celery.utils.log import get_logger
import sleek_lvalert
......@@ -10,8 +12,23 @@ class LVAlertClient(sleek_lvalert.LVAlertClient):
super().__init__(*args, **kwargs)
self._needed_subscriptions = set(nodes or ())
self.add_event_handler('session_start', self._resubscribe)
self.add_event_handler('session_start', self._refresh)
self.add_event_handler('pubsub_subscription', self._refresh)
self._current_subscriptions = set()
def _resubscribe(self, event):
async def _resubscribe(self, *args):
log.info('Resubscribing to PubSub nodes')
current_subscriptions = set(self.get_subscriptions())
self.subscribe(*(self._needed_subscriptions - current_subscriptions))
await self._refresh()
to_subscribe = self._needed_subscriptions - self.subscriptions
to_unsubscribe = self.subscriptions - self._needed_subscriptions
await asyncio.gather(self.subscribe(*to_subscribe),
self.unsubscribe(to_unsubscribe))
async def _refresh(self, *args):
log.info('Subscriptions detected')
self._current_subscriptions = set(await self.get_subscriptions())
log.info('Current subscriptions: %r', self._current_subscriptions)
@property
def subscriptions(self):
return self._current_subscriptions
......@@ -3,7 +3,6 @@ bilby-pipe >= 0.3.8
celery[redis] >= 4.4.0
comet
corner
dnspython # silence "DNS: dnspython not found. Can not use SRV lookup." warning from SleekXMPP
flask
flask-caching
gracedb-sdk >= 0.1.5
......@@ -30,7 +29,7 @@ safe-netrc
seaborn
sentry-sdk[flask,tornado]
service_identity # We don't actually use this package, but it silences some annoying warnings from twistd.
sleek-lvalert < 2.0.0
sleek-lvalert >= 2.0.0
voeventlib >= 1.2
werkzeug >= 0.15.0 # for werkzeug.middleware.proxy_fix.ProxyFix
zstandard # for task compression
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment