Commit be5cb4e3 authored by Chad Hanna

gstlal_ll_inspiral_aggregator: refactor various bits

parent 11cc7cee
@@ -55,11 +55,8 @@ def parse_command_line():
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
args = parser.parse_args()
@@ -77,14 +74,24 @@ def parse_command_line():
def median(l):
"""!
Return the median of a list, using the nearest actual element (no averaging for even-length lists)
"""
return sorted(l)[len(l)//2]
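A minimal usage sketch (hypothetical values, not from the source): because the implementation indexes the sorted list at len(l)//2, an even-length list yields the upper of its two middle values.
assert median([3, 1, 2]) == 2
# upper of the two middle values for an even-length list
assert median([4, 1, 3, 2]) == 3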
def now():
"""!
A convenience function to return the current gps time
"""
return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
def get_url(url,d):
"""!
A function to pull data from @param url, where @param d specifies a
specific route. FIXME: this assumes that the routes end in .txt
"""
jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
jobtime = [float(x.split()[0]) for x in jobdata if x]
jobdata = [float(x.split()[1]) for x in jobdata if x]
@@ -92,11 +99,14 @@ def get_url(url,d):
return jobtime, jobdata
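For reference, a hedged sketch of the payload get_url expects from "%s%s.txt" % (url, d): plain text with one "<gps time> <value>" pair per line (the values below are hypothetical).
payload = "123456789.0 1.5\n123456790.0 2.5\n"
lines = [x for x in payload.split("\n") if x]
assert [float(x.split()[0]) for x in lines] == [123456789.0, 123456790.0]  # jobtime
assert [float(x.split()[1]) for x in lines] == [1.5, 2.5]                  # jobdata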
def quantize(xarr, yarr, func, level = 0):
def reduce(xarr, yarr, func, level = 0):
"""!
This function performs a data reduction by powers of 10: samples are binned by 10**level in time and func is applied within each bin
"""
datadict = collections.OrderedDict()
assert len(yarr) == len(xarr)
for x,y in zip(xarr, yarr):
# quantize to this level
# reduce to this level
key = int(x) // (10**level)
# we want to sort on y not x
datadict.setdefault(key, []).append((y,x))
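A small sketch of the binning key used above (hypothetical times; it only illustrates how samples fall into 10**level-second bins, not the selection step in the unchanged remainder of the function).
level = 2
xarr = [123456701, 123456755, 123456820]
# at level 2 the first two times share one 100 s bin, the third starts the next
assert [int(x) // (10**level) for x in xarr] == [1234567, 1234567, 1234568]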
@@ -109,6 +119,9 @@ def quantize(xarr, yarr, func, level = 0):
def makedir(path):
"""!
A convenience function to make new directories and trap errors
"""
try:
os.makedirs(path)
except IOError:
@@ -118,6 +131,11 @@ def makedir(path):
def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
"""!
A function to create a new dataset with time @param timedata and data
@param data. The data will be stored in an hdf5 file at path @param path with
base name @param base. You can also make a temporary file.
"""
if tmp:
fname = os.path.join(path, "%s.hdf5.tmp" % base)
else:
@@ -142,6 +160,9 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
def get_dataset(path, base):
"""!
open a dataset at @param path with name @param base and return the data
"""
fname = os.path.join(path, "%s.hdf5" % base)
f = h5py.File(fname, "r")
x,y = list(f["time"]), list(f["data"])
@@ -150,14 +171,24 @@ def get_dataset(path, base):
def gps_to_minimum_time_quanta(gpstime):
"""!
Given a GPS time, return the start of the minimum time quantum containing it,
e.g., 123456789 -> 123456000.
"""
return int(gpstime) // MIN_TIME_QUANTA * MIN_TIME_QUANTA
def gps_to_leaf_directory(gpstime, level = 0):
"""!
get the leaf directory for a given gps time
"""
return "/".join(str(gps_to_minimum_time_quanta(gpstime) // MIN_TIME_QUANTA // (10**level)))
def gps_to_sub_directories(gpstime, level, basedir):
"""!
return the entire relevant directory structure for a given GPS time
"""
root = os.path.join(basedir, gps_to_leaf_directory(gpstime, level))
out = []
for i in range(10):
@@ -168,6 +199,10 @@ def gps_to_sub_directories(gpstime, level, basedir):
def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
"""!
Given a GPS time, the data types, the jobs, and the data names, produce an
appropriate directory structure for storing the hierarchical data.
"""
str_time = str(gpstime).split(".")[0]
digits = [int(x) for x in str_time]
directories = [numpy.array([digits[x]]) for x in range(7)]
@@ -192,6 +227,81 @@ def setup_dirs(gpstime, types, jobs, data, base_dir, verbose = True):
create_new_dataset(type_bin_dir, d)
def gps_range(jobtime, dataspan):
gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
min_t, max_t = min(gpsblocks), max(gpsblocks)
for gpstime in gpsblocks:
dataspan.add(gpstime)
return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
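A hedged sketch of gps_range, again assuming MIN_TIME_QUANTA = 1000 (hypothetical job times): the two returned ranges zip into [start, end) spans one quantum wide, and dataspan collects the block starts.
dataspan = set()
gps1, gps2 = gps_range([123456001.5, 123457500.0], dataspan)
assert gps1 == [123456000, 123457000]
assert gps2 == [123457000, 123458000]
assert dataspan == set([123456000, 123457000])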
def update_lowest_level_data(path, d, s, e, typ, self, jobtime, jobdata, job, func):
try:
fname, prev_times, prev_data = get_dataset(path, d)
except:
setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
fname, prev_times, prev_data = get_dataset(path, d)
# only get new data
if prev_times:
this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
else:
this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
this_time = [jobtime[i] for i in this_time_ix] + prev_times
this_data = [jobdata[i] for i in this_time_ix] + prev_data
reduced_time, reduced_data = reduce(this_time, this_data, func, level = 0)
logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time)))
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# copy the tmp file over the original
shutil.move(tmpfname, fname)
def job_expanse(dataspan):
if dataspan:
min_t, max_t = min(dataspan), max(dataspan)
return range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
else:
return [], []
def reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e):
agg_data = []
agg_time = []
for subdir in gps_to_sub_directories(s, level, self.base_dir):
path = "/".join([this_level_dir, subdir, "by_job", job, typ])
try:
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
except IOError as ioerr:
logging.error(ioerr)
pass
reduced_time, reduced_data = reduce(agg_time, agg_data, func, level=level)
path = "/".join([this_level_dir, "by_job", job, typ])
logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
def reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e):
# Process this level
agg_data = []
agg_time = []
for job in sorted(self.urls):
path = "/".join([this_level_dir, "by_job", job, typ])
try:
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
except IOError as ioerr:
logging.error(ioerr)
pass
reduced_time, reduced_data = reduce(agg_time, agg_data, func, level=level)
logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time)))
path = "/".join([this_level_dir, typ])
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
#
# =============================================================================
#
@@ -213,7 +323,7 @@ class Collector(servicediscovery.Listener):
self.lock = threading.Lock()
# FIXME: use glib's mainloop machinery instead, implement
# this as a timeout or whatever it's called
logging.info("starting wget loop thread thread")
logging.info("starting wget loop thread")
self.wget_thread = threading.Thread(target = self.wget_loop, args = (mainloop,))
self.wget_thread.start()
@@ -253,84 +363,26 @@ class Collector(servicediscovery.Listener):
url = url.replace(".local","")
for d in self.dataurls:
jobtime, jobdata = get_url(url,d)
gpsblocks = set((gps_to_minimum_time_quanta(t) for t in jobtime))
min_t, max_t = min(gpsblocks), max(gpsblocks)
for gpstime in gpsblocks:
dataspan.add(gpstime)
gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
gps1, gps2 = gps_range(jobtime, dataspan)
for s,e in zip(gps1, gps2):
for (typ, func) in self.datatypes:
path = "/".join([self.base_dir, gps_to_leaf_directory(s), "by_job", job, typ])
try:
fname, prev_times, prev_data = get_dataset(path, d)
except:
setup_dirs(s, self.datatypes, self.jobs, self.dataurls, self.base_dir)
fname, prev_times, prev_data = get_dataset(path, d)
# only get new data
if prev_times:
this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e and t > prev_times[-1]]
else:
this_time_ix = [i for i,t in enumerate(jobtime) if s <= t < e]
this_time = [jobtime[i] for i in this_time_ix] + prev_times
this_data = [jobdata[i] for i in this_time_ix] + prev_data
reduced_time, reduced_data = quantize(this_time, this_data, func, level = 0)
print "processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, d, s, e, typ, len(reduced_time))
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# copy the tmp file over the original
shutil.move(tmpfname, fname)
update_lowest_level_data(path, d, s, e, typ, self, jobtime, jobdata, job, func)
# Data reduction across jobs at the lowest level
if dataspan:
min_t, max_t = min(dataspan), max(dataspan)
gps1,gps2 = range(min_t, max_t+MIN_TIME_QUANTA, MIN_TIME_QUANTA), range(min_t+MIN_TIME_QUANTA, max_t+2*MIN_TIME_QUANTA, MIN_TIME_QUANTA)
else:
gps1, gps2 = [], []
gps1, gps2 = job_expanse(dataspan)
for s,e in zip(gps1, gps2):
# FIXME don't hardcode this range
for level in range(1):
for level in range(7):
this_level_dir = "/".join([self.base_dir, gps_to_leaf_directory(s, level = level)])
for d in self.dataurls:
for (typ,func) in self.datatypes:
# descend down a level if level > 0
# Produce the data at this level by descending a level.
if level > 0:
for job in sorted(self.urls):
agg_data = []
agg_time = []
for subdir in gps_to_sub_directories(s, level, self.base_dir):
path = "/".join([this_level_dir, subdir, "by_job", job, typ])
try:
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
except IOError as ioerr:
print >> sys.stderr, ioerr
pass
reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
path = "/".join([this_level_dir, "by_job", job, typ])
print "processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time))
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
# Process this level
agg_data = []
agg_time = []
for job in sorted(self.urls):
path = "/".join([this_level_dir, "by_job", job, typ])
#print "processing %s %s data reduction: level %d type %s" % (job, d, level, typ)
try:
fname, x, y = get_dataset(path, d)
agg_time += x
agg_data += y
except IOError as e:
print >> sys.stderr, e
pass
reduced_time, reduced_data = quantize(agg_time, agg_data, func, level=level)
print "processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, s, e, typ, level, len(reduced_time), len(agg_time))
path = "/".join([this_level_dir, typ])
tmpfname = create_new_dataset(path, d, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
reduce_data_from_lower_level(level, self, this_level_dir, job, typ, d, func, s, e)
# reduce data across jobs at this level
reduce_across_jobs(self, this_level_dir, typ, d, func, level, s, e)
except:
mainloop.quit()
raise