From b781a22654fbb828759f90cb620550cbb4957796 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sun, 10 Feb 2019 16:05:26 -0800
Subject: [PATCH] gstlal_ll_inspiral_aggregator: use new aggregation classes
 from datamon to aggregate data, tidy up imports

---
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 77 +++++++------------
 1 file changed, 29 insertions(+), 48 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index f28367f8d7..afd54d5895 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 #
 # Copyright (C) 2016  Kipp Cannon, Chad Hanna
+# Copyright (C) 2019  Patrick Godwin
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the
@@ -17,29 +18,19 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
 
-import h5py
-import numpy
-import sys, os
-import itertools
 import argparse
-import lal
-from lal import LIGOTimeGPS
-import time
-from gi.repository import GLib
+import json
 import logging
-import subprocess
-import urllib2
-import shutil
-import collections
 from multiprocessing import Pool
-import json
+import sys, os
+import time
+import timeit
+
+import numpy
 
 from datamon import aggregator
 from datamon import io
 
-MIN_TIME_QUANTA = 10000
-DIRS = 6
-
 
 #
 # =============================================================================
@@ -93,59 +84,49 @@ if __name__ == '__main__':
 	options = parse_command_line()
 
 	# FIXME don't hardcode some of these?
-	datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
 	routes = options.route
 
 	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
 	pool = Pool(options.num_threads)
-	prevdataspan = set()
 
-	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
-	if options.kafka_server is not None:
+	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
+	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_aggregator' % routes[0], max_poll_interval_ms = 60000)
 	else:
 		consumer = None
 
-	# Instantiate influxDB connection if data backend is influx
+        # set up aggregator sink
 	if options.data_backend == 'influx':
-		influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
-	else:
-		influx_client = None
+		agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
+	else: ### hdf5 data backend
+		agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
 
 	# start an infinite loop to keep updating and aggregating data
 	while True:
-		logging.info("sleeping")
+		logging.info("sleeping for %.1f s" % options.dump_period)
 		time.sleep(options.dump_period)
-		dataspan = set()
 
-		if consumer is not None:
+		if consumer:
 			# this is not threadsafe!
-			logging.info("getting data from kafka")
-			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
+			logging.info("retrieving data from kafka")
+			start = timeit.default_timer()
+			datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, max_records = 2 * len(jobs) * len(routes))
+			elapsed = timeit.default_timer() - start
+			logging.info("time to retrieve data: %.1f s" % elapsed)
 		else:
-			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
+			logging.info("retrieving data from bottle routes")
+			datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
 
+		# store and reduce data for each job
+		start = timeit.default_timer()
 		for route in routes:
-			# First get the raw and reduced data for each job in parallel
-			if influx_client:
-				io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
-			else:
-				mapargs = [(options.base_dir, route, job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs]
-				for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
-					dataspan.update(ds)
-				prevdataspan = dataspan.copy()
-
-				# Then reduce the data across jobs at each level
-				mapargs = []
-				for start, end in zip(*aggregator.job_expanse(dataspan)):
-					mapargs = []
-					for route in routes:
-						mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
-					pool.map(io.hdf5.reduce_across_tags, mapargs)
-					logging.info("processed reduced data in [%d %d) at %d for route %s" % (int(start), int(end), int(aggregator.now()), route))
+			logging.info("storing and reducing timeseries for measurement: %s" % route)
+			agg_sink.store_and_reduce(route, datadata[route], 'data', tags='job', aggregates=options.data_type)
+		elapsed = timeit.default_timer() - start
+		logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
 
 	#
 	# always end on an error so that condor won't think we're done and will
-- 
GitLab