Commit ed583dc2 authored by Patrick Godwin

add gstlal_ll_inspiral_event_uploader to gstlal-inspiral

parent c636986e
gstlal-inspiral/bin/Makefile.am
@@ -56,16 +56,17 @@ dist_bin_SCRIPTS = \
 	gstlal_inspiral_combine_injection_sets \
 	gstlal_inspiral_svd_bank_pipe \
 	gstlal_ll_inspiral_aggregator \
-	gstlal_ll_inspiral_trigger_aggregator \
 	gstlal_ll_inspiral_calculate_range \
 	gstlal_ll_inspiral_daily_page \
 	gstlal_ll_inspiral_daily_page_online \
+	gstlal_ll_inspiral_event_uploader \
 	gstlal_ll_inspiral_get_urls \
 	gstlal_ll_inspiral_gracedb_min_instruments \
 	gstlal_ll_inspiral_gracedb_threshold \
 	gstlal_ll_inspiral_pipe \
 	gstlal_ll_inspiral_plot_sensitivity \
 	gstlal_ll_inspiral_save_state \
+	gstlal_ll_inspiral_trigger_aggregator \
 	gstlal_inspiral_summary_page \
 	gstlal_inspiral_summary_page_lite \
 	gstlalcbcnode \

gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader (new file)
#!/usr/bin/env python3
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_ll_inspiral_event_uploader [--options]"
__description__ = "an executable to aggregate and upload GraceDB events from gstlal_inspiral jobs"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
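
#
# A minimal usage sketch; the broker address, topic name and tag below are
# placeholders for this example, not defaults:
#
#   gstlal_ll_inspiral_event_uploader \
#       --kafka-server localhost:9092 \
#       --input-topic inspiral_events \
#       --tag online-test \
#       --gracedb-service-url file:///tmp/fake_gracedb \
#       --verbose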
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import deque, OrderedDict
import http.client as httplib
import json
import logging
from optparse import OptionParser
import signal
import sys
import time

import numpy

from confluent_kafka import Consumer

from ligo.segments import segment
from ligo.gracedb.rest import GraceDb, HTTPError
from ligo.gracedb.rest import DEFAULT_SERVICE_URL as DEFAULT_GRACEDB_URL
from ligo.scald import utils

from lal import LIGOTimeGPS

from gstlal import inspiral
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = OptionParser(usage=__usage__, description=__description__)
parser.add_option("-v", "--verbose", default=False, action="store_true", help = "Be verbose.")
parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--num-jobs", type = int, default = 10, help="number of jobs to listen to")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic", metavar = "string", help = "Sets the input kafka topic. Required.")
parser.add_option("--gracedb-group", metavar = "name", default = "Test", help = "Gracedb group to which to upload events (default is Test).")
parser.add_option("--gracedb-pipeline", metavar = "name", default = "gstlal", help = "Name of pipeline to provide in GracedB uploads (default is gstlal).")
parser.add_option("--gracedb-search", metavar = "name", default = "LowMass", help = "Name of search to provide in GracedB uploads (default is LowMass).")
parser.add_option("--gracedb-service-url", metavar = "url", default = DEFAULT_GRACEDB_URL, help = "Override default GracedB service url (optional, default is %s)." % DEFAULT_GRACEDB_URL)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class EventUploader(object):
"""
manages handling of incoming events, selecting the best and uploading to GraceDB.
"""
def __init__(self, options):
logging.info('setting up...')
### initialize timing options
self.processing_cadence = options.processing_cadence
self.request_timeout = options.request_timeout
self.retries = 5
self.retry_delay = 1
self.is_running = False
		### kafka settings
		self.kafka_settings = {'bootstrap.servers': options.kafka_server}

		### initialize consumer
		consumer_kafka_settings = dict(self.kafka_settings) ### copy so the group id stays consumer-specific
		consumer_kafka_settings['group.id'] = '-'.join(['event_uploader', options.tag])
		self.consumer = Consumer(consumer_kafka_settings)
		self.consumer.subscribe([options.input_topic])
		self.num_jobs = options.num_jobs

		### gracedb settings
		self.group = options.gracedb_group
		self.pipeline = options.gracedb_pipeline
		self.search = options.gracedb_search

		### initialize gracedb client
		if options.gracedb_service_url.startswith("file"):
			self.client = inspiral.FakeGracedbClient(options.gracedb_service_url)
		else:
			self.client = GraceDb(options.gracedb_service_url)

		### initialize event store
		self.max_event_time = 600 ### seconds to keep an event around before dropping it (assumed value)
		self.events = OrderedDict()
def fetch_data(self):
"""
		requests new candidate events from a topic
		and adds them to the candidate list
"""
messages = self.consumer.consume(num_messages=self.num_jobs, timeout=self.request_timeout)
for message in messages:
### only add to queue if no errors in receiving data
if message and not message.error():
### process candidate event
candidate = json.loads(message.value())
candidate['time'] = LIGOTimeGPS(candidate['time'], candidate.pop('time_ns'))
self.process_candidate(candidate)
def process_candidate(self, candidate):
"""
handles the processing of a candidate, creating
a new event if necessary
"""
event_seg = self.event_window(candidate['time'])
if event_seg in self.events:
self.events[event_seg]['candidates'].append(candidate)
else:
			### check whether the candidate falls within an existing event's window
			for seg, event in self.events.items():
				if segment(candidate['time'], candidate['time']) in seg:
					event['candidates'].append(candidate)
					return

			### event not found, create a new event
			self.events[event_seg] = self.new_event()
			self.events[event_seg]['candidates'].append(candidate)
def event_window(self, t):
"""
returns the event window representing the event
"""
dt = 0.2
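		### for example, t = 1234.36 gives floor_div(1234.16, 0.5) = 1234.0
		### and floor_div(1234.56, 0.5) + 0.5 = 1235.0, i.e. the half-open
		### window [1234.0, 1235.0)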
return segment(utils.floor_div(t - dt, 0.5), utils.floor_div(t + dt, 0.5) + 0.5)
	def new_event(self):
		"""
		returns the structure that defines an event
		"""
		return {
			'num_sent': 0, ### number of times this event has been uploaded
			'time_sent': None, ### GPS time of the most recent upload
			'preferred': None, ### current preferred candidate
			'candidates': deque(maxlen = self.num_jobs)
		}
	def process_events(self):
		"""
		process events stored, selecting the best candidate.
		upload if a new preferred event is found
		"""
		### iterate from the most recent event window to the oldest
		for seg, event in sorted(self.events.items(), reverse=True):
			if event['num_sent'] == 0:
				updated, event = self.process_candidates(event)
				assert updated
				self.upload_event(event)
			elif utils.gps_now() >= self.next_event_upload(event):
				updated, event = self.process_candidates(event)
				if updated:
					self.upload_event(event)

		### clean out old events
		current_time = utils.gps_now()
		for seg in list(self.events.keys()):
			if current_time - seg[0] >= self.max_event_time:
				self.events.pop(seg)
	def process_candidates(self, event):
		"""
		process candidates and update the preferred event
		if needed

		returns whether the preferred event was updated, plus
		the event itself
		"""
		### no new candidates to consider
		if not event['candidates']:
			return False, event

		preferred = self.select_best(event['candidates'])
		event['candidates'].clear()

		### either no preferred yet or preferred is undoubtedly better
		if not event['preferred'] or (preferred['far'] < event['preferred']['far']
				and preferred['snr'] > event['preferred']['snr']):
			event['preferred'] = preferred
			return True, event
### preferred far is better:
### update far in coinc, far in event, keep rest
### FIXME: turned off for now
#elif preferred['far'] < event['preferred']['far']:
# event['preferred']['far'] = preferred['far']
# self.update_coinc_far(event['preferred']['coinc'], preferred['far'])
# return True, event
### preferred snr is better:
### update coinc except far
### FIXME: turned off for now
#elif preferred['snr'] > event['preferred']['snr']:
# far = event['preferred']['far']
# event['preferred']['coinc'] = preferred['coinc']
# self.update_coinc_far(event['preferred']['coinc'], far)
# return True, event
### previous preferred is better
else:
return False, event
def select_best(self, candidates):
"""
select the best event out of the candidates
"""
candidates = list(candidates)
far_idx = numpy.argmin([c['far'] for c in candidates])
best = candidates[far_idx]
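		### e.g. candidate FARs of [1e-7, 1e-9, 1e-8] select the second candidate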
### FIXME: selecting best snr independently turned off for now
#min_far = min(c['far'] for c in candidates)
#snr_idx = numpy.argmax([c['snr'] for c in candidates])
#best = candidates[snr_idx]
#best['far'] = min_far
return best
	def upload_event(self, event):
		"""
		upload a new event + auxiliary files
		"""
		event['num_sent'] += 1
		event['time_sent'] = utils.gps_now()

		### NOTE: the filename given to the uploaded coinc document is assumed here
		filename = 'coinc.xml'
		for attempt in range(1, self.retries + 1):
			try:
				resp = self.client.createEvent(
					self.group,
					self.pipeline,
					filename,
					filecontents = event['preferred']['coinc'],
					search = self.search
				)
			except HTTPError as resp:
				pass
			else:
				resp_json = resp.json()
				if resp.status == httplib.CREATED:
					graceid = resp_json['graceid']
					logging.info("event assigned grace ID %s" % graceid)
					self.upload_file(
						"GstLAL internally computed p-astro",
						"p_astro.json",
						"p_astro",
						event['preferred']['p_astro'],
						graceid
					)
					try:
						resp = self.client.writeLabel(graceid, 'PASTRO_READY')
					except HTTPError as resp:
						logging.warning(resp)
					self.upload_file("strain spectral densities", "psd.xml.gz", "psd", event['preferred']['psd'], graceid)
					break
			logging.info("gracedb upload of %s failed on attempt %d/%d" % (filename, attempt, self.retries))
			time.sleep(numpy.random.lognormal(numpy.log(self.retry_delay), .5))
		else:
			logging.warning("gracedb upload of %s failed" % filename)
def upload_file(self, message, filename, tag, contents, graceid):
"""
upload a file to gracedb
"""
logging.info("posting '%s' to gracedb ID %s ..." % (filename, graceid))
for attempt in range(1, self.retries + 1):
try:
resp = self.client.writeLog(
graceid,
message,
filename = filename,
filecontents = contents,
tagname = tag
)
			except HTTPError as resp:
				pass
			else:
				if resp.status == httplib.CREATED:
					break
			logging.info("gracedb upload of %s for ID %s failed on attempt %d/%d" % (filename, graceid, attempt, self.retries))
			time.sleep(numpy.random.lognormal(numpy.log(self.retry_delay), .5))
else:
logging.warning("gracedb upload of %s for ID %s failed" % (filename, graceid))
	def next_event_upload(self, event):
		"""
		returns the earliest GPS time at which an updated event may be
		sent, backing off exponentially with the number of uploads
		already sent (4s, 16s, 64s, ... after the last upload)
		"""
		return event['time_sent'] + numpy.power(4, event['num_sent'])
def update_coinc_far(self, coinc, far):
"""
update the far in the coinc.xml file
"""
### FIXME: actually update the coinc
return coinc
def process(self):
"""
fetch events and process them at the specified cadence
"""
while self.is_running:
self.fetch_data()
self.process_events()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def start(self):
"""
start the event loop
"""
logging.info('starting event uploader for %d inspiral jobs...' % self.num_jobs)
self.is_running = True
self.process()
def stop(self):
"""
stop the event loop
"""
logging.info('shutting down...')
### FIXME: should also handle pushing rest of data in buffer
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the event uploader gracefully before exiting
"""
def __init__(self, event_uploader, signals = [signal.SIGINT, signal.SIGTERM]):
self.event_uploader = event_uploader
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
		print("SIG %d received, attempting graceful shutdown..." % signum, file=sys.stderr)
self.event_uploader.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
# set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format = '%(asctime)s | event_uploader : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# create event uploader instance
event_uploader = EventUploader(options)
# install signal handler
SignalHandler(event_uploader)
# start up
event_uploader.start()