From 0c880ace7c948b54029f2dbc04bf003afaa5e592 Mon Sep 17 00:00:00 2001
From: Brandon Piotrzkowski <piotrzk3@uwm.edu>
Date: Thu, 12 Dec 2019 23:21:18 -0600
Subject: [PATCH] Automate sky map comparison; Fixes #248

---
 gwcelery/tasks/external_skymaps.py            | 53 +++++++-----
 gwcelery/tasks/external_triggers.py           | 76 ++++++++++++++++--
 gwcelery/tasks/gracedb.py                     |  4 +-
 gwcelery/tasks/raven.py                       |  2 +
 gwcelery/tests/test_tasks_external_skymaps.py | 22 +++--
 .../tests/test_tasks_external_triggers.py     | 80 +++++++++++++++++++
 gwcelery/tests/test_tasks_raven.py            |  5 +-
 7 files changed, 205 insertions(+), 37 deletions(-)

diff --git a/gwcelery/tasks/external_skymaps.py b/gwcelery/tasks/external_skymaps.py
index b4953394b..ef0e4f9e6 100644
--- a/gwcelery/tasks/external_skymaps.py
+++ b/gwcelery/tasks/external_skymaps.py
@@ -2,8 +2,8 @@
 from astropy import units as u
 from astropy.coordinates import ICRS, SkyCoord
 from astropy_healpix import HEALPix, pixel_resolution_to_nside
-#  import astropy.utils.data
 from celery import group
+#  import astropy.utils.data
 import numpy as np
 from ligo.skymap.io import fits
 from ligo.skymap.tool import ligo_skymap_combine
@@ -20,29 +20,37 @@ from ..util.tempfile import NamedTemporaryFile
 from ..import _version
 
 
-def create_combined_skymap(graceid):
+def create_combined_skymap(se_id, ext_id):
     """Creates and uploads the combined LVC-Fermi skymap.
 
     This also uploads the external trigger skymap to the external trigger
     GraceDB page.
     """
-    preferred_skymap = get_preferred_skymap(graceid)
-    message = 'Combined LVC-Fermi sky map using {0}.'.format(preferred_skymap)
-    new_skymap = re.findall(r'(.*).fits', preferred_skymap)[0] + '-gbm.fits.gz'
-    external_trigger_id = external_trigger(graceid)
-    return (external_trigger_heasarc.s(external_trigger_id) |
-            get_external_skymap.s() |
-            group(
-                combine_skymaps.s(gracedb.download(preferred_skymap,
-                                                   graceid)) |
-                gracedb.upload.s(
-                    new_skymap, graceid, message, ['sky_loc', 'public']),
-
-                gracedb.upload.s('glg_healpix_all_bn_v00.fit',
-                                 external_trigger_id,
-                                 'Sky map from HEASARC.',
-                                 ['sky_loc', 'public']))
-            )
+    se_skymap_filename = get_preferred_skymap(se_id)
+    ext_skymap_filename = get_external_skymap_filename(ext_id)
+    new_skymap_filename = re.findall(r'(.*).fits.gz', se_skymap_filename)[0]
+
+    se_skymap = gracedb.download(se_skymap_filename, se_id)
+    ext_skymap = gracedb.download(ext_skymap_filename, ext_id)
+    message = 'Combined LVC-external sky map using {0}.'.format(
+        se_skymap_filename)
+    message_png = (
+        'Mollweide projection of <a href="/api/events/{graceid}/files/'
+        '{filename}">{filename}</a>').format(
+            graceid=se_id, filename=new_skymap_filename + '-ext.fits.gz')
+
+    (
+        combine_skymaps.si(se_skymap, ext_skymap)
+        |
+        group(
+            gracedb.upload.s(new_skymap_filename + '-ext.fits.gz', se_id,
+                             message, ['sky_loc', 'public']),
+            skymaps.plot_allsky.s()
+            |
+            gracedb.upload.s(new_skymap_filename + '-ext.png', se_id,
+                             message_png, ['sky_loc', 'public'])
+        )
+    ).delay()
 
 
 @app.task(autoretry_for=(ValueError,), retry_backoff=10,
@@ -57,8 +65,7 @@ def get_preferred_skymap(graceid):
     for message in reversed(gracedb_log):
         comment = message['comment']
         filename = message['filename']
-        if (filename.endswith('.fits.gz') or filename.endswith('.fits')) and \
-                'copied' in comment:
+        if filename.endswith('.fits.gz'):
             return filename
     raise ValueError('No skymap available for {0} yet.'.format(graceid))
 
@@ -172,6 +179,8 @@ def get_upload_external_skymap(graceid):
                 graceid,
                 'Sky map from HEASARC.',
                 ['sky_loc'])
