From fc59426725893315c38cfe138ebbe59aca735d70 Mon Sep 17 00:00:00 2001
From: Chad Hanna <crh184@psu.edu>
Date: Thu, 18 Aug 2016 07:20:52 -0400
Subject: [PATCH] gstlal_ll_inspiral_aggregator: a possibly working version

---
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 258 ++++++++++++++----
 1 file changed, 203 insertions(+), 55 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 9866986105..bd48ed8e87 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -1,4 +1,22 @@
 #!/usr/bin/env python
+#
+# Copyright (C) 2016  Kipp Cannon, Chad Hanna
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+
 import h5py
 import numpy
 import sys, os
@@ -14,6 +32,39 @@ import subprocess
 import threading
 from gi.repository import GLib
 from gstlal import servicediscovery
+import urllib2
+import shutil
+
+
+MIN_TIME_QUANTA = 1000
+
+
+#
+# =============================================================================
+#
+#                                 Command Line
+#
+# =============================================================================
+#
+
+
+# Read command line options
+def parse_command_line():
+
+	parser = argparse.ArgumentParser(description="Online data aggregator")
+
+	# directory to put everything in
+	parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
+
+	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
+
+	parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
+	
+	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
+
+	args = parser.parse_args()
+
+	return args
 
 
 #
@@ -25,10 +76,34 @@ from gstlal import servicediscovery
 #
 
 
