Commit 08290805 authored by Patrick Godwin


add 'scald aggregate' as general purpose aggregator, move aggregator funcs to io.core, use utils.gps_now() for getting current gps time
parent 0985bd1e
Pipeline #75127 failed in 3 minutes and 50 seconds
@@ -9,6 +9,7 @@ import argparse
import logging
import signal
from . import aggregator
from . import deploy
from . import mock
from . import report
@@ -28,6 +29,9 @@ subparser = parser.add_subparsers(
)
subparser.required = True
p = utils.append_subparser(subparser, 'aggregate', aggregator.main)
aggregator._add_parser_args(p)
p = utils.append_subparser(subparser, 'deploy', deploy.main)
deploy._add_parser_args(p)
......
#!/usr/bin/env python
#
# Copyright (C) 2016 Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "utilities to aggregate and store incoming metrics"
#-------------------------------------------------
### imports
import argparse
import collections
import json
import logging
import os
import sys
import time
import timeit
import numpy
import yaml
from kafka import KafkaConsumer
from . import utils
from .io import influx, kafka
#-------------------------------------------------
### constants
MIN_TIME_QUANTA = 10000
DIRS = 6
### logging config
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
#-------------------------------------------------
### aggregator utilities
def now():
"""!
A convenience function to return the current gps time
"""
return utils.gps_now()
def median(l):
"""!
Return the median of a list, taking the nearest (upper-middle) element rather than interpolating
"""
return sorted(l)[len(l)//2]
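### illustrative example (not part of this commit): median() picks the upper-middle
### element of the sorted list rather than averaging, so
###   median([3, 1, 2])    -> 2
###   median([4, 1, 3, 2]) -> 3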
def _add_parser_args(parser):
parser.add_argument('-c', '--config',
help="sets dashboard/plot options based on yaml configuration. if not set, uses SCALDRC_PATH.")
parser.add_argument('-b', '--backend', default='default',
help="chooses data backend to use from config. default = 'default'.")
parser.add_argument('-d', '--data-type', default='timeseries',
help = "Sets the data type of metrics expected from [timeseries|triggers]. default = timeseries.")
parser.add_argument('-n', '--hostname', default='localhost',
help="specify Kafka hostname to read metrics from. default = localhost.")
parser.add_argument('-p', '--port', type=int, default=8086,
help="specify Kafka port to read metrics from. default = 8086")
parser.add_argument('-s', '--schema', action='append',
help="Specify schema to use for aggregation. Can be given multiple times.")
parser.add_argument('-t', '--tag', default='generic',
help = "Specify a tag for this aggregator job. default = 'generic'.")
parser.add_argument('--across-jobs', action = 'store_true',
help = "If set, aggregate data across jobs as well.")
parser.add_argument('--processing-cadence', type = float, default = 0.5,
help = "Rate at which the aggregator acquires and processes data. default = 0.5 seconds.")
def aggregate_to_func(aggregate):
"""!
Given an aggregate string, returns back a function that does that
aggregation.
"""
if aggregate == 'median':
return median
elif aggregate == 'min':
return min
elif aggregate == 'max':
return max
else:
raise NotImplementedError
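### illustrative example (not part of this commit):
###   aggregate_to_func('max')([1, 5, 2])    -> 5
###   aggregate_to_func('median')([1, 5, 2]) -> 2
###   aggregate_to_func('mean')              -> raises NotImplementedError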
#-------------------------------------------------
### main
def main(args=None):
"""Aggregates and stores metrics to a data backend
def reduce_data(xarr, yarr, func, dt = 1):
"""!
Reduces data by powers of 10, where dt specifies the spacing. The default
is dt = 1, i.e., data is reduced over 1-second intervals.
"""
datadict = collections.OrderedDict()
assert len(yarr) == len(xarr), 'x and y arrays are not equal'
for idx, (x, y) in enumerate(zip(xarr, yarr)):
# reduce to this level
key = int(x) // dt
# we want to sort on y not x
datadict.setdefault(key, []).append((y,x,idx))
reduced = [func(value) for value in datadict.values()]
reduced_data, reduced_time, reduced_idx = zip(*reduced)
assert len(reduced_data) == len(reduced_time)
sort_idx = numpy.argsort(reduced_time)
return reduced_idx, list(numpy.array(reduced_time)[sort_idx]), list(numpy.array(reduced_data)[sort_idx])
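### worked example (illustrative only): with dt=1 and func=max, samples are bucketed
### by int(x) // 1 and the tuple with the largest y wins in each bucket, so
###   reduce_data([0.0, 0.5, 1.2, 1.7], [3, 5, 2, 4], max, dt=1)
### keeps (y=5, x=0.5, idx=1) for bucket 0 and (y=4, x=1.7, idx=3) for bucket 1,
### returning the original indices (1, 3), times [0.5, 1.7] and values [5, 4].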
def makedir(path):
"""!
A convenience function to make new directories and trap errors
"""
try:
os.makedirs(path)
except IOError:
pass
except OSError:
pass
def gps_to_minimum_time_quanta(gpstime):
"""!
given a gps time return the minimum time quanta, e.g., 123456789 ->
123450000.
"""
return int(gpstime) // MIN_TIME_QUANTA * MIN_TIME_QUANTA
def gps_range(jobtime):
gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
if not gpsblocks:
return [], []
min_t, max_t = min(gpsblocks), max(gpsblocks)
return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
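### illustrative example (not part of this commit): with MIN_TIME_QUANTA = 10000,
###   gps_range([1234567890, 1234571234])
### spans the blocks {1234560000, 1234570000} and, when zipped, yields the paired
### boundaries (1234560000, 1234570000) and (1234570000, 1234580000).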
if not args:
parser = argparse.ArgumentParser()
_add_parser_args(parser)
args = parser.parse_args()
schemas = args.schema
def job_expanse(dataspan):
if dataspan:
min_t, max_t = min(dataspan), max(dataspan)
return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
else:
return [], []
### load configuration
config = None
if args.config:
config_path = args.config
else:
config_path = os.getenv('SCALDRC_PATH')
if not config_path:
raise KeyError('no configuration file found, please set your SCALDRC_PATH correctly or add --config param')
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
### set up logging
logging.basicConfig(
level = logging.INFO,
format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s"
)
# instantiate a consumer to subscribe to all of our topics, i.e., jobs
consumer = KafkaConsumer(
*schemas,
bootstrap_servers=[':'.join([args.hostname, str(args.port)])],
key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='aggregator_{}_{}'.format(args.tag, args.schema[0]),
auto_offset_reset='latest',
max_poll_interval_ms = 60000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
reconnect_backoff_ms=5000,
reconnect_backoff_max_ms=30000
)
# set up aggregator
aggregator_settings = config['backends'][args.backend]
aggregator_settings['reduce_across_tags'] = args.across_jobs
aggregator = influx.Aggregator(**aggregator_settings)
# register measurement schemas for aggregators
aggregator.load(path=config_path)
# start an infinite loop to keep updating and aggregating data
while True:
logging.info("retrieving data from kafka")
start = timeit.default_timer()
if args.data_type == 'timeseries':
data = kafka.retrieve_timeseries(consumer, schemas, max_records = 2000)
retrieve_elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % retrieve_elapsed)
# store and reduce data for each job
start = timeit.default_timer()
for schema in schemas:
logging.info("storing and reducing metrics for schema: %s" % schema)
aggregator.store_columns(schema, data[schema], aggregate=config['schemas'][schema]['aggregate'])
store_elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % store_elapsed)
time.sleep(max(args.processing_cadence - store_elapsed - retrieve_elapsed, args.processing_cadence))
# close connection to consumer if using kafka
if consumer:
consumer.close()
# always end on an error so that condor won't think we're done and will
# restart us
sys.exit(1)
from . import common
from . import http
from . import influx
from . import kafka
from . import hdf5
from . import sqlite
__all__ = ['common', 'http', 'influx', 'kafka', 'hdf5', 'sqlite']
#!/usr/bin/env python
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "a module for shared I/O utilities"
#-------------------------------------------------
### imports
import json
import os
from .. import aggregator
#-------------------------------------------------
### dir utilities
def gps_to_leaf_directory(gpstime, level = 0):
"""Get the leaf directory for a given gps time.
"""
return "/".join(str(aggregator.gps_to_minimum_time_quanta(gpstime) // aggregator.MIN_TIME_QUANTA // (10**level)))
#-------------------------------------------------
### json utilities
def store_snapshot(webdir, measurement, data, dims, time, **attrs):
"""Stores a JSON-formatted snapshot to disk.
Parameters
----------
webdir : `str`
the directory where snapshots are stored, should
be web accessible (e.g. public_html)
measurement : `str`
the measurement name
data : `dict`
a mapping from a column to 1-dim data
dims : `dict`
a mapping from a dimension (one of x, y, z) to a column,
either 2-dim (x, y) or 3-dim (x, y, z).
time : `int`
the time the snapshot was taken
"""
### set up JSON structure
snapshot = {'time': time, 'measurement': measurement}
snapshot.update(data)
snapshot.update(dims)
snapshot.update({'metadata': attrs})
### create directories
leafdir = gps_to_leaf_directory(time)
snapshot_dir = os.path.join(webdir, 'snapshots', leafdir)
aggregator.makedir(snapshot_dir)
### save snapshot to disk
filename = '{}_{}.json'.format(measurement, time)
filepath = os.path.join(snapshot_dir, filename)
with open(filepath, 'w') as f:
f.write(json.dumps(snapshot))
### symlink latest snapshot
sympath = os.path.join(webdir, 'snapshots', 'latest', '{}.json'.format(measurement))
try:
os.symlink(filepath, sympath)
except OSError:
os.remove(sympath)
os.symlink(filepath, sympath)
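### example call (hypothetical web directory, measurement, columns and metadata):
###   store_snapshot('/home/albert.einstein/public_html', 'latency_history',
###                  {'time': [1234567890], 'data': [0.3]}, {'x': 'time', 'y': 'data'},
###                  1234567890, job='H1')
### would write .../public_html/snapshots/1/2/3/4/5/6/latency_history_1234567890.json
### and refresh the symlink .../public_html/snapshots/latest/latency_history.json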
@@ -16,8 +16,7 @@ import shutil
import h5py
import numpy
from . import common
from .. import aggregator
from . import core
from .. import utils
@@ -39,11 +38,11 @@ class Aggregator(object):
self.pool = multiprocessing.Pool(num_processes)
### track reduced data to process
self.last_reduce = aggregator.now()
self.last_reduce = utils.gps_now()
self.prev_dataspan = set()
### set up 'latest' directory for snapshots
aggregator.makedir(os.path.join(self.webdir, 'snapshots', 'latest'))
core.makedir(os.path.join(self.webdir, 'snapshots', 'latest'))
def store_and_reduce(self, measurement, data, columns, tags=None, aggregate='max'):
@@ -80,12 +79,12 @@ class Aggregator(object):
### reduce across tags
if self.reduce_across_tags:
for start, end in zip(*aggregator.job_expanse(dataspan)):
for start, end in zip(*core.job_expanse(dataspan)):
reduce_across_tags((self.rootdir, measurement, data.keys(), tags, aggregate, start, end, 1))
### only reduce higher dt every last_reduce seconds
if utils.in_new_epoch(aggregator.now(), self.last_reduce, self.reduce_dt):
self.last_reduce = aggregator.now()
if utils.in_new_epoch(utils.gps_now(), self.last_reduce, self.reduce_dt):
self.last_reduce = utils.gps_now()
### reduce by tag
mapargs = []
@@ -99,7 +98,7 @@ class Aggregator(object):
if self.reduce_across_tags:
mapargs = []
for dt in (10, 100, 1000, 10000, 100000):
for start, end in zip(*aggregator.job_expanse(dataspan)):
for start, end in zip(*core.job_expanse(dataspan)):
mapargs.append((self.rootdir, measurement, data.keys(), tags, aggregate, start, end, dt))
self.pool.map(reduce_across_tags, mapargs)
@@ -122,7 +121,7 @@ class Aggregator(object):
the time the snapshot was taken
"""
common.store_snapshot(self.webdir, measurement, data, dims, time, **attrs)
core.store_snapshot(self.webdir, measurement, data, dims, time, **attrs)
class Consumer(object):
@@ -351,7 +350,7 @@ class Consumer(object):
### reduction utilities
def store_timeseries(path, route, time, data):
aggregator.makedir(path)
core.makedir(path)
tmpfname, fname = create_new_dataset(path, route, time, data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, fname)
@@ -361,10 +360,10 @@ def store_timeseries(path, route, time, data):
def reduce_by_tag(rootdir, route, tag, tag_type, aggregate, time, data, prevdataspan, dt):
if tag_type == 'ifo':
route = '{}_{}'.format(tag, route)
func = aggregator.aggregate_to_func(aggregate)
func = core.aggregate_to_func(aggregate)
dataspan = set()
gps1, gps2 = aggregator.gps_range(time)
gps1, gps2 = core.gps_range(time)
for start, end in zip(gps1, gps2):
# shortcut to not reprocess data that has already been
@@ -388,7 +387,7 @@ def reduce_by_tag(rootdir, route, tag, tag_type, aggregate, time, data, prevdata
@utils.unpack
def reduce_across_tags(rootdir, route, tags, tag_type, aggregate, start, end, dt):
level = numpy.log10(dt).astype(int)
func = aggregator.aggregate_to_func(aggregate)
func = core.aggregate_to_func(aggregate)
setup_dir_across_tag_by_level(start, aggregate, route, rootdir, verbose = True, level=level)
agg_time = numpy.array([])
@@ -397,7 +396,7 @@ def reduce_across_tags(rootdir, route, tags, tag_type, aggregate, start, end, dt
path = path_by_tag(rootdir, tag, tag_type, start, end, aggregate, level=level)
agg_time, agg_data = aggregate_data(path, route, agg_time, agg_data)
_, reduced_time, reduced_data = aggregator.reduce_data(agg_time, agg_data, func, dt=dt)
_, reduced_time, reduced_data = core.reduce_data(agg_time, agg_data, func, dt=dt)
path = path_across_tags(rootdir, start, end, aggregate, level=level)
store_timeseries(path, route, reduced_time, reduced_data)
@@ -424,7 +423,7 @@ def update_lowest_level(rootdir, route, tag, tag_type, start, end, typ, func, jo
if len(this_time) == len(prev_times) and len(this_data) == len(prev_data):
return []
else:
_, reduced_time, reduced_data = aggregator.reduce_data(this_time, this_data, func, dt=1)
_, reduced_time, reduced_data = core.reduce_data(this_time, this_data, func, dt=1)
store_timeseries(path, route, reduced_time, reduced_data)
return [start, end]
@@ -434,7 +433,7 @@ def reduce_by_one_level(rootdir, route, tag, tag_type, start, end, typ, func, le
agg_time = numpy.array([])
# FIXME iterate over levels instead.
this_level_dir = "/".join([rootdir, common.gps_to_leaf_directory(start, level = level)])
this_level_dir = "/".join([rootdir, core.gps_to_leaf_directory(start, level = level)])
for subdir in gps_to_sub_directories(start, level, rootdir):
if tag_type == 'ifo':
path = "/".join([this_level_dir, subdir, typ])
@@ -442,7 +441,7 @@
path = "/".join([this_level_dir, subdir, "by_{}".format(tag_type), tag, typ])
agg_time, agg_data = aggregate_data(path, route, agg_time, agg_data)
_, reduced_time, reduced_data = aggregator.reduce_data(agg_time, agg_data, func, dt=10**level)
_, reduced_time, reduced_data = core.reduce_data(agg_time, agg_data, func, dt=10**level)
path = path_by_tag(rootdir, tag, tag_type, start, end, typ, level=level)
store_timeseries(path, route, reduced_time, reduced_data)
@@ -453,7 +452,7 @@ def aggregate_data(path, route, agg_time, agg_data):
agg_time = numpy.concatenate((agg_time, time))
agg_data = numpy.concatenate((agg_data, data))
except IOError as ioerr:
aggregator.makedir(path)
core.makedir(path)
create_new_dataset(path, route)
pass
@@ -473,20 +472,20 @@ def file_in_range(filename, start, end):
def path_by_tag(rootdir, tag, tag_type, start, end, typ, level=0):
if tag_type == 'ifo':
return "/".join([rootdir, common.gps_to_leaf_directory(start, level=level), typ])
return "/".join([rootdir, core.gps_to_leaf_directory(start, level=level), typ])
else:
return "/".join([rootdir, common.gps_to_leaf_directory(start, level=level), "by_{}".format(tag_type), tag, typ])
return "/".join([rootdir, core.gps_to_leaf_directory(start, level=level), "by_{}".format(tag_type), tag, typ])
def path_across_tags(rootdir, start, end, typ, level=0):
return "/".join([rootdir, common.gps_to_leaf_directory(start, level=level), typ])
return "/".join([rootdir, core.gps_to_leaf_directory(start, level=level), typ])
def get_partial_paths_to_aggregated_data(start, end, level=0):
for n in count():
this_time = start + n * (10**level * aggregator.MIN_TIME_QUANTA)
this_time = start + n * (10**level * core.MIN_TIME_QUANTA)
if this_time <= end:
yield common.gps_to_leaf_directory(this_time, level)
yield core.gps_to_leaf_directory(this_time, level)
else:
break
@@ -495,7 +494,7 @@ def gps_to_sub_directories(gpstime, level, basedir):
"""!
return the entire relevant directory structure for a given GPS time
"""
root = os.path.join(basedir, common.gps_to_leaf_directory(gpstime, level))
root = os.path.join(basedir, core.gps_to_leaf_directory(gpstime, level))
out = []
for i in range(10):
path = os.path.join(root,str(i))
@@ -510,12 +509,12 @@ def setup_dir_by_tag_and_level(gpstime, typ, tag, tag_type, route, rootdir, verb
appropriate data structure for storing the hierarchical data.
"""
str_time = str(gpstime).split(".")[0]
str_time = str_time[:(len(str_time)-int(numpy.log10(aggregator.MIN_TIME_QUANTA))-level)]
str_time = str_time[:(len(str_time)-int(numpy.log10(core.MIN_TIME_QUANTA))-level)]
if tag_type == 'ifo':
directory = "%s/%s/%s" % (rootdir, "/".join(str_time), typ)
else:
directory = "%s/%s/by_%s/%s/%s" % (rootdir, "/".join(str_time), tag_type, tag, typ)
aggregator.makedir(directory)
core.makedir(directory)
tmpfname, fname = create_new_dataset(directory, route)
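### worked example (illustrative only): for gpstime = 1234567890, MIN_TIME_QUANTA = 10000
### and level = 0, str_time is truncated to "123456", so the 'ifo' branch creates
### rootdir/1/2/3/4/5/6/<typ> -- the same per-digit layout produced by
### core.gps_to_leaf_directory.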
@@ -525,9 +524,9 @@ def setup_dir_across_tag_by_level(gpstime, typ, route, rootdir, verbose = True,
appropriate data structure for storing the hierarchical data.
"""
str_time = str(gpstime).split(".")[0]
str_time = str_time[:(len(str_time)-int(numpy.log10(aggregator.MIN_TIME_QUANTA))-level)]
str_time = str_time[:(len(str_time)-int(numpy.log10(core.MIN_TIME_QUANTA))-level)]
directory = "%s/%s/%s" % (rootdir, "/".join(str_time), typ)
aggregator.makedir(directory)
core.makedir(directory)
tmpfname, fname = create_new_dataset(directory, route)
......
@@ -13,7 +13,6 @@ import shutil
from six.moves import urllib
from .. import aggregator
from .. import utils
......
@@ -19,8 +19,7 @@ import numpy
import urllib3
import yaml
from .. import aggregator
from . import common
from . import core
from . import line_protocol
from .. import utils
@@ -89,13 +88,47 @@ class Aggregator(object):
### track reduced data to process
self.span_processed = defaultdict(list)
self.last_reduce = aggregator.now()
self.last_reduce = utils.gps_now()
### set up structure to store schemas
self.schema = {}
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None, aggregate=None):
def load(self, path=None):
"""Loads schemas contained within a configuration file.
Parameters
----------
path : `str`
the path to the configuration file
"""
if not path:
if 'SCALDRC_PATH' in os.environ:
path = os.getenv('SCALDRC_PATH')
else:
raise KeyError('no configuration file found, please set your SCALDRC_PATH correctly using "export SCALDRC_PATH=path/to/config" or pass in path kwarg')
### load config
config = None
with open(path, 'r') as f:
config = yaml.safe_load(f)
### register schemas
for schema in config['schemas'].values():
if 'schema1' in schema:
if 'measurement' in schema:
measurement = schema['measurement']
subschemas = {key: subschema for key, subschema in schema.items() if 'schema' in key}
for subschema in subschemas.values():
if not 'measurement' in subschema:
subschema['measurement'] = measurement
self._load_schema(subschema)
else:
self._load_schema(schema)
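### rough shape of the parsed configuration this expects (a sketch; the backend and
### schema names below are hypothetical, and the '...' stand for the fields consumed
### by register_schema() and the Aggregator constructor):
###   {'backends': {'default': {...}},
###    'schemas': {'latency': {'measurement': 'latency',
###                            'schema1': {...}, 'schema2': {...},
###                            'aggregate': 'max'}}}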
def register_schema(self, measurement, columns, column_key, tags=None, tag_key=None, aggregate=None, **kwargs):
"""Defines a schema for a measurement.
Parameters
@@ -187,7 +220,7 @@ class Aggregator(object):
"""
if aggregate:
aggfunc = aggregator.aggregate_to_func(aggregate)
aggfunc = core.aggregate_to_func(aggregate)
else:
aggfunc = None
@@ -207,7 +240,7 @@ class Aggregator(object):
else: ### reduce to 1s by default before storing timeseries
column_to_reduce = [row['fields'][self.schema[measurement]['column_key']] for row in rows]