Skip to content
Snippets Groups Projects
Commit fd8ca5a8 authored by Geoffrey Mo's avatar Geoffrey Mo
Browse files

Add retry for check_vectors if data is not yet available in the caches

parent df689013
No related branches found
No related tags found
No related merge requests found
......@@ -44,6 +44,9 @@ Changelog
- Fix the triple circular uploads by rearranging the alerts canvas and
moving tasks into the second group.
- Add an automatic retry for the check_vectors task if requested data is not
yet available in the caches due to data transfer latency
2.0.6 "Spaghetti Tree" (2023-05-10)
-----------------------------------
......
......@@ -374,8 +374,8 @@ def check_vector(cache, channel, start, end, bits, logic_type='all'):
None for key in bits if key is not None}
@app.task(shared=False)
def check_vectors(event, graceid, start, end):
@app.task(shared=False, bind=True, default_retry_delay=5)
def check_vectors(self, event, graceid, start, end):
"""Perform data quality checks for an event and labels/logs results to
GraceDB.
......@@ -452,8 +452,14 @@ def check_vectors(event, graceid, start, end):
analysis_channels = {k: v for k, v in analysis_channels
if k[3:] != 'DMT-DQ_VECTOR'}.items()
for channel, bits in analysis_channels:
states.update(check_vector(caches[channel.split(':')[0]], channel,
start, end, bit_defs[bits]))
try:
states.update(check_vector(
caches[channel.split(':')[0]], channel,
start, end, bit_defs[bits]))
except ValueError as exc:
# check_vector likely failed to find the requested data
# in the cache because it has yet to arrive
raise self.retry(exc=exc, max_retries=3)
# Pick out DQ and injection states, then filter for active detectors
dq_states = {key: value for key, value in states.items()
if key.split('_')[-1] != 'INJ'}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment