From 660545551b70422105ef8b5e9c3baccfa7edf580 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Fri, 18 Jan 2019 22:13:58 -0800
Subject: [PATCH] gstlal-ugly: port aggregation code to use datamon functions,
 tidy up

---
 gstlal-ugly/bin/gstlal_ll_dq                  | 58 +++++++------------
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 33 +++++------
 gstlal-ugly/bin/gstlal_ll_inspiral_state      | 27 +++------
 3 files changed, 44 insertions(+), 74 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_dq b/gstlal-ugly/bin/gstlal_ll_dq
index b0bfd1e4a2..f2098c8a04 100755
--- a/gstlal-ugly/bin/gstlal_ll_dq
+++ b/gstlal-ugly/bin/gstlal_ll_dq
@@ -6,7 +6,7 @@ from collections import deque
 from scipy import signal
 import sys
 import StringIO
-from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd, aggregator
+from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
 from optparse import OptionParser
 import gi
 gi.require_version('Gst', '1.0')
@@ -16,6 +16,9 @@ Gst.init(None)
 
 import h5py
 import logging
 
+from datamon import aggregator
+from datamon import io
+
 
 def parse_command_line():
@@ -41,11 +44,14 @@
 		del kwargs["instrument"]
 		simplehandler.Handler.__init__(self, *args, **kwargs)
 		self.horizon_distance_func = reference_psd.HorizonDistance(20., 2048., 1./16., 1.4, 1.4)
-		self.range_history = deque(maxlen = 10000)
-		self.range_history_time = deque(maxlen = 10000)
-		self.noisedeq = deque(maxlen = 10000)
+
+		self.routes = ("noise", "range_history")
+		self.datatypes = (("min", min), ("median", aggregator.median), ("max", max))
 		self.timedeq = deque(maxlen = 10000)
+		self.datadeq = {route: deque(maxlen = 10000) for route in self.routes}
 		self.last_reduce_time = None
+		self.prevdataspan = set()
+
 	def do_on_message(self, bus, message):
 		if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
 			self.psd = pipeio.parse_spectrum_message(message)
@@ -58,19 +64,16 @@
 		if self.last_reduce_time is None:
 			self.last_reduce_time = int(round(buftime,-2))
 		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
-		thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
-		for typ in ("min", "median", "max"):
-			aggregator.makedir(os.path.join(thisdir, typ))
 		if mapinfo.data:
 			# First noise
 			s = StringIO.StringIO(mapinfo.data)
 			data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().split('\n') if x])
 			ix = numpy.argmax(data, axis=0)[1]
 			self.timedeq.append(buftime)
-			self.noisedeq.append(data[ix,1])
+			self.datadeq['noise'].append(data[ix,1])
 
 			# Then range
-			self.range_history.append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
+			self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
 
 			# The PSD
 			psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
@@ -80,35 +83,19 @@
 			del buf
 			return Gst.FlowReturn.OK
-
-		# Save a "latest"
-		self.to_hdf5(os.path.join(os.path.join(self.out_path, self.instrument), "psd.hdf5"), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
-		# write out all of the file types
-		for typ in ("min", "median", "max"):
-			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "noise.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.noisedeq)})
-			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "range_history.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.range_history)})
-
-		#
-		# FIXME do data reduction by levels here.
-		#
-
 		# Only reduce every 100s
 		if (buftime - self.last_reduce_time) >= 100:
-			logging.info("reducing data and writing PSD snaphot for %d @ %d" % (buftime, int(aggregator.now())))
 			self.last_reduce_time = int(round(buftime,-2))
-			for typ, func in (("min", min), ("median", aggregator.median), ("max", max)):
-				for route in ("noise", "range_history"):
-					for level in range(0, aggregator.DIRS-1):
-						thisdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level)), self.instrument), typ)
-						nextdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level+1)), self.instrument), typ)
-						aggregator.makedir(nextdir)
-						this_fname, this_x, this_y = aggregator.get_dataset(thisdir, route)
-						next_fname, next_x, next_y = aggregator.get_dataset(nextdir, route)
-						xarr, yarr = aggregator.reduce_data(numpy.concatenate((this_x,next_x)), numpy.concatenate((this_y, next_y)), func, level = level + 1)
-						self.to_hdf5(next_fname, {"time": numpy.array(xarr), "data": numpy.array(yarr)})
-			# The PSD is special, we just record it. No min/median/max
-			thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
-			psd_name = "%s-PSD-%d0-100.hdf5" % (self.instrument, int(round(buftime,-2)))
+			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(aggregator.now())))
+
+			timedata = {(self.instrument, route): numpy.array(self.timedeq) for route in self.routes}
+			datadata = {(self.instrument, route): numpy.array(self.datadeq[route]) for route in self.routes}
+			self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, self.routes, self.instrument, 'instrument', self.datatypes, timedata, datadata, self.prevdataspan))
+
+			# Save a "latest" psd
+			# NOTE: The PSD is special, we just record it. No min/median/max
+			thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime), 'by_instrument', self.instrument)
+			psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
 			self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
 
 		buf.unmap(mapinfo)
