diff --git a/CHANGES.md b/CHANGES.md index 64b84b8c4d1139cb29f7c0eb3e355980d349cd39..382bbd1f8885313647ac85e0fa2e34bbe265de57 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,7 +2,7 @@ ## 0.0.19 (unreleased) -- No changes yet. +- Rewrite the GCN broker so that it does not require a dedicated worker. ## 0.0.18 (2018-07-06) diff --git a/INSTALL.md b/INSTALL.md index 13cd8a19720c30c99253f5eea678c28b19c6bdd9..503f6816d56c717635bb431c6cbb3bd5b3fd2371 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -38,10 +38,9 @@ redis and start a server: $ make -j $ src/redis-server -GWCelery itself consists of five workers: +GWCelery itself consists of four workers: $ gwcelery worker -l info -n gwcelery-worker -Q celery -B $ gwcelery worker -l info -n gwcelery-openmp-worker -Q openmp -c 1 - $ gwcelery worker -l info -n gwcelery-voevent-worker -Q voevent -c 1 $ gwcelery worker -l info -n gwcelery-superevent-worker -Q superevent -c 1 $ gwcelery worker -l info -n gwcelery-exttrig-worker -Q exttrig -c 1 diff --git a/doc/design.rst b/doc/design.rst index e3aa7c193e809b49b38cda9f58755033cc0041bb..fd21f5a526cc19fa7db200cfe2309fad7c4f3c6e 100644 --- a/doc/design.rst +++ b/doc/design.rst @@ -36,14 +36,7 @@ of several processes: * :meth:`gwcelery.tasks.bayestar.localize` * :meth:`gwcelery.tasks.skymaps.plot_volume` -5. **VOEvent Worker** - - A Celery worker that is dedicated to sending VOEvents (has to be dedicated - for technical reasons). There is only task that runs in the VOEvent queue: - - * :meth:`gwcelery.tasks.gcn.send` - -6. **Superevent Worker** +5. **Superevent Worker** A Celery worker that is dedicated to serially process triggers from low latency pipelines and create/modify superevents in *GraceDb*. There is only @@ -51,7 +44,7 @@ of several processes: * :meth:`gwcelery.tasks.superevents.handle` -7. **External Trigger Worker** +6. **External Trigger Worker** A Celery worker that is dedicated to serially process external triggers from GRB alerts received from Fermi, Swift and neutrino alerts received from SNEWS @@ -59,7 +52,7 @@ of several processes: * :meth:`gwcelery.tasks.gcn.external_triggers.handle` -8. **General-Purpose Worker** +7. **General-Purpose Worker** A Celery worker that accepts all other tasks. @@ -71,6 +64,7 @@ to keep open a persistent connection with some external service. These tasks are subclasses of :class:`celery_eternal.EternalTask` or :class:`celery_eternal.EternalProcessTask`. +* :meth:`gwcelery.tasks.gcn.broker` * :meth:`gwcelery.tasks.gcn.listen` * :meth:`gwcelery.tasks.lvalert.listen` diff --git a/gwcelery/data/gwcelery.sub b/gwcelery/data/gwcelery.sub index 360c634b890a20cb7c01809ba8d6c11dd7b3fb51..06522a41242017eadbc276d522daa1282a7208f8 100755 --- a/gwcelery/data/gwcelery.sub +++ b/gwcelery/data/gwcelery.sub @@ -50,13 +50,6 @@ log = gwcelery-superevent-worker.log output = gwcelery-superevent-worker.out queue -arguments = "gwcelery worker -l info -n gwcelery-voevent-worker -Q voevent -c 1" -description = gwcelery-voevent-worker -error = gwcelery-voevent-worker.err -log = gwcelery-voevent-worker.log -output = gwcelery-voevent-worker.out -queue - arguments = "gwcelery flower --url-prefix=~$ENV(USER)/gwcelery" description = gwcelery-flower error = gwcelery-flower.err diff --git a/gwcelery/tasks/gcn/__init__.py b/gwcelery/tasks/gcn/__init__.py index f2c16b0246cf9563a4c56766286cef30b966846c..ff1f4d9788ef4c70caf8878a1d35b809b66a6fdc 100644 --- a/gwcelery/tasks/gcn/__init__.py +++ b/gwcelery/tasks/gcn/__init__.py @@ -6,12 +6,13 @@ References .. [GCN] https://gcn.gsfc.nasa.gov """ +import contextlib import socket import struct +import time -from celery import Task from celery.utils.log import get_task_logger -from celery_eternal import EternalProcessTask +from celery_eternal import EternalTask, EternalProcessTask from gcn import get_notice_type, NoticeType import gcn @@ -20,57 +21,61 @@ from ..core import DispatchHandler log = get_task_logger(__name__) -_size_struct = struct.Struct("!I") +@app.task(base=EternalTask, bind=True) +def broker(self): + """Single-client VOEvent broker for sending notices to GCN. + + This is a single-client VOEvent broker (e.g. server). It listens for a + connection from address :obj:`~gwcelery.celery.Base.gcn_bind_address` and + port :obj:`~gwcelery.celery.Base.gcn_bind_port`. + """ + with contextlib.closing(socket.socket(socket.AF_INET)) as sock: + sock.settimeout(1.0) + sock.bind((app.conf['gcn_bind_address'], app.conf['gcn_bind_port'])) + sock.listen(0) + while not self.is_aborted(): + try: + conn, (addr, _) = sock.accept() + except socket.timeout: + continue + if addr == app.conf['gcn_remote_address']: + log.info('accepted connection from remote host %s', addr) + break + else: + log.error('denied connection from remote host %s', addr) + conn.close() + else: # self.is_aborted() + return + + with contextlib.closing(conn): + conn.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', 1, 0)) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) -class _SendTask(Task): + while not self.is_aborted(): + # Get next payload from queue in first-in, first-out fashion + payload = self.backend.lindex(_queue_name, 0) + if payload is None: + time.sleep(1) + continue + nbytes = len(payload) - def __init__(self): - self.conn = None + log.info('sending payload of %d bytes', nbytes) + conn.sendall(struct.pack('!I', nbytes) + payload) + self.backend.lpop(_queue_name) -@app.task(queue='voevent', base=_SendTask, ignore_result=True, bind=True, - autoretry_for=(socket.error,), default_retry_delay=0.001, - retry_backoff=True, retry_kwargs=dict(max_retries=None), - shared=False) -def send(self, payload): - """Send a VOEvent to GCN.""" - payload = payload.encode('utf-8') - nbytes = len(payload) +_queue_name = broker.name + '.voevent-queue' - conn = self.conn - self.conn = None - if conn is None: - log.info('creating new socket') - sock = socket.socket(socket.AF_INET) - try: - sock.bind((app.conf['gcn_bind_address'], - app.conf['gcn_bind_port'])) - sock.listen(0) - while True: - conn, (addr, _) = sock.accept() - if addr == app.conf['gcn_remote_address']: - break - else: - log.error('connection denied to remote host %s', addr) - finally: - sock.close() - conn.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, - struct.pack('ii', 1, 0)) - conn.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) +@app.task(bind=True, ignore_result=True, shared=False) +def send(self, payload): + """Send a VOEvent to GCN. - log.info('sending payload of %d bytes', nbytes) - try: - conn.sendall(_size_struct.pack(nbytes) + payload) - except: # noqa - try: - conn.shutdown(socket.SHUT_RDWR) - except: # noqa - log.exception('failed to shut down socket') - conn.close() - raise - self.conn = conn + Under the hood, this task just pushes the payload onto a Redis queue, + and :func:`~gwcelery.tasks.gcn.broker` sends it.""" + self.backend.rpush(_queue_name, payload.encode('utf-8')) class _VOEventDispatchHandler(DispatchHandler): diff --git a/gwcelery/tests/test_tasks_gcn.py b/gwcelery/tests/test_tasks_gcn.py index e62ea09e3a7ea1dcaf170d6a942830a567d1aedc..17d4ce5712df7525defff7d6aa37ca99bd774671 100644 --- a/gwcelery/tests/test_tasks_gcn.py +++ b/gwcelery/tests/test_tasks_gcn.py @@ -1,19 +1,22 @@ +import contextlib import json import logging import socket +import struct from threading import Thread from time import sleep from unittest.mock import MagicMock, patch -import gcn from gcn.voeventclient import _recv_packet import lxml.etree import pkg_resources import pytest -from ..tasks.gcn import handler, listen, send +from ..tasks import gcn from .. import app +logging.basicConfig(level=logging.INFO) + # Test data with pkg_resources.resource_stream( __name__, 'data/lvalert_voevent.json') as f: @@ -22,65 +25,94 @@ voevent = lvalert['object']['text'] @pytest.fixture -def send_thread(): - thread = Thread(target=send, args=(voevent,)) - thread.daemon = True +def broker_thread(monkeypatch): + queue = [b'foo', b'bar', b'bat'] + + def lindex(key, index): + assert key == gcn._queue_name + try: + return queue[index] + except IndexError: + return None + + def lpop(key): + assert key == gcn._queue_name + try: + return queue.pop(0) + except IndexError: + return None + + monkeypatch.setattr('gwcelery.tasks.gcn.broker.backend.lindex', + lindex, raising=False) + monkeypatch.setattr('gwcelery.tasks.gcn.broker.backend.lpop', + lpop, raising=False) + + monkeypatch.setattr('gwcelery.tasks.gcn.broker.is_aborted', lambda: False) + thread = Thread(target=gcn.broker) thread.start() - sleep(1) - yield thread + sleep(0.1) + yield + monkeypatch.setattr('gwcelery.tasks.gcn.broker.is_aborted', lambda: True) thread.join() - if send.conn is not None: - try: - send.conn.close() - except socket.error: - pass - send.conn = None + + +@pytest.fixture +def wrong_remote_address(): + old = app.conf['gcn_remote_address'] + app.conf['gcn_remote_address'] = '192.0.2.0' + yield + app.conf['gcn_remote_address'] = old + + +@pytest.fixture +def connection_to_broker(broker_thread): + with contextlib.closing(socket.socket(socket.AF_INET)) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', 1, 0)) + sock.connect(('127.0.0.1', 53410)) + yield sock @pytest.mark.enable_socket -def test_send_connection_closed(send_thread): - """Test sending a VOEvent over loopback to a connection that - is immediately closed.""" - sock = socket.socket(socket.AF_INET) - sock.connect(('127.0.0.1', 53410)) - sock.shutdown(socket.SHUT_RDWR) - sock.close() +def test_broker_aborted_before_accept(broker_thread, monkeypatch): + """Test aborting the broker while it is still waiting for connections.""" + monkeypatch.setattr('gwcelery.tasks.gcn.broker.is_aborted', lambda: True) @pytest.mark.enable_socket -def test_send(send_thread): - """Test sending a VOEvent over loopback.""" - # First, simulate connecting from a disallowed IP address. - # The connection should be refused. - app.conf['gcn_remote_address'] = '192.0.2.0' - sock = socket.socket(socket.AF_INET) - try: - sock.settimeout(0.1) - with pytest.raises(socket.error): - sock.connect(('127.0.0.1', 53410)) - packet = _recv_packet(sock) - finally: - try: - sock.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - sock.close() - - # Now, simulate connecting from the allowed IP address. - # The VOEvent should be received. - app.conf['gcn_remote_address'] = '127.0.0.1' - sock = socket.socket(socket.AF_INET) - try: - sock.settimeout(0.1) - sock.connect(('127.0.0.1', 53410)) - packet = _recv_packet(sock) - finally: - try: - sock.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - sock.close() - assert packet == voevent.encode('utf-8') +def test_broker_aborted_after_accept(connection_to_broker, monkeypatch): + """Test aborting the broker after it has accepted a connection.""" + monkeypatch.setattr('gwcelery.tasks.gcn.broker.is_aborted', lambda: True) + + +@pytest.mark.enable_socket +def test_broker_disconnected(connection_to_broker): + """Test connection from a broker to a client that immediately closes the + socket.""" + pass + + +@pytest.mark.enable_socket +def test_broker_wrong_address(capsys, wrong_remote_address, + connection_to_broker): + """Test that the broker refuses connections from the wrong IP address.""" + assert connection_to_broker.recv(1) == b'' + + +@pytest.mark.enable_socket +def test_broker(connection_to_broker, broker_thread): + """Test receiving packets from the broker.""" + connection_to_broker.settimeout(1.0) + packets = [_recv_packet(connection_to_broker) for _ in range(3)] + assert packets == [b'foo', b'bar', b'bat'] + + +def test_send(monkeypatch): + mock_rpush = MagicMock() + monkeypatch.setattr('gwcelery.tasks.gcn.broker.backend.rpush', + mock_rpush, raising=False) + gcn.send('foo') + mock_rpush.assert_called_once_with(gcn._queue_name, b'foo') @pytest.mark.enable_socket @@ -88,7 +120,7 @@ def test_listen(monkeypatch): """Test that the listen task would correctly launch gcn.listen().""" mock_gcn_listen = MagicMock() monkeypatch.setattr('gcn.listen', mock_gcn_listen) - listen.run() + gcn.listen.run() mock_gcn_listen.assert_called_once() @@ -105,7 +137,7 @@ def fake_gcn(notice_type): def test_unrecognized_notice_type(caplog): """Test handling an unrecognized (enum not defined) notice type.""" caplog.set_level(logging.WARNING) - handler.dispatch(*fake_gcn(10000)) + gcn.handler.dispatch(*fake_gcn(10000)) record, = caplog.records assert record.message == 'ignoring unrecognized key: 10000' @@ -113,7 +145,7 @@ def test_unrecognized_notice_type(caplog): def test_unregistered_notice_type(caplog): """Test handling an unregistered notice type.""" caplog.set_level(logging.WARNING) - handler.dispatch(*fake_gcn(gcn.NoticeType.SWIFT_UVOT_POS_NACK)) + gcn.handler.dispatch(*fake_gcn(gcn.NoticeType.SWIFT_UVOT_POS_NACK)) record, = caplog.records assert record.message == ('ignoring unrecognized key: ' '<NoticeType.SWIFT_UVOT_POS_NACK: 89>') @@ -121,19 +153,19 @@ def test_unregistered_notice_type(caplog): @pytest.fixture def reset_handlers(): - old_handler = dict(handler) - handler.clear() + old_handler = dict(gcn.handler) + gcn.handler.clear() yield - handler.update(old_handler) + gcn.handler.update(old_handler) def test_registered_notice_type(reset_handlers): - @handler(gcn.NoticeType.AGILE_POINTDIR, gcn.NoticeType.AGILE_TRANS) + @gcn.handler(gcn.NoticeType.AGILE_POINTDIR, gcn.NoticeType.AGILE_TRANS) def agile_handler(payload): pass with patch.object(agile_handler, 'run') as mock_run: - handler.dispatch(*fake_gcn(gcn.NoticeType.SWIFT_UVOT_POS_NACK)) + gcn.handler.dispatch(*fake_gcn(gcn.NoticeType.SWIFT_UVOT_POS_NACK)) mock_run.assert_not_called() - handler.dispatch(*fake_gcn(gcn.NoticeType.AGILE_POINTDIR)) + gcn.handler.dispatch(*fake_gcn(gcn.NoticeType.AGILE_POINTDIR)) mock_run.assert_called_once()