Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • alexander.pace/server
  • geoffrey.mo/gracedb-server
  • deep.chatterjee/gracedb-server
  • cody.messick/server
  • sushant.sharma-chaudhary/server
  • michael-coughlin/server
  • daniel.wysocki/gracedb-server
  • roberto.depietri/gracedb
  • philippe.grassia/gracedb
  • tri.nguyen/gracedb
  • jonah-kanner/gracedb
  • brandon.piotrzkowski/gracedb
  • joseph-areeda/gracedb
  • duncanmmacleod/gracedb
  • thomas.downes/gracedb
  • tanner.prestegard/gracedb
  • leo-singer/gracedb
  • computing/gracedb/server
18 results
Show changes
Showing
with 1651 additions and 0 deletions
# Submodule for alert issuer classes
# Utilities for issuing alerts
class AlertIssuer(object):
"""Base class for issuing XMPP alerts"""
serializer_class = None
alert_types = None
def __init__(self, obj, alert_type):
# Check alert type
if alert_type not in self.alert_types:
raise ValueError('alert_type should be in {0}'.format(
self.alert_types))
# Assign attributes
self.obj = obj
self.alert_type = alert_type
def serialize_obj(self):
return self.serializer_class(self.obj).data
def issue_alerts(self, **kwargs):
# Should be overridden in derived classes
return NotImplemented
class AlertIssuerWithParentObject(AlertIssuer):
"""
Base class for issuing XMPP alerts. Handles the case where
we want to include both a serialized object (like a log, label, etc.),
as well as a "parent" object to which the first object is attched, like
an event or superevent.
"""
parent_serializer_class = None
def serialize_parent(self):
return self.serializer_class(self.get_parent_obj())
def get_parent_obj(self):
if not hasattr(self, 'parent_obj'):
self.parent_obj = self._get_parent_obj()
return self.parent_obj
def _get_parent_obj(self):
return NotImplemented
from __future__ import absolute_import
import logging
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from guardian.models import GroupObjectPermission
from api.v1.events.serializers import EventSerializer
from core.urls import build_absolute_uri
from events.shortcuts import is_event
from events.view_utils import eventLogToDict, signoffToDict, \
emObservationToDict, embbEventLogToDict, groupeventpermissionToDict, \
labelToDict, voeventToDict
from ..main import issue_alerts
from .base import AlertIssuerWithParentObject
# Set up logger
logger = logging.getLogger(__name__)
# NOTE: we have to be careful in all of these serializers since we want to
# serialize the event subclass always, not the base event object.
class AlertIssuerWithParentEvent(AlertIssuerWithParentObject):
parent_serializer_class = EventSerializer
def serialize_obj(self):
return self.serializer_class(self.obj)
def serialize_parent(self):
return self.parent_serializer_class(self.get_parent_obj(),
context={'is_alert': True}).data
def _get_parent_obj(self):
# Assumes that the obj has a direct relation to an event
if not hasattr(self.obj, 'event'):
raise AttributeError(('object of class {0} does not have a direct '
'relationship to an event').format(
self.obj.__class__.__name__))
# Make sure we have the event "subclass"
return self.obj.event.get_subclass_or_self()
def issue_alerts(self, **kwargs):
issue_alerts(self.get_parent_obj(), self.alert_type,
self.serialize_obj(), self.serialize_parent(), **kwargs)
class EventAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = EventSerializer
alert_types = ['new', 'update', 'selected_as_preferred',
'removed_as_preferred', 'added_to_superevent',
'removed_from_superevent']
def serialize_obj(self):
return self.serializer_class(self.obj.get_subclass_or_self(),
context={'is_alert': True}).data
def _get_parent_obj(self):
return self.obj.get_subclass_or_self()
class EventLogAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(eventLogToDict)
alert_types = ['log']
class EventLabelAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(labelToDict)
alert_types = ['label_added', 'label_removed']
def issue_alerts(self, **kwargs):
issue_alerts(self.get_parent_obj(), self.alert_type,
self.serialize_obj(), self.serialize_parent(),
label=self.obj.label, **kwargs)
class EventVOEventAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(voeventToDict)
alert_types = ['voevent']
class EventEMObservationAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(emObservationToDict)
alert_types = ['emobservation']
class EventEMBBEventLogAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(embbEventLogToDict)
alert_types = ['embb_event_log']
class EventSignoffAlertIssuer(AlertIssuerWithParentEvent):
serializer_class = staticmethod(signoffToDict)
alert_types = ['signoff_created', 'signoff_updated', 'signoff_deleted']
class EventPermissionsAlertIssuer(EventAlertIssuer):
serializer_class = staticmethod(groupeventpermissionToDict)
alert_types = ['exposed', 'hidden']
def serialize_obj(self):
"""self.obj should be an event here"""
gops = GroupObjectPermission.objects.filter(
object_pk=self.obj.pk,
content_type=ContentType.objects.get_for_model(self.obj))
gop_list = [self.serializer_class(gop) for gop in gops]
return gop_list
from api.v1.superevents.serializers import SupereventSerializer, \
SupereventLogSerializer, SupereventLabelSerializer, \
SupereventVOEventSerializer, SupereventEMObservationSerializer, \
SupereventSignoffSerializer, SupereventGroupObjectPermissionSerializer
from ..main import issue_alerts
from .base import AlertIssuerWithParentObject
class AlertIssuerWithParentSuperevent(AlertIssuerWithParentObject):
parent_serializer_class = SupereventSerializer
def serialize_parent(self):
return self.parent_serializer_class(self.get_parent_obj(), is_alert=True).data
def _get_parent_obj(self):
# Assumes that the obj has a direct relation to a superevent
if not hasattr(self.obj, 'superevent'):
raise AttributeError(('object of class {0} does not have a direct '
'relationship to a superevent').format(
self.obj.__class__.__name__))
return self.obj.superevent
def issue_alerts(self, **kwargs):
issue_alerts(self.get_parent_obj(), self.alert_type,
self.serialize_obj(), self.serialize_parent(), **kwargs)
class SupereventAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventSerializer
alert_types = ['new', 'update', 'event_added', 'event_removed',
'confirmed_as_gw']
def _get_parent_obj(self):
return self.obj
class SupereventLogAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventLogSerializer
alert_types = ['log']
class SupereventLabelAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventLabelSerializer
alert_types = ['label_added', 'label_removed']
def issue_alerts(self, **kwargs):
issue_alerts(self.get_parent_obj(), self.alert_type,
self.serialize_obj(), self.serialize_parent(),
label=self.obj.label, **kwargs)
class SupereventVOEventAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventVOEventSerializer
alert_types = ['voevent']
class SupereventEMObservationAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventEMObservationSerializer
alert_types = ['emobservation']
class SupereventSignoffAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventSignoffSerializer
alert_types = ['signoff_created', 'signoff_updated', 'signoff_deleted']
class SupereventPermissionsAlertIssuer(AlertIssuerWithParentSuperevent):
serializer_class = SupereventGroupObjectPermissionSerializer
alert_types = ['exposed', 'hidden']
def serialize_obj(self):
"""
self.obj should be a superevent, but we want to return the list
of group object permissions here.
"""
# NOTE: it seems really weird to return a list of permissions here.
# But exposing/hiding a list of permissions does in fact change
# multiple permissions. Should give this some more thought.
gops = self.obj.supereventgroupobjectpermission_set.all()
return self.serializer_class(gops, many=True).data
def _get_parent_obj(self):
return self.obj
import logging
import os
import asyncio
import datetime
import json
import time
import functools
from django.conf import settings
from igwn_alert import client
from igwn_alert_overseer.overseer.overseer_client import overseer_client
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from asyncio.exceptions import InvalidStateError
# Set up logger
logger = logging.getLogger(__name__)
# man, just shorten the variable name
OVERSEER_TIMEOUT = settings.OVERSEER_TIMEOUT
def timeout_and_stop(io_loop):
logger.critical(f'Overseer IO Loop timed out after {OVERSEER_TIMEOUT} seconds.')
io_loop.stop()
def send_with_lvalert_overseer(node_name, message, port):
# Compile message dictionary
msg_dict = {
'node_name': node_name,
'message': message,
'action': 'push',
}
# Set up client:
client = overseer_client(host='localhost', port=port)
# Format message. FIXME maybe move this step into the overseer client?
msg_dict = json.dumps(msg_dict)
alert_loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(alert_loop)
# Start the async request to push the message to the overseer, and
# await the success/failure response.
resp = client.send_to_overseer(msg_dict, logger)
# Start the async I/O loop within the current thread
io_loop = IOLoop.instance()
# Construct a callable that passes io_loop as an argument
overseer_timeout = functools.partial(timeout_and_stop, io_loop)
# Add a timeout for the scenario where the overseer server isn't
# running or responding. This shouldn't actually happen, but hey.
io_loop.add_timeout(time.time() + OVERSEER_TIMEOUT, overseer_timeout)
# Start the I/O loop
io_loop.start()
# Interpret the response
rdict = json.loads(resp.result())
# Two scenarios here: the overseer client code gives a StreamClosedError
# when the I/O loop was stopped after it timed out. I think the
# InvalidStateError came as a result of prior implementation of this logic,
# so i don't think it would occur again... but if it does it still represents
# an invalid response from the overseer, so the alert should be sent again.
except (StreamClosedError, InvalidStateError) as e:
# close the loop and free up the port:
alert_loop.close()
# return false and then attempt to send with the client code.
return False
finally:
# close the loop and free up the port:
alert_loop.close()
# Return a boolean indicating whether the message was sent
# successfully or not
return True if rdict.get('success', None) is not None else False
def send_with_kafka_client(node, message, server, username=None,
password=None, group=None, **kwargs):
# Set up for initializing LVAlertClient instance
client_settings = {
'server': server
}
# Username and password should be provided for container deployments.
# For VMs, they won't be, so it will look up the credentials in the
# hop auth.toml file
if username is not None:
client_settings['username'] = username
if password is not None:
client_settings['password'] = password
# if for some reason the group didn't get set correctly, make it the
# default
if group is not None:
client_settings['group'] = group
else:
client_settings['group'] = settings.DEFAULT_IGWN_ALERT_GROUP
# Instantiate client
igwn_alert_client = client(**client_settings)
# Send message
igwn_alert_client.publish(topic=node, msg=message)
from __future__ import absolute_import
import logging
from django.conf import settings
from events.shortcuts import is_event
from .email import issue_email_alerts
from .mattermost import issue_mattermost_alerts
from .phone import issue_phone_alerts
from .recipients import ALERT_TYPE_RECIPIENT_GETTERS
from .xmpp import issue_xmpp_alerts
# Set up logger
logger = logging.getLogger(__name__)
def issue_alerts(event_or_superevent, alert_type, serialized_object,
serialized_parent=None, **kwargs):
# Send XMPP alert
if settings.SEND_XMPP_ALERTS:
issue_xmpp_alerts(event_or_superevent, alert_type, serialized_object,
serialized_parent=serialized_parent)
# Process phone and email alerts
issue_phone_and_email_alerts(event_or_superevent, alert_type, **kwargs)
# Process Mattermost alerts
if settings.SEND_MATTERMOST_ALERTS:
issue_mattermost_alerts(event_or_superevent, alert_type)
def issue_phone_and_email_alerts(event_or_superevent, alert_type, **kwargs):
# A few checks on whether we should issue a phone and/or email alert ------
if not (settings.SEND_EMAIL_ALERTS or settings.SEND_PHONE_ALERTS):
return
# Phone and email alerts are only issued for certain alert types
# We check this by looking at the keys of ALERT_TYPE_RECIPIENT_GETTERS
if (alert_type not in ALERT_TYPE_RECIPIENT_GETTERS):
return
# Don't send phone or email alerts for MDC or Test cases, or offline
# edit: include detchar events
if is_event(event_or_superevent):
# Test/MDC events
event = event_or_superevent
if event.is_mdc() or event.is_test():
return
# Offline events
if event.offline:
return
# Detchar events:
if (event.group.name == 'Detchar' or
event.pipeline.name == 'HardwareInjection'):
return
else:
# Test/MDC superevents
s = event_or_superevent
if s.is_mdc() or s.is_test():
return
# Superevents with offline preferred events
if s.preferred_event.offline:
return
# Looks like we're going to issue phone and/or email alerts ---------------
# Get recipient getter class
rg_class = ALERT_TYPE_RECIPIENT_GETTERS[alert_type]
# Instantiate recipient getter
rg = rg_class(event_or_superevent, **kwargs)
# Get recipients
email_recipients, phone_recipients = rg.get_recipients()
# Try to get label explicitly from kwargs
label = kwargs.get('label', None)
# Issue phone alerts
if settings.SEND_PHONE_ALERTS and phone_recipients.exists():
issue_phone_alerts(event_or_superevent, alert_type, phone_recipients,
label=label)
# Issue email alerts
if settings.SEND_EMAIL_ALERTS and email_recipients.exists():
issue_email_alerts(event_or_superevent, alert_type, email_recipients,
label=label)
import datetime
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand, CommandError
from alerts.models import Contact, Notification
# Turn on x-ray tracing for improved performance in AWS
if getattr(settings, 'ENABLE_AWS_XRAY', None):
try:
from aws_xray_sdk.core import xray_recorder
xray_recorder.begin_segment("remove-alerts-segment")
except ModuleNotFoundError:
print("aws_xray_sdk not found, skipping.")
class Command(BaseCommand):
help="Delete Contacts and Notifications for inactive users"
def add_arguments(self, parser):
parser.add_argument('-q', '--quiet', action='store_true',
default=False, help='Suppress output')
def handle(self, *args, **options):
verbose = not options['quiet']
if verbose:
self.stdout.write(('Checking inactive users\' notifications and '
'contacts at {0}').format(datetime.datetime.utcnow()))
# Get contacts and notifications whose user is no longer in the LVC
lvc = Group.objects.get(name=settings.LVC_GROUP)
notifications = Notification.objects.exclude(user__groups=lvc)
contacts = Contact.objects.exclude(user__groups=lvc)
# Generate log message
if verbose:
if notifications.exists():
t_log_msg = "Deleting {0} notifications: ".format(
notifications.count()) + " | ".join([t.__str__()
for t in notifications])
self.stdout.write(t_log_msg)
if contacts.exists():
c_log_msg = "Deleting {0} contacts: ".format(contacts.count())\
+ " | ".join([c.__str__() for c in contacts])
self.stdout.write(c_log_msg)
# Delete
notifications.delete()
contacts.delete()
import logging
from django.conf import settings
from django.urls import reverse
from core.urls import build_absolute_uri
from events.shortcuts import is_event
from . import egad
# Set up logger
logger = logging.getLogger(__name__)
message_template = 'A superevent with GraceDB ID {sid} was created. [View]({url})'
def issue_mattermost_alerts_local(event_or_superevent, alert_type):
# Not implemented
pass
def issue_mattermost_alerts_egad(event_or_superevent, alert_type):
# For now we're hard-coded to only issue new superevent alerts
if is_event(event_or_superevent):
return
if alert_type != "new":
return
if (settings.TIER not in {"dev"}
and not event_or_superevent.is_production()):
return
superevent = event_or_superevent
# For now we're hard-coded to the default channel
channel_label = "default"
# Construct the message, currently hard-coded to new superevents
url = build_absolute_uri(reverse('superevents:view',
args=[superevent.superevent_id]))
message = message_template.format(sid=superevent.superevent_id, url=url)
payload = {
"channel_label": channel_label,
"message": message,
}
egad.send_alert("mattermost", payload)
if settings.ENABLE_EGAD_MATTERMOST:
issue_mattermost_alerts = issue_mattermost_alerts_egad
else:
issue_mattermost_alerts = issue_mattermost_alerts_local
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2019-03-05 20:50
from __future__ import unicode_literals
import alerts.fields
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('events', '0032_create_imbh_search'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Contact',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('description', models.CharField(max_length=30)),
('email', models.EmailField(blank=True, max_length=254, null=True)),
('phone', alerts.fields.PhoneNumberField(blank=True, max_length=255, null=True)),
('phone_method', models.CharField(blank=True, choices=[(b'C', b'Call'), (b'T', b'Text'), (b'B', b'Call and text')], default=None, max_length=1, null=True)),
('verified', models.BooleanField(default=False, editable=False)),
('verification_code', models.IntegerField(editable=False, null=True)),
('verification_expiration', models.DateTimeField(editable=False, null=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='Notification',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('description', models.CharField(max_length=40)),
('far_threshold', models.FloatField(blank=True, null=True)),
('label_query', models.CharField(blank=True, max_length=100, null=True)),
('category', models.CharField(choices=[(b'E', b'Event'), (b'S', b'Superevent')], default=b'S', max_length=1)),
('ns_candidate', models.BooleanField(default=False)),
('contacts', models.ManyToManyField(to='alerts.Contact')),
('groups', models.ManyToManyField(blank=True, to='events.Group')),
('labels', models.ManyToManyField(blank=True, to='events.Label')),
('pipelines', models.ManyToManyField(blank=True, to='events.Pipeline')),
('searches', models.ManyToManyField(blank=True, to='events.Search')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
]
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2019-05-03 17:31
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
def set_verified_time_default(apps, schema_editor):
Contact = apps.get_model('alerts', 'Contact')
# Data migration to set verified_time for all contacts
# which are already verified
for c in Contact.objects.filter(verified=True).iterator():
c.verified_time = c.updated
c.save(update_fields=['verified_time'])
class Migration(migrations.Migration):
dependencies = [
('alerts', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='contact',
name='created',
field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now),
preserve_default=False,
),
migrations.AddField(
model_name='contact',
name='updated',
field=models.DateTimeField(auto_now=True),
),
migrations.AddField(
model_name='contact',
name='verified_time',
field=models.DateTimeField(blank=True, editable=False, null=True),
),
migrations.RunPython(
set_verified_time_default,
migrations.RunPython.noop
),
]
# -*- coding: utf-8 -*-
# Generated by Django 1.11.18 on 2019-05-03 17:59
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('alerts', '0002_add_created_updated_verified_time_fields_to_contact'),
]
operations = [
migrations.AddField(
model_name='notification',
name='created',
field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now),
preserve_default=False,
),
migrations.AddField(
model_name='notification',
name='updated',
field=models.DateTimeField(auto_now=True),
),
]
# -*- coding: utf-8 -*-
# Generated by Django 1.11.23 on 2019-09-19 19:57
# This was auto-generated after moving to Python 3, with no changes to the
# actual models. See the commit message for more details.
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('alerts', '0003_add_created_updated_time_fields_to_notification'),
]
operations = [
migrations.AlterField(
model_name='contact',
name='phone_method',
field=models.CharField(blank=True, choices=[('C', 'Call'), ('T', 'Text'), ('B', 'Call and text')], default=None, max_length=1, null=True),
),
migrations.AlterField(
model_name='notification',
name='category',
field=models.CharField(choices=[('E', 'Event'), ('S', 'Superevent')], default='S', max_length=1),
),
]
from collections import defaultdict
import logging
import random
import textwrap
import six
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.mail import EmailMessage
from django.db import models
from django.utils import timezone
from django.utils.http import urlencode
from django_twilio.client import twilio_client
from core.models import CleanSaveModel
from .fields import PhoneNumberField
from .phone import get_twilio_from
# Set up logger
logger = logging.getLogger(__name__)
# Set up user model
UserModel = get_user_model()
###############################################################################
# Contacts ####################################################################
###############################################################################
class Contact(CleanSaveModel):
# Phone contact methods
CONTACT_PHONE_CALL = 'C'
CONTACT_PHONE_TEXT = 'T'
CONTACT_PHONE_BOTH = 'B'
CONTACT_PHONE_METHODS = (
(CONTACT_PHONE_CALL, 'Call'),
(CONTACT_PHONE_TEXT, 'Text'),
(CONTACT_PHONE_BOTH, 'Call and text'),
)
# Number of digits in verification codes
CODE_DIGITS = 6
# Fields
user = models.ForeignKey(UserModel, null=False, on_delete=models.CASCADE)
description = models.CharField(max_length=30, blank=False, null=False)
email = models.EmailField(blank=True, null=True)
phone = PhoneNumberField(blank=True, max_length=255, null=True)
phone_method = models.CharField(max_length=1, null=True, blank=True,
choices=CONTACT_PHONE_METHODS, default=None)
verified = models.BooleanField(default=False, editable=False)
verification_code = models.IntegerField(null=True, editable=False)
verification_expiration = models.DateTimeField(null=True, editable=False)
# Fields for tracking when certain things happen
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
verified_time = models.DateTimeField(null=True, blank=True, editable=False)
def __str__(self):
return "{0}: {1}".format(self.user.username, self.description)
def clean(self):
# Mostly used for preventing creation of bad Contact
# objects through the Django interface.
super(Contact, self).clean()
err_dict = defaultdict(list)
# If a phone number is given, require either call or text to be True.
if (self.phone is not None and self.phone_method is None):
err_msg = 'Choose a phone contact method.'
err_dict['phone_method'].append(err_msg)
if (self.phone is None and self.phone_method is not None):
err_msg = '"Call" and "text" should be False for non-phone alerts.'
err_dict['phone'].append(err_msg)
# Only one contact method is allowed
if (self.email is not None and self.phone is not None):
err_msg = \
'Only one contact method (email or phone) can be selected.'
err_dict[NON_FIELD_ERRORS].append(err_msg)
# If no e-mail or phone given, raise error.
# We have to skip this due to really annoying behavior with forms.. :(
#if not (self.email or self.phone):
# err_msg = \
# 'One contact method (email or phone) is required.'
# err_dict[NON_FIELD_ERRORS].append(err_msg)
if err_dict:
raise ValidationError(err_dict)
def generate_verification_code(self):
self.verification_code = random.randint(10**(self.CODE_DIGITS-1),
(10**self.CODE_DIGITS)-1)
self.verification_expiration = timezone.now() + \
settings.VERIFICATION_CODE_LIFETIME
self.save(update_fields=['verification_code',
'verification_expiration'])
def send_verification_code(self):
# Message for texts and emails
msg = ('Verification code for contact "{desc}" on {host}: {code}. '
'If you did not request a verification code or do not know what '
'this is, please disregard.').format(desc=self.description,
host=settings.LIGO_FQDN, code=self.verification_code)
if self.email:
subject = 'Verification code for contact "{desc}" on {host}' \
.format(desc=self.description, host=settings.LIGO_FQDN)
email = EmailMessage(subject, msg,
from_email=settings.ALERT_EMAIL_FROM, to=[self.email])
email.send()
elif self.phone:
from_ = get_twilio_from()
if (self.phone_method == self.CONTACT_PHONE_CALL):
# If phone method is only call, send a call
# Convert code to a string with spaces between the numbers
# so it's pronounced properly by text-to-voice
code = " ".join(str(self.verification_code))
urlparams = urlencode({'code': code})
twiml_url = '{base}{twiml_bin}?{params}'.format(
base=settings.TWIML_BASE_URL,
twiml_bin=settings.TWIML_BIN['verify'],
params=urlparams)
twilio_client.calls.create(to=self.phone, from_=from_,
url=twiml_url, method='GET')
else:
# If method is text or both, send a text
twilio_client.messages.create(to=self.phone, from_=from_,
body=msg)
def verify(self):
if not self.verified:
self.verified = True
self.verified_time = timezone.now()
self.save(update_fields=['verified', 'verified_time'])
def display(self):
if self.email:
return "Email {0}".format(self.email)
elif self.phone:
if self.phone_method == self.CONTACT_PHONE_BOTH:
return "Call and text {0}".format(self.phone)
elif self.phone_method == self.CONTACT_PHONE_CALL:
return "Call {0}".format(self.phone)
if self.phone_method == self.CONTACT_PHONE_TEXT:
return "Text {0}".format(self.phone)
def print_info(self):
"""Prints information about Contact object; useful for debugging."""
info_str = textwrap.dedent("""\
Contact "{description}" (user {username})
E-mail: {email}
Phone: {phone} (method={method})
Created: {created_time}
Last updated: {updated_time}
Verified: {verified} ({verified_time})
""").format(description=self.description, username=self.user.username,
email=self.email, phone=self.phone, method=self.phone_method,
created_time=self.created, updated_time=self.updated,
verified=self.verified, verified_time=self.verified_time)
print(info_str)
###############################################################################
# Notifications ###############################################################
###############################################################################
class Notification(models.Model):
# Notification categories
NOTIFICATION_CATEGORY_EVENT = 'E'
NOTIFICATION_CATEGORY_SUPEREVENT = 'S'
NOTIFICATION_CATEGORY_CHOICES = (
(NOTIFICATION_CATEGORY_EVENT, 'Event'),
(NOTIFICATION_CATEGORY_SUPEREVENT, 'Superevent'),
)
user = models.ForeignKey(UserModel, null=False, on_delete=models.CASCADE)
contacts = models.ManyToManyField(Contact)
description = models.CharField(max_length=40, blank=False, null=False)
far_threshold = models.FloatField(blank=True, null=True)
labels = models.ManyToManyField('events.label', blank=True)
label_query = models.CharField(max_length=100, null=True, blank=True)
category = models.CharField(max_length=1, null=False, blank=False,
choices=NOTIFICATION_CATEGORY_CHOICES,
default=NOTIFICATION_CATEGORY_SUPEREVENT)
# Fields for tracking when certain things happen
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
# Whether the event possibly has a neutron star in it.
# The logic for determining this is defined in a method below.
ns_candidate = models.BooleanField(default=False)
# Event-only fields
groups = models.ManyToManyField('events.group', blank=True)
pipelines = models.ManyToManyField('events.pipeline', blank=True)
searches = models.ManyToManyField('events.search', blank=True)
def __str__(self):
return six.text_type(
"{username}: {display}".format(
username=self.user.username,
display=self.display()
)
)
def display(self):
kwargs = {}
if self.category == self.NOTIFICATION_CATEGORY_EVENT:
output = 'Event'
elif self.category == self.NOTIFICATION_CATEGORY_SUPEREVENT:
output = 'Superevent'
# Add label stuff
if self.label_query or self.labels.exists():
action = 'labeled with {labels}'
if self.label_query:
labels = '({0})'.format(self.label_query)
else:
labels = " & ".join([l.name for l in self.labels.all()])
if self.labels.count() > 1:
labels = '({0})'.format(labels)
action = action.format(labels=labels)
else:
if self.far_threshold or self.ns_candidate:
action = 'created or updated'
else:
action = 'created'
output += ' {action}'.format(action=action)
# Add groups, pipelines, searches for event-type notifications
if self.category == self.NOTIFICATION_CATEGORY_EVENT:
output += ' & {groups} & {pipelines} & {searches}'
if self.groups.exists():
grps = " | ".join([g.name for g in self.groups.all()])
if self.groups.count() > 1:
grps = '({0})'.format(grps)
kwargs['groups'] = 'group={0}'.format(grps)
else:
kwargs['groups'] = 'any group'
if self.pipelines.exists():
pipelines = " | ".join([p.name for p in self.pipelines.all()])
if self.pipelines.count() > 1:
pipelines = '({0})'.format(pipelines)
kwargs['pipelines'] = 'pipeline={0}'.format(pipelines)
else:
kwargs['pipelines'] = 'any pipeline'
if self.searches.exists():
searches = " | ".join([s.name for s in self.searches.all()])
if self.searches.count() > 1:
searches = '({0})'.format(searches)
kwargs['searches'] = 'search={0}'.format(searches)
else:
kwargs['searches'] = 'any search'
# Optionally add FAR threshold
if self.far_threshold:
output += ' & FAR < {far_threshold}'
kwargs['far_threshold'] = self.far_threshold
# Optionally add NS candidate info
if self.ns_candidate:
output += ' & Is NS candidate'
# Add contacts
output += ' -> {contacts}'
kwargs['contacts'] = \
", ".join([c.display() for c in self.contacts.all()])
return output.format(**kwargs)
from __future__ import absolute_import
import logging
import time
from django.conf import settings
from django.urls import reverse
from django.utils.http import urlencode
from django_twilio.client import twilio_client
from core.urls import build_absolute_uri
from events.permission_utils import is_external
from events.shortcuts import is_event
from . import egad
from .utils import convert_superevent_id_to_speech
# Set up logger
logger = logging.getLogger(__name__)
# Dict for managing Twilio message contents.
TWILIO_MSG_CONTENT = {
'event': {
'new': ('A {pipeline} event with GraceDB ID {graceid} was created. '
'{url}'),
'update': ('A {pipeline} event with GraceDB ID {graceid} was updated. '
'{url}'),
'label_added': ('A {pipeline} event with GraceDB ID {graceid} was '
'labeled with {label}. {url}'),
'label_removed': ('The label {label} was removed from a {pipeline} '
'event with GraceDB ID {graceid}. {url}'),
},
'superevent': {
'new': 'A superevent with GraceDB ID {sid} was created. {url}',
'update': 'A superevent with GraceDB ID {sid} was updated. {url}',
'label_added': ('A superevent with GraceDB ID {sid} was labeled with '
'{label}. {url}'),
'label_removed': ('The label {label} was removed from a superevent '
'with GraceDB ID {sid}. {url}'),
},
}
def get_twilio_from():
"""Gets phone number which Twilio alerts come from."""
for from_ in twilio_client.incoming_phone_numbers.list():
return from_.phone_number
raise RuntimeError('Could not determine "from" Twilio phone number')
def get_message_content(event_or_superevent, alert_type, **kwargs):
"""Get content for text messages"""
# kwargs should include 'label' for label_added and label_removed alerts
# Get template
if is_event(event_or_superevent):
event_type = 'event'
else:
event_type = 'superevent'
msg_template = TWILIO_MSG_CONTENT[event_type][alert_type]
# Compile message content
if is_event(event_or_superevent):
event = event_or_superevent
# Get url
url = build_absolute_uri(reverse('view', args=[event.graceid]))
# Compile message
msg = msg_template.format(graceid=event.graceid,
pipeline=event.pipeline.name, url=url, **kwargs)
else:
superevent = event_or_superevent
# get url
url = build_absolute_uri(reverse('superevents:view',
args=[superevent.superevent_id]))
# Compile message
msg = msg_template.format(sid=superevent.superevent_id, url=url,
**kwargs)
return msg
def compile_twiml_url(event_or_superevent, alert_type, **kwargs):
# Try to get label from kwargs - should be a string corresponding
# to the label name here
label = kwargs.get('label', None)
# Compile urlparams
if is_event(event_or_superevent):
urlparams = {
'graceid': event_or_superevent.graceid,
'pipeline': event_or_superevent.pipeline.name,
}
twiml_bin_dict = settings.TWIML_BIN['event']
else:
urlparams = {'sid': convert_superevent_id_to_speech(
event_or_superevent.superevent_id)}
twiml_bin_dict = settings.TWIML_BIN['superevent']
if label is not None:
urlparams['label_lower'] = label.lower()
# Construct TwiML URL
twiml_url = '{base}{twiml_bin}?{params}'.format(
base=settings.TWIML_BASE_URL,
twiml_bin=twiml_bin_dict[alert_type],
params=urlencode(urlparams))
return twiml_url
def issue_phone_alerts_local(event_or_superevent, alert_type, contacts,
label=None):
"""
Note: contacts is a QuerySet of Contact objects.
"""
time_start = time.perf_counter()
# Get "from" phone number.
from_ = get_twilio_from()
# Get message content
msg_kwargs = {}
if alert_type in ['label_added', 'label_removed'] and label:
msg_kwargs['label'] = label.name
msg_body = get_message_content(event_or_superevent, alert_type,
**msg_kwargs)
# Compile Twilio voice URL
twiml_url = compile_twiml_url(event_or_superevent, alert_type,
**msg_kwargs)
# Loop over recipients and make calls and/or texts.
for contact in contacts:
if is_external(contact.user):
# Only make calls to LVC members (non-LVC members
# shouldn't even be able to sign up for phone alerts,
# but this is another safety measure.
logger.warning("External user {0} is somehow signed up for"
" phone alerts".format(contact.user.username))
continue
try:
if (contact.phone_method in [contact.__class__.CONTACT_PHONE_CALL,
contact.__class__.CONTACT_PHONE_BOTH]):
# POST to TwiML bin to make voice call.
logger.debug("Calling {0} at {1}".format(contact.user.username,
contact.phone))
twilio_client.calls.create(to=contact.phone, from_=from_,
url=twiml_url, method='GET')
except Exception as e:
logger.exception("Failed to call {0} at {1}.".format(
contact.user.username, contact.phone))
try:
# Create Twilio message.
if (contact.phone_method in [contact.__class__.CONTACT_PHONE_TEXT,
contact.__class__.CONTACT_PHONE_BOTH]):
logger.debug("Texting {0} at {1}".format(contact.user.username,
contact.phone))
twilio_client.messages.create(to=contact.phone, from_=from_,
body=msg_body)
except Exception as e:
logger.exception("Failed to text {0} at {1}.".format(
contact.user.username, contact.phone))
time_elapsed = time.perf_counter() - time_start
logger.debug(f"Sent {len(contacts)} phone alerts in {time_elapsed} sec")
def issue_phone_alerts_egad(event_or_superevent, alert_type, contacts,
label=None):
time_start = time.perf_counter()
# Get message content
msg_kwargs = {}
if alert_type in ['label_added', 'label_removed'] and label:
msg_kwargs['label'] = label.name
message = get_message_content(event_or_superevent, alert_type,
**msg_kwargs)
# Compile Twilio voice URL
twiml_url = compile_twiml_url(event_or_superevent, alert_type,
**msg_kwargs)
# Loop over recipients to get information needed by EGAD
contacts_info = []
for contact in contacts:
if is_external(contact.user):
# Only make calls to LVC members (non-LVC members
# shouldn't even be able to sign up for phone alerts,
# but this is another safety measure.
logger.warning("External user {0} is somehow signed up for"
" phone alerts".format(contact.user.username))
continue
contacts_info.append({
"phone_method": contact.phone_method,
"phone_number": contact.phone,
})
payload = {
"contacts": contacts_info,
"message": message,
"twiml_url": twiml_url,
}
egad.send_alert("phone", payload)
time_elapsed = time.perf_counter() - time_start
logger.debug(
f"Dispatched {len(contacts)} phone alerts in {time_elapsed} sec"
)
if settings.ENABLE_EGAD_PHONE:
issue_phone_alerts = issue_phone_alerts_egad
else:
issue_phone_alerts = issue_phone_alerts_local
try:
from functools import reduce
except ImportError: # python < 3
pass
from django.conf import settings
from django.db.models import Q
from events.models import Label
from events.shortcuts import is_event
from superevents.shortcuts import is_superevent
from .models import Contact, Notification
from .utils import evaluate_label_queries
class CreationRecipientGetter(object):
queryset = Notification.objects.all()
def __init__(self, es, **kwargs):
# NOTE: es = event_or_superevent
self.is_event_alert = is_event(es)
self.event = es if self.is_event_alert else es.preferred_event
self.process_kwargs(**kwargs)
# Explicitly get the values for a few things and store them on a
# class instance. This is because there is a possibility of race
# conditions if the event or superevent is updated while we are trying
# to figure out which notifications should trigger, which can take
# several seconds or more in production.
self.far = self.event.far
self.is_ns_candidate = self.event.is_ns_candidate()
# Force queryset evaluation with list()
self.label_names = list(es.labels.values_list('name', flat=True))
def process_kwargs(self, **kwargs):
pass
def get_category_filter(self):
if self.is_event_alert:
return Q(category=Notification.NOTIFICATION_CATEGORY_EVENT)
return Q(category=Notification.NOTIFICATION_CATEGORY_SUPEREVENT)
def get_far_filter(self):
query = Q(far_threshold__isnull=True)
if self.far:
query |= Q(far_threshold__gt=self.far)
return query
def get_nscand_filter(self):
if self.is_ns_candidate:
return Q()
return Q(ns_candidate=False)
def get_group_filter(self):
if self.is_event_alert:
return Q(groups__isnull=True) | Q(groups=self.event.group)
return Q()
def get_pipeline_filter(self):
if self.is_event_alert:
return Q(pipelines__isnull=True) | Q(pipelines=self.event.pipeline)
return Q()
def get_search_filter(self):
if self.is_event_alert:
return Q(searches__isnull=True) | Q(searches=self.event.search)
return Q()
def get_filter_query(self):
filter_list = [getattr(self, method)() for method in dir(self)
if method.startswith('get_') and method.endswith('_filter')]
if filter_list:
return reduce(Q.__and__, filter_list)
return Q()
def get_trigger_query(self):
return Q()
def filter_for_labels(self, notifications):
# Check notifications which do NOT have a label query. Check whether
# their labels are a subset of what is attached to the event or
# superevent. Notifications with no labels are automatically a
# subset.
# In this case, all the labels attached to the notification should
# be in the set attached to the event/superevent.
notification_pks = []
label_set = set(self.label_names)
for n in notifications.filter(label_query__isnull=True):
n_label_set = set(n.labels.values_list('name', flat=True))
if n_label_set.issubset(label_set):
notification_pks.append(n.pk)
# Check those with label queries
notification_qs = notifications.filter(label_query__isnull=False)
pks = evaluate_label_queries(self.label_names, notification_qs)
notification_pks.extend(pks)
return Notification.objects.filter(pk__in=notification_pks)
def get_contacts_for_notifications(self, notifications):
# Get contacts; make sure contacts are verified and user is in the
# LVC group (safeguards)
contacts = Contact.objects.filter(notification__in=notifications,
verified=True, user__groups__name=settings.LVC_GROUP) \
.select_related('user')
# Separate into email and phone contacts
email_recipients = contacts.filter(email__isnull=False)
phone_recipients = contacts.filter(phone__isnull=False)
return email_recipients, phone_recipients
def get_notifications(self):
# Get trigger query and apply to get baseline set of notifications
trigger_query = self.get_trigger_query()
base_notifications = self.queryset.filter(trigger_query)
# Get and apply filter query to trim it down
filter_query = self.get_filter_query()
notifications = base_notifications.filter(filter_query)
# Do label filtering - remove any notifications whose
# label requirements are not met by the event or superevent
final_notifications = self.filter_for_labels(notifications)
return final_notifications
def get_recipients(self):
# Get notifications matching criteria
notifications = self.get_notifications()
# Get email and phone recipients and return
email_contacts, phone_contacts = \
self.get_contacts_for_notifications(notifications)
# Filter to get only "distinct" contacts; i.e., don't send multiple
# texts to a user who has two notifications set up to point to the
# same contact
email_contacts = email_contacts.distinct()
phone_contacts = phone_contacts.distinct()
return email_contacts, phone_contacts
class UpdateRecipientGetter(CreationRecipientGetter):
def process_kwargs(self, **kwargs):
# We try to get old_far this way since old_far can be None, but we
# want the code to fail if it is not provided.
try:
self.old_far = kwargs['old_far']
except KeyError:
raise ValueError('old_far must be provided')
self.old_nscand = kwargs.get('old_nscand', None)
if self.old_nscand is None:
raise ValueError('old_nscand must be provided')
def get_trigger_query(self):
# Initial query should match no objects
query = Q(pk__in=[])
# Then we add other options that could possibly match
if self.far is not None:
if self.old_far is None:
query |= Q(far_threshold__gt=self.far)
else:
query |= (Q(far_threshold__lte=self.old_far) &
Q(far_threshold__gt=self.far))
if self.old_nscand is False and self.is_ns_candidate:
query |= Q(ns_candidate=True)
return query
class LabelAddedRecipientGetter(CreationRecipientGetter):
def process_kwargs(self, **kwargs):
self.label = kwargs.get('label', None)
if self.label is None:
raise ValueError('label must be provided')
def get_notifications(self):
# Any notification that might be triggered by a label_added action
# should have that label in the 'labels' field. This includes
# notifications with a label_query. Part of the Notification creation
# process picks out all labels in the label_query (even negated ones)
# and adds the to the 'labels' field.
base_notifications = self.label.notification_set.all()
# Get and apply filter query to trim it down
filter_query = self.get_filter_query()
notifications = base_notifications.filter(filter_query)
# Do label filtering
final_notifications = self.filter_for_labels(notifications)
# Get email and phone recipients and return
return final_notifications
class LabelRemovedRecipientGetter(LabelAddedRecipientGetter):
def filter_for_labels(self, notifications):
# Only notifications with a label query should be triggered
# by a label_removed alert, since notifications with a
# label set can only have non-negated labels.
notification_qs = notifications.filter(label_query__isnull=False)
pks = evaluate_label_queries(self.label_names, notification_qs)
return Notification.objects.filter(pk__in=pks)
# Dict which maps alert types to recipient getter classes
ALERT_TYPE_RECIPIENT_GETTERS = {
'new': CreationRecipientGetter,
'update': UpdateRecipientGetter,
'label_added': LabelAddedRecipientGetter,
'label_removed': LabelRemovedRecipientGetter,
}
import copy
import pytest
import re
from django.contrib.auth import get_user_model
from alerts.models import Contact, Notification
from events.models import Label, Group, Pipeline, Search, Event
from superevents.models import Superevent
from .constants import (
DEFAULT_FAR_T, DEFAULT_LABELS, DEFAULT_LABEL_QUERY, LABEL_QUERY2,
RANDOM_LABEL, LABEL_QUERY_PARSER, DEFAULT_GROUP, DEFAULT_PIPELINE,
DEFAULT_SEARCH
)
UserModel = get_user_model()
###############################################################################
# UTILITY FUNCTIONS ###########################################################
###############################################################################
def create_notification(
user,
notification_category,
contact_description='test',
phone=None,
email=None,
phone_method=Contact.CONTACT_PHONE_BOTH,
notification_description='test',
far_threshold=None,
ns_candidate=None,
label_names=None,
label_query=None,
groups=None,
pipelines=None,
searches=None,
):
# Create contact
contact_dict = {}
if phone is not None and email is not None:
raise ValueError("Specify only one of label_names or label_query")
elif phone:
contact_dict['phone'] = phone
contact_dict['phone_method'] = phone_method
elif email:
contact_dict['email'] = email
c = Contact.objects.create(
user=user,
description=contact_description,
verified=True,
**contact_dict
)
# Create notification
notification_dict = {}
if far_threshold:
notification_dict['far_threshold'] = far_threshold
if ns_candidate:
notification_dict['ns_candidate'] = ns_candidate
if label_query:
notification_dict['label_query'] = label_query
n = Notification.objects.create(
user=user,
description=notification_description,
category=notification_category,
**notification_dict
)
# Add m2m relations
n.contacts.add(c)
if label_names and label_query:
raise ValueError('')
elif label_query:
label_names = LABEL_QUERY_PARSER.findall(label_query)
if label_names:
for l in label_names:
label, _ = Label.objects.get_or_create(name=l)
n.labels.add(label)
if notification_category == Notification.NOTIFICATION_CATEGORY_EVENT:
if groups:
for g in groups:
group, _ = Group.objects.get_or_create(name=g)
n.groups.add(group)
if pipelines:
for p in pipelines:
pipeline, _ = Pipeline.objects.get_or_create(name=p)
n.pipelines.add(pipeline)
if searches:
for s in searches:
search, _ = Search.objects.get_or_create(name=s)
n.searches.add(search)
return n
###############################################################################
# FIXTURES ####################################################################
###############################################################################
@pytest.mark.django_db
@pytest.fixture
def event():
group, _ = Group.objects.get_or_create(name='event_group')
pipeline, _ = Pipeline.objects.get_or_create(name='event_pipeline')
search, _ = Search.objects.get_or_create(name='event_search')
user = UserModel.objects.create(username='event.creator')
event = Event.objects.create(group=group, pipeline=pipeline, search=search,
far=1, submitter=user)
return event
@pytest.mark.django_db
@pytest.fixture
def superevent(event):
user = UserModel.objects.create(username='superevent.creator')
superevent = Superevent.objects.create(submitter=user, t_start=0, t_0=1,
t_end=2, preferred_event=event)
return superevent
SUPEREVENT_NOTIFICATION_DATA = [
dict(desc='all'),
dict(desc='far_t_only', far_threshold=DEFAULT_FAR_T),
dict(desc='nscand_only', ns_candidate=True),
dict(desc='labels_only', label_names=DEFAULT_LABELS),
dict(desc='labelq_only', label_query=DEFAULT_LABEL_QUERY['query']),
dict(desc='far_t_and_nscand', far_threshold=DEFAULT_FAR_T,
ns_candidate=True),
dict(desc='far_t_and_labels', far_threshold=DEFAULT_FAR_T,
label_names=DEFAULT_LABELS),
dict(desc='far_t_and_labelq', far_threshold=DEFAULT_FAR_T,
label_query=DEFAULT_LABEL_QUERY['query']),
dict(desc='nscand_and_labels', ns_candidate=True,
label_names=DEFAULT_LABELS),
dict(desc='nscand_and_labelq', ns_candidate=True,
label_query=DEFAULT_LABEL_QUERY['query']),
dict(desc='far_t_and_nscand_and_labels', far_threshold=DEFAULT_FAR_T,
ns_candidate=True, label_names=DEFAULT_LABELS),
dict(desc='far_t_and_nscand_and_labelq', far_threshold=DEFAULT_FAR_T,
ns_candidate=True, label_query=DEFAULT_LABEL_QUERY['query']),
dict(desc='labelq2_only', label_query=LABEL_QUERY2),
dict(desc='far_t_and_labelq2', far_threshold=DEFAULT_FAR_T,
label_query=LABEL_QUERY2),
dict(desc='nscand_and_labelq2', ns_candidate=True,
label_query=LABEL_QUERY2),
dict(desc='far_t_and_nscand_and_labelq2', far_threshold=DEFAULT_FAR_T,
ns_candidate=True, label_query=LABEL_QUERY2),
]
@pytest.mark.django_db
@pytest.fixture
def superevent_notifications(request):
# Get user fixture
user = request.getfixturevalue('internal_user')
# Create notifications
notification_pks = []
notification_data = copy.deepcopy(SUPEREVENT_NOTIFICATION_DATA)
for notification_dict in notification_data:
desc = notification_dict.pop('desc')
n = create_notification(
user,
Notification.NOTIFICATION_CATEGORY_SUPEREVENT,
phone='12345678901',
notification_description=desc,
**notification_dict
)
notification_pks.append(n.pk)
return Notification.objects.filter(pk__in=notification_pks)
EVENT_NOTIFICATION_DATA = copy.deepcopy(SUPEREVENT_NOTIFICATION_DATA)
EVENT_NOTIFICATION_DATA += [
dict(desc='gps_only', groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_gps', far_threshold=DEFAULT_FAR_T,
groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='nscand_and_gps', ns_candidate=True,
groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='labels_and_gps', label_names=DEFAULT_LABELS,
groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='labelq_and_gps', label_query=DEFAULT_LABEL_QUERY['query'],
groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_nscand_and_gps', far_threshold=DEFAULT_FAR_T,
ns_candidate=True, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_labels_and_gps', far_threshold=DEFAULT_FAR_T,
label_names=DEFAULT_LABELS, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_labelq_and_gps', far_threshold=DEFAULT_FAR_T,
label_query=DEFAULT_LABEL_QUERY['query'], groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='nscand_and_labels_and_gps', ns_candidate=True,
label_names=DEFAULT_LABELS, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='nscand_and_labelq_and_gps', ns_candidate=True,
label_query=DEFAULT_LABEL_QUERY['query'], groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_nscand_and_labels_and_gps',
far_threshold=DEFAULT_FAR_T, ns_candidate=True,
label_names=DEFAULT_LABELS, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_nscand_and_labelq_and_gps',
far_threshold=DEFAULT_FAR_T, ns_candidate=True,
label_query=DEFAULT_LABEL_QUERY['query'], groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='labelq2_and_gps', label_query=LABEL_QUERY2,
groups=[DEFAULT_GROUP], pipelines=[DEFAULT_PIPELINE],
searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_labelq2_and_gps', far_threshold=DEFAULT_FAR_T,
label_query=LABEL_QUERY2, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='nscand_and_labelq2_and_gps', ns_candidate=True,
label_query=LABEL_QUERY2, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
dict(desc='far_t_and_nscand_and_labelq2_and_gps',
far_threshold=DEFAULT_FAR_T, ns_candidate=True,
label_query=LABEL_QUERY2, groups=[DEFAULT_GROUP],
pipelines=[DEFAULT_PIPELINE], searches=[DEFAULT_SEARCH]),
]
@pytest.mark.django_db
@pytest.fixture
def event_notifications(request):
# Get user fixture
user = request.getfixturevalue('internal_user')
# Create notifications
notification_pks = []
notification_data = copy.deepcopy(EVENT_NOTIFICATION_DATA)
for notification_dict in notification_data:
desc = notification_dict.pop('desc')
n = create_notification(
user,
Notification.NOTIFICATION_CATEGORY_EVENT,
phone='12345678901',
notification_description=desc,
**notification_dict
)
notification_pks.append(n.pk)
return Notification.objects.filter(pk__in=notification_pks)