Commit 810c4403 authored by Patrick Godwin's avatar Patrick Godwin

Merge branch 'latestbyjobgroupby' into 'master'

added groupby for latest by jobs for both deployment and mock

See merge request gstlal-visualisation/ligo-scald!21
parents 5788515f a7ac41d8
Pipeline #67488 passed with stages
in 1 minute and 47 seconds
......@@ -844,7 +844,7 @@ class Consumer(object):
if not datetime:
time = utils.unix_to_gps(numpy.array(time)).tolist()
return time, tags, data
return time, list(tags), list(data)
def retrieve_timeseries(self, measurement, start, end, column, tags=None, aggregate=None, dt=None, datetime=False):
......
......@@ -45,7 +45,7 @@ def mock_query():
database = bottle.request.query['db']
query = bottle.request.query['q'].strip()
epoch = bottle.request.query['epoch']
start, end, measurement, fields, aggregate, tags, filters, latest = translate_query(query)
start, end, measurement, fields, aggregate, tags, filters, latest, groupby = translate_query(query)
except AssertionError as e:
return bottle.HTTPResponse(status=400, headers=JSON_HEADER, body=json.dumps({'error': str(e), 'query': repr(query)}))
......@@ -62,8 +62,7 @@ def mock_query():
series = generate_triggers(start, end, meas_name, fields, far, epoch=epoch, latest=latest)
else:
dt = int(rp.strip('"')[:-1])
series = generate_timeseries(start, end, meas_name, fields, aggregate, dt, tags=tags, filters=filters, epoch=epoch, latest=latest)
series = generate_timeseries(start, end, meas_name, fields, aggregate, dt, tags=tags, filters=filters, epoch=epoch, latest=latest, groupby=groupby)
### generate fake data
response = {'results': [{'statement_id':0, 'series':series}]}
......@@ -75,7 +74,7 @@ def mock_query():
### functions
def translate_query(query_str):
measurement, fields, conditions, limit = parse_query(query_str)
measurement, fields, conditions, limit, groupby = parse_query(query_str)
### FIXME: not handling multiple columns for now
tags = fields[1:]
......@@ -105,7 +104,7 @@ def translate_query(query_str):
else:
filters.append((column, val))
return start, end, measurement, fields, aggregate, tags, filters, limit
return start, end, measurement, fields, aggregate, tags, filters, limit, groupby
def parse_query(query_str):
......@@ -119,6 +118,13 @@ def parse_query(query_str):
else:
limit = 0
### parse group by if any
if 'GROUP BY' in query_str:
query_str, groupby = query_str.split('GROUP BY')
groupby = groupby.strip()
else:
groupby = None
### parse conditions if any
if 'WHERE' in query_str:
query_str, conditions = query_str.split('WHERE')
......@@ -132,10 +138,10 @@ def parse_query(query_str):
assert from_ == 'FROM', 'FROM not in correct location'
fields = [field.strip('"') for field in fields.split(',')]
return measurement, fields, conditions, limit
return measurement, fields, conditions, limit, groupby
def generate_timeseries(start, end, measurement, fields, aggregate, dt, tags=None, filters=None, epoch='ns', latest=0):
def generate_timeseries(start, end, measurement, fields, aggregate, dt, tags=None, filters=None, epoch='ns', latest=0, groupby=None):
### format filters
### FIXME: doesn't do anything for now
columns = []
......@@ -148,7 +154,15 @@ def generate_timeseries(start, end, measurement, fields, aggregate, dt, tags=Non
column_names.extend(fields)
### format times
if latest:
if groupby and latest:
if groupby == 'job':
size = latest * NUM_JOBS
else:
size = latest
times = numpy.random.uniform(start, high=end, size=size)
times = utils.floor_div(times, int(dt))
times = times[numpy.logical_and(times <= end, times >= start)]
elif latest:
times = numpy.random.uniform(start, high=end, size=latest)
times = utils.floor_div(times, int(dt))
times = times[numpy.logical_and(times <= end, times >= start)]
......@@ -161,22 +175,31 @@ def generate_timeseries(start, end, measurement, fields, aggregate, dt, tags=Non
### generate timeseries
series = []
if times.size:
if groupby:
times = _convert_gps_times(times, epoch)
if tags and tags[0].strip('"') == 'job': ### FIXME: assume 1 tag max for now
for job_id in range(NUM_JOBS):
data = numpy.random.exponential(size=times.size) + 1
series.extend([list(row) for row in zip(times, data, [job_id for i in range(times.size)])])
else:
data = random_trigger_value(fields[0], times.size)
series.extend([list(row) for row in zip(times, data)])
### format timeseries
return [{
'name': measurement,
'columns': column_names,
'values': series,
}]
if groupby and groupby.strip('"') == 'job': ### FIXME: assume 1 tag max for now
for job_id, time in zip(range(NUM_JOBS), times):
data = numpy.random.exponential(size=latest) + 1 ### FIXME: only works when latest = 1
row = {'columns': column_names, 'name': measurement, 'values': [[time, data.tolist()[0], str(job_id).zfill(4)]]}
series.append(row)
### format timeseries
return series
else:
if times.size:
times = _convert_gps_times(times, epoch)
if tags and tags[0].strip('"') == 'job': ### FIXME: assume 1 tag max for now
for job_id in range(NUM_JOBS):
data = numpy.random.exponential(size=times.size) + 1
series.extend([list(row) for row in zip(times, data, [job_id for i in range(times.size)])])
else:
data = random_trigger_value(fields[0], times.size)
series.extend([list(row) for row in zip(times, data)])
### format timeseries
return [{
'name': measurement,
'columns': column_names,
'values': series,
}]
def generate_triggers(start, end, measurement, fields, far_threshold, epoch='ns', latest=0, num_triggers=100):
### format filters
......@@ -223,7 +246,6 @@ def _convert_gps_times(gps_times, epoch):
elif epoch == 'datetime':
return [gpstime.gps_to_str(t, "%Y-%m-%dT%H:%M:%S.%fZ") for t in gps_times]
def _add_parser_args(parser):
parser.add_argument('-b', '--backend', default='wsgiref',
help="chooses server backend. options: [cgi|wsgiref]. wsgiref starts a local server for development, default = wsgiref.")
......
......@@ -238,7 +238,6 @@ def serve_latest(measurement, start, end):
column, tags, _, tag_filters, aggregate, dt, _, datetime, backend = parse_query(bottle.request.query)
tag = app.config['measurements'][measurement]['tag']
tag_ids = [str(tag_num).zfill(4) for tag_num in range(app.config['measurements'][measurement]['num_tags'])]
default_value = app.config['measurements'][measurement]['default']
transform = app.config['measurements'][measurement]['transform']
y = []
......@@ -246,9 +245,9 @@ def serve_latest(measurement, start, end):
### query for timeseries
consumer = config_to_consumer(app.config['backends'][backend])
current_gps = utils.gps_now()
for tag_id in tag_ids:
time, data = consumer.retrieve_timeseries_latest(measurement, column, tags=[(tag, tag_id)], aggregate=aggregate, dt=dt, datetime=datetime)
y.append(transform_data(time, data, transform, default_value, current_gps))
time, tag_ids, data = consumer.retrieve_latest_by_tag(measurement, column[0], tag_key=tag, aggregate=aggregate, dt=dt, datetime=datetime)
for i in range(len(tag_ids)):
y.append(transform_data(time[i], data[i], transform, default_value, current_gps))
### format request
response = [{'x':tag_ids, 'y':y}]
......@@ -440,19 +439,21 @@ def parse_query(query):
return columns, tags, tag_key, tag_filters, aggregate, dt, far, datetime, backend
def transform_data(time, data, transform, default_value, now):
def transform_data(time_value, data_value, transform, default_value, now):
if transform == 'none':
if data:
return data[-1]
if data_value:
return data_value
else:
return default_value
elif transform == 'latency':
if time:
return max(now - time[-1], 0)
if isinstance(time_value, list):
return max(now - time_value[-1], 0)
elif time_value:
return max(now - time_value, 0)
else:
return default_value
else:
raise NotImplementedError('transform option not known/implemented')
raise NotImplementedError('transform option not known/implemented, only "none" or "latency" are accepted right now')
......@@ -507,7 +508,7 @@ def main(args=None):
else:
config_path = os.getenv('SCALDRC_PATH')
if not config_path:
raise KeyError('no configuration file found, please set your SCALDRC_PATH correctly or add --config param')
raise KeyError('no configuration file found, please set your SCALDRC_PATH correctly using "export SCALDRC_PATH=PATH/TO/CONFIG" or add --config param (-c /path/to/config)')
with open(config_path, 'r') as f:
app.config.update(yaml.safe_load(f))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment