diff --git a/gstlal-ugly/python/aggregator.py b/gstlal-ugly/python/aggregator.py
index a71236f45c60a0a6e9e729501aa21a3c5a5f6e00..657ad89b2bccd6ad959fb4bf0b68f81e07bb0bf0 100644
--- a/gstlal-ugly/python/aggregator.py
+++ b/gstlal-ugly/python/aggregator.py
@@ -65,13 +65,14 @@ def get_url(url,d):
 	A function to pull data from @param url where @param d specifies a
 	specific route. FIXME it assumes that the routes end in .txt
 	"""
+	f = "%s%s.txt" % (url, d)
 	try:
-		jobdata = urllib2.urlopen("%s%s.txt" % (url, d)).read().split("\n")
+		jobdata = urllib2.urlopen(f).read().split("\n")
 	except urllib2.HTTPError as e:
-		logging.error("%s : %s" % (url, str(e)))
+		logging.error("%s : %s" % (f, str(e)))
 		return
 	except urllib2.URLError as e:
-		logging.error("%s : %s" % (url, str(e)))
+		logging.error("%s : %s" % (f, str(e)))
 		return
 	data = []
 	for line in jobdata:
@@ -85,6 +86,36 @@ def get_url(url,d):
 	return out
 
 
+def get_data_from_kafka(jobs, routes, kafka_consumer, req_all = False, timeout = 300):
+	"""!
+	A function to pull data from kafka for a set of jobs (topics) and
+	routes (keys in the incoming json messages)
+	"""
+	timedata = dict(((j,r), []) for j in jobs for r in routes)
+	datadata = dict(((j,r), []) for j in jobs for r in routes)
+	# pick a time to bail out of this loop
+	bailout_time = now()
+	# FIXME this will block forever if it runs out of events unless
+	# consumer_timeout_ms (int) is set. Should we let it block?
+	cnt = 0
+	for message in kafka_consumer:
+		if message.topic not in jobs:
+			continue
+		# FIXME assumes json has only one key value pair
+		route, packet = message.value.items()[0]
+		if route not in routes:
+			continue
+		timedata[(message.topic, route)].append(float(packet.split()[0]))
+		datadata[(message.topic, route)].append(float(packet.split()[1]))
+		cnt += 1
+		if timedata[(message.topic, route)][-1] > bailout_time and (not req_all or all(timedata.values()) or now() > bailout_time + timeout):
+			break
+	for k in timedata:
+		timedata[k] = numpy.array(timedata[k])
+		datadata[k] = numpy.array(datadata[k])
+	return timedata, datadata
+
+
 def reduce_data(xarr, yarr, func, level = 0):
 	"""!
 	This function does a data reduction by powers of 10 where level
@@ -154,12 +185,12 @@ def get_dataset(path, base):
 	fname = os.path.join(path, "%s.hdf5" % base)
 	try:
 		f = h5py.File(fname, "r")
-		x,y = list(f["time"]), list(f["data"])
+		x,y = numpy.array(f["time"]), numpy.array(f["data"])
 		f.close()
 		return fname, x,y
 	except IOError:
 		fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
-		return fname, [], []
+		return fname, numpy.array([]), numpy.array([])
 
 
 def gps_to_minimum_time_quanta(gpstime):
@@ -230,12 +261,12 @@ def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ,
 	setup_dir_by_job_and_level(start, typ, job, route, base_dir, verbose = True, level = 0)
 	fname, prev_times, prev_data = get_dataset(path, route)
 	# only get new data and assume that everything is time ordered
-	if prev_times:
+	if prev_times.size:
 		this_time_ix = numpy.logical_and(jobtime > max(start-1e-16, prev_times[-1]), jobtime < end)
 	else:
 		this_time_ix = numpy.logical_and(jobtime >= start, jobtime < end)
-	this_time = list(jobtime[this_time_ix]) + prev_times
-	this_data = list(jobdata[this_time_ix]) + prev_data
+	this_time = numpy.concatenate((jobtime[this_time_ix], prev_times))
+	this_data = numpy.concatenate((jobdata[this_time_ix], prev_data))
 	# shortcut if there are no updates
 	if len(this_time) == len(prev_times) and len(this_data) == len(prev_data):
 		return []
@@ -258,16 +289,16 @@ def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ
 
 	this_level_dir = "/".join([base_dir, gps_to_leaf_directory(start, level = level)])
 
-	agg_data = []
-	agg_time = []
+	agg_data = numpy.array([])
+	agg_time = numpy.array([])
 
 	# FIXME iterate over levels instead.
 	for subdir in gps_to_sub_directories(start, level, base_dir):
 		path = "/".join([this_level_dir, subdir, "by_job", job, typ])
 		try:
			fname, x, y = get_dataset(path, route)
-			agg_time += x
-			agg_data += y
+			agg_time = numpy.concatenate((agg_time, x))
+			agg_data = numpy.concatenate((agg_data, y))
 		except IOError as ioerr:
 			makedir(path)
 			# make an empty data set
@@ -284,14 +315,14 @@ def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)):
 	# Process this level
-	agg_data = []
-	agg_time = []
+	agg_data = numpy.array([])
+	agg_time = numpy.array([])
 	for job in sorted(jobs):
 		path = "/".join([this_level_dir, "by_job", job, typ])
 		try:
 			fname, x, y = get_dataset(path, route)
-			agg_time += x
-			agg_data += y
+			agg_time = numpy.concatenate((agg_time, x))
+			agg_data = numpy.concatenate((agg_data, y))
 		except IOError as ioerr:
 			makedir(path)
 			create_new_dataset(path, route)
@@ -304,7 +335,7 @@ def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, en
 	shutil.move(tmpfname, tmpfname.replace(".tmp",""))
 
 
-def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs)):
+def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs, timedata, datadata)):
 	# get the url
 	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
 		url = f.readline().strip()
@@ -314,11 +345,14 @@ def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan,
 	for route in routes:
 		#logging.info("processing job %s for route %s" % (job, route))
 		# FIXME assumes always two columns
-		full_data = get_url(url, route)
-		if full_data:
-			jobtime, jobdata = full_data[0], full_data[1]
+		if timedata is None:
+			full_data = get_url(url, route)
+			if full_data:
+				jobtime, jobdata = full_data[0], full_data[1]
+			else:
+				jobtime, jobdata = [], []
 		else:
-			jobtime, jobdata = [], []
+			jobtime, jobdata = timedata[(job,route)], datadata[(job,route)]
 		gps1, gps2 = gps_range(jobtime)
 		for start, end in zip(gps1, gps2):
 			# shortcut to not reprocess data that has already been
@@ -326,6 +360,7 @@
 			# previously determined to be needing to be updated
 			# anything before that is pointless
 			if prevdataspan and end < min(prevdataspan):
+				#logging.info("no new data for job %s and route %s" % (job, route))
 				continue
 			for (typ, func) in datatypes:
 				now = time.time()
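
Note on driving the new get_data_from_kafka(): the function iterates over a
pre-built consumer whose message values are already deserialized JSON objects
carrying a single route -> "time value" pair. A minimal sketch of a caller,
assuming the kafka-python client, a broker at localhost:9092, and hypothetical
topic/route names (none of which this patch pins down); consumer_timeout_ms is
set so the iteration cannot block forever, per the FIXME in the function body:

	import json
	from kafka import KafkaConsumer  # assumption: kafka-python client
	from gstlal import aggregator    # module path assumed

	jobs = ["0001", "0002"]          # hypothetical topic names
	routes = ["latency_history"]     # hypothetical route key

	# Deserialize each message into the dict whose single key/value pair
	# get_data_from_kafka unpacks via message.value.items()[0].
	consumer = KafkaConsumer(
		*jobs,
		bootstrap_servers = ["localhost:9092"],
		value_deserializer = lambda m: json.loads(m),
		consumer_timeout_ms = 1000,  # bound the otherwise-blocking loop
	)

	# timedata and datadata map (job, route) to numpy arrays holding the
	# first and second whitespace-separated columns of each packet.
	timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)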
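
Note on the list -> numpy migration: the "if prev_times:" test had to become
"if prev_times.size:" because once get_dataset() returns numpy arrays, plain
truthiness raises for arrays of more than one element (and for one-element
arrays it reflects the element's value, not emptiness); likewise += on arrays
is element-wise addition, not concatenation. A quick illustration:

	import numpy

	prev_times = numpy.array([100.0, 101.0])
	# bool(prev_times) raises "ValueError: The truth value of an array
	# with more than one element is ambiguous"; .size tests emptiness.
	if prev_times.size:
		print("have previous data")

	# list += extends in place, but array += adds element-wise (or raises
	# on a shape mismatch), hence the switch to numpy.concatenate.
	agg_time = numpy.array([1.0, 2.0])
	agg_time = numpy.concatenate((agg_time, prev_times))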