diff --git a/gwcelery/tasks/avro.py b/gwcelery/tasks/avro.py index c29a27878ba6e1ff3411d63d580fd1f35278ebcd..b290ba28b99acd7460214da098d283c8c7cec85b 100644 --- a/gwcelery/tasks/avro.py +++ b/gwcelery/tasks/avro.py @@ -1,4 +1,5 @@ from ..import app +from ..import kafka from . import gracedb import fastavro @@ -11,11 +12,11 @@ from requests.exceptions import HTTPError parsed_schema = fastavro.schema.load_schema('~/userguide/_static/igwn.alerts.v1_0.Alert.avsc') @gracedb.task(shared=False) -def _attempt_skymap_download(superevent_id) +def _create_alert_payload(superevent_id) try: skymap = io.BytesIO() - with open(gracedb.download.s('bayestar.multiorder.fits',alert_dict['superevent_id']), 'rb') as f: - skymap.write(f.read()) + with open(gracedb.download.s('bayestar.multiorder.fits', superevent_id), 'rb') as payload: + skymap.write(payload.read()) return skymap.getvalue() # FIXME WHat is the correct thing to do here @@ -29,7 +30,7 @@ def create_alert_dict(message): alert_dict = {} with open(message, 'rb') as f: - voevent = voeventparse(f) + voevent = voeventparse.load(f) params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')} alert_dict['author'] = voevent.Who.Author.contactName @@ -37,9 +38,9 @@ def create_alert_dict(message): # FIXME Should the time_created field be the same as the voevent? alert_dict['time_created'] = voevent.Who.Date alert_dict['superevent_id'] = params['GraceID'] - alert_dict['is_public'] = True if params['OpenAlert'] else False + alert_dict['is_public'] = bool(params['OpenAlert']) # FIXME Do we need to check for software injections? - alert_dict['is_injection'] = True if params['HardwareInj'] else False + alert_dict['is_injection'] = bool(params['HardwareInj']) alert_dict['urls'] = {'gracedb': params['EventPage']} # FIXME Track this ourselves when we move the voevent generation out of gracedb alert_dict['event_revision'] = params['Pkt_Ser_Num'] @@ -63,7 +64,7 @@ def create_alert_dict(message): 'HasNS': params['HasNS'], 'HasRemnant': params['HasRemnant'] } - 'skymap': _attempt_skymap_download(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None + 'skymap': _create_alert_payload(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None } @@ -74,10 +75,12 @@ def create_alert_dict(message): 'observatory': params['External_Observatory'], 'search': params['External_Search'], 'time_difference': params['Time_Difference'], - 'time_coincidence_far': params['Time_Coincidence_FAR'] if 'Time_Coincidence_FAR' in params else None, - 'time_sky_position_coincidence_far': params['Time_Sky_Position_Coincidence_FAR'] if 'Time_Sky_Position_Coincidence_FAR' in params else None + 'time_coincidence_far': params.get(['Time_Coincidence_FAR']), + 'time_sky_position_coincidence_far': params.get(['Time_Sky_Position_Coincidence_FAR']) } except KeyError: + # FIXME Currently this logic will cause the task to silently ignore + # external coincs that are missing keys but otherwise are valid alert_dict['external_coinc'] = None return alert_dict @@ -91,7 +94,7 @@ def send(self, message): alert_dict = create_alert_dict(message) - message = io.BytesIO() - fastavro.writer(message, parsed_schema, alert_dict) + stream = io.BytesIO() + fastavro.writer(stream, parsed_schema, alert_dict) # TODO Actually send to a kafka broker