diff --git a/gwcelery/tasks/avro.py b/gwcelery/tasks/avro.py new file mode 100644 index 0000000000000000000000000000000000000000..745db6765cfcf0c3c8ec165e900346d8b89abc13 --- /dev/null +++ b/gwcelery/tasks/avro.py @@ -0,0 +1,105 @@ +from ..import app +from . import gracedb + +import fastavro +import io +import voeventparse + +from requests.exceptions import HTTPError + +from kombu import Connection, Exchange, Producer + +# FIXME Figure out where this will live +parsed_schema = fastavro.schema.load_schema('/home/cmessick/userguide/_static/igwn.alerts.v1_0.Alert.avsc') + +@gracedb.task(shared=False) +def _create_alert_payload(superevent_id): + try: + skymap = io.BytesIO() + with open(gracedb.download.s('bayestar.multiorder.fits', superevent_id), 'rb') as payload: + skymap.write(payload.read()) + return skymap.getvalue() + + # FIXME WHat is the correct thing to do here + except Exception as e: + return None + + +def create_alert_dict(message): + # FIXME Update to extract information from igwn alert instead of voevent + + alert_dict = {} + + with open(message, 'rb') as f: + voevent = voeventparse.load(f) + + params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')} + alert_dict['author'] = voevent.Who.Author.contactName + alert_dict['alert_type'] = params['AlertType'].upper() + # FIXME Should the time_created field be the same as the voevent? + alert_dict['time_created'] = voevent.Who.Date + alert_dict['superevent_id'] = params['GraceID'] + alert_dict['is_public'] = bool(params['OpenAlert']) + # FIXME Do we need to check for software injections? + alert_dict['is_injection'] = bool(params['HardwareInj']) + alert_dict['urls'] = {'gracedb': params['EventPage']} + # FIXME Track this ourselves when we move the voevent generation out of gracedb + alert_dict['event_revision'] = params['Pkt_Ser_Num'] + + if alert_dict['alert_type'] == 'RETRACTION': + alert_dict['event'] = None + else: + alert_dict['event'] = { + 'time': voevent.find('.//TimeInstant').getchildren()[0], + 'far': params['FAR'], + 'instruments': sorted(params['Instruments'].split(',')), + 'group': params['Group'], + 'pipeline': params['Pipeline'], + 'search': params['Search'], + 'classification': { + 'BNS': params['BNS'], + 'NSBH': params['NSBH'], + 'BBH': params['BBH'] + } if params['Group'] == 'CBC' else {}, + 'properties': { + 'HasNS': params['HasNS'], + 'HasRemnant': params['HasRemnant'] + }, + 'skymap': _create_alert_payload(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None + } + + + try: + alert_dict['external_coinc'] = { + 'gcn_notice_id': params['External_GCN_Notice_Id'], + 'ivorn': params['External_Ivorn'], + 'observatory': params['External_Observatory'], + 'search': params['External_Search'], + 'time_difference': params['Time_Difference'], + 'time_coincidence_far': params.get(['Time_Coincidence_FAR']), + 'time_sky_position_coincidence_far': params.get(['Time_Sky_Position_Coincidence_FAR']) + } + except KeyError: + # FIXME Currently this logic will cause the task to silently ignore + # external coincs that are missing keys but otherwise are valid + alert_dict['external_coinc'] = None + + return alert_dict + +@app.task(bind=True, default_retry_delay=20.0, + ignore_result=True, retry_backoff=True, + retry_kwargs=dict(max_retries=10), shared=False) +def send(self, message): + """Send an avro alert to a kafka broker + """ + + # Create dictionary following avro schema + alert_dict = create_alert_dict(message) + + # Write avro packet to memory + avro_stream = io.BytesIO() + fastavro.writer(avro_stream, parsed_schema, alert_dict) + + self.conf['hop_stream'].write({'message': avro_stream.getvalue()}) + + avro_stream.close() diff --git a/gwcelery/tasks/orchestrator.py b/gwcelery/tasks/orchestrator.py index 94b4e25f8d2da5124cb3ab2785499fd57f431d92..f749c83536bde6dc8144751daafa742a1c912d45 100644 --- a/gwcelery/tasks/orchestrator.py +++ b/gwcelery/tasks/orchestrator.py @@ -10,6 +10,7 @@ import re from celery import group from ..import app +from . import avro from . import bayestar from . import circulars from .core import identity, get_first @@ -753,12 +754,16 @@ def preliminary_initial_update_alert(filenames, superevent_id, alert_type, group( gracedb.download.s(superevent_id) | - gcn.send.s() - | - ( - gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id) - if alert_type in {'earlywarning', 'preliminary'} - else identity.si() + group( + gcn.send.s() + | + ( + gracedb.create_label.si('GCN_PRELIM_SENT', superevent_id) + if alert_type in {'earlywarning', 'preliminary'} + else identity.si() + ), + # FIXME Create label like GCN_PRELIM_SENT on gracedb that can be set here + avro.send.s() ), circular_canvas,