diff --git a/CHANGES.md b/CHANGES.md index 28ef656683bdd9045fb5a38883b99cbeb6704617..d7756d944a213b5844b3eac80a06a3bc6d4a363d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,13 @@ event does not produce an LVAlert message at all, so there is no way to intercept it.) +- Added a query kwarg to superevents method to reduce latency in + fetching the superevents from gracedb. + +- Refactored getting event information for update type events so + that gracedb is polled only once to get the information needed + for superevent manager. + ## 0.0.10 (2018-06-13) - Model the time extent of events and superevents using the diff --git a/gwcelery/celery.py b/gwcelery/celery.py index 5ebc3ac01fbe9c4d25697ed063c167f9ec61ead3..8fd2d0abf492fed1e90bb34052e4d46d0d8c7587 100644 --- a/gwcelery/celery.py +++ b/gwcelery/celery.py @@ -33,6 +33,12 @@ class Base: lib=10.0) """Lower extent of superevent segments.""" + superevent_query_d_t_start = 100. + """Lower extent of superevents query""" + + superevent_query_d_t_end = 100. + """Upper extent of superevents query""" + superevent_d_t_end = dict(gstlal=10.0, pycbc=10.0, mbtaonline=10.0, diff --git a/gwcelery/tasks/gracedb.py b/gwcelery/tasks/gracedb.py index 15031e23558af5beb41ca74bc8a723dcf1cad688..d492b0d4ed021e5b921d277b611c8f100b9a046d 100644 --- a/gwcelery/tasks/gracedb.py +++ b/gwcelery/tasks/gracedb.py @@ -89,8 +89,8 @@ def upload(filecontents, filename, graceid, message, tags=None): client.writeLog(graceid, message, filename, filecontents, tags) -@task(shared=False) -def get_superevent(gid): +@app.task(shared=False) +def get_superevent(gid, query=None): """Iterate through superevents in gracedb and return sid if gid exists in the association. @@ -99,6 +99,9 @@ def get_superevent(gid): gid : str uid of the trigger to be checked + query : str + optional query to be passed to :meth:`superevents` + Returns ------- superevent_id : str @@ -108,7 +111,7 @@ def get_superevent(gid): superevents : list The list of the superevents. """ - superevents = list(client.superevents(orderby='t_0')) + superevents = list(client.superevents(query=query, orderby='t_0')) for superevent in superevents: preferred_flag = False # check preferred_event first diff --git a/gwcelery/tasks/superevents.py b/gwcelery/tasks/superevents.py index e50cc850fffca1f47d3cf773866202e85452e62a..c6ebc2620a28afae6dcef43bd79e9dac41c3a393 100644 --- a/gwcelery/tasks/superevents.py +++ b/gwcelery/tasks/superevents.py @@ -33,6 +33,7 @@ def handle(payload): """ gid = payload['uid'] alert_type = payload['alert_type'] + event_info = _event_info(payload) try: far = payload['object']['far'] @@ -45,19 +46,25 @@ def handle(payload): log.info("Skipping processing of %s because of low far", gid) return - sid, preferred_flag, superevents = gracedb.get_superevent(gid) + query_times = (event_info['gpstime'] - + app.conf['superevent_query_d_t_start'], + event_info['gpstime'] + + app.conf['superevent_query_d_t_end']) + + sid, preferred_flag, superevents = gracedb.get_superevent( + gid, query='''{} .. {}'''.format(*query_times)) superevents = _superevent_segment_list(superevents) - d_t_start, d_t_end = _get_dts(payload) + d_t_start, d_t_end = _get_dts(event_info) # Condition 1/2 if sid is None and alert_type == 'new': log.debug('Entered 1st if') - event_segment = _Event(payload['object'].get('gpstime'), - payload['uid'], - payload['object'].get('group'), - payload['object'].get('pipeline'), - payload['object'].get('search'), + event_segment = _Event(event_info['gpstime'], + event_info['uid'], + event_info['group'], + event_info['pipeline'], + event_info['search'], event_dict=payload) # Check which superevent window trigger gpstime falls in @@ -93,23 +100,42 @@ def handle(payload): log.critical('Unhandled by parse_trigger, passing...') -# FIXME: Unify _get_dts and _far_check, call get_event only once -def _get_dts(payload): - """ - Returns the dt_start and dt_end values based on CBC/Burst - for new and update type alerts +def _event_info(payload): + """Helper function to fetch required event info from GraceDb + at once and reduce polling """ alert_type = payload['alert_type'] if alert_type == 'new': - group = payload['object']['group'].lower() + event_info = dict(uid=payload['uid'], + gpstime=payload['object']['gpstime'], + far=payload['object']['far'], + group=payload['object']['group'], + pipeline=payload['object']['pipeline'], + search=payload['object'].get('search'), + alert_type=payload['alert_type']) else: - group = gracedb.get_event(payload['uid'])['group'].lower() - + event_dict = gracedb.get_event(payload['uid']) + event_info = dict(uid=event_dict['graceid'], + gpstime=event_dict['gpstime'], + far=event_dict['far'], + group=event_dict['group'], + pipeline=event_dict['pipeline'], + search=event_dict.get('search'), + alert_type=payload['alert_type']) + return event_info + + +def _get_dts(event_info): + """ + Returns the dt_start and dt_end values based on CBC/Burst + for new and update type alerts + """ + group = event_info['group'] dt_start = app.conf['superevent_d_t_start'].get( - group, app.conf['superevent_default_d_t_start']) + group.lower(), app.conf['superevent_default_d_t_start']) dt_end = app.conf['superevent_d_t_end'].get( - group, app.conf['superevent_default_d_t_end']) + group.lower(), app.conf['superevent_default_d_t_end']) return dt_start, dt_end