Commit ee422a40 authored by chad.hanna

gstlal-ugly/python/aggregator.py a few performance improvements and kafka support

parent d4b7501b
@@ -65,13 +65,14 @@ def get_url(url,d):
     A function to pull data from @param url where @param d specifies a
     specific route. FIXME it assumes that the routes end in .txt
     """
+    f = "%s%s.txt" % (url, d)
     try:
-        jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
+        jobdata = urllib2.urlopen(f).read().split("\n")
     except urllib2.HTTPError as e:
-        logging.error("%s : %s" % (url, str(e)))
+        logging.error("%s : %s" % (f, str(e)))
         return
     except urllib2.URLError as e:
-        logging.error("%s : %s" % (url, str(e)))
+        logging.error("%s : %s" % (f, str(e)))
         return
     data = []
     for line in jobdata:
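For orientation, a hedged sketch of how get_url is typically driven (not part of this commit). The registry URL and route name below are placeholders; the diff only establishes that the route is appended to the URL with a ".txt" suffix, that errors yield None, and that callers read times from row 0 and values from row 1.

    # hypothetical inputs; "http://node001:8080/" and "latency_history" are
    # illustrative placeholders, not values taken from the repository
    full_data = get_url("http://node001:8080/", "latency_history")
    if full_data is not None:
        jobtime, jobdata = full_data[0], full_data[1]  # time column, data column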
@@ -85,6 +86,36 @@ def get_url(url,d):
     return out
 
 
+def get_data_from_kafka(jobs, routes, kafka_consumer, req_all = False, timeout = 300):
+    """!
+    A function to pull data from kafka for a set of jobs (topics) and
+    routes (keys in the incoming json messages)
+    """
+    timedata = dict(((j,r), []) for j in jobs for r in routes)
+    datadata = dict(((j,r), []) for j in jobs for r in routes)
+    # pick a time to bail out of this loop
+    bailout_time = now()
+    # FIXME this will block forever if it runs out of events unless
+    # consumer_timeout_ms (int) is set. Should we let it block?
+    cnt = 0
+    for message in kafka_consumer:
+        if message.topic not in jobs:
+            continue
+        # FIXME assumes json has only one key value pair
+        route, packet = message.value.items()[0]
+        if route not in routes:
+            continue
+        timedata[(message.topic, route)].append(float(packet.split()[0]))
+        datadata[(message.topic, route)].append(float(packet.split()[1]))
+        cnt += 1
+        if timedata[(message.topic, route)][-1] > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
+            break
+    for k in timedata:
+        timedata[k] = numpy.array(timedata[k])
+        datadata[k] = numpy.array(datadata[k])
+    return timedata, datadata
+
+
 def reduce_data(xarr, yarr, func, level = 0):
     """!
     This function does a data reduction by powers of 10 where level
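The new helper expects an already-configured consumer whose messages carry one-key JSON payloads of "time value" strings. A minimal sketch of driving it with kafka-python follows; the topic names, broker address and JSON deserialization are assumptions inferred from the code above, not part of the commit.

    import json
    from kafka import KafkaConsumer  # kafka-python package

    jobs = ["0001", "0002"]                      # hypothetical topic names
    routes = ["latency_history", "snr_history"]  # hypothetical route keys

    # message.value.items()[0] above implies each message value is a dict,
    # so the raw bytes are presumably JSON-decoded by the consumer
    consumer = KafkaConsumer(
        *jobs,
        bootstrap_servers = "localhost:9092",
        value_deserializer = lambda m: json.loads(m),
        consumer_timeout_ms = 1000,  # keeps the loop from blocking forever (see FIXME)
    )

    timedata, datadata = get_data_from_kafka(jobs, routes, consumer)
    # timedata[("0001", "latency_history")] is now a numpy array of times,
    # datadata[("0001", "latency_history")] the matching values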
@@ -154,12 +185,12 @@ def get_dataset(path, base):
     fname = os.path.join(path, "%s.hdf5" % base)
     try:
         f = h5py.File(fname, "r")
-        x,y = list(f["time"]), list(f["data"])
+        x,y = numpy.array(f["time"]), numpy.array(f["data"])
         f.close()
         return fname, x,y
     except IOError:
         fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
-        return fname, [], []
+        return fname, numpy.array([]), numpy.array([])
 
 
 def gps_to_minimum_time_quanta(gpstime):
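Replacing list() with numpy.array() in get_dataset keeps the same read-then-close pattern: the conversion copies the on-disk dataset into memory, so the returned arrays stay valid after f.close(). A standalone illustration with a made-up file name:

    import h5py
    import numpy

    # "H1_registry.hdf5" is a placeholder; any file written with "time" and
    # "data" datasets would do
    f = h5py.File("H1_registry.hdf5", "r")
    x, y = numpy.array(f["time"]), numpy.array(f["data"])  # materialized copies
    f.close()
    # x and y remain usable here, e.g. x.size or numpy.concatenate((x, y))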
@@ -230,12 +261,12 @@ def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ,
     setup_dir_by_job_and_level(start, typ, job, route, base_dir, verbose = True, level = 0)
     fname, prev_times, prev_data = get_dataset(path, route)
     # only get new data and assume that everything is time ordered
-    if prev_times:
+    if prev_times.size:
         this_time_ix = numpy.logical_and(jobtime > max(start-1e-16, prev_times[-1]), jobtime < end)
     else:
         this_time_ix = numpy.logical_and(jobtime >= start, jobtime < end)
-    this_time = list(jobtime[this_time_ix]) + prev_times
-    this_data = list(jobdata[this_time_ix]) + prev_data
+    this_time = numpy.concatenate((jobtime[this_time_ix], prev_times))
+    this_data = numpy.concatenate((jobdata[this_time_ix], prev_data))
     # shortcut if there are no updates
     if len(this_time) == len(prev_times) and len(this_data) == len(prev_data):
         return []
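The change from "if prev_times:" to "if prev_times.size:" follows from get_dataset now returning numpy arrays: truth-testing an array with more than one element raises ValueError, while .size is an unambiguous emptiness check. A standalone illustration:

    import numpy

    prev_times = numpy.array([1000000000.0, 1000000016.0])
    try:
        if prev_times:       # ambiguous for arrays with more than one element
            pass
    except ValueError:
        pass                 # "The truth value of an array ... is ambiguous"

    if prev_times.size:      # True exactly when the array is non-empty
        latest = prev_times[-1]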
@@ -258,16 +289,16 @@ def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ
     this_level_dir = "/".join([base_dir, gps_to_leaf_directory(start, level = level)])
-    agg_data = []
-    agg_time = []
+    agg_data = numpy.array([])
+    agg_time = numpy.array([])
     # FIXME iterate over levels instead.
     for subdir in gps_to_sub_directories(start, level, base_dir):
         path = "/".join([this_level_dir, subdir, "by_job", job, typ])
         try:
             fname, x, y = get_dataset(path, route)
-            agg_time += x
-            agg_data += y
+            agg_time = numpy.concatenate((agg_time, x))
+            agg_data = numpy.concatenate((agg_data, y))
         except IOError as ioerr:
             makedir(path)
             # make an empty data set
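Both aggregation loops now grow numpy arrays with numpy.concatenate instead of extending Python lists, so the result can be handed straight to the numpy-based reduction code without a later conversion. A standalone sketch of the two equivalent accumulation styles:

    import numpy

    chunks = [numpy.arange(3), numpy.arange(3, 6)]  # stand-ins for per-subdir datasets

    agg_list = []                                   # old style: list concatenation
    for x in chunks:
        agg_list += list(x)

    agg_arr = numpy.array([])                       # new style, as in the diff
    for x in chunks:
        agg_arr = numpy.concatenate((agg_arr, x))

    assert (numpy.array(agg_list) == agg_arr).all()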
@@ -284,14 +315,14 @@ def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ
 def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)):
     # Process this level
-    agg_data = []
-    agg_time = []
+    agg_data = numpy.array([])
+    agg_time = numpy.array([])
     for job in sorted(jobs):
         path = "/".join([this_level_dir, "by_job", job, typ])
         try:
             fname, x, y = get_dataset(path, route)
-            agg_time += x
-            agg_data += y
+            agg_time = numpy.concatenate((agg_time, x))
+            agg_data = numpy.concatenate((agg_data, y))
         except IOError as ioerr:
             makedir(path)
             create_new_dataset(path, route)
@@ -304,7 +335,7 @@ def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, en
     shutil.move(tmpfname, tmpfname.replace(".tmp",""))
 
 
-def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs)):
+def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs, timedata, datadata)):
     # get the url
     with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
         url = f.readline().strip()
@@ -314,11 +345,14 @@ def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan,
     for route in routes:
         #logging.info("processing job %s for route %s" % (job, route))
         # FIXME assumes always two columns
-        full_data = get_url(url, route)
-        if full_data:
-            jobtime, jobdata = full_data[0], full_data[1]
+        if timedata is None:
+            full_data = get_url(url, route)
+            if full_data:
+                jobtime, jobdata = full_data[0], full_data[1]
+            else:
+                jobtime, jobdata = [], []
         else:
-            jobtime, jobdata = [], []
+            jobtime, jobdata = timedata[(job,route)], datadata[(job,route)]
         gps1, gps2 = gps_range(jobtime)
         for start, end in zip(gps1, gps2):
             # shortcut to not reprocess data that has already been
@@ -326,6 +360,7 @@ def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan,
             # previously determined to be needing to be updated
             # anything before that is pointless
             if prevdataspan and end < min(prevdataspan):
+                #logging.info("no new data for job %s and route %s" % (job, route))
                 continue
             for (typ, func) in datatypes:
                 now = time.time()
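With the two new arguments, get_data_from_job_and_reduce can either scrape the registry URLs (when timedata is None) or reuse data already pulled from kafka. A hedged sketch of how a caller might wire that up; the pool usage, the kafka_consumer variable and the argument list are assumptions built from the new signature, not code in this commit.

    from multiprocessing import Pool

    if kafka_consumer is not None:                  # hypothetical consumer handle
        timedata, datadata = get_data_from_kafka(jobs, routes, kafka_consumer)
    else:
        timedata, datadata = None, None             # fall back to get_url() scraping

    # the function unpacks a single tuple argument (Python 2 style), so a
    # plain pool.map over per-job tuples works
    args = [(job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs, timedata, datadata) for job in jobs]
    Pool(processes = 4).map(get_data_from_job_and_reduce, args)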