diff --git a/gwcelery/tasks/avro_alerts.py b/gwcelery/tasks/avro_alerts.py index 60a2a9f1e7a86297c0890d97023b49b62eb9f141..c29a27878ba6e1ff3411d63d580fd1f35278ebcd 100644 --- a/gwcelery/tasks/avro_alerts.py +++ b/gwcelery/tasks/avro_alerts.py @@ -12,86 +12,86 @@ parsed_schema = fastavro.schema.load_schema('~/userguide/_static/igwn.alerts.v1_ @gracedb.task(shared=False) def _attempt_skymap_download(superevent_id) - try: - skymap = io.BytesIO() - with open(gracedb.download.s('bayestar.multiorder.fits',alert_dict['superevent_id']), 'rb') as f: - skymap.write(f.read()) - return skymap.getvalue() - - # FIXME WHat is the correct thing to do here - except Exception as e: - return None + try: + skymap = io.BytesIO() + with open(gracedb.download.s('bayestar.multiorder.fits',alert_dict['superevent_id']), 'rb') as f: + skymap.write(f.read()) + return skymap.getvalue() + + # FIXME WHat is the correct thing to do here + except Exception as e: + return None def create_alert_dict(message): - # FIXME Update to extract information from igwn alert instead of voevent - - alert_dict = {} - - with open(message, 'rb') as f: - voevent = voeventparse(f) - - params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')} - alert_dict['author'] = voevent.Who.Author.contactName - alert_dict['alert_type'] = params['AlertType'].upper() - # FIXME Should the time_created field be the same as the voevent? - alert_dict['time_created'] = voevent.Who.Date - alert_dict['superevent_id'] = params['GraceID'] - alert_dict['is_public'] = True if params['OpenAlert'] else False - # FIXME Do we need to check for software injections? - alert_dict['is_injection'] = True if params['HardwareInj'] else False - alert_dict['urls'] = {'gracedb': params['EventPage']} - # FIXME Track this ourselves when we move the voevent generation out of gracedb - alert_dict['event_revision'] = params['Pkt_Ser_Num'] - - if alert_dict['alert_type'] == 'RETRACTION': - alert_dict['event'] = None - else: - alert_dict['event'] = { - 'time': voevent.find('.//TimeInstant').getchildren()[0], - 'far': params['FAR'], - 'instruments': sorted(params['Instruments'].split(',')), - 'group': params['Group'], - 'pipeline': params['Pipeline'], - 'search': params['Search'], - 'classification': { - 'BNS': params['BNS'], - 'NSBH': params['NSBH'], - 'BBH': params['BBH'] - } if params['Group'] == 'CBC' else {}, - 'properties': { - 'HasNS': params['HasNS'], - 'HasRemnant': params['HasRemnant'] - } - 'skymap': _attempt_skymap_download(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None - } - - - try: - alert_dict['external_coinc'] = { - 'gcn_notice_id': params['External_GCN_Notice_Id'], - 'ivorn': params['External_Ivorn'], - 'observatory': params['External_Observatory'], - 'search': params['External_Search'], - 'time_difference': params['Time_Difference'], - 'time_coincidence_far': params['Time_Coincidence_FAR'] if 'Time_Coincidence_FAR' in params else None, - 'time_sky_position_coincidence_far': params['Time_Sky_Position_Coincidence_FAR'] if 'Time_Sky_Position_Coincidence_FAR' in params else None - } - except KeyError: - alert_dict['external_coinc'] = None - - return alert_dict + # FIXME Update to extract information from igwn alert instead of voevent + + alert_dict = {} + + with open(message, 'rb') as f: + voevent = voeventparse(f) + + params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')} + alert_dict['author'] = voevent.Who.Author.contactName + alert_dict['alert_type'] = params['AlertType'].upper() + # FIXME Should the time_created field be the same as the voevent? + alert_dict['time_created'] = voevent.Who.Date + alert_dict['superevent_id'] = params['GraceID'] + alert_dict['is_public'] = True if params['OpenAlert'] else False + # FIXME Do we need to check for software injections? + alert_dict['is_injection'] = True if params['HardwareInj'] else False + alert_dict['urls'] = {'gracedb': params['EventPage']} + # FIXME Track this ourselves when we move the voevent generation out of gracedb + alert_dict['event_revision'] = params['Pkt_Ser_Num'] + + if alert_dict['alert_type'] == 'RETRACTION': + alert_dict['event'] = None + else: + alert_dict['event'] = { + 'time': voevent.find('.//TimeInstant').getchildren()[0], + 'far': params['FAR'], + 'instruments': sorted(params['Instruments'].split(',')), + 'group': params['Group'], + 'pipeline': params['Pipeline'], + 'search': params['Search'], + 'classification': { + 'BNS': params['BNS'], + 'NSBH': params['NSBH'], + 'BBH': params['BBH'] + } if params['Group'] == 'CBC' else {}, + 'properties': { + 'HasNS': params['HasNS'], + 'HasRemnant': params['HasRemnant'] + } + 'skymap': _attempt_skymap_download(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None + } + + + try: + alert_dict['external_coinc'] = { + 'gcn_notice_id': params['External_GCN_Notice_Id'], + 'ivorn': params['External_Ivorn'], + 'observatory': params['External_Observatory'], + 'search': params['External_Search'], + 'time_difference': params['Time_Difference'], + 'time_coincidence_far': params['Time_Coincidence_FAR'] if 'Time_Coincidence_FAR' in params else None, + 'time_sky_position_coincidence_far': params['Time_Sky_Position_Coincidence_FAR'] if 'Time_Sky_Position_Coincidence_FAR' in params else None + } + except KeyError: + alert_dict['external_coinc'] = None + + return alert_dict @app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0, ignore_result=True, queue='voevent', retry_backoff=True, retry_kwargs=dict(max_retries=10), shared=False) def send(self, message): - """Send an avro alert to a kafka broker - """ + """Send an avro alert to a kafka broker + """ - alert_dict = create_alert_dict(message) + alert_dict = create_alert_dict(message) - message = io.BytesIO() - fastavro.writer(message, parsed_schema, alert_dict) + message = io.BytesIO() + fastavro.writer(message, parsed_schema, alert_dict) - # TODO Actually send to a kafka broker + # TODO Actually send to a kafka broker