Commit ac079d34 authored by Patrick Godwin's avatar Patrick Godwin

aggregator.py: add option to process triggers, add check to ensure only one...

aggregator.py: add option to process triggers, add check to ensure only one schema is passed if triggers are aggregated
parent 36aa9bff
Pipeline #77087 passed with stages
in 2 minutes and 58 seconds
......@@ -68,6 +68,12 @@ def main(args=None):
schemas = args.schema
### sanity checking
assert args.data_type in ('timeseries', 'triggers'), '--data-type must be one of [timeseries|triggers]'
if args.data_type == 'triggers':
assert len(schemas) == 1, 'only one schema allowed if --data-type = triggers'
### load configuration
config = None
if args.config:
......@@ -115,6 +121,8 @@ def main(args=None):
if args.data_type == 'timeseries':
data = kafka.retrieve_timeseries(consumer, schemas, max_records = 2000)
elif args.data_type == 'triggers':
data = kafka.retrieve_triggers(consumer, schemas, route_name = schemas[0], max_records = 2000)
retrieve_elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % retrieve_elapsed)
......@@ -123,10 +131,15 @@ def main(args=None):
start = timeit.default_timer()
for schema in schemas:
logging.info("storing and reducing metrics for schema: %s" % schema)
aggregator.store_columns(schema, data[schema], aggregate=config['schemas'][schema]['aggregate'])
if args.data_type == 'timeseries':
aggregator.store_columns(schema, data[schema], aggregate=config['schemas'][schema]['aggregate'])
elif args.data_type == 'triggers':
far_key = config['schemas'][schema]['far_key']
time_key = config['schemas'][schema]['time_key']
aggregator.store_triggers(schema, [trg for trg in triggers if far_key in trg], far_key = far_key, time_key = time_key)
store_elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % store_elapsed)
logging.info("time to store/reduce %s: %.1f s" % (args.data_type, store_elapsed))
time.sleep(max(args.processing_cadence - store_elapsed - retrieve_elapsed, args.processing_cadence))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment