From be5cb4e316be65f10279edc81b3b8488dd7791f2 Mon Sep 17 00:00:00 2001
From: Chad Hanna <crh184@psu.edu>
Date: Fri, 2 Sep 2016 08:51:26 -0400
Subject: [PATCH] gstlal_ll_inspiral_aggregator: refactor data collection and reduction into helper functions
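
Break the monolithic wget loop body into helper functions (gps_range,
update_lowest_level_data, job_expanse, reduce_data_from_lower_level
and reduce_across_jobs), rename quantize() to reduce(), add docstrings
throughout, replace print statements with logging, and extend the data
reduction from level 0 to all seven directory levels.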

---
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 196 +++++++++++-------
 1 file changed, 124 insertions(+), 72 deletions(-)
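
Reviewer's note, not part of the commit: a minimal sketch of the
power-of-10 bucketing that reduce() performs.  The bucketing lines
mirror the function body in this patch; the selection step and the
name reduce_sketch are assumptions, since the rest of reduce() falls
outside this diff.

	import collections

	def reduce_sketch(xarr, yarr, func, level=0):
		# Bucket samples by the leading digits of their integer GPS
		# time; level=1 merges 10 seconds of data, level=2 merges 100.
		datadict = collections.OrderedDict()
		for x, y in zip(xarr, yarr):
			key = int(x) // (10**level)
			datadict.setdefault(key, []).append((y, x))
		# Assumed selection: func (e.g. min, max or the script's
		# median) picks one (value, time) pair per bucket; the pairs
		# are (y, x) so the sort is on the value, not the time.
		reduced = [func(pairs) for pairs in datadict.values()]
		return [x for y, x in reduced], [y for y, x in reduced]

	# reduce_sketch([100, 101, 112], [5., 7., 6.], max, level=1)
	# buckets: {10: [(5., 100), (7., 101)], 11: [(6., 112)]}
	# returns ([101, 112], [7., 6.])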

diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 5fe3f625b3..bc5d10c86c 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -55,11 +55,8 @@ def parse_command_line():
 
 	# directory to put everything in
 	parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
-
 	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
-
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
-
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 
 	args = parser.parse_args()
@@ -77,14 +74,24 @@ def parse_command_line():
 
 
 def median(l):
+	"""!
+	Return the nearest-value median of a list, e.g., median([3,2,4,1]) -> 3
+	"""
 	return sorted(l)[len(l)//2]
 
 
 def now():
+	"""!
+	A convenience function to return the current gps time
+	"""
 	return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
 
 
 def get_url(url,d):
+	"""!
+	A function to pull data from @param url, where @param d specifies
+	the route.  FIXME: it assumes that the routes end in .txt
+	"""
 	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
 	jobtime = [float(x.split()[0]) for x in jobdata if x]
 	jobdata = [float(x.split()[1]) for x in jobdata if x]
@@ -92,11 +99,14 @@ def get_url(url,d):
 	return jobtime, jobdata
 
 
-def quantize(xarr, yarr, func, level = 0):
+def reduce(xarr, yarr, func, level = 0):
+	"""!
+	Reduce data by powers of 10: group samples into buckets of 10**level
+	seconds; func selects the representative sample for each bucket.
+	"""
 	datadict = collections.OrderedDict()
 	assert len(yarr) == len(xarr)
 	for x,y in zip(xarr, yarr):
-		# quantize to this level
+		# reduce to this level
 		key = int(x) // (10**level)
 		# we want to sort on y not x
 		datadict.setdefault(key, []).append((y,x))
@@ -109,6 +119,9 @@ def quantize(xarr, yarr, func, level = 0):
 
 
 def makedir(path):
+	"""!
+	A convenience function to make new directories and trap errors
+	"""
 	try:
 		os.makedirs(path)
 	except IOError:
@@ -118,6 +131,11 @@ def makedir(path):
 
 
 def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
+	"""!
+	A function to create a new dataset with time @param timedata and data
+	@param data.  The data will be stored in an hdf5 file at path @param path with
+	base name @param base.  Set @param tmp to write a ".hdf5.tmp" file instead.
+	"""
 	if tmp:
 		fname = os.path.join(path, "%s.hdf5.tmp" % base)
 	else:
@@ -142,6 +160,9 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
 
 
 def get_dataset(path, base):
+	"""!
+	Open a dataset at @param path with name @param base and return the data.
+	"""
 	fname = os.path.join(path, "%s.hdf5" % base)
 	f = h5py.File(fname, "r")
 	x,y = list(f["time"]), list(f["data"])
@@ -150,14 +171,24 @@ def get_dataset(path, base):
 
 
 def gps_to_minimum_time_quanta(gpstime):
+	"""!
+	Given a GPS time, return it truncated to the minimum time quanta,
+	e.g., 123456789 -> 123456000.
+	"""
 	return int(gpstime) // MIN_TIME_QUANTA * MIN_TIME_QUANTA
 
 
 def gps_to_leaf_directory(gpstime, level = 0):
+	"""!
+	Return the leaf directory for a given GPS time, e.g., at level 0,
+	123456789 -> "1/2/3/4/5/6" (with MIN_TIME_QUANTA = 1000).
+	"""
 	return "/".join(str(gps_to_minimum_time_quanta(gpstime) // MIN_TIME_QUANTA // (10**level)))
 
 
 def gps_to_sub_directories(gpstime, level, basedir):
+	"""!
+	Return the digit subdirectories that exist beneath the leaf
+	directory for a given GPS time at the given level.
+	"""
 	root = os.path.join(basedir, gps_to_leaf_directory(gpstime, level))
 	out = []
 	for i in range(10):
@@ -168,6 +199,10 @@ def gps_to_sub_directories(gpstime, level, basedir):
 
 
 def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
+	"""!
+	Given a GPS time, the data types, jobs and data routes, produce the
+	directory hierarchy and empty datasets for storing the data.
+	"""
 	str_time = str(gpstime).split(".")[0]
 	digits = [int(x) for x in str_time]
 	directories = [numpy.array([digits[x]]) for x in range(7)]
@@ -192,6 +227,81 @@ def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
 						create_new_dataset(type_bin_dir, d)
 
 
+def gps_range(jobtime, dataspan):
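+	"""!
+	Find the minimum time quanta blocks spanned by @param jobtime, add
+	them to @param dataspan, and return the corresponding block start
+	and end times.
+	"""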
+	gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
+	min_t, max_t = min(gpsblocks), max(gpsblocks)
+	for gpstime in gpsblocks:
+		dataspan.add(gpstime)
+	return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
+
+
+def update_lowest_level_data(path, d, s, e, typ, self, job, jobtime, jobdata, func):
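+	"""!
+	Reduce the new data for job @param job between @param s and @param e
+	at level 0 and update the lowest level dataset on disk for route
+	@param d and type @param typ.
+	"""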
+	try:
+		fname, prev_times, prev_data = get_dataset(path, d)
+	except:
+		setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
+		fname, prev_times, prev_data = get_dataset(path, d)
+	# only get new data
+	if prev_times:
+		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
+	else:
+		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
+	this_time = [jobtime[i] for i in this_time_ix] + prev_times
+	this_data = [jobdata[i] for i in this_time_ix] + prev_data
+	reduced_time, reduced_data = reduce(this_time, this_data, func, level = 0)
+	logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time)))
+	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	# copy the tmp file over the original
+	shutil.move(tmpfname, fname)
+
+
+def job_expanse(dataspan):
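+	"""!
+	Return the start and end times of the minimum time quanta blocks
+	spanning @param dataspan, or empty lists if it is empty.
+	"""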
+	if dataspan:
+		min_t, max_t = min(dataspan), max(dataspan)
+		return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
+	else:
+		return [], []
+
+
+def reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e):
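+	"""!
+	Reduce a single job's data at the given level by aggregating the
+	already-reduced data from the level beneath it.
+	"""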
+	agg_data = []
+	agg_time = []
+	for subdir in gps_to_sub_directories(s, level, self.base_dir):
+		path = "/".join([this_level_dir, subdir, "by_job", job, typ])
+		try:
+			fname, x, y = get_dataset(path, d)
+			agg_time += x
+			agg_data += y
+		except IOError as ioerr:
+			logging.error(ioerr)
+			pass
+	reduced_time, reduced_data = reduce(agg_time, agg_data, func, level=level)
+	path = "/".join([this_level_dir, "by_job", job, typ])
+	logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
+	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	# FIXME don't assume we can get the non temp file name this way
+	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
+
+
+def reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e):
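+	"""!
+	Reduce data of a given type and route across all jobs at the given
+	level.
+	"""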
+	# Process this level
+	agg_data = []
+	agg_time = []
+	for job in sorted(self.urls):
+		path = "/".join([this_level_dir, "by_job", job, typ])
+		try:
+			fname, x, y = get_dataset(path, d)
+			agg_time += x
+			agg_data += y
+		except IOError as ioerr:
+			logging.error(ioerr)
+			pass
+	reduced_time, reduced_data = reduce(agg_time, agg_data, func, level=level)
+	logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time)))
+	path = "/".join([this_level_dir, typ])
+	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	# FIXME don't assume we can get the non temp file name this way
+	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
+
+
 #
 # =============================================================================
 #
@@ -213,7 +323,7 @@ class Collector(servicediscovery.Listener):
 		self.lock = threading.Lock()
 		# FIXME:  use glib's mainloop machinery instead, implement
 		# this as a timeout or whatever it's called
-		logging.info("starting wget loop thread thread")
+		logging.info("starting wget loop thread")
 		self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
 		self.wget_thread.start()
 
@@ -253,84 +363,26 @@ class Collector(servicediscovery.Listener):
 						url = url.replace(".local","")
 						for d in self.dataurls:
 							jobtime, jobdata = get_url(url,d)
-							gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
-							min_t, max_t = min(gpsblocks), max(gpsblocks)
-							for gpstime in gpsblocks:
-								dataspan.add(gpstime)
-							gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
+							gps1, gps2 = gps_range(jobtime, dataspan)
 							for s,e in zip(gps1, gps2):
 								for (typ, func) in self.datatypes:
 									path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
-									try:
-										fname, prev_times, prev_data = get_dataset(path, d)
-									except:
-										setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
-										fname, prev_times, prev_data = get_dataset(path, d)
-									# only get new data
-									if prev_times:
-										this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
-									else:
-										this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
-									this_time = [jobtime[i] for i in this_time_ix] + prev_times
-									this_data = [jobdata[i] for i in this_time_ix] + prev_data
-									reduced_time, reduced_data = quantize(this_time, this_data, func, level = 0)
-									print "processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time))
-									tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
-									# copy the tmp file over the original
-									shutil.move(tmpfname, fname)
+									update_lowest_level_data(path, d, s, e, typ, self, job, jobtime, jobdata, func)
 
 					# Data reduction across jobs at the lowest level
-					if dataspan:
-						min_t, max_t = min(dataspan), max(dataspan)
-						gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
-					else:
-						gps1, gps2 = [], []
+					gps1, gps2 = job_expanse(dataspan)
 					for s,e in zip(gps1, gps2):
 						# FIXME don't hardcode this range
-						for level in range(1):
+						for level in range(7):
 							this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
 							for d in self.dataurls:
 								for (typ,func) in self.datatypes:
-									# descend down a level if level > 0
+									# Produce the data at this level by descending a level.
 									if level > 0:
 										for job in sorted(self.urls):
-											agg_data = []
-											agg_time = []
-											for subdir in gps_to_sub_directories(s, level, self.base_dir):
-												path = "/".join([this_level_dir, subdir, "by_job", job, typ])
-												try:
-													fname, x, y = get_dataset(path, d)
-													agg_time += x
-													agg_data += y
-												except IOError as ioerr:
-													print >> sys.stderr, ioerr
-													pass
-											reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
-											path = "/".join([this_level_dir, "by_job", job, typ])
-											print "processing reduced data %s for job %s  in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time))
-											tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
-											# FIXME don't assume we can get the non temp file name this way
-											shutil.move(tmpfname, tmpfname.replace(".tmp",""))
-									# Process this level
-									agg_data = []
-									agg_time = []
-									for job  in sorted(self.urls):
-										path = "/".join([this_level_dir, "by_job", job, typ])
-										#print "processing %s %s data reduction: level %d type %s" % (job, d, level, typ)
-										try:
-											fname, x, y = get_dataset(path, d)
-											agg_time += x
-											agg_data += y
-										except IOError as e:
-											print >> sys.stderr, e
-											pass
-									reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
-									print "processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time))
-									path = "/".join([this_level_dir, typ])
-									tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
-									# FIXME don't assume we can get the non temp file name this way
-									shutil.move(tmpfname, tmpfname.replace(".tmp",""))
-
+											reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e)
+									# Reduce data across jobs at this level.
+									reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e)
 		except:
 			mainloop.quit()
 			raise
-- 
GitLab