From 74d16052829a952866d429b3cf0e7227e56444b4 Mon Sep 17 00:00:00 2001
From: Chad Hanna <crh184@psu.edu>
Date: Sun, 21 Aug 2016 14:28:21 -0400
Subject: [PATCH] gstlal_ll_inspiral_aggregator: fix data handling and whitespace

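In addition to the whitespace cleanup this fixes several functional
problems in the aggregator:

 - get_url(): split the downloaded job data on "\n" instead of the
   literal "/n" and skip blank lines before parsing
 - quantize(): quantize on 10**level, use an OrderedDict and check
   that the time and data arrays stay the same length
 - create_new_dataset(): use the valid "f8" dtype instead of "f64"
   and only refuse to overwrite existing non-tmp files
 - per-job processing: walk contiguous MIN_TIME_QUANTA spans, append
   only samples newer than what is already on disk and log a summary
   of each reduction
 - across-job reduction: tolerate missing per-job datasets (IOError),
   pass the base directory to gps_to_sub_directories() and restrict
   the reduction to level 0 for now; the default --num-jobs drops
   from 112 to 10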
---
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 110 +++++++++++-------
 1 file changed, 68 insertions(+), 42 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index bd48ed8e87..5fe3f625b3 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -34,7 +34,7 @@ from gi.repository import GLib
 from gstlal import servicediscovery
 import urllib2
 import shutil
-
+import collections
 
 MIN_TIME_QUANTA = 1000
 
@@ -58,8 +58,8 @@ def parse_command_line():
 
 	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
 
-	parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
-	
+	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
+
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 
 	args = parser.parse_args()
@@ -85,22 +85,25 @@ def now():
 
 
 def get_url(url,d):
-	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("/n")
-	jobtime = [float(x.split()[0]) for x in jobdata]
-	jobdata = [float(x.split()[1]) for x in jobdata]
+	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
+	jobtime = [float(x.split()[0]) for x in jobdata if x]
+	jobdata = [float(x.split()[1]) for x in jobdata if x]
+	assert len(jobdata) == len(jobtime)
 	return jobtime, jobdata
 
 
 def quantize(xarr, yarr, func, level = 0):
-	datadict = {}
+	datadict = collections.OrderedDict()
+	assert len(yarr) == len(xarr)
 	for x,y in zip(xarr, yarr):
 		# quantize to this level
-		key = int(x) // (10**(level+1))
+		key = int(x) // (10**level)
 		# we want to sort on y not x
 		datadict.setdefault(key, []).append((y,x))
 	reduced_data = [func(value) for value in datadict.values()]
 	reduced_time = [x[1] for x in reduced_data]
 	reduced_data = [x[0] for x in reduced_data]
+	assert len(reduced_data) == len(reduced_time)
 
 	return reduced_time, reduced_data
 
@@ -118,20 +121,22 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
 	if tmp:
 		fname = os.path.join(path, "%s.hdf5.tmp" % base)
 	else:
+		# A non-tmp dataset should not be overwritten
 		fname = os.path.join(path, "%s.hdf5" % base)
-	if os.path.exists(fname):
-		return
+		if os.path.exists(fname):
+			return fname
 	f = h5py.File(fname, "w")
 	if timedata is None and data is None:
-		f.create_dataset("time", (0,), dtype="f64")
-		f.create_dataset("data", (0,), dtype="f64")
+		f.create_dataset("time", (0,), dtype="f8")
+		f.create_dataset("data", (0,), dtype="f8")
 	else:
-		assert len(timedata) == len(data)
-		f.create_dataset("time", (len(timedata),), dtype="f64")
-		f.create_dataset("data", (len(data),), dtype="f64")
+		if len(timedata) != len(data):
+			raise ValueError("timedata length %d does not match data length %d" % (len(timedata), len(data)))
+		f.create_dataset("time", (len(timedata),), dtype="f8")
+		f.create_dataset("data", (len(data),), dtype="f8")
 		f["time"][...] = timedata
 		f["data"][...] = data
-		
+
 	f.close()
 	return fname
 
@@ -139,7 +144,7 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
 def get_dataset(path, base):
 	fname = os.path.join(path, "%s.hdf5" % base)
 	f = h5py.File(fname, "r")
-	x,y = list(f["time"]), list(f["data"])	
+	x,y = list(f["time"]), list(f["data"])
 	f.close()
 	return fname, x,y
 
@@ -152,11 +157,14 @@ def gps_to_leaf_directory(gpstime, level = 0):
 	return "/".join(str(gps_to_minimum_time_quanta(gpstime) // MIN_TIME_QUANTA // (10**level)))
 
 
-def gps_to_sub_directories(gpstime, level):
-	root = gps_to_leaf_directory(gpstime, level)
+def gps_to_sub_directories(gpstime, level, basedir):
+	root = os.path.join(basedir, gps_to_leaf_directory(gpstime, level))
+	out = []
 	for i in range(10):
-		if os.path.exists(os.path.join(root,str(i))):
-			yield str(i)
+		path = os.path.join(root,str(i))
+		if os.path.exists(path):
+			out.append(str(i))
+	return out
 
 
 def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
@@ -168,8 +176,6 @@ def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
 	for dirs in [directories[:i+1] for i in range(len(directories))]:
 		for path in itertools.product(*dirs):
 			cur_dir = os.path.join(base_dir, "/".join(str(x) for x in path))
-			if verbose:
-				print cur_dir
 			makedir(cur_dir)
 			for (typ,_) in types:
 				type_dir = os.path.join(cur_dir, typ)
@@ -246,60 +252,80 @@ class Collector(servicediscovery.Listener):
 						# FIXME Hack
 						url = url.replace(".local","")
 						for d in self.dataurls:
-							print "processing ", job, d
 							jobtime, jobdata = get_url(url,d)
-							gpsblocks = sorted(list(set((gps_to_minimum_time_quanta(t) for t in jobtime))))
+							gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
+							min_t, max_t = min(gpsblocks), max(gpsblocks)
 							for gpstime in gpsblocks:
 								dataspan.add(gpstime)
-							for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
+							gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
+							for s,e in zip(gps1, gps2):
 								for (typ, func) in self.datatypes:
 									path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
 									try:
-										fname, x, y = get_dataset(path, d)
+										fname, prev_times, prev_data = get_dataset(path, d)
 									except:
 										setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
 										fname, prev_times, prev_data = get_dataset(path, d)
-									this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
+									# only get new data
+									if prev_times:
+										this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
+									else:
+										this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
 									this_time = [jobtime[i] for i in this_time_ix] + prev_times
 									this_data = [jobdata[i] for i in this_time_ix] + prev_data
-									quantize(this_time, this_data, func, level = 0)
-									tmpfname = create_new_dataset(path, d, this_time, this_data, tmp = True)
+									reduced_time, reduced_data = quantize(this_time, this_data, func, level = 0)
+									print "processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time))
+									tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
 									# copy the tmp file over the original
 									shutil.move(tmpfname, fname)
 
 					# Data reduction across jobs at the lowest level
-					gpsblocks = sorted(list(dataspan))
-					for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
+					if dataspan:
+						min_t, max_t = min(dataspan), max(dataspan)
+						gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
+					else:
+						gps1, gps2 = [], []
+					for s,e in zip(gps1, gps2):
 						# FIXME don't hardcode this range
-						for level in range(7):
+						for level in range(1):
 							this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
 							for d in self.dataurls:
 								for (typ,func) in self.datatypes:
-									# FIXME descend down a level if level > 0
+									# descend down a level if level > 0
 									if level > 0:
 										for job in sorted(self.urls):
 											agg_data = []
 											agg_time = []
-											for subdir in gps_to_sub_directories(s, level):
+											for subdir in gps_to_sub_directories(s, level, self.base_dir):
 												path = "/".join([this_level_dir, subdir, "by_job", job, typ])
-												fname, x, y = get_dataset(path, d)
-												agg_time += x
-												agg_data += y
+												try:
+													fname, x, y = get_dataset(path, d)
+													agg_time += x
+													agg_data += y
+												except IOError as ioerr:
+													# the dataset file may not exist yet; report the error and move on
+													print >> sys.stderr, ioerr
 											reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
 											path = "/".join([this_level_dir, "by_job", job, typ])
+											print "processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time))
 											tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
 											# FIXME don't assume we can get the non temp file name this way
 											shutil.move(tmpfname, tmpfname.replace(".tmp",""))
-											
 									# Process this level
 									agg_data = []
 									agg_time = []
 									for job  in sorted(self.urls):
 										path = "/".join([this_level_dir, "by_job", job, typ])
-										fname, x, y = get_dataset(path, d)
-										agg_time += x
-										agg_data += y
+										#print "processing %s %s data reduction: level %d type %s" % (job, d, level, typ)
+										try:
+											fname, x, y = get_dataset(path, d)
+											agg_time += x
+											agg_data += y
+										except IOError as ioerr:
+											# don't shadow the span end e; the dataset file may be missing, so report it and continue
+											print >> sys.stderr, ioerr
 									reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
+									print "processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time))
 									path = "/".join([this_level_dir, typ])
 									tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
 									# FIXME don't assume we can get the non temp file name this way
-- 
GitLab