Commit fc594267 authored by Chad Hanna

gstlal_ll_inspiral_aggregator: a possibly working version

parent 052c72b3
#!/usr/bin/env python
#
# Copyright (C) 2016 Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import h5py
import numpy
import sys, os
@@ -14,6 +32,39 @@ import subprocess
import threading
from gi.repository import GLib
from gstlal import servicediscovery
import urllib2
import shutil
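# smallest aggregation window: GPS times are grouped into blocks of this many seconds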
MIN_TIME_QUANTA = 1000
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
# Read command line options
def parse_command_line():
parser = argparse.ArgumentParser(description="Online data aggregator")
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
args = parser.parse_args()
return args
#
@@ -25,10 +76,34 @@ from gstlal import servicediscovery
#
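# return the median of a list (the upper of the two middle values when the length is even)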
def median(l):
return sorted(l)[len(l)//2]
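# return the current UTC time as a LIGOTimeGPS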
def now():
return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
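# fetch "<url><d>.txt" from a job's web server; each line is expected to hold
# "<gps time> <value>", returned as two parallel lists of floats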
def get_url(url,d):
jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().strip().split("\n")
jobtime = [float(x.split()[0]) for x in jobdata]
jobdata = [float(x.split()[1]) for x in jobdata]
return jobtime, jobdata
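# reduce (time, value) pairs by grouping the times into bins of 10**(level+1) seconds
# and applying func to the (value, time) pairs in each bin; keying on the value first
# means func (e.g. min, max, median) selects by value and keeps that value's time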
def quantize(xarr, yarr, func, level = 0):
datadict = {}
for x,y in zip(xarr, yarr):
# quantize to this level
key = int(x) // (10**(level+1))
# we want to sort on y not x
datadict.setdefault(key, []).append((y,x))
reduced_data = [func(value) for value in datadict.values()]
reduced_time = [x[1] for x in reduced_data]
reduced_data = [x[0] for x in reduced_data]
return reduced_time, reduced_data
def makedir(path):
try:
@@ -38,16 +113,53 @@ def makedir(path):
except OSError:
pass
def create_new_dataset(path, data):
fname = os.path.join(path, "%s.hdf5" % data)
def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
if tmp:
fname = os.path.join(path, "%s.hdf5.tmp" % base)
else:
fname = os.path.join(path, "%s.hdf5" % base)
if os.path.exists(fname):
return
f = h5py.File(fname, "w")
f.create_dataset("time", (0,), dtype="f64")
f.create_dataset("data", (0,), dtype="f64")
if timedata is None and data is None:
f.create_dataset("time", (0,), dtype="f64")
f.create_dataset("data", (0,), dtype="f64")
else:
assert len(timedata) == len(data)
f.create_dataset("time", (len(timedata),), dtype="f64")
f.create_dataset("data", (len(data),), dtype="f64")
f["time"][...] = timedata
f["data"][...] = data
f.close()
return fname
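# read the "time" and "data" arrays back out of an existing <base>.hdf5 file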
def get_dataset(path, base):
fname = os.path.join(path, "%s.hdf5" % base)
f = h5py.File(fname, "r")
x,y = list(f["time"]), list(f["data"])
f.close()
return fname, x,y
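# round a GPS time down to the start of its MIN_TIME_QUANTA block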
def gps_to_minimum_time_quanta(gpstime):
return int(gpstime) // MIN_TIME_QUANTA * MIN_TIME_QUANTA
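# map a GPS time to a slash-separated directory path with one digit per component,
# e.g. 1234567890 -> "1/2/3/4/5/6/7" at level 0; each additional level drops a trailing digit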
def gps_to_leaf_directory(gpstime, level = 0):
return "/".join(str(gps_to_minimum_time_quanta(gpstime) // MIN_TIME_QUANTA // (10**level)))
def gps_to_sub_directories(gpstime, level):
root = gps_to_leaf_directory(gpstime, level)
for i in range(10):
if os.path.exists(os.path.join(root,str(i))):
yield str(i)
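# create the directory tree and empty datasets for a given GPS time, including
# the per-job subdirectories under "by_job"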
def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
str_time = str(gpstime).split(".")[0]
digits = [int(x) for x in str_time]
directories = [numpy.array([digits[x]]) for x in range(7)]
@@ -59,45 +171,19 @@ def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
if verbose:
print cur_dir
makedir(cur_dir)
for typ in types:
for (typ,_) in types:
type_dir = os.path.join(cur_dir, typ)
makedir(type_dir)
if typ == "all":
for b in bins:
bin_dir = os.path.join(type_dir, b)
makedir(bin_dir)
for d in data:
create_new_dataset(bin_dir, d)
else:
for d in data:
create_new_dataset(type_dir, d)
type_dir = os.path.join(cur_dir, "by_job")
for b in jobs:
bin_dir = os.path.join(type_dir, b)
for (typ,_) in types:
type_bin_dir = os.path.join(bin_dir, typ)
makedir(type_bin_dir)
for d in data:
create_new_dataset(type_dir, d)
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
# Read command line options
def parse_command_line():
parser = argparse.ArgumentParser(description="Online data aggregator")
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
args = parser.parse_args()
return args
create_new_dataset(type_bin_dir, d)
#
@@ -110,9 +196,9 @@ def parse_command_line():
class Collector(servicediscovery.Listener):
def __init__(self, mainloop, datatypes, bins, dataurls, base_dir, job_tag = None, dump_period = 180.):
def __init__(self, mainloop, datatypes, jobs, dataurls, base_dir, job_tag = None, dump_period = 180.):
self.datatypes = datatypes
self.bins = bins
self.jobs = set(jobs)
self.dataurls = dataurls
self.base_dir = base_dir
self.job_tag = job_tag
@@ -131,11 +217,14 @@ class Collector(servicediscovery.Listener):
url = "http://%s:%s/" % (host, port)
logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
logging.info("server has wrong or missing job tab, discarding")
logging.info("server has wrong or missing job tag, discarding")
return
if not properties.get("GSTLAL_LL_JOB"):
logging.info("server has no GSTLAL_LL_JOB value, discarding")
return
if properties.get("GSTLAL_LL_JOB") not in self.jobs:
logging.info("server has a GSTLAL_LL_JOB value outside of requested range, discarding")
return
# watch for security problems: don't let url or job ID
# terminate the wget shell command in mid-string
if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
@@ -149,14 +238,73 @@
try:
while self.dump_period:
logging.info("sleeping")
setup_dirs(now(), self.datatypes, self.bins, self.dataurls, self.base_dir)
time.sleep(self.dump_period)
#with self.lock:
# for job, url in sorted(self.urls.items()):
# assert job
# cmd = "wget -nv -nH -P %s -r %s" % (job, url)
# logging.info(cmd)
# subprocess.call(cmd, shell = True)
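# first pass: for each job and data url, fetch the latest history, merge it with any
# existing per-job datasets at the finest time resolution, and replace those datasets
# atomically by writing a temporary file and moving it into place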
with self.lock:
dataspan = set()
for job, url in sorted(self.urls.items()):
assert job
# FIXME Hack
url = url.replace(".local","")
for d in self.dataurls:
print "processing ", job, d
jobtime, jobdata = get_url(url,d)
gpsblocks = sorted(list(set((gps_to_minimum_time_quanta(t) for t in jobtime))))
for gpstime in gpsblocks:
dataspan.add(gpstime)
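# walk consecutive (start, end) block boundaries; the final block is closed off
# MIN_TIME_QUANTA seconds after the last block start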
for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
for (typ, func) in self.datatypes:
path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
try:
fname, x, y = get_dataset(path, d)
except:
setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
fname, prev_times, prev_data = get_dataset(path, d)
this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
this_time = [jobtime[i] for i in this_time_ix] + prev_times
this_data = [jobdata[i] for i in this_time_ix] + prev_data
this_time, this_data = quantize(this_time, this_data, func, level = 0)
tmpfname = create_new_dataset(path, d, this_time, this_data, tmp = True)
# copy the tmp file over the original
shutil.move(tmpfname, fname)
# Data reduction within and across jobs at each level of the directory tree
gpsblocks = sorted(list(dataspan))
for s,e in zip(gpsblocks, gpsblocks[1:]+[gpsblocks[-1]+MIN_TIME_QUANTA]):
# FIXME don't hardcode this range
for level in range(7):
this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
for d in self.dataurls:
for (typ,func) in self.datatypes:
# FIXME descend down a level if level > 0
if level > 0:
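# rebuild this job's dataset at this level by concatenating the datasets from the
# finer-grained subdirectories one level down and re-quantizing them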
for job in sorted(self.urls):
agg_data = []
agg_time = []
for subdir in gps_to_sub_directories(s, level):
path = "/".join([this_level_dir, subdir, "by_job", job, typ])
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
path = "/".join([this_level_dir, "by_job", job, typ])
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
# Process this level: reduce across all jobs and write the combined dataset
agg_data = []
agg_time = []
for job in sorted(self.urls):
path = "/".join([this_level_dir, "by_job", job, typ])
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
path = "/".join([this_level_dir, typ])
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
except:
mainloop.quit()
raise
@@ -179,9 +327,9 @@ class Collector(servicediscovery.Listener):
options = parse_command_line()
# FIXME don't hardcode some of these?
datatypes = ["min", "max", "mean", "all"]
bins = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
dataurls = ["latency", "snr"]
datatypes = [("min", min), ("max", max), ("median", median)]
jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
dataurls = ["latency_history", "snr_history"]
logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
@@ -189,7 +337,7 @@ logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(
mainloop = GLib.MainLoop()
collector = Collector(mainloop, datatypes, bins, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
collector = Collector(mainloop, datatypes, jobs, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
browser = servicediscovery.ServiceBrowser(collector)
try:
......