Skip to content
Snippets Groups Projects
Commit dfe98d79 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_idq_trigger_gen: added bottle routes, http server to produce psds,...

gstlal_idq_trigger_gen: added bottle routes, http server to produce psds, added verbose output; gstlal_plot_channel_psd: plotting utility for psd xml files
parent 94fe056d
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,7 @@ dist_bin_SCRIPTS = \
gstlal_ninja_smooth_reference_psd \
gstlal_median_of_psds \
gstlal_psd_pipe \
gstlal_plot_channel_psd \
gstlal_segments_operations \
gstlal_recolor_frames_pipe \
gstlal_inj_frames \
......
......@@ -29,6 +29,7 @@ from optparse import OptionParser
from collections import deque
import os
import sys
import socket
import resource
import StringIO
import threading
......@@ -42,13 +43,20 @@ Gst.init(None)
import lal
import numpy
from gstlal import pipeio
from gstlal import datasource
from gstlal import idq_multirate_datasource
from gstlal import multichannel_datasource
from gstlal import sngltriggertable
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal import httpinterface
from gstlal import bottle
from glue.ligolw import utils as ligolw_utils
from glue import iterutils
from glue.ligolw import ligolw
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import process as ligolw_process
from lal import gpstime
#
......@@ -162,6 +170,13 @@ def parse_command_line():
class MultiChannelHandler(simplehandler.Handler):
"""
A subclass of simplehandler.Handler to be used with
multiple channels.
Implements additional message handling for dealing with spectrum
messages and creates trigger files for use in iDQ.
"""
def __init__(self, *args, **kwargs):
self.lock = threading.Lock()
self.basis_params = kwargs.pop("basis_params")
......@@ -175,9 +190,37 @@ class MultiChannelHandler(simplehandler.Handler):
#self.header = "# %18s\t%20s\t%20s\t%6s\t%8s\t%8s\t%8s\t%10s\t%10s\t%9s\t%8s\t%s\n" % ("start_time", "stop_time", "trigger_time", "phase", "snr", "chisq", "sigmasq", "frequency", "Q", "latency", "rate", "channel")
self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "sigmasq", "chisq", "snr", "channel")
self.fdata = ""
# set up bottle routes for spectra by each whitener
self.psds = {}
bottle.route("/psds.xml")(self.web_get_psd_xml)
super(MultiChannelHandler, self).__init__(*args, **kwargs)
def do_on_message(self, bus, message):
"""!
Handle application-specific message types,
e.g., spectrum messages.
@param bus A reference to the pipeline's bus
@param message A reference to the incoming message
"""
#
# return value of True tells parent class that we have done
# all that is needed in response to the message, and that
# it should ignore it. a return value of False means the
# parent class should do what it thinks should be done
#
if message.type == Gst.MessageType.ELEMENT:
if message.get_structure().get_name() == "spectrum":
# get the channel name & psd.
instrument, info = message.src.get_name().split("_", 1)
channel, _ = info.rsplit("_", 1)
psd = pipeio.parse_spectrum_message(message)
# save psd
self.psds[channel] = psd
return True
return False
def bufhandler(self, elem, sink_dict):
......@@ -249,6 +292,20 @@ class MultiChannelHandler(simplehandler.Handler):
f.write(data)
shutil.move(tmpfile, fpath)
def gen_psd_xmldoc(self):
xmldoc = lal.series.make_psd_xmldoc(self.psds)
process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_idq", {})
ligolw_process.set_process_end_time(process)
return xmldoc
def web_get_psd_xml(self):
with self.lock:
output = StringIO.StringIO()
ligolw_utils.write_fileobj(self.gen_psd_xmldoc(), output)
outstr = output.getvalue()
output.close()
return outstr
class LinkedAppSync(pipeparts.AppSync):
def __init__(self, appsink_new_buffer, sink_dict = {}):
......@@ -352,8 +409,10 @@ class LinkedAppSync(pipeparts.AppSync):
#
####################
#
# parsing and setting up some core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
......@@ -363,7 +422,58 @@ channels = data_source_info.channel_dict.keys()
# dictionary of basis parameters keyed by ifo, rate
basis_params = {}
#
# create a new, empty, Bottle application and make it the current
# default, then start http server to serve it up
#
bottle.default_app.push()
# uncomment the next line to show tracebacks when something fails
# in the web server
#bottle.app().catchall = False
import base64, uuid # FIXME: don't import when the uniquification scheme is fixed
httpservers = httpinterface.HTTPServers(
# FIXME: either switch to using avahi's native name
# uniquification system or adopt a naturally unique naming
# scheme (e.g., include a search identifier and job
# number).
service_name = "gstlal_idq (%s)" % base64.urlsafe_b64encode(uuid.uuid4().bytes),
service_properties = {},
verbose = options.verbose
)
#
# Set up a registry of the resources that this job provides
#
@bottle.route("/")
@bottle.route("/index.html")
def index(channel_list = channels):
# get the host and port to report in the links from the
# request we've received so that the URLs contain the IP
# address by which client has contacted us
netloc = bottle.request.urlparts[1]
server_address = "http://%s" % netloc
yield "<html><body>\n<h3>%s %s</h3>\n<p>\n" % (netloc, " ".join(sorted(channel_list)))
for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
# don't create links back to this page
if route.rule in ("/", "/index.html"):
continue
# only create links for GET methods
if route.method != "GET":
continue
yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
yield "</p>\n</body></html>"
# FIXME: get service-discovery working, then don't do this
open("registry.txt", "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
#
# building the event loop and pipeline
#
if options.verbose:
print >>sys.stderr, "assembling pipeline..."
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, description = options.description, out_path = options.out_path, instrument = instrument)
......@@ -399,6 +509,9 @@ for channel in channels:
thishead = pipeparts.mktrigger(pipeline, thishead, rate, max_snr = True)
src[(channel, rate)] = thishead
if options.verbose:
print >>sys.stderr, "done."
if options.verbose:
print >>sys.stderr, "attaching appsinks to pipeline..."
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
......@@ -418,4 +531,37 @@ if data_source_info.data_source not in ("lvshm", "framexmit"):# what about nds o
# run
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
mainloop.run()
mainloop.run()
if options.verbose:
print >>sys.stderr, "running pipeline..."
#
# Shutdown the web interface servers and garbage collect the Bottle
# app. This should release the references the Bottle app's routes
# hold to the pipeline's data (like template banks and so on).
#
del httpservers
bottle.default_app.pop()
#
# Set pipeline state to NULL and garbage collect the handler
#
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
del handler.pipeline
del handler
#
# close program manually if data source is live
#
if options.data_source in ("lvshm", "framexmit"):
sys.exit(0)
#!/usr/bin/env python
#
# Copyright (C) 2012-2015 Chad Hanna
# Copyright (C) 2015 Kipp Cannon
# Copyright (C) 2017 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from optparse import OptionParser
import os
import sys
import logging
import math
import numpy
import matplotlib
from matplotlib import figure
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from lal import series
from glue.ligolw import utils as ligolw_utils
from glue.ligolw import lsctables
from gstlal import plotutil
matplotlib.rcParams.update({
"font.size": 10.0,
"axes.titlesize": 10.0,
"axes.labelsize": 10.0,
"xtick.labelsize": 8.0,
"ytick.labelsize": 8.0,
"legend.fontsize": 8.0,
"figure.dpi": 300,
"savefig.dpi": 300,
"text.usetex": True,
"path.simplify": True
})
## @file
# A program to plot reference psds; see gstlal_plot_channel_psd for more info
#
# A program to plot a psd such as one generated by gstlal_reference_psd
#
# ### Usage:
#
# gstlal_plot_channel_psd OUTPUT-NAME PSD-FILE-1 PSD-FILE-2
#
# e.g.,
#
# gstlal_plot_channel_psd psd.png psd.xml.gz
#
color_scheme = ['#332288', '#88CCEE', '#44AA99', '#117733', '#999933', '#DDCC77', '#CC6677', '#882255', '#AA4499']
def parse_command_line():
parser = OptionParser()
parser.add_option("-o", "--output", help = "Set plot filename (default = replace input file's extension with .png).")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
if not filenames:
raise ValueError("must supply at least one input filename")
if options.output and len(filenames) > 1:
raise ValueError("must supply only one input file when setting --output")
return options, filenames
def plot_psds(psds, plot_width = 640):
"""!
Produces a matplotlib figure of PSDs.
@param psds A dictionary of PSDs as REAL8FrequencySeries keyed by
channel
@param plot_width How wide to make the plot in pixels
"""
on_channels = set(psds)
fig = figure.Figure()
FigureCanvas(fig)
fig.set_size_inches(plot_width / float(fig.get_dpi()), int(round(plot_width / plotutil.golden_ratio)) / float(fig.get_dpi()))
axes = fig.gca()
axes.grid(which = "both", linestyle = "-", linewidth = 0.2)
min_psds, max_psds = [], []
min_fs, max_fs = [], []
color_index = 0
for channel, psd in sorted(psds.items()):
if psd is None:
continue
psd_data = psd.data.data
f = psd.f0 + numpy.arange(len(psd_data)) * psd.deltaF
logging.info("found PSD for %s spanning [%g Hz, %g Hz]" % (channel, f[0], f[-1]))
min_fs.append(f[0])
max_fs.append(f[-1])
if channel in on_channels:
alpha = 0.8
linestyle = "-"
else:
alpha = 0.6
linestyle = ":"
label = r"%s" % channel.replace('_', '\_')
axes.loglog(f, psd_data, alpha = alpha, color = color_scheme[color_index], linestyle = linestyle, label = label)
# record the minimum from within the rage 10 Hz -- 1 kHz
min_psds.append(min([x for x in psd_data[int((10.0 - psd.f0) / psd.deltaF) : int((1000 - psd.f0) / psd.deltaF)] if x > 0]))
# record the maximum from within the rage 1 Hz -- 1 kHz
max_psds.append(psd_data[int((1.0 - psd.f0) / psd.deltaF) : int((1000 - psd.f0) / psd.deltaF)].max())
color_index += 1
if min_fs:
axes.set_xlim((6.0, max(max_fs)))
else:
axes.set_xlim((6.0, 3000.0))
if min_psds:
axes.set_ylim((10**math.floor(math.log10(min(min_psds))), 10**math.ceil(math.log10(max(max_psds)))))
title = r"Noise Spectral Density"
axes.set_title(title)
axes.set_xlabel(r"Frequency (Hz)")
axes.set_ylabel(r"Spectral Density ($\mathrm{1} / \mathrm{Hz}$)")
axes.legend(loc = "upper right")
fig.tight_layout(pad = .8)
return fig
options, filenames = parse_command_line()
index = 0
for fname in filenames:
index += 1
if options.output:
outname = options.output
else:
outname = os.path.join(os.path.splitext(fname)[0], "%d" % index)
fig = plot_psds(
series.read_psd_xmldoc(
ligolw_utils.load_filename(
fname,
verbose = options.verbose,
contenthandler = series.PSDContentHandler
)
),
plot_width = 2400
)
fig.savefig(outname)
......@@ -181,7 +181,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
# reduced to account for the loss of the zero padding to
# keep the Hann window the same as implied by the
# user-supplied parameters.
whiten = pipeparts.mkwhiten(pipeline, pipeparts.mkqueue(pipeline, head, max_size_time = 2 * psd_fft_length * Gst.SECOND), fft_length = psd_fft_length - 2 * zero_pad, zero_pad = 0, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lal_whiten" % (instrument, channel_name))
whiten = pipeparts.mkwhiten(pipeline, pipeparts.mkqueue(pipeline, head, max_size_time = 2 * psd_fft_length * Gst.SECOND), fft_length = psd_fft_length - 2 * zero_pad, zero_pad = 0, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lalwhiten" % (instrument, channel_name))
pipeparts.mkfakesink(pipeline, whiten)
# high pass filter
......@@ -222,11 +222,11 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
head = pipeparts.mkgate(pipeline, head, control = dqvector, default_state = False, threshold = 1, hold_length = -max(rates), attack_length = -max(rates) * (psd_fft_length + 1))
# Drop initial data to let the PSD settle
head = pipeparts.mkdrop(pipeline, head, drop_samples = 16 * psd_fft_length * max(rates))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_%s_timestamps_fir" % (instrument, channel_name))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_%s_timestampsfir" % (instrument, channel_name))
#head = pipeparts.mknxydumpsinktee(pipeline, head, filename = "after_mkfirbank.txt")
else:
head = whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lal_whiten" % (instrument, channel_name))
head = whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lalwhiten" % (instrument, channel_name))
# make the buffers going downstream smaller, this can
# really help with RAM
head = pipeparts.mkreblock(pipeline, head, block_duration = block_duration)
......@@ -274,7 +274,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
else:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (max(rates), GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_%s_%d_timestamps__white" % (instrument, channel_name, max(rates)))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_%s_%d_timestampwhite" % (instrument, channel_name, max(rates)))
#
# optionally add vetoes
......@@ -336,7 +336,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = None, psd_f
else:
head[rate] = head[max(rates)]
head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, head[rate], quality = quality), caps = "audio/x-raw, rate=%d" % rate)
head[rate] = pipeparts.mkchecktimestamps(pipeline, head[rate], "%s_%s_%d_timestamps_white" % (instrument, channel_name, rate))
head[rate] = pipeparts.mkchecktimestamps(pipeline, head[rate], "%s_%s_%d_timestampswhite" % (instrument, channel_name, rate))
head[rate] = pipeparts.mktee(pipeline, head[rate])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment