diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg
index f1e29bd47256b8b238346c54a98d3c4ebaba06bc..75b5bafe68d452f3a1ef57cc2eabb5e332f826ad 100755
--- a/gstlal-ugly/bin/gstlal_etg
+++ b/gstlal-ugly/bin/gstlal_etg
@@ -170,13 +170,20 @@ class MultiChannelHandler(simplehandler.Handler):
 		self.last_save_time = None
 		self.cadence = options.cadence
 		self.tag = '%s-%s' % (self.instrument[:1], self.description)
-		# create header for trigger file
-		if options.latency:
-			self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency")
+		# hdf saving properties
+		if options.save_hdf:
+			columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
+			self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
+		# ascii saving properties
 		else:
-			self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
-		self.fdata = deque(maxlen = 25000)
-		self.fdata.append(self.header)
+			self.tag = '%s-%s' % (self.instrument[:1], self.description)
+			# create header for trigger file
+			if options.latency:
+				self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel", "latency")
+			else:
+				self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
+			self.fdata = deque(maxlen = 25000)
+			self.fdata.append(self.header)
 
 		# set initialization time
 		if options.data_source in ("framexmit", "lvshm"):
@@ -260,9 +267,14 @@ class MultiChannelHandler(simplehandler.Handler):
 
 			# Save triggers (in ascii format) once every cadence
 			if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
-				self.to_trigger_file(buftime)
-				self.fdata.clear()
-				self.fdata.append(self.header)
+				if options.save_hdf:
+					fname = '%s-%d-%d' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence)
+					fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
+					self.fdata.dump(fpath, fname)
+				else:
+					self.to_trigger_file(buftime)
+					self.fdata.clear()
+					self.fdata.append(self.header)
 				self.last_save_time = buftime
 
 			# read buffer contents
@@ -303,15 +315,18 @@ class MultiChannelHandler(simplehandler.Handler):
 		# NOTE
 		# Setting stop time to trigger time for use with half sine gaussians
 		stop_time = trigger_time
-		# save iDQ compatible data
-		if options.latency:
-			self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency))
-		else:
-			self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
 		# append row to feature vector for bottle requests
 		etg_row = {'timestamp': buftime, 'channel': channel, 'rate': rate, 'start_time': start_time, 'stop_time': stop_time, 'trigger_time': trigger_time,
-                           'frequency': freq, 'q': q, 'phase': row.phase, 'sigma_sq': row.sigmasq, 'chi_sq': row.chisq, 'snr': row.snr}
+                           'frequency': freq, 'q': q, 'phase': row.phase, 'sigmasq': row.sigmasq, 'chisq': row.chisq, 'snr': row.snr}
 		self.etg_event.append(etg_row)
+		# save iDQ compatible data
+		if options.save_hdf:
+			self.fdata.append(etg_row, key = (channel, rate), buftime = buftime)
+		else:
+			if options.latency:
+				self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\t%.2f\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag, latency))
+			else:
+				self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
 
 
 	def to_trigger_file(self, buftime = None):
@@ -481,6 +496,7 @@ def parse_command_line():
 	parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
 	parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
+	parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
 	parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
 	parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
 	parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
@@ -669,7 +685,8 @@ if options.verbose:
 mainloop.run()
 
 # save remaining triggers
-handler.to_trigger_file()
+if not options.save_hdf:
+	handler.to_trigger_file()
 
 #
 # Shut down pipeline
diff --git a/gstlal-ugly/python/idq_aggregator.py b/gstlal-ugly/python/idq_aggregator.py
index 5c71894d8281733e04f5d1883d411415ad96fcaf..e3819f5e170bcc1861fcb1ceddb761c6792bb43e 100644
--- a/gstlal-ugly/python/idq_aggregator.py
+++ b/gstlal-ugly/python/idq_aggregator.py
@@ -29,8 +29,8 @@ import os
 import glob
 import sys
 
+import h5py
 import numpy
-import pandas
 
 from lal import gpstime
 from gstlal import aggregator
@@ -42,141 +42,59 @@ from gstlal import aggregator
 #
 ####################
 
-def reduce_data(data, gps_end, path, reduce_count, channel, rate, reduce_cadence = 100, aggregate = 'max', level = 0):
-	"""!
-	This function does a data reduction recursively as needed
-	by powers of 10 where level specifies the power.
-	Default minimum is 1 e.g., reduction by 1 order of magnitude.
-	"""
-	if (reduce_count % (10 ** level) == 0 and level < aggregator.DIRS):
-		gps_start = gps_end - reduce_cadence * (10 ** level)
-		if level == 0:
-			agg_data = aggregate_data(data, gps_start, gps_end, aggregate = aggregate)
-		else:
-			agg_data = aggregate_data(get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = aggregate, level = level - 1), gps_start, gps_end, level = level)
-			path = update_agg_path(path, gps_start, cur_level = level - 1, new_level = level)
-		if agg_data is not None:
-			create_new_dataset(path, 'aggregates', agg_data, channel, rate, aggregate = aggregate)
-		reduce_data(data, gps_end, path, reduce_count, channel, rate, aggregate = aggregate, level = level + 1)
-
-def aggregate_data(data, gps_start, gps_end, column = 'snr', aggregate = 'max', level = 0):
-	"""!
-	Reduces data of a given level for a given gps range,
-	column, and aggregate. Returns the aggregated data.
-	"""
-	gps_start_idx = floor_div(gps_start, 10 ** (level+1))
-	gps_end_idx = floor_div(gps_end, 10 ** (level+1))
-	if aggregate == 'max':
-		max_index = get_dataframe_subset(gps_start_idx, gps_end_idx, data, level = level).groupby(pandas.TimeGrouper('%ds' % (10 ** (level+1))))[column].idxmax().dropna().values
-	else:
-		raise NotImplementedError
-	if max_index.size > 0:
-		return data.loc[max_index]
-	else:
-		return None
-
-def get_dataset(path, base, channel, rate, aggregate = None):
+def get_dataset(path, base):
 	"""!
 	open a dataset at @param path with name @param base and return the data
 	"""
-	fname = os.path.join(path, "%s.h5" % base)
-	with pandas.HDFStore(fname) as store:
-		if aggregate is None:
-			return store.select('%s/%s/data' % (channel, str(rate)))
-		else:
-			return store.select('%s/%s/%s' % (channel, str(rate), aggregate))
-
-def create_new_dataset(path, base, data, channel, rate, aggregate = None, tmp = False):
+	fname = os.path.join(path, "%s.hdf5" % base)
+	try:
+		f = h5py.File(fname, "r")
+		fields = f.keys()
+		data = zip(*[f[field] for field in fields])
+		f.close()
+		d_types = [(str(field), 'f8') for field in fields]
+		data = numpy.array(data, dtype=d_types)
+		return fname, data
+	except IOError:
+		return fname, []
+
+def create_new_dataset(path, base, fields, data = None, tmp = False):
 	"""!
 	A function to create a new dataset with data @param data.
 	The data will be stored in an hdf5 file at path @param path with
 	base name @param base.  You can also make a temporary file.
 	"""
-	# create path if one doesn't already exist
-	if not os.path.exists(path):
-		aggregator.makedir(path)
 	if tmp:
-		fname = os.path.join(path, "%s.h5.tmp" % base)
+		fname = os.path.join(path, "%s.hdf5.tmp" % base)
 	else:
-		fname = os.path.join(path, "%s.h5" % base)
-	with pandas.HDFStore(fname) as store:
-		if aggregate is None:
-			store.put('%s/%s/data' % (channel, str(rate)), data)
+		# A non temp dataset should not be overwritten
+		fname = os.path.join(path, "%s.hdf5" % base)
+		if os.path.exists(fname):
+			return fname
+	# create dir if it doesn't exist
+	if not os.path.exists(path):
+		aggregator.makedir(path)
+	# save data to hdf5
+	f = h5py.File(fname, "w")
+	for field in fields:
+		if data is None:
+			f.create_dataset(field, (0,), dtype="f8")
 		else:
-			store.append('%s/%s/%s' % (channel, str(rate), aggregate), data)
-	return fname
+			f.create_dataset(field, (len(data[field]),), dtype="f8")
+			f[field][...] = data[field]
 
-def get_dataset_by_range(gps_start, gps_end, path, channel, rate, aggregate = None, level = 0):
-	"""!
-	Returns a dataset for a given aggregation level and gps range.
-	"""
-	global_start_index = floor_div(gps_start, 10 ** (level+1))
-	global_end_index = floor_div(gps_end, 10 ** (level+1))
-	gps_epochs = (floor_div(t, aggregator.MIN_TIME_QUANTA * (10 ** level)) for t in range(global_start_index, global_end_index, aggregator.MIN_TIME_QUANTA * (10 ** level)))
-	# generate all relevant files and combine to one dataset
-	# FIXME: more efficient to filter relevant datasets first
-	#        rather than overpopulating dataset and then filtering
-	paths = (update_agg_path(path, gps_epoch, cur_level = level, new_level = level) for gps_epoch in gps_epochs)
-	if aggregate is None:
-		pattern = '[0-9]' * 10 + '.h5'
-		filelist = (glob.glob(os.path.join(path, pattern)) for path in paths)
-		# flatten list
-		files = (file_ for sublist in filelist for file_ in sublist)
-	else:
-		files = (os.path.join(path, 'aggregates.h5') for path in paths)
-	datasets = (pandas.read_hdf(file_, '%s/%s/%s' % (channel, rate, aggregate)) for file_ in files)
-	return get_dataframe_subset(global_start_index, global_end_index, pandas.concat(datasets), level = level)
-
-def get_dataframe_subset(gps_start, gps_end, dataframe, level = 0):
-	start_index = gps_to_index(gps_start)
-	end_index = gps_to_index(gps_end - (10 ** level))
-	return dataframe.loc[start_index:end_index]
-
-def create_dataframe_index(gps_time):
-	"""
-	Returns a datetime index based on gps times
-	per minimum aggregator quanta.
-	"""
-	start_time = gps_to_index(floor_div(gps_time, aggregator.MIN_TIME_QUANTA))
-	return pandas.date_range(start_time, periods = aggregator.MIN_TIME_QUANTA, freq = 's')
-
-def gps_to_index(gps_time):
-	# FIXME: round microseconds is a workaround for possible bug in the
-	#        gpstime module in lal that adds a microsecond 
-	return pandas.Timestamp(gpstime.gps_to_utc(gps_time).replace(microsecond=0))
-
-def gps_from_index(timestamp):
-	return int(gpstime.utc_to_gps(timestamp))
-
-def to_agg_path(base_path, tag, gps_time, level = 0):
-	"""
-	Returns a hierarchical gps time path based on
-	channel rate and level in hierarchy.
-	e.g. level 0: base/tag/1/2/3/4/5/6/
-	e.g. level 2: base/tag/1/2/3/4/
-	"""
-	path = os.path.join(base_path, tag)
-	path = os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = level))
-	return path
-
-def update_agg_path(path, gps_time, cur_level = 0, new_level = 0):
-	"""
-	Returns an updated aggregator path based on
-	an existing path and a gps time.
-	"""
-	for agg_level in range(max(0, cur_level), aggregator.DIRS):
-		path, _ = os.path.split(path)
-	return os.path.join(path, aggregator.gps_to_leaf_directory(gps_time, level = new_level))
+	f.close()
+	return fname
 
 def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
-	"""
+	"""!
 	Returns whether new and old gps times are in different
 	epochs.
 	"""
 	return (new_gps_time - floor_div(prev_gps_time, gps_epoch)) >= gps_epoch
 
 def floor_div(x, n):
-	"""
+	"""!
 	Floor an integer by removing its remainder
 	from integer division by another integer n.
 	e.g. floor_div(163, 10) = 160
@@ -184,3 +102,68 @@ def floor_div(x, n):
 	"""
 	assert n > 0
 	return (x / n) * n
+
+
+
+####################
+#
+#     classes
+#
+####################
+
+class FeatureData(object):
+	"""!
+	Base class for saving feature data.
+	Extend for a specific file-based implementation.
+	"""
+	def __init__(self, columns, keys = None, **kwargs):
+		self.keys = keys
+		self.columns = columns
+		self.etg_data = {}
+
+	def dump(self, path):
+		raise NotImplementedError
+
+	def load(self, path):
+		raise NotImplementedError
+
+	def pop(self):
+		raise NotImplementedError
+
+	def append(self, key, value):
+		raise NotImplementedError
+
+	def clear(self):
+		raise NotImplementedError
+
+class HDF5FeatureData(FeatureData):
+	"""!
+	Saves feature data to hdf5.
+	"""
+	def __init__(self, columns, keys, **kwargs):
+		super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
+		self.cadence = kwargs.pop("cadence")
+		self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
+		for key in keys:
+			self.etg_data[key][:] = numpy.nan
+
+	def dump(self, path, base):
+		for key in self.keys:
+			key_path = os.path.join(path, str(key[0]), str(key[1]).zfill(4))
+			create_new_dataset(key_path, base, self.columns, data = self.etg_data[key])
+		self.clear()
+
+	def load(self, path):
+		raise NotImplementedError
+
+	def pop(self):
+		raise NotImplementedError
+
+	def append(self, value, key = None, buftime = None):
+		if buftime and key:
+			idx = buftime % self.cadence
+			self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns])
+
+	def clear(self):
+		for key in self.keys:
+			self.etg_data[key][:] = numpy.nan