diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index faec11fa002002978a1ab119f314b60da4182836..484f5fb245d672e6fc6aa639c8153d9991bbaf3b 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -25,18 +25,16 @@
 import argparse
 import lal
 from lal import LIGOTimeGPS
 import time
-from gstlal import servicediscovery
 from gi.repository import GLib
 import logging
 import subprocess
-import threading
-from gi.repository import GLib
-from gstlal import servicediscovery
 import urllib2
 import shutil
 import collections
+from multiprocessing import Pool
 
-MIN_TIME_QUANTA = 1000
+MIN_TIME_QUANTA = 10000
+DIRS = 6
 
 #
@@ -58,6 +56,7 @@ def parse_command_line():
 	parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
+	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
 
 	args = parser.parse_args()
 
@@ -93,15 +92,16 @@ def get_url(url,d):
 	specific route. FIXME it assumes that the routes end in .txt
 	"""
 	jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
-	jobtime = [float(x.split()[0]) for x in jobdata if x]
-	jobdata = [float(x.split()[1]) for x in jobdata if x]
+	jobtime = numpy.array([float(x.split()[0]) for x in jobdata if x])
+	jobdata = numpy.array([float(x.split()[1]) for x in jobdata if x])
 	assert len(jobdata) == len(jobtime)
 	return jobtime, jobdata
 
 
 def reduce_data(xarr, yarr, func, level = 0):
 	"""!
-	This function does a data reduction by powers of 10
+	This function does a data reduction by powers of 10 where level
+	specifies the power. Default is 0 e.g., data reduction over 1 second
 	"""
 	datadict = collections.OrderedDict()
 	assert len(yarr) == len(xarr)
@@ -165,10 +165,14 @@ def get_dataset(path, base):
 	open a dataset at @param path with name @param base and return the data
 	"""
 	fname = os.path.join(path, "%s.hdf5" % base)
-	f = h5py.File(fname, "r")
-	x,y = list(f["time"]), list(f["data"])
-	f.close()
-	return fname, x,y
+	try:
+		f = h5py.File(fname, "r")
+		x,y = list(f["time"]), list(f["data"])
+		f.close()
+		return fname, x,y
+	except IOError:
+		fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
+		return fname, [], []
 
 
 def gps_to_minimum_time_quanta(gpstime):
@@ -199,61 +203,60 @@ def gps_to_sub_directories(gpstime, level, basedir):
 	return out
 
 
-def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
+def setup_dir_by_job_and_level(gpstime, typ, job, route, base_dir, verbose = True, level = 0):
 	"""!
	Given a gps time, the number of jobs and data types produce an
	appropriate data structure for storing the hierarchical data.
	"""
 	str_time = str(gpstime).split(".")[0]
-	digits = [int(x) for x in str_time]
-	directories = [numpy.array([digits[x]]) for x in range(7)]
-
-	# Setup the directory structure and put in empty files
-	for dirs in [directories[:i+1] for i in range(len(directories))]:
-		for path in itertools.product(*dirs):
-			cur_dir = os.path.join(base_dir, "/".join(str(x) for x in path))
-			makedir(cur_dir)
-			for (typ,_) in types:
-				type_dir = os.path.join(cur_dir, typ)
-				makedir(type_dir)
-				for d in data:
-					create_new_dataset(type_dir, d)
-			type_dir = os.path.join(cur_dir, "by_job")
-			for b in jobs:
-				bin_dir = os.path.join(type_dir, b)
-				for (typ,_) in types:
-					type_bin_dir = os.path.join(bin_dir, typ)
-					makedir(type_bin_dir)
-					for d in data:
-						create_new_dataset(type_bin_dir, d)
-
-
-def gps_range(jobtime, dataspan):
+	str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
+	directory = "%s/%s/by_job/%s/%s" % (base_dir, "/".join(str_time), job, typ)
+	makedir(directory)
+	fname = create_new_dataset(directory, route)
+
+
+def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True, level = 0):
+	"""!
+	Given a gps time, the number of jobs and data types produce an
+	appropriate data structure for storing the hierarchical data.
+	"""
+	str_time = str(gpstime).split(".")[0]
+	str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
+	directory = "%s/%s/%s" % (base_dir, "/".join(str_time), typ)
+	makedir(directory)
+	fname = create_new_dataset(directory, route)
+
+
+def gps_range(jobtime):
 	gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
 	min_t, max_t = min(gpsblocks), max(gpsblocks)
-	for gpstime in gpsblocks:
-		dataspan.add(gpstime)
 	return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
 
 
-def update_lowest_level_data(job, path, d, s, e, typ, self, jobtime, jobdata, func):
+def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ, base_dir, jobtime, jobdata, func):
+	path = "/".join([base_dir, gps_to_leaf_directory(start), "by_job", job, typ])
 	try:
-		fname, prev_times, prev_data = get_dataset(path, d)
+		fname, prev_times, prev_data = get_dataset(path, route)
 	except:
-		setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
-		fname, prev_times, prev_data = get_dataset(path, d)
-	# only get new data
+		setup_dir_by_job_and_level(start, typ, job, route, base_dir, verbose = True, level = 0)
+		fname, prev_times, prev_data = get_dataset(path, route)
+	# only get new data and assume that everything is time ordered
 	if prev_times:
-		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
+		this_time_ix = numpy.logical_and(jobtime > max(start-1e-16, prev_times[-1]), jobtime < end)
 	else:
-		this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
-	this_time = [jobtime[i] for i in this_time_ix] + prev_times
-	this_data = [jobdata[i] for i in this_time_ix] + prev_data
+		this_time_ix = numpy.logical_and(jobtime >= start, jobtime < end)
+	this_time = list(jobtime[this_time_ix]) + prev_times
+	this_data = list(jobdata[this_time_ix]) + prev_data
+	# shortcut if there are no updates
+	if len(this_time) == len(prev_times) and len(this_data) == len(prev_data):
+		print "shortcutting"
+		return []
 	reduced_time, reduced_data = reduce_data(this_time, this_data, func, level = 0)
-	logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time)))
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	#logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, route, start, end, typ, len(reduced_time)))
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# copy the tmp file over the original
 	shutil.move(tmpfname, fname)
+	return [start, end]
 
 
 def job_expanse(dataspan):
@@ -263,136 +266,87 @@ def job_expanse(dataspan):
 	else:
 		return [], []
 
-def reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e):
+def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ, route, func, start, end):
+
+	this_level_dir = "/".join([base_dir, gps_to_leaf_directory(start, level = level)])
 
 	agg_data = []
 	agg_time = []
-	for subdir in gps_to_sub_directories(s, level, self.base_dir):
+
+	# FIXME iterate over levels instead.
+	for subdir in gps_to_sub_directories(start, level, base_dir):
 		path = "/".join([this_level_dir, subdir, "by_job", job, typ])
 		try:
-			fname, x, y = get_dataset(path, d)
+			fname, x, y = get_dataset(path, route)
 			agg_time += x
 			agg_data += y
 		except IOError as ioerr:
-			logging.error(ioerr)
+			makedir(path)
+			# make an empty data set
+			create_new_dataset(path, route)
 			pass
 	reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level)
 	path = "/".join([this_level_dir, "by_job", job, typ])
-	logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	makedir(path)
+	#logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# FIXME don't assume we can get the non temp file name this way
 	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
 
 
-def reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e):
+def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)):
 	# Process this level
 	agg_data = []
 	agg_time = []
-	for job in sorted(self.urls):
+	for job in sorted(jobs):
 		path = "/".join([this_level_dir, "by_job", job, typ])
 		try:
-			fname, x, y = get_dataset(path, d)
+			fname, x, y = get_dataset(path, route)
 			agg_time += x
 			agg_data += y
 		except IOError as ioerr:
-			logging.error(ioerr)
+			makedir(path)
+			create_new_dataset(path, route)
 			pass
 	reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level)
-	logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time)))
+	#logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (route, start, end, typ, level, len(reduced_time), len(agg_time)))
 	path = "/".join([this_level_dir, typ])
-	tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
+	tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
 	# FIXME don't assume we can get the non temp file name this way
 	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
 
-#
-# =============================================================================
-#
-#                               Internal Library
-#
-# =============================================================================
-#
-class Collector(servicediscovery.Listener):
-	def __init__(self, mainloop, datatypes, jobs, dataurls, base_dir, job_tag = None, dump_period = 180.):
-		self.datatypes = datatypes
-		self.jobs = set(jobs)
-		self.dataurls = dataurls
-		self.base_dir = base_dir
-		self.job_tag = job_tag
-		self.dump_period = dump_period
-		self.urls = {}
-		self.lock = threading.Lock()
-		# FIXME: use glib's mainloop machinery instead, implement
-		# this as a timeout or whatever it's called
-		logging.info("starting wget loop thread")
-		self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
-		self.wget_thread.start()
-
-	def add_service(self, sname, stype, sdomain, host, port, properties):
-		if stype != "_http._tcp" or not sname.startswith("gstlal_inspiral "):
-			return
-		url = "http://%s:%s/" % (host, port)
-		logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
-		if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
-			logging.info("server has wrong or missing job tag, discarding")
-			return
-		if not properties.get("GSTLAL_LL_JOB"):
-			logging.info("server has no GSTLAL_LL_JOB value, discarding")
-			return
-		if properties.get("GSTLAL_LL_JOB") not in self.jobs:
-			logging.info("server has a GSTLAL_LL_JOB value outside of requested range, discarding")
-			return
-		# watch for security problems: don't let url or job ID
-		# terminate the wget shell command in mid-string
-		if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
-			logging.warn("invalid URL and/or job ID")
-			return
-		logging.info("recording server at %s for GSTLAL_LL_JOB='%s'" % (url, properties["GSTLAL_LL_JOB"]))
-		with self.lock:
-			self.urls[properties["GSTLAL_LL_JOB"]] = url
-
-	def wget_loop(self, mainloop):
-		try:
-			while self.dump_period:
-				logging.info("sleeping")
-				time.sleep(self.dump_period)
-				with self.lock:
-					dataspan = set()
-					for job, url in sorted(self.urls.items()):
-						assert job
-						# FIXME Hack
-						url = url.replace(".local","")
-						for d in self.dataurls:
-							jobtime, jobdata = get_url(url,d)
-							gps1, gps2 = gps_range(jobtime, dataspan)
-							for s,e in zip(gps1, gps2):
-								for (typ, func) in self.datatypes:
-									path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
-									update_lowest_level_data(job, path, d, s, e, typ, self, jobtime, jobdata, func)
-
-					# Data reduction across jobs at the lowest level
-					gps1, gps2 = job_expanse(dataspan)
-					for s,e in zip(gps1, gps2):
-						# FIXME don't hardcode this range
-						for level in range(7):
-							this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
-							for d in self.dataurls:
-								for (typ,func) in self.datatypes:
-									# Produce the data at this level by descending a level.
-									if level > 0:
-										for job in sorted(self.urls):
-											reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e)
-									# reduce data across jobs at this level
-									reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e)
-		except:
-			mainloop.quit()
-			raise
-
-	def quit(self):
-		logging.info("waiting for wget loop to finish ...")
-		self.dump_period = 0	# signal exit
-		self.wget_thread.join()
-
+def get_data_from_job_and_reduce((job, routes, datatypes, prevdataspan, base_dir, jobs)):
+	# get the url
+	with open(os.path.join(options.job_tag, "%s_registry.txt" % job)) as f:
+		url = f.readline().strip()
+	update_time = []
+	reduce_time = []
+	dataspan = set()
+	for route in routes:
+		logging.info("processing job %s for route %s" % (job, route))
+		jobtime, jobdata = get_url(url, route)
+		gps1, gps2 = gps_range(jobtime)
+		for start, end in zip(gps1, gps2):
+			# shortcut to not reprocess data that has already been
+			# processed. Dataspan was the only thing that was
+			# previously determined to be needing to be updated
+			# anything before that is pointless
+			if prevdataspan: print end, min(prevdataspan)
+			if prevdataspan and end < min(prevdataspan):
+				continue
+			for (typ, func) in datatypes:
+				now = time.time()
+				for processed_time in update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ, base_dir, jobtime, jobdata, func):
+					dataspan.add(processed_time)
+				update_time.append(time.time()-now)
+				# descend down through the levels
+				now = time.time()
+				for level in range(1,DIRS):
+					reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ, route, func, start, end)
+				reduce_time.append(time.time()-now)
+	print "Updated %d with average time %f; Reduced %d with average time %f" % (len(update_time), numpy.mean(update_time), len(reduce_time), numpy.mean(reduce_time))
+	return dataspan
 
 #
 # =============================================================================
 #
@@ -402,34 +356,47 @@ class Collector(servicediscovery.Listener):
 # =============================================================================
 #
 
-
-options = parse_command_line()
-
-# FIXME don't hardcode some of these?
-datatypes = [("min", min), ("max", max), ("median", median)]
-jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
-dataurls = ["latency_history", "snr_history"]
-
-
-logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
-
-
-mainloop = GLib.MainLoop()
-
-collector = Collector(mainloop, datatypes, jobs, dataurls, options.base_dir, job_tag = options.job_tag, dump_period = options.dump_period)
-browser = servicediscovery.ServiceBrowser(collector)
-
-try:
-	mainloop.run()
-except:
-	collector.quit()
-	raise
-
-
-#
-# always end on an error so that condor won't think we're done and will
-# restart us
-#
-
-
-sys.exit(1)
+if __name__ == '__main__':
+
+	options = parse_command_line()
+	# FIXME don't hardcode some of these?
+	datatypes = [("min", min), ("max", max), ("median", median)]
+	jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
+	routes = ["latency_history", "snr_history"]
+
+	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+
+	pool = Pool(options.num_threads)
+	prevdataspan = set()
+	while True:
+		logging.info("sleeping")
+		time.sleep(options.dump_period)
+		dataspan = set()
+
+		# First get the raw and reduced data for each job in parallel
+		mapargs = [(job, routes, datatypes, prevdataspan, options.base_dir, jobs) for job in jobs]
+		for ds in pool.map(get_data_from_job_and_reduce, mapargs):
+		#for ds in map(get_data_from_job_and_reduce, mapargs):
+			dataspan.update(ds)
+		prevdataspan = dataspan.copy()
+		# Then reduce the data across jobs at each level
+		mapargs = []
+		for start, end in zip(*job_expanse(dataspan)):
+			# FIXME don't hardcode this range
+			for level in range(DIRS):
+				this_level_dir = "/".join([options.base_dir, gps_to_leaf_directory(start, level = level)])
+				logging.info("processing reduced data in span [%d,%d] at level %d" % (start, end, level))
+				mapargs = []
+				for route in routes:
+					for (typ,func) in datatypes:
+						setup_dir_across_job_by_level(start, typ, route, options.base_dir, verbose = True, level = level)
+						mapargs.append((jobs, this_level_dir, typ, route, func, level, start, end))
+				pool.map(reduce_across_jobs, mapargs)
+				#map(reduce_across_jobs, mapargs)
+
+	#
+	# always end on an error so that condor won't think we're done and will
+	# restart us
+	#
+
+	sys.exit(1)
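
A note on the directory scheme the patch introduces: the GPS time is encoded in the path by dropping digits. With MIN_TIME_QUANTA = 10000 a level-0 leaf keeps all but the last int(numpy.log10(MIN_TIME_QUANTA)) = 4 digits of the integer GPS time, each higher level drops one more digit, and every surviving digit becomes one path component via "/".join(str_time). The sketch below only illustrates that slicing expression (copied from setup_dir_by_job_and_level / setup_dir_across_job_by_level above); the helper name leaf_components and the example GPS time are hypothetical and not part of the patch.

import numpy

MIN_TIME_QUANTA = 10000	# same constant the patch introduces
DIRS = 6		# number of directory levels the patch iterates over

def leaf_components(gpstime, level = 0):
	# drop the last log10(MIN_TIME_QUANTA) + level digits, then turn each
	# remaining digit into one directory component
	str_time = str(gpstime).split(".")[0]
	str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
	return "/".join(str_time)

for level in range(DIRS):
	print(leaf_components(1000000006, level))
# level 0 -> 1/0/0/0/0/0, level 1 -> 1/0/0/0/0, ..., level 5 -> 1

Each level up the tree therefore spans ten times as much GPS time as the level below it, which matches the granularity described in the reduce_data docstring ("data reduction by powers of 10 where level specifies the power") and walked by the per-level reduction functions.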