From cdb3eba2b522cad8307b45b6087b72c73a001c6e Mon Sep 17 00:00:00 2001 From: Tanner Prestegard <tanner.prestegard@ligo.org> Date: Thu, 7 Jun 2018 15:26:11 -0500 Subject: [PATCH] First commit of modularized alerts app Splitting up much of the alerts functionality and improving/generalizing it. --- gracedb/alerts/email.py | 272 +++++++++++++++++++++++++++++ gracedb/alerts/event_utils.py | 53 ++++++ gracedb/alerts/lvalert.py | 50 ++++++ gracedb/alerts/main.py | 148 ++++++++++++++++ gracedb/alerts/phone.py | 115 ++++++++++++ gracedb/alerts/superevent_utils.py | 122 +++++++++++++ gracedb/alerts/xmpp.py | 145 +++++++++++++++ 7 files changed, 905 insertions(+) create mode 100644 gracedb/alerts/email.py create mode 100644 gracedb/alerts/event_utils.py create mode 100644 gracedb/alerts/lvalert.py create mode 100644 gracedb/alerts/main.py create mode 100644 gracedb/alerts/phone.py create mode 100644 gracedb/alerts/superevent_utils.py create mode 100644 gracedb/alerts/xmpp.py diff --git a/gracedb/alerts/email.py b/gracedb/alerts/email.py new file mode 100644 index 000000000..7e530e8be --- /dev/null +++ b/gracedb/alerts/email.py @@ -0,0 +1,272 @@ + +import json +import os +import socket +from subprocess import Popen, PIPE, STDOUT +import sys + +from django.core.mail import EmailMessage +from django.conf import settings + +from core.time_utils import gpsToUtc + +from events.models import Event +from events.permission_utils import is_external +from events.query import filter_for_labels + +import logging +log = logging.getLogger(__name__) + +def indent(nindent, text): + return "\n".join([(nindent*' ')+line for line in text.split('\n')]) + +def prepareSummary(event): + gpstime = event.gpstime + utctime = gpsToUtc(gpstime).strftime("%Y-%m-%d %H:%M:%S") + instruments = getattr(event, 'instruments', "") + far = getattr(event, 'far', 1.0) + summary_template = """ + Event Time (GPS): %s + Event Time (UTC): %s + Instruments: %s + FAR: %.3E """ + summary = summary_template % (gpstime, utctime, instruments, far) + si_set = event.singleinspiral_set.all() + if si_set.count(): + si = si_set[0] + summary += """ + Component masses: %.2f, %.2f """ % (si.mass1, si.mass2) + return summary + +# The serialized object passed in here will normally be an EventLog or EMBB log entry +def issueAlertForUpdate(event, description, doxmpp, filename="", serialized_object=None): + if doxmpp: + issueXMPPAlert(event, filename, "update", description, serialized_object) + # XXX No emails or phone calls for this. Argh. + +# The only kind of serialized object relevant for a Label is an event. +def issueAlertForLabel(event, label, doxmpp, serialized_event=None, event_url=None): + if doxmpp: + issueXMPPAlert(event, "", "label", label, serialized_event) + # Email + profileRecips = [] + phoneRecips = [] + pipeline = event.pipeline + # Triggers on given label matching pipeline OR with no pipeline (wildcard type) + triggers = label.trigger_set.filter(pipelines=pipeline) + triggers = triggers | label.trigger_set.filter(pipelines=None) + for trigger in triggers: + if len(trigger.label_query) > 0: + # construct a queryset containing only this event + qs = Event.objects.filter(id=event.id) + qs = filter_for_labels(qs, trigger.label_query) + # If the label query cleans out our query set, we'll continue + # without adding the recipient. + if qs.count() == 0: + continue + + for recip in trigger.contacts.all(): + if recip.email: + profileRecips.append(recip.email) + if recip.phone: + phoneRecips.append(recip) + + if event.search: + subject = "[gracedb] %s / %s / %s / %s" % (label.name, event.pipeline.name, event.search.name, event.graceid()) + else: + subject = "[gracedb] %s / %s / %s" % (label.name, event.pipeline.name, event.graceid()) + + message = "A %s event with graceid %s was labeled with %s" % \ + (event.pipeline.name, event.graceid(), label.name) + if event_url: + message += '\n\n%s' % event_url + + if event.group.name == "Test": + fromaddress = settings.ALERT_TEST_EMAIL_FROM + toaddresses = settings.ALERT_TEST_EMAIL_TO + bccaddresses = [] + message += "\n\nWould have sent email to: %s" % str(profileRecips) + message += "\n\nWould have called/texted: {0}" \ + .format(str([c.phone for c in phoneRecips])) + phoneRecips = [] + else: + fromaddress = settings.ALERT_EMAIL_FROM + toaddresses = [] + bccaddresses = profileRecips + + if settings.SEND_EMAIL_ALERTS and (toaddresses or bccaddresses): + if not toaddresses: + toaddresses = ["(undisclosed recipients)"] + email = EmailMessage(subject, message, fromaddress, toaddresses, bccaddresses) + email.send() + + # Make phone calls. + if settings.SEND_PHONE_ALERTS and phoneRecips: + make_twilio_calls(event, phoneRecips, "label", label=label) + +def issueEmailAlert(event, event_url): + + # Check settings switch for turning off email alerts + if not settings.SEND_EMAIL_ALERTS: + return + + # The right way of doing this is to make the email alerts filter-able + # by search. But this is a low priority dev task. For now, we simply + # short-circuit in case this is an MDC event. + if event.search and event.search.name == 'MDC': + return + + # Gather Recipients + if event.group.name == 'Test': + fromaddress = settings.ALERT_TEST_EMAIL_FROM + toaddresses = settings.ALERT_TEST_EMAIL_TO + bccaddresses = [] + else: + fromaddress = settings.ALERT_EMAIL_FROM + toaddresses = settings.ALERT_EMAIL_TO + # XXX Bizarrely, this settings.ALERT_EMAIL_BCC seems to be overwritten in a + # persistent way between calls, so that you can get alerts going out to the + # wrong contacts. I find that it works if you just start with an empty list + # See: https://bugs.ligo.org/redmine/issues/2185 + #bccaddresses = settings.ALERT_EMAIL_BCC + bccaddresses = [] + pipeline = event.pipeline + triggers = pipeline.trigger_set.filter(labels=None) + for trigger in triggers: + for recip in trigger.contacts.all(): + if ((event.far and event.far < trigger.farThresh) + or not trigger.farThresh): + if recip.email: + bccaddresses.append(recip.email) + + subject = "[gracedb] %s event. ID: %s" % (event.pipeline.name, event.graceid()) + message = """ +New Event +%s / %s +GRACEID: %s +Info: %s +Data: %s +Submitter: %s +Event Summary: +%s +""" + message %= (event.group.name, + event.pipeline.name, + event.graceid(), + event_url, + event.weburl(), + "%s %s" % (event.submitter.first_name, event.submitter.last_name), + indent(3, prepareSummary(event)) + ) + + email = EmailMessage(subject, message, fromaddress, toaddresses, bccaddresses) + email.send() + +def issuePhoneAlert(event): + + # Check settings switch for turning off phone alerts + if not settings.SEND_PHONE_ALERTS: + return + + # The right way of doing this is to make the email alerts filter-able + # by search. But this is a low priority dev task. For now, we simply + # short-circuit in case this is an MDC event. + if event.search and event.search.name == 'MDC': + return + + # Gather recipients + phoneRecips = [] + if event.group.name != 'Test': + pipeline = event.pipeline + triggers = pipeline.trigger_set.filter(labels=None) + for trigger in triggers: + for recip in trigger.contacts.all(): + if ((event.far and event.far < trigger.farThresh) + or not trigger.farThresh): + if recip.phone: + phoneRecips.append(recip) + + # Make phone calls. + if phoneRecips: + make_twilio_calls(event, phoneRecips, "create") + +def issueXMPPAlert(event, location, alert_type="new", description="", serialized_object=None): + + # Check settings switch for turning off XMPP alerts + if not settings.SEND_XMPP_ALERTS: + return + + nodename = "%s_%s" % (event.group.name, event.pipeline.name) + nodename = nodename.lower() + nodenames = [ nodename, ] + if event.search: + nodename = nodename + "_%s" % event.search.name.lower() + nodenames.append(nodename) + + log.debug('issueXMPPAlert: %s' % event.graceid()) + + # Create the output dictionary and serialize as JSON. + lva_data = { + 'file': location, + 'uid': event.graceid(), + 'alert_type': alert_type, + # The following string cast is necessary because sometimes + # description is a label object! + 'description': str(description), + 'labels': [label.name for label in event.labels.all()] + } + if serialized_object: + lva_data['object'] = serialized_object + msg = json.dumps(lva_data) + log.debug("issueXMPPAlert: writing message %s" % msg) + + if settings.USE_LVALERT_OVERSEER: + manager = Manager() + + for server in settings.ALERT_XMPP_SERVERS: + port = settings.LVALERT_OVERSEER_PORTS[server] + for nodename in nodenames: + + if settings.USE_LVALERT_OVERSEER: + # Calculate unique message_id and log + message_id = sha1(nodename + msg).hexdigest() + log.info("issueXMPPAlert: sending %s,%s,%s to node %s" % (event.graceid(), alert_type, message_id, nodename)) + + rdict = manager.dict() + msg_dict = {'node_name': nodename, 'message': msg, 'action': 'push'} + p = Process(target=send_to_overseer, args=(msg_dict, rdict, log, True, port)) + p.start() + p.join() + + if rdict.get('success', None): + continue + + # If not success, we need to do this the old way. + log.info("issueXMPPAlert: failover to lvalert_send") + else: + # Not using LVAlert overseer, so just log the node and server + log.info("issueXMPPAlert: sending to node %s on %s" % (nodename, server)) + + # Set up environment for running lvalert_send script + env = os.environ.copy() + + # Construct lvalert_send command + p = Popen( + ["lvalert_send", + "--server=%s" % server, + "--file=-", + "--node=%s" % nodename, + ], + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + env=env) + + # Send lvalert message to subprocess + out, err = p.communicate(msg) + + log.debug("issueXMPPAlert: lvalert_send: return code %s" % p.returncode) + if p.returncode > 0: + # XXX This should probably raise an exception. + log.error("issueXMPPAlert: ERROR: %s" % err) + diff --git a/gracedb/alerts/event_utils.py b/gracedb/alerts/event_utils.py new file mode 100644 index 000000000..effd9432f --- /dev/null +++ b/gracedb/alerts/event_utils.py @@ -0,0 +1,53 @@ +from django.urls import reverse + +from .main import issue_alerts +from core.urls import build_absolute_uri +from events.shortcuts import is_event +from events.view_utils import eventToDict, eventLogToDict + +import logging +logger = logging.getLogger(__name__) + + +def event_alert_helper(obj, serializer=eventToDict, request=None): + """ + If obj is not an event, assume it is an object with a relation to + an Event object. + """ + + # If superevent_id is None, assume obj is a Superevent + if is_event(obj): + graceid = event.graceid() + else: + try: + graceid = obj.event.graceid() + except: + # TODO: raise appropriate error + pass + + # Construct URL for web view + url = build_absolute_uri(reverse('view', args=[graceid]), request) + + # Serialize the object into a dictionary + obj_dict = serializer(obj) + + return url, obj_dict + + +def issue_alert_for_event_log(log, request=None): + + # Get URL for event webview and serialized log + url, serialized_object = event_alert_helper(log, eventLogToDict, request) + + # Description + if log.filename: + description = "UPLOAD: '{filename}'".format(filename=log.filename) + else: + description = "LOG:" + description += " {message}".format(message=log.comment) + + # Send alerts + issue_alerts(log.event, alert_type="update", url=url, + description=description, serialized_object=serialized_object, + file_name=log.filename) + diff --git a/gracedb/alerts/lvalert.py b/gracedb/alerts/lvalert.py new file mode 100644 index 000000000..5f46c8afb --- /dev/null +++ b/gracedb/alerts/lvalert.py @@ -0,0 +1,50 @@ +import os +from multiprocessing import Process +from subprocess import Popen, PIPE +from ligo.overseer.overseer_client import send_to_overseer + +import logging +logger = logging.getLogger(__name__) + +def send_with_lvalert_overseer(node_name, message, manager, port): + + # Get rdict from manager (?) + rdict = manager.dict() + + # Compile message dictionary + msg_dict = { + 'node_name': node_name, + 'message': message, + 'action': 'push', + } + + # Send to overseer (?) + p = Process(target=send_to_overseer, args=(msg_dict, rdict, + logger, True, port)) + p.start() + p.join() + + # Return a boolean indicating whether the message was sent + # successfully or not + return True if rdict.get('success', None) is not None else False + + +def send_with_lvalert_send(node, message, server): + + # Set up environment for running lvalert_send executable + env = os.environ.copy() + + # Construct LVAlert command + cmd = [ + "lvalert_send", + "--server={server}".format(server=server), + "--file=-", + "--node={node}".format(node=node) + ] + + # Execute command + p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env) + out, err = p.communicate(message) + + success = True if p.returncode == 0 else False + return success, err diff --git a/gracedb/alerts/main.py b/gracedb/alerts/main.py new file mode 100644 index 000000000..921a854aa --- /dev/null +++ b/gracedb/alerts/main.py @@ -0,0 +1,148 @@ + +import json +import os +import socket +from subprocess import Popen, PIPE, STDOUT +import sys + +from django.contrib.auth.models import Group +from django.core.mail import EmailMessage +from django.conf import settings +from django.db.models import QuerySet, Q + +from core.time_utils import gpsToUtc +from events.models import Event +from events.permission_utils import is_external +from events.query import filter_for_labels +from events.shortcuts import is_event +from superevents.shortcuts import is_superevent + +from .xmpp import issue_xmpp_alert + +import logging +log = logging.getLogger(__name__) + +LVC_GROUP = Group.objects.get(name=settings.LVC_GROUP) + + +def check_recips(recips_qs): + """ + Make sure only internal users are included. Assumes that the queryset's + model has a foreign key to the user object. + """ + return recips_qs.filter(user__groups=LVC_GROUP) + + +def get_alert_recips(event_or_superevent): + + if is_superevent(event_or_superevent): + # TODO: update for superevents + pass + elif is_event(event_or_superevent): + event = event_or_superevent + triggers = event.pipeline.trigger_set.filter(labels=None) \ + .prefetch_related('contacts') + email_recips = [c for t in triggers for c in + t.contacts.all().select_related('user') + if ((not t.farThresh or (event.far and event.far < t.farThresh)) + and r.email)] + phone_recips = [c for t in triggers for c in + t.contacts.all().select_related('user') + if ((not t.farThresh or (event.far and event.far < t.farThresh)) + and r.phone)] + + return check_recips(email_recips), check_recips(phone_recips) + + +def get_alert_recips_for_label(event_or_superevent, label): + # Blank QuerySets for recipients + email_recips = QuerySet() + phone_recips = QuerySet() + + # Construct a queryset containing only this object; needed for + # call to filter_for_labels + qs = event_or_superevent.model.objects.filter(id=event_or_superevent.id) + + # Triggers on given label matching pipeline OR with no pipeline; + # no pipeline indicates that pipeline is irrelevant + if is_superevent(event_or_superevent): + # TODO: fix this + query = Q() + elif is_event(event_or_superevent): + event = event_or_superevent + query = Q(pipelines=event.pipeline) | Q(pipelines=None) + + # Iterate over triggers found from the label query + # TODO: this doesn't work quite correctly since negated labels aren't + # properly handled in the view function which creates triggers. + # Example: a trigger with '~INJ' as the label_query has INJ in its labels + # Idea: have filter_for_labels return a Q object generated from the + # label query + triggers = label.trigger_set.filter(query).prefetch_related('contacts') + for trigger in triggers.related(): + + if len(trigger.label_query) > 0: + qs_out = filter_for_labels(qs, trigger.label_query) + + # If the label query cleans out our query set, we'll continue + # without adding the recipient. + if not qs_out.exists(): + continue + + # Compile a list of recipients from the trigger's contacts + email_recips |= trigger.contacts.exclude(email="") \ + .select_related('user') + phone_recips |= trigger.contacts.exclude(phone="") \ + .select_related('user') + #email_recips.extend([c for c in + # trigger.contacts.all().select_related('user') if c.email]) + #phone_recips.extend([c for c in + # trigger.contacts.all().select_related('user') if c.phone]) + + return check_recips(email_recips), check_recips(phone_recips) + + +def issue_alerts(event_or_superevent, alert_type, url=None, file_name="", + description="", label=None, serialized_object=None): + + # Check alert_type + if alert_type not in ["new", "label", "update"]: + raise ValueError(("alert_type is {0}, should be 'new', 'label', " + "or 'update'").format(alert_type)) + + # Send XMPP alert + if settings.SEND_XMPP_ALERTS: + issue_xmpp_alert(event_or_superevent, alert_type, file_name, + serialized_object=serialized_object) + + # Below here, we only do processing for email and phone alerts ------------ + + # TODO: make phone and e-mail alerts work for superevents + if is_superevent(event_or_superevent): + return + + # We currently don't send phone or email alerts for updates + if alert_type == "update": + return + + # Don't send phone or email alerts for MDC events or Test events + if is_event(event_or_superevent): + event = event_or_superevent + if ((event.search and event.search.name == 'MDC') \ + or event.group.name == 'Test'): + return + + # Compile phone and email recipients for alert + if alert_type == "new": + email_recips, phone_recips = get_alert_recips(event_or_superevent) + # Force label = None for new alerts + label = None + elif alert_type == "label": + email_recips, phone_recips = \ + get_alert_recips_for_label(event_or_superevent, label) + + if settings.SEND_EMAIL_ALERTS: + issueEmailAlert(event_or_superevent, url) + + if settings.SEND_PHONE_ALERTS and phone_recips: + issue_phone_alerts(event_or_superevent, phone_recips, label=label) diff --git a/gracedb/alerts/phone.py b/gracedb/alerts/phone.py new file mode 100644 index 000000000..37f93586b --- /dev/null +++ b/gracedb/alerts/phone.py @@ -0,0 +1,115 @@ +import socket + +from django.conf import settings +from django_twilio.client import twilio_client +from events.permission_utils import is_external + +import logging +log = logging.getLogger(__name__) + + +# TODO: generalize to superevents +# Dict for managing TwiML bin arguments. +# Should match structure of TWIML_BINS dict in +# config/settings/secret.py. +TWIML_ARG_STR = { + 'new': 'pipeline={pipeline}&graceid={graceid}&server={server}', + 'label': ('pipeline={pipeline}&graceid={graceid}&label_lower={label}' + '&server={server}'), +} + +# TODO: fix these by using reverse +# Dict for managing Twilio message contents. +TWILIO_MSG_CONTENT = { + 'new': ('A {pipeline} event with GraceDB ID {graceid} was created.' + ' https://{server}.ligo.org/events/view/{graceid}'), + 'label': ('A {pipeline} event with GraceDB ID {graceid} was labeled with ' + '{label}. https://{server}.ligo.org/events/view/{graceid}') +} + + +def get_twilio_from(): + """Gets phone number which Twilio alerts come from.""" + for from_ in twilio_client.phone_numbers.iter(): + return from_.phone_number + raise RuntimeError('Could not determine "from" Twilio phone number') + + +def issue_phone_alerts(event, contacts, label=None): + """ + USAGE: + ------ + New event created: + issue_phone_alerts(event, contacts) + New label applied to event (Label is a GraceDB model): + issue_phone_alerts(event, contacts, label=Label) + + Note: contacts is a QuerySet of Contact objects. + """ + + # Determine alert_type + if label is not None: + alert_type = "label" + else: + alert_type = "new" + + # Get server name. + hostname = socket.gethostname() + + # Get "from" phone number. + from_ = get_twilio_from() + + # Compile Twilio voice URL and message body + msg_params = { + 'pipeline': event.pipeline.name, + 'graceid': event.graceid(), + 'server': hostname, + } + if alert_type == "label"; + msg_params['label'] = label.name + twiml_url = settings.TWIML_BASE_URL + settings.TWIML_BIN[alert_type] + \ + "?" + TWIML_ARG_STR[alert_type] + twiml_url = twiml_url.format(**msg_params) + msg_body = TWILIO_MSG_CONTENT[alert_type].format(**msg_params) + + # Loop over recipients and make calls and/or texts. + for contact in twilio_recips: + if is_external(recip.user): + # Only make calls to LVC members (non-LVC members + # shouldn't even be able to sign up for phone alerts, + # but this is another safety measure. + log.warning("External user {0} is somehow signed up for" + " phone alerts".format(recip.user.username)) + continue + + try: + # POST to TwiML bin to make voice call. + if recip.call_phone: + log.debug("Calling {0} at {1}".format(recip.user.username, + recip.phone)) + twilio_client.calls.create(to=recip.phone, from_=from_, + url=twiml_url, method='GET') + + # Create Twilio message. + if recip.text_phone: + log.debug("Texting {0} at {1}".format(recip.user.username, + recip.phone)) + twilio_client.messages.create(to=recip.phone, from_=from_, + body=msg_body) + except Exception as e: + log.exception("Failed to contact {0} at {1}.".format( + recip.user.username, recip.phone)) + + +# TODO: update for superevents +def get_phone_recips(event): + triggers = event.pipeline.trigger_set.filter(labels=None) \ + .prefetch_related('contacts') + phone_recips = [c for t in triggers for c in + t.contacts.all().select_related('user') + if ((not t.farThresh or (event.far and event.far < t.farThresh)) and + r.phone)] + + return phone_recips + + diff --git a/gracedb/alerts/superevent_utils.py b/gracedb/alerts/superevent_utils.py new file mode 100644 index 000000000..c18c0911f --- /dev/null +++ b/gracedb/alerts/superevent_utils.py @@ -0,0 +1,122 @@ +from django.urls import reverse +from rest_framework.renderers import JSONRenderer + +from .main import issue_alerts +from core.urls import build_absolute_uri +from superevents.api.serializers import SupereventSerializer, \ + SupereventLogSerializer, SupereventLabelSerializer +from superevents.shortcuts import is_superevent + +import logging +logger = logging.getLogger(__name__) + + +def superevent_alert_helper(obj, serializer=SupereventSerializer, + request=None): + """ + Assume non-superevent objects passed to this function have a + foreign key link to a superevent + """ + + # If superevent_id is None, assume obj is a Superevent + if is_superevent(obj): + superevent_id = obj.superevent_id + else: + try: + superevent_id = obj.superevent.superevent_id + except: + # TODO: raise appropriate error + pass + + # Construct URL for web view + url = build_absolute_uri(reverse('superevents:view', args=[superevent_id]), + request) + + # Serialize the object into a dictionary + obj_dict = serializer(obj).data + + return url, obj_dict + + +def issue_alert_for_superevent_creation(superevent, request=None): + + # Get URL and serialized superevent + url, serialized_object = superevent_alert_helper(superevent, + request=request) + + # Description + description = "NEW: superevent {0}".format(superevent.superevent_id) + + # Send alerts + issue_alerts(superevent, alert_type="new", url=url, + description=description, serialized_object=serialized_object) + + +#def issue_alert_for_superevent_update(superevent, request=None): +# # Get URL and serialized superevent +# url, serialized_object = superevent_alert_helper(superevent, +# request=request) +# +# # Description +# # TODO: fix +# description = "UPDATE: superevent {0}".format(superevent.superevent_id) +# +# # Send alerts +# issue_alerts(superevent, alert_type="update", url=url, +# description=description, serialized_object=serialized_object) + + +def issue_alert_for_superevent_log(log, request=None): + + # Get URL for superevent webview and serialized log + url, serialized_object = superevent_alert_helper(log, + SupereventLogSerializer, request=request) + + # Description + if log.filename: + description = "UPLOAD: '{filename}'".format(filename=log.filename) + else: + description = "LOG:" + description += " {message}".format(message=log.comment) + + # Send alerts + issue_alerts(log.superevent, alert_type="update", url=url, + description=description, serialized_object=serialized_object, + file_name=log.filename) + + +def issue_alert_for_superevent_label_creation(labelling, request=None): + + # Get URL for superevent webview and serialized label + url, serialized_object = superevent_alert_helper(labelling, + SupereventLabelSerializer, request=request) + + # Description + description = "LABEL: {label} added".format(label=labelling.label.name) + + # Send alerts + # NOTE: current alerts don't include an object (change this?) + issue_alerts(labelling.superevent, alert_type="label", url=url, + description=description, serialized_object=None) + + +def issue_alert_for_superevent_label_removal(labelling, request=None): + # Get URL for superevent webview and serialized label + url, serialized_object = superevent_alert_helper(labelling, + SupereventLabelSerializer, request=request) + + # Description + description = "UPDATE: {label} removed".format(label=labelling.label.name) + + # Send alerts + # TODO: should this be a 'label' alert or an 'update' alert + issue_alerts(labelling.superevent, alert_type="update", url=url, + description=description, serialized_object=None) + + +def issue_alert_for_superevent_voevent(voevent, request=None): + pass + + +def issue_alert_for_superevent_emobservation(emobservation, request=None): + pass diff --git a/gracedb/alerts/xmpp.py b/gracedb/alerts/xmpp.py new file mode 100644 index 000000000..60d1a0ae4 --- /dev/null +++ b/gracedb/alerts/xmpp.py @@ -0,0 +1,145 @@ + +import simplejson +import os +import socket +import sys + + +from django.core.mail import EmailMessage +from django.conf import settings + +from .lvalert import send_with_lvalert_overseer, send_with_lvalert_send +from core.time_utils import gpsToUtc +from events.permission_utils import is_external +from events.query import filter_for_labels +from events.shortcuts import is_event +from superevents.shortcuts import is_superevent + +import logging +logger = logging.getLogger(__name__) + +if settings.USE_LVALERT_OVERSEER: + from hashlib import sha1 + from multiprocessing import Manager + + +def get_xmpp_node_names(event_or_superevent): + """ + Utility function for determining the names of nodes to which XMPP + notifications should be sent. Accepts an event or superevent object as the + sole argument. + """ + + # Compile a list of node names + node_names = [] + if is_superevent(event_or_superevent): + # TODO: test superevents + is_test = False + if is_test: + superevent_node = 'test_superevent' + else: + superevent_node = 'superevent' + node_names.append(superevent_node) + elif is_event(event_or_superevent): + # Node name format is group_pipeline or group_pipeline_search + # If search is provided, we send alerts to both of the relevant nodes + # NOTE: for test events, group=Test + event = event_or_superevent + gp_node = "{group}_{pipeline}".format(group=event.group.name, + pipeline=event.pipeline.name).lower() + node_names.append(gp_node) + if event.search: + gps_node = gp_node + "_{search}".format( + search=event.search.name.lower()) + node_names.append(gps_node) + else: + error_msg = ('Object is of {0} type; should be an event ' + 'or superevent').format(type(event_or_superevent)) + logger.error(error_msg) + # TODO: way to catch this? + raise TypeError(error_msg) + + return node_names + + +def issue_xmpp_alert(event_or_superevent, alert_type="new", file_name="", + description="", serialized_object=None): + """ + serialized_object should be a dict + """ + + # Check settings switch for turning off XMPP alerts + if not settings.SEND_XMPP_ALERTS: + return + + # Determine LVAlert node names + node_names = get_xmpp_node_names(event_or_superevent) + + # Get object id + if is_superevent(event_or_superevent): + object_id = event_or_superevent.superevent_id + elif is_event(event_or_superevent): + object_id = event_or_superevent.graceid() + else: + error_msg = ('Object is of {0} type; should be an event ' + 'or superevent').format(type(event_or_superevent)) + logger.error(error_msg) + raise TypeError(error_msg) + + # Create the output dictionary and serialize as JSON. + lva_data = { + 'file': file_name, + 'uid': object_id, + 'alert_type': alert_type, + 'description': description, + 'labels': [label.name for label in event_or_superevent.labels.all()] + } + if serialized_object is not None: + lva_data['object'] = serialized_object + # simplejson.dumps is needed to properly handle Decimal fields + msg = simplejson.dumps(lva_data) + + # Log message for debugging + logger.debug("issue_xmpp_alert: sending message {msg} for {object_id}" \ + .format(msg=msg, object_id=object_id)) + + # Get manager ready for LVAlert Overseer (?) + if settings.USE_LVALERT_OVERSEER: + manager = Manager() + + # Loop over LVAlert servers and nodes, issuing the alert to each + for server in settings.ALERT_XMPP_SERVERS: + port = settings.LVALERT_OVERSEER_PORTS[server] + for node_name in node_names: + + # Calculate unique message_id and log + message_id = sha1(node_name + msg).hexdigest() + + # Log message + logger.info(("issue_xmpp_alert: sending alert type {alert_type} " + "with message {msg_id} for {obj_id} to {node}").format( + alert_type=alert_type, msg_id=message_id, obj_id=object_id, + node=node_name)) + + # Try to send with LVAlert Overseer (if enabled) + success = False + if settings.USE_LVALERT_OVERSEER: + + # Send with LVAlert Overseer + success = send_with_lvalert_overseer(node_name, msg, manager, + port) + + # If not success, we need to do this the old way. + if not success: + logger.critical(("issue_xmpp_alert: sending message with " + "LVAlert Overseer failed, trying lvalert_send")) + + # If not using LVAlert Overseer or if sending with overseer failed, + # use basic lvalert_send executable (gross) + if (not settings.USE_LVALERT_OVERSEER) or (not success): + success, err = send_with_lvalert_send(node_name, msg, server) + + if not success: + logger.critical(("issue_xmpp_alert: error sending message " + "with lvalert_send: {e}").format(e=err)) + -- GitLab