Commit 53883a21 authored by Chad Hanna

gstlal_ll_dq: add data reduction

parent 0cd1f324
@@ -6,7 +6,7 @@ from collections import deque
 from scipy import signal
 import sys
 import StringIO
-from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
+from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd, aggregator
 from optparse import OptionParser
 import gi
 gi.require_version('Gst', '1.0')
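This hunk only pulls in the gstlal aggregator module, which supplies the GPS-keyed directory helpers (gps_to_leaf_directory, makedir) and the reduction utilities (median, DIRS, get_dataset, reduce_data) used in bufhandler below. As a rough illustration of the kind of layout those calls imply, here is a sketch that maps a GPS time to a nested "leaf" directory by truncating trailing digits; the helper name, signature and digit handling are assumptions for illustration, not the gstlal.aggregator implementation.

# Hypothetical sketch -- not the gstlal.aggregator code.
# One way to map a GPS time to a nested directory, one digit per path component;
# a higher `level` keeps fewer leading digits, grouping more time per directory.
import os

def gps_to_leaf_directory_sketch(gpstime, level=0, digits=10):
	gpsstr = ("%d" % int(gpstime)).zfill(digits)
	keep = digits - level
	return os.path.join(*list(gpsstr[:keep]))

print(gps_to_leaf_directory_sketch(1234567890))           # 1/2/3/4/5/6/7/8/9/0
print(gps_to_leaf_directory_sketch(1234567890, level=2))  # 1/2/3/4/5/6/7/8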
@@ -35,34 +35,83 @@ class PSDHandler(simplehandler.Handler):
 	def __init__(self, *args, **kwargs):
 		self.psd = None
 		self.out_path = kwargs["out_path"]
+		self.instrument = kwargs["instrument"]
 		del kwargs["out_path"]
+		del kwargs["instrument"]
 		simplehandler.Handler.__init__(self, *args, **kwargs)
-		self.horizon_history = deque(maxlen = 900)
-		self.horizon_history_time = deque(maxlen = 900)
-		self.noisedeq = deque(maxlen = 3600)
-		self.timedeq = deque(maxlen = 3600)
+		self.horizon_history = deque(maxlen = 10000)
+		self.horizon_history_time = deque(maxlen = 10000)
+		self.noisedeq = deque(maxlen = 10000)
+		self.timedeq = deque(maxlen = 10000)
 
 	def do_on_message(self, bus, message):
 		if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
 			self.psd = pipeio.parse_spectrum_message(message)
-			self.horizon_history.append(reference_psd.horizon_distance(self.psd, 1.4, 1.4, 8, 20))
-			self.horizon_history_time.append(float(self.psd.epoch))
-			self.to_hdf5("psd.hdf5", {"freq": numpy.arange(self.psd.data.length / 32) * self.psd.deltaF * 32, "asd": signal.decimate(self.psd.data.data[:], 32, ftype='fir')[:-1]**.5})
-			#self.to_hdf5("psd.hdf5", {"freq": numpy.arange(self.psd.data.length) * self.psd.deltaF*32, "asd": self.psd.data.data[:-1:32]**.5})
-			self.to_hdf5("horizon_history.hdf5", {"time": numpy.array(self.horizon_history_time), "horizon": numpy.array(self.horizon_history)})
 			return True
 		return False
 
 	def bufhandler(self,elem):
 		buf = elem.emit("pull-sample").get_buffer()
+		buftime = buf.pts / 1e9
 		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
+		thisdir = os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime))
+		for typ in ("min", "median", "max"):
+			aggregator.makedir(os.path.join(thisdir, typ))
 		if mapinfo.data:
+			# First noise
 			s = StringIO.StringIO(mapinfo.data)
			data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().split('\n') if x])
 			ix = numpy.argmax(data, axis=0)[1]
-			self.timedeq.append(data[ix,0])
+			self.timedeq.append(buftime)
 			self.noisedeq.append(data[ix,1])
-			self.to_hdf5("noise.hdf5", {"time": numpy.array(self.timedeq), "noise": numpy.array(self.noisedeq)})
+			# Then range
+			self.horizon_history.append(reference_psd.horizon_distance(self.psd, 1.4, 1.4, 8, 20))
+			# The PSD
+			psd_freq = numpy.arange(self.psd.data.length / 32) * self.psd.deltaF * 32
+			psd_data = signal.decimate(self.psd.data.data[:], 32, ftype='fir')[:-1]**.5
+		else:
+			# First noise
+			self.timedeq.append(buftime)
+			self.noisedeq.append(0.0)
+			# Then range
+			self.horizon_history.append(0.0)
+			# Then PSD
+			psd_freq = numpy.arange(1024) * 2
+			psd_data = numpy.zeros(1024)
+		# The PSD is special, we just record it. No min/median/max
+		psd_name = "%s-PSD-%d0-10.hdf5" % (self.instrument, int(buftime / 10))
+		self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data})
+		# Save a "latest"
+		psd_name = "psd.hdf5"
+		self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data})
+		# write out all of the file types
+		for typ in ("min", "median", "max"):
+			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "noise.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.noisedeq)})
+			self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "horizon_history.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.horizon_history)})
+		#
+		# FIXME do data reduction by levels here.
+		#
+		# Only reduce every 100s
+		if not (buftime % 100):
+			for typ, func in (("min", min), ("median", aggregator.median), ("max", max)):
+				for route in ("noise", "horizon_history"):
+					for level in range(0, aggregator.DIRS-1):
+						thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime, level = level)), typ)
+						nextdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime, level = level+1)), typ)
+						aggregator.makedir(nextdir)
+						this_fname, this_x, this_y = aggregator.get_dataset(thisdir, route)
+						next_fname, next_x, next_y = aggregator.get_dataset(nextdir, route)
+						xarr, yarr = aggregator.reduce_data(this_x + next_x, this_y + next_y, func, level = level + 1)
+						self.to_hdf5(next_fname, {"time": numpy.array(xarr), "data": numpy.array(yarr)})
 		buf.unmap(mapinfo)
 		del buf
 		return Gst.FlowReturn.OK
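The loop guarded by `if not (buftime % 100)` is the data reduction this commit adds: every 100 s the "noise" and "horizon_history" series stored at one directory level are folded into the next, coarser level with min, median and max, across aggregator.DIRS-1 levels. A minimal standalone sketch of that per-level binning idea follows; the function name, the base-10 bin width and the way this-level and next-level samples are combined are assumptions, not the aggregator.reduce_data implementation.

# Hedged sketch of "reduce by levels": at each level the time axis is binned
# more coarsely and each bin is collapsed to one value with min, median or max.
import numpy

def reduce_data_sketch(times, values, func, level=1, base=10):
	times = numpy.asarray(times, dtype=float)
	values = numpy.asarray(values, dtype=float)
	binwidth = float(base ** level)
	bins = numpy.floor(times / binwidth)
	out_t, out_v = [], []
	for b in sorted(set(bins)):
		mask = bins == b
		out_t.append(b * binwidth)        # bin start time
		out_v.append(func(values[mask]))  # one reduced sample per bin
	return out_t, out_v

t = [0, 3, 7, 12, 15, 21]
v = [5., 2., 9., 4., 8., 1.]
print(reduce_data_sketch(t, v, numpy.median, level=1))  # one median per 10 s bin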
@@ -72,8 +121,7 @@ class PSDHandler(simplehandler.Handler):
 		del buf
 		return Gst.FlowReturn.OK
 
-	def to_hdf5(self, name, datadict):
-		path = os.path.join(self.out_path,"%s" % name)
+	def to_hdf5(self, path, datadict):
 		tmppath = path + ".tmp"
 		f = h5py.File(tmppath, "w")
 		for k, v in datadict.items():
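to_hdf5 now takes a full path instead of a bare file name, since bufhandler already builds the per-GPS leaf directory itself. The ".tmp" suffix points at a write-then-rename pattern so that readers polling these files never see a partially written HDF5 file; the rename step is outside the displayed hunk, so the sketch below treats it as an assumption.

# Hedged sketch of a temp-file-then-rename HDF5 write; the os.rename step is
# not shown in this hunk, so treating it as the intended pattern is an assumption.
import os
import h5py
import numpy

def atomic_to_hdf5(path, datadict):
	tmppath = path + ".tmp"
	f = h5py.File(tmppath, "w")
	for k, v in datadict.items():
		f[k] = v                  # one dataset per dictionary key
	f.close()
	os.rename(tmppath, path)          # atomic on POSIX within one filesystem

atomic_to_hdf5("noise_example.hdf5", {"time": numpy.arange(10.0), "data": numpy.zeros(10)})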
@@ -104,7 +152,7 @@ if options.verbose:
 	print >>sys.stderr, "building pipeline ..."
 
 mainloop = GObject.MainLoop()
 pipeline = Gst.Pipeline(name="DQ")
-handler = PSDHandler(mainloop, pipeline, out_path = options.out_path)
+handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument)
 head = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = options.verbose)
 #head = pipeparts.mkresample(pipeline, head, quality = 9)
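The handler is now constructed with the instrument as well as the output path; the instrument string ends up in the per-10 s PSD file name written by bufhandler. A small sketch of that name, mirroring the "%s-PSD-%d0-10.hdf5" format above (the GPS time here is just an example value):

# Mirrors the PSD file naming in bufhandler; the GPS time is an example value.
instrument = "H1"
buftime = 1187008882.43
psd_name = "%s-PSD-%d0-10.hdf5" % (instrument, int(buftime / 10))
print(psd_name)  # -> H1-PSD-1187008880-10.hdf5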