Skip to content
Snippets Groups Projects

Draft: Ingestion of SVOM ECLAIRS notices into LLAI workflow

Open Naresh Adhikari requested to merge naresh.adhikari/gwcelery:addsvom into main
Compare and Show latest version
1 file
+ 7
6
Compare changes
  • Side-by-side
  • Inline
@@ -85,11 +85,12 @@ def handle_snews_gcn(payload):
gcn.NoticeType.INTEGRAL_REFINED,
gcn.NoticeType.INTEGRAL_OFFLINE,
gcn.NoticeType.AGILE_MCAL_ALERT,
# gcn.NoticeType.SVOM_ECLAIRS_ALERT,
queue='exttrig',
shared=False)
def handle_grb_gcn(payload):
"""Handles the payload from Fermi, Swift, INTEGRAL, and AGILE MCAL
GCN notices.
"""Handles the payload from Fermi, Swift, INTEGRAL, AGILE MCAL,
and SVOM ECLAIRS GCN notices.
Filters out candidates likely to be noise. Creates external events
from the notice if new notice, otherwise updates existing event. Then
@@ -101,6 +102,7 @@ def handle_grb_gcn(payload):
Swift: https://gcn.gsfc.nasa.gov/swift.html
INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html
AGILE-MCAL: https://gcn.gsfc.nasa.gov/agile_mcal.html
# SVOM-ECLAIRS: https://gcn.gsfc.nasa.gov/svom.html
Parameters
----------
@@ -115,7 +117,8 @@ def handle_grb_gcn(payload):
stream_obsv_dict = {'/SWIFT': 'Swift',
'/Fermi': 'Fermi',
'/INTEGRAL': 'INTEGRAL',
'/AGILE': 'AGILE'}
'/AGILE': 'AGILE',
'/fsc': 'SVOM'}
event_observatory = stream_obsv_dict[stream_path]
ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'
@@ -216,6 +219,7 @@ def handle_grb_gcn(payload):
'external_swift',
'external_integral',
'external_agile',
'external_svom',
shared=False)
def handle_grb_igwn_alert(alert):
"""Parse an IGWN alert message related to superevents/GRB external triggers
@@ -503,6 +507,46 @@ def handle_targeted_kafka_alert(alert):
).delay()
@alerts.handler('svom')
def handle_kafka_alert(alert):
"""
Parse an SVOM alert sent via Kafka from a MOU partner in our joint
search for SVOM ECLAIRS GRBs, using the probability of astrophysical origin
(pastro) to vet events.
Parameters
----------
alert : dict
Kafka alert packet containing SVOM event data.
"""
# Convert alert to VOEvent format
payload, pipeline, time, trig_id = _kafka_to_voevent(alert)
# Use 'pastro' value to veto events likely being noise
pastro = alert.get('p_astro', 0) # Handles if p_astro field is N/A.
pastro_threshold = 0.5 # Change later to conf. variable
# Veto events with 'pastro' value less than the threshold
label = 'NOT_GRB' if pastro < pastro_threshold else None
# Look whether a previous event with the same ID exists
query = (f'group: External pipeline: {pipeline} '
f'grbevent.trigger_id = "{trig_id}"')
# Chain tasks: query for events, then potentially create or replace
# an event and skymap
(
gracedb.get_events.si(query=query)
|
_create_replace_external_event_and_skymap.s(
payload, 'GRB', pipeline,
label=label, notice_date=time,
skymap=alert.get('healpix_file'),
use_radec=('ra' in alert and 'dec' in alert)
)
).delay()
def _skymaps_are_ready(event, label, task):
"""Determine whether labels are complete to launch a certain task.
@@ -717,7 +761,7 @@ def _create_replace_external_event_and_skymap(
def _kafka_to_voevent(alert):
"""Parse an alert sent via Kafka from a MOU partner in our joint
subthreshold targeted search and convert to an equivalent XML string
GCN VOEvent.
GCN VOEvent. Also, include handling for SVOM/ECLAIRs mission alert.
Parameters
----------
@@ -728,14 +772,15 @@ def _kafka_to_voevent(alert):
-------
payload : str
XML GCN notice alert packet in string format
payload, pipeline, time, trig_id
"""
# Define basic values
pipeline = alert['mission']
start_time = alert['trigger_time']
alert_time = alert['alert_datetime']
far = alert['far']
duration = alert['rate_duration']
far = alert.get('far')
snr = alert.get('image_snr') # change this to make optional
duration = alert['image_duration']
id = '_'.join(str(x) for x in alert['id'])
# Use central time since starting time is not well measured
central_time = \
@@ -747,7 +792,7 @@ def _kafka_to_voevent(alert):
# sky localization may not be available
ra = alert.get('ra')
dec = alert.get('dec')
# Try to get dec first then ra, None if both misssing
# Try to get dec first then ra, None if both missing
error = alert.get('dec_uncertainty')
if error is None:
error = alert.get('ra_uncertainty')
@@ -758,21 +803,35 @@ def _kafka_to_voevent(alert):
if ra is None or dec is None or error is None:
ra, dec, error = 0., 0., 0.
# Load template
fname = str(Path(__file__).parent /
'../tests/data/{}_subgrbtargeted_template.xml'.format(
pipeline.lower()))
root = etree.parse(fname)
# Load template based on the mission
# FIXME: Need to adjust for targeted Fermi-Swift and SVOM/ECLAIRS GRBs
if pipeline.lower() in ['fermi', 'swift']:
fname = f'{pipeline.lower()}_subgrbtargeted_template.xml'
ivorn_suffix = 'targeted_subthreshold'
else:
fname = 'svom_grb_template.xml' # change the saved filename
ivorn_suffix = 'grb'
# Construct the full path to the template
template_path = Path(__file__).parent / f'../tests/data/{fname}'
if not template_path.is_file():
raise FileNotFoundError(f'Template file {fname} not found '
f'in ../tests/data')
root = etree.parse(str(template_path))
# Update template values
# Change ivorn to indicate this is a subthreshold targeted event
root.xpath('.')[0].attrib['ivorn'] = \
'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format(
pipeline.lower(), trigger_time).encode()
# Change ivorn to indicate this is subthreshold targeted event and SVOM grb
ivorn = \
f'ivo://lvk.internal/{pipeline.lower()}#{ivorn_suffix}-{trigger_time}'
root.xpath('.')[0].attrib['ivorn'] = ivorn
# Update ID
root.find("./What/Param[@name='TrigID']").attrib['value'] = \
id.encode()
if pipeline.lower() in ['fermi', 'swift']:
root.find("./What/Param[@name='TrigID']").attrib['value'] = str(id)
else:
root.find("./What/Group[@name='Svom_Identifiers']"
"/Param[@name='Burst_Id']").attrib['value'] = str(id)
# Change times to chosen time
root.find("./Who/Date").text = str(alert_time).encode()
@@ -780,11 +839,21 @@ def _kafka_to_voevent(alert):
"ObservationLocation/AstroCoords/Time/TimeInstant/"
"ISOTime")).text = str(trigger_time).encode()
root.find("./What/Param[@name='FAR']").attrib['value'] = \
str(far).encode()
if pipeline.lower() in ['fermi', 'swift']:
root.find("./What/Param[@name='FAR']").attrib['value'] = \
(str(far).encode())
else:
root.find("./Group[@name='Detection_Info']"
"/Param[@name='SNR']").attrib['value'] = \
(str(snr).encode())
root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
str(duration).encode()
if pipeline.lower() in ['fermi', 'swift']:
root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
str(duration).encode()
else:
root.find("./Group[@name='Detection_Info']"
"/Param[@name='Timescale']").attrib['value'] = \
(str(duration).encode())
# Sky position
root.find(("./WhereWhen/ObsDataLocation/"
Loading