Skip to content
Snippets Groups Projects
Commit 03f0702a authored by Cody Messick's avatar Cody Messick
Browse files

avro_alerts.py: Replace tabs with 4 spaces

parent 5253231e
No related branches found
No related tags found
No related merge requests found
......@@ -12,86 +12,86 @@ parsed_schema = fastavro.schema.load_schema('~/userguide/_static/igwn.alerts.v1_
@gracedb.task(shared=False)
def _attempt_skymap_download(superevent_id)
try:
skymap = io.BytesIO()
with open(gracedb.download.s('bayestar.multiorder.fits',alert_dict['superevent_id']), 'rb') as f:
skymap.write(f.read())
return skymap.getvalue()
# FIXME WHat is the correct thing to do here
except Exception as e:
return None
try:
skymap = io.BytesIO()
with open(gracedb.download.s('bayestar.multiorder.fits',alert_dict['superevent_id']), 'rb') as f:
skymap.write(f.read())
return skymap.getvalue()
# FIXME WHat is the correct thing to do here
except Exception as e:
return None
def create_alert_dict(message):
# FIXME Update to extract information from igwn alert instead of voevent
alert_dict = {}
with open(message, 'rb') as f:
voevent = voeventparse(f)
params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')}
alert_dict['author'] = voevent.Who.Author.contactName
alert_dict['alert_type'] = params['AlertType'].upper()
# FIXME Should the time_created field be the same as the voevent?
alert_dict['time_created'] = voevent.Who.Date
alert_dict['superevent_id'] = params['GraceID']
alert_dict['is_public'] = True if params['OpenAlert'] else False
# FIXME Do we need to check for software injections?
alert_dict['is_injection'] = True if params['HardwareInj'] else False
alert_dict['urls'] = {'gracedb': params['EventPage']}
# FIXME Track this ourselves when we move the voevent generation out of gracedb
alert_dict['event_revision'] = params['Pkt_Ser_Num']
if alert_dict['alert_type'] == 'RETRACTION':
alert_dict['event'] = None
else:
alert_dict['event'] = {
'time': voevent.find('.//TimeInstant').getchildren()[0],
'far': params['FAR'],
'instruments': sorted(params['Instruments'].split(',')),
'group': params['Group'],
'pipeline': params['Pipeline'],
'search': params['Search'],
'classification': {
'BNS': params['BNS'],
'NSBH': params['NSBH'],
'BBH': params['BBH']
} if params['Group'] == 'CBC' else {},
'properties': {
'HasNS': params['HasNS'],
'HasRemnant': params['HasRemnant']
}
'skymap': _attempt_skymap_download(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None
}
try:
alert_dict['external_coinc'] = {
'gcn_notice_id': params['External_GCN_Notice_Id'],
'ivorn': params['External_Ivorn'],
'observatory': params['External_Observatory'],
'search': params['External_Search'],
'time_difference': params['Time_Difference'],
'time_coincidence_far': params['Time_Coincidence_FAR'] if 'Time_Coincidence_FAR' in params else None,
'time_sky_position_coincidence_far': params['Time_Sky_Position_Coincidence_FAR'] if 'Time_Sky_Position_Coincidence_FAR' in params else None
}
except KeyError:
alert_dict['external_coinc'] = None
return alert_dict
# FIXME Update to extract information from igwn alert instead of voevent
alert_dict = {}
with open(message, 'rb') as f:
voevent = voeventparse(f)
params = {elem.attrib['name']: elem.attrib['value'] for elem in voevent.iterfind('.//Param')}
alert_dict['author'] = voevent.Who.Author.contactName
alert_dict['alert_type'] = params['AlertType'].upper()
# FIXME Should the time_created field be the same as the voevent?
alert_dict['time_created'] = voevent.Who.Date
alert_dict['superevent_id'] = params['GraceID']
alert_dict['is_public'] = True if params['OpenAlert'] else False
# FIXME Do we need to check for software injections?
alert_dict['is_injection'] = True if params['HardwareInj'] else False
alert_dict['urls'] = {'gracedb': params['EventPage']}
# FIXME Track this ourselves when we move the voevent generation out of gracedb
alert_dict['event_revision'] = params['Pkt_Ser_Num']
if alert_dict['alert_type'] == 'RETRACTION':
alert_dict['event'] = None
else:
alert_dict['event'] = {
'time': voevent.find('.//TimeInstant').getchildren()[0],
'far': params['FAR'],
'instruments': sorted(params['Instruments'].split(',')),
'group': params['Group'],
'pipeline': params['Pipeline'],
'search': params['Search'],
'classification': {
'BNS': params['BNS'],
'NSBH': params['NSBH'],
'BBH': params['BBH']
} if params['Group'] == 'CBC' else {},
'properties': {
'HasNS': params['HasNS'],
'HasRemnant': params['HasRemnant']
}
'skymap': _attempt_skymap_download(alert_dict['superevent_id']) if alert_dict['AlertType'] != 'PRELIMINARY' and alert_dict['AlertType'] != 'RETRACTION' else None
}
try:
alert_dict['external_coinc'] = {
'gcn_notice_id': params['External_GCN_Notice_Id'],
'ivorn': params['External_Ivorn'],
'observatory': params['External_Observatory'],
'search': params['External_Search'],
'time_difference': params['Time_Difference'],
'time_coincidence_far': params['Time_Coincidence_FAR'] if 'Time_Coincidence_FAR' in params else None,
'time_sky_position_coincidence_far': params['Time_Sky_Position_Coincidence_FAR'] if 'Time_Sky_Position_Coincidence_FAR' in params else None
}
except KeyError:
alert_dict['external_coinc'] = None
return alert_dict
@app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0,
ignore_result=True, queue='voevent', retry_backoff=True,
retry_kwargs=dict(max_retries=10), shared=False)
def send(self, message):
"""Send an avro alert to a kafka broker
"""
"""Send an avro alert to a kafka broker
"""
alert_dict = create_alert_dict(message)
alert_dict = create_alert_dict(message)
message = io.BytesIO()
fastavro.writer(message, parsed_schema, alert_dict)
message = io.BytesIO()
fastavro.writer(message, parsed_schema, alert_dict)
# TODO Actually send to a kafka broker
# TODO Actually send to a kafka broker
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment