Skip to content
Snippets Groups Projects

Reenable combined sky maps now using multi-ordering

Compare and
10 files
+ 493
173
Compare changes
  • Side-by-side
  • Inline
Files
10
+ 39
16
@@ -11,8 +11,7 @@ from . import gracedb
log = get_logger(__name__)
def _create_base_alert_dict(classification, superevent, alert_type,
raven_coinc=False):
def _create_base_alert_dict(classification, superevent, alert_type):
'''Create the base of the alert dictionary, with all contents except the
skymap and the external coinc information.'''
# NOTE Everything that comes through this code path will be marked as
@@ -59,9 +58,14 @@ def _create_base_alert_dict(classification, superevent, alert_type,
@gracedb.task(shared=False)
def _add_external_coinc_to_alert(alert_dict, superevent):
def _add_external_coinc_to_alert(alert_dict, superevent,
combined_skymap_filename):
external_event = gracedb.get_event(superevent['em_type'])
if combined_skymap_filename:
combined_skymap = gracedb.download(combined_skymap_filename,
superevent['em_type'])
else:
combined_skymap = None
alert_dict['external_coinc'] = {
'gcn_notice_id':
external_event['extra_attributes']['GRB']['trigger_id'],
@@ -74,7 +78,7 @@ def _add_external_coinc_to_alert(alert_dict, superevent):
'time_sky_position_coincidence_far': superevent['space_coinc_far']
}
return alert_dict
return alert_dict, combined_skymap
@app.task(bind=True, shared=False, queue='kafka', ignore_result=True)
@@ -101,7 +105,7 @@ def _upload_notice(self, payload, brokerhost, superevent_id):
@app.task(bind=True, queue='kafka', shared=False)
def _send(self, alert_dict, skymap, brokerhost):
def _send(self, alert_dict, skymap, brokerhost, combined_skymap=None):
"""Write the alert to the Kafka topic"""
# Copy the alert dictionary so we dont modify the original
payload_dict = alert_dict.copy()
@@ -116,6 +120,10 @@ def _send(self, alert_dict, skymap, brokerhost):
encoder = config['skymap_encoder']
payload_dict['event']['skymap'] = encoder(skymap)
if combined_skymap:
payload_dict['external_coinc']['combined_skymap'] = \
encoder(combined_skymap)
# Write to kafka topic
serialization_model = \
self.app.conf['kafka_streams'][brokerhost].serialization_model
@@ -129,9 +137,16 @@ def _send(self, alert_dict, skymap, brokerhost):
return payload
@app.task(bind=True, queue='kafka', shared=False)
def _send_with_combined(self, alert_dict_combined_skymap, skymap, brokerhost):
alert_dict, combined_skymap = alert_dict_combined_skymap
return _send(alert_dict, skymap, brokerhost,
combined_skymap=combined_skymap)
@app.task(bind=True, ignore_result=True, queue='kafka', shared=False)
def send(self, skymap_and_classification, superevent, alert_type,
raven_coinc=False):
raven_coinc=False, combined_skymap_filename=None):
"""Send an public alert to all currently connected kafka brokers.
Parameters
@@ -151,7 +166,8 @@ def send(self, skymap_and_classification, superevent, alert_type,
The alert type.
raven_coinc: bool
Is there a coincident external event processed by RAVEN?
combined_skymap_filename : str
Combined skymap filename. Default None.
"""
if skymap_and_classification is not None:
@@ -163,17 +179,20 @@ def send(self, skymap_and_classification, superevent, alert_type,
alert_dict = _create_base_alert_dict(
classification,
superevent,
alert_type,
raven_coinc=raven_coinc
alert_type
)
if raven_coinc and alert_type != 'retraction':
canvas = (
_add_external_coinc_to_alert.s(alert_dict, superevent)
_add_external_coinc_to_alert.si(
alert_dict,
superevent,
combined_skymap_filename
)
|
group(
(
_send.s(skymap, brokerhost)
_send_with_combined.s(skymap, brokerhost)
|
_upload_notice.s(brokerhost, superevent['superevent_id'])
) for brokerhost in self.app.conf['kafka_streams'].keys()
@@ -200,7 +219,8 @@ def _create_skymap_classification_tuple(skymap, classification):
@app.task(shared=False, ignore_result=True)
def download_skymap_and_send_alert(classification, superevent, alert_type,
skymap_filename=None, raven_coinc=False):
skymap_filename=None, raven_coinc=False,
combined_skymap_filename=None):
"""Wrapper for send function when caller has not already downloaded the
skymap.
@@ -221,7 +241,8 @@ def download_skymap_and_send_alert(classification, superevent, alert_type,
The skymap filename.
raven_coinc: bool
Is there a coincident external event processed by RAVEN?
combined_skymap_filename : str
The combined skymap filename. Default None
"""
if skymap_filename is not None and alert_type != 'retraction':
@@ -233,14 +254,16 @@ def download_skymap_and_send_alert(classification, superevent, alert_type,
|
_create_skymap_classification_tuple.s(classification)
|
send.s(superevent, alert_type, raven_coinc=raven_coinc)
send.s(superevent, alert_type, raven_coinc=raven_coinc,
combined_skymap_filename=combined_skymap_filename)
)
else:
canvas = send.s(
(None, classification),
superevent,
alert_type,
raven_coinc=raven_coinc
raven_coinc=raven_coinc,
combined_skymap_filename=combined_skymap_filename
)
canvas.apply_async()
Loading