From 96c4f95b1f2dcc4985337acb8eafb17bde99695a Mon Sep 17 00:00:00 2001 From: Chad Hanna <crh184@psu.edu> Date: Tue, 16 Aug 2016 13:07:46 -0400 Subject: [PATCH] gstlal_ll_inspiral_aggregator: add Kipp's service discovery magic --- gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 176 ++++++++++++++---- 1 file changed, 136 insertions(+), 40 deletions(-) diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator index a9edc773bc..15fbd74788 100755 --- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator +++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator @@ -1,7 +1,7 @@ #!/usr/bin/env python import h5py import numpy -import os +import sys, os import itertools import argparse import lal @@ -9,25 +9,25 @@ from lal import LIGOTimeGPS import time from gstlal import servicediscovery from gi.repository import GLib +import logging +import subprocess +import threading +from gi.repository import GLib +from gstlal import servicediscovery -def now(): - return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0) +# +# ============================================================================= +# +# Utility functions +# +# ============================================================================= +# -# Read command line options -def parse_command_line(): - parser = argparse.ArgumentParser(description="Online data aggregator") - - # directory to put everything in - parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path") - - # num-jobs - parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs") - - args = parser.parse_args() +def now(): + return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0) - return args def makedir(path): @@ -48,8 +48,6 @@ def create_new_dataset(path, data): f.close() def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True): - # The next 100,000 seconds of directory structure after the specified start - # time given by the first 4 
digits str_time = str(gpstime).split(".")[0] digits = [int(x) for x in str_time] directories = [numpy.array([digits[x]]) for x in range(7)] @@ -73,36 +71,134 @@ def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True): else: for d in data: create_new_dataset(type_dir, d) + +# +# ============================================================================= +# +# Command Line +# +# ============================================================================= +# + + +# Read command line options +def parse_command_line(): + + parser = argparse.ArgumentParser(description="Online data aggregator") + + # directory to put everything in + parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path") + + parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)") + + parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs") -class MyListener(servicediscovery.Listener): - def print_msg(self, action, sname, stype, sdomain, host, port, properties): - print >>sys.stderr, "Service \"%s\" %s" % (sname, action) - print >>sys.stderr, "\tType is \"%s\"" % stype - print >>sys.stderr, "\tDomain is \"%s\"" % sdomain - print >>sys.stderr, "\tHost is \"%s\"" % host - print >>sys.stderr, "\tPort is %s" % port - print >>sys.stderr, "\tProperties are %s\n" % properties - def add_service(self, sname, stype, sdomain, host, port, properties): - self.print_msg("added", sname, stype, sdomain, host, port, properties) - def remove_service(self, sname, stype, sdomain): - self.print_msg("removed", sname, stype, sdomain, None, None, None) - def failure(self, *args): - print >>sys.stderr, "failure", args + parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).") + + args = parser.parse_args() + + return args + + +# +# 
============================================================================= # # Internal Library # # ============================================================================= # -# parse command line -args = parse_command_line() + +class Collector(servicediscovery.Listener): + def __init__(self, mainloop, job_tag = None, dump_period = 180.): + self.job_tag = job_tag + self.dump_period = dump_period + self.urls = {} + self.lock = threading.Lock() + # FIXME: use glib's mainloop machinery instead, implement + # this as a timeout or whatever it's called + logging.info("starting wget loop thread") + self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,)) + self.wget_thread.start() + + def add_service(self, sname, stype, sdomain, host, port, properties): + if stype != "_http._tcp" or not sname.startswith("gstlal_inspiral "): + return + url = "http://%s:%s/" % (host, port) + logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag"))) + if self.job_tag is not None and properties.get("job_tag") != self.job_tag: + logging.info("server has wrong or missing job tag, discarding") + return + if not properties.get("GSTLAL_LL_JOB"): + logging.info("server has no GSTLAL_LL_JOB value, discarding") + return + # watch for security problems: don't let url or job ID + # terminate the wget shell command in mid-string + if ";" in url or ";" in properties["GSTLAL_LL_JOB"]: + logging.warning("invalid URL and/or job ID") + return + logging.info("recording server at %s for GSTLAL_LL_JOB='%s'" % (url, properties["GSTLAL_LL_JOB"])) + with self.lock: + self.urls[properties["GSTLAL_LL_JOB"]] = url + + def wget_loop(self, mainloop): + try: + while self.dump_period: + logging.info("sleeping") + time.sleep(self.dump_period) + setup_dirs(now(), types, bins, data, args.base_dir) + #with self.lock: + # for job, url in sorted(self.urls.items()): + # assert job + # cmd = "wget -nv -nH -P %s -r %s" % (job, url) + # 
logging.info(cmd) + # subprocess.call(cmd, shell = True) + except: + mainloop.quit() + raise + + def quit(self): + logging.info("waiting for wget loop to finish ...") + self.dump_period = 0 # signal exit + self.wget_thread.join() + + +# +# ============================================================================= +# +# Main +# +# ============================================================================= +# + + +args = parse_command_line() # FIXME don't hardcode some of these? types = ["min", "max", "mean", "all"] bins = ["%04d" % b for b in numpy.arange(0, args.num_jobs)] data = ["latency", "snr"] + +logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") + + mainloop = GLib.MainLoop() -browser = servicediscovery.ServiceBrowser(MyListener()) -print "Browsing for services. Press CTRL-C to quit.\n" -mainloop.run() -while True: - - setup_dirs(now(), types, bins, data, args.base_dir) - time.sleep(1) +collector = Collector(mainloop, job_tag = args.job_tag, dump_period = args.dump_period) +browser = servicediscovery.ServiceBrowser(collector) + +try: + mainloop.run() +except: + collector.quit() + raise + + +# +# always end on an error so that condor won't think we're done and will +# restart us +# + + +sys.exit(1) -- GitLab