From c7dc83f9a8bf330041a0eb3bea9cf757bd2431bb Mon Sep 17 00:00:00 2001
From: Chad Hanna <crh184@psu.edu>
Date: Sat, 1 Oct 2016 13:33:58 -0700
Subject: [PATCH] gstlal_ll_inspiral_aggregator: major refactoring

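Replace the servicediscovery/GLib-mainloop Collector, which polled the jobs
from a wget thread, with a simple polling loop that reads each job's URL
from its <job>_registry.txt file under --job-tag and farms the per-job work
out to a multiprocessing Pool (--num-threads).  The per-job update and the
per-level data reduction are split into standalone functions so they can be
mapped across worker processes, MIN_TIME_QUANTA is raised from 1000 to
10000, and the number of directory levels is collected into a DIRS constant.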
---
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 329 ++++++++----------
 1 file changed, 148 insertions(+), 181 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index faec11fa00..484f5fb245 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -25,18 +25,16 @@ import argparse
 import lal
 from lal import LIGOTimeGPS
 import time
-from gstlal import servicediscovery
 from gi.repository import GLib
 import logging
 import subprocess
-import threading
-from gi.repository import GLib
-from gstlal import servicediscovery
 import urllib2
 import shutil
 import collections
+from multiprocessing import Pool
 
-MIN_TIME_QUANTA = 1000
+MIN_TIME_QUANTA = 10000
+DIRS = 6
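+# MIN_TIME_QUANTA is the span (in GPS seconds) covered by a leaf directory;
+# DIRS is the number of levels in the directory hierarchy, each level above
+# the leaf covering a factor of 10 more time.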
 
 
 #
@@ -58,6 +56,7 @@ def parse_command_line():
 	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
+	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of parallel worker processes to use in the multiprocessing pool (default = 16)")
 
 	args = parser.parse_args()
 
@@ -93,15 +92,16 @@ def get_url(url,d):
 	specific route.  FIXME it assumes that the routes end in .txt
 	"""
 	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
-	jobtime = [float(x.split()[0]) for x in jobdata if x]
-	jobdata = [float(x.split()[1]) for x in jobdata if x]
+	jobtime = numpy.array([float(x.split()[0]) for x in jobdata if x])
+	jobdata = numpy.array([float(x.split()[1]) for x in jobdata if x])
 	assert len(jobdata) == len(jobtime)
 	return jobtime, jobdata
 
 
 def reduce_data(xarr, yarr, func, level = 0):
 	"""!
-	This function does a data reduction by powers of 10
+	This function does a data reduction by powers of 10, where level
+	specifies the power; the default of 0 reduces over 1 second intervals.
 	"""
 	datadict = collections.OrderedDict()
 	assert len(yarr) == len(xarr)
@@ -165,10 +165,14 @@ def get_dataset(path, base):
 	open a dataset at @param path with name @param base and return the data
 	"""
 	fname = os.path.join(path, "%s.hdf5" % base)
-	f = h5py.File(fname, "r")
-	x,y = list(f["time"]), list(f["data"])
-	f.close()
-	return fname, x,y
+	try:
+		f = h5py.File(fname, "r")
+		x,y = list(f["time"]), list(f["data"])
+		f.close()
+		return fname, x,y
+	except IOError:
+		fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
+		return fname, [], []
 
 
 def gps_to_minimum_time_quanta(gpstime):
@@ -199,61 +203,60 @@ def gps_to_sub_directories(gpstime, level, basedir):
 	return out
 
 
-def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
+def setup_dir_by_job_and_level(gpstime, typ, job, route, base_dir, verbose = True, level = 0):
 	"""!
 	Given a gps time, the number of jobs and data types produce an
 	appropriate data structure for storing the hierarchical data.
 	"""
 	str_time = str(gpstime).split(".")[0]
-	digits = [int(x) for x in str_time]
-	directories = [numpy.array([digits[x]]) for x in range(7)]
-
-	# Setup the directory structure and put in empty files
-	for dirs in [directories[:i+1] for i in range(len(directories))]:
-		for path in itertools.product(*dirs):
-			cur_dir = os.path.join(base_dir, "/".join(str(x) for x in path))
-			makedir(cur_dir)
-			for (typ,_) in types:
-				type_dir = os.path.join(cur_dir, typ)
-				makedir(type_dir)
-				for d in data:
-					create_new_dataset(type_dir, d)
-			type_dir = os.path.join(cur_dir, "by_job")
-			for b in jobs:
-				bin_dir = os.path.join(type_dir, b)
-				for (typ,_) in types:
-					type_bin_dir = os.path.join(bin_dir, typ)
-					makedir(type_bin_dir)
-					for d in data:
-						create_new_dataset(type_bin_dir, d)
-
-
-def gps_range(jobtime, dataspan):
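+	# keep only the leading digits that label this level's directory, e.g.
+	# gpstime 1234567890 at level 0 (MIN_TIME_QUANTA = 10000) -> "123456" -> 1/2/3/4/5/6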
+	str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
+	directory = "%s/%s/by_job/%s/%s" % (base_dir, "/".join(str_time), job, typ)
+	makedir(directory)
+	fname = create_new_dataset(directory, route)
+
+
+def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True, level = 0):
+	"""!
+	Given a gps time, a data type and a route, make the directory for the
+	across-jobs aggregate at the given level and put an empty dataset in it.
+	"""
+	str_time = str(gpstime).split(".")[0]
+	str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
+	directory = "%s/%s/%s" % (base_dir, "/".join(str_time), typ)
+	makedir(directory)
+	fname = create_new_dataset(directory, route)
+
+
+def gps_range(jobtime):
 	gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
 	min_t, max_t = min(gpsblocks), max(gpsblocks)
-	for gpstime in gpsblocks:
-		dataspan.add(gpstime)
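+	# parallel lists of MIN_TIME_QUANTA-aligned block start and end times;
+	# callers zip() them into (start, end) pairs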
 	return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
 
 
-def update_lowest_level_data(job, path, d, s, e, typ, self, jobtime, jobdata, func):
+def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ, base_dir, jobtime, jobdata, func):
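+	# update the level-0 dataset <route>.hdf5 under this job's leaf directory
+	# with any samples in [start, end) that are newer than what is already stored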
+	path = "/".join([base_dir, gps_to_leaf_directory(start), "by_job", job, typ])
 	try:
-		fname, prev_times, prev_data = get_dataset(path, d)
+		fname, prev_times, prev_data = get_dataset(path, route)
 	except:
-		setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
-		fname, prev_times, prev_data = get_dataset(path, d)
-	# only get new data
+		setup_dir_by_job_and_level(start, typ, job, route, base_dir, verbose = True, level = 0)
+		fname, prev_times, prev_data = get_dataset(path, route)
+	# only keep samples newer than the last stored time; the input is assumed to be time ordered
 	if prev_times:
-		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
+		this_time_ix = numpy.logical_and(jobtime > max(start-1e-16, prev_times[-1]), jobtime < end)
 	else:
-		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
-	this_time = [jobtime[i] for i in this_time_ix] + prev_times
-	this_data = [jobdata[i] for i in this_time_ix] + prev_data
+		this_time_ix = numpy.logical_and(jobtime >= start, jobtime < end)
+	this_time = list(jobtime[this_time_ix]) + prev_times
+	this_data = list(jobdata[this_time_ix]) + prev_data
+	# shortcut if this span contains no new samples
+	if len(this_time) == len(prev_times) and len(this_data) == len(prev_data):
+		logging.debug("no new data for job %s route %s type %s in [%d, %d)" % (job, route, typ, start, end))
+		return []
 	reduced_time, reduced_data = reduce_data(this_time, this_data, func, level = 0)
-	logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time)))
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	#logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, route, start, end, typ, len(reduced_time)))
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# copy the tmp file over the original
 	shutil.move(tmpfname, fname)
+	return [start, end]
 
 
 def job_expanse(dataspan):
@@ -263,136 +266,87 @@ def job_expanse(dataspan):
 	else:
 		return [], []
 
-def reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e):
+def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ, route, func, start, end):
+
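+	# build this job's dataset at this level by aggregating and reducing the
+	# datasets one level below it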
+	this_level_dir = "/".join([base_dir, gps_to_leaf_directory(start, level = level)])
 
 	agg_data = []
 	agg_time = []
-	for subdir in gps_to_sub_directories(s, level, self.base_dir):
+
+	# FIXME iterate over levels instead.
+	for subdir in gps_to_sub_directories(start, level, base_dir):
 		path = "/".join([this_level_dir, subdir, "by_job", job, typ])
 		try:
-			fname, x, y = get_dataset(path, d)
+			fname, x, y = get_dataset(path, route)
 			agg_time += x
 			agg_data += y
 		except IOError as ioerr:
-			logging.error(ioerr)
+			makedir(path)
+			# make an empty data set
+			create_new_dataset(path, route)
 			pass
 	reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level)
 	path = "/".join([this_level_dir, "by_job", job, typ])
-	logging.info("processing reduced data %s for job %s  in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	makedir(path)
+	#logging.info("processing reduced data %s for job %s  in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# FIXME don't assume we can get the non temp file name this way
 	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
 
 
-def reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e):
+def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)):
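+	# called through Pool.map, so the arguments arrive packed in a single tuple;
+	# combine every job's data for this type/route at this level into one
+	# across-jobs reduction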
 	# Process this level
 	agg_data = []
 	agg_time = []
-	for job  in sorted(self.urls):
+	for job  in sorted(jobs):
 		path = "/".join([this_level_dir, "by_job", job, typ])
 		try:
-			fname, x, y = get_dataset(path, d)
+			fname, x, y = get_dataset(path, route)
 			agg_time += x
 			agg_data += y
 		except IOError as ioerr:
-			logging.error(ioerr)
+			makedir(path)
+			create_new_dataset(path, route)
 			pass
 	reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level)
-	logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time)))
+	#logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (route, start, end, typ, level, len(reduced_time), len(agg_time)))
 	path = "/".join([this_level_dir, typ])
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# FIXME don't assume we can get the non temp file name this way
 	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
-#
-# =============================================================================
-#
-#                               Internal Library
-#
-# =============================================================================
-#
 
 
-class Collector(servicediscovery.Listener):
-	def __init__(self, mainloop, datatypes, jobs, dataurls, base_dir, job_tag = None, dump_period = 180.):
-		self.datatypes = datatypes
-		self.jobs = set(jobs)
-		self.dataurls = dataurls
-		self.base_dir = base_dir
-		self.job_tag = job_tag
-		self.dump_period = dump_period
-		self.urls = {}
-		self.lock = threading.Lock()
-		# FIXME:  use glib's mainloop machinery instead, implement
-		# this as a timeout or whatever it's called
-		logging.info("starting wget loop thread")
-		self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
-		self.wget_thread.start()
-
-	def add_service(self, sname, stype, sdomain, host, port, properties):
-		if stype != "_http._tcp" or not sname.startswith("gstlal_inspiral "):
-			return
-		url = "http://%s:%s/" % (host, port)
-		logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
-		if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
-			logging.info("server has wrong or missing job tag, discarding")
-			return
-		if not properties.get("GSTLAL_LL_JOB"):
-			logging.info("server has no GSTLAL_LL_JOB value, discarding")
-			return
-		if properties.get("GSTLAL_LL_JOB") not in self.jobs:
-			logging.info("server has a GSTLAL_LL_JOB value outside of requested range, discarding")
-			return
-		# watch for security problems:  don't let url or job ID
-		# terminate the wget shell command in mid-string
-		if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
-			logging.warn("invalid URL and/or job ID")
-			return
-		logging.info("recording server at %s for GSTLAL_LL_JOB='%s'" % (url, properties["GSTLAL_LL_JOB"]))
-		with self.lock:
-			self.urls[properties["GSTLAL_LL_JOB"]] = url
-
-	def wget_loop(self, mainloop):
-		try:
-			while self.dump_period:
-				logging.info("sleeping")
-				time.sleep(self.dump_period)
-				with self.lock:
-					dataspan = set()
-					for job, url in sorted(self.urls.items()):
-						assert job
-						# FIXME Hack
-						url = url.replace(".local","")
-						for d in self.dataurls:
-							jobtime, jobdata = get_url(url,d)
-							gps1, gps2 = gps_range(jobtime, dataspan)
-							for s,e in zip(gps1, gps2):
-								for (typ, func) in self.datatypes:
-									path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
-									update_lowest_level_data(job, path, d, s, e, typ, self, jobtime, jobdata, func)
-
-					# Data reduction across jobs at the lowest level
-					gps1, gps2 = job_expanse(dataspan)
-					for s,e in zip(gps1, gps2):
-						# FIXME don't hardcode this range
-						for level in range(7):
-							this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
-							for d in self.dataurls:
-								for (typ,func) in self.datatypes:
-									# Produce the data at this level by descending a level.
-									if level > 0:
-										for job in sorted(self.urls):
-											reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e)
-									# reduce data across jobs at this level
-									reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e)
-		except:
-			mainloop.quit()
-			raise
-
-	def quit(self):
-		logging.info("waiting for wget loop to finish ...")
-		self.dump_period = 0	# signal exit
-		self.wget_thread.join()
-
+def get_data_from_job_and_reduce((job, routes, datatypes, prevdataspan, base_dir, jobs)):
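+	# Pool.map worker (arguments arrive packed in a single tuple): poll one
+	# job's URL, update its lowest-level datasets for every route, then reduce
+	# them up through the directory levels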
+	# get the url
+	with open(os.path.join(options.job_tag, "%s_registry.txt" % job)) as f:
+		url = f.readline().strip()
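+	# the registry file's first line is assumed to be the job's base URL; note
+	# that "options" is a module-level global set in __main__ and inherited by
+	# the fork()ed pool workers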
+	update_time = []
+	reduce_time = []
+	dataspan = set()
+	for route in routes:
+		logging.info("processing job %s for route %s" % (job, route))
+		jobtime, jobdata = get_url(url, route)
+		gps1, gps2 = gps_range(jobtime)
+		for start, end in zip(gps1, gps2):
+			# shortcut to avoid reprocessing data that is already final:
+			# prevdataspan holds the GPS blocks that needed updating on the
+			# previous pass, so any block ending before the earliest of
+			# those cannot have changed
+			if prevdataspan and end < min(prevdataspan):
+				continue
+			for (typ, func) in datatypes:
+				now = time.time()
+				for processed_time in update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ, base_dir, jobtime, jobdata, func):
+					dataspan.add(processed_time)
+				update_time.append(time.time()-now)
+				# build the coarser levels, each from the level below it
+				now = time.time()
+				for level in range(1,DIRS):
+					reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ, route, func, start, end)
+				reduce_time.append(time.time()-now)
+	logging.info("updated %d spans with average time %f s; reduced %d spans with average time %f s" % (len(update_time), numpy.mean(update_time), len(reduce_time), numpy.mean(reduce_time)))
+	return dataspan
 
 #
 # =============================================================================
@@ -402,34 +356,47 @@ class Collector(servicediscovery.Listener):
 # =============================================================================
 #
 
-
-options = parse_command_line()
-
-# FIXME don't hardcode some of these?
-datatypes = [("min", min), ("max", max), ("median", median)]
-jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
-dataurls = ["latency_history", "snr_history"]
-
-
-logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
-
-
-mainloop = GLib.MainLoop()
-
-collector = Collector(mainloop, datatypes, jobs, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
-browser = servicediscovery.ServiceBrowser(collector)
-
-try:
-	mainloop.run()
-except:
-	collector.quit()
-	raise
-
-
-#
-# always end on an error so that condor won't think we're done and will
-# restart us
-#
-
-
-sys.exit(1)
+if __name__ == '__main__':
+
+	options = parse_command_line()
+	# FIXME don't hardcode some of these?
+	datatypes = [("min", min), ("max", max), ("median", median)]
+	jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
+	routes = ["latency_history", "snr_history"]
+
+	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+
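+	# pool of worker processes; each map task below handles a single job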
+	pool = Pool(options.num_threads)
+	prevdataspan = set()
+	while True:
+		logging.info("sleeping")
+		time.sleep(options.dump_period)
+		dataspan = set()
+
+		# First get the raw and reduced data for each job in parallel
+		mapargs = [(job, routes, datatypes, prevdataspan, options.base_dir, jobs) for job in jobs]
+		for ds in pool.map(get_data_from_job_and_reduce, mapargs):
+		#for ds in map(get_data_from_job_and_reduce, mapargs):
+			dataspan.update(ds)
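+		# remember which GPS blocks changed so the next pass can skip anything older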
+		prevdataspan = dataspan.copy()
+		# Then reduce the data across jobs at each level
+		mapargs = []
+		for start, end in zip(*job_expanse(dataspan)):
+			# FIXME don't hardcode this range
+			for level in range(DIRS):
+				this_level_dir = "/".join([options.base_dir, gps_to_leaf_directory(start, level = level)])
+				logging.info("processing reduced data in span [%d,%d] at level %d" % (start, end, level))
+				mapargs = []
+				for route in routes:
+					for (typ,func) in datatypes:
+						setup_dir_across_job_by_level(start, typ, route, options.base_dir, verbose = True, level = level)
+						mapargs.append((jobs, this_level_dir, typ, route, func, level, start, end))
+				pool.map(reduce_across_jobs, mapargs)
+				#map(reduce_across_jobs, mapargs)
+
+	#
+	# always end on an error so that condor won't think we're done and will
+	# restart us
+	#
+
+	sys.exit(1)
-- 
GitLab