From a52e5ca75f92c9d9ebc8c5550eabec34425b325d Mon Sep 17 00:00:00 2001 From: Patrick Godwin <patrick.godwin@ligo.org> Date: Wed, 3 Jan 2018 13:12:15 -0800 Subject: [PATCH] gstlal_etg: added hdf file saving ala h5py, added utility functions and classes to idq_aggregator.py --- gstlal-ugly/bin/gstlal_etg | 49 ++++-- gstlal-ugly/python/idq_aggregator.py | 215 ++++++++++++--------------- 2 files changed, 132 insertions(+), 132 deletions(-) diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg index f1e29bd472..75b5bafe68 100755 --- a/gstlal-ugly/bin/gstlal_etg +++ b/gstlal-ugly/bin/gstlal_etg @@ -170,13 +170,20 @@ class MultiChannelHandler(simplehandler.Handler): self.last_save_time = None self.cadence = options.cadence self.tag = '%s-%s' % (self.instrument[:1], self.description) - # create header for trigger file - if options.latency: - self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency") + # hdf saving properties + if options.save_hdf: + columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq'] + self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence) + # ascii saving properties else: - self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel") - self.fdata = deque(maxlen = 25000) - self.fdata.append(self.header) + self.tag = '%s-%s' % (self.instrument[:1], self.description) + # create header for trigger file + if options.latency: + self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency") + else: + self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel") + self.fdata = deque(maxlen = 25000) + self.fdata.append(self.header) # set initialization time if options.data_source in ("framexmit", "lvshm"): @@ -260,9 +267,14 @@ class MultiChannelHandler(simplehandler.Handler): # Save triggers (in ascii format) once every cadence if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)): - self.to_trigger_file(buftime) - self.fdata.clear() - self.fdata.append(self.header) + if options.save_hdf: + fname = '%s-%d-%d' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence) + fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5]) + self.fdata.dump(fpath, fname) + else: + self.to_trigger_file(buftime) + self.fdata.clear() + self.fdata.append(self.header) self.last_save_time = buftime # read buffer contents @@ -303,15 +315,18 @@ class MultiChannelHandler(simplehandler.Handler): # NOTE # Setting stop time to trigger time for use with half sine gaussians stop_time = trigger_time - # save iDQ compatible data - if options.latency: - self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency)) - else: - self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag)) # append row to feature vector for bottle requests etg_row = {'timestamp': buftime, 'channel': channel, 'rate': rate, 'start_time': start_time, 'stop_time': stop_time, 'trigger_time': trigger_time, - 'frequency': freq, 'q': q, 'phase': row.phase, 'sigma_sq': row.sigmasq, 'chi_sq': row.chisq, 'snr': row.snr} + 'frequency': freq, 'q': q, 'phase': row.phase, 'sigmasq': row.sigmasq, 'chisq': row.chisq, 'snr': row.snr} self.etg_event.append(etg_row) + # save iDQ compatible data + if options.save_hdf: + self.fdata.append(etg_row, key = (channel, rate), buftime = buftime) + else: + if options.latency: + self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency)) + else: + self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag)) def to_trigger_file(self, buftime = None): @@ -481,6 +496,7 @@ def parse_command_line(): parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.") parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.") parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.") + parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence") parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.") parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.") parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.") @@ -669,7 +685,8 @@ if options.verbose: mainloop.run() # save remaining triggers -handler.to_trigger_file() +if not options.save_hdf: + handler.to_trigger_file() # # Shut down pipeline diff --git a/gstlal-ugly/python/idq_aggregator.py b/gstlal-ugly/python/idq_aggregator.py index 5c71894d82..e3819f5e17 100644 --- a/gstlal-ugly/python/idq_aggregator.py +++ b/gstlal-ugly/python/idq_aggregator.py @@ -29,8 +29,8 @@ import os import glob import sys +import h5py import numpy -import pandas from lal import gpstime from gstlal import aggregator @@ -42,141 +42,59 @@ from gstlal import aggregator # #################### -def reduce_data(data, gps_end, path, reduce_count, channel, rate, reduce_cadence = 100, aggregate = 'max', level = 0): - """! - This function does a data reduction recursively as needed - by powers of 10 where level specifies the power. - Default minimum is 1 e.g., reduction by 1 order of magnitude. - """ - if (reduce_count % (10 ** level) == 0 and level < aggregator.DIRS): - gps_start = gps_end - reduce_cadence * (10 ** level) - if level == 0: - agg_data = aggregate_data(data, gps_start, gps_end, aggregate = aggregate) - else: - agg_data = aggregate_data(get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = aggregate, level = level - 1), gps_start, gps_end, level = level) - path = update_agg_path(path, gps_start, cur_level = level - 1, new_level = level) - if agg_data is not None: - create_new_dataset(path, 'aggregates', agg_data, channel, rate, aggregate = aggregate) - reduce_data(data, gps_end, path, reduce_count, channel, rate, aggregate = aggregate, level = level + 1) - -def aggregate_data(data, gps_start, gps_end, column = 'snr', aggregate = 'max', level = 0): - """! - Reduces data of a given level for a given gps range, - column, and aggregate. Returns the aggregated data. - """ - gps_start_idx = floor_div(gps_start, 10 ** (level+1)) - gps_end_idx = floor_div(gps_end, 10 ** (level+1)) - if aggregate == 'max': - max_index = get_dataframe_subset(gps_start_idx, gps_end_idx, data, level = level).groupby(pandas.TimeGrouper('%ds' % (10 ** (level+1))))[column].idxmax().dropna().values - else: - raise NotImplementedError - if max_index.size > 0: - return data.loc[max_index] - else: - return None - -def get_dataset(path, base, channel, rate, aggregate = None): +def get_dataset(path, base): """! open a dataset at @param path with name @param base and return the data """ - fname = os.path.join(path, "%s.h5" % base) - with pandas.HDFStore(fname) as store: - if aggregate is None: - return store.select('%s/%s/data' % (channel, str(rate))) - else: - return store.select('%s/%s/%s' % (channel, str(rate), aggregate)) - -def create_new_dataset(path, base, data, channel, rate, aggregate = None, tmp = False): + fname = os.path.join(path, "%s.hdf5" % base) + try: + f = h5py.File(fname, "r") + fields = f.keys() + data = zip(*[f[field] for field in fields]) + f.close() + d_types = [(str(field), 'f8') for field in fields] + data = numpy.array(data, dtype=d_types) + return fname, data + except IOError: + return fname, [] + +def create_new_dataset(path, base, fields, data = None, tmp = False): """! A function to create a new dataset with data @param data. The data will be stored in an hdf5 file at path @param path with base name @param base. You can also make a temporary file. """ - # create path if one doesn't already exist - if not os.path.exists(path): - aggregator.makedir(path) if tmp: - fname = os.path.join(path, "%s.h5.tmp" % base) + fname = os.path.join(path, "%s.hdf5.tmp" % base) else: - fname = os.path.join(path, "%s.h5" % base) - with pandas.HDFStore(fname) as store: - if aggregate is None: - store.put('%s/%s/data' % (channel, str(rate)), data) + # A non temp dataset should not be overwritten + fname = os.path.join(path, "%s.hdf5" % base) + if os.path.exists(fname): + return fname + # create dir if it doesn't exist + if not os.path.exists(path): + aggregator.makedir(path) + # save data to hdf5 + f = h5py.File(fname, "w") + for field in fields: + if data is None: + f.create_dataset(field, (0,), dtype="f8") else: - store.append('%s/%s/%s' % (channel, str(rate), aggregate), data) - return fname + f.create_dataset(field, (len(data[field]),), dtype="f8") + f[field][...] = data[field] -def get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = None, level = 0): - """! - Returns a dataset for a given aggregation level and gps range. - """ - global_start_index = floor_div(gps_start, 10 ** (level+1)) - global_end_index = floor_div(gps_end, 10 ** (level+1)) - gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index, aggregator.MIN_TIME_QUANTA * (10 ** level))) - # generate all relevant files and combine to one dataset - # FIXME: more efficient to filter relevant datasets first - # rather than overpopulating dataset and then filtering - paths = (update_agg_path(path, gps_epoch, cur_level = level, new_level = level) for gps_epoch in gps_epochs) - if aggregate is None: - pattern = '[0-9]' * 10 + '.h5' - filelist = (glob.glob(os.path.join(path, pattern)) for path in paths) - # flatten list - files = (file_ for sublist in filelist for file_ in sublist) - else: - files = (os.path.join(path, 'aggregates.h5') for path in paths) - datasets = (pandas.read_hdf(file_, '%s/%s/%s' % (channel, rate, aggregate)) for file_ in files) - return get_dataframe_subset(global_start_index, global_end_index, pandas.concat(datasets), level = level) - -def get_dataframe_subset(gps_start, gps_end, dataframe, level = 0): - start_index = gps_to_index(gps_start) - end_index = gps_to_index(gps_end - (10 ** level)) - return dataframe.loc[start_index:end_index] - -def create_dataframe_index(gps_time): - """ - Returns a datetime index based on gps times - per minimum aggregator quanta. - """ - start_time = gps_to_index(floor_div(gps_time, aggregator.MIN_TIME_QUANTA)) - return pandas.date_range(start_time, periods = aggregator.MIN_TIME_QUANTA, freq = 's') - -def gps_to_index(gps_time): - # FIXME: round microseconds is a workaround for possible bug in the - # gpstime module in lal that adds a microsecond - return pandas.Timestamp(gpstime.gps_to_utc(gps_time).replace(microsecond=0)) - -def gps_from_index(timestamp): - return int(gpstime.utc_to_gps(timestamp)) - -def to_agg_path(base_path, tag, gps_time, level = 0): - """ - Returns a hierarchical gps time path based on - channel rate and level in hierarchy. - e.g. level 0: base/tag/1/2/3/4/5/6/ - e.g. level 2: base/tag/1/2/3/4/ - """ - path = os.path.join(base_path, tag) - path = os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level)) - return path - -def update_agg_path(path, gps_time, cur_level = 0, new_level = 0): - """ - Returns an updated aggregator path based on - an existing path and a gps time. - """ - for agg_level in range(max(0, cur_level), aggregator.DIRS): - path, _ = os.path.split(path) - return os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = new_level)) + f.close() + return fname def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch): - """ + """! Returns whether new and old gps times are in different epochs. """ return (new_gps_time - floor_div(prev_gps_time, gps_epoch)) >= gps_epoch def floor_div(x, n): - """ + """! Floor an integer by removing its remainder from integer division by another integer n. e.g. floor_div(163, 10) = 160 @@ -184,3 +102,68 @@ def floor_div(x, n): """ assert n > 0 return (x / n) * n + + + +#################### +# +# classes +# +#################### + +class FeatureData(object): + """! + Base class for saving feature data. + Extend for a specific file-based implementation. + """ + def __init__(self, columns, keys = None, **kwargs): + self.keys = keys + self.columns = columns + self.etg_data = {} + + def dump(self, path): + raise NotImplementedError + + def load(self, path): + raise NotImplementedError + + def pop(self): + raise NotImplementedError + + def append(self, key, value): + raise NotImplementedError + + def clear(self): + raise NotImplementedError + +class HDF5FeatureData(FeatureData): + """! + Saves feature data to hdf5. + """ + def __init__(self, columns, keys, **kwargs): + super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs) + self.cadence = kwargs.pop("cadence") + self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys} + for key in keys: + self.etg_data[key][:] = numpy.nan + + def dump(self, path, base): + for key in self.keys: + key_path = os.path.join(path, str(key[0]), str(key[1]).zfill(4)) + create_new_dataset(key_path, base, self.columns, data = self.etg_data[key]) + self.clear() + + def load(self, path): + raise NotImplementedError + + def pop(self): + raise NotImplementedError + + def append(self, value, key = None, buftime = None): + if buftime and key: + idx = buftime % self.cadence + self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns]) + + def clear(self): + for key in self.keys: + self.etg_data[key][:] = numpy.nan -- GitLab