Skip to content
Snippets Groups Projects
Commit 627424b8 authored by Cody Messick's avatar Cody Messick
Browse files

Commit avro task, add placeholder function call to orchestrator

parent da45c147
No related branches found
No related tags found
No related merge requests found
from ..import app
from . import gracedb
import fastavro
import io
import voeventparse
from requests.exceptions import HTTPError
from kombu import Connection, Exchange, Producer
# FIXME Figure out where this will live
parsed_schema = fastavro.schema.load_schema('/home/cmessick/userguide/_static/igwn.alerts.v1_0.Alert.avsc')
@gracedb.task(shared=False)
def _create_alert_payload(superevent_id):
try:
skymap = io.BytesIO()
with open(gracedb.download.s('bayestar.multiorder.fits', superevent_id), 'rb') as payload:
skymap.write(payload.read())
return skymap.getvalue()
# FIXME WHat is the correct thing to do here
except Exception as e:
return None
def create_alert_dict(message):
# FIXME Update to extract information from igwn alert instead of voevent
alert_dict = {}
with open(message, 'rb') as f:
voevent = voeventparse.load(f)
params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')}
alert_dict['author'] = voevent.Who.Author.contactName
alert_dict['alert_type'] = params['AlertType'].upper()
# FIXME Should the time_created field be the same as the voevent?
alert_dict['time_created'] = voevent.Who.Date
alert_dict['superevent_id'] = params['GraceID']
alert_dict['is_public'] = bool(params['OpenAlert'])
# FIXME Do we need to check for software injections?
alert_dict['is_injection'] = bool(params['HardwareInj'])
alert_dict['urls'] = {'gracedb': params['EventPage']}
# FIXME Track this ourselves when we move the voevent generation out of gracedb
alert_dict['event_revision'] = params['Pkt_Ser_Num']
if alert_dict['alert_type'] == 'RETRACTION':
alert_dict['event'] = None
else:
alert_dict['event'] = {
'time': voevent.find('.//TimeInstant').getchildren()[0],
'far': params['FAR'],
'instruments': sorted(params['Instruments'].split(',')),
'group': params['Group'],
'pipeline': params['Pipeline'],
'search': params['Search'],
'classification': {
'BNS': params['BNS'],
'NSBH': params['NSBH'],
'BBH': params['BBH']
} if params['Group'] == 'CBC' else {},
'properties': {
'HasNS': params['HasNS'],
'HasRemnant': params['HasRemnant']
},
'skymap': _create_alert_payload(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None
}
try:
alert_dict['external_coinc'] = {
'gcn_notice_id': params['External_GCN_Notice_Id'],
'ivorn': params['External_Ivorn'],
'observatory': params['External_Observatory'],
'search': params['External_Search'],
'time_difference': params['Time_Difference'],
'time_coincidence_far': params.get(['Time_Coincidence_FAR']),
'time_sky_position_coincidence_far': params.get(['Time_Sky_Position_Coincidence_FAR'])
}
except KeyError:
# FIXME Currently this logic will cause the task to silently ignore
# external coincs that are missing keys but otherwise are valid
alert_dict['external_coinc'] = None
return alert_dict
@app.task(bind=True, default_retry_delay=20.0,
ignore_result=True, retry_backoff=True,
retry_kwargs=dict(max_retries=10), shared=False)
def send(self, message):
"""Send an avro alert to a kafka broker
"""
# Create dictionary following avro schema
alert_dict = create_alert_dict(message)
# Write avro packet to memory
avro_stream = io.BytesIO()
fastavro.writer(avro_stream, parsed_schema, alert_dict)
self.conf['hop_stream'].write({'message': avro_stream.getvalue()})
avro_stream.close()
......@@ -10,6 +10,7 @@ import re
from celery import group
from ..import app
from . import avro
from . import bayestar
from . import circulars
from .core import identity, get_first
......@@ -753,12 +754,16 @@ def preliminary_initial_update_alert(filenames, superevent_id, alert_type,
group(
gracedb.download.s(superevent_id)
|
gcn.send.s()
|
(
gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id)
if alert_type in {'earlywarning', 'preliminary'}
else identity.si()
group(
gcn.send.s()
|
(
gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id)
if alert_type in {'earlywarning', 'preliminary'}
else identity.si()
),
# FIXME Create label like GCN_PRELIM_SENT on gracedb that can be set here
avro.send.s()
),
circular_canvas,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment