diff --git a/gwcelery/tasks/avro.py b/gwcelery/tasks/avro.py index 745db6765cfcf0c3c8ec165e900346d8b89abc13..d068bdf9c2bdc84d7061595c211b239bac2b8a71 100644 --- a/gwcelery/tasks/avro.py +++ b/gwcelery/tasks/avro.py @@ -5,24 +5,15 @@ import fastavro import io import voeventparse -from requests.exceptions import HTTPError - -from kombu import Connection, Exchange, Producer - # FIXME Figure out where this will live -parsed_schema = fastavro.schema.load_schema('/home/cmessick/userguide/_static/igwn.alerts.v1_0.Alert.avsc') +parsed_schema = fastavro.schema.load_schema('/home/emfollow-test/.avro_avsc_files_test_deleteme/igwn.alerts.v1_0.Alert.avsc') @gracedb.task(shared=False) def _create_alert_payload(superevent_id): - try: - skymap = io.BytesIO() - with open(gracedb.download.s('bayestar.multiorder.fits', superevent_id), 'rb') as payload: - skymap.write(payload.read()) - return skymap.getvalue() - - # FIXME WHat is the correct thing to do here - except Exception as e: - return None + skymap_bytes = gracedb.download.s('bayestar.multiorder.fits', superevent_id)() + # FIXME Add check to ensure skymap downloaded correctly + return skymap_bytes + def create_alert_dict(message): @@ -30,42 +21,50 @@ def create_alert_dict(message): alert_dict = {} - with open(message, 'rb') as f: - voevent = voeventparse.load(f) + voevent = voeventparse.voevent.loads(message) params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')} - alert_dict['author'] = voevent.Who.Author.contactName + # the voevent object returns lxml.objectify.StringElement objects for + # fields that contain strings, need to convert this to a string for the + # avro writer + alert_dict['author'] = str(voevent.Who.Author.contactName) alert_dict['alert_type'] = params['AlertType'].upper() # FIXME Should the time_created field be the same as the voevent? - alert_dict['time_created'] = voevent.Who.Date + alert_dict['time_created'] = str(voevent.Who.Date) alert_dict['superevent_id'] = params['GraceID'] alert_dict['is_public'] = bool(params['OpenAlert']) # FIXME Do we need to check for software injections? alert_dict['is_injection'] = bool(params['HardwareInj']) alert_dict['urls'] = {'gracedb': params['EventPage']} # FIXME Track this ourselves when we move the voevent generation out of gracedb - alert_dict['event_revision'] = params['Pkt_Ser_Num'] + alert_dict['event_revision'] = int(params['Pkt_Ser_Num']) if alert_dict['alert_type'] == 'RETRACTION': alert_dict['event'] = None else: + # FIXME Is there a better way to check for a skymap without querying + # gracedb? + if 'skymap_fits' in params: + skymap = _create_alert_payload(alert_dict['superevent_id']) + else: + skymap = None alert_dict['event'] = { - 'time': voevent.find('.//TimeInstant').getchildren()[0], - 'far': params['FAR'], + 'time': str(voevent.find('.//TimeInstant').getchildren()[0]), + 'far': float(params['FAR']), 'instruments': sorted(params['Instruments'].split(',')), 'group': params['Group'], 'pipeline': params['Pipeline'], 'search': params['Search'], 'classification': { - 'BNS': params['BNS'], - 'NSBH': params['NSBH'], - 'BBH': params['BBH'] + 'BNS': float(params['BNS']), + 'NSBH': float(params['NSBH']), + 'BBH': float(params['BBH']) } if params['Group'] == 'CBC' else {}, 'properties': { - 'HasNS': params['HasNS'], - 'HasRemnant': params['HasRemnant'] + 'HasNS': float(params['HasNS']), + 'HasRemnant': float(params['HasRemnant']) }, - 'skymap': _create_alert_payload(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None + 'skymap': skymap } @@ -87,7 +86,7 @@ def create_alert_dict(message): return alert_dict @app.task(bind=True, default_retry_delay=20.0, - ignore_result=True, retry_backoff=True, + ignore_result=True, queue='kafka', retry_backoff=True, retry_kwargs=dict(max_retries=10), shared=False) def send(self, message): """Send an avro alert to a kafka broker @@ -98,8 +97,8 @@ def send(self, message): # Write avro packet to memory avro_stream = io.BytesIO() - fastavro.writer(avro_stream, parsed_schema, alert_dict) + fastavro.writer(avro_stream, parsed_schema, [alert_dict]) - self.conf['hop_stream'].write({'message': avro_stream.getvalue()}) + self.app.conf['hop_stream'].write(avro_stream.getvalue()) avro_stream.close() diff --git a/gwcelery/tasks/orchestrator.py b/gwcelery/tasks/orchestrator.py index f749c83536bde6dc8144751daafa742a1c912d45..36ce321be11da7fc3360fa6ad6c172f7b874ea3e 100644 --- a/gwcelery/tasks/orchestrator.py +++ b/gwcelery/tasks/orchestrator.py @@ -845,7 +845,10 @@ def retraction_alert(superevent_id): group( gracedb.download.s(superevent_id) | - gcn.send.s(), + group( + gcn.send.s(), + avro.send.s() + ), circulars.create_retraction_circular.si(superevent_id) |