Skip to content
Snippets Groups Projects

Draft: Ingestion of SVOM ECLAIRS notices into LLAI workflow

Open Naresh Adhikari requested to merge naresh.adhikari/gwcelery:addsvom into main
Compare and Show latest version
1 file
+ 25
15
Compare changes
  • Side-by-side
  • Inline
@@ -85,11 +85,12 @@ def handle_snews_gcn(payload):
@@ -85,11 +85,12 @@ def handle_snews_gcn(payload):
gcn.NoticeType.INTEGRAL_REFINED,
gcn.NoticeType.INTEGRAL_REFINED,
gcn.NoticeType.INTEGRAL_OFFLINE,
gcn.NoticeType.INTEGRAL_OFFLINE,
gcn.NoticeType.AGILE_MCAL_ALERT,
gcn.NoticeType.AGILE_MCAL_ALERT,
 
# gcn.NoticeType.SVOM_ECLAIRS_ALERT,
queue='exttrig',
queue='exttrig',
shared=False)
shared=False)
def handle_grb_gcn(payload):
def handle_grb_gcn(payload):
"""Handles the payload from Fermi, Swift, INTEGRAL, and AGILE MCAL
"""Handles the payload from Fermi, Swift, INTEGRAL, AGILE MCAL,
GCN notices.
and SVOM ECLAIRS GCN notices.
Filters out candidates likely to be noise. Creates external events
Filters out candidates likely to be noise. Creates external events
from the notice if new notice, otherwise updates existing event. Then
from the notice if new notice, otherwise updates existing event. Then
@@ -101,6 +102,7 @@ def handle_grb_gcn(payload):
@@ -101,6 +102,7 @@ def handle_grb_gcn(payload):
Swift: https://gcn.gsfc.nasa.gov/swift.html
Swift: https://gcn.gsfc.nasa.gov/swift.html
INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html
INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html
AGILE-MCAL: https://gcn.gsfc.nasa.gov/agile_mcal.html
AGILE-MCAL: https://gcn.gsfc.nasa.gov/agile_mcal.html
 
# SVOM-ECLAIRS: https://gcn.gsfc.nasa.gov/svom.html
Parameters
Parameters
----------
----------
@@ -115,7 +117,8 @@ def handle_grb_gcn(payload):
@@ -115,7 +117,8 @@ def handle_grb_gcn(payload):
stream_obsv_dict = {'/SWIFT': 'Swift',
stream_obsv_dict = {'/SWIFT': 'Swift',
'/Fermi': 'Fermi',
'/Fermi': 'Fermi',
'/INTEGRAL': 'INTEGRAL',
'/INTEGRAL': 'INTEGRAL',
'/AGILE': 'AGILE'}
'/AGILE': 'AGILE',
 
'/fsc': 'SVOM'}
event_observatory = stream_obsv_dict[stream_path]
event_observatory = stream_obsv_dict[stream_path]
ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'
ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'
@@ -216,6 +219,7 @@ def handle_grb_gcn(payload):
@@ -216,6 +219,7 @@ def handle_grb_gcn(payload):
'external_swift',
'external_swift',
'external_integral',
'external_integral',
'external_agile',
'external_agile',
 
'external_svom',
shared=False)
shared=False)
def handle_grb_igwn_alert(alert):
def handle_grb_igwn_alert(alert):
"""Parse an IGWN alert message related to superevents/GRB external triggers
"""Parse an IGWN alert message related to superevents/GRB external triggers
@@ -503,6 +507,46 @@ def handle_targeted_kafka_alert(alert):
@@ -503,6 +507,46 @@ def handle_targeted_kafka_alert(alert):
).delay()
).delay()
 
@alerts.handler('svom')
 
def handle_kafka_alert(alert):
 
"""
 
Parse an SVOM alert sent via Kafka from a MOU partner in our joint
 
search for SVOM ECLAIRS GRBs, using the probability of astrophysical origin
 
(pastro) to vet events.
 
 
Parameters
 
----------
 
alert : dict
 
Kafka alert packet containing SVOM event data.
 
"""
 
# Convert alert to VOEvent format
 
payload, pipeline, time, trig_id = _kafka_to_voevent(alert)
 
 
# Use 'pastro' value to veto events likely being noise
 
pastro = alert.get('p_astro', 0) # Handles if p_astro field is N/A.
 
pastro_threshold = 0.5 # Change later to conf. variable
 
 
# Veto events with 'pastro' value less than the threshold
 
label = 'NOT_GRB' if pastro < pastro_threshold else None
 
 
# Look whether a previous event with the same ID exists
 
query = (f'group: External pipeline: {pipeline} '
 
f'grbevent.trigger_id = "{trig_id}"')
 
 
# Chain tasks: query for events, then potentially create or replace
 
# an event and skymap
 
