Skip to content
Snippets Groups Projects
Commit 96c4f95b authored by Chad Hanna's avatar Chad Hanna
Browse files

gstlal_ll_inspiral_aggregator: add Kipp's service-discovery magic

parent d4686337
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
import h5py
import numpy
import os
import sys, os
import itertools
import argparse
import lal
......@@ -9,25 +9,25 @@ from lal import LIGOTimeGPS
import time
from gstlal import servicediscovery
from gi.repository import GLib
import logging
import subprocess
import threading
from gi.repository import GLib
from gstlal import servicediscovery
def now():
    """Return the current UTC wall-clock time as a LIGOTimeGPS (whole seconds, zero nanoseconds)."""
    gps_seconds = lal.UTCToGPS(time.gmtime())
    return LIGOTimeGPS(gps_seconds, 0)
#
# =============================================================================
#
# Utility functions
#
# =============================================================================
#
# Read command line options
def parse_command_line():
    """Parse and return the command-line options for the online data aggregator.

    Returns an argparse.Namespace with:
        base_dir (str): root directory for all output (default "aggregator")
        num_jobs (int): number of running jobs to aggregate (default 112)
    """
    parser = argparse.ArgumentParser(description="Online data aggregator")
    # directory to put everything in
    parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
    # num-jobs
    parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
    args = parser.parse_args()
    # NOTE: a stray nested `def now()` (diff-merge artifact) used to sit here,
    # shadowing nothing and never called; removed — the module-level now() is
    # the one in use.
    return args
