#!/usr/bin/env python
#
# Copyright (C) 2016  Chad Hanna
# Copyright (C) 2019  Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from collections import deque
import os
import logging
from optparse import OptionParser
import shutil
import StringIO
import sys

import h5py
import numpy
from scipy import signal

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

from ligo.scald import utils
from ligo.scald.io import core, hdf5, influx

from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd

#
# =============================================================================
#
#                                 Command Line
#
# =============================================================================
#

def parse_command_line():
	"""Parse the command line for the low-latency data-quality monitor.

	Adds the generic gstlal "source" options via datasource.append_options()
	plus this program's own options, then validates the backend choice.

	Returns:
		(options, filenames) as produced by OptionParser.parse_args().
	"""
	parser = OptionParser(description = __doc__)

	# generic "source" options
	datasource.append_options(parser)

	# add our own options
	parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
	# FIX: help text previously claimed "default 16384 Hz"; the actual default is 4096 Hz
	parser.add_option("--sample-rate", metavar = "Hz", default = 4096, type = "int", help = "Sample rate at which to generate the PSD, default 4096 Hz")
	# FIX: help text previously claimed "default 8s"; the actual default is 16 s
	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
	parser.add_option("--num-threads", type = int, default = 2, help = "Number of threads to use concurrently, default 2.")
	parser.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
	parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")

	options, filenames = parser.parse_args()

	# fail fast on an unsupported backend choice
	assert options.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'

	return options, filenames

class PSDHandler(simplehandler.Handler):
	"""Pipeline handler that records noise, range history and PSD snapshots.

	The whitener element posts "spectrum" element messages; the most recent
	PSD is cached on the handler by do_on_message().  Each appsink buffer
	(text produced by lal_nxydump) is parsed in bufhandler() to extract a
	per-buffer noise figure and a horizon-distance-based range estimate.
	Every 100 s the accumulated history is pushed to the aggregator sink
	and the current PSD is written to disk as HDF5.
	"""
	def __init__(self, *args, **kwargs):
		# most recent PSD parsed from a "spectrum" message; None until
		# the first message arrives
		self.psd = None
		# pop our keyword arguments before handing the rest to the
		# base class constructor
		self.out_path = kwargs["out_path"]
		self.instrument = kwargs["instrument"]
		self.agg_sink = kwargs["agg_sink"]
		del kwargs["out_path"]
		del kwargs["instrument"]
		del kwargs["agg_sink"]
		simplehandler.Handler.__init__(self, *args, **kwargs)
		# horizon distance estimator for a 1.4+1.4 Msun binary,
		# 15-900 Hz at 1/16 Hz resolution
		self.horizon_distance_func = reference_psd.HorizonDistance(15., 900., 1./16., 1.4, 1.4)

		self.routes = ("noise", "range_history")
		# bounded history buffers so memory stays constant if reduction
		# is ever delayed
		self.timedeq = deque(maxlen = 10000)
		self.datadeq = {route: deque(maxlen = 10000) for route in self.routes}
		self.last_reduce_time = None
		self.prevdataspan = set()

	def do_on_message(self, bus, message):
		"""Cache the PSD carried by "spectrum" element messages.

		Returns True when the message was consumed here, False to let the
		base class handle it.
		"""
		if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
			self.psd = pipeio.parse_spectrum_message(message)
			return True
		return False

	def bufhandler(self, elem):
		"""appsink "new-sample" callback.

		Parses one buffer into (time, |value|) rows, records the maximum
		value as the noise figure and the current range estimate, and
		every 100 s flushes history to the aggregator sink and writes a
		PSD snapshot.  Always returns Gst.FlowReturn.OK.
		"""
		buf = elem.emit("pull-sample").get_buffer()
		# buffer timestamp in integer seconds (pts is in nanoseconds)
		buftime = int(buf.pts / 1e9)
		if self.last_reduce_time is None:
			self.last_reduce_time = int(round(buftime, -2))
		(result, mapinfo) = buf.map(Gst.MapFlags.READ)
		if mapinfo.data:
			# First noise: buffer is newline-separated "time value"
			# text; take |value| and locate the row with the maximum
			s = StringIO.StringIO(mapinfo.data)
			data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().split('\n') if x])
			ix = numpy.argmax(data, axis=0)[1]
			self.timedeq.append(buftime)
			self.datadeq['noise'].append(data[ix,1])

			# Then range: horizon distance scaled by 1/2.25
			# (presumably the angle-averaged range convention —
			# confirm against reference_psd documentation)
			self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)

			# The PSD: decimate by 4 and take the square root to
			# obtain an ASD
			psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
			psd_data = signal.decimate(self.psd.data.data[:], 4, ftype='fir')[:-1]**.5
		else:
			# empty buffer: nothing to parse, pass it through
			buf.unmap(mapinfo)
			del buf
			return Gst.FlowReturn.OK

		# Only reduce every 100s
		if (buftime - self.last_reduce_time) >= 100:
			self.last_reduce_time = int(round(buftime, -2))
			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now())))

			data = {route: {self.instrument: {'time': list(self.timedeq), 'fields': {'data': list(self.datadeq[route])}}} for route in self.routes}

			### store and reduce noise / range history
			# FIX: use the aggregator sink passed to __init__ instead
			# of silently depending on a module-level "agg_sink" global
			for route in self.routes:
				self.agg_sink.store_columns(route, data[route], aggregate="max")

			### flush buffers
			self.timedeq.clear()
			for route in self.routes:
				self.datadeq[route].clear()

			# Save a "latest" psd
			# NOTE: The PSD is special, we just record it. No min/median/max
			thisdir = os.path.join(self.out_path, core.gps_to_leaf_directory(buftime))
			core.makedir(thisdir)
			psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime, -2)))
			self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})

		buf.unmap(mapinfo)
		del buf
		return Gst.FlowReturn.OK

	def prehandler(self, elem):
		"""appsink "new-preroll" callback: pull and discard the preroll."""
		buf = elem.emit("pull-preroll")
		del buf
		return Gst.FlowReturn.OK

	def to_hdf5(self, path, datadict):
		"""Write datadict (dataset name -> array) to an HDF5 file at path.

		The file is staged under /dev/shm and then moved into place, so
		readers never observe a partially written file.
		"""
		tmppath = "/dev/shm/%s" % path.replace("/","_") + ".tmp"
		f = h5py.File(tmppath, "w")
		for k, v in datadict.items():
			f[k] = v
		f.close()
		shutil.move(tmppath, path)


