Skip to content
Snippets Groups Projects

Upload kafka alert notice after sending

Merged Cody Messick requested to merge cody.messick/gwcelery:upload_alert_dict into main
Files
2
+ 36
5
from base64 import b64encode
import json
from astropy import time
from celery import group
from celery.utils.log import get_logger
from hop.models import AvroBlob
from ..import app
from .core import identity, get_last
from . import gracedb
log = get_logger(__name__)
@@ -80,6 +83,22 @@ def _add_external_coinc_to_alert(alert_dict, superevent):
return alert_dict
@gracedb.task(shared=False, ignore_result=True)
def upload_alert_dict(alert_dict, skymap):
if skymap is not None:
alert_dict['event']['skymap'] = b64encode(skymap).decode('utf-8')
alert_dict_json = json.dumps(alert_dict)
filename = 'notice-dict-{}-{}.json'.format(
alert_dict['superevent_id'],
alert_dict['alert_type'].lower()
)
message = 'Dictionary used to generate Kafka alert notices (without ' + \
'skymap)'
gracedb.upload(alert_dict_json, filename, alert_dict['superevent_id'],
message)
@app.task(bind=True, ignore_result=True, queue='kafka', shared=False)
def _send(self, alert_dict, skymap, brokerhost):
"""Write the alert to the Kafka topic"""
@@ -140,14 +159,26 @@ def send(self, skymap_and_classification, superevent, alert_type,
_add_external_coinc_to_alert.s(alert_dict, superevent)
|
group(
_send.s(skymap, brokerhost) for brokerhost in
self.app.conf['kafka_streams'].keys()
*(
_send.s(skymap, brokerhost) for brokerhost in
self.app.conf['kafka_streams'].keys()
),
identity.s()
)
|
get_last.s()
|
upload_alert_dict.s(skymap)
)
else:
canvas = group(
_send.s(alert_dict, skymap, brokerhost) for brokerhost in
self.app.conf['kafka_streams'].keys()
canvas = (
group(
_send.s(alert_dict, skymap, brokerhost) for brokerhost in
self.app.conf['kafka_streams'].keys()
)
|
upload_alert_dict.si(alert_dict, skymap)
)
canvas.apply_async()
Loading