#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program that measures template overlaps and how templates are spread out in the parameter space
for gstlal_feature_extractor
"""
####################
#
# preamble
#
####################
import itertools
from optparse import OptionParser
import os
import random
import sys
import urlparse
import numpy
import lal
from glue import markup
from gstlal import plotutil
from gstlal import aggregator
from gstlal.fxtools import utils
import matplotlib
matplotlib.use('Agg')
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.colorbar import Colorbar
from matplotlib import pyplot as plt
from matplotlib import ticker as tkr
import matplotlib.cm as cm
matplotlib.rcParams.update({
"font.size": 13.0,
"axes.titlesize": 13.0,
"axes.labelsize": 13.0,
"xtick.labelsize": 13.0,
"ytick.labelsize": 13.0,
"legend.fontsize": 13.0,
"figure.dpi": 300,
"savefig.dpi": 300,
"text.usetex": False,
"path.simplify": True
})
cluster_urls = {'CIT': 'https://ldas-jobs.ligo.caltech.edu/',
'LHO': 'https://ldas-jobs.ligo-wa.caltech.edu/',
'LLO': 'https://ldas-jobs.ligo-la.caltech.edu/',
'uwm': 'https://ldas-jobs.cgca.uwm.edu/'
}
colors = ['#2c7fb8', '#e66101', '#5e3c99', '#d01c8b']
####################
#
# functions
#
####################
def plot_waveform(time, waveform, waveform_type='', waveform_params=None):
fig, axes = plt.subplots()
axes.plot(time, waveform, color = '#2c7fb8', alpha = 0.7, lw=2)
axes.set_title(r"%s waveform" % (waveform_type))
axes.set_ylabel(r"Amplitude [arb. unit]")
axes.set_xlabel(r"Time [seconds]")
axes.set_xlim(time[0], time[-1])
if waveform_params:
axes.text(0.96 * max(time), 0.98 * min(waveform), r"%s" % repr(waveform_params), size=10, ha='right')
return fig
def plot_waveforms(times, waveforms, waveform_type='', waveform_params=None):
"""
Plots multiple waveforms in one plot (up to 4 at one time)
"""
assert len(times) <= 4
# determine waveform limits
amp_min = min(numpy.min(waveform) for waveform in waveforms)
amp_max = max(numpy.max(waveform) for waveform in waveforms)
fig, axes = plt.subplots(len(times), sharex=True, sharey=True)
for ax, time, waveform, color in zip(axes, times, waveforms, colors):
ax.plot(time, waveform, color = color, alpha = 0.7, lw=2)
ax.set_ylim(amp_min, amp_max)
ax.set_xlim(time[0], time[-1])
ax.set_xlabel(r"Time [seconds]")
if waveform_params:
ax.text(0.98 * max(time), 0.97 * amp_min, r"%s" % repr(waveform_params), size=10, ha='right')
axes[0].set_title(r"Waveforms")
fig.text(0.04, 0.5, r"Amplitude [arb. unit]", ha='center', va='center', rotation='vertical')
fig.subplots_adjust(hspace=0)
plt.setp([a.get_xticklabels() for a in fig.axes[:-1]], visible=False)
return fig
def plot_template_bank(waveform_param1, waveform_param2, overlaps, waveform_type='', waveform_params=None):
fig, axes = plt.subplots()
axes.scatter(waveform_param1, waveform_param2, c = overlaps, cmap = cm.coolwarm, alpha = 0.8, lw=0)
norm = matplotlib.colors.Normalize(vmin=min(overlaps), vmax=numpy.max(overlaps), clip=True)
axes.set_title(r"Template Bank Placement for %s" % (waveform_type))
axes.set_xlabel(r"%s" % waveform_params[0])
axes.set_ylabel(r"%s" % waveform_params[1])
axes.set_xlim(min(waveform_param1) - 0.1 * min(waveform_param1), 1.1 * max(waveform_param1))
axes.set_ylim(min(waveform_param2) - 0.1 * min(waveform_param2), 1.1 * max(waveform_param2))
axes.loglog()
# set up colorbar
divider = make_axes_locatable(axes)
cax = divider.append_axes( "right", size="5%", pad=0.1)
cbl = matplotlib.colorbar.ColorbarBase(cax, cmap = cm.coolwarm, norm=norm, orientation="vertical")
cbl.set_label(r"Overlap")
plt.tight_layout()
return fig
def generate_html_file(plot_paths, waveform_type=''):
if options.verbose:
print >>sys.stderr, "Creating html report..."
channels = set()
#
### head
#
title = "Waveform Report"
metainfo = {'charset': 'utf-8', 'name': 'viewport', 'content': 'width=device-width, initial-scale=1'}
doctype = '<!DOCTYPE html>'
css = 'https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css'
bootstrap = ['https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js']
page = markup.page()
page.init(title = title, metainfo = metainfo, doctype = doctype, css = css, script = bootstrap)
#
### body
#
page.div(class_ = 'container')
# header
page.h2('Waveform report for %s' % waveform_type)
# plots
plot_paths = sorted(plot_paths, key=lambda x: x[1])
for key in plot_paths:
num, plot = key
plot_url = to_output_url(options.output_dir) + '/' + plot
page.div(_class = 'col-md-6')
page.div(_class = 'thumbnail')
page.a(markup.oneliner.img(src = plot_url, alt = '', style = 'width:100%', _class='img-responsive'), href = plot_url, target = '_blank')
page.div.close()
page.div.close()
#
### generate page
#
page.div.close()
with open(os.path.join(options.output_dir, 'index.html'), 'w') as f:
print >> f, page
if options.verbose:
print >>sys.stderr, "done."
def to_output_url(output_dir):
username = os.getlogin()
basepath = os.path.join(os.path.join('/home/', username), 'public_html')
extension_url = os.path.relpath(os.path.abspath(output_dir), basepath)
base_url = urlparse.urljoin(cluster_urls[options.cluster], '~' + username)
return base_url + '/' + extension_url
###############################
#
# command line parser
#
###############################
def parse_command_line():
parser = OptionParser(usage = '%prog [options]', description = __doc__)
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.05, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.05.")
parser.add_option("-q", "--qhigh", type = "float", default = 100, help = "Q high value for half sine-gaussian waveforms. Default = 100.")
parser.add_option("--output-dir", metavar = "filename", default = ".", help = "Set the location of the output (plots, etc).")
parser.add_option("--cluster", help = "Set the cluster that this script is being run on (for proper public_html linking)")
parser.add_option("--waveform", default = "sine_gaussian", help = "Set the type of waveform to plot. options=[sine_gaussian, half_sine_gaussian].")
# parse the arguments and sanity check
options, args = parser.parse_args()
return options, args
####################
#
# main
#
####################
if __name__ == '__main__':
options, args = parse_command_line()
# create directory if it doesn't exist
aggregator.makedir(options.output_dir)
# common parameters we will use throughout
max_samp_rate = 8192
min_samp_rate = 32
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
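# number of octave-spaced sample rates from min_samp_rate up to max_samp_rate inclusive (9 for 32-8192 Hz)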
if options.verbose:
print >>sys.stderr, "Creating templates..."
# generate templates for each rate considered
rates = [min_samp_rate*2**i for i in range(n_rates)]
downsample_factor = 0.8
qhigh = options.qhigh
qlow = 3.3166
fhigh = max_samp_rate / 2.
flow = min_samp_rate / 4.
if options.waveform == 'sine_gaussian':
waveforms = utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch = options.mismatch, downsample_factor=downsample_factor)
elif options.waveform == 'half_sine_gaussian':
waveforms = utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch = options.mismatch, downsample_factor=downsample_factor)
else:
raise NotImplementedError
basis_params = waveforms.parameter_grid
templates = {rate: [waveform for waveform in waveforms.generate_templates(rate, quadrature=False)] for rate in rates}
# get all templates and params
all_templates = list(itertools.chain(*templates.values()))
all_params = list(itertools.chain(*basis_params.values()))
if options.verbose:
print >>sys.stderr, "Creating template overlaps..."
# zero pad templates to make them the same length
max_sample_pts = max(len(template) for template in all_templates)
all_templates = [numpy.pad(template, ((max_sample_pts - len(template)) // 2, max_sample_pts - len(template) - (max_sample_pts - len(template)) // 2), 'constant') for template in all_templates]
# calculate overlap for each template and find maximum
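# note: each overlap below is the maximum inner product between a (zero-padded) template
# and every other template; comparing it against 1 - mismatch assumes the generator
# returns unit-normalized templates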
overlaps = []
for this_template in all_templates:
overlaps.append(max([numpy.dot(this_template,template) for template in all_templates if not numpy.array_equal(template, this_template)]))
print >>sys.stderr, "total number of templates: %d" % len(all_templates)
print >>sys.stderr, "min overlap specified: %f" % (1 - options.mismatch)
print >>sys.stderr, "max template overlap: %f" % max(overlaps)
print >>sys.stderr, "min template overlap: %f" % min(overlaps)
# generate template plots
plot_paths = []
# cast params to a nicer format
# FIXME: should really be passing a dict of params instead
param_names = ['f', 'Q', 'duration']
waveform_type = options.waveform.replace('_', ' ').title()
# limit the number of waveforms plotted per frequency band
num_samples = 3
if options.verbose:
print >>sys.stderr, "Creating waveform plots..."
for rate in rates:
#for template_id, template in enumerate(random.sample(templates[rate], num_samples)):
for template_id in random.sample(numpy.arange(len(templates[rate])), num_samples):
waveform_params = ["%s: %.3f" % (name, param) for param, name in zip(basis_params[rate][template_id], param_names)]
template = templates[rate][template_id]
if options.verbose:
print >>sys.stderr, "\tCreating waveform plot with parameters: %s" % repr(waveform_params)
series_fig = plot_waveform(waveforms.times[rate], template, waveform_type, waveform_params)
fname = 'plot_%s_%s-timeseries.png' % (str(rate).zfill(4), str(template_id).zfill(4))
plot_paths.append((template_id*int(rate),fname))
series_fig.savefig(os.path.join(options.output_dir, fname))
plt.close(series_fig)
# generate template overlap map
freqs = [param[0] for param in all_params]
Qs = [param[1] for param in all_params]
if options.verbose:
print >>sys.stderr, "Creating template overlap plot..."
overlap_fig = plot_template_bank(freqs, Qs, overlaps, waveform_type, param_names[:2])
fname = 'plot-template_overlap.png'
plot_paths.append((0,fname))
overlap_fig.savefig(os.path.join(options.output_dir, fname))
plt.close(overlap_fig)
# generate html page
generate_html_file(plot_paths, waveform_type=waveform_type)
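# Example invocation (the script name is illustrative; the options are those defined in
# parse_command_line() above):
#
#   python gstlal_feature_extractor_template_overlaps \
#       --waveform half_sine_gaussian --mismatch 0.05 --qhigh 100 \
#       --output-dir ~/public_html/template_overlaps --cluster CIT --verbose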
#!/usr/bin/env python
# Copyright (C) 2017 Sydney J. Chamberlin, Patrick Godwin, Chad Hanna, Duncan Meacher
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program that allows diagnosis of the whitening stage of gstlal_feature_extractor
"""
####################
#
# preamble
#
####################
from optparse import OptionParser
import os
import sys
import numpy
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GstAudio
GObject.threads_init()
Gst.init(None)
import lal
from gstlal import aggregator
from gstlal import pipeio
from gstlal import datasource
from gstlal import reference_psd
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal.fxtools import multichannel_datasource
# global settings for whitening properties
PSD_FFT_LENGTH = 32
PSD_DROP_TIME = 16 * PSD_FFT_LENGTH
###############################
#
# command line parser
#
###############################
def parse_command_line():
parser = OptionParser(usage = '%prog [options]', description = __doc__)
# First append the datasource common options
multichannel_datasource.append_options(parser)
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
parser.add_option("--high-pass", action = "store_true", default = False, help = "Add a high-pass filter to the pipeline")
parser.add_option("--high-pass-cutoff", type = int, default = 12, help = "Set the high-pass filter cutoff, default = 12 Hz.")
# parse the arguments and sanity check
options, filenames = parser.parse_args()
return options, filenames
####################
#
# main
#
####################
#
# parsing and setting up some core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
# create output directory if it doesn't already exist
aggregator.makedir(options.out_path)
#
# building the event loop and pipeline
#
if options.verbose:
print >>sys.stderr, "assembling pipeline..."
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
handler = simplehandler.Handler(mainloop, pipeline)
# generate multiple channel sources, and link up pipeline
whiten = {}
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, data_source_info.channel_subsets[0], verbose = options.verbose)
for channel in channels:
if options.verbose:
head[channel] = pipeparts.mkprogressreport(pipeline, head[channel], "%s_progress_after_multisrc" % channel)
# define whitening params
samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
max_rate = min(2048, samp_rate)
block_duration = int(1 * Gst.SECOND)
block_stride = block_duration * max_rate // Gst.SECOND
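# block_stride is the number of samples per 1 s block at the (possibly downsampled) target rate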
psd_fft_length = PSD_FFT_LENGTH
zero_pad = 0
width = 32
#
# whiten auxiliary channel data
#
# downsample to max sampling rate if necessary
if samp_rate > max_rate:
head[channel] = pipeparts.mkaudiocheblimit(pipeline, head[channel], cutoff = 0.9 * (max_rate/2), type = 2, ripple = 0.1)
head[channel] = pipeparts.mkaudioundersample(pipeline, head[channel])
head[channel] = pipeparts.mkcapsfilter(pipeline, head[channel], caps = "audio/x-raw, rate=%d" % max_rate)
if options.verbose:
head[channel] = pipeparts.mkprogressreport(pipeline, head[channel], "%s_progress_after_downsample" % channel)
# construct whitener
head[channel] = pipeparts.mktee(pipeline, head[channel])
whiten[channel] = pipeparts.mkwhiten(pipeline, head[channel], fft_length = psd_fft_length, zero_pad = 0, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_lalwhiten" % channel)
pipeparts.mkfakesink(pipeline, whiten[channel])
# high pass filter
if options.high_pass:
# FIXME: don't hardcode native rate cutoff for high-pass filtering
kernel = reference_psd.one_second_highpass_kernel(max_rate, cutoff = options.high_pass_cutoff)
assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
head[channel] = pipeparts.mkfirbank(pipeline, head[channel], fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2)
# FIR filter for whitening kernel
head[channel] = pipeparts.mkfirbank(pipeline, head[channel], fir_matrix = numpy.zeros((1, 1 + max_rate * psd_fft_length), dtype=numpy.float64), block_stride = block_stride, time_domain = False, latency = 0)
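# the zero-filled FIR matrix above is a placeholder; the set_fir_psd() callback defined
# below swaps in a whitening kernel each time the whitener element's mean PSD updates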
def set_fir_psd(whiten, pspec, firbank, psd_fir_kernel):
psd_data = numpy.array(whiten.get_property("mean-psd"))
psd = lal.CreateREAL8FrequencySeries(
name = "psd",
epoch = lal.LIGOTimeGPS(0),
f0 = 0.0,
deltaF = whiten.get_property("delta-f"),
sampleUnits = lal.Unit(whiten.get_property("psd-units")),
length = len(psd_data)
)
psd.data.data = psd_data
kernel, latency, sample_rate = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd)
kernel, theta = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate)
kernel -= numpy.mean(kernel) # subtract DC offset from signal
firbank.set_property("fir-matrix", numpy.array(kernel, ndmin = 2))
whiten[channel].connect_after("notify::mean-psd", set_fir_psd, head[channel], reference_psd.PSDFirKernel())
# Drop initial data to let the PSD settle
head[channel] = pipeparts.mkdrop(pipeline, head[channel], drop_samples = PSD_DROP_TIME * max_rate)
if options.verbose:
head[channel] = pipeparts.mkprogressreport(pipeline, head[channel], "%s_progress_after_drop" % channel)
# use running average PSD
whiten[channel].set_property("psd-mode", 0)
# convert to desired precision
head[channel] = pipeparts.mkaudioconvert(pipeline, head[channel])
if width == 64:
head[channel] = pipeparts.mkcapsfilter(pipeline, head[channel], "audio/x-raw, rate=%d, format=%s" % (max_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
else:
head[channel] = pipeparts.mkcapsfilter(pipeline, head[channel], "audio/x-raw, rate=%d, format=%s" % (max_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
if options.verbose:
head[channel] = pipeparts.mkprogressreport(pipeline, head[channel], "%s_progress_before_sink" % channel)
# dump timeseries to disk
pipeparts.mknxydumpsink(pipeline, head[channel], filename=os.path.join(options.out_path, "whitenedtimeseries_%d_%s.txt" % (samp_rate,channel)))
# Allow Ctrl+C or sig term to gracefully shut down the program for online
# sources, otherwise it will just kill it
if data_source_info.data_source in ("lvshm", "framexmit"):# what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
# Seek
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if data_source_info.data_source not in ("lvshm", "framexmit"):# what about nds online?
datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)
#
# Run pipeline
#
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
if options.verbose:
print >>sys.stderr, "running pipeline..."
mainloop.run()
#
# Shut down pipeline
#
if options.verbose:
print >>sys.stderr, "shutting down pipeline..."
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
#
# close program manually if data source is live
#
if options.data_source in ("lvshm", "framexmit"):
sys.exit(0)
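# Example invocation (illustrative; the script name and the data source options, which
# come from multichannel_datasource.append_options(), are assumptions):
#
#   python gstlal_feature_extractor_whitener_check --data-source frames \
#       --frame-cache frames.cache --gps-start-time 1187000000 --gps-end-time 1187001000 \
#       --channel-list channels.ini --out-path whitened_timeseries --high-pass --verbose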
#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_hdf5_sink [--options]"
__description__ = "an executable to dump streaming data to disk via hdf5"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
import itertools
import json
import optparse
import os
import signal
import sys
import time
import shutil
from collections import deque
from confluent_kafka import Consumer, KafkaError
import h5py
import numpy
from gstlal import aggregator
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import utils
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "File Sink Options", "General settings for configuring the file sink.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
group.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_FEATURES")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--waveform", type="string", default = "sine_gaussian", help = "Set the waveform used for producing features. Default = sine_gaussian.")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
group.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
parser.add_option_group(group)
group = optparse.OptionGroup(parser, "Channel Options", "Settings used for deciding which auxiliary channels to process.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE")
group.add_option("--section-include", default=[], type="string", action="append", help="Set the channel sections to be included from the INI file. Can be given multiple times. Pass in spaces as underscores instead. If not specified, assumed to include all sections")
group.add_option("--safety-include", default=["safe"], type="string", action="append", help="Set the safety values for channels to be included from the INI file. Can be given multiple times. Default = 'safe'.")
group.add_option("--fidelity-exclude", default=[], type="string", action="append", help="Set the fidelity values for channels to be excluded from the INI file. Can supply multiple values by repeating this argument. Each must be on of (add here)")
group.add_option("--safe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file (requires exact match). Can be repeated. If not specified, assume to include all channels.")
group.add_option("--unsafe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file, disregarding safety information (requires exact match). Can be repeated.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class HDF5StreamSink(object):
"""
Handles the processing of incoming streaming features, saving datasets to disk in hdf5 format.
"""
def __init__(self, logger, options):
logger.info('setting up hdf5 stream sink...')
### initialize timing options
self.request_timeout = options.request_timeout
self.processing_cadence = options.processing_cadence
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server,
'group.id': 'group_1'}
### initialize consumers
self.consumer = Consumer(self.kafka_settings)
self.consumer.subscribe([options.input_topic_basename])
### initialize queues
self.feature_queue = deque(maxlen = 300)
### set up keys needed to do processing
name, extension = options.channel_list.rsplit('.', 1)
if extension == 'ini':
self.keys = multichannel_datasource.channel_dict_from_channel_ini(options).keys()
else:
self.keys = multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys()
### iDQ saving properties
self.timestamp = None
self.last_save_time = None
self.last_persist_time = None
self.rootdir = options.rootdir
self.sample_rate = options.sample_rate
self.write_cadence = options.write_cadence
self.persist_cadence = options.persist_cadence
self.waveform = options.waveform
self.basename = '%s-%s' % (options.instrument[:1], options.basename)
self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
self.feature_data = utils.HDF5TimeseriesFeatureData(
self.columns,
keys = self.keys,
cadence = self.write_cadence,
sample_rate = self.sample_rate,
waveform = self.waveform
)
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
self.tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
self.tmp_dir = os.environ['TMPDIR']
def set_hdf_file_properties(self, start_time, duration):
"""
Returns the file name, as well as locations of temporary and permanent locations of
directories where triggers will live, when given the current gps time and a gps duration.
Also takes care of creating new directories as needed and removing any leftover temporary files.
"""
# set/update file names and directories with new gps time and duration
self.feature_name = os.path.splitext(utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
self.feature_path = utils.to_trigger_path(os.path.abspath(self.rootdir), self.basename, start_time)
self.tmp_path = utils.to_trigger_path(self.tmp_dir, self.basename, start_time)
# create temp and output directories if they don't exist
aggregator.makedir(self.feature_path)
aggregator.makedir(self.tmp_path)
# delete leftover temporary files
tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
if os.path.isfile(tmp_file):
os.remove(tmp_file)
def fetch_data(self):
"""
requests a new message from an individual topic
and adds it to the feature queue
"""
message = self.consumer.poll(timeout=self.request_timeout)
### only add to queue if no errors in receiving data
if message and not message.error():
### parse and add to queue
features = json.loads(message.value())
self.add_to_queue(features['timestamp'], features['features'])
def add_to_queue(self, timestamp, data):
"""
add a set of features for a given timestamp to the feature queue
"""
self.feature_queue.appendleft((timestamp, data))
def process_queue(self):
"""
takes data from the queue and adds to datasets, periodically persisting to disk
"""
while self.feature_queue:
### remove data with oldest timestamp and process
self.timestamp, features = self.feature_queue.pop()
logger.info('processing features for timestamp %f' % self.timestamp)
# set save times and initialize specific saving properties if not already set
if self.last_save_time is None:
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
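# the duration above runs from the current timestamp to the next persist_cadence
# boundary (inclusive), so that subsequent files line up on persist boundaries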
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
logger.info("saving features to disk at timestamp = %f" % self.timestamp)
save_time = utils.floor_div(self.last_save_time, self.write_cadence)
self.feature_data.dump(self.tmp_path, self.feature_name, save_time, tmp = True)
self.last_save_time = self.timestamp
# persist triggers once per persist cadence if using hdf5 format
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
logger.info("persisting features to disk for gps range %f - %f" % (self.timestamp-self.persist_cadence, self.timestamp))
self.persist_to_disk()
self.last_persist_time = self.timestamp
self.set_hdf_file_properties(self.timestamp, self.persist_cadence)
### add new feature vector to dataset
self.feature_data.append(self.timestamp, features)
def persist_to_disk(self):
"""
moves a file from its temporary to final position
"""
final_path = os.path.join(self.feature_path, self.feature_name)+".h5"
tmp_path = os.path.join(self.tmp_path, self.feature_name)+".h5.tmp"
shutil.move(tmp_path, final_path)
def start(self):
"""
starts ingesting data and saving features to disk
"""
logger.info('starting streaming hdf5 sink...')
self.is_running = True
while self.is_running:
### ingest and combine incoming feature subsets, dropping late data
self.fetch_data()
### push combined features downstream
while self.feature_queue:
self.process_queue()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def stop(self):
"""
stops ingesting data and saves the rest of the features to disk
"""
logger.info('shutting down hdf5 sink...')
self.persist_to_disk()
### FIXME: should also handle pushing rest of data in buffer
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the hdf5 sink gracefully before exiting
"""
def __init__(self, sink, signals = [signal.SIGINT, signal.SIGTERM]):
self.sink = sink
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
self.sink.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'hdf5_sink']),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
# create hdf5 sink instance
sink = HDF5StreamSink(logger, options=options)
# install signal handler
SignalHandler(sink)
# start up hdf5 sink
sink.start()
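# Example invocation (option values are illustrative):
#
#   gstlal_feature_hdf5_sink --kafka-server localhost:9092 \
#       --input-topic-basename gstlal_features --channel-list channels.ini \
#       --rootdir /path/to/features --instrument H1 --tag online_test \
#       --sample-rate 16 --write-cadence 100 --persist-cadence 10000 --verbose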
#!/usr/bin/env python
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_monitor [--options]"
__description__ = "an executable to collect and monitor streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import defaultdict, deque
import json
import optparse
import os
import signal
import sqlite3
import sys
import time
from confluent_kafka import Consumer, KafkaError
from ligo.scald import io
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import utils
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Monitor Options", "General settings for configuring the monitor.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--target-channel", metavar = "string", help = "Sets the target channel to view.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and sqlite database lives")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the monitor acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
group.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--data-type", metavar="string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default = max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamMonitor(object):
"""
Listens to incoming streaming features, collects metrics, and pushes relevant metrics to the configured aggregation backend (hdf5 or influx).
"""
def __init__(self, logger, options):
logger.info('setting up feature monitor...')
### initialize timing options
self.request_timeout = options.request_timeout
self.processing_cadence = options.processing_cadence
self.sample_rate = options.sample_rate
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server,
'group.id': 'monitor_%s'%options.tag}
### initialize consumers
self.consumer = Consumer(self.kafka_settings)
self.consumer.subscribe([options.input_topic_basename])
### initialize queues
self.feature_queue = deque(maxlen = 60 * self.sample_rate)
### other settings
if options.target_channel:
self.target_channel = options.target_channel
else:
self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument
self.num_channels = options.num_channels
self.data_type = options.data_type
### keep track of last timestamp processed and saved
self.last_save = None
self.timestamp = None
### set up aggregator
logger.info("setting up monitor with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
)
### determine channels to be processed
name, _ = options.channel_list.rsplit('.', 1)
self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
### define measurements to be stored
for metric in ('target_snr', 'synchronizer_latency', 'percent_missed'):
self.agg_sink.register_schema(metric, columns='data', column_key='data', tags='job', tag_key='job')
def fetch_data(self):
"""
requests a new message from an individual topic
and adds it to the feature queue
"""
message = self.consumer.poll(timeout=self.request_timeout)
### only add to queue if no errors in receiving data
if message and not message.error():
features = json.loads(message.value())
self.add_to_queue(features['timestamp'], features['features'])
def add_to_queue(self, timestamp, data):
"""
add a set of features for a given timestamp to the feature queue
"""
self.feature_queue.appendleft((timestamp, data))
self.timestamp = timestamp
def process_queue(self):
"""
process features and generate metrics from synchronizer on a regular cadence
"""
if self.timestamp:
if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
metrics = defaultdict(list)
while len(self.feature_queue) > 0:
### remove data with oldest timestamp and process
timestamp, features = self.feature_queue.pop()
latency = utils.gps2latency(timestamp)
### check for missing channels
these_channels = set(features.keys())
missing_channels = self.channels - these_channels
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
### generate metrics
metrics['time'].append(timestamp)
metrics['synchronizer_latency'].append(latency)
metrics['percent_missed'].append(100 * (float(self.num_channels - len(features.keys())) / self.num_channels))
if self.target_channel in features:
metrics['target_time'].append(timestamp)
metrics['target_snr'].append(features[self.target_channel][0]['snr'])
### store and aggregate features
for metric in ('synchronizer_latency', 'percent_missed'):
data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
self.agg_sink.store_columns(metric, {'synchronizer': data}, aggregate=self.data_type)
if len(metrics['target_time']) > 0:
data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, aggregate=self.data_type)
self.last_save = timestamp
logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['synchronizer_latency']), max(metrics['percent_missed'])))
def start(self):
"""
starts ingesting features and pushes generated metrics to the aggregation backend
"""
logger.info('starting feature monitor...')
self.is_running = True
while self.is_running:
### ingest incoming features
self.fetch_data()
### store and aggregate generated metrics
self.process_queue()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def stop(self):
"""
shut down gracefully
"""
logger.info('shutting down feature monitor...')
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the stream monitor gracefully before exiting
"""
def __init__(self, monitor, signals = [signal.SIGINT, signal.SIGTERM]):
self.monitor = monitor
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
self.monitor.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'feature_monitor']),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
# create summary instance
monitor = StreamMonitor(logger, options=options)
# install signal handler
SignalHandler(monitor)
# start up monitor
monitor.start()
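# Example invocation (option values are illustrative):
#
#   gstlal_feature_monitor --kafka-server localhost:9092 \
#       --input-topic-basename gstlal_features_combined --channel-list channels.ini \
#       --instrument H1 --num-channels 696 --data-backend influx \
#       --influx-hostname influx.example.com --influx-port 8086 \
#       --influx-database-name gstlal_features --data-type max --verbose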
#!/usr/bin/env python
# Copyright (C) 2017-2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_synchronizer [--options]"
__description__ = "an executable to synchronize incoming gstlal feature extractor streams and send downstream"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
import heapq
import json
import signal
import sys
import time
import timeit
from collections import deque
from Queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPool
from optparse import OptionParser
from lal import gpstime
from confluent_kafka import Producer, Consumer, KafkaError
from gstlal.fxtools import utils
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = OptionParser(usage=__usage__, description=__description__)
parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
parser.add_option("--num-topics", type = "int", help = "Sets the number of input kafka topics to read from. Required.")
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamSynchronizer(object):
"""
Handles the synchronization of several incoming streams, populating data queues
and pushing feature vectors to a queue for downstream processing.
"""
def __init__(self, logger, options):
logger.info('setting up stream synchronizer...')
### initialize timing options
self.processing_cadence = options.processing_cadence
self.request_timeout = options.request_timeout
self.latency_timeout = options.latency_timeout
self.sample_rate = options.sample_rate
self.no_drop = options.no_drop
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server}
self.num_topics = options.num_topics
### initialize consumers
self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(4)) for i in range(1, self.num_topics + 1)]
# FIXME: hacky way of introducing group id, should be a settable option
consumer_kafka_settings = dict(self.kafka_settings)
consumer_kafka_settings['group.id'] = 'group_1'
self.consumers = [Consumer(consumer_kafka_settings) for topic in self.consumer_names]
for topic, consumer in zip(self.consumer_names, self.consumers):
consumer.subscribe([topic])
### initialize producer
self.producer_name = options.output_topic_basename
self.producer = Producer(self.kafka_settings)
### initialize queues
self.last_timestamp = 0
# 30 second queue for incoming buffers
self.feature_queue = PriorityQueue(maxsize = 30 * self.sample_rate * self.num_topics)
# 5 minute queue for outgoing buffers
self.feature_buffer = deque(maxlen = 300)
def fetch_data(self, consumer):
"""
requests a new message from an individual topic
and adds it to the feature queue
"""
message = consumer.poll(timeout=self.request_timeout)
### only add to queue if no errors in receiving data
if message and not message.error():
### decode json and parse data
feature_subset = json.loads(message.value())
### add to queue if timestamp is within timeout
if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
self.add_to_queue(feature_subset['timestamp'], feature_subset['features'])
def fetch_all_data(self):
"""
requests a new message from all topics and adds
them to the feature queue
"""
pool = ThreadPool(self.num_topics)
result = pool.map_async(self.fetch_data, self.consumers)
result.wait()
pool.close()
def add_to_queue(self, timestamp, data):
"""
add a set of features for a given timestamp to the feature queue
"""
self.feature_queue.put((timestamp, data))
def process_queue(self):
"""
checks if conditions are right to combine new features for a given timestamp,
and if so, takes subsets from the feature queue, combines them, and pushes the
result to a buffer
"""
### clear out queue of any stale data
while not self.feature_queue.empty() and self.last_timestamp >= self.feature_queue.queue[0][0]:
self.feature_queue.get()
### inspect timestamps in front of queue
num_elems = min(self.num_topics, self.feature_queue.qsize())
timestamps = [block[0] for block in heapq.nsmallest(num_elems, self.feature_queue.queue)]
### check if either all timestamps are identical, or if the timestamps
### are old enough to process regardless. if so, process elements from queue
if timestamps:
if timestamps[0] <= self.max_timeout() or (len(set(timestamps)) == 1 and num_elems == self.num_topics):
### find number of elements to remove from queue
if timestamps[0] <= self.max_timeout():
num_subsets = len([timestamp for timestamp in timestamps if timestamp == timestamps[0]])
else:
num_subsets = num_elems
### remove data with oldest timestamp and process
subsets = [self.feature_queue.get() for i in range(num_subsets)]
logger.info('combining %d / %d feature subsets for timestamp %f' % (len(subsets),self.num_topics,timestamps[0]))
features = self.combine_subsets(subsets)
self.feature_buffer.appendleft((timestamps[0], features))
self.last_timestamp = timestamps[0]
def combine_subsets(self, subsets):
"""
combine subsets of features from multiple streams in a sensible way
"""
datum = [subset[1] for subset in subsets]
return {ch: rows for channel_subsets in datum for ch, rows in channel_subsets.items()}
def push_features(self):
"""
pushes any features that have been combined downstream in an outgoing topic
"""
# push full feature vector to producer if buffer isn't empty
if self.feature_buffer:
timestamp, features = self.feature_buffer.pop()
logger.info('pushing features with timestamp %f downstream, latency is %.3f' % (timestamp, utils.gps2latency(timestamp)))
feature_packet = {'timestamp': timestamp, 'features': features}
self.producer.produce(timestamp = timestamp, topic = self.producer_name, value = json.dumps(feature_packet))
self.producer.poll(0)
def max_timeout(self):
"""
calculates the oldest timestamp allowed for incoming data
"""
return float(gpstime.tconvert('now')) + (timeit.default_timer() % 1) - self.latency_timeout
def synchronize(self):
"""
puts all the synchronization steps together and adds a timer based on the
processing cadence to run periodically
"""
while self.is_running:
### ingest and combine incoming feature subsets, dropping late data
self.fetch_all_data()
self.process_queue()
### push combined features downstream
while self.feature_buffer:
self.push_features()
### repeat with processing cadence
time.sleep(self.processing_cadence)
def start(self):
"""
starts the synchronization sequence
"""
logger.info('starting stream synchronizer for %d incoming feature streams...' % self.num_topics)
self.is_running = True
self.synchronize()
def stop(self):
"""
stops the synchronization sequence
"""
logger.info('shutting down stream synchronizer...')
### FIXME: should also handle pushing rest of data in buffer
self.is_running = False
class SignalHandler(object):
"""
helper class to shut down the synchronizer gracefully before exiting
"""
def __init__(self, synchronizer, signals = [signal.SIGINT, signal.SIGTERM]):
self.synchronizer = synchronizer
for sig in signals:
signal.signal(sig, self)
def __call__(self, signum, frame):
print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
self.synchronizer.stop()
sys.exit(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
# parse arguments
options, args = parse_command_line()
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'synchronizer']),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
# create ETG synchronizer instance
synchronizer = StreamSynchronizer(logger, options=options)
# install signal handler
SignalHandler(synchronizer)
# start up synchronizer
synchronizer.start()
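# Example invocation (option values are illustrative):
#
#   gstlal_feature_synchronizer --kafka-server localhost:9092 \
#       --input-topic-basename gstlal_features --output-topic-basename gstlal_features_combined \
#       --num-topics 4 --sample-rate 16 --latency-timeout 5 --tag online_test --verbose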
#!/usr/bin/env python3
#
# Copyright (C) 2021 Soichiro Kuwahara
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Delta function injecting tool"""
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import math
from optparse import OptionParser
import sys
import lal
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
@lsctables.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
__author__ = "Soichrio Kuwahara <soichiro.kuwahara@ligo.org>"
program_name = "gstlal_impulse_inj"
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
def parse_command_line():
parser = OptionParser(
description = "GstLAL-based delta function injection pipeline."
)
parser.add_option("--hpeak", type = "float", help = "Set beta.")
parser.add_option("--output", metavar = "filename", help = "Set the name of the output file (default = stdout).")
parser.add_option("--time-slide-file", metavar = "filename", help = "Associate injections with the first time slide ID in this XML file (required).")
parser.add_option("--gps-geocent-time", metavar = "s", help = "Set the start time of the tiling in GPS seconds (required).")
parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
options, filenames = parser.parse_args()
# FIXME: Designed only for injecting one impulse.
# save for the process_params table
options.options_dict = dict(options.__dict__)
# check for params
required_options = set(("hpeak", "output", "time_slide_file", "gps_geocent_time"))
missing_options = set(option for option in required_options if getattr(options, option) is None)
if missing_options:
raise ValueError("missing required option(s) %s" % ", ".join("--%s" % options.subst("_", "_") for option in (requied_options - missing_options)))
# type-cast
options.gps_geocent_time = lal.LIGOTimeGPS(options.gps_geocent_time)
return options, filenames
#
# =============================================================================
#
# Main
#
# =============================================================================
#
options, filenames = parse_command_line()
#
# use the time-slide file to start the output document
#
xmldoc = ligolw_utils.load_filename(options.time_slide_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler)
#
# add our metadata
#
process = ligolw_process.register_to_xmldoc(xmldoc, "impulse_inj", options.options_dict)
#
# use whatever time slide vector comes first in the table (lazy)
#
time_slide_table = lsctables.TimeSlideTable.get_table(xmldoc)
time_slide_id = time_slide_table[0].time_slide_id
if options.verbose:
print("associating injections with time slide (%d) %s" % (time_slide_id, time_slide_table.as_dict()[time_slide_id]), file = sys.stderr)
#
# find or add a sim_burst table
#
try:
lsctables.SimBurstTable.get_table(xmldoc)
except ValueError:
# no sim_burst table in document
pass
else:
raise ValueError("%s contains a sim_burst table. this program isn't smart enough to deal with that." % options.time_slide_xml)
sim_burst_tbl = xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SimBurstTable, ["process:process_id", "simulation_id", "time_slide:time_slide_id", "waveform", "waveform_number", "ra", "dec", "psi", "q", "hrss", "time_geocent_gps", "time_geocent_gps_ns", "time_geocent_gmst", "duration", "frequency", "bandwidth", "egw_over_rsquared", "amplitude", "pol_ellipse_angle", "pol_ellipse_e"]))
#
# populate the sim_burst table with injections
#
sim_burst_tbl.append(sim_burst_tbl.RowType(
# metadata
process_id = process.process_id,
simulation_id = sim_burst_tbl.get_next_id(),
time_slide_id = time_slide_id,
waveform = "Impulse",
# waveform parameters
time_geocent = options.gps_geocent_time,
amplitude = options.hpeak,
# FIXME. sky location and polarization axis orientation
ra = 0.,
dec = 0.,
psi = 0.,
# unnecessary columns
waveform_number = 0.,
frequency = math.nan,
bandwidth = math.nan,
egw_over_rsquared = math.nan,
duration = math.nan,
q = math.nan,
hrss = math.nan,
pol_ellipse_angle = math.nan,
pol_ellipse_e = math.nan
))
#
# write output
#
ligolw_utils.write_filename(xmldoc, options.output, verbose = options.verbose)
#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a DAG to run a series of gstlal_feature_extractor jobs online
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
# =============================
#
# preamble
#
# =============================
import optparse
import os
from gstlal import aggregator
from gstlal import dagparts
from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils
# =============================
#
# functions
#
# =============================
def generate_options(options):
"""
Generates a dictionary of command line options to pass into DAG nodes.
"""
# data source options
if options.data_source == 'lvshm':
data_source_options = {
"data-source": options.data_source,
"shared-memory-partition": options.shared_memory_partition,
"shared-memory-assumed-duration": options.shared_memory_assumed_duration
}
elif options.data_source == 'framexmit':
data_source_options = {"data-source": options.data_source}
# waveform options
waveform_options = {
"waveform": options.waveform,
"mismatch": options.mismatch,
"qhigh": options.qhigh
}
# data transfer options
if options.save_format == 'kafka':
save_options = {
"save-format": options.save_format,
"data-transfer": options.data_transfer,
"sample-rate": options.sample_rate,
"kafka-partition": options.kafka_partition,
"kafka-topic": options.kafka_topic,
"kafka-server": options.kafka_server
}
elif options.save_format == 'hdf5':
save_options = {
"save-format": options.save_format,
"sample-rate": options.sample_rate,
"cadence": options.cadence,
"persist-cadence": options.persist_cadence
}
else:
raise NotImplementedError('not an available option for online jobs at this time')
# program behavior options
program_options = {"psd-fft-length": options.psd_fft_length}
if options.disable_web_service:
program_options.update({"disable-web-service": options.disable_web_service})
if options.verbose:
program_options.update({"verbose": options.verbose})
# gobble options together
out_options = {}
out_options.update(data_source_options)
out_options.update(waveform_options)
out_options.update(save_options)
out_options.update(program_options)
return out_options
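# For illustration, with --data-source lvshm and --save-format hdf5 the
# returned dictionary would look roughly like this (all values hypothetical):
#
#   {
#       "data-source": "lvshm",
#       "shared-memory-partition": "LHO_Data",
#       "shared-memory-assumed-duration": 1,
#       "waveform": "sine_gaussian",
#       "mismatch": 0.2,
#       "qhigh": 40,
#       "save-format": "hdf5",
#       "sample-rate": 16,
#       "cadence": 20,
#       "persist-cadence": 200,
#       "psd-fft-length": 8,
#   }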
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
feature_extractor_nodes = {}
channel_list = []
# generate common command line options
command_line_options = generate_options(options)
# parallelize jobs by channel subsets
for ii, channel_subset in enumerate(data_source_info.channel_subsets):
if options.verbose:
print("Creating node for channel subset %d"%ii)
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
channels[0] = channels[0].split('=')[1] # peel the --channel-name= prefix off the first channel; the "channel-name" option key below supplies it
channel_list.extend([(channel, int(data_source_info.channel_dict[channel]['fsamp'])) for channel in channel_subset])
# create specific options for each channel subset
subset_options = {
"max-streams": options.max_streams * 2, # FIXME: done to force all channels to be processed in parallel, but should be handled upstream more gracefully
"job-id": str(ii + 1).zfill(4),
"channel-name":' '.join(channels)
}
subset_options.update(command_line_options)
feature_extractor_nodes[ii] = \
dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = parent_nodes,
opts = subset_options,
output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
)
num_channels = len(channel_list)
print("Writing channel list of all channels processed")
listpath = os.path.join(options.out_path, "full_channel_list.txt")
with open(listpath, 'w') as f:
for channel, rate in channel_list:
f.write('%s\t%d\n'%(channel, rate))
return feature_extractor_nodes, num_channels
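# The channel list written above is tab-separated, one "<channel>\t<rate>"
# entry per line, e.g. (channel name hypothetical):
#
#   H1:PEM-EY_MAINSMON_EBAY_1_DQ	8192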
# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# generic data source and feature extraction options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# Condor commands
group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
parser.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
parser.add_option("--disable-agg-jobs", action = "store_true", help = "If set, do not launch aggregation jobs to process and aggregate incoming features.")
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
parser.add_option("--auxiliary-request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for auxiliary processes, default = 2")
parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
parser.add_option_group(group)
# Synchronizer/File Sink commands
group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option_group(group)
# Aggregation/Monitoring commands
group = optparse.OptionGroup(parser, "Aggregator Options", "Adjust parameters used for aggregation and monitoring of features.")
parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
parser.add_option("--num-agg-jobs", type = "int", default = 4, help = "Number of aggregator jobs to aggregate incoming features. Default = 4.")
parser.add_option("--num-agg-processes-per-job", type = "int", default = 2, help = "Number of processes per aggregator job to aggregate incoming features. Used if --agg-data-backend = hdf5. Default = 2.")
parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --agg-data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --agg-data-backend = influx.")
parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --agg-data-backend = influx.")
parser.add_option_group(group)
options, filenames = parser.parse_args()
return options, filenames
# =============================
#
# main
#
# =============================
#
# parsing and setting up core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
#
# create directories if needed
#
for dir_ in ('features', 'synchronizer', 'monitor', 'aggregator', 'logs'):
aggregator.makedir(dir_)
#
# set up dag and job classes
#
dag = dagparts.DAG("%s_feature_extraction_pipe" % ifo)
# feature extractor job
if options.condor_universe == 'local':
condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
feature_extractor_nodes, num_channels = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
# auxiliary jobs
if options.save_format == 'kafka':
if options.condor_universe == 'local':
auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
auxiliary_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
synchronizer_job = dagparts.DAGJob("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
hdf5_sink_job = dagparts.DAGJob("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
monitor_job = dagparts.DAGJob("gstlal_feature_monitor", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
# aggregator jobs
if not options.disable_agg_jobs:
aggregator_job = dagparts.DAGJob("gstlal_feature_aggregator", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
#
# set up options for auxiliary jobs
#
common_options = {
"verbose": options.verbose,
"tag": options.tag,
"processing-cadence": options.processing_cadence,
"request-timeout": options.request_timeout,
"kafka-server": options.kafka_server
}
synchronizer_options = {
"latency-timeout": options.latency_timeout,
"sample-rate": options.sample_rate,
"input-topic-basename": options.kafka_topic,
"output-topic-basename": '_'.join(['synchronizer', options.tag])
}
if options.no_drop:
synchronizer_options.update({"no-drop": options.no_drop})
monitor_options = {
"instrument": ifo,
"target-channel": options.target_channel,
"sample-rate": options.sample_rate,
"input-topic-basename": '_'.join(['synchronizer', options.tag]),
"num-channels": num_channels,
"data-backend": options.agg_data_backend,
"data-type": "max",
}
hdf5_sink_options = {
"instrument": ifo,
"channel-list": options.channel_list,
"waveform": options.waveform,
"sample-rate": options.sample_rate,
"write-cadence": options.cadence,
"persist-cadence": options.persist_cadence,
"input-topic-basename": '_'.join(['synchronizer', options.tag])
}
extra_hdf5_channel_options = {
"section-include": options.section_include,
"safety-include": list(options.safety_include),
"fidelity-exclude": list(options.fidelity_exclude),
"safe-channel-include": options.safe_channel_include,
"unsafe-channel-include": options.unsafe_channel_include,
}
aggregator_options = {
"sample-rate": options.sample_rate,
"input-topic-basename": options.kafka_topic,
"data-backend": options.agg_data_backend,
"data-type": "max",
}
if options.agg_data_backend == 'influx':
backend_options = {
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
}
else:
backend_options = {"num-processes": options.num_agg_processes_per_job}
aggregator_options.update(backend_options)
monitor_options.update(backend_options)
### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
for option_name, option in extra_hdf5_channel_options.items():
if option:
hdf5_sink_options[option_name] = option
synchronizer_options.update(common_options)
hdf5_sink_options.update(common_options)
aggregator_options.update(common_options)
monitor_options.update(common_options)
monitor_options.update({"channel-list": os.path.join(options.out_path, "full_channel_list.txt")})
#
# set up jobs
#
def groups(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
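# e.g. list(groups(['a', 'b', 'c', 'd', 'e'], 2)) -> [['a', 'b'], ['c', 'd'], ['e']]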
if options.save_format == 'kafka':
synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "synchronizer")})
hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "features")})
monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "monitor")})
### aggregator jobs
if not options.disable_agg_jobs:
all_fx_jobs = [(str(ii).zfill(4), channel_subset) for ii, channel_subset in enumerate(data_source_info.channel_subsets)]
for job_subset in groups(all_fx_jobs, options.num_agg_jobs):
jobs, channels = zip(*job_subset)
job_channel_options = {"jobs": jobs}
job_channel_options.update(aggregator_options)
agg_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
#
# write out dag and sub files
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
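# Once generated, the workflow is submitted with HTCondor's DAG manager, e.g.
# (the .dag file name is assumed to follow the DAG tag chosen above):
#
#   condor_submit_dag <ifo>_feature_extraction_pipe.dag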
......@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-burst],[0.1.1],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_INIT([gstlal-burst],[0.4.0],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
......@@ -24,8 +24,7 @@ AC_CONFIG_FILES([ \
lib/gstlal-burst/gstlal-burst.pc \
lib/gstlal-burst/Makefile \
python/Makefile \
python/excesspower/Makefile \
python/fxtools/Makefile \
python/cherenkov/Makefile \
share/Makefile
])
......@@ -99,14 +98,15 @@ case `cat /etc/redhat-release 2> /dev/null` in
;;
esac
LT_INIT
PKG_PROG_PKG_CONFIG()
AC_SUBST([MIN_PKG_CONFIG_VERSION], [0.18.0])
PKG_PROG_PKG_CONFIG("$MIN_PKG_CONFIG_VERSION")
#
# Check for Python
#
AC_SUBST([MIN_PYTHON_VERSION],["2.7"])
AC_SUBST([MIN_PYTHON_VERSION],["3.6"])
AM_PATH_PYTHON(["$MIN_PYTHON_VERSION"],,)
AX_PYTHON_DEVEL()
# hack to remove default lib dirs from PYTHON_LIBS. only tested on Debian
......@@ -223,6 +223,7 @@ to your GI_TYPELIB_PATH environment variable.])
#
AC_SUBST([MIN_NUMPY_VERSION], [1.7.0])
AX_PYTHON_MODULE(numpy, fatal)
NUMPY_CFLAGS=-I`$PYTHON -c "import numpy;print (numpy.get_include());"`
old_CFLAGS="$CFLAGS"
......@@ -239,24 +240,24 @@ CFLAGS="$old_CFLAGS"
#
AC_SUBST([MIN_LAL_VERSION], [6.19.0])
AC_SUBST([MIN_LALMETAIO_VERSION], [1.4.0])
AC_SUBST([MIN_LALBURST_VERSION], [1.5.0])
AC_SUBST([MIN_LALSIMULATION_VERSION], [1.8.0])
AC_SUBST([MIN_LAL_VERSION], [7.2.4])
AC_SUBST([MIN_LALMETAIO_VERSION], [3.0.2])
AC_SUBST([MIN_LALBURST_VERSION], [1.7.0])
AC_SUBST([MIN_LALSIMULATION_VERSION], [4.0.2])
PKG_CHECK_MODULES([LAL], [lal >= ${MIN_LAL_VERSION} lalburst >= ${MIN_LALBURST_VERSION} lalmetaio >= ${MIN_LALMETAIO_VERSION} lalsupport])
AC_SUBST([LAL_CFLAGS])
AC_SUBST([LAL_LIBS])
#
# Check for glue, et al.
# Check for LIGO libraries
#
AC_SUBST([MIN_GLUE_VERSION], [1.59.3])
AX_PYTHON_GLUE([$MIN_GLUE_VERSION])
AC_SUBST([MIN_LIGO_SEGMENTS_VERSION], [1.2.0])
AX_PYTHON_LIGO_SEGMENTS([$MIN_LIGO_SEGMENTS_VERSION])
AC_SUBST([MIN_LIGO_LW_VERSION], [1.8.3])
AX_PYTHON_LIGO_LW([$MIN_LIGO_LW_VERSION])
#
......@@ -264,7 +265,7 @@ AX_PYTHON_LIGO_SEGMENTS([$MIN_LIGO_SEGMENTS_VERSION])
#
AC_SUBST([MIN_GSTLAL_VERSION], [1.5.0])
AC_SUBST([MIN_GSTLAL_VERSION], [1.10.0])
PKG_CHECK_MODULES([GSTLAL], [gstlal >= ${MIN_GSTLAL_VERSION}])
AC_SUBST([GSTLAL_VERSION], [`$PKG_CONFIG --modversion gstlal`])
AX_GSTLAL_SPLIT_VERSION([GSTLAL_VERSION])
......@@ -279,7 +280,7 @@ AC_SUBST([GSTLAL_LIBS])
#
AC_SUBST([MIN_GSTLAL_UGLY_VERSION], [1.6.0])
AC_SUBST([MIN_GSTLAL_UGLY_VERSION], [1.10.0])
AX_GSTLAL_SPLIT_VERSION([GSTLAL_UGLY_VERSION])
AC_SUBST([GSTLAL_UGLY_VERSION_MAJOR])
AC_SUBST([GSTLAL_UGLY_VERSION_MINOR])
......
gstlal-burst (0.4.0-1) unstable; urgency=low
* migrate lal_trigger from gstlal-burst to gstlal
* remove dependency on lscsoft-glue
* port SNAX workflow to use DAG workflow API
* SNAX: bug fixes and improvements
* string pipeline: bug fixes and improvements
-- Alexander E. Pace <alexander.pace@ligo.org> Fri, 11 Nov 2022 09:58:54 -0800
gstlal-burst (0.3.1-1) unstable; urgency=low
* update minimum python-ligo-lw version to 1.7
* port to ligo-lw 'compress =' keyword arg scheme
* port to ligo-lw's dbtables.workingcopy
* string_extrinsics.SNRPDF: port to BinnedLnPDF
* gstlal_cs_triggergen: fix TabError issues
* fix various import errors in gstlal-burst programs
-- Patrick Godwin <patrick.godwin@ligo.org> Wed, 10 Mar 2021 08:28:49 -0800
gstlal-burst (0.3.0-1) unstable; urgency=low
* add gstlal_snax_generate to create synthetic features online
* add gstlal_snax_whiten to write whitened timeseries to disk
* gstlal_snax_extract: remove unused web server feature
* SNAX: remove unused row transfer format via Kafka
* SNAX: performance improvements in AppSync
* SNAX: allow feature generation over multiple frequency bins
* SNAX: refactor common pipeline utilities into snax.pipeparts
* python3 compatibility fixes
* update dependencies to rely on python3 packages
* bump minimum python version to 3.6
* update minimum versions for lalsuite/gstlal packages
-- Patrick Godwin <patrick.godwin@ligo.org> Thu, 28 Jan 2021 08:58:54 -0800
gstlal-burst (0.2.1-1) unstable; urgency=low
* Update minimum versions - gstlal: 1.6.0, gstlal-ugly: 1.7.0
* Reduce footprint of logging in online processes
* Switch to single Kafka consumer in snax_aggregate for increased
performance
-- Patrick Godwin <patrick.godwin@ligo.org> Thu, 06 Feb 2020 07:11:31 -0800
gstlal-burst (0.2.0-1.1) unstable; urgency=low
* Bump lalsuite component versions to O3 ones
* Fix debian/*.install file
-- Steffen Grunewald <steffen.grunewald@ligo.org> Thu, 24 Oct 2019 15:15:00 +0200
gstlal-burst (0.2.0-1) unstable; urgency=low
* Rename fxtools submodule to snax to reflect new name from
gstlal feature extractor to SNAX
* Expose --psd-fft-length option for finer whitener control
* Fix in whitener zero-padding
* Update tags in monitoring to include subsystem info
* Change trigger_time -> time column, remove NaN rows for gwdatafind
compatibility
* Call smrepair upon startup if reading data from /dev/shm
* Add feature combiner job for offline to combine features from distinct
jobs to match online format
* Add option for monitor, aggregator to connect to Influx with auth/HTTPS
* Increase blocksize in reading frames for improved performance with NFS
* Fix issue with pipeline hanging in offline jobs in some edge cases
* Switch to single Kafka consumer in synchronizer for improved performance
-- Patrick Godwin <patrick.godwin@ligo.org> Mon, 21 Oct 2019 11:11:13 -0700
gstlal-burst (0.1.1) unstable; urgency=low
* Updated gstlal_feature_aggregator, gstlal_feature_monitor to deal with
......
Source: gstlal-burst
Maintainer: Chris Pankow <chris.pankow@gravity.phys.uwm.edu>
Maintainer: Patrick Godwin <patrick.godwin@ligo.org>
Section: lscsoft
Priority: optional
Standards-Version: 3.9.2
X-Python-Version: >= @MIN_PYTHON_VERSION@
X-Python3-Version: >= @MIN_PYTHON_VERSION@
Build-Depends:
debhelper (>= 9),
python-all-dev (>= @MIN_PYTHON_VERSION@),
python-numpy,
lal-dev (>= @MIN_LAL_VERSION@),
lalburst-dev (>= @MIN_LALBURST_VERSION@),
lalmetaio-dev (>= @MIN_LALMETAIO_VERSION@),
python-glue (>= @MIN_GLUE_VERSION@),
dh-python,
fakeroot,
pkg-config,
gstlal-dev (>= @MIN_GSTLAL_VERSION@),
gstlal-ugly-dev (>= @MIN_GSTLAL_UGLY_VERSION@)
gstlal-ugly-dev (>= @MIN_GSTLAL_UGLY_VERSION@),
liblal-dev (>= @MIN_LAL_VERSION@),
liblalburst-dev (>= @MIN_LALBURST_VERSION@),
liblalmetaio-dev (>= @MIN_LALMETAIO_VERSION@),
pkg-config (>= @MIN_PKG_CONFIG_VERSION@),
python3-all-dev (>= @MIN_PYTHON_VERSION@),
python3-numpy
Package: gstlal-burst
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends},
python (>= @MIN_PYTHON_VERSION@),
python-glue (>= @MIN_GLUE_VERSION@),
python-ligo-segments (>= @MIN_LIGO_SEGMENTS_VERSION@),
python-gobject,
python-numpy,
python-scipy,
Depends: ${shlibs:Depends}, ${misc:Depends}, ${python3:Depends},
gstlal (>= @MIN_GSTLAL_VERSION@),
gstlal-ugly (>= @MIN_GSTLAL_UGLY_VERSION@),
lal (>= @MIN_LAL_VERSION@),
lalmetaio (>= @MIN_LALMETAIO_VERSION@),
lalburst (>= @MIN_LALBURST_VERSION@),
gstlal (>= @MIN_GSTLAL_VERSION@),
gstlal-ugly (>= @MIN_GSTLAL_UGLY_VERSION@)
python3 (>= @MIN_PYTHON_VERSION@),
python3-gi,
python3-gst-1.0,
python3-lal (>= @MIN_LAL_VERSION@),
python3-lalburst (>= @MIN_LALBURST_VERSION@),
python3-ligo-segments (>= @MIN_LIGO_SEGMENTS_VERSION@),
python3-matplotlib,
python3-numpy,
python3-scipy
Conflicts:
gstlal-ugly (<< 0.6.0)
Description: GStreamer for generic transient data analysis
......
usr/bin/*
usr/lib/*/python*/*/gstlal
usr/lib/python*/*/gstlal
usr/lib/*/*.so.*
usr/lib/*/gstreamer-*/*.so*
usr/lib/python*/*/gstlal
#!/usr/bin/make -f
%:
dh $@ --with=python2
dh $@ --with=python3
override_dh_auto_install:
$(MAKE) DESTDIR=$(CURDIR)/debian/tmp install
......
......@@ -6,7 +6,7 @@ AC_DEFUN([AX_PYTHON_GLUE],[
AX_PYTHON_MODULE([glue])
AS_IF([test "x$HAVE_PYMOD_GLUE" == "xyes"], [
AC_MSG_CHECKING(glue version)
GLUE_VERSION=`$PYTHON -c "from glue import __version__ ; print '.'.join(__version__.strip().split('.'))"`
GLUE_VERSION=`$PYTHON -c "from glue import __version__ ; print('.'.join(__version__.strip().split('.')))"`
AS_IF([test $? != "0"], [
AC_MSG_ERROR(["cannot determine version"])
])
......@@ -28,7 +28,7 @@ AC_DEFUN([AX_PYTHON_LIGO_SEGMENTS],[
AX_PYTHON_MODULE([ligo.segments])
AS_IF([test "x$HAVE_PYMOD_LIGO_SEGMENTS" == "xyes"], [
AC_MSG_CHECKING(ligo.segments version)
LIGO_SEGMENTS_VERSION=`$PYTHON -c "from ligo.segments import __version__ ; print '.'.join(__version__.strip().split('.'))"`
LIGO_SEGMENTS_VERSION=`$PYTHON -c "from ligo.segments import __version__ ; print('.'.join(__version__.strip().split('.')))"`
AS_IF([test $? != "0"], [
AC_MSG_ERROR(["cannot determine version"])
])
......@@ -41,3 +41,25 @@ AC_DEFUN([AX_PYTHON_LIGO_SEGMENTS],[
unset minversion
])
])
#
# AX_PYTHON_LIGO_LW([MINVERSION = 0])
#
AC_DEFUN([AX_PYTHON_LIGO_LW],[
AC_REQUIRE([AM_PATH_PYTHON])
AX_PYTHON_MODULE([ligo.lw])
AS_IF([test "x$HAVE_PYMOD_LIGO_SEGMENTS" == "xyes"], [
AC_MSG_CHECKING(ligo.lw version)
LIGO_LW_VERSION=`$PYTHON -c "from ligo.lw import __version__ ; print('.'.join(__version__.strip().split('.')))"`
AS_IF([test $? != "0"], [
AC_MSG_ERROR(["cannot determine version"])
])
minversion=$1
AX_COMPARE_VERSION([$LIGO_LW_VERSION], [ge], [${minversion:-0}], [
AC_MSG_RESULT([$LIGO_LW_VERSION])
], [
AC_MSG_WARN([found $LIGO_LW_VERSION, require at least $1])
])
unset minversion
])
])
......@@ -4,10 +4,9 @@ plugin_LTLIBRARIES = lib@GSTPLUGINPREFIX@gstlalburst.la
lib@GSTPLUGINPREFIX@gstlalburst_la_SOURCES = \
gstlalburst.c \
gstlal_trigger.h gstlal_trigger.c \
gstlal_string_triggergen.c gstlal_string_triggergen.h
lib@GSTPLUGINPREFIX@gstlalburst_la_CFLAGS = $(AM_CFLAGS) $(GSL_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS)
lib@GSTPLUGINPREFIX@gstlalburst_la_CFLAGS = $(AM_CFLAGS) $(GSL_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS) $(gstreamer_audio_CFLAGS)
lib@GSTPLUGINPREFIX@gstlalburst_la_LIBADD = $(top_builddir)/lib/gstlal-burst/libgstlalburst.la
lib@GSTPLUGINPREFIX@gstlalburst_la_LDFLAGS = $(AM_LDFLAGS) $(GSL_LIBS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS) $(GSTLAL_PLUGIN_LDFLAGS)
lib@GSTPLUGINPREFIX@gstlalburst_la_LDFLAGS = $(AM_LDFLAGS) $(GSL_LIBS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS) $(gstreamer_audio_LIBS) $(GSTLAL_PLUGIN_LDFLAGS)
......@@ -101,13 +101,13 @@ G_DEFINE_TYPE_WITH_CODE(
static unsigned autocorrelation_channels(const gsl_matrix_float *autocorrelation_matrix)
{
return autocorrelation_matrix->size1;
return autocorrelation_matrix ? autocorrelation_matrix->size1 : 0;
}
static unsigned autocorrelation_length(const gsl_matrix_float *autocorrelation_matrix)
{
return autocorrelation_matrix->size2;
return autocorrelation_matrix ? autocorrelation_matrix->size2 : 0;
}
......@@ -153,6 +153,36 @@ static int setup_bankfile_input(GSTLALStringTriggergen *element, char *bank_file
}
static int exists_unrealized_triggers(GSTLALStringTriggergen *element)
{
gint channel;
for(channel = 0; channel < element->num_templates; channel++)
if(element->bank[channel].snr != 0.)
return TRUE;
return FALSE;
}
/*
* return min(time of offset, min(times of unrealized triggers))
*/
static GstClockTime buffer_pts(GSTLALStringTriggergen *element, guint64 offset)
{
gint channel;
/* time of first SNR sample that can be a trigger */
GstClockTime t = element->t0 + gst_util_uint64_scale_int_round(offset + (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 - element->offset0, GST_SECOND, GST_AUDIO_INFO_RATE(&element->audio_info));
/* scan for unrealized triggers with an earlier time */
for(channel = 0; channel < element->num_templates; channel++)
if(element->bank[channel].snr > 0.0 && (GstClockTime) XLALGPSToINT8NS(&element->bank[channel].peak_time) < t)
t = XLALGPSToINT8NS(&element->bank[channel].peak_time);
return t;
}
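/*
 * illustrative example (numbers hypothetical):  with t0 = 1000000000 s,
 * offset0 = 0, a sample rate of 4096 Hz and an autocorrelation length of
 * 351 samples, the earliest SNR sample that can form a trigger at
 * offset = 0 is sample (351 - 1) / 2 = 175, giving a nominal PTS of
 * t0 + 175 / 4096 s;  if any template holds an unrealized trigger whose
 * peak time is earlier than that, the PTS is pulled back to that peak
 * time instead.
 */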
/*
* compute autocorrelation norms --- the expectation value in noise.
*/
......@@ -204,32 +234,33 @@ static GstFlowReturn trigger_generator(GSTLALStringTriggergen *element, GstBuffe
float *snrsample;
SnglBurst *triggers = NULL;
guint ntriggers = 0;
guint64 offset;
guint64 length;
guint64 offset = gst_audioadapter_offset(element->adapter);
guint64 length = get_available_samples(element);
guint sample;
gint channel;
length = get_available_samples(element);
if(length < autocorrelation_length(element->autocorrelation_matrix)) {
GST_BUFFER_OFFSET_END(outbuf) = GST_BUFFER_OFFSET(outbuf) + ntriggers;
return GST_FLOW_OK;
}
/*
* obtain PTS and DURATION of output buffer. must be done before
* generating triggers
*/
g_mutex_lock(&element->bank_lock);
snrsample = snrdata = g_malloc(length * element->num_templates * sizeof(*snrdata));
GST_BUFFER_PTS(outbuf) = buffer_pts(element, offset);
GST_BUFFER_DURATION(outbuf) = element->t0 + gst_util_uint64_scale_int_round(offset + length - (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 - element->offset0, GST_SECOND, GST_AUDIO_INFO_RATE(&element->audio_info)) - GST_BUFFER_PTS(outbuf);
/* copy samples */
offset = gst_audioadapter_offset(element->adapter);
snrsample = snrdata = g_malloc(length * element->num_templates * sizeof(*snrdata));
gst_audioadapter_copy_samples(element->adapter, snrdata, length, NULL, NULL);
/* compute the chisq norm if it doesn't exist */
if(!element->autocorrelation_norm)
element->autocorrelation_norm = gstlal_autocorrelation_chi2_compute_norms_string(element->autocorrelation_matrix, NULL);
/* check that autocorrelation vector has odd number of samples */
g_assert(autocorrelation_length(element->autocorrelation_matrix) & 1);
/* find events */
/* find events. earliest sample that can be a new trigger starts a
* little bit in from the start of the adapter because we are
* re-using data from the last iteration for \chi^2 calculation.
* the last sample that can be a new trigger is not at the end of
* the adapter's contents for the same reason */
snrsample += (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 * element->num_templates;
for(sample = (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2; sample < length - (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2; sample++){
LIGOTimeGPS t;
......@@ -240,19 +271,31 @@ static GstFlowReturn trigger_generator(GSTLALStringTriggergen *element, GstBuffe
float snr = fabsf(*snrsample);
if(snr >= element->threshold) {
/*
* If this is the first sample above threshold (i.e. snr of trigger is (re)set to 0), record the start time.
* If this is the first sample above
* threshold (i.e. snr of trigger is
* (re)set to 0), record the start time.
*/
if(element->bank[channel].snr < element->threshold)
element->bank[channel].start_time = t;
/*
* Keep track of last time above threshold and the duration.
* Keep track of last time above threshold
* and the duration. For duration add a
* sample of fuzz on both sides (like in
* lalapps_StringSearch). FIXME: do we
* need the fuzz?
*/
element->last_time[channel] = t;
element->bank[channel].duration = XLALGPSDiff(&element->last_time[channel], &element->bank[channel].start_time);
element->bank[channel].duration = XLALGPSDiff(&t, &element->bank[channel].start_time) + (float) 2.0 / GST_AUDIO_INFO_RATE(&element->audio_info);
/*
* if this sample is the highest SNR,
* update the trigger
*/
if(snr > element->bank[channel].snr) {
/*
* Higher SNR than the "current winner". Update.
*/
const float *autocorrelation = (const float *) gsl_matrix_float_const_ptr(element->autocorrelation_matrix, channel, 0);
const float *autocorrelation_end = autocorrelation + autocorrelation_length(element->autocorrelation_matrix);
float *snrseries = snrsample - (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 * element->num_templates;
......@@ -269,9 +312,12 @@ static GstFlowReturn trigger_generator(GSTLALStringTriggergen *element, GstBuffe
}
} else if(element->bank[channel].snr != 0. && XLALGPSDiff(&t, &element->last_time[channel]) > element->cluster) {
/*
* Trigger is ready to be passed.
* Push trigger to buffer, and reset it.
* there is a trigger ready to be pushed,
* and we have been below threshold for
* longer than the clustering time, so make
* a new trigger and reset this template
*/
triggers = g_renew(SnglBurst, triggers, ntriggers + 1);
triggers[ntriggers++] = element->bank[channel];
element->bank[channel].snr = 0.0;
......@@ -280,7 +326,6 @@ static GstFlowReturn trigger_generator(GSTLALStringTriggergen *element, GstBuffe
}
}
}
g_mutex_unlock(&element->bank_lock);
g_free(snrdata);
gst_audioadapter_flush_samples(element->adapter, length - (autocorrelation_length(element->autocorrelation_matrix) - 1));
......@@ -434,7 +479,6 @@ static gboolean start(GstBaseTransform *trans)
element->t0 = GST_CLOCK_TIME_NONE;
element->offset0 = GST_BUFFER_OFFSET_NONE;
element->next_in_offset = GST_BUFFER_OFFSET_NONE;
element->next_out_offset = GST_BUFFER_OFFSET_NONE;
element->need_discont = TRUE;
}
......@@ -463,26 +507,103 @@ static GstFlowReturn prepare_output_buffer(GstBaseTransform *trans, GstBuffer *i
static GstFlowReturn transform(GstBaseTransform *trans, GstBuffer *inbuf, GstBuffer *outbuf)
{
GSTLALStringTriggergen *element = GSTLAL_STRING_TRIGGERGEN(trans);
guint64 length;
guint64 offset;
GstFlowReturn result;
/* FIXME: this code isn't setting the output offset to anything meaningful */
g_assert(GST_BUFFER_PTS_IS_VALID(inbuf));
g_assert(GST_BUFFER_DURATION_IS_VALID(inbuf));
g_assert(GST_BUFFER_OFFSET_IS_VALID(inbuf));
g_assert(GST_BUFFER_OFFSET_END_IS_VALID(inbuf));
if(GST_BUFFER_IS_DISCONT(inbuf) || GST_BUFFER_OFFSET(inbuf) != element->next_in_offset || !GST_CLOCK_TIME_IS_VALID(element->t0)) {
gst_audioadapter_clear(element->adapter);
element->t0 = GST_BUFFER_PTS(inbuf);
element->offset0 = GST_BUFFER_OFFSET(inbuf);
element->next_out_offset = 0;
} else if(!gst_audioadapter_is_empty(element->adapter))
g_assert_cmpuint(GST_BUFFER_PTS(inbuf), ==, gst_audioadapter_expected_timestamp(element->adapter));
element->next_in_offset = GST_BUFFER_OFFSET_END(inbuf);
/* check that autocorrelation vector has odd number of samples.
* NOTE: autocorrelation_length() returns 0 if the
* autocorrelation_matrix is not set, so this g_assert also tests
* for a missing autocorrelation_matrix. in set_property(), if the
* conversion from GValueArray fails the matrix will be left set to
* NULL, so this is also catching those failures. */
g_assert(autocorrelation_length(element->autocorrelation_matrix) & 1);
/*
* gap logic
*/
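/*
 * the branches below handle four cases:  (1) not enough SNR in the
 * adapter to search for triggers;  (2) enough SNR and the input buffer
 * is not a gap;  (3) the input is a gap and no unrealized triggers are
 * waiting;  (4) the input is a gap but unrealized triggers are still
 * waiting and may need to be flushed.
 */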
gst_buffer_ref(inbuf);
gst_audioadapter_push(element->adapter, inbuf);
length = get_available_samples(element);
offset = gst_audioadapter_offset(element->adapter);
g_mutex_lock(&element->bank_lock);
if(length < autocorrelation_length(element->autocorrelation_matrix)) {
/* not enough SNR to search for triggers */
GST_BUFFER_PTS(outbuf) = buffer_pts(element, offset);
GST_BUFFER_DURATION(outbuf) = 0;
GST_BUFFER_OFFSET_END(outbuf) = GST_BUFFER_OFFSET(outbuf) + 0 /* ntriggers */;
result = GST_FLOW_OK;
} else if(!GST_BUFFER_FLAG_IS_SET(inbuf, GST_BUFFER_FLAG_GAP)) {
/* have enough SNR to make triggers, and this buffer is not a gap */
result = trigger_generator(element, outbuf);
} else if(!exists_unrealized_triggers(element)) {
/* this is a gap, there are no unrealized triggers waiting */
gst_audioadapter_flush_samples(element->adapter, length - (autocorrelation_length(element->autocorrelation_matrix) - 1));
GST_BUFFER_PTS(outbuf) = buffer_pts(element, offset);
GST_BUFFER_DURATION(outbuf) = element->t0 + gst_util_uint64_scale_int_round(offset + length - (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 - element->offset0, GST_SECOND, GST_AUDIO_INFO_RATE(&element->audio_info)) - GST_BUFFER_PTS(outbuf);
GST_BUFFER_OFFSET_END(outbuf) = GST_BUFFER_OFFSET(outbuf) + 0 /* ntriggers */;
GST_BUFFER_FLAG_SET(outbuf, GST_BUFFER_FLAG_GAP);
result = GST_FLOW_OK;
} else {
/* this is a gap, there are still unrealized triggers waiting */
/* FIXME: there are really two cases: we have unrealized
* triggers waiting and the input gap buffer will push them
* all beyond the clustering time (forcing them all to be
* cleared), or it isn't (some might remain for next time).
* in the former case, we can produce two buffers: a
* non-gap buffer containing triggers, followed by a gap
* buffer representing the period of input gap data which
* is guaranteed not to contain triggers. for now we are
* lazy and just produce one non-gap buffer, which might or
* might not contain triggers */
SnglBurst *triggers = NULL;
guint ntriggers = 0;
LIGOTimeGPS t;
result = trigger_generator(element, outbuf);
/* set output buffer properties. must be done before clearing triggers */
GST_BUFFER_PTS(outbuf) = buffer_pts(element, offset);
GST_BUFFER_DURATION(outbuf) = element->t0 + gst_util_uint64_scale_int_round(offset + length - (autocorrelation_length(element->autocorrelation_matrix) - 1) / 2 - element->offset0, GST_SECOND, GST_AUDIO_INFO_RATE(&element->audio_info)) - GST_BUFFER_PTS(outbuf);
/* has the clustering time passed for any of the unrealized triggers? */
XLALINT8NSToGPS(&t, GST_BUFFER_PTS(outbuf) + GST_BUFFER_DURATION(outbuf));
for(gint channel = 0; channel < element->num_templates; channel++) {
if(element->bank[channel].snr != 0. && XLALGPSDiff(&t, &element->last_time[channel]) > element->cluster){
/* this channel has made a trigger */
triggers = g_renew(SnglBurst, triggers, ntriggers + 1);
triggers[ntriggers++] = element->bank[channel];
element->bank[channel].snr = 0.0;
element->bank[channel].chisq = 0.0;
element->bank[channel].chisq_dof = 0.0;
}
}
gst_audioadapter_flush_samples(element->adapter, length - (autocorrelation_length(element->autocorrelation_matrix) - 1));
if(ntriggers)
gst_buffer_replace_all_memory(outbuf, gst_memory_new_wrapped(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS, triggers, ntriggers * sizeof(*triggers), 0, ntriggers * sizeof(*triggers), triggers, g_free));
else
gst_buffer_remove_all_memory(outbuf);
GST_BUFFER_OFFSET_END(outbuf) = GST_BUFFER_OFFSET(outbuf) + ntriggers;
result = GST_FLOW_OK;
}
g_mutex_unlock(&element->bank_lock);
/*
* done
......@@ -674,6 +795,7 @@ static void gstlal_string_triggergen_class_init(GSTLALStringTriggergenClass *kla
transform_class->transform = GST_DEBUG_FUNCPTR(transform);
transform_class->transform_caps = GST_DEBUG_FUNCPTR(transform_caps);
transform_class->start = GST_DEBUG_FUNCPTR(start);
/* FIXME: add a stop method to push any final unrealized triggers */
transform_class->prepare_output_buffer = GST_DEBUG_FUNCPTR(prepare_output_buffer);
g_object_class_install_property(
......
......@@ -54,7 +54,6 @@ typedef struct {
GstClockTime t0;
guint64 offset0;
guint64 next_in_offset;
guint64 next_out_offset;
gboolean need_discont;
/*
......
......@@ -51,7 +51,6 @@
#include <gstlal/gstlal_tags.h>
#include <gstlal_string_triggergen.h>
#include <gstlal_trigger.h>
/*
......@@ -70,7 +69,6 @@ static gboolean plugin_init(GstPlugin *plugin)
GType type;
} *element, elements[] = {
{"lal_string_triggergen", GSTLAL_STRING_TRIGGERGEN_TYPE},
{"lal_trigger", GSTLAL_TRIGGER_TYPE},
{NULL, 0},
};
......
%define gstreamername gstreamer1
%global __python %{__python3}
Name: @PACKAGE_NAME@
Version: @PACKAGE_VERSION@
......@@ -7,43 +8,56 @@ Summary: GSTLAL Burst
License: GPL
Group: LSC Software/Data Analysis
Requires: gstlal-ugly >= @MIN_GSTLAL_UGLY_VERSION@
Requires: gstlal >= @MIN_GSTLAL_VERSION@
Requires: python >= @MIN_PYTHON_VERSION@
Requires: glue >= @MIN_GLUE_VERSION@
Requires: python2-ligo-segments >= @MIN_LIGO_SEGMENTS_VERSION@
Requires: gobject-introspection >= @MIN_GOBJECT_INTROSPECTION_VERSION@
# --- package requirements --- #
Requires: fftw >= 3
Requires: python-%{gstreamername}
Requires: gobject-introspection >= @MIN_GOBJECT_INTROSPECTION_VERSION@
Requires: gstlal >= @MIN_GSTLAL_VERSION@
Requires: gstlal-ugly >= @MIN_GSTLAL_UGLY_VERSION@
Requires: %{gstreamername} >= @MIN_GSTREAMER_VERSION@
Requires: %{gstreamername}-plugins-base >= @MIN_GSTREAMER_VERSION@
Requires: %{gstreamername}-plugins-good >= @MIN_GSTREAMER_VERSION@
Requires: h5py
Requires: numpy
Requires: scipy
Requires: gsl
Requires: orc >= @MIN_ORC_VERSION@
# --- LSCSoft package requirements --- #
Requires: lal >= @MIN_LAL_VERSION@
Requires: lalmetaio >= @MIN_LALMETAIO_VERSION@
Requires: lalburst >= @MIN_LALBURST_VERSION@
Requires: orc >= @MIN_ORC_VERSION@
Requires: gsl
Requires: python%{python3_pkgversion}-ligo-segments >= @MIN_LIGO_SEGMENTS_VERSION@
# --- python package requirements --- #
Requires: python3 >= @MIN_PYTHON_VERSION@
Requires: python%{python3_pkgversion}-%{gstreamername}
Requires: python%{python3_pkgversion}-numpy
Requires: python%{python3_pkgversion}-scipy
%if 0%{?rhel} == 8
Requires: python%{python3_pkgversion}-numpy >= @MIN_NUMPY_VERSION@
Requires: python3-matplotlib
%else
Requires: numpy >= @MIN_NUMPY_VERSION@
Requires: python%{python3_pkgversion}-matplotlib
%endif
# -- build requirements --- #
BuildRequires: fftw-devel >= 3
BuildRequires: gobject-introspection-devel >= @MIN_GOBJECT_INTROSPECTION_VERSION@
BuildRequires: graphviz
BuildRequires: gsl-devel
BuildRequires: gstlal-devel >= @MIN_GSTLAL_VERSION@
BuildRequires: python-devel >= @MIN_PYTHON_VERSION@
BuildRequires: fftw-devel >= 3
BuildRequires: %{gstreamername}-devel >= @MIN_GSTREAMER_VERSION@
BuildRequires: %{gstreamername}-plugins-base-devel >= @MIN_GSTREAMER_VERSION@
BuildRequires: lal-devel >= @MIN_LAL_VERSION@
BuildRequires: lal-python >= @MIN_LAL_VERSION@
BuildRequires: lalburst-devel >= @MIN_LALBURST_VERSION@
BuildRequires: lalmetaio-devel >= @MIN_LALMETAIO_VERSION@
BuildRequires: gsl-devel
BuildRequires: graphviz
BuildRequires: liblal-devel >= @MIN_LAL_VERSION@
BuildRequires: liblalburst-devel >= @MIN_LALBURST_VERSION@
BuildRequires: liblalmetaio-devel >= @MIN_LALMETAIO_VERSION@
BuildRequires: orc >= @MIN_ORC_VERSION@
BuildRequires: pkgconfig >= @MIN_PKG_CONFIG_VERSION@
BuildRequires: python3-devel >= @MIN_PYTHON_VERSION@
BuildRequires: python%{python3_pkgversion}-lal >= @MIN_LAL_VERSION@
Conflicts: gstlal-ugly < 0.6.0
Source: @PACKAGE_NAME@-%{version}.tar.gz
URL: https://www.lsc-group.phys.uwm.edu/daswg/projects/gstlal.html
Packager: Chris Pankow <chris.pankow@gravity.phys.uwm.edu>
Source: https://software.igwn.org/lscsoft/source/@PACKAGE_NAME@-%{version}.tar.gz
URL: https://git.ligo.org/lscsoft/gstlal
Packager: Patrick Godwin <patrick.godwin@ligo.org>
BuildRoot: %{_tmppath}/%{name}-%{version}-root
%description
This package contains the plugins and shared libraries required to run the gstlal burst (generic transient) pipeline.
......@@ -54,13 +68,13 @@ Group: LSC Software/Data Analysis
Requires: %{name} = %{version}
Requires: gstlal-devel >= @MIN_GSTLAL_VERSION@
Requires: gstlal-ugly-devel >= @MIN_GSTLAL_UGLY_VERSION@
Requires: python-devel >= @MIN_PYTHON_VERSION@
Requires: python3-devel >= @MIN_PYTHON_VERSION@
Requires: fftw-devel >= 3
Requires: %{gstreamername}-devel >= @MIN_GSTREAMER_VERSION@
Requires: %{gstreamername}-plugins-base-devel >= @MIN_GSTREAMER_VERSION@
Requires: lal-devel >= @MIN_LAL_VERSION@
Requires: lalmetaio-devel >= @MIN_LALMETAIO_VERSION@
Requires: lalburst-devel >= @MIN_LALBURST_VERSION@
Requires: liblal-devel >= @MIN_LAL_VERSION@
Requires: liblalmetaio-devel >= @MIN_LALMETAIO_VERSION@
Requires: liblalburst-devel >= @MIN_LALBURST_VERSION@
%description devel
This package contains the files needed for building gstlal-burst based plugins
......@@ -71,7 +85,7 @@ and programs.
%build
%configure
%configure PYTHON=python3
%{__make}
......
AM_CPPFLAGS = -I$(top_srcdir)/lib -I$(top_builddir)/lib
pkginclude_HEADERS = gstlal_snglburst.h gstlal_sngltrigger.h sngltriggerrowtype.h
pkginclude_HEADERS = gstlal_snglburst.h
pkgconfig_DATA = gstlal-burst.pc
lib_LTLIBRARIES = libgstlalburst.la
libgstlalburst_la_SOURCES = \
gstlal_snglburst.h gstlal_snglburst.c \
gstlal_sngltrigger.h gstlal_sngltrigger.c \
sngltriggerrowtype.h sngltriggerrowtype.c
libgstlalburst_la_CFLAGS = $(AM_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS)
libgstlalburst_la_LDFLAGS = -version-info $(LIBVERSION) $(AM_LDFLAGS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS)
gstlal_snglburst.h gstlal_snglburst.c
libgstlalburst_la_CFLAGS = $(AM_CFLAGS) $(GSL_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS)
libgstlalburst_la_LDFLAGS = -version-info $(LIBVERSION) $(AM_LDFLAGS) $(GSL_LIBS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS)
......@@ -6,6 +6,6 @@ includedir=@includedir@
Name: libgstlalburst
Description: LAL/Gstreamer Library (the bursty bits)
Version: @VERSION@
Requires: lal lalmetaio lalburst gstlal
Requires: lal lalmetaio glib-2.0 gsl gstlal
Libs: -L${libdir} -lgstlalburst
Cflags: -I${includedir}