+            |
+            gracedb.create_label.si('LUMIN_GO', graceid)
         ).delay()
 
     except ValueError:
@@ -300,4 +309,6 @@ def create_upload_external_skymap(event):
                          graceid,
                          message,
                          ['sky_loc'])
+        |
+        gracedb.create_label.si('LUMIN_GO', graceid)
     ).delay()
diff --git a/gwcelery/tasks/external_triggers.py b/gwcelery/tasks/external_triggers.py
index 13b5576b5..0b5f7e86d 100644
--- a/gwcelery/tasks/external_triggers.py
+++ b/gwcelery/tasks/external_triggers.py
@@ -12,6 +12,15 @@ from . import raven
 log = get_logger(__name__)
 
 
+REQUIRED_LABELS_BY_TASK = {
+    'compare': {'SKYMAP_READY', 'LUMIN_GO', 'EM_COINC'},
+    'combine': {'SKYMAP_READY', 'LUMIN_GO', 'RAVEN_ALERT'}
+}
+"""These labels should be present on an event to consider it to
+be ready for sky map comparison.
+"""
+
+
 @gcn.handler(gcn.NoticeType.SNEWS,
              queue='exttrig',
              shared=False)
@@ -145,15 +154,44 @@ def handle_grb_lvalert(alert):
     if alert['alert_type'] == 'new' and \
             alert['object'].get('group') == 'External':
         raven.coincidence_search(graceid, alert['object'], group='CBC')
-        raven.coincidence_search(graceid, alert['object'],
-                                 group='Burst')
-    elif 'S' in graceid:
+        raven.coincidence_search(graceid, alert['object'], group='Burst')
+    elif alert['alert_type'] == 'new' and 'S' in graceid:
         preferred_event_id = gracedb.get_superevent(graceid)['preferred_event']
         group = gracedb.get_event(preferred_event_id)['group']