@@ -122,7 +109,6 @@
 
 	def to_hdf5(self, path, datadict):
 		tmppath = "/dev/shm/%s" % path.replace("/","_") + ".tmp"
-		print tmppath
 		f = h5py.File(tmppath, "w")
 		for k, v in datadict.items():
 			f[k] = v
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 48405fed7d..038f9ae55e 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -32,9 +32,11 @@ import urllib2
 import shutil
 import collections
 from multiprocessing import Pool
-from gstlal import aggregator
 import json
 
+from datamon import aggregator
+from datamon import io
+
 MIN_TIME_QUANTA = 10000
 DIRS = 6
 
@@ -82,10 +84,9 @@
 #
 
 if __name__ == '__main__':
-
 	options = parse_command_line()
 
-	# FIXME don't hardcode some of these?
+	# FIXME don't hardcode some of these?
 	datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
 	routes = options.route
@@ -94,12 +95,14 @@
 	pool = Pool(options.num_threads)
 	prevdataspan = set()
 
+	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server is not None:
 		from kafka import KafkaConsumer
 		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
 	else:
 		consumer = None
 
+	# start an infinite loop to keep updating and aggregating data
 	while True:
 		logging.info("sleeping")
@@ -109,29 +112,23 @@
 		if consumer is not None:
 			# this is not threadsafe!
 			logging.info("getting data from kafka")
-			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)
+			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
 		else:
-			timedata, datadata = None, None
+			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_id, num_threads=options.num_threads)
 
 		# First get the raw and reduced data for each job in parallel
-		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs]
-		for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs):
-		#for ds in map(aggregator.get_data_from_job_and_reduce, mapargs):
+		mapargs = [(options.base_dir, routes, job, 'job', datatypes, timedata, datadata, prevdataspan) for job in jobs]
+		for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
 			dataspan.update(ds)
 		prevdataspan = dataspan.copy()
 
+		# Then reduce the data across jobs at each level
 		mapargs = []
 		for start, end in zip(*aggregator.job_expanse(dataspan)):
-			# FIXME don't hardcode this range
-			for level in range(DIRS):
-				this_level_dir = "/".join([options.base_dir, aggregator.gps_to_leaf_directory(start, level = level)])
-				mapargs = []
-				for route in routes:
-					for (typ,func) in datatypes:
-						aggregator.setup_dir_across_job_by_level(start, typ, route, options.base_dir, verbose = True, level = level)
-						mapargs.append((jobs, this_level_dir, typ, route, func, level, start, end))
-				pool.map(aggregator.reduce_across_jobs, mapargs)
-				#map(aggregator.reduce_across_jobs, mapargs)
+			mapargs = []
+			for route in routes:
+				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
+			pool.map(io.hdf5.reduce_across_tags, mapargs)
 			logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
 
 		#
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_state b/gstlal-ugly/bin/gstlal_ll_inspiral_state
index 830f1f4c9c..d7b10d47ff 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_state
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_state
@@ -32,9 +32,10 @@ import urllib2
 import shutil
 import collections
 from multiprocessing import Pool
-from gstlal import aggregator
 import json
+from datamon import aggregator
+from datamon import io
 
 #
 # =============================================================================
 #
@@ -73,18 +74,6 @@
 # =============================================================================
 #
 
-def get_data_from_route((job, job_tag, routes, basedir)):
-	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
-		url = f.readline().strip()
-	for route in routes:
-		logging.info("processing job %s for route %s : %s" % (job, route, url))
-		data = aggregator.get_url(url, route)
-		jobtime, jobdata = data[0], data[1]
-		path = "%s/by_job/%s" % (basedir, job)
-		tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = jobtime, data = jobdata, tmp = True)
-		shutil.move(tmpfname, fname)
-
-
 if __name__ == '__main__':
 
 	options = parse_command_line()
@@ -108,7 +97,6 @@
 		from kafka import KafkaConsumer
 		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
 	else:
-		pool = Pool(options.num_threads)
 		consumer = None
 	while True:
 		logging.info("sleeping")
@@ -117,17 +105,16 @@
 		if consumer:
 			# this is not threadsafe!
 			logging.info("getting data from kafka")
-			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer, req_all = True)
+			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, req_all = True)
 			for (job,route) in timedata:
 				if "L1" in route or "H1" in route:
 					# FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
 					datadata[(job,route)] /= 16
-				path = "%s/by_job/%s" % (options.base_dir, job)
-				tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = timedata[(job,route)], data = datadata[(job,route)], tmp = True)
-				shutil.move(tmpfname, fname)
 		else:
-			mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
-			pool.map(get_data_from_route, mapargs)
+			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads = options.num_threads)
+		for (job, route) in timedata:
+			path = "%s/by_job/%s" % (options.base_dir, job)
+			io.hdf5.store_timeseries(path, route.replace("/","_"), timedata[(job, route)], datadata[(job, route)])
 
 		sys.exit(1)
 
--
GitLab