Commit b3cddfe8 authored by Patrick Godwin


idq_aggregator.py: changed indexing scheme from MultiIndex to DatetimeIndex, multilevel aggregation fixed (except for possibly some edge cases)
parent 1cdeae5c
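
For orientation, a minimal sketch of what the indexing switch means in pandas terms (not part of the diff; assumes LAL's gpstime module is available, and the GPS second is arbitrary):

    import pandas
    from lal import gpstime

    gps = 1187008800  # arbitrary GPS second

    # old scheme: a two-level MultiIndex key (10 s epoch, gps second)
    old_key = (gps - gps % 10, gps)

    # new scheme: a single DatetimeIndex entry
    new_key = pandas.Timestamp(gpstime.gps_to_utc(gps))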
@@ -213,6 +213,7 @@ class MultiChannelHandler(simplehandler.Handler):
 # dataframe/hdf saving properties
 self.tag = '%s-%s' % (self.instrument[:1], "IDQ_TRIGGERS_BY_CHANNEL")
 self.last_hdf_save_time = dict.fromkeys(self.keys, None)
+self.reduce_count = dict.fromkeys(self.keys, 0)
 self.hdf_cadence = 10
 self.reduce_cadence = 100
@@ -336,10 +337,10 @@ class MultiChannelHandler(simplehandler.Handler):
 else:
     self.fdata += "%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr, channel_tag)
 # save dataframe compatible data
-buftime_index = idq_aggregator.floor_div(buftime, self.hdf_cadence)
 if options.triggers_from_dataframe:
     try:
-        self.dataframes[(channel, rate)].loc[buftime_index, buftime] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
+        save_idx = idq_aggregator.gps_to_index(buftime)
+        self.dataframes[(channel, rate)].loc[save_idx] = numpy.array([start_time, stop_time, trigger_time, freq, row.phase, row.sigmasq, row.chisq, row.snr], dtype=float)
     except ValueError:
         print >>sys.stderr, "Error saving buffer contents to dataframe at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
         traceback.print_exc()
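
With a DatetimeIndex, a trigger row is written under a single Timestamp rather than an (epoch, gps) tuple. A minimal sketch of the pattern, with fabricated column names and values:

    import numpy, pandas

    idx = pandas.date_range('2017-08-17 12:41:00', periods=10, freq='s')
    df = pandas.DataFrame(columns=['snr'], index=idx, dtype=float)

    # assign a whole row at one timestamp, as the new save_idx lookup does
    df.loc[pandas.Timestamp('2017-08-17 12:41:03')] = numpy.array([5.5])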
@@ -362,15 +363,16 @@ class MultiChannelHandler(simplehandler.Handler):
 for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
     save_path = idq_aggregator.to_agg_path(self.out_path, self.tag, save_index, channel, rate)
     try:
-        idq_aggregator.create_new_dataset(save_path, str(save_index), self.dataframes[(channel, rate)].loc[save_index])
+        idq_aggregator.create_new_dataset(save_path, str(save_index), idq_aggregator.get_dataframe_subset(save_index, save_index + self.hdf_cadence, self.dataframes[(channel, rate)]))
     except KeyError:
         print >>sys.stderr, "Error saving dataframe to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
         traceback.print_exc()
 # find gps times of max snr triggers per cadence and save to file
+self.reduce_count[(channel, rate)] += 1
 last_reduce_index = idq_aggregator.floor_div(self.last_hdf_save_time[(channel, rate)], self.reduce_cadence)
 reduce_path = idq_aggregator.to_agg_path(self.out_path, self.tag, last_reduce_index, channel, rate)
 try:
-    idq_aggregator.reduce_data(self.dataframes[(channel, rate)], last_reduce_index, current_save_index, reduce_path, reduce_count = 1)
+    idq_aggregator.reduce_data(self.dataframes[(channel, rate)], last_reduce_index, current_save_index, reduce_path, reduce_count = self.reduce_count[(channel, rate)])
 except (KeyError, AttributeError):
     print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
     traceback.print_exc()
@@ -380,17 +382,18 @@ class MultiChannelHandler(simplehandler.Handler):
 for save_index in range(last_save_index, current_save_index, self.hdf_cadence):
     save_path = idq_aggregator.to_agg_path(self.out_path, self.tag, save_index, channel, rate)
     try:
-        idq_aggregator.create_new_dataset(save_path, str(save_index), self.dataframes[(channel, rate)].loc[save_index])
+        idq_aggregator.create_new_dataset(save_path, str(save_index), idq_aggregator.get_dataframe_subset(save_index, save_index + self.hdf_cadence, self.dataframes[(channel, rate)]))
     except KeyError:
         print >>sys.stderr, "Error saving dataframe to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
         traceback.print_exc()
 # case 3: save current triggers from current directory and aggregate
 if idq_aggregator.in_new_epoch(buftime, self.last_hdf_save_time[(channel, rate)], self.reduce_cadence):
     # find gps times of max snr triggers per cadence and save to file
+    self.reduce_count[(channel, rate)] += 1
     last_reduce_index = idq_aggregator.floor_div(self.last_hdf_save_time[(channel, rate)], self.reduce_cadence)
     reduce_path = idq_aggregator.to_agg_path(self.out_path, self.tag, last_reduce_index, channel, rate)
     try:
-        idq_aggregator.reduce_data(self.dataframes[(channel, rate)], last_reduce_index, current_save_index, reduce_path, reduce_count = 1)
+        idq_aggregator.reduce_data(self.dataframes[(channel, rate)], last_reduce_index, current_save_index, reduce_path, reduce_count = self.reduce_count[(channel, rate)])
     except (KeyError, AttributeError):
         print >>sys.stderr, "Error saving dataframe aggregates to hdf at buffer time = %d for channel = %s, rate = %d." % (buftime, channel, rate)
         traceback.print_exc()
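
The case-3 branch fires when buftime crosses a reduce_cadence boundary. A toy stand-in for that test (presumably how in_new_epoch combines with floor_div; the real definitions live in idq_aggregator.py):

    def in_new_epoch(new_t, prev_t, epoch):
        # True when the two times fall in different epoch-sized bins
        return (new_t // epoch) * epoch != (prev_t // epoch) * epoch

    in_new_epoch(1187008901, 1187008899, 100)   # True: crossed a 100 s boundary
    in_new_epoch(1187008902, 1187008901, 100)   # False: same 100 s bin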
@@ -408,10 +411,10 @@ class MultiChannelHandler(simplehandler.Handler):
 trigger_format = {'start_time': time_format, 'stop_time': time_format, 'trigger_time': time_format, 'frequency': col1_format, 'phase': col2_format, 'sigmasq': col2_format, 'chisq': col2_format, 'snr': col1_format}
 #for (channel, rate) in self.keys:
 #    channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
-#    if not self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].dropna().empty:
-#        self.fdata += self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].assign(channel_tag = channel_tag).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n"
+#    if not idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).dropna().empty:
+#        self.fdata += idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).assign(channel_tag = channel_tag).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n"
 # fast concatenation to do the above:
-self.fdata = self.fdata.join([(self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].assign(channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n") for (channel, rate) in self.keys if not self.dataframes[(channel, rate)].loc[idx[:, self.last_save_time:gps_time], :].dropna().empty])
+self.fdata = self.fdata.join([(idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).assign(channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)).dropna().to_string(header = False, index = False, formatters = trigger_format) + "\n") for (channel, rate) in self.keys if not idq_aggregator.get_dataframe_subset(self.last_save_time, gps_time, self.dataframes[(channel, rate)]).dropna().empty])

 def to_trigger_file(self):
     # NOTE
@@ -32,6 +32,7 @@ import sys
 import numpy
 import pandas
+from lal import gpstime
 from gstlal import aggregator
@@ -41,38 +42,37 @@ from gstlal import aggregator
 #
 ####################

-# FIXME: works for level 1 aggregation only, not tested for other levels
-def reduce_data(data, gps_start, gps_end, path, reduce_count, aggregate = 'max', level = 1):
+def reduce_data(data, gps_start, gps_end, path, reduce_count, aggregate = 'max', level = 0):
     """!
     This function does a data reduction recursively as needed
     by powers of 10 where level specifies the power.
     Default minimum is 1 e.g., reduction by 1 order of magnitude.
     """
-    if level == 1 or (reduce_count % (10 ** level) == 0 and level <= aggregator.DIRS):
-        if level == 1:
+    if (reduce_count % (10 ** level) == 0 and level < aggregator.DIRS):
+        if level == 0:
             agg_data = aggregate_data(data, gps_start, gps_end, aggregate = aggregate)
         else:
-            agg_data = aggregate_data(get_dataset_by_range(gps_start, gps_end, path, aggregate = aggregate, level = level), gps_start, gps_end, level = level)
+            agg_data = aggregate_data(get_dataset_by_range(gps_start, gps_end, path, aggregate = aggregate, level = level - 1), gps_start, gps_end, level = level)
+            path = update_agg_path(path, gps_start, cur_level = level - 1, new_level = level)
         if agg_data is not None:
             create_new_dataset(path, 'aggregates', agg_data, aggregate = aggregate)
-        path = update_agg_path(path, gps_start, level = level)
         reduce_data(data, gps_start, gps_end, path, reduce_count, aggregate = aggregate, level = level + 1)

-# FIXME: works for level 1 aggregation only, not tested for other levels
-def aggregate_data(data, gps_start, gps_end, column = 'snr', aggregate = 'max', level = 1):
+def aggregate_data(data, gps_start, gps_end, column = 'snr', aggregate = 'max', level = 0):
     """!
     Reduces data of a given level for a given gps range,
     column, and aggregate. Returns the aggregated data.
     """
-    gps_start_index = floor_div(gps_start, 10 ** level)
-    gps_end_index = floor_div(gps_end, 10 ** level) - (10 ** level)
+    gps_start_idx = floor_div(gps_start, 10 ** (level+1))
+    gps_end_idx = floor_div(gps_end, 10 ** (level+1))
     if aggregate == 'max':
-        max_index = data.loc[gps_start_index:gps_end_index].groupby(level='gps_level%d' % level)[column].idxmax().dropna().values
-        if max_index.size > 0:
-            return data.loc[max_index]
-        else:
-            return None
+        max_index = get_dataframe_subset(gps_start_idx, gps_end_idx, data, level = level).groupby(pandas.TimeGrouper('%ds' % (10 ** (level+1))))[column].idxmax().dropna().values
     else:
         raise NotImplementedError
+    if max_index.size > 0:
+        return data.loc[max_index]
+    else:
+        return None

 def get_dataset(path, base, aggregate = None):
     """!
@@ -91,6 +91,8 @@ def create_new_dataset(path, base, data, aggregate = None, tmp = False):
     The data will be stored in an hdf5 file at path @param path with
     base name @param base. You can also make a temporary file.
     """
+    if not os.path.exists(path):
+        aggregator.makedir(path)
     if tmp:
         fname = os.path.join(path, "%s.h5.tmp" % base)
     else:
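
The new guard creates missing leaf directories before writing. A minimal race-tolerant stand-in, assuming aggregator.makedir behaves roughly like this (multiple writers may try to create the same path concurrently):

    import errno, os

    def makedir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            # another process may have created it between check and call
            if e.errno != errno.EEXIST:
                raise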
@@ -102,18 +104,17 @@ def create_new_dataset(path, base, data, aggregate = None, tmp = False):
         store.append(aggregate, data)
     return fname

-# FIXME: works for level 1 aggregation only, not tested for other levels
 def get_dataset_by_range(gps_start, gps_end, path, aggregate = None, level = 0):
     """!
     Returns a dataset for a given aggregation level and gps range.
     """
     global_start_index = floor_div(gps_start, 10 ** (level+1))
     global_end_index = floor_div(gps_end, 10 ** (level+1))
-    gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index + aggregator.MIN_TIME_QUANTA * (10 ** level), aggregator.MIN_TIME_QUANTA * (10 ** level)))
+    gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index, aggregator.MIN_TIME_QUANTA * (10 ** level)))
     # generate all relevant files and combine to one dataset
     # FIXME: more efficient to filter relevant datasets first
     #        rather than overpopulating dataset and then filtering
-    paths = (update_agg_path(path, gps_epoch, level = level) for gps_epoch in gps_epochs)
+    paths = (update_agg_path(path, gps_epoch, cur_level = level, new_level = level) for gps_epoch in gps_epochs)
     if aggregate is None:
         pattern = '[0-9]' * 10 + '.h5'
         filelist = (glob.glob(os.path.join(path, pattern)) for path in paths)
@@ -122,29 +123,28 @@ def get_dataset_by_range(gps_start, gps_end, path, aggregate = None, level = 0):
     else:
         files = (os.path.join(path, 'aggregates.h5') for path in paths)
     datasets = (pandas.read_hdf(file_, aggregate) for file_ in files)
-    return pandas.concat(datasets).loc[global_start_index:global_end_index]
+    return get_dataframe_subset(global_start_index, global_end_index, pandas.concat(datasets), level = level)

-def gps_to_index(gps_time, n_levels = 2):
-    """!
-    Returns a tuple containing a multilevel gps index.
-    """
-    index = [gps_time]
-    for level in range(1, n_levels):
-        index.append(floor_div(gps_time, 10 ** level))
-    return tuple(index[::-1])
+def get_dataframe_subset(gps_start, gps_end, dataframe, level = 0):
+    start_index = gps_to_index(gps_start)
+    end_index = gps_to_index(gps_end - (10 ** level))
+    return dataframe.loc[start_index:end_index]

-def create_dataframe_index(gps_time, n_levels = 2):
+def create_dataframe_index(gps_time):
     """
-    Returns an n level index based on gps times
+    Returns a datetime index based on gps times
     per minimum aggregator quanta.
     """
-    index_t = numpy.arange(floor_div(gps_time, aggregator.MIN_TIME_QUANTA), floor_div(gps_time, aggregator.MIN_TIME_QUANTA) + aggregator.MIN_TIME_QUANTA, dtype = numpy.int)
-    indices = [index_t]
-    index_names = ['gps_level0']
-    for level in range(1, n_levels):
-        indices.append(numpy.fromiter(( floor_div(x, 10 ** level) for x in index_t), dtype = numpy.int))
-        index_names.append('gps_level%d' % level)
-    return pandas.MultiIndex.from_tuples(list(zip(*(indices[::-1]))), names = index_names[::-1])
+    start_time = gps_to_index(floor_div(gps_time, aggregator.MIN_TIME_QUANTA))
+    return pandas.date_range(start_time, periods = aggregator.MIN_TIME_QUANTA, freq = 's')

+def gps_to_index(gps_time):
+    # FIXME: round microseconds is a workaround for possible bug in the
+    #        gpstime module in lal that adds a microsecond
+    return pandas.Timestamp(gpstime.gps_to_utc(gps_time).replace(microsecond=0))

+def gps_from_index(timestamp):
+    return int(gpstime.utc_to_gps(timestamp))

 def to_agg_path(base_path, tag, gps_time, channel, rate, level = 0):
     """
@@ -161,15 +161,15 @@ def to_agg_path(base_path, tag, gps_time, channel, rate, level = 0):
     path = os.path.join(path, str(rate).zfill(4))
     return path

-def update_agg_path(path, gps_time, level = 0):
+def update_agg_path(path, gps_time, cur_level = 0, new_level = 0):
     """
     Returns an updated aggregator path based on
     an existing path and a gps time.
     """
-    path, rate = os.path.split(os.path.normpath(path))
-    for agg_level in range(aggregator.DIRS):
+    path, rate = os.path.split(os.path.abspath(path))
+    for agg_level in range(max(0, cur_level), aggregator.DIRS):
         path, _ = os.path.split(path)
-    return os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level), rate)
+    return os.path.join(path, os.path.join(aggregator.gps_to_leaf_directory(gps_time, level = new_level), rate))

 def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
     """
@@ -187,4 +187,3 @@ def floor_div(x, n):
     """
     assert n > 0
     return (x / n) * n
-
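
One portability note: (x / n) * n relies on Python 2 integer division; under Python 3 the same flooring needs //. A drop-in equivalent:

    def floor_div3(x, n):
        # Python 3-safe version of floor_div above
        assert n > 0
        return (x // n) * n

    assert floor_div3(1187008882, 100) == 1187008800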