-        if alert['alert_type'] == 'new':
-            raven.coincidence_search(graceid, alert['object'],
-                                     group=group,
-                                     pipelines=['Fermi', 'Swift'])
+        raven.coincidence_search(graceid, alert['object'],
+                                 group=group,
+                                 pipelines=['Fermi', 'Swift'])
+    elif alert['alert_type'] == 'label_added' and \
+            alert['object'].get('group') == 'External':
+        if _skymaps_are_ready(alert['object'], 'compare') and \
+                alert['data']['name'] in REQUIRED_LABELS_BY_TASK['compare']:
+            #  if both sky maps present and a coincidence, compare sky maps
+            se_id, ext_ids = _get_superevent_ext_ids(graceid, alert['object'],
+                                                     'compare')
+            superevent = gracedb.get_superevent(se_id)
+            preferred_event_id = superevent['preferred_event']
+            gw_group = gracedb.get_event(preferred_event_id)['group']
+            raven.raven_pipeline([alert['object']], se_id, superevent,
+                                 gw_group)
+        if _skymaps_are_ready(alert['object'], 'combine') and \
+                alert['data']['name'] in REQUIRED_LABELS_BY_TASK['combine']:
+            #  if both sky maps present and a raven alert, create combined
+            #  skymap
+            se_id, ext_id = _get_superevent_ext_ids(graceid, alert['object'],
+                                                    'combine')
+            external_skymaps.create_combined_skymap(se_id, ext_id)
+        elif 'EM_COINC' in alert['object']['labels']:
+            #  if not complete, check if GW sky map; apply label to external
+            #  event if GW sky map
+            se_labels = gracedb.get_labels(alert['object']['superevent'])
+            if 'SKYMAP_READY' in se_labels:
+                gracedb.create_label.si('SKYMAP_READY', graceid).delay()
+    elif alert['alert_type'] == 'label_added' and 'S' in graceid:
+        #  if sky map in superevent, apply label to all external events
+        #  at the time
+        if 'SKYMAP_READY' in alert['object']['labels']:
+            for ext_id in alert['object']['em_events']:
+                gracedb.create_label.si('SKYMAP_READY', ext_id).delay()
 
 
 @lvalert.handler('superevent',
@@ -189,3 +227,27 @@ def handle_snews_lvalert(alert):
         if alert['alert_type'] == 'new' and group == 'Burst':
             raven.coincidence_search(graceid, alert['object'],
                                      group=group, pipelines=['SNEWS'])
+
+
+def _skymaps_are_ready(event, task):
+    label_set = set(event['labels'])
+    required_labels = REQUIRED_LABELS_BY_TASK[task]
+    return required_labels.issubset(label_set)
+
+
+def _get_superevent_ext_ids(graceid, event, task):
+    if task == 'combine':
+        if 'S' in graceid:
+            se_id = event['superevent_id']
+            ext_id = event['em_type']
+        else:
+            se_id = event['superevent']
+            ext_id = event['graceid']
+    elif task == 'compare':
+        if 'S' in graceid:
+            se_id = event['superevent_id']
+            ext_id = event['em_events']
+        else:
+            se_id = event['superevent']
+            ext_id = [event['graceid']]
+    return se_id, ext_id
diff --git a/gwcelery/tasks/gracedb.py b/gwcelery/tasks/gracedb.py
index 453092372..17a297c38 100644
--- a/gwcelery/tasks/gracedb.py
+++ b/gwcelery/tasks/gracedb.py
@@ -329,6 +329,6 @@ def add_event_to_superevent(superevent_id, graceid):
         with client.addEventToSuperevent(superevent_id, graceid):
             pass  # Close without reading response; we only needed the status
     except rest.HTTPError as e:
-        error_msg = '"is already assigned to a Superevent"'
-        if not (e.status == 400 and e.message == error_msg):
+        error_msg = 'is already assigned to a Superevent'
+        if not (e.status == 400 and error_msg in e.message):
             raise
diff --git a/gwcelery/tasks/raven.py b/gwcelery/tasks/raven.py
index b666f94df..a61f78e8e 100644
--- a/gwcelery/tasks/raven.py
+++ b/gwcelery/tasks/raven.py
@@ -326,6 +326,8 @@ def trigger_raven_alert(coinc_far_json, superevent, gracedb_id,
             (
                 gracedb.create_label.si('RAVEN_ALERT', superevent_id)
                 |
+                gracedb.create_label.si('RAVEN_ALERT', ext_id)
+                |
                 gracedb.create_label.si('RAVEN_ALERT', preferred_gwevent_id)
             ).delay()
     if not pass_far_threshold:
diff --git a/gwcelery/tests/test_tasks_external_skymaps.py b/gwcelery/tests/test_tasks_external_skymaps.py
index a3d93a9d1..aca347b4d 100644
--- a/gwcelery/tests/test_tasks_external_skymaps.py
+++ b/gwcelery/tests/test_tasks_external_skymaps.py
@@ -60,14 +60,24 @@ def mock_get_file_contents(monkeypatch, toy_fits_filecontents):  # noqa: F811
         'astropy.utils.data.get_file_contents', get_file_contents)
 
 
-@patch('gwcelery.tasks.gracedb.get_superevent', mock_get_superevent)
-@patch('gwcelery.tasks.gracedb.get_event', mock_get_event)
-@patch('gwcelery.tasks.gracedb.get_log', mock_get_log)
-@patch('astropy.utils.data.get_file_contents', mock_get_file_contents)
-def test_create_combined_skymap():
+@patch('gwcelery.tasks.skymaps.plot_allsky.run')
+@patch('gwcelery.tasks.gracedb.upload.run')
+@patch('gwcelery.tasks.external_skymaps.combine_skymaps.run')
+@patch('gwcelery.tasks.gracedb.download')
+@patch('gwcelery.tasks.external_skymaps.get_external_skymap_filename',
+       return_value='fermi_skymap.fits.gz,0')
+@patch('gwcelery.tasks.external_skymaps.get_preferred_skymap',
+       return_value='bayestar.fits.gz,0')
+def test_create_combined_skymap(mock_get_preferred_skymap,
+                                mock_get_external_skymap_filename,
+                                mock_download,
+                                mock_combine_skymaps, mock_upload,
+                                mock_plot_allsky):
     """Test creating combined LVC and Fermi skymap"""
     # Run function under test
-    external_skymaps.create_combined_skymap('S12345')
+    external_skymaps.create_combined_skymap('S12345', 'E12345')
+    mock_combine_skymaps.assert_called_once()
+    mock_upload.assert_called()
 
 
 @patch('gwcelery.tasks.gracedb.get_log', mock_get_log)
diff --git a/gwcelery/tests/test_tasks_external_triggers.py b/gwcelery/tests/test_tasks_external_triggers.py
index a5e299139..4cf57ddb9 100644
--- a/gwcelery/tests/test_tasks_external_triggers.py
+++ b/gwcelery/tests/test_tasks_external_triggers.py
@@ -99,6 +99,86 @@ def test_handle_replace_grb_event(mock_get_event, mock_get_events,
     mock_replace_event.assert_called_once_with('E1', text)
 
 
+@patch('gwcelery.tasks.gracedb.create_label.run')
+@patch('gwcelery.tasks.gracedb.get_labels',
+       return_value=['SKYMAP_READY'])
+def test_handle_create_skymap_label_from_ext_event(mock_get_labels,
+                                                   mock_create_label):
+    alert = {"uid": "E1212",
+             "alert_type": "label_added",
+             "data": {"name": "EM_COINC"},
+             "object": {
+                 "group": "External",
+                 "labels": ["EM_COINC", "LUMIN_GO"],
+                 "superevent": "S1234"
+                       }
+             }
+    external_triggers.handle_grb_lvalert(alert)
+    mock_create_label.assert_called_once_with('SKYMAP_READY', 'E1212')
+
+
+@patch('gwcelery.tasks.gracedb.create_label.run')
+def test_handle_create_skymap_label_from_superevent(mock_create_label):
+    alert = {"uid": "S1234",
+             "alert_type": "label_added",
+             "data": {"name": "SKYMAP_READY"},
+             "object": {
+                 "group": "CBC",
+                 "labels": ["SKYMAP_READY"],
+                 "superevent_id": "S1234",
+                 "em_events": ['E1212']
+                       }
+             }
+    external_triggers.handle_grb_lvalert(alert)
+    mock_create_label.assert_called_once_with('SKYMAP_READY', 'E1212')
+
+
+@patch('gwcelery.tasks.raven.raven_pipeline')
+@patch('gwcelery.tasks.gracedb.get_superevent',
+       return_value={
+           'superevent_id': 'S1234',
+           'preferred_event': 'G1234'
+                    })
+@patch('gwcelery.tasks.gracedb.get_event',
+       return_value={
+           'graceid': 'G1234',
+           'group': 'CBC'
+                    })
+def test_handle_skymap_comparison(mock_get_event, mock_get_superevent,
+                                  mock_raven_pipeline):
+    alert = {"uid": "E1212",
+             "alert_type": "label_added",
+             "data": {"name": "SKYMAP_READY"},
+             "object": {
+                 "graceid": "E1212",
+                 "group": "External",
+                 "labels": ["EM_COINC", "LUMIN_GO", "SKYMAP_READY"],
+                 "superevent": "S1234"
+                       }
+             }
+    external_triggers.handle_grb_lvalert(alert)
+    mock_raven_pipeline.assert_called_once_with([alert['object']], 'S1234',
+                                                {'superevent_id': 'S1234',
+                                                 'preferred_event': 'G1234'},
+                                                'CBC')
+
+
+@patch('gwcelery.tasks.external_skymaps.create_combined_skymap')
+def test_handle_skymap_combine(mock_create_combined_skymap):
+    alert = {"uid": "E1212",
+             "alert_type": "label_added",
+             "data": {"name": "RAVEN_ALERT"},
+             "object": {
+                 "graceid": "E1212",
+                 "group": "External",
+                 "labels": ["EM_COINC", "LUMIN_GO", "SKYMAP_READY",
+                            "RAVEN_ALERT"],
+                 "superevent": "S1234"}
+             }
+    external_triggers.handle_grb_lvalert(alert)
+    mock_create_combined_skymap.assert_called_once_with('S1234', 'E1212')
+
+
 @patch('gwcelery.tasks.detchar.dqr_json', return_value='dqrjson')
 @patch('gwcelery.tasks.gracedb.upload.run')
 @patch('gwcelery.tasks.gracedb.get_event', return_value={
diff --git a/gwcelery/tests/test_tasks_raven.py b/gwcelery/tests/test_tasks_raven.py
index 81390cd42..0f4f9e23c 100644
--- a/gwcelery/tests/test_tasks_raven.py
+++ b/gwcelery/tests/test_tasks_raven.py
@@ -164,7 +164,9 @@ def test_raven_pipeline(mock_create_label,
     threshold, when a coincidence is found but does pass threshold, and when
     multiple events are found.
     """
-    alert_object = {'preferred_event': 'G1', 'pipeline': 'GRB'}
+    alert_object = {'preferred_event': 'G1', 'pipeline': 'GRB', 'labels': []}
+    for result in raven_search_results:
+        result['labels'] = []
     if 'E' in graceid:
         alert_object['group'] = 'External'
     raven.raven_pipeline(raven_search_results, graceid, alert_object, group)
@@ -309,6 +311,7 @@ def test_trigger_raven_alert(mock_create_label, mock_update_superevent,
 
     if expected_result:
         label_calls = [call('RAVEN_ALERT', superevent_id),
+                       call('RAVEN_ALERT', ext_id),
                        call('RAVEN_ALERT', preferred_id)]
         if ext_event['pipeline'] == 'SNEWS':
             coinc_far = None
-- 
GitLab