Commit 6811064d authored by Patrick Godwin

aggregate.py: use kafka.Client, move timeseries/trigger parsing utils here

parent c14dfe1b
Pipeline #133516 passed with stages in 2 minutes and 18 seconds
@@ -6,8 +6,7 @@ __description__ = "utilities to aggregate and store incoming metrics"
#-------------------------------------------------
### imports
import collections
import json
from collections import defaultdict
import logging
import os
import sys
@@ -17,21 +16,46 @@ import timeit
import numpy
import yaml
from kafka import KafkaConsumer
from . import utils
from .io import influx, kafka
#-------------------------------------------------
### logging config
### aggregator utilities
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
def parse_timeseries(messages, topics):
    """Parses timeseries-based Kafka messages into a format for storing into influx.
    """
    data = {t: defaultdict(lambda: {'time': [], 'fields': {'data': []}}) for t in topics}

    # retrieve timeseries for all routes and topics
    for message in messages:
        try:
            job = message.key
            route = message.topic
            data[route][job]['time'].extend(message.value['time'])
            data[route][job]['fields']['data'].extend(message.value['data'])
        except KeyError: ### no route in message
            pass

    # convert series to numpy arrays
    for route in topics:
        for job in data[route].keys():
            data[route][job]['time'] = numpy.array(data[route][job]['time'])
            data[route][job]['fields']['data'] = numpy.array(data[route][job]['fields']['data'])

    return data


def parse_triggers(messages):
    """Parses trigger-based Kafka messages into a format for storing into influx.
    """
    triggers = []
    for message in messages:
        triggers.extend(message.value)
    return triggers
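For reference, a minimal sketch (not part of the commit) of how these parsing helpers could be exercised. The FakeMessage stand-in, the topic and job names, and the payload layout are assumptions inferred from how the parsers read message fields:

from collections import namedtuple

# stand-in for a Kafka consumer record exposing the attributes the parsers use
FakeMessage = namedtuple('FakeMessage', ['key', 'topic', 'value'])

timeseries_msgs = [
    FakeMessage(key='job01', topic='latency', value={'time': [1, 2, 3], 'data': [0.5, 0.6, 0.4]}),
    FakeMessage(key='job02', topic='latency', value={'time': [1, 2, 3], 'data': [0.7, 0.8, 0.6]}),
]
timeseries = parse_timeseries(timeseries_msgs, topics=['latency'])
# timeseries['latency']['job01']['time'] -> array([1, 2, 3])

trigger_msgs = [FakeMessage(key=None, topic='events', value=[{'far': 1e-7, 'time': 1234.5}])]
triggers = parse_triggers(trigger_msgs)
# triggers -> [{'far': 1e-7, 'time': 1234.5}]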
#-------------------------------------------------
### aggregator utilities
def _add_parser_args(parser):
    parser.add_argument('-c', '--config',
@@ -40,18 +64,17 @@ def _add_parser_args(parser):
        help="chooses data backend to use from config. default = 'default'.")
    parser.add_argument('-d', '--data-type', default='timeseries',
        help = "Sets the data type of metrics expected from [timeseries|triggers]. default = timeseries.")
    parser.add_argument('-n', '--hostname', default='localhost',
        help="specify Kafka hostname to read metrics from. default = localhost.")
    parser.add_argument('-p', '--port', type=int, default=8086,
        help="specify Kafka port to read metrics from. default = 8086")
    parser.add_argument('-s', '--schema', action='append',
        help="Specify schema to use for aggregation. Can be given multiple times.")
    parser.add_argument('-t', '--tag', default='generic',
    parser.add_argument('-u', '--uri', default='kafka://localhost:9092',
        help="specify Kafka URI to read metrics from. default = kafka://localhost:9092.")
    parser.add_argument('-t', '--topic', action='append',
        help="Specify topic to aggregate from. Can be given multiple times.")
    parser.add_argument('--tag', default='generic',
        help = "Specify a tag for this aggregator job. default = 'generic'.")
    parser.add_argument('--across-jobs', action = 'store_true',
        help = "If set, aggregate data across jobs as well.")
    parser.add_argument('--processing-cadence', default = 0.5,
        help = "Rate at which the aggregator acquires and processes data. default = 0.5 seconds.")
    parser.add_argument('-v', '--verbose', action = 'store_true', help = 'Be verbose.')
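As a usage illustration (not part of the commit), the reworked options could be exercised in isolation as below; the config file name and topic names are hypothetical:

import argparse

parser = argparse.ArgumentParser()
_add_parser_args(parser)
args = parser.parse_args([
    '-c', 'config.yml',                       # hypothetical config file
    '-u', 'kafka://localhost:9092',           # Kafka URI replacing --hostname/--port
    '-t', 'H1_latency', '-t', 'L1_latency',   # --topic is repeatable
    '--tag', 'online',
    '--data-type', 'timeseries',
])
# args.uri   -> 'kafka://localhost:9092'
# args.topic -> ['H1_latency', 'L1_latency']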
#-------------------------------------------------
@@ -66,15 +89,21 @@ def main(args=None):
        _add_parser_args(parser)
        args = parser.parse_args()

    schemas = args.schema
    topics = args.topic

    # set up logging
    log_level = logging.INFO if args.verbose else logging.WARNING
    logging.basicConfig(format='%(asctime)s | %(name)s : %(levelname)s : %(message)s')
    logger = logging.getLogger('scald-aggregate')
    logger.setLevel(log_level)

    ### sanity checking
    # sanity checking
    assert args.data_type in ('timeseries', 'triggers'), '--data-type must be one of [timeseries|triggers]'
    if args.data_type == 'triggers':
        assert len(schemas) == 1, 'only one schema allowed if --data-type = triggers'
        assert len(topics) == 1, 'only one topic allowed if --data-type = triggers'

    ### load configuration
    # load configuration
    config = None
    if args.config:
        config_path = args.config
@@ -86,19 +115,8 @@
        config = yaml.safe_load(f)

    # instantiate a consumer to subscribe to all of our topics, i.e., jobs
    consumer = KafkaConsumer(
        *schemas,
        bootstrap_servers=[':'.join([args.hostname, str(args.port)])],
        key_deserializer=lambda m: json.loads(m.decode('utf-8')),
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        group_id='aggregator_{}_{}'.format(args.tag, args.schema[0]),
        auto_offset_reset='latest',
        max_poll_interval_ms = 60000,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000,
        reconnect_backoff_ms=5000,
        reconnect_backoff_max_ms=30000
    )
    client = kafka.Client(args.uri)
    client.subscribe(topics)

    # set up aggregator
    aggregator_settings = config['backends'][args.backend]
@@ -110,21 +128,22 @@
    # start an infinite loop to keep updating and aggregating data
    while True:
        logging.info("retrieving data from kafka")
        logger.info("retrieving data from kafka")
        start = timeit.default_timer()
        msgs = [msg for msg in client.query(max_messages=2000)]
        if args.data_type == 'timeseries':
            data = kafka.retrieve_timeseries(consumer, schemas, max_records = 2000)
            data = parse_timeseries(msgs, topics)
        elif args.data_type == 'triggers':
            data = kafka.retrieve_triggers(consumer, schemas, route_name = schemas[0], max_records = 2000)
            data = parse_triggers(msgs)
        retrieve_elapsed = timeit.default_timer() - start
        logging.info("time to retrieve data: %.1f s" % retrieve_elapsed)
        logger.info("time to retrieve data: %.1f s" % retrieve_elapsed)

        # store and reduce data for each job
        start = timeit.default_timer()
        for schema in schemas:
            logging.info("storing and reducing metrics for schema: %s" % schema)
        for schema in topics:
            logger.info("storing and reducing metrics for schema: %s" % schema)
            if args.data_type == 'timeseries':
                aggregator.store_columns(schema, data[schema], aggregate=config['schemas'][schema]['aggregate'])
            elif args.data_type == 'triggers':
@@ -133,14 +152,9 @@
                aggregator.store_triggers(schema, [trg for trg in data if far_key in trg], far_key = far_key, time_key = time_key)

        store_elapsed = timeit.default_timer() - start
        logging.info("time to store/reduce %s: %.1f s" % (args.data_type, store_elapsed))
        logger.info("time to store/reduce %s: %.1f s" % (args.data_type, store_elapsed))
        time.sleep(max(args.processing_cadence - store_elapsed - retrieve_elapsed, args.processing_cadence))

    # close connection to consumer if using kafka
    if consumer:
        consumer.close()

    # always end on an error so that condor won't think we're done and will
    # restart us
    sys.exit(1)

    # close client connection
    client.close()
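For orientation only, a hedged sketch of a consumer wrapper exposing the interface main() now depends on (Client(uri), subscribe(), query(max_messages=...), close()). This is not the actual scald .io.kafka.Client code; the URI parsing and poll settings below are assumptions, built on kafka-python's KafkaConsumer for the sake of the example:

import json
from urllib.parse import urlparse

from kafka import KafkaConsumer


class SketchClient(object):
    """Illustrative stand-in for the consumer side of scald's kafka client."""

    def __init__(self, uri):
        # assume URIs of the form kafka://hostname:port
        parsed = urlparse(uri)
        self._consumer = KafkaConsumer(
            bootstrap_servers=[parsed.netloc],
            key_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
        )

    def subscribe(self, topics):
        self._consumer.subscribe(topics)

    def query(self, max_messages=1000):
        # poll once, then yield individual records across all topic partitions
        for records in self._consumer.poll(timeout_ms=1000, max_records=max_messages).values():
            for record in records:
                yield record

    def close(self):
        self._consumer.close()

Whatever the real wrapper does internally, the payoff visible in this diff is that main() no longer carries the KafkaConsumer configuration block removed above.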