Commit dbe1d94d authored by Leo Pound Singer's avatar Leo Pound Singer

Switch XMPP client from SleekXMPP to Slixmpp

SleekXMPP is no longer maintained.
parent 26ec7f7e
Pipeline #110213 passed with stages
in 1 minute and 37 seconds
# Changelog
## 1.0.1 (unreleased)
## 2.0.0 (unreleased)
- No changes yet.
- Switch XMPP client from SleekXMPP to Slixmpp. SleekXMPP is no longer
maintained.
- Bump minimum Python version to Python 3.5, since this is the minimum
version supported by Slixmpp.
## 1.0.0 (2020-03-03)
......
# sleek-lvalert
sleek-lvalert is a client for the LIGO/Virgo LVAlert pubsub infrastructure that
is powered by [SleekXMPP](http://sleekxmpp.com) and is compatible with
Python 3.
is powered by [slixmpp](https://slixmpp.readthedocs.io) and requires Python 3.
......@@ -202,7 +202,7 @@ texinfo_documents = [
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
'python': ('https://docs.python.org/3', None),
'sleekxmpp': ('http://sleekxmpp.com/', None)
'slixmpp': ('https://slixmpp.readthedocs.io/', None)
}
......
......@@ -2,7 +2,7 @@ sleek-lvalert Documentation
============================
sleek-lvalert is a client for the LIGO/Virgo LVAlert pubsub infrastructure that
is powered by SleekXMPP_. It is compatible with Python 3.
is powered by :doc:`slixmpp <slixmpp:index>`. It requires Python 3.5 or later.
Quick Start
-----------
......@@ -37,5 +37,4 @@ Command Line Interface
:func: parser
.. _netrc: https://www.gnu.org/software/inetutils/manual/html_node/The-_002enetrc-file.html
.. _SleekXMPP: http://sleekxmpp.com
.. _pip: http://pip.pypa.io
\ No newline at end of file
[bdist_wheel]
universal=1
[flake8]
exclude =
sleek_lvalert/_version.py
......@@ -23,8 +20,6 @@ classifiers =
Intended Audience :: Science/Research
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Operating System :: POSIX
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
......@@ -36,12 +31,9 @@ classifiers =
[options]
packages = find:
python_requires = >=2.7
install_requires =
pyasn1>=0.1.8
pyasn1-modules>=0.0.5
safe-netrc
sleekxmpp
slixmpp
[options.entry_points]
console_scripts =
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import asyncio
import getpass
import logging
import netrc
......@@ -22,7 +23,7 @@ import uuid
import pkg_resources
from safe_netrc import netrc as _netrc
from safe_netrc import NetrcParseError
import sleekxmpp
import slixmpp
from ._version import get_versions
......@@ -67,7 +68,7 @@ def _get_login(username, password, netrc, interactive, server):
raise RuntimeError('Password not specified')
class LVAlertClient(sleekxmpp.ClientXMPP):
class LVAlertClient(slixmpp.ClientXMPP):
"""An XMPP client configured for LVAlert.
Parameters
......@@ -95,10 +96,15 @@ class LVAlertClient(sleekxmpp.ClientXMPP):
.. code-block:: python
client = LVAlertClient()
client.connect()
client.process(block=False)
client.subscribe('cbc_gstlal', 'cbc_pycbc')
client.abort()
async def callback(*args):
await client.subscribe('cbc_gstlal', 'cbc_pycbc')
client.stop()
# Run callback as soon as the client is connected.
client.add_event_handler('session_start', callback, disposable=True)
# Start processing (stops when callback reaches client.stop())
client.start()
Here is an example for running a listener.
......@@ -111,8 +117,7 @@ class LVAlertClient(sleekxmpp.ClientXMPP):
client = LVAlertClient()
client.listen(process_alert)
client.connect()
client.process(block=True)
client.start() # Runs until interrupted with Ctrl-C
"""
def __init__(self, username=None, password=None, resource=None, netrc=None,
......@@ -123,15 +128,16 @@ class LVAlertClient(sleekxmpp.ClientXMPP):
resource = uuid.uuid4().hex
jid = '{}@{}/{}'.format(username, server, resource)
super(LVAlertClient, self).__init__(jid, password)
super().__init__(jid, password)
self.register_plugin('xep_0060') # Activate PubSub plugin
self.add_event_handler("session_start", self._session_start)
self.ca_certs = pkg_resources.resource_filename(__name__, 'certs.pem')
self._stopped = None
def _session_start(self, event):
async def _session_start(self, event):
self.send_presence()
self.get_roster()
await self.get_roster()
def listen(self, callback):
"""Set a callback to be executed for each pubsub item received.
......@@ -144,6 +150,40 @@ class LVAlertClient(sleekxmpp.ClientXMPP):
self._callback = callback
self.add_event_handler('pubsub_publish', self._pubsub_publish)
def start(self):
"""Run the client until :meth:`stop` is called.
Establish a connection, process all events, and run all event handlers,
until :meth:`stop` is called or the current thread is interrupted
(e.g., by a :exc:`KeyboardInterrupt`).
If the connection is ever dropped, it is re-established automatically.
Once processing stops, the connection is closed cleanly before this
method returns.
"""
self._stopped = self.loop.create_future()
self.init_plugins()
self.connect()
try:
self.loop.run_until_complete(self._stopped)
finally:
self.disconnect()
self.loop.run_until_complete(self.disconnected)
def _stop(self):
if self._stopped is not None:
self._stopped.set_result(True)
self._stopped = None
def stop(self):
"""Stop the client.
If the client has been started by calling :meth:`start`, then
:meth:`start` will return and the connection will be closed.
"""
self.loop.call_soon_threadsafe(self._stop)
def _pubsub_publish(self, msg):
node = msg['pubsub_event']['items']['node']
text = msg['pubsub_event']['items']['item']['payload'].text
......@@ -156,33 +196,33 @@ class LVAlertClient(sleekxmpp.ClientXMPP):
def _pubsub_server(self):
return 'pubsub.{}'.format(self.boundjid.server)
def get_nodes(self):
async def get_nodes(self):
"""Get a list of all available pubsub nodes."""
result = self['xep_0060'].get_nodes(self._pubsub_server)
result = await self['xep_0060'].get_nodes(self._pubsub_server)
return [item for _, item, _ in result['disco_items']['items']]
def get_subscriptions(self):
async def get_subscriptions(self):
"""Get a list of your subscriptions."""
result = self['xep_0060'].get_subscriptions(self._pubsub_server)
result = await self['xep_0060'].get_subscriptions(self._pubsub_server)
return sorted({stanza['node'] for stanza in
result['pubsub']['subscriptions']['substanzas']})
def subscribe(self, *nodes):
"""Subscribe to one or more pubsub nodes."""
for node in nodes:
log.info('Subscribing to %s', node)
self['xep_0060'].subscribe(self._pubsub_server, node)
async def _subscribe(self, node):
await self['xep_0060'].subscribe(self._pubsub_server, node)
def unsubscribe(self, *nodes):
async def subscribe(self, *nodes):
"""Subscribe to one or more pubsub nodes."""
await asyncio.gather(*(self._subscribe(node) for node in nodes))
async def _unsubscribe(self, node):
subs = await self['xep_0060'].get_subscriptions(
self._pubsub_server, node)
subs = subs['pubsub']['subscriptions']['substanzas']
subids = [sub['subid'] for sub in subs]
await asyncio.gather(*(
self['xep_0060'].unsubscribe(self._pubsub_server, node, subid)
for subid in subids))
async def unsubscribe(self, *nodes):
"""Unsubscribe from one or more pubsub nodes."""
for node in nodes:
subscriptions = self['xep_0060'].get_subscriptions(
self._pubsub_server, node
)['pubsub']['subscriptions']['substanzas']
for subscription in subscriptions:
log.info('Unsubscribing from %s [%s]',
node, subscription['subid'])
self['xep_0060'].unsubscribe(
self._pubsub_server, node, subscription['subid'])
await asyncio.gather(*(self._unsubscribe(node) for node in nodes))
......@@ -14,12 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from __future__ import print_function
import argparse
import logging
import sys
import sleekxmpp
import slixmpp
from . import LVAlertClient, DEFAULT_SERVER
......@@ -81,25 +80,26 @@ def main(args=None):
netrc=opts.netrc,
interactive=True)
if not xmpp.connect(reattempt=False):
sys.exit(1)
try:
if opts.action == 'listen':
xmpp.listen(show)
xmpp.process(block=True)
else:
xmpp.auto_reconnect = False
xmpp.process(block=False)
if opts.action == 'nodes':
print(*xmpp.get_nodes(), sep='\n')
elif opts.action == 'subscriptions':
print(*xmpp.get_subscriptions(), sep='\n')
elif opts.action == 'subscribe':
xmpp.subscribe(*opts.node)
elif opts.action == 'unsubscribe':
xmpp.unsubscribe(*opts.node)
except sleekxmpp.exceptions.IqError as e:
print('XMPP error:', e.iq['error'], file=sys.stderr)
finally:
xmpp.disconnect()
xmpp.connect()
if opts.action == 'listen':
xmpp.listen(show)
else:
async def callback(*args):
try:
if opts.action == 'nodes':
print(*await xmpp.get_nodes(), sep='\n')
elif opts.action == 'subscriptions':
print(*await xmpp.get_subscriptions(), sep='\n')
elif opts.action == 'subscribe':
await xmpp.subscribe(*opts.node)
elif opts.action == 'unsubscribe':
await xmpp.unsubscribe(*opts.node)
except slixmpp.exceptions.IqError as e:
print('XMPP error:', e.iq['error'], file=sys.stderr)
finally:
xmpp.stop()
xmpp.add_event_handler('session_start', callback, disposable=True)
xmpp.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment