Commit 259185db authored by Patrick Godwin

idq_aggregator.py: fixed get_dataset_by_range function for level 1 aggregation, fixed bug in gps_to_index
parent a7ae3f87
@@ -26,6 +26,8 @@
 import os
+import glob
+import sys
 import numpy
 import pandas
@@ -100,25 +102,27 @@ def create_new_dataset(path, base, data, aggregate = None, tmp = False):
         store.append(aggregate, data)
     return fname
 
-# FIXME: function not currently working yet
+# FIXME: works for level 1 aggregation only, not tested for other levels
 def get_dataset_by_range(gps_start, gps_end, path, aggregate = None, level = 0):
     """!
     Returns a dataset for a given aggregation level and gps range.
     """
     global_start_index = floor_div(gps_start, 10 ** (level+1))
     global_end_index = floor_div(gps_end, 10 ** (level+1))
-    gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index, aggregator.MIN_TIME_QUANTA * (10 ** level)))
-    dataset = pandas.DataFrame()
-    for gps_epoch in gps_epochs:
-        path = update_agg_path(path, gps_epoch, level = level)
-        gps_start_index = max(global_start_index, gps_epoch)
-        gps_end_index = min(global_end_index, gps_epoch + aggregator.MIN_TIME_QUANTA * (10 ** level))
-        if aggregate is None:
-            for gps_index in range(gps_start_index, gps_end_index, 10 ** (level+1)):
-                dataset = dataset.append(get_dataset(path, str(gps_index), aggregate = aggregate))
-        else:
-            dataset = dataset.append(get_dataset(path, 'aggregates', aggregate = aggregate))
-    return dataset
+    gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index + aggregator.MIN_TIME_QUANTA * (10 ** level), aggregator.MIN_TIME_QUANTA * (10 ** level)))
+    # generate all relevant files and combine to one dataset
+    # FIXME: more efficient to filter relevant datasets first
+    #        rather than overpopulating dataset and then filtering
+    paths = (update_agg_path(path, gps_epoch, level = level) for gps_epoch in gps_epochs)
+    if aggregate is None:
+        pattern = '[0-9]' * 10 + '.h5'
+        filelist = (glob.glob(os.path.join(path, pattern)) for path in paths)
+        # flatten list
+        files = (file_ for sublist in filelist for file_ in sublist)
+    else:
+        files = (os.path.join(path, 'aggregates.h5') for path in paths)
+    datasets = (pandas.read_hdf(file_, aggregate) for file_ in files)
+    return pandas.concat(datasets).loc[global_start_index:global_end_index]
 
 def gps_to_index(gps_time, n_levels = 2):
     """!
@@ -162,8 +166,8 @@ def update_agg_path(path, gps_time, level = 0):
     Returns an updated aggregator path based on
     an existing path and a gps time.
     """
-    path, rate = os.path.split(path)
-    for agg_level in range(aggregator.MIN_TIME_QUANTA):
+    path, rate = os.path.split(os.path.normpath(path))
+    for agg_level in range(aggregator.DIRS):
         path, _ = os.path.split(path)
     return os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level), rate)
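
The update_agg_path hunk fixes two distinct problems. First, os.path.normpath strips any trailing slash before the first split; without it, os.path.split returns an empty final component and the rate directory is lost. Second, the loop bound changes from aggregator.MIN_TIME_QUANTA (a time span, which would strip the path all the way down to '/') to aggregator.DIRS, presumably the depth of the GPS leaf directory tree. A small sketch with a made-up layout and an assumed DIRS value:

import os

DIRS = 6                             # assumed depth of the gps leaf directory tree
path = '/base/1/2/3/4/5/6/by_rate/'  # hypothetical leaf dirs plus rate dir, trailing slash

# without normpath, the trailing slash yields an empty rate component:
print(os.path.split(path))                    # ('/base/1/2/3/4/5/6/by_rate', '')
print(os.path.split(os.path.normpath(path)))  # ('/base/1/2/3/4/5/6', 'by_rate')

# with the corrected bound, exactly the DIRS leaf levels are stripped,
# leaving the base onto which the new leaf directory and rate are joined:
stripped, rate = os.path.split(os.path.normpath(path))
for _ in range(DIRS):
    stripped, _ = os.path.split(stripped)
print(stripped, rate)  # /base by_rate
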