Commit fa1d472c authored by Patrick Godwin's avatar Patrick Godwin

influx.py: modify Consumer.query() to point to schemas for more intuitive...

influx.py: modify Consumer.query() to point to schemas for more intuitive usage, add Consumer.load() to load schemas from config. serve.py: fix typo causing issues with historical queries
parent ea40ce87
Pipeline #74763 passed with stages
in 3 minutes and 45 seconds
......@@ -17,6 +17,7 @@ from six.moves import urllib
import numpy
import urllib3
import yaml
from .. import aggregator
from . import common
......@@ -94,7 +95,7 @@ class Aggregator(object):
self.schema = {}
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None):
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None, aggregate=None):
"""Defines a schema for a measurement.
Parameters
......@@ -109,6 +110,8 @@ class Aggregator(object):
the tags stored in a given measurement
tag_key : 'str'
the tag in which to group by for aggregations
aggregate : 'str'
the aggregate to use
Defines the layout for data of a given measurement, as well as
how data will be aggregated when doing reductions.
......@@ -131,6 +134,7 @@ class Aggregator(object):
'column_key': column_key,
'tags': tags,
'tag_key': tag_key,
'aggregate': aggregate,
}
......@@ -506,7 +510,41 @@ class Consumer(object):
self.schema = {}
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None):
def load(self, path=None):
"""Loads schemas contained within a configuration file.
Parameters
----------
path : `str`
the path to the configuration file
"""
if not path:
if 'SCALDRC_PATH' in os.environ:
path = os.getenv('SCALDRC_PATH')
else:
raise KeyError('no configuration file found, please set your SCALDRC_PATH correctly using "export SCALDRC_PATH=path/to/config" or pass in path kwarg')
### load config
config = None
with open(path, 'r') as f:
config = yaml.safe_load(f)
### register schemas
for schema in config['schemas'].values():
if 'schema1' in schema:
if 'measurement' in schema:
measurement = schema['measurement']
subschemas = {key: subschema for key, subschema in schema.items() if 'schema' in key}
for subschema in subschemas.values():
if not 'measurement' in subschema:
subschema['measurement'] = measurement
self._load_schema(subschema)
else:
self._load_schema(schema)
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None, aggregate=None, **kwargs):
"""Defines a schema for a measurement.
Parameters
......@@ -521,6 +559,8 @@ class Consumer(object):
the tags stored in a given measurement
tag_key : 'str'
the tag in which to group by for aggregations
aggregate : 'str'
the aggregate to use
Defines the layout for data of a given measurement, as well as
how data will be aggregated when doing reductions.
......@@ -543,36 +583,40 @@ class Consumer(object):
'column_key': column_key,
'tags': tags,
'tag_key': tag_key,
'aggregate': aggregate,
}
def query(self, data_type, *args, **kwargs):
"""A convenience function to call the other retrieve methods by name.
def query(self, schema, data_type, *args, **kwargs):
"""Query for data using an available schema.
Parameters
----------
schema : `str`
the schema name
data_type : `str`
the type of data, e.g. timeseries
*args : args to pass along
**kwargs : kwargs to pass along
"""
if data_type == 'rows_by_tag':
return self.retrieve_rows_by_tag(*args, **kwargs)
elif data_type == 'binned_timeseries_by_tag':
return self.retrieve_binnedtimeseries_by_tag(*args, **kwargs)
elif data_type == 'timeseries_by_tag':
return self.retrieve_timeseries_by_tag(*args, **kwargs)
elif data_type == 'timeseries_latest':
return self.retrieve_timeseries_latest(*args, **kwargs)
elif data_type == 'latest_by_tag':
return self.retrieve_latest_by_tag(*args, **kwargs)
s = self.schema[schema]
s['measurement'] = schema ### FIXME: fix this issue of tying schema to measurement name
if data_type == 'rows':
start, end = args
return self.retrieve_rows_by_tag(s['measurement'], start, end, s['tag_key'], aggregate=s['aggregate'], **kwargs)
elif data_type == 'heatmap':
start, end = args
return self.retrieve_binnedtimeseries_by_tag(s['measurement'], start, end, s['columns'][0], s['tag_key'], aggregate=s['aggregate'], **kwargs)
elif data_type == 'latest':
return self.retrieve_latest_by_tag(s['measurement'], s['columns'][0], tag_key=s['tag_key'], aggregate=s['aggregate'], **kwargs)
elif data_type == 'timeseries':
return self.retrieve_timeseries(*args, **kwargs)
start, end = args
return self.retrieve_timeseries(s['measurement'], start, end, s['columns'][0], aggregate=s['aggregate'], **kwargs)
elif data_type == 'triggers':
return self.retrieve_triggers(*args, **kwargs)
elif data_type == 'snapshot':
return self.retrieve_snapshot(*args, **kwargs)
start, end = args
return self.retrieve_triggers(s['measurement'], start, end, s['columns'], **kwargs)
else:
raise NotImplementedError
......@@ -984,6 +1028,21 @@ class Consumer(object):
return time, json.loads(data), json.loads(dims)
def _load_schema(self, schema):
#-------------------------------------------------------
### internal utility to format schemas from config file
measurement = schema.pop('measurement')
columns = schema.pop('column')
if isinstance(columns, str):
columns = (columns,)
column_key = schema.pop('column_key', columns[0])
if 'tag' in schema:
tag = schema.pop('tag')
schema['tags'] = tag
self.register_schema(measurement, columns, column_key, **schema)
#-------------------------------------------------
### database utilities
......
......@@ -102,7 +102,7 @@ def dashboard(page='index'):
refresh = 2000
else:
start = config['pages'][page]['start']
stop = config['dashboard'][page]['end']
stop = config['pages'][page]['end']
refresh = -1
### fill in plot section for page with plot/schema info
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment