From 990c33db15884870a2f4a0a5ce2b44e0e02d65c3 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sun, 10 Feb 2019 16:04:17 -0800
Subject: [PATCH] gstlal_ll_dq: use new aggregation classes from datamon to
 aggregate data, tidy up imports and add copyright info

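The datamon aggregator sinks expose a single store_and_reduce()
interface over both storage backends, which replaces the per-backend
branching previously done in the handler. A minimal sketch of the
interface as this patch uses it (the route name, timestamps and
values below are illustrative only, not part of the patch):

    from datamon import io

    # hdf5 backend; io.influx.InfluxDBAggregator exposes the same interface
    agg_sink = io.hdf5.HDF5Aggregator(rootdir="/path/to/out", num_processes=2)

    # map each instrument to parallel (times, values) sequences
    times = [1187000000, 1187000001]   # GPS seconds (illustrative)
    values = [102.3, 101.8]            # e.g. range history (illustrative)
    data = {"H1": (times, values)}

    # store the raw series and write min/median/max reductions
    agg_sink.store_and_reduce("range_history", data, "data", tags="ifo",
        aggregates=("min", "median", "max"))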
---
 gstlal-ugly/bin/gstlal_ll_dq | 198 +++++++++++++++--------
 1 file changed, 119 insertions(+), 79 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_dq b/gstlal-ugly/bin/gstlal_ll_dq
index 7af18a6101..bb7273ef5f 100755
--- a/gstlal-ugly/bin/gstlal_ll_dq
+++ b/gstlal-ugly/bin/gstlal_ll_dq
@@ -1,20 +1,52 @@
-#!/usr/bin/python
+#!/usr/bin/env python
+#
+# Copyright (C) 2016  Chad Hanna
+# Copyright (C) 2019  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+
+from collections import deque
 import os
-import numpy, scipy
+import logging
+from optparse import OptionParser
 import shutil
-from collections import deque
-from scipy import signal
-import sys
 import StringIO
-from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
-from optparse import OptionParser
+import sys
+
+import h5py
+import numpy
+from scipy import signal
+
 import gi
 gi.require_version('Gst', '1.0')
 from gi.repository import GObject, Gst
 GObject.threads_init()
 Gst.init(None)
-import h5py
-import logging
+
+from datamon import aggregator
+from datamon import io
+
+from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
+
+#
+# =============================================================================
+#
+#                                 Command Line
+#
+# =============================================================================
+#
+
 
-from datamon import aggregator
-from datamon import io
@@ -30,6 +62,7 @@ def parse_command_line():
 	parser.add_option("--sample-rate", metavar = "Hz", default = 4096, type = "int", help = "Sample rate at which to generate the PSD, default 16384 Hz")
 	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 8s")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
+	parser.add_option("--num-threads", type = int, default = 2, help = "Number of threads to use concurrently, default 2.")
 	parser.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
 	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
 	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
@@ -46,10 +79,10 @@ class PSDHandler(simplehandler.Handler):
 		self.psd = None
 		self.out_path = kwargs["out_path"]
 		self.instrument = kwargs["instrument"]
-		self.influx_client = kwargs["influx_client"]
+		self.agg_sink = kwargs["agg_sink"]
 		del kwargs["out_path"]
 		del kwargs["instrument"]
-		del kwargs["influx_client"]
+		del kwargs["agg_sink"]
 		simplehandler.Handler.__init__(self, *args, **kwargs)
 		self.horizon_distance_func = reference_psd.HorizonDistance(20., 2048., 1./16., 1.4, 1.4)
 
@@ -96,14 +129,16 @@ class PSDHandler(simplehandler.Handler):
 			self.last_reduce_time = int(round(buftime,-2))
 			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(aggregator.now())))
 
-			timedata = {route: {self.instrument: numpy.array(self.timedeq)} for route in self.routes}
-			datadata = {route: {self.instrument: numpy.array(self.datadeq[route])} for route in self.routes}
+			data = {route: {self.instrument: (list(self.timedeq), list(self.datadeq[route]))} for route in self.routes}
 
+			### store and reduce noise / range history
 			for route in self.routes:
-				if self.influx_client:
-					io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'ifo')
-				else:
-					self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, route, self.instrument, 'ifo', self.datatypes, timedata[route], datadata[route], self.prevdataspan))
+				self.agg_sink.store_and_reduce(route, data[route], 'data', tags='ifo', aggregates=("min", "median", "max"))
+
+			### flush buffers
+			self.timedeq.clear()
+			for route in self.routes:
+				self.datadeq[route].clear()
 
 			# Save a "latest" psd
 			# NOTE: The PSD is special, we just record it. No min/median/max
@@ -129,68 +164,73 @@ class PSDHandler(simplehandler.Handler):
 		shutil.move(tmppath, path)
 
 
-
 #
-# MAIN
+# =============================================================================
 #
-
-options, filenames = parse_command_line()
-
-logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
-
-# Instantiate influxDB connection if data backend is influx
-if options.data_backend == 'influx':
-	influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
-else:
-	influx_client = None
-
-# parse the generic "source" options, check for inconsistencies is done inside
-# the class init method
-gw_data_source_info = datasource.GWDataSourceInfo(options)
-
-# only support one channel
-instrument = gw_data_source_info.channel_dict.keys()[0]
-
+#                                     Main
 #
-# build pipeline
+# =============================================================================
 #
 
-if options.verbose:
-	print >>sys.stderr, "building pipeline ..."
-mainloop = GObject.MainLoop()
-pipeline = Gst.Pipeline(name="DQ")
-handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument, influx_client = influx_client)
-
-head, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = options.verbose)
-head = pipeparts.mkresample(pipeline, head, quality = 9)
-head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d" % options.sample_rate)
-head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 8)
-head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, fft_length = options.psd_fft_length, average_samples = 64, median_samples = 7, expand_gaps = True)
-head = pipeparts.mkqueue(pipeline, head)
-head = pipeparts.mkreblock(pipeline, head)
-head = pipeparts.mkgeneric(pipeline, head, "lal_nxydump")
-sink = pipeparts.mkappsink(pipeline, head, max_buffers = 1, sync = False)
-sink.connect("new-sample", handler.bufhandler)
-sink.connect("new-preroll", handler.prehandler)
-
-#
-# process segment
-#
-
-if options.verbose:
-	print >>sys.stderr, "putting pipeline into READY state ..."
-if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
-	raise RuntimeError("pipeline failed to enter READY state")
-if gw_data_source_info.data_source not in ("lvshm", "framexmit"):# FIXME what about nds online?
-	datasource.pipeline_seek_for_gps(pipeline, *gw_data_source_info.seg)
-if options.verbose:
-	print >>sys.stderr, "putting pipeline into PLAYING state ..."
-if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
-	raise RuntimeError("pipeline failed to enter PLAYING state")
-if options.verbose:
-	print >>sys.stderr, "running pipeline ..."
-mainloop.run()
-
-if options.verbose:
-	print >>sys.stderr, "Shutting down"
-
+if __name__ == '__main__':
+	options, filenames = parse_command_line()
+
+	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+
+	# set up aggregator sink
+	if options.data_backend == 'influx':
+		agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
+	else: ### hdf5 data backend
+		agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.out_path, num_processes=options.num_threads)
+
+	# parse the generic "source" options; consistency checks are done inside
+	# the class init method
+	gw_data_source_info = datasource.GWDataSourceInfo(options)
+
+	# only support one channel
+	instrument = gw_data_source_info.channel_dict.keys()[0]
+
+	#
+	# build pipeline
+	#
+
+	if options.verbose:
+		print >>sys.stderr, "building pipeline ..."
+	mainloop = GObject.MainLoop()
+	pipeline = Gst.Pipeline(name="DQ")
+	handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument, agg_sink = agg_sink)
+
+	head, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = options.verbose)
+	head = pipeparts.mkresample(pipeline, head, quality = 9)
+	head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d" % options.sample_rate)
+	head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 8)
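+	# the whitener (psd_mode = 0) maintains a running PSD estimate of the input stream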
+	head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, fft_length = options.psd_fft_length, average_samples = 64, median_samples = 7, expand_gaps = True)
+	head = pipeparts.mkqueue(pipeline, head)
+	head = pipeparts.mkreblock(pipeline, head)
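+	# lal_nxydump serializes buffers to ASCII text, which the appsink handler parses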
+	head = pipeparts.mkgeneric(pipeline, head, "lal_nxydump")
+	sink = pipeparts.mkappsink(pipeline, head, max_buffers = 1, sync = False)
+	sink.connect("new-sample", handler.bufhandler)
+	sink.connect("new-preroll", handler.prehandler)
+
+	#
+	# process segment
+	#
+
+	if options.verbose:
+		print >>sys.stderr, "putting pipeline into READY state ..."
+	if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
+		raise RuntimeError("pipeline failed to enter READY state")
+	if gw_data_source_info.data_source not in ("lvshm", "framexmit"): # FIXME what about nds online?
+		datasource.pipeline_seek_for_gps(pipeline, *gw_data_source_info.seg)
+	if options.verbose:
+		print >>sys.stderr, "putting pipeline into PLAYING state ..."
+	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
+		raise RuntimeError("pipeline failed to enter PLAYING state")
+	if options.verbose:
+		print >>sys.stderr, "running pipeline ..."
+	mainloop.run()
+
+	if options.verbose:
+		print >>sys.stderr, "Shutting down"
-- 
GitLab