(
 
gracedb.get_events.si(query=query)
 
|
 
_create_replace_external_event_and_skymap.s(
 
payload, 'GRB', pipeline,
 
label=label, notice_date=time,
 
skymap=alert.get('healpix_file'),
 
use_radec=('ra' in alert and 'dec' in alert)
 
)
 
).delay()
 
 
def _skymaps_are_ready(event, label, task):
def _skymaps_are_ready(event, label, task):
"""Determine whether labels are complete to launch a certain task.
"""Determine whether labels are complete to launch a certain task.
@@ -717,7 +761,7 @@ def _create_replace_external_event_and_skymap(
@@ -717,7 +761,7 @@ def _create_replace_external_event_and_skymap(
def _kafka_to_voevent(alert):
def _kafka_to_voevent(alert):
"""Parse an alert sent via Kafka from a MOU partner in our joint
"""Parse an alert sent via Kafka from a MOU partner in our joint
subthreshold targeted search and convert to an equivalent XML string
subthreshold targeted search and convert to an equivalent XML string
GCN VOEvent.
GCN VOEvent. Also, include handling for SVOM/ECLAIRs mission alert.
Parameters
Parameters
----------
----------
@@ -728,14 +772,15 @@ def _kafka_to_voevent(alert):
@@ -728,14 +772,15 @@ def _kafka_to_voevent(alert):
-------
-------
payload : str
payload : str
XML GCN notice alert packet in string format
XML GCN notice alert packet in string format
payload, pipeline, time, trig_id
"""
"""
# Define basic values
# Define basic values
pipeline = alert['mission']
pipeline = alert['mission']
start_time = alert['trigger_time']
start_time = alert['trigger_time']
alert_time = alert['alert_datetime']
alert_time = alert['alert_datetime']
far = alert['far']
far = alert.get('far')
duration = alert['rate_duration']
snr = alert.get('image_snr') # change this to make optional
 
duration = alert['image_duration']
id = '_'.join(str(x) for x in alert['id'])
id = '_'.join(str(x) for x in alert['id'])
# Use central time since starting time is not well measured
# Use central time since starting time is not well measured
central_time = \
central_time = \
@@ -747,7 +792,7 @@ def _kafka_to_voevent(alert):
@@ -747,7 +792,7 @@ def _kafka_to_voevent(alert):
# sky localization may not be available
# sky localization may not be available
ra = alert.get('ra')
ra = alert.get('ra')
dec = alert.get('dec')
dec = alert.get('dec')
# Try to get dec first then ra, None if both misssing
# Try to get dec first then ra, None if both missing
error = alert.get('dec_uncertainty')
error = alert.get('dec_uncertainty')
if error is None:
if error is None:
error = alert.get('ra_uncertainty')
error = alert.get('ra_uncertainty')
@@ -758,21 +803,35 @@ def _kafka_to_voevent(alert):
@@ -758,21 +803,35 @@ def _kafka_to_voevent(alert):
if ra is None or dec is None or error is None:
if ra is None or dec is None or error is None:
ra, dec, error = 0., 0., 0.
ra, dec, error = 0., 0., 0.
# Load template
# Load template based on the mission
fname = str(Path(__file__).parent /
# FIXME: Need to adjust for targeted Fermi-Swift and SVOM/ECLAIRS GRBs
'../tests/data/{}_subgrbtargeted_template.xml'.format(
if pipeline.lower() in ['fermi', 'swift']:
pipeline.lower()))
fname = f'{pipeline.lower()}_subgrbtargeted_template.xml'
root = etree.parse(fname)
ivorn_suffix = 'targeted_subthreshold'
 
else:
 
fname = 'svom_grb_template.xml' # change the saved filename
 
ivorn_suffix = 'grb'
 
 
# Construct the full path to the template
 
template_path = Path(__file__).parent / f'../tests/data/{fname}'
 
if not template_path.is_file():
 
raise FileNotFoundError(f'Template file {fname} not found '
 
f'in ../tests/data')
 
root = etree.parse(str(template_path))
# Update template values
# Update template values
# Change ivorn to indicate this is a subthreshold targeted event
# Change ivorn to indicate this is subthreshold targeted event and SVOM grb
root.xpath('.')[0].attrib['ivorn'] = \
ivorn = \
'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format(
f'ivo://lvk.internal/{pipeline.lower()}#{ivorn_suffix}-{trigger_time}'
pipeline.lower(), trigger_time).encode()
 
root.xpath('.')[0].attrib['ivorn'] = ivorn
# Update ID
# Update ID
root.find("./What/Param[@name='TrigID']").attrib['value'] = \
if pipeline.lower() in ['fermi', 'swift']:
id.encode()
root.find("./What/Param[@name='TrigID']").attrib['value'] = str(id)
 
else:
 
root.find("./What/Group[@name='Svom_Identifiers']"
 
"/Param[@name='Burst_Id']").attrib['value'] = str(id)
# Change times to chosen time
# Change times to chosen time
root.find("./Who/Date").text = str(alert_time).encode()
root.find("./Who/Date").text = str(alert_time).encode()
@@ -780,11 +839,21 @@ def _kafka_to_voevent(alert):
@@ -780,11 +839,21 @@ def _kafka_to_voevent(alert):
"ObservationLocation/AstroCoords/Time/TimeInstant/"
"ObservationLocation/AstroCoords/Time/TimeInstant/"
"ISOTime")).text = str(trigger_time).encode()
"ISOTime")).text = str(trigger_time).encode()
root.find("./What/Param[@name='FAR']").attrib['value'] = \
if pipeline.lower() in ['fermi', 'swift']:
str(far).encode()
root.find("./What/Param[@name='FAR']").attrib['value'] = \
 
(str(far).encode())
 
else:
 
root.find("./Group[@name='Detection_Info']"
 
"/Param[@name='SNR']").attrib['value'] = \
 
(str(snr).encode())
root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
if pipeline.lower() in ['fermi', 'swift']:
str(duration).encode()
root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
 
str(duration).encode()
 
else:
 
root.find("./Group[@name='Detection_Info']"
 
"/Param[@name='Timescale']").attrib['value'] = \
 
(str(duration).encode())
# Sky position
# Sky position
root.find(("./WhereWhen/ObsDataLocation/"
root.find(("./WhereWhen/ObsDataLocation/"
Loading