def makedir(path):
......@@ -48,8 +48,6 @@ def create_new_dataset(path, data):
f.close()
def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
# The next 100,000 seconds of directory structure after the specified start
# time given by the first 4 digits
str_time = str(gpstime).split(".")[0]
digits = [int(x) for x in str_time]
directories = [numpy.array([digits[x]]) for x in range(7)]
......@@ -73,36 +71,134 @@ def setup_dirs(gpstime, types, bins, data, base_dir, verbose = True):
else:
for d in data:
create_new_dataset(type_dir, d)
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
# Read command line options
def parse_command_line():
parser = argparse.ArgumentParser(description="Online data aggregator")
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=112, help="number of running jobs")
class MyListener(servicediscovery.Listener):
    # Debug listener: dumps every service-discovery event to stderr.
    # NOTE(review): Python 2 print-statement syntax throughout; this file will
    # not parse under Python 3.
    def print_msg(self, action, sname, stype, sdomain, host, port, properties):
        # Shared formatter used by add_service()/remove_service(); `action` is
        # the human-readable verb ("added"/"removed").
        print >>sys.stderr, "Service \"%s\" %s" % (sname, action)
        print >>sys.stderr, "\tType is \"%s\"" % stype
        print >>sys.stderr, "\tDomain is \"%s\"" % sdomain
        print >>sys.stderr, "\tHost is \"%s\"" % host
        print >>sys.stderr, "\tPort is %s" % port
        print >>sys.stderr, "\tProperties are %s\n" % properties
    def add_service(self, sname, stype, sdomain, host, port, properties):
        # Callback invoked by the browser when a service appears.
        self.print_msg("added", sname, stype, sdomain, host, port, properties)
    def remove_service(self, sname, stype, sdomain):
        # Callback invoked when a service disappears; host/port/properties are
        # not available for removals, hence the Nones.
        self.print_msg("removed", sname, stype, sdomain, None, None, None)
    def failure(self, *args):
        # Callback invoked on browser errors; just report them.
        print >>sys.stderr, "failure", args
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
args = parser.parse_args()
return args
#
# =============================================================================
#
# Internal Library
#
# =============================================================================
#
# parse command line into the module-level namespace consumed below (e.g.
# args.base_dir / args.num_jobs)
args = parse_command_line()
class Collector(servicediscovery.Listener):
    """Service-discovery listener that records the URL of every discovered
    gstlal_inspiral job and runs a background thread that periodically
    refreshes the aggregator's output directory tree.

    Discovered job URLs are stored in self.urls keyed by the job's
    GSTLAL_LL_JOB property; access is guarded by self.lock.
    """

    def __init__(self, mainloop, job_tag = None, dump_period = 180.):
        """
        mainloop:    GLib.MainLoop to quit if the background thread dies.
        job_tag:     if not None, only record services advertising this
                     job_tag property; otherwise record all matches.
        dump_period: seconds to sleep between refreshes.  Setting it to 0
                     stops the loop (quit() uses this as the exit signal).
        """
        self.job_tag = job_tag
        self.dump_period = dump_period
        # GSTLAL_LL_JOB -> URL; guarded by self.lock
        self.urls = {}
        self.lock = threading.Lock()
        # FIXME: use glib's mainloop machinery instead, implement
        # this as a timeout or whatever it's called
        logging.info("starting wget loop thread")  # fixed duplicated word in log message
        self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
        self.wget_thread.start()

    def add_service(self, sname, stype, sdomain, host, port, properties):
        """Browser callback: record the URL of a discovered gstlal_inspiral HTTP service."""
        # only HTTP services advertised by gstlal_inspiral jobs are of interest
        if stype != "_http._tcp" or not sname.startswith("gstlal_inspiral "):
            return
        url = "http://%s:%s/" % (host, port)
        logging.info("found '%s' server at %s for job tag '%s'" % (sname, url, properties.get("job_tag")))
        if self.job_tag is not None and properties.get("job_tag") != self.job_tag:
            # fixed typo in log message: "job tab" -> "job tag"
            logging.info("server has wrong or missing job tag, discarding")
            return
        if not properties.get("GSTLAL_LL_JOB"):
            logging.info("server has no GSTLAL_LL_JOB value, discarding")
            return
        # watch for security problems: don't let url or job ID
        # terminate the wget shell command in mid-string
        if ";" in url or ";" in properties["GSTLAL_LL_JOB"]:
            # logging.warn() is deprecated; use logging.warning()
            logging.warning("invalid URL and/or job ID")
            return
        logging.info("recording server at %s for GSTLAL_LL_JOB='%s'" % (url, properties["GSTLAL_LL_JOB"]))
        with self.lock:
            self.urls[properties["GSTLAL_LL_JOB"]] = url

    def wget_loop(self, mainloop):
        """Background thread body: every dump_period seconds, rebuild the
        output directory tree for the current GPS time.  Exits when
        dump_period is set to 0 (see quit())."""
        try:
            while self.dump_period:
                logging.info("sleeping")
                time.sleep(self.dump_period)
                # NOTE(review): relies on the module-level names setup_dirs,
                # now, types, bins, data and args all being defined by the
                # time this thread wakes up — confirm definition order.
                setup_dirs(now(), types, bins, data, args.base_dir)
                #with self.lock:
                #	for job, url in sorted(self.urls.items()):
                #		assert job
                #		cmd = "wget -nv -nH -P %s -r %s" % (job, url)
                #		logging.info(cmd)
                #		subprocess.call(cmd, shell = True)
        except:
            # deliberately bare: any failure in this thread (including
            # SystemExit/KeyboardInterrupt) must stop the main loop before
            # re-raising, otherwise the process would hang
            mainloop.quit()
            raise

    def quit(self):
        """Signal wget_loop() to exit and block until the thread finishes."""
        logging.info("waiting for wget loop to finish ...")
        self.dump_period = 0  # signal exit
        self.wget_thread.join()
#
# =============================================================================
#
# Main
#
# =============================================================================
#
options = parse_command_line()

# FIXME don't hardcode some of these?
# statistic types, per-job bin labels, and data products passed to
# setup_dirs() when building the output directory tree
types = ["min", "max", "mean", "all"]
# NOTE(review): reads `args` (module-level, set earlier) rather than the
# `options` namespace parsed just above — confirm which is intended.
bins = ["%04d" % b for b in numpy.arange(0, args.num_jobs)]
data = ["latency", "snr"]

logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")

mainloop = GLib.MainLoop()

# NOTE(review): two complete browser/main-loop setups appear below (the
# MyListener debug version and the Collector version); this looks like an
# old/new diff interleave — confirm which one is intended to survive.
browser = servicediscovery.ServiceBrowser(MyListener())
print "Browsing for services. Press CTRL-C to quit.\n"
mainloop.run()

# NOTE(review): this loop never terminates, so everything after it is
# unreachable as written.
while True:
    setup_dirs(now(), types, bins, data, args.base_dir)
    time.sleep(1)

collector = Collector(mainloop, job_tag = options.job_tag, dump_period = options.dump_period)
browser = servicediscovery.ServiceBrowser(collector)
try:
    mainloop.run()
except:
    # ensure the collector's background thread is stopped before propagating
    collector.quit()
    raise

#
# always end on an error so that condor won't think we're done and will
# restart us
#
sys.exit(1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment