diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral
index 98a45177b64d16ad4274e0550a0ad3b00ebe7570..610a5aa8e8488b66008c093b969ac5ffca390c0b 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral
+++ b/gstlal-inspiral/bin/gstlal_inspiral
@@ -652,9 +652,6 @@ for output_file_number, (svd_bank_url_dict, output_url, likelihood_url_namedtupl
 				continue
 			yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
 		yield "</p>\n</body></html>"
-	# FIXME: get service-discovery working, then don't do this
-	if "GSTLAL_LL_JOB" in os.environ:
-		open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
 
 	#
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_get_urls b/gstlal-inspiral/bin/gstlal_ll_inspiral_get_urls
index d600f5860cb2154016ad5d40bd0a40109f4fc706..533b9c060f4c3dc6bdd8ac69bfe03b8dae9b29fb 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_get_urls
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_get_urls
@@ -1,4 +1,4 @@
-#!/usr/bin/env bash
+#!/usr/bin/env python
 #
 # Copyright (C) 2014 Chad Hanna, Kipp Cannon
 #
@@ -31,15 +31,145 @@
 ## @package gstlal_ll_inspiral_get_urls
 
-while true ; do
-	for f in $@; do
-		wget -nH -P ${f} -r $(cat ${f}_registry.txt);
-	done
-	sleep 180
-done
+#
+# =============================================================================
+#
+#                                   Preamble
+#
+# =============================================================================
+#
+
+
+import logging
+from optparse import OptionParser
+import subprocess
+import sys
+import threading
+import time
+
+
+from gi.repository import GLib
+
+
+from gstlal import servicediscovery
+
+
+#
+# =============================================================================
+#
+#                                 Command Line
+#
+# =============================================================================
+#
+
+
+def parse_command_line():
+	parser = OptionParser(description = __doc__)
+	parser.add_option("--dump-period", metavar = "seconds", type = "float", default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
+	parser.add_option("--job-tag", metavar = "TAG", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
+
+	options, ignored = parser.parse_args()
+
+	return options
+
+
+#
+# =============================================================================
+#
+#                               Internal Library
+#
+# =============================================================================
+#
+
+
+class Collector(servicediscovery.Listener):
+	@staticmethod
+	def is_service_instance(sname, stype):
+		return stype == "_http._tcp" and sname.startswith("gstlal_inspiral ")
+
+	def __init__(self, mainloop, job_tag = None, dump_period = 180.):
+		self.job_tag = job_tag
+		self.dump_period = dump_period
+		self.urls = {}
+		self.lock = threading.Lock()
+		# FIXME:  use glib's mainloop machinery instead, implement
+		# this as a timeout or whatever it's called
+		logging.info("starting wget loop thread")
+		self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
+		self.wget_thread.start()
+
+	def add_service(self, sname, stype, sdomain, host, port, properties):
+		if not self.is_service_instance(sname, stype):
+			return
+		url = "http://%s:%s/" % (host, port)
+		logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
+		if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
+			logging.info("server has wrong or missing job tag, discarding")
+			return
+		if not properties.get("GSTLAL_LL_JOB"):
+			logging.info("server has no GSTLAL_LL_JOB value, discarding")
+			return
+		# watch for security problems:  don't let the URL or job ID
+		# terminate the wget shell command in mid-string
+		if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
+			logging.warn("invalid URL and/or job ID")
+			return
+		logging.info("recording server at %s for GSTLAL_LL_JOB='%s'" % (url, properties["GSTLAL_LL_JOB"]))
+		with self.lock:
+			self.urls[properties["GSTLAL_LL_JOB"]] = url

+	def wget_loop(self, mainloop):
+		try:
+			while self.dump_period:
+				logging.info("sleeping")
+				time.sleep(self.dump_period)
+				with self.lock:
+					for job, url in sorted(self.urls.items()):
+						assert job
+						cmd = "wget -nv -nH -P %s -r %s" % (job, url)
+						logging.info(cmd)
+						subprocess.call(cmd, shell = True)
+		except:
+			mainloop.quit()
+			raise
+
+	def quit(self):
+		logging.info("waiting for wget loop to finish ...")
+		self.dump_period = 0	# signal exit
+		self.wget_thread.join()
+
+
+#
+# =============================================================================
+#
+#                                     Main
+#
+# =============================================================================
+#
+
+
+options = parse_command_line()
+
+
+logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+
+
+mainloop = GLib.MainLoop()
+
+collector = Collector(mainloop, job_tag = options.job_tag, dump_period = options.dump_period)
+browser = servicediscovery.ServiceBrowser(collector)
+
+try:
+	mainloop.run()
+except:
+	collector.quit()
+	raise
+
 #
-# Always end on an error so that condor will restart it
+# always end on an error so that condor doesn't think we're done and will
+# restart us
 #
-exit 1
+
+sys.exit(1)