Skip to content
Snippets Groups Projects

Reenable combined sky maps now using multi-ordering

Compare and
8 files
+ 279
107
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 28
9
@@ -6,6 +6,7 @@ from celery.utils.log import get_logger
from hop.models import AvroBlob
from ..import app
from .core import identity
from . import gracedb
log = get_logger(__name__)
@@ -101,7 +102,7 @@ def _upload_notice(self, payload, brokerhost, superevent_id):
@app.task(bind=True, queue='kafka', shared=False)
def _send(self, alert_dict, skymap, brokerhost):
def _send(self, alert_dict, skymap, combined_skymap, brokerhost):
"""Write the alert to the Kafka topic"""
# Copy the alert dictionary so we dont modify the original
payload_dict = alert_dict.copy()
@@ -116,6 +117,10 @@ def _send(self, alert_dict, skymap, brokerhost):
encoder = config['skymap_encoder']
payload_dict['event']['skymap'] = encoder(skymap)
if combined_skymap:
payload_dict['external_coinc']['combined_skymap_filename'] = \
encoder(combined_skymap)
# Write to kafka topic
serialization_model = \
self.app.conf['kafka_streams'][brokerhost].serialization_model
@@ -155,7 +160,7 @@ def send(self, skymap_and_classification, superevent, alert_type,
"""
if skymap_and_classification is not None:
skymap, *classification = skymap_and_classification
skymap, combined_skymap, *classification = skymap_and_classification
else:
skymap = None
classification = None
@@ -173,7 +178,7 @@ def send(self, skymap_and_classification, superevent, alert_type,
|
group(
(
_send.s(skymap, brokerhost)
_send.s(skymap, combined_skymap, brokerhost)
|
_upload_notice.s(brokerhost, superevent['superevent_id'])
) for brokerhost in self.app.conf['kafka_streams'].keys()
@@ -194,13 +199,15 @@ def send(self, skymap_and_classification, superevent, alert_type,
@app.task(shared=False)
def _create_skymap_classification_tuple(skymap, classification):
return (skymap, *classification)
def _create_skymap_classification_tuple(skymap, combined_skymap,
classification):
return (skymap, combined_skymap, *classification)
@app.task(shared=False, ignore_result=True)
def download_skymap_and_send_alert(classification, superevent, alert_type,
skymap_filename=None, raven_coinc=False):
skymap_filename=None, raven_coinc=False,
combined_skymap_filename=None):
"""Wrapper for send function when caller has not already downloaded the
skymap.
@@ -224,11 +231,23 @@ def download_skymap_and_send_alert(classification, superevent, alert_type,
"""
if combined_skymap_filename:
download_combined_canvas = (
gracedb.download.si(
combined_skymap_filename,
superevent['em_type']
)
)
else:
download_combined_canvas = identity.si()
if skymap_filename is not None and alert_type != 'retraction':
canvas = (
gracedb.download.si(
skymap_filename,
superevent['superevent_id']
group(
gracedb.download.si(
skymap_filename,
superevent['superevent_id']),
download_combined_canvas
)
|
_create_skymap_classification_tuple.s(classification)
Loading