Commits (86)
gstlal-burst.spec
lib/gstlal-burst.pc → lib/gstlal-burst/gstlal-burst.pc
ACLOCAL_AMFLAGS = -I gnuscripts
EXTRA_DIST = gstlal-burst.spec
SUBDIRS = debian lib python bin gst
SUBDIRS = debian lib gst python bin share
# check that the most recent changelog entry's version matches the package
# version
@@ -2,12 +2,13 @@ dist_bin_SCRIPTS = \
gstlal_cs_triggergen \
gstlal_excesspower \
gstlal_excesspower_trigvis \
gstlal_feature_aggregator \
gstlal_feature_extractor \
gstlal_feature_extractor_pipe \
gstlal_ll_feature_extractor_pipe \
gstlal_feature_extractor_whitener_check \
gstlal_feature_extractor_template_overlap \
gstlal_feature_hdf5_sink \
gstlal_feature_monitor \
gstlal_feature_synchronizer
gstlal_snax_aggregate \
gstlal_snax_bank_overlap \
gstlal_snax_combine \
gstlal_snax_dag_online \
gstlal_snax_dag_offline \
gstlal_snax_extract \
gstlal_snax_monitor \
gstlal_snax_sink \
gstlal_snax_synchronize \
gstlal_snax_whiten
@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_aggregator [--options]"
__usage__ = "gstlal_snax_aggregate [--options]"
__description__ = "an executable to aggregate and generate job metrics for streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
@@ -37,10 +37,10 @@ import numpy
from confluent_kafka import Consumer, KafkaError
from ligo.scald import aggregator
from ligo.scald import io
from ligo.scald import utils as scald_utils
from ligo.scald.io import hdf5, influx
from gstlal.fxtools import utils
from gstlal.snax import utils
#-------------------------------------------------
# Functions
@@ -64,6 +64,8 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
group.add_option("--data-type", metavar = "string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default: max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
@@ -97,7 +99,7 @@ class StreamAggregator(object):
### other aggregator options
self.data_type = options.data_type
self.last_save = aggregator.now()
self.last_save = scald_utils.gps_now()
### initialize consumers
self.jobs = options.jobs
@@ -112,19 +114,25 @@ class StreamAggregator(object):
### set up aggregator
logger.info("setting up aggregator with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.InfluxDBAggregator(
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.HDF5Aggregator(
self.agg_sink = hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
)
### define measurements to be stored from aggregators
self.agg_sink.register_schema('latency', columns='data', column_key='data', tags='job', tag_key='job')
self.agg_sink.register_schema('snr', columns='data', column_key='data', tags=('channel', 'subsystem'), tag_key='channel')
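### note: 'latency' measurements are tagged by job, while 'snr' measurements are tagged by both channel and subsystem (derived via _channel_to_subsystem below)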
def fetch_data(self, job_consumer):
"""
requests a new message from an individual topic,
@@ -154,8 +162,8 @@ class StreamAggregator(object):
"""
process and aggregate features from feature extraction jobs on a regular cadence
"""
if utils.in_new_epoch(aggregator.now(), self.last_save, 1):
self.last_save = aggregator.now()
if utils.in_new_epoch(scald_utils.gps_now(), self.last_save, 1):
self.last_save = scald_utils.gps_now()
### format incoming packets into metrics and timeseries
feature_packets = [(job, self.feature_queue[job].pop()) for job in self.jobs for i in range(len(self.feature_queue[job]))]
@@ -163,11 +171,11 @@ class StreamAggregator(object):
### store and aggregate metrics
metric_data = {job: {'time': metrics['time'], 'fields': {'data': metrics['latency']}} for job, metrics in all_metrics.items()}
self.agg_sink.store_columns('latency', metric_data, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns('latency', metric_data, aggregate=self.data_type)
### store and aggregate features
timeseries_data = {channel: {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
self.agg_sink.store_columns('snr', timeseries_data, 'data', tags='channel', aggregate=self.data_type)
timeseries_data = {(channel, self._channel_to_subsystem(channel)): {'time': timeseries['time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
self.agg_sink.store_columns('snr', timeseries_data, aggregate=self.data_type)
try:
max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
@@ -218,6 +226,13 @@ class StreamAggregator(object):
logger.info('shutting down feature aggregator...')
self.is_running = False
@staticmethod
def _channel_to_subsystem(channel):
"""
given a channel, returns the subsystem the channel lives in
"""
return channel.split(':')[1].split('-')[0]
class SignalHandler(object):
"""
helper class to shut down the stream aggregator gracefully before exiting
@@ -242,7 +257,7 @@ if __name__ == '__main__':
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'feature_aggregator']),
'-'.join([options.tag, 'feature_aggregator', options.jobs[0]]),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
@@ -18,7 +18,7 @@
"""
A program that measures template overlaps and how templates are spread out in the parameter space
for gstlal_feature_extractor
for gstlal_snax_extract
"""
####################
@@ -42,7 +42,7 @@ from glue import markup
from gstlal import plotutil
from gstlal import aggregator
from gstlal.fxtools import utils
from gstlal.snax import utils
import matplotlib
matplotlib.use('Agg')
#!/usr/bin/env python
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_combine [--options]"
__description__ = "an executable to combine features from the batch pipeline to provide a more user-friendly output"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
# =============================
#
# preamble
#
# =============================
from collections import defaultdict
import itertools
import optparse
import os
import sys
import shutil
import h5py
import numpy
from ligo.segments import infinity, segment, segmentlist
from gstlal import aggregator
from gstlal.snax import utils
# =============================================================================
#
# FUNCTIONS
#
# =============================================================================
def parse_command_line():
"""
Parse command line inputs.
"""
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Combiner Options", "General settings for configuring the file combiner.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--start-time", type = "int", help = "Set the start time to combine features.")
group.add_option("--end-time", type = "int", help = "Set the end time to combine features.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
group.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_FEATURES")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--outdir", metavar = "path", help = "If set, chooses an alternate directory to save the features to. Default = --rootdir")
parser.add_option_group(group)
opts, args = parser.parse_args()
return opts, args
# ===================
#
# main
#
# ===================
if __name__ == "__main__":
options, args = parse_command_line()
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'combiner']),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
### define gps bounds to grab features
start_time = options.start_time if options.start_time else -infinity()
end_time = options.end_time if options.end_time else infinity()
file_segs = segmentlist([segment(start_time, end_time)])
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
### build cache of hdf5-formatted features, grouped by segment
pattern = '{ifo}-{basename}/{ifo}-{basename}-*/{ifo}-{basename}-*/{ifo}-{basename}-*.h5'.format(
basename=options.basename,
ifo=options.instrument[0],
)
cache = sorted(utils.path2cache(options.rootdir, pattern), key=lambda x: x.segment)
### filter cache with segments
cache = [entry for entry in cache if file_segs.intersects_segment(entry.segment)]
### group by segment
grouped_cache = [(seg, list(group)) for seg, group in itertools.groupby(cache, key=lambda x: x.segment)]
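### note: itertools.groupby only groups consecutive entries, so this relies on the cache being sorted by segment above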
### combine features in each stride
for seg, cache in grouped_cache:
logger.info('combining features within times: {} - {}'.format(*seg))
features = defaultdict(dict)
### assume filenames and metadata are the same within each group
dirname = os.path.split(os.path.dirname(cache[0].path))[0]
filename = os.path.splitext(os.path.basename(cache[0].path))[0]
metadata = {}
with h5py.File(cache[0].path, 'r') as f:
metadata['waveform'] = f.attrs.get('waveform')
metadata['sample_rate'] = f.attrs.get('sample_rate')
### load features
for entry in cache:
with h5py.File(entry.path, 'r') as f:
channels = f.keys()
for channel in channels:
dsets = f[channel].keys()
for dset in dsets:
features[channel][dset] = numpy.array(f[channel][dset])
### save combined features to disk
for channel in features.keys():
for dset in features[channel].keys():
utils.create_new_dataset(tmp_dir, filename, features[channel][dset], name=dset, group=channel, tmp=True, metadata=metadata)
### determine final location for features
if options.outdir:
start_time = int(filename.split('-')[2])
basename = '-'.join([options.instrument[0], options.basename])
base_path = utils.to_trigger_path(options.outdir, basename, start_time)
aggregator.makedir(base_path)
final_path = os.path.join(base_path, filename)+".h5"
else:
final_path = os.path.join(dirname, filename)+".h5"
tmp_path = os.path.join(tmp_dir, filename)+".h5.tmp"
logger.info('saving features to: {}'.format(final_path))
shutil.move(tmp_path, final_path)
@@ -30,7 +30,7 @@ A program to extract features from auxiliary channel data in real time or in off
###
### digraph llpipe {
### labeljust = "r";
### label="gstlal_feature_extractor"
### label="gstlal_snax_extract"
### rankdir=LR;
### graph [fontname="Roman", fontsize=24];
### edge [ fontname="Roman", fontsize=10 ];
@@ -156,13 +156,13 @@ from gstlal import httpinterface
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal.fxtools import auxcache
from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import sngltriggertable
from gstlal.fxtools import utils
from gstlal.fxtools import waveforms as fxwaveforms
from gstlal.snax import auxcache
from gstlal.snax import feature_extractor
from gstlal.snax import multichannel_datasource
from gstlal.snax import multirate_datasource
from gstlal.snax import sngltriggertable
from gstlal.snax import utils
from gstlal.snax import waveforms as fxwaveforms
#
# Make sure we have sufficient resources
@@ -221,6 +221,7 @@ def parse_command_line():
# check if input sample rate is sensible
assert options.sample_rate == 1 or options.sample_rate % 2 == 0
assert options.min_downsample_rate % 2 == 0
# check if persist and save cadence times are sensible
assert options.persist_cadence >= options.cadence
@@ -258,7 +259,7 @@ duration = options.feature_end_time - options.feature_start_time
logdir = os.path.join(options.out_path, 'logs', options.job_id)
aggregator.makedir(logdir)
logger = utils.get_logger('gstlal-feature-extractor_%d-%d' % (options.feature_start_time, duration), rootdir=logdir, verbose=options.verbose)
logger = utils.get_logger('gstlal_snax_extract: %d-%d' % (options.feature_start_time, options.feature_end_time), rootdir=logdir, verbose=options.verbose)
logger.info("writing log to %s" % logdir)
#
@@ -418,24 +419,25 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
head[channel] = pipeparts.mklatency(pipeline, head[channel], name=utils.latency_name('beforewhitening', 2, channel))
# whiten auxiliary channel data
for rate, thishead in multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32, nxydump_segment=options.nxydump_segment).items():
for rate, thishead in multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32, nxydump_segment=options.nxydump_segment, psd_fft_length=options.psd_fft_length, min_rate=options.min_downsample_rate).items():
thisrate = max(options.min_downsample_rate, rate)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterwhitening', 3, channel, rate))
# determine whether to do time-domain or frequency-domain convolution
time_domain = (waveforms[channel].sample_pts(rate)*rate) < (5*waveforms[channel].sample_pts(rate)*numpy.log2(rate))
time_domain = (waveforms[channel].sample_pts(thisrate)*thisrate) < (5*waveforms[channel].sample_pts(thisrate)*numpy.log2(thisrate))
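# direct (time-domain) convolution costs roughly filter_length * rate operations per second of data, while FFT-based (frequency-domain) convolution costs roughly 5 * filter_length * log2(rate); pick whichever is cheaper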
# create FIR bank of half sine-gaussian templates
fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate)])
fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate, sampling_rate=thisrate)])
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = waveforms[channel].latency(rate))
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(thisrate), latency = waveforms[channel].latency(thisrate))
# add queues, change stream format, add tags
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterFIRbank', 4, channel, rate))
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
thishead = pipeparts.mktogglecomplex(pipeline, thishead)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % rate)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % thisrate)
thishead = pipeparts.mktaginject(pipeline, thishead, "instrument=%s,channel-name=%s" %( instrument, channel))
# dump segments to disk if specified
@@ -445,9 +447,9 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# extract features from time series
if options.feature_mode == 'timeseries':
thishead = pipeparts.mktrigger(pipeline, tee, int(rate // options.sample_rate), max_snr = True)
thishead = pipeparts.mktrigger(pipeline, tee, int(thisrate // options.sample_rate), max_snr = True)
elif options.feature_mode == 'etg':
thishead = pipeparts.mktrigger(pipeline, tee, rate, snr_thresh = options.snr_threshold)
thishead = pipeparts.mktrigger(pipeline, tee, thisrate, snr_thresh = options.snr_threshold)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('aftertrigger', 5, channel, rate))
@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_monitor [--options]"
__usage__ = "gstlal_snax_monitor [--options]"
__description__ = "an executable to collect and monitor streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
@@ -35,10 +35,10 @@ import time
from confluent_kafka import Consumer, KafkaError
from ligo.scald import io
from ligo.scald.io import hdf5, influx
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import utils
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
#-------------------------------------------------
# Functions
@@ -65,6 +65,8 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
group.add_option("--data-type", metavar="string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default = max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
@@ -117,14 +119,16 @@ class StreamMonitor(object):
### set up aggregator
logger.info("setting up monitor with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.InfluxDBAggregator(
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.HDF5Aggregator(
self.agg_sink = hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
@@ -134,6 +138,9 @@ class StreamMonitor(object):
name, _ = options.channel_list.rsplit('.', 1)
self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
### define measurements to be stored
for metric in ('target_snr', 'synchronizer_latency', 'percent_missed'):
self.agg_sink.register_schema(metric, columns='data', column_key='data', tags='job', tag_key='job')
def fetch_data(self):
"""
@@ -161,6 +168,9 @@ class StreamMonitor(object):
if self.timestamp:
if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
### check for missing channels
missing_channels = set()
metrics = defaultdict(list)
while len(self.feature_queue) > 0:
### remove data with oldest timestamp and process
@@ -170,9 +180,6 @@ class StreamMonitor(object):
### check for missing channels
these_channels = set(features.keys())
missing_channels = self.channels - these_channels
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
### generate metrics
metrics['time'].append(timestamp)
metrics['synchronizer_latency'].append(latency)
@@ -185,13 +192,16 @@ class StreamMonitor(object):
### store and aggregate features
for metric in ('synchronizer_latency', 'percent_missed'):
data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
self.agg_sink.store_columns(metric, {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns(metric, {'synchronizer': data}, aggregate=self.data_type)
if len(metrics['target_time']) > 0:
data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, aggregate=self.data_type)
self.last_save = timestamp
logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['synchronizer_latency']), max(metrics['percent_missed'])))
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
def start(self):
"""
@@ -212,7 +222,6 @@ class StreamMonitor(object):
shut down gracefully
"""
logger.info('shutting down feature monitor...')
self.conn.close()
class SignalHandler(object):
"""
@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_hdf5_sink [--options]"
__usage__ = "gstlal_snax_sink [--options]"
__description__ = "an executable to dump streaming data to disk via hdf5"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
@@ -40,8 +40,8 @@ import numpy
from gstlal import aggregator
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import utils
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
#-------------------------------------------------
# Functions
@@ -53,8 +53,9 @@ def parse_command_line():
group = optparse.OptionGroup(parser, "File Sink Options", "General settings for configuring the file sink.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
group.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_FEATURES")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
group.add_option("--features-path", metavar = "path", default = ".", help = "Write features to this path. Default = .")
group.add_option("--basename", metavar = "string", default = "SNAX_FEATURES", help = "Sets the basename for files written to disk. Default = SNAX_FEATURES")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--waveform", type="string", default = "sine_gaussian", help = "Set the waveform used for producing features. Default = sine_gaussian.")
@@ -91,6 +92,7 @@ class HDF5StreamSink(object):
"""
def __init__(self, logger, options):
logger.info('setting up hdf5 stream sink...')
self.tag = options.tag
### initialize timing options
self.request_timeout = options.request_timeout
@@ -98,8 +100,10 @@ class HDF5StreamSink(object):
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server,
'group.id': 'group_1'}
self.kafka_settings = {
'bootstrap.servers': options.kafka_server,
'group.id': 'hdf5_sink_{}'.format(self.tag)
}
### initialize consumers
self.consumer = Consumer(self.kafka_settings)
@@ -120,13 +124,20 @@ class HDF5StreamSink(object):
self.last_save_time = None
self.last_persist_time = None
self.rootdir = options.rootdir
self.base_features_path = options.features_path
self.sample_rate = options.sample_rate
self.write_cadence = options.write_cadence
self.persist_cadence = options.persist_cadence
self.waveform = options.waveform
self.basename = '%s-%s' % (options.instrument[:1], options.basename)
self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate, waveform = self.waveform)
self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
self.feature_data = utils.HDF5TimeseriesFeatureData(
self.columns,
keys = self.keys,
cadence = self.write_cadence,
sample_rate = self.sample_rate,
waveform = self.waveform
)
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
@@ -142,7 +153,7 @@ class HDF5StreamSink(object):
"""
# set/update file names and directories with new gps time and duration
self.feature_name = os.path.splitext(utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
self.feature_path = utils.to_trigger_path(os.path.abspath(self.rootdir), self.basename, start_time)
self.feature_path = utils.to_trigger_path(os.path.abspath(self.base_features_path), self.basename, start_time)
self.tmp_path = utils.to_trigger_path(self.tmp_dir, self.basename, start_time)
# create temp and output directories if they don't exist
@@ -188,13 +199,14 @@ class HDF5StreamSink(object):
if self.last_save_time is None:
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
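# the first file covers the span from the current timestamp up to one second past the next persist-cadence boundary, so later files line up with that cadence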
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
logger.info("saving features to disk at timestamp = %f" % self.timestamp)
self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
save_time = utils.floor_div(self.last_save_time, self.write_cadence)
self.feature_data.dump(self.tmp_path, self.feature_name, save_time, tmp = True)
self.last_save_time = self.timestamp
# persist triggers once per persist cadence if using hdf5 format
@@ -16,8 +16,8 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_synchronizer [--options]"
__description__ = "an executable to synchronize incoming gstlal feature extractor streams and send downstream"
__usage__ = "gstlal_snax_synchronize [--options]"
__description__ = "an executable to synchronize incoming feature streams and send downstream"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
@@ -33,14 +33,13 @@ import timeit
from collections import deque
from Queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPool
from optparse import OptionParser
from lal import gpstime
from confluent_kafka import Producer, Consumer, KafkaError
from gstlal.fxtools import utils
from gstlal.snax import utils
#-------------------------------------------------
# Functions
@@ -92,13 +91,11 @@ class StreamSynchronizer(object):
self.num_topics = options.num_topics
### initialize consumers
self.consumer_names = ['%s_%s' % (options.input_topic_basename, str(i).zfill(4)) for i in range(1, self.num_topics + 1)]
# FIXME: hacky way of introducing group id, should be a settable option
self.topics = ['%s_%s' % (options.input_topic_basename, str(i).zfill(4)) for i in range(1, self.num_topics + 1)]
consumer_kafka_settings = self.kafka_settings
consumer_kafka_settings['group.id'] = 'group_1'
self.consumers = [Consumer(consumer_kafka_settings) for topic in self.consumer_names]
for topic, consumer in zip(self.consumer_names, self.consumers):
consumer.subscribe([topic])
consumer_kafka_settings['group.id'] = '-'.join(['synchronizer', options.tag])
self.consumer = Consumer(consumer_kafka_settings)
self.consumer.subscribe([topic for topic in self.topics])
### initialize producer
self.producer_name = options.output_topic_basename
@@ -111,32 +108,23 @@ class StreamSynchronizer(object):
# 5 minute queue for outgoing buffers
self.feature_buffer = deque(maxlen = 300)
def fetch_data(self, consumer):
def fetch_data(self):
"""
requests new messages from all subscribed topics
and adds them to the feature queue
"""
message = consumer.poll(timeout=self.request_timeout)
messages = self.consumer.consume(num_messages=len(self.topics), timeout=self.request_timeout)
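### a single consume() call returns up to len(self.topics) messages, pulled from any of the subscribed topics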
### only add to queue if no errors in receiving data
if message and not message.error():
for message in messages:
### only add to queue if no errors in receiving data
if message and not message.error():
### decode json and parse data
feature_subset = json.loads(message.value())
### decode json and parse data
feature_subset = json.loads(message.value())
### add to queue if timestamp is within timeout
if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
self.add_to_queue(feature_subset['timestamp'], feature_subset['features'])
def fetch_all_data(self):
"""
requests for a new message from all topics, and add
to the feature queue
"""
pool = ThreadPool(self.num_topics)
result = pool.map_async(self.fetch_data, self.consumers)
result.wait()
pool.close()
### add to queue if timestamp is within timeout
if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
self.add_to_queue(feature_subset['timestamp'], feature_subset['features'])
def add_to_queue(self, timestamp, data):
"""
@@ -208,7 +196,7 @@ class StreamSynchronizer(object):
"""
while self.is_running:
### ingest and combine incoming feature subsets, dropping late data
self.fetch_all_data()
self.fetch_data()
self.process_queue()
### push combined features downstream
while self.feature_buffer:
@@ -17,7 +17,7 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program that allows diagnosis of the whitening stage of gstlal_feature_extractor
A program that whitens timeseries
"""
@@ -49,7 +49,7 @@ from gstlal import reference_psd
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal.fxtools import multichannel_datasource
from gstlal.snax import multichannel_datasource
# global settings for whitening properties
PSD_FFT_LENGTH = 32
@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-burst],[0.1.0],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_INIT([gstlal-burst],[0.2.0],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
@@ -16,15 +16,17 @@ AC_CONFIG_FILES([ \
Makefile \
gstlal-burst.spec \
bin/Makefile \
lib/gstlal-burst.pc \
lib/Makefile \
gst/Makefile \
gst/lal/Makefile \
debian/control \
debian/Makefile \
gst/Makefile \
gst/lal/Makefile \
lib/Makefile \
lib/gstlal-burst/gstlal-burst.pc \
lib/gstlal-burst/Makefile \
python/Makefile \
python/excesspower/Makefile \
python/fxtools/Makefile
python/snax/Makefile \
share/Makefile
])
gstlal-burst (0.2.0-1) unstable; urgency=low
* Rename fxtools submodule to snax to reflect new name from
gstlal feature extractor to SNAX
* Expose --psd-fft-length option for finer whitener control
* Fix in whitener zero-padding
* Update tags in monitoring to include subsystem info
* Change trigger_time -> time column, remove NaN rows for gwdatafind
compatibility
* Call smrepair upon startup if reading data from /dev/shm
* Add feature combiner job for offline to combine features from distinct
jobs to match online format
* Add option for monitor, aggregator to connect to Influx with auth/HTTPS
* Increase blocksize in reading frames for improved performance with NFS
* Fix issue with pipeline hanging in offline jobs in some edge cases
* Switch to single Kafka consumer in synchronizer for improved performance
-- Patrick Godwin <patrick.godwin@ligo.org> Mon, 21 Oct 2019 11:11:13 -0700
gstlal-burst (0.1.1) unstable; urgency=low
* Updated gstlal_feature_aggregator, gstlal_feature_monitor to deal with
ligo-scald API change
-- Patrick Godwin <patrick.godwin@ligo.org> Sun, 03 Mar 2019 21:27:15 -0500
gstlal-burst (0.1.0) unstable; urgency=low
* Add feature extraction toolkit
@@ -11,7 +11,7 @@ AC_DEFUN([AX_PYTHON_GLUE],[
AC_MSG_ERROR(["cannot determine version"])
])
minversion=$1
AX_COMPARE_VERSION([$LIGO_SEGMENTS_VERSION], [ge], [${minversion:-0}], [
AX_COMPARE_VERSION([$GLUE_VERSION], [ge], [${minversion:-0}], [
AC_MSG_RESULT([$GLUE_VERSION])
], [
AC_MSG_WARN([found $GLUE_VERSION, require at least $1])
#! /bin/sh
# depcomp - compile a program generating dependencies as side-effects
scriptversion=2016-01-11.22; # UTC
scriptversion=2018-03-07.03; # UTC
# Copyright (C) 1999-2017 Free Software Foundation, Inc.
# Copyright (C) 1999-2018 Free Software Foundation, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -16,7 +16,7 @@ scriptversion=2016-01-11.22; # UTC
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# As a special exception to the GNU General Public License, if you
# distribute this file as part of a program that contains a
@@ -783,7 +783,7 @@ exit 0
# Local Variables:
# mode: shell-script
# sh-indentation: 2
# eval: (add-hook 'write-file-hooks 'time-stamp)
# eval: (add-hook 'before-save-hook 'time-stamp)
# time-stamp-start: "scriptversion="
# time-stamp-format: "%:y-%02m-%02d.%02H"
# time-stamp-time-zone: "UTC0"
AM_CPPFLAGS = -I$(top_srcdir)/lib
AM_CPPFLAGS = -I$(top_srcdir)/lib -I$(top_builddir)/lib
plugin_LTLIBRARIES = lib@GSTPLUGINPREFIX@gstlalburst.la
@@ -8,6 +8,6 @@ lib@GSTPLUGINPREFIX@gstlalburst_la_SOURCES = \
gstlal_string_triggergen.c gstlal_string_triggergen.h
lib@GSTPLUGINPREFIX@gstlalburst_la_CFLAGS = $(AM_CFLAGS) $(GSL_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS)
lib@GSTPLUGINPREFIX@gstlalburst_la_LIBADD = $(top_builddir)/lib/libgstlalburst.la
lib@GSTPLUGINPREFIX@gstlalburst_la_LIBADD = $(top_builddir)/lib/gstlal-burst/libgstlalburst.la
lib@GSTPLUGINPREFIX@gstlalburst_la_LDFLAGS = $(AM_LDFLAGS) $(GSL_LIBS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS) $(GSTLAL_PLUGIN_LDFLAGS)
@@ -87,6 +87,215 @@ static void free_bank(GSTLALBurst_Triggergen *element)
element->bankarray = NULL;
}
/*
* ========================================================================
*
* Triggers
*
* ========================================================================
*/
static int gstlal_set_channel_in_snglburst_array(SnglBurst *bankarray, int length, char *channel)
{
int i;
for (i = 0; i < length; i++) {
if (channel) {
strncpy(bankarray[i].channel, (const char*) channel, LIGOMETA_CHANNEL_MAX);
bankarray[i].channel[LIGOMETA_CHANNEL_MAX - 1] = 0;
}
}
return 0;
}
static int gstlal_set_instrument_in_snglburst_array(SnglBurst *bankarray, int length, char *instrument)
{
int i;
for (i = 0; i < length; i++) {
if (instrument) {
strncpy(bankarray[i].ifo, (const char*) instrument, LIGOMETA_IFO_MAX);
bankarray[i].ifo[LIGOMETA_IFO_MAX - 1] = 0;
}
}
return 0;
}
static SnglBurst *gstlal_snglburst_new_list_from_peak(struct gstlal_peak_state *input, SnglBurst *bankarray, GstClockTime etime, guint rate, SnglBurst* output)
{
/* advance the pointer if we have one */
guint channel;
double complex *maxdata = input->values.as_double_complex;
guint *maxsample = input->samples;
/* FIXME do error checking */
for(channel = 0; channel < input->channels; channel++) {
if ( maxdata[channel] ) {
SnglBurst *new_event = XLALCreateSnglBurst();
memcpy(new_event, &(bankarray[channel]), sizeof(*new_event));
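/* peak time = buffer timestamp + offset of the peak sample within the
 * buffer, shifted back by half the template duration; the start time
 * precedes the peak by a further half duration */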
LIGOTimeGPS peak_time;
XLALINT8NSToGPS(&peak_time, etime);
XLALGPSAdd(&peak_time, -new_event->duration/2);
XLALGPSAdd(&peak_time, (double) maxsample[channel] / rate);
LIGOTimeGPS start_time = peak_time;
XLALGPSAdd(&start_time, -new_event->duration/2);
new_event->snr = cabs(maxdata[channel]);
new_event->start_time = start_time;
new_event->peak_time = peak_time;
new_event->next = output;
output = new_event;
}
}
return output;
}
static SnglBurst *gstlal_snglburst_new_list_from_double_peak(struct gstlal_peak_state *input, SnglBurst *bankarray, GstClockTime etime, guint rate, SnglBurst* output)
{
/* advance the pointer if we have one */
guint channel;
double *maxdata = input->values.as_double;
guint *maxsample = input->samples;
/* FIXME do error checking */
for(channel = 0; channel < input->channels; channel++) {
if ( maxdata[channel] ) {
SnglBurst *new_event = XLALCreateSnglBurst();
memcpy(new_event, &(bankarray[channel]), sizeof(*new_event));
LIGOTimeGPS peak_time;
XLALINT8NSToGPS(&peak_time, etime);
XLALGPSAdd(&peak_time, (double) maxsample[channel] / rate);
XLALGPSAdd(&peak_time, -new_event->duration/2);
// Center the tile
XLALGPSAdd(&peak_time, 1.0/(2.0*rate));
LIGOTimeGPS start_time = peak_time;
XLALGPSAdd(&start_time, -new_event->duration/2);
new_event->snr = fabs(maxdata[channel]);
new_event->start_time = start_time;
new_event->peak_time = peak_time;
new_event->next = output;
output = new_event;
}
}
return output;
}
static GstBuffer *gstlal_snglburst_new_buffer_from_list(SnglBurst *input, GstPad *pad, guint64 offset, guint64 length, GstClockTime etime, guint rate, guint64 *count)
{
/* FIXME check errors */
/* size is the number of events in the list times the number of bytes per SnglBurst row */
gint size = XLALSnglBurstTableLength(input);
size *= sizeof(*input);
GstBuffer *srcbuf = NULL;
GstCaps *caps = GST_PAD_CAPS(pad);
GstFlowReturn result = gst_pad_alloc_buffer(pad, offset, size, caps, &srcbuf);
if (result != GST_FLOW_OK)
return srcbuf;
if (input) {
//FIXME: Process ID
(*count) = XLALSnglBurstAssignIDs(input, 0, *count);
}
/* Copy the events into the buffer */
SnglBurst *output = (SnglBurst *) GST_BUFFER_DATA(srcbuf);
SnglBurst *head = input;
while (input) {
*output = *input;
/* Make the array look like a linked list */
output->next = input->next ? output+1 : NULL;
output++;
input = input->next;
}
/* Forget about this set of events, it's the buffer's now. */
XLALDestroySnglBurstTable(head);
if (size == 0)
GST_BUFFER_FLAG_SET(srcbuf, GST_BUFFER_FLAG_GAP);
/* set the offset */
GST_BUFFER_OFFSET(srcbuf) = offset;
GST_BUFFER_OFFSET_END(srcbuf) = offset + length;
/* set the time stamps */
GST_BUFFER_TIMESTAMP(srcbuf) = etime;
GST_BUFFER_DURATION(srcbuf) = (GstClockTime) gst_util_uint64_scale_int_round(GST_SECOND, length, rate);
return srcbuf;
}
static SnglBurst *gstlal_snglburst_new_list_from_double_buffer(double *input, SnglBurst *bankarray, GstClockTime etime, guint channels, guint samples, guint rate, gdouble threshold, SnglBurst* output)
{
/* advance the pointer if we have one */
guint channel, sample;
/* FIXME do error checking */
for (channel = 0; channel < channels; channel++) {
for (sample = 0; sample < samples; sample++) {
if (input[channels*sample+channel] > threshold) {
SnglBurst *new_event = XLALCreateSnglBurst();
memcpy(new_event, &(bankarray[channel]), sizeof(*new_event));
LIGOTimeGPS peak_time;
XLALINT8NSToGPS(&peak_time, etime);
XLALGPSAdd(&peak_time, (double) sample / rate);
XLALGPSAdd(&peak_time, -new_event->duration/2);
// Center the tile
XLALGPSAdd(&peak_time, 1.0/(2.0*rate));
LIGOTimeGPS start_time = peak_time;
XLALGPSAdd(&start_time, -new_event->duration/2);
new_event->snr = fabs(input[channels*sample+channel]);
new_event->start_time = start_time;
new_event->peak_time = peak_time;
new_event->next = output;
output = new_event;
}
}
}
return output;
}
static SnglBurst *gstlal_snglburst_new_list_from_complex_double_buffer(complex double *input, SnglBurst *bankarray, GstClockTime etime, guint channels, guint samples, guint rate, gdouble threshold, SnglBurst* output)
{
/* advance the pointer if we have one */
guint channel, sample;
/* FIXME do error checking */
for (channel = 0; channel < channels; channel++) {
for (sample = 0; sample < samples; sample++) {
/* FIXME: which value are we thresholding on? The EP version uses the
* squared value, so we make this consistent. */
if (cabs(input[channels*sample+channel]) > sqrt(threshold)) {
SnglBurst *new_event = XLALCreateSnglBurst();
memcpy(new_event, &(bankarray[channel]), sizeof(*new_event));
LIGOTimeGPS peak_time;
XLALINT8NSToGPS(&peak_time, etime);
XLALGPSAdd(&peak_time, (double) sample / rate);
XLALGPSAdd(&peak_time, -new_event->duration/2);
// Center the tile
XLALGPSAdd(&peak_time, 1.0/(2.0*rate));
LIGOTimeGPS start_time = peak_time;
XLALGPSAdd(&start_time, -new_event->duration/2);
new_event->snr = cabs(input[channels*sample+channel]);
new_event->start_time = start_time;
new_event->peak_time = peak_time;
new_event->next = output;
output = new_event;
}
}
}
return output;
}
/*
* ============================================================================
*
@@ -51,7 +51,7 @@
#include <gstlal/gstaudioadapter.h>
#include <gstlal/gstlal_tags.h>
#include <gstlal/gstlal_autocorrelation_chi2.h>
#include <gstlal_sngltrigger.h>
#include <gstlal-burst/gstlal_sngltrigger.h>
/*
@@ -38,7 +38,7 @@
#include <lal/LIGOMetadataTables.h>
#include <gsl/gsl_matrix.h>
#include <gsl/gsl_matrix_float.h>
#include <gstlal_sngltrigger.h>
#include <gstlal-burst/gstlal_sngltrigger.h>
G_BEGIN_DECLS
@@ -7,39 +7,39 @@ Summary: GSTLAL Burst
License: GPL
Group: LSC Software/Data Analysis
Requires: gstlal-ugly >= @MIN_GSTLAL_UGLY_VERSION@
Requires: gstlal >= @MIN_GSTLAL_VERSION@
Requires: python >= @MIN_PYTHON_VERSION@
Requires: fftw >= 3
Requires: glue >= @MIN_GLUE_VERSION@
Requires: python2-ligo-segments >= @MIN_LIGO_SEGMENTS_VERSION@
Requires: gobject-introspection >= @MIN_GOBJECT_INTROSPECTION_VERSION@
Requires: fftw >= 3
Requires: python-%{gstreamername}
Requires: gstlal >= @MIN_GSTLAL_VERSION@
Requires: gstlal-ugly >= @MIN_GSTLAL_UGLY_VERSION@
Requires: %{gstreamername} >= @MIN_GSTREAMER_VERSION@
Requires: %{gstreamername}-plugins-base >= @MIN_GSTREAMER_VERSION@
Requires: %{gstreamername}-plugins-good >= @MIN_GSTREAMER_VERSION@
Requires: gsl
Requires: h5py
Requires: numpy
Requires: scipy
Requires: lal >= @MIN_LAL_VERSION@
Requires: lalmetaio >= @MIN_LALMETAIO_VERSION@
Requires: lalburst >= @MIN_LALBURST_VERSION@
Requires: numpy
Requires: orc >= @MIN_ORC_VERSION@
Requires: gsl
Requires: python >= @MIN_PYTHON_VERSION@
Requires: python-%{gstreamername}
Requires: python2-ligo-segments >= @MIN_LIGO_SEGMENTS_VERSION@
Requires: scipy
BuildRequires: fftw-devel >= 3
BuildRequires: gobject-introspection-devel >= @MIN_GOBJECT_INTROSPECTION_VERSION@
BuildRequires: graphviz
BuildRequires: gsl-devel
BuildRequires: gstlal-devel >= @MIN_GSTLAL_VERSION@
BuildRequires: python-devel >= @MIN_PYTHON_VERSION@
BuildRequires: fftw-devel >= 3
BuildRequires: %{gstreamername}-devel >= @MIN_GSTREAMER_VERSION@
BuildRequires: %{gstreamername}-plugins-base-devel >= @MIN_GSTREAMER_VERSION@
BuildRequires: lal-devel >= @MIN_LAL_VERSION@
BuildRequires: lal-python >= @MIN_LAL_VERSION@
BuildRequires: lalburst-devel >= @MIN_LALBURST_VERSION@
BuildRequires: lalmetaio-devel >= @MIN_LALMETAIO_VERSION@
BuildRequires: gsl-devel
BuildRequires: graphviz
BuildRequires: orc >= @MIN_ORC_VERSION@
BuildRequires: pkgconfig >= 0.18.0
BuildRequires: python-devel >= @MIN_PYTHON_VERSION@
BuildRequires: python2-lal >= @MIN_LAL_VERSION@
Conflicts: gstlal-ugly < 0.6.0