+def median(l):
+	return sorted(l)[len(l)//2]
+
+
 def now():
 	return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
 
 
+def get_url(url,d):
+	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("/n")
+	jobtime = [float(x.split()[0]) for x in jobdata]
+	jobdata = [float(x.split()[1]) for x in jobdata]
+	return jobtime, jobdata
+
+
+def quantize(xarr, yarr, func, level = 0):
+	datadict = {}
+	for x,y in zip(xarr, yarr):
+		# quantize to this level
+		key = int(x) // (10**(level+1))
+		# we want to sort on y not x
+		datadict.setdefault(key, []).append((y,x))
+	reduced_data = [func(value) for value in datadict.values()]
+	reduced_time = [x[1] for x in reduced_data]
+	reduced_data = [x[0] for x in reduced_data]
+
+	return reduced_time, reduced_data
+
 
 def makedir(path):
 	try:
@@ -38,16 +113,53 @@ def makedir(path):
 	except OSError:
 		pass
 
-def create_new_dataset(path, data):
-	fname = os.path.join(path, "%s.hdf5" % data)
+
+def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
+	if tmp:
+		fname = os.path.join(path, "%s.hdf5.tmp" % base)
+	else:
+		fname = os.path.join(path, "%s.hdf5" % base)
 	if os.path.exists(fname):
 		return
 	f = h5py.File(fname, "w")
-	f.create_dataset("time", (0,), dtype="f64")
-	f.create_dataset("data", (0,), dtype="f64")
+	if timedata is None and data is None:
+		f.create_dataset("time", (0,), dtype="f64")
+		f.create_dataset("data", (0,), dtype="f64")
+	else:
+		assert len(timedata) == len(data)
+		f.create_dataset("time", (len(timedata),), dtype="f64")
+		f.create_dataset("data", (len(data),), dtype="f64")
+		f["time"][...] = timedata
+		f["data"][...] = data
+		
+	f.close()
+	return fname
+
+
+def get_dataset(path, base):
+	fname = os.path.join(path, "%s.hdf5" % base)
+	f = h5py.File(fname, "r")
+	x,y = list(f["time"]), list(f["data"])	
 	f.close()
+	return fname, x,y
+
+
+def gps_to_minimum_time_quanta(gpstime):
+	return int(gpstime) // MIN_TIME_QUANTA * MIN_TIME_QUANTA
+
+
+def gps_to_leaf_directory(gpstime, level = 0):
+	return "/".join(str(gps_to_minimum_time_quanta(gpstime) // MIN_TIME_QUANTA // (10**level)))
+
+
+def gps_to_sub_directories(gpstime, level):
+	root = gps_to_leaf_directory(gpstime, level)
+	for i in range(10):
+		if os.path.exists(os.path.join(root,str(i))):
+			yield str(i)
 
-def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
+
+def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
 	str_time = str(gpstime).split(".")[0]
 	digits = [int(x) for x in str_time]
 	directories = [numpy.array([digits[x]]) for x in range(7)]
@@ -59,45 +171,19 @@ def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
 			if verbose:
 				print cur_dir
 			makedir(cur_dir)
-			for typ in types:
+			for (typ,_) in types:
 				type_dir = os.path.join(cur_dir, typ)
 				makedir(type_dir)
-				if typ == "all":
-					for b in bins:
-						bin_dir = os.path.join(type_dir, b)
-						makedir(bin_dir)
-						for d in data:
-							create_new_dataset(bin_dir, d)
-				else:
+				for d in data:
+					create_new_dataset(type_dir, d)
+			type_dir = os.path.join(cur_dir, "by_job")
+			for b in jobs:
+				bin_dir = os.path.join(type_dir, b)
+				for (typ,_) in types:
+					type_bin_dir = os.path.join(bin_dir, typ)
+					makedir(type_bin_dir)
 					for d in data:
-						create_new_dataset(type_dir, d)
-
-#
-# =============================================================================
-#
-#                                 Command Line
-#
-# =============================================================================
-#
-
-
-# Read command line options
-def parse_command_line():
-
-	parser = argparse.ArgumentParser(description="Online data aggregator")
-
-	# directory to put everything in
-	parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
-
-	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
-
-	parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
-	
-	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
-
-	args = parser.parse_args()
-
-	return args
+						create_new_dataset(type_bin_dir, d)
 
 
 #
@@ -110,9 +196,9 @@ def parse_command_line():
 
 
 class Collector(servicediscovery.Listener):
-	def __init__(self, mainloop, datatypes, bins, dataurls, base_dir, job_tag = None, dump_period = 180.):
+	def __init__(self, mainloop, datatypes, jobs, dataurls, base_dir, job_tag = None, dump_period = 180.):
 		self.datatypes = datatypes
-		self.bins = bins
+		self.jobs = set(jobs)
 		self.dataurls = dataurls
 		self.base_dir = base_dir
 		self.job_tag = job_tag
@@ -131,11 +217,14 @@ class Collector(servicediscovery.Listener):
 		url = "http://%s:%s/" % (host, port)
 		logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
 		if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
-			logging.info("server has wrong or missing job tab, discarding")
+			logging.info("server has wrong or missing job tag, discarding")
 			return
 		if not properties.get("GSTLAL_LL_JOB"):
 			logging.info("server has no GSTLAL_LL_JOB value, discarding")
 			return
+		if properties.get("GSTLAL_LL_JOB") not in self.jobs:
+			logging.info("server has a GSTLAL_LL_JOB value outside of requested range, discarding")
+			return
 		# watch for security problems:  don't let url or job ID
 		# terminate the wget shell command in mid-string
 		if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
@@ -149,14 +238,73 @@ class Collector(servicediscovery.Listener):
 		try:
 			while self.dump_period:
 				logging.info("sleeping")
-				setup_dirs(now(), self.datatypes, self.bins, self.dataurls, self.base_dir)
 				time.sleep(self.dump_period)
-				#with self.lock:
-				#	for job, url in sorted(self.urls.items()):
-				#		assert job
-				#		cmd = "wget -nv -nH -P %s -r %s" % (job, url)
-				#		logging.info(cmd)
-				#		subprocess.call(cmd, shell = True)
+				with self.lock:
+					dataspan = set()
+					for job, url in sorted(self.urls.items()):
+						assert job
+						# FIXME Hack
+						url = url.replace(".local","")
+						for d in self.dataurls:
+							print "processing ", job, d
+							jobtime, jobdata = get_url(url,d)
+							gpsblocks = sorted(list(set((gps_to_minimum_time_quanta(t) for t in jobtime))))
+							for gpstime in gpsblocks:
+								dataspan.add(gpstime)
+							for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
+								for (typ, func) in self.datatypes:
+									path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
+									try:
+										fname, x, y = get_dataset(path, d)
+									except:
+										setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
+										fname, prev_times, prev_data = get_dataset(path, d)
+									this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
+									this_time = [jobtime[i] for i in this_time_ix] + prev_times
+									this_data = [jobdata[i] for i in this_time_ix] + prev_data
+									quantize(this_time, this_data, func, level = 0)
+									tmpfname = create_new_dataset(path, d, this_time, this_data, tmp = True)
+									# copy the tmp file over the original
+									shutil.move(tmpfname, fname)
+
+					# Data reduction across jobs at the lowest level
+					gpsblocks = sorted(list(dataspan))
+					for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
+						# FIXME don't hardcode this range
+						for level in range(7):
+							this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
+							for d in self.dataurls:
+								for (typ,func) in self.datatypes:
+									# FIXME descend down a level if level > 0
+									if level > 0:
+										for job in sorted(self.urls):
+											agg_data = []
+											agg_time = []
+											for subdir in gps_to_sub_directories(s, level):
+												path = "/".join([this_level_dir, subdir, "by_job", job, typ])
+												fname, x, y = get_dataset(path, d)
+												agg_time += x
+												agg_data += y
+											reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
+											path = "/".join([this_level_dir, "by_job", job, typ])
+											tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+											# FIXME don't assume we can get the non temp file name this way
+											shutil.move(tmpfname, tmpfname.replace(".tmp",""))
+											
+									# Process this level
+									agg_data = []
+									agg_time = []
+									for job  in sorted(self.urls):
+										path = "/".join([this_level_dir, "by_job", job, typ])
+										fname, x, y = get_dataset(path, d)
+										agg_time += x
+										agg_data += y
+									reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
+									path = "/".join([this_level_dir, typ])
+									tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+									# FIXME don't assume we can get the non temp file name this way
+									shutil.move(tmpfname, tmpfname.replace(".tmp",""))
+
 		except:
 			mainloop.quit()
 			raise
@@ -179,9 +327,9 @@ class Collector(servicediscovery.Listener):
 options = parse_command_line()
 
 # FIXME don't hardcode some of these?
-datatypes = ["min", "max", "mean", "all"]
-bins = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
-dataurls = ["latency", "snr"]
+datatypes = [("min", min), ("max", max), ("median", median)]
+jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
+dataurls = ["latency_history", "snr_history"]
 
 
 logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
@@ -189,7 +337,7 @@ logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(
 
 mainloop = GLib.MainLoop()
 
-collector = Collector(mainloop, datatypes, bins, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
+collector = Collector(mainloop, datatypes, jobs, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
 browser = servicediscovery.ServiceBrowser(collector)
 
 try:
-- 
GitLab