#
# =============================================================================
#
#                                     Main
#
# =============================================================================
#

if __name__ == '__main__':
	options, filenames = parse_command_line()

	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")

	# pick the aggregator implementation for the requested backend
	if options.data_backend == 'influx':
		agg_sink = influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name, auth=options.enable_auth, https=options.enable_https)
	else: ### hdf5 data backend
		agg_sink = hdf5.Aggregator(rootdir=options.out_path, num_processes=options.num_threads)

	# declare the measurement schemas the handler will write to
	for route in ('noise', 'range_history'):
		agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')

	# the GWDataSourceInfo constructor performs the consistency checks
	# on the generic "source" options
	source_info = datasource.GWDataSourceInfo(options)

	# this program monitors exactly one channel
	ifo = source_info.channel_dict.keys()[0]

	#
	# build pipeline
	#

	if options.verbose:
		print >>sys.stderr, "building pipeline ..."
	loop = GObject.MainLoop()
	pipeline = Gst.Pipeline(name="DQ")
	psd_handler = PSDHandler(loop, pipeline, out_path = options.out_path, instrument = ifo, agg_sink = agg_sink)

	# source -> resample -> caps -> queue -> whiten -> queue -> reblock
	#        -> nxydump -> appsink
	elem, _, _ = datasource.mkbasicsrc(pipeline, source_info, ifo, verbose = options.verbose)
	elem = pipeparts.mkresample(pipeline, elem, quality = 9)
	elem = pipeparts.mkcapsfilter(pipeline, elem, "audio/x-raw, rate=%d" % options.sample_rate)
	elem = pipeparts.mkqueue(pipeline, elem, max_size_buffers = 8)
	elem = pipeparts.mkwhiten(pipeline, elem, psd_mode = 0, fft_length = options.psd_fft_length, average_samples = 64, median_samples = 7, expand_gaps = True)
	elem = pipeparts.mkqueue(pipeline, elem)
	elem = pipeparts.mkreblock(pipeline, elem)
	elem = pipeparts.mkgeneric(pipeline, elem, "lal_nxydump")
	appsink = pipeparts.mkappsink(pipeline, elem, max_buffers = 1, sync = False)
	appsink.connect("new-sample", psd_handler.bufhandler)
	appsink.connect("new-preroll", psd_handler.prehandler)

	#
	# process segment
	#

	if options.verbose:
		print >>sys.stderr, "putting pipeline into READY state ..."
	if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter READY state")
	# live shared-memory sources cannot be seeked; everything else gets
	# seeked to the requested analysis segment
	if source_info.data_source not in ("lvshm", "framexmit"):# FIXME what about nds online?
		datasource.pipeline_seek_for_gps(pipeline, *source_info.seg)
	if options.verbose:
		print >>sys.stderr, "putting pipeline into PLAYING state ..."
	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter PLAYING state")
	if options.verbose:
		print >>sys.stderr, "running pipeline ..."
	loop.run()

	if options.verbose:
		print >>sys.stderr, "Shutting down"