diff --git a/gstlal-ugly/bin/gstlal_ll_dq b/gstlal-ugly/bin/gstlal_ll_dq
index b0bfd1e4a22dadb20b028ac95aa58f8e792a06dc..f2098c8a04a8d22f0bc5e14e62501f4114ef6d89 100755
--- a/gstlal-ugly/bin/gstlal_ll_dq
+++ b/gstlal-ugly/bin/gstlal_ll_dq
@@ -6,7 +6,7 @@ from collections import deque
 from scipy import signal
 import sys
 import StringIO
-from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd, aggregator
+from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
 from optparse import OptionParser
 import gi
 gi.require_version('Gst', '1.0')
@@ -16,6 +16,9 @@ Gst.init(None)
 import h5py
 import logging
 
+from datamon import aggregator
+from datamon import io
+
 def parse_command_line():
 	parser = OptionParser(description = __doc__)
 
@@ -41,11 +44,14 @@ class PSDHandler(simplehandler.Handler):
 		del kwargs["instrument"]
 		simplehandler.Handler.__init__(self, *args, **kwargs)
 		self.horizon_distance_func = reference_psd.HorizonDistance(20., 2048., 1./16., 1.4, 1.4)
-		self.range_history = deque(maxlen = 10000)
-		self.range_history_time = deque(maxlen = 10000)
-		self.noisedeq = deque(maxlen = 10000)
+
+		self.routes = ("noise", "range_history")
+		self.datatypes = (("min", min), ("median", aggregator.median), ("max", max))
 		self.timedeq = deque(maxlen = 10000)
+		self.datadeq = {route: deque(maxlen = 10000) for route in self.routes}
 		self.last_reduce_time = None
+		self.prevdataspan = set()
+
 	def do_on_message(self, bus, message):
 		if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
 			self.psd = pipeio.parse_spectrum_message(message)
@@ -58,19 +64,16 @@ class PSDHandler(simplehandler.Handler):
 		if self.last_reduce_time is None:
 			self.last_reduce_time = int(round(buftime,-2))
 		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
-		thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
-		for typ in ("min", "median", "max"):
-			aggregator.makedir(os.path.join(thisdir, typ))
 		if mapinfo.data:
 			# First noise
 			s = StringIO.StringIO(mapinfo.data)
 			data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().split('\n') if x])
 			ix = numpy.argmax(data, axis=0)[1]
 			self.timedeq.append(buftime)
-			self.noisedeq.append(data[ix,1])
+			self.datadeq['noise'].append(data[ix,1])
 
 			# Then range
-			self.range_history.append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
+			self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
 
 			# The PSD
 			psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
@@ -80,35 +83,19 @@ class PSDHandler(simplehandler.Handler):
 			del buf
 			return Gst.FlowReturn.OK
 
-
-		# Save a "latest"
-		self.to_hdf5(os.path.join(os.path.join(self.out_path, self.instrument), "psd.hdf5"), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
-		# write out all of the file types
-		for typ in ("min", "median", "max"):
-			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "noise.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.noisedeq)})
-			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "range_history.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.range_history)})
-
-		#
-		# FIXME do data reduction by levels here.
-		#
-
 		# Only reduce every 100s
 		if (buftime - self.last_reduce_time) >= 100:
-			logging.info("reducing data and writing PSD snaphot for %d @ %d" % (buftime, int(aggregator.now()))) 
 			self.last_reduce_time = int(round(buftime,-2))
-			for typ, func in (("min", min), ("median", aggregator.median), ("max", max)):
-				for route in ("noise", "range_history"):
-					for level in range(0, aggregator.DIRS-1):
-						thisdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level)), self.instrument), typ)
-						nextdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level+1)), self.instrument), typ)
-						aggregator.makedir(nextdir)
-						this_fname, this_x, this_y = aggregator.get_dataset(thisdir, route)
-						next_fname, next_x, next_y = aggregator.get_dataset(nextdir, route)
-						xarr, yarr = aggregator.reduce_data(numpy.concatenate((this_x,next_x)), numpy.concatenate((this_y, next_y)), func, level = level + 1)
-						self.to_hdf5(next_fname, {"time": numpy.array(xarr), "data": numpy.array(yarr)})
-			# The PSD is special, we just record it. No min/median/max
-			thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
-			psd_name = "%s-PSD-%d0-100.hdf5" % (self.instrument, int(round(buftime,-2)))
+			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(aggregator.now())))
+
+			timedata = {(self.instrument, route): numpy.array(self.timedeq) for route in self.routes}
+			datadata = {(self.instrument, route): numpy.array(self.datadeq[route]) for route in self.routes}
+			self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, self.routes, self.instrument, 'instrument', self.datatypes, timedata, datadata, self.prevdataspan))
+
+			# Save a "latest" psd
+			# NOTE: The PSD is special, we just record it. No min/median/max
+			thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime), 'by_instrument', self.instrument)
+			psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
 			self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
 
 		buf.unmap(mapinfo)
@@ -122,7 +109,6 @@ class PSDHandler(simplehandler.Handler):
 
 	def to_hdf5(self, path, datadict):
 		tmppath = "/dev/shm/%s" % path.replace("/","_") + ".tmp"
-		print tmppath
 		f = h5py.File(tmppath, "w")
 		for k, v in datadict.items():
 			f[k] = v
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 48405fed7dc8c7c04480d92e9065030cc501d4b4..038f9ae55ed55f6a786f4014a4696c50bfc2afb5 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -32,9 +32,11 @@ import urllib2
 import shutil
 import collections
 from multiprocessing import Pool
-from gstlal import aggregator
 import json
 
+from datamon import aggregator
+from datamon import io
+
 MIN_TIME_QUANTA = 10000
 DIRS = 6
 
@@ -82,10 +84,9 @@ def parse_command_line():
 #
 
 if __name__ == '__main__':
-
 	options = parse_command_line()
-	# FIXME don't hardcode some of these?
 
+	# FIXME don't hardcode some of these?
 	datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
 	routes = options.route
@@ -94,12 +95,14 @@ if __name__ == '__main__':
 
 	pool = Pool(options.num_threads)
 	prevdataspan = set()
+
 	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server is not None:
 		from kafka import KafkaConsumer
 		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
 	else:
 		consumer = None
+
 	# start an infinite loop to keep updating and aggregating data
 	while True:
 		logging.info("sleeping")
@@ -109,29 +112,23 @@ if __name__ == '__main__':
 		if consumer is not None:
 			# this is not threadsafe!
 			logging.info("getting data from kafka")
-			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)
+			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
 		else:
-			timedata, datadata = None, None
+			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_id, num_threads=options.num_threads)
 
 		# First get the raw and reduced data for each job in parallel
-		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs]
-		for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs):
-		#for ds in map(aggregator.get_data_from_job_and_reduce, mapargs):
+		mapargs = [(options.base_dir, routes, job, 'job', datatypes, timedata, datadata, prevdataspan) for job in jobs]
+		for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
 			dataspan.update(ds)
 		prevdataspan = dataspan.copy()
+
 		# Then reduce the data across jobs at each level
 		mapargs = []
 		for start, end in zip(*aggregator.job_expanse(dataspan)):
-			# FIXME don't hardcode this range
-			for level in range(DIRS):
-				this_level_dir = "/".join([options.base_dir, aggregator.gps_to_leaf_directory(start, level = level)])
-				mapargs = []
-				for route in routes:
-					for (typ,func) in datatypes:
-						aggregator.setup_dir_across_job_by_level(start, typ, route, options.base_dir, verbose = True, level = level)
-						mapargs.append((jobs, this_level_dir, typ, route, func, level, start, end))
-				pool.map(aggregator.reduce_across_jobs, mapargs)
-				#map(aggregator.reduce_across_jobs, mapargs)
+			mapargs = []
+			for route in routes:
+				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
+			pool.map(io.hdf5.reduce_across_tags, mapargs)
 			logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
 
 	#
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_state b/gstlal-ugly/bin/gstlal_ll_inspiral_state
index 830f1f4c9c45d69ba3db6a40ff5849dcf1c3c142..d7b10d47ff6662f14309dffff27c0080aa834cee 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_state
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_state
@@ -32,9 +32,10 @@ import urllib2
 import shutil
 import collections
 from multiprocessing import Pool
-from gstlal import aggregator
 import json
 
+from datamon import aggregator
+from datamon import io
 
 #
 # =============================================================================
@@ -73,18 +74,6 @@ def parse_command_line():
 # =============================================================================
 #
 
-def get_data_from_route((job, job_tag, routes, basedir)):
-	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
-		url = f.readline().strip()
-	for route in routes:
-		logging.info("processing job %s for route %s : %s" % (job, route, url))
-		data = aggregator.get_url(url, route)
-		jobtime, jobdata = data[0], data[1]
-		path = "%s/by_job/%s" % (basedir, job)
-		tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = jobtime, data = jobdata, tmp = True)
-		shutil.move(tmpfname, fname)
-
-
 if __name__ == '__main__':
 
 	options = parse_command_line()
@@ -108,7 +97,6 @@ if __name__ == '__main__':
 		from kafka import KafkaConsumer
 		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
 	else:
-		pool = Pool(options.num_threads)
 		consumer = None
 	while True:
 		logging.info("sleeping")
@@ -117,17 +105,16 @@ if __name__ == '__main__':
 		if consumer:
 			# this is not threadsafe!
 			logging.info("getting data from kafka")
-			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer, req_all = True)
+			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, req_all = True)
 			for (job,route) in timedata:
 				if "L1" in route or "H1" in route:
 					# FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
 					datadata[(job,route)] /= 16
-				path = "%s/by_job/%s" % (options.base_dir, job)
-				tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = timedata[(job,route)], data = datadata[(job,route)], tmp = True)
-				shutil.move(tmpfname, fname)
 		else:
-			mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
-			pool.map(get_data_from_route, mapargs)
+			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads = options.num_threads)
 
+		for (job, route) in timedata:
+			path = "%s/by_job/%s" % (options.base_dir, job)
+			io.hdf5.store_timeseries(path, route.replace("/","_"), timedata[(job, route)], datadata[(job, route)])
 
 	sys.exit(1)