Commit a553440a authored by Leo Pound Singer
Browse files

WIP: Add GCN sent/received tallies to Nagios checks

Flag an error if we have neither sent nor received all LVC notice
types in the past 2 hours.
parent 4185732d
Pipeline #48798 failed with stages
in 1 minute and 28 seconds
......@@ -15,9 +15,11 @@ from celery.utils.log import get_task_logger
from celery_eternal import EternalTask, EternalProcessTask
from gcn import get_notice_type, NoticeType
import gcn
import lxml.etree
from ...import app
from ..core import DispatchHandler
from . import tally
log = get_task_logger(__name__)
......@@ -74,6 +76,9 @@ def send(message):
# Look up the ephemeral port number saved in Redis.
port = broker.backend.client.get(broker.name + '.port')
# Update the tally of sent events.
tally.increment('sent', get_notice_type(lxml.etree.fromstring(message)))
# Send the VOEvent using comet-sendvo.
subprocess.run(['comet-sendvo', '--port', port],
check=True, input=message, stderr=subprocess.PIPE)
......@@ -90,6 +95,9 @@ class _VOEventDispatchHandler(DispatchHandler):
except ValueError:
pass
# Update the tally of received events.
tally.increment('received', notice_type)
return notice_type, (payload,), {}
......
"""Utilities to keep a tally of GCNs sent and received."""
from ... import app
def _get_key(category, notice_type):
    """Build the backend key under which one tally is stored.

    The key is this module's ``__name__``, the category, and the notice type
    formatted as a decimal integer, joined by dots.
    """
    template = '{module}.{category}.{notice_type:d}'
    return template.format(
        module=__name__, category=category, notice_type=notice_type)
# Lifetime for the counters, in seconds. increment() passes this value to
# expire() on every call, so a counter's timer restarts each time it is
# incremented.
lifetime = 7200
def increment(category, notice_type):
    """Add one to a tally and refresh its expiration.

    The increment and the expiry update are queued on one pipeline obtained
    from the application backend's client and then executed together. The
    expiry is set to the module-level ``lifetime`` on every call.

    Parameters
    ----------
    category : {'sent', 'received'}
        A short descriptive name for the tally.
    notice_type : gcn.NoticeType, int
        The notice type.

    Returns
    -------
    int
        The new tally.
    """
    counter_key = _get_key(category, notice_type)
    with app.backend.client.pipeline() as pipeline:
        queued = pipeline.incr(counter_key).expire(counter_key, lifetime)
        # execute() yields one reply per queued command: the incremented
        # value first, then the reply to the expire command.
        new_count, _expire_reply = queued.execute()
    return new_count
def get(category, notice_type):
    """Look up the current value of a tally.

    Parameters
    ----------
    category : {'sent', 'received'}
        A short descriptive name for the tally.
    notice_type : gcn.NoticeType, int
        The notice type.

    Returns
    -------
    int
        The current value of the tally, or 0 if the backend returns a falsy
        value for the key (for example, when no counter is stored).
    """
    stored = app.backend.client.get(_get_key(category, notice_type))
    if not stored:
        return 0
    return int(stored)
......@@ -29,21 +29,26 @@ def fake_gcn(notice_type):
return lxml.etree.tostring(root), root
def test_unrecognized_notice_type(caplog):
@patch('gwcelery.tasks.gcn.tally.increment')
def test_unrecognized_notice_type(mock_increment, caplog):
    """Test handling an unrecognized (enum not defined) notice type.

    The notice must be logged as ignored and still be counted once in the
    'received' tally under its raw integer value.
    """
    unknown_type = 10000
    caplog.set_level(logging.WARNING)

    # fake_gcn returns the serialized payload and its parsed root element.
    payload, root = fake_gcn(unknown_type)
    gcn.handler.dispatch(payload, root)

    [record] = caplog.records
    assert record.message == 'ignoring unrecognized key: 10000'
    mock_increment.assert_called_once_with('received', unknown_type)
def test_unregistered_notice_type(caplog):
@patch('gwcelery.tasks.gcn.tally.increment')
def test_unregistered_notice_type(mock_increment, caplog):
    """Test handling an unregistered notice type.

    The notice type exists in the enum but has no handler: it must be logged
    as ignored and counted once in the 'received' tally as a plain integer.
    """
    notice_type = gcn.NoticeType.SWIFT_UVOT_POS_NACK
    caplog.set_level(logging.WARNING)

    # fake_gcn returns the serialized payload and its parsed root element.
    payload, root = fake_gcn(notice_type)
    gcn.handler.dispatch(payload, root)

    [record] = caplog.records
    expected_message = ('ignoring unrecognized key: '
                        '<NoticeType.SWIFT_UVOT_POS_NACK: 89>')
    assert record.message == expected_message
    mock_increment.assert_called_once_with('received', int(notice_type))
@pytest.fixture
......@@ -54,7 +59,8 @@ def reset_handlers():
gcn.handler.update(old_handler)
def test_registered_notice_type(reset_handlers):
@patch('gwcelery.tasks.gcn.tally.increment')
def test_registered_notice_type(mock_increment, reset_handlers):
@gcn.handler(gcn.NoticeType.AGILE_POINTDIR, gcn.NoticeType.AGILE_TRANS)
def agile_handler(payload):
pass
......
from distutils.spawn import find_executable
from unittest.mock import Mock
import gcn
import pytest
from .. import app
from ..tasks.gcn import tally
from ..tools import nagios
......@@ -90,12 +92,29 @@ def test_nagios(capsys, monkeypatch, socket_enabled,
out, err = capsys.readouterr()
assert 'CRITICAL: Too many lvalert nodes are subscribed' in out
# success
# LVAlert nodes subscribed, no recent VOEvents
mock_lvalert_client.configure_mock(**{
'return_value.get_subscriptions.return_value':
nagios.get_expected_lvalert_nodes()})
with pytest.raises(SystemExit) as excinfo:
app.start(['gwcelery', 'nagios'])
assert excinfo.value.code == nagios.NagiosPluginStatus.WARNING
out, err = capsys.readouterr()
assert 'WARNING: GCN appears to be down' in out
# success
notice_types = (gcn.NoticeType.LVC_PRELIMINARY,
gcn.NoticeType.LVC_INITIAL,
gcn.NoticeType.LVC_UPDATE,
gcn.NoticeType.LVC_RETRACTION)
categories = ('sent', 'received')
for category in categories:
for notice_type in notice_types:
tally.increment(category, notice_type)
with pytest.raises(SystemExit) as excinfo:
app.start(['gwcelery', 'nagios'])
assert excinfo.value.code == nagios.NagiosPluginStatus.OK
......
"""A `Nagios plugin <https://nagios-plugins.org/doc/guidelines.html>`_
for monitoring GWCelery."""
import io
from enum import IntEnum
from sys import exit
from traceback import format_exc, format_exception
from celery.bin.base import Command
from celery_eternal import EternalTask
import gcn
import kombu.exceptions
import sleek_lvalert
# Make sure that all tasks are registered
from .. import tasks
from .. import tasks # Make sure that all tasks are registered
from ..tasks.gcn import tally # For checking GCN sent/received counts
class NagiosPluginStatus(IntEnum):
......@@ -23,6 +25,12 @@ class NagiosPluginStatus(IntEnum):
class NagiosCriticalError(Exception):
    """An exception that maps to a Nagios status of `CRITICAL`.

    Raise it with exactly one argument (the one-line plugin output) and chain
    a cause with ``raise ... from ...``: :meth:`NagiosCommand.run` unpacks a
    single element from ``args`` and formats ``__cause__`` as the detail text.
    """

    # Plugin status that NagiosCommand.run reports when it catches this.
    status = NagiosPluginStatus.CRITICAL
class NagiosWarningError(Exception):
    """An exception that maps to a Nagios status of `WARNING`.

    Raise it with exactly one argument (the one-line plugin output) and chain
    a cause with ``raise ... from ...``: :meth:`NagiosCommand.run` unpacks a
    single element from ``args`` and formats ``__cause__`` as the detail text.
    """

    # Plugin status that NagiosCommand.run reports when it catches this.
    status = NagiosPluginStatus.WARNING
def get_active_queues(inspector):
......@@ -99,6 +107,27 @@ def check_status(app):
raise NagiosCriticalError('Too many lvalert nodes are subscribed') \
from AssertionError('Extra nodes: ' + ', '.join(extra))
notice_types = (gcn.NoticeType.LVC_PRELIMINARY,
gcn.NoticeType.LVC_INITIAL,
gcn.NoticeType.LVC_UPDATE,
gcn.NoticeType.LVC_RETRACTION)
categories = ('sent', 'received')
tallies = {
nt: {c: tally.get(c, nt) for c in categories} for nt in notice_types}
if not(any(t > 0 for ts in tallies.values() for t in ts.values())):
f = io.StringIO()
print('We have sent or received suspiciously few VOEvents recently. '
'This may indicate a problem with GCN, or it may indicate that '
'GWCelery was recently restarted.', file=f)
print(file=f)
print('Tallies', '/'.join(categories), 'in the last', tally.lifetime,
'seconds by notice type:', file=f)
for notice_type, ts in tallies.items():
print(' ', '/'.join(str(_) for _ in ts.values()),
notice_type.name, file=f)
raise NagiosWarningError('GCN appears to be down') \
from NagiosWarningError(f.getvalue())
class NagiosCommand(Command):
"""Check Celery status for monitoring with Nagios."""
......@@ -106,8 +135,8 @@ class NagiosCommand(Command):
def run(self, **kwargs):
try:
check_status(self.app)
except NagiosCriticalError as e:
status = NagiosPluginStatus.CRITICAL
except (NagiosCriticalError, NagiosWarningError) as e:
status = e.status
output, = e.args
e = e.__cause__
detail = ''.join(format_exception(type(e), e, e.__traceback__))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment