......@@ -3,6 +3,7 @@ dist_bin_SCRIPTS = \
gstlal_excesspower \
gstlal_excesspower_trigvis \
gstlal_feature_aggregator \
gstlal_feature_combiner \
gstlal_feature_extractor \
gstlal_feature_extractor_pipe \
gstlal_ll_feature_extractor_pipe \
......
......@@ -64,6 +64,8 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
group.add_option("--data-type", metavar = "string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default: max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
......@@ -112,19 +114,25 @@ class StreamAggregator(object):
### set up aggregator
logger.info("setting up aggregator with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.InfluxDBAggregator(
self.agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.HDF5Aggregator(
self.agg_sink = io.hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
)
### define measurements to be stored from aggregators
self.agg_sink.register_schema('latency', columns='data', column_key='data', tags='job', tag_key='job')
self.agg_sink.register_schema('snr', columns='data', column_key='data', tags=('channel', 'subsystem'), tag_key='channel')
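### NOTE: with the measurement schemas registered up front, the store_columns() calls
### below only pass the measurement name, the data dict and the aggregate type; the
### column/tag layout now comes from the registered schema (ligo-scald API change)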
def fetch_data(self, job_consumer):
"""
requests a new message from an individual topic,
......@@ -163,11 +171,11 @@ class StreamAggregator(object):
### store and aggregate metrics
metric_data = {job: {'time': metrics['time'], 'fields': {'data': metrics['latency']}} for job, metrics in all_metrics.items()}
self.agg_sink.store_columns('latency', metric_data, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns('latency', metric_data, aggregate=self.data_type)
### store and aggregate features
timeseries_data = {channel: {'time': timeseries['trigger_time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
self.agg_sink.store_columns('snr', timeseries_data, 'data', tags='channel', aggregate=self.data_type)
timeseries_data = {(channel, self._channel_to_subsystem(channel)): {'time': timeseries['time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
self.agg_sink.store_columns('snr', timeseries_data, aggregate=self.data_type)
try:
max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
......@@ -218,6 +226,13 @@ class StreamAggregator(object):
logger.info('shutting down feature aggregator...')
self.is_running = False
@staticmethod
def _channel_to_subsystem(channel):
"""
given a channel, returns the subsystem the channel lives in
"""
return channel.split(':')[1].split('-')[0]
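### e.g. a (hypothetical) channel named 'H1:PEM-CS_ACC_BEAMTUBE_Y_DQ' maps to the
### 'PEM' subsystem: take the text between the ':' and the first '-'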
class SignalHandler(object):
"""
helper class to shut down the stream aggregator gracefully before exiting
......@@ -242,7 +257,7 @@ if __name__ == '__main__':
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'feature_aggregator']),
'-'.join([options.tag, 'feature_aggregator', options.jobs[0]]),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
......
#!/usr/bin/env python
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_combiner [--options]"
__description__ = "an executable to combine features from the batch pipeline to provide a more user-friendly output"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
# =============================
#
# preamble
#
# =============================
from collections import defaultdict
import itertools
import optparse
import os
import sys
import shutil
import h5py
import numpy
from ligo.segments import infinity, segment, segmentlist
from gstlal import aggregator
from gstlal.fxtools import utils
# =============================================================================
#
# FUNCTIONS
#
# =============================================================================
def parse_command_line():
"""
Parse command line inputs.
"""
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Combiner Options", "General settings for configuring the file combiner.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--start-time", type = "int", help = "Set the start time to combine features.")
group.add_option("--end-time", type = "int", help = "Set the end time to combine features.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
group.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_FEATURES")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--outdir", metavar = "path", help = "If set, chooses an alternate directory to save the features to. Default = --rootdir")
parser.add_option_group(group)
opts, args = parser.parse_args()
return opts, args
# ===================
#
# main
#
# ===================
if __name__ == "__main__":
options, args = parse_command_line()
### set up logging
logger = utils.get_logger(
'-'.join([options.tag, 'combiner']),
log_level=options.log_level,
rootdir=options.rootdir,
verbose=options.verbose
)
### define gps bounds to grab features
start_time = options.start_time if options.start_time else -infinity()
end_time = options.end_time if options.end_time else infinity()
file_segs = segmentlist([segment(start_time, end_time)])
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
### build cache of hdf5-formatted features, grouped by segment
pattern = '{ifo}-{basename}/{ifo}-{basename}-*/{ifo}-{basename}-*/{ifo}-{basename}-*.h5'.format(
basename=options.basename,
ifo=options.instrument[0],
)
cache = sorted(utils.path2cache(options.rootdir, pattern), key=lambda x: x.segment)
### filter cache with segments
cache = [entry for entry in cache if file_segs.intersects_segment(entry.segment)]
### group by segment
grouped_cache = [(seg, list(group)) for seg, group in itertools.groupby(cache, key=lambda x: x.segment)]
### combine features in each stride
for seg, cache in grouped_cache:
logger.info('combining features within times: {} - {}'.format(*seg))
features = defaultdict(dict)
### assume filenames and metadata are the same in each group
dirname = os.path.split(os.path.dirname(cache[0].path))[0]
filename = os.path.splitext(os.path.basename(cache[0].path))[0]
metadata = {}
with h5py.File(cache[0].path, 'r') as f:
metadata['waveform'] = f.attrs.get('waveform')
metadata['sample_rate'] = f.attrs.get('sample_rate')
### load features
for entry in cache:
with h5py.File(entry.path, 'r') as f:
channels = f.keys()
for channel in channels:
dsets = f[channel].keys()
for dset in dsets:
features[channel][dset] = numpy.array(f[channel][dset])
### save combined features to disk
for channel in features.keys():
for dset in features[channel].keys():
utils.create_new_dataset(tmp_dir, filename, features[channel][dset], name=dset, group=channel, tmp=True, metadata=metadata)
### determine final location for features
if options.outdir:
start_time = int(filename.split('-')[2])
basename = '-'.join([options.instrument[0], options.basename])
base_path = utils.to_trigger_path(options.outdir, basename, start_time)
aggregator.makedir(base_path)
final_path = os.path.join(base_path, filename)+".h5"
else:
final_path = os.path.join(dirname, filename)+".h5"
tmp_path = os.path.join(tmp_dir, filename)+".h5.tmp"
logger.info('saving features to: {}'.format(final_path))
shutil.move(tmp_path, final_path)
......@@ -221,6 +221,7 @@ def parse_command_line():
# check if input sample rate is sensible
assert options.sample_rate == 1 or options.sample_rate % 2 == 0
assert options.min_downsample_rate % 2 == 0
# check if persist and save cadence times are sensible
assert options.persist_cadence >= options.cadence
......@@ -418,24 +419,25 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
head[channel] = pipeparts.mklatency(pipeline, head[channel], name=utils.latency_name('beforewhitening', 2, channel))
# whiten auxiliary channel data
for rate, thishead in multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32, nxydump_segment=options.nxydump_segment).items():
for rate, thishead in multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32, nxydump_segment=options.nxydump_segment, psd_fft_length=options.psd_fft_length, min_rate=options.min_downsample_rate).items():
thisrate = max(options.min_downsample_rate, rate)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterwhitening', 3, channel, rate))
# determine whether to do time-domain or frequency-domain convolution
time_domain = (waveforms[channel].sample_pts(rate)*rate) < (5*waveforms[channel].sample_pts(rate)*numpy.log2(rate))
time_domain = (waveforms[channel].sample_pts(thisrate)*thisrate) < (5*waveforms[channel].sample_pts(thisrate)*numpy.log2(thisrate))
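# rough cost heuristic: direct (time-domain) filtering scales with template samples * rate,
# while FFT-based filtering scales roughly as 5 * samples * log2(rate); pick the
# time-domain path only when it is the cheaper of the two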
# create FIR bank of half sine-gaussian templates
fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate)])
fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate, sampling_rate=thisrate)])
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = waveforms[channel].latency(rate))
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(thisrate), latency = waveforms[channel].latency(thisrate))
# add queues, change stream format, add tags
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterFIRbank', 4, channel, rate))
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
thishead = pipeparts.mktogglecomplex(pipeline, thishead)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % rate)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % thisrate)
thishead = pipeparts.mktaginject(pipeline, thishead, "instrument=%s,channel-name=%s" %( instrument, channel))
# dump segments to disk if specified
......@@ -445,9 +447,9 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# extract features from time series
if options.feature_mode == 'timeseries':
thishead = pipeparts.mktrigger(pipeline, tee, int(rate // options.sample_rate), max_snr = True)
thishead = pipeparts.mktrigger(pipeline, tee, int(thisrate // options.sample_rate), max_snr = True)
elif options.feature_mode == 'etg':
thishead = pipeparts.mktrigger(pipeline, tee, rate, snr_thresh = options.snr_threshold)
thishead = pipeparts.mktrigger(pipeline, tee, thisrate, snr_thresh = options.snr_threshold)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('aftertrigger', 5, channel, rate))
......
......@@ -42,6 +42,7 @@ from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils
PSD_DROP_FACTOR = 16
# =============================
#
......@@ -49,18 +50,25 @@ from gstlal.fxtools import utils
#
# =============================
def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
def seglist_range(start, stop, stride):
b = start
while b <= stop:
seg = segments.segment(int(b), min(utils.floor_div(int(b) + stride, stride), stop))
b = utils.floor_div(int(b) + stride, stride)
yield seg
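# assuming utils.floor_div(x, n) rounds x down to a multiple of n, e.g.
# list(seglist_range(100, 120000, 50000)) -> [segment(100, 50000), segment(50000, 100000), segment(100000, 120000)]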
def analysis_segments(ifo, allsegs, boundary_seg, segment_length, psd_drop_time, max_template_length = 30):
"""
get a dictionary of all the analysis segments
"""
segsdict = segments.segmentlistdict()
# start pad to allow whitener to settle + the maximum template_length
start_pad = multirate_datasource.PSD_DROP_TIME + max_template_length
start_pad = psd_drop_time + max_template_length
segsdict[ifo] = segments.segmentlist([boundary_seg])
segsdict[ifo] = segsdict[ifo].protract(start_pad)
segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
segsdict[ifo] = dagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
if not segsdict[ifo]:
del segsdict[ifo]
......@@ -102,7 +110,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdic
# define analysis times
gps_start_time = int(seg[0])
feature_start_time = gps_start_time + multirate_datasource.PSD_DROP_TIME + max_template_length
feature_start_time = gps_start_time + options.psd_drop_time + max_template_length
feature_end_time = min(int(seg[1]), options.gps_end_time)
feature_extractor_nodes[(ii, seg)] = \
......@@ -168,9 +176,12 @@ def parse_command_line():
# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check
# calculate psd drop time based on fft length
options.psd_drop_time = options.psd_fft_length * PSD_DROP_FACTOR
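# e.g. with the default --psd-fft-length of 32 s and PSD_DROP_FACTOR = 16, this gives a
# 512 s drop time, matching the PSD_DROP_TIME previously hard-coded in multirate_datasource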
# sanity check to enforce a minimum segment length
# Minimum segment length chosen so that the overlap is a ~33% hit in run time
min_segment_length = int(4 * multirate_datasource.PSD_DROP_TIME)
min_segment_length = int(4 * options.psd_drop_time)
assert options.segment_length >= min_segment_length
return options, filenames
......@@ -208,10 +219,14 @@ aggregator.makedir("logs")
dag = dagparts.DAG("feature_extractor_pipe")
condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_options = {"request_memory": options.request_memory, "request_cpus": options.request_cpu, "want_graceful_removal": "True", "kill_sig": "15"}
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands)
segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, options.psd_drop_time, max_template_length=max_template_length)
combiner_condor_options = {"request_memory": "4GB", "request_cpus": 1, "want_graceful_removal": "True", "kill_sig": "15"}
combiner_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, combiner_condor_options)
feature_combiner_job = dagparts.DAGJob("gstlal_feature_combiner", condor_commands = combiner_condor_commands)
#
# set up jobs
......@@ -219,6 +234,20 @@ segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_i
feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], segsdict, ifo, options, data_source_info, max_template_length=max_template_length)
feature_combiner_options = {
"verbose": options.verbose,
"rootdir": os.path.join(options.out_path, "gstlal_feature_extractor"),
"basename": options.description,
"instrument": ifo,
"tag": "offline",
}
for seg in seglist_range(data_source_info.seg[0], data_source_info.seg[1], 50000):
parent_nodes = [node for (i, job_seg), node in feature_extractor_nodes.items() if seg.intersects(job_seg)]
these_options = dict(feature_combiner_options)
these_options.update({"start-time": seg[0], "end-time": seg[1]})
feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = parent_nodes, opts = these_options)
#
# write out dag and sub files
#
......
......@@ -91,6 +91,7 @@ class HDF5StreamSink(object):
"""
def __init__(self, logger, options):
logger.info('setting up hdf5 stream sink...')
self.tag = options.tag
### initialize timing options
self.request_timeout = options.request_timeout
......@@ -98,8 +99,10 @@ class HDF5StreamSink(object):
self.is_running = False
### kafka settings
self.kafka_settings = {'bootstrap.servers': options.kafka_server,
'group.id': 'group_1'}
self.kafka_settings = {
'bootstrap.servers': options.kafka_server,
'group.id': 'hdf5_sink_{}'.format(self.tag)
}
### initialize consumers
self.consumer = Consumer(self.kafka_settings)
......@@ -125,8 +128,14 @@ class HDF5StreamSink(object):
self.persist_cadence = options.persist_cadence
self.waveform = options.waveform
self.basename = '%s-%s' % (options.instrument[:1], options.basename)
self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate, waveform = self.waveform)
self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
self.feature_data = utils.HDF5TimeseriesFeatureData(
self.columns,
keys = self.keys,
cadence = self.write_cadence,
sample_rate = self.sample_rate,
waveform = self.waveform
)
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
......@@ -188,13 +197,14 @@ class HDF5StreamSink(object):
if self.last_save_time is None:
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
logger.info("saving features to disk at timestamp = %f" % self.timestamp)
self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
save_time = utils.floor_div(self.last_save_time, self.write_cadence)
self.feature_data.dump(self.tmp_path, self.feature_name, save_time, tmp = True)
self.last_save_time = self.timestamp
# persist triggers once per persist cadence if using hdf5 format
......
......@@ -65,6 +65,8 @@ def parse_command_line():
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
group.add_option("--data-type", metavar="string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default = max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
......@@ -117,14 +119,16 @@ class StreamMonitor(object):
### set up aggregator
logger.info("setting up monitor with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.InfluxDBAggregator(
self.agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.HDF5Aggregator(
self.agg_sink = io.hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
......@@ -134,6 +138,9 @@ class StreamMonitor(object):
name, _ = options.channel_list.rsplit('.', 1)
self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
### define measurements to be stored
for metric in ('target_snr', 'synchronizer_latency', 'percent_missed'):
self.agg_sink.register_schema(metric, columns='data', column_key='data', tags='job', tag_key='job')
def fetch_data(self):
"""
......@@ -161,6 +168,9 @@ class StreamMonitor(object):
if self.timestamp:
if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
### check for missing channels
missing_channels = set()
metrics = defaultdict(list)
while len(self.feature_queue) > 0:
### remove data with oldest timestamp and process
......@@ -170,9 +180,6 @@ class StreamMonitor(object):
### check for missing channels
these_channels = set(features.keys())
missing_channels = self.channels - these_channels
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
### generate metrics
metrics['time'].append(timestamp)
metrics['synchronizer_latency'].append(latency)
......@@ -185,13 +192,16 @@ class StreamMonitor(object):
### store and aggregate features
for metric in ('synchronizer_latency', 'percent_missed'):
data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
self.agg_sink.store_columns(metric, {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns(metric, {'synchronizer': data}, aggregate=self.data_type)
if len(metrics['target_time']) > 0:
data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, 'data', tags='job', aggregate=self.data_type)
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, aggregate=self.data_type)
self.last_save = timestamp
logger.info('processed features up to timestamp %.3f, max latency = %.3f s, percent missing channels = %.3f' % (timestamp, max(metrics['synchronizer_latency']), max(metrics['percent_missed'])))
if missing_channels:
logger.info('channels missing @ timestamp=%.3f: %s' % (timestamp, repr(list(missing_channels))))
def start(self):
"""
......@@ -212,7 +222,6 @@ class StreamMonitor(object):
shut down gracefully
"""
logger.info('shutting down feature monitor...')
self.conn.close()
class SignalHandler(object):
"""
......
......@@ -87,7 +87,7 @@ def generate_options(options):
raise NotImplementedError, 'not an available option for online jobs at this time'
# program behavior options
program_options = {}
program_options = {"psd-fft-length": options.psd_fft_length}
if options.disable_web_service:
program_options.update({"disable-web-service": options.disable_web_service})
if options.verbose:
......@@ -145,83 +145,6 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
return feature_extractor_nodes, num_channels
# =============================
#
# classes
#
# =============================
class zookeeper_job(dagparts.DAGJob):
"""
A zookeeper job
"""
def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(datadir)
except OSError:
pass
f = open("zookeeper.properties", "w")
f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
""" % (datadir, port, maxclients))
f.close()
class kafka_job(dagparts.DAGJob):
"""
A kafka job
"""
def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(logdir)
except OSError:
pass
f = open("kafka.properties", "w")
f.write("""
broker.id=0
listeners = PLAINTEXT://%s
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
""" % (host, logdir, zookeeperaddr))
f.close()
# =============================
#
# command line parser
......@@ -239,7 +162,6 @@ def parse_command_line():
group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
parser.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
parser.add_option("--disable-kafka-jobs", action = "store_true", help = "If set, do not launch instances of kafka and zookeeper alongside feature extraction jobs.")
parser.add_option("--disable-agg-jobs", action = "store_true", help = "If set, do not launch aggregation jobs to process and aggregate incoming features.")
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
......@@ -291,15 +213,14 @@ channels = data_source_info.channel_dict.keys()
# create directories if needed
#
listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")
for dir_ in ('features', 'synchronizer', 'monitor', 'aggregator', 'logs'):
aggregator.makedir(dir_)
#
# set up dag and job classes
#
dag = dagparts.DAG("feature_extractor_pipe")
dag = dagparts.DAG("%s_feature_extraction_pipe" % ifo)
# feature extractor job
if options.condor_universe == 'local':
......@@ -325,13 +246,6 @@ if options.save_format == 'kafka':
if not options.disable_agg_jobs:
aggregator_job = dagparts.DAGJob("gstlal_feature_aggregator", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
# kafka/zookeeper jobs
if not options.disable_kafka_jobs:
local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
local_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, local_condor_options)
zoo_job = zookeeper_job(condor_commands = local_condor_commands)
kafka_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)
#
# set up options for auxiliary jobs
#
......@@ -421,7 +335,7 @@ if options.save_format == 'kafka':
synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "synchronizer")})
hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "features")})
monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "monitor")})
### aggregator jobs
if not options.disable_agg_jobs:
......@@ -432,11 +346,6 @@ if options.save_format == 'kafka':
job_channel_options.update(aggregator_options)
agg_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
### kafka jobs
if not options.disable_kafka_jobs:
zoo_node = dagparts.DAGNode(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})
#
# write out dag and sub files
#
......
......@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-burst],[0.1.0],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_INIT([gstlal-burst],[0.1.1],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
......
gstlal-burst (0.1.1) unstable; urgency=low
* Updated gstlal_feature_aggregator, gstlal_feature_monitor to deal with
ligo-scald API change
-- Patrick Godwin <patrick.godwin@ligo.org> Sun, 03 Mar 2019 21:27:15 -0500
gstlal-burst (0.1.0) unstable; urgency=low
* Add feature extraction toolkit
......
......@@ -35,9 +35,6 @@ import StringIO
import threading
import shutil
import h5py
import numpy
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
......@@ -113,7 +110,7 @@ class MultiChannelHandler(simplehandler.Handler):
self.persist_cadence = options.persist_cadence
self.feature_start_time = options.feature_start_time
self.feature_end_time = options.feature_end_time
self.columns = ['trigger_time', 'frequency', 'q', 'snr', 'phase']
self.columns = ['timestamp', 'time', 'snr', 'phase', 'frequency', 'q', 'duration']
# set whether data source is live
self.is_live = data_source_info.data_source in data_source_info.live_sources
......@@ -129,22 +126,39 @@ class MultiChannelHandler(simplehandler.Handler):
# set queue buffer size based on file format
if self.save_format == 'hdf5':
self.buffer_size = 1 ### 1 second buffers for file-based formats
self.buffer_size = 1. ### 1 second buffers for file-based formats
else:
self.buffer_size = 1. / self.sample_rate
# set up queue to cache features depending on pipeline mode
self.feature_mode = options.feature_mode
if self.feature_mode == 'timeseries':
self.feature_queue = utils.TimeseriesFeatureQueue(self.keys, self.columns, sample_rate = self.sample_rate, buffer_size = self.buffer_size)
self.feature_queue = utils.TimeseriesFeatureQueue(
self.keys,
self.columns,
sample_rate = self.sample_rate,
buffer_size = self.buffer_size
)
elif self.feature_mode == 'etg':
self.feature_queue = utils.ETGFeatureQueue(self.keys, self.columns)
# set up structure to store feature data
if self.save_format == 'hdf5':
if self.feature_mode == 'timeseries':
self.fdata = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.cadence, sample_rate = self.sample_rate, waveform = self.waveform_type)
self.fdata = utils.HDF5TimeseriesFeatureData(
self.columns,
keys = self.keys,
cadence = self.cadence,
sample_rate = self.sample_rate,
waveform = self.waveform_type
)
elif self.feature_mode == 'etg':
self.fdata = utils.HDF5ETGFeatureData(self.columns, keys = self.keys, cadence = self.cadence, waveform = self.waveform_type)
self.fdata = utils.HDF5ETGFeatureData(
self.columns,
keys = self.keys,
cadence = self.cadence,
waveform = self.waveform_type
)
else:
raise KeyError, 'not a valid feature mode option'
......@@ -216,13 +230,13 @@ class MultiChannelHandler(simplehandler.Handler):
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
if self.save_format =='hdf5':
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp + 1
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if self.save_format == 'hdf5':
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.cadence) or (self.timestamp == self.feature_end_time):
self.logger.info("saving features to disk at timestamp = %d, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
self.logger.info("saving features to disk at timestamp = %d" % self.timestamp)
self.save_features()
self.last_save_time = self.timestamp
......@@ -237,12 +251,13 @@ class MultiChannelHandler(simplehandler.Handler):
# add features to respective format specified
if self.save_format == 'kafka':
if self.data_transfer == 'table':
self.logger.info("pushing features to disk at timestamp = %.3f, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset))
elif self.data_transfer == 'row':
for row in itertools.chain(*feature_subset['features'].values()):
if row:
self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(row))
self.logger.info("pushing features to disk at timestamp = %.3f, latency = %.3f" % (self.timestamp, utils.gps2latency(self.timestamp)))
self.producer.poll(0) ### flush out queue of sent packets
elif self.save_format == 'bottle':
self.feature_data.append(feature_subset)
......@@ -287,8 +302,17 @@ class MultiChannelHandler(simplehandler.Handler):
trigger_time = row.end_time + row.end_time_ns * 1e-9
# append row for data transfer/saving
feature_row = {
'timestamp': utils.floor_div(buftime, 1. / self.sample_rate),
'channel': channel,
'snr': row.snr,
'phase': row.phase,
'time': trigger_time,
'frequency': waveform['frequency'],
'q': waveform['q'],
'duration': waveform['duration'],
}
timestamp = utils.floor_div(buftime, self.buffer_size)
feature_row = {'channel':channel, 'snr':row.snr, 'trigger_time':trigger_time, 'frequency':waveform['frequency'], 'q':waveform['q'], 'phase':row.phase}
self.feature_queue.append(timestamp, channel, feature_row)
def save_features(self):
......@@ -506,6 +530,8 @@ def append_options(parser):
parser.add_option_group(group)
group = optparse.OptionGroup(parser, "Program Behavior")
group.add_option("--psd-fft-length", metavar = "seconds", default = 32, type = "int", help = "The length of the FFT used to used to whiten the data (default is 32 s).")
group.add_option("--min-downsample-rate", metavar = "Hz", default = 128, type = "int", help = "The minimum sampling rate in which to downsample streams. Default = 128 Hz.")
group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.")
group.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
......
......@@ -463,6 +463,11 @@ class DataSourceInfo(object):
## Data source, one of python.datasource.DataSourceInfo.data_sources
self.data_source = options.data_source
# FIXME: this is ugly, but we have to protect against busted shared memory partitions
if self.data_source == "lvshm":
import subprocess
subprocess.call(["smrepair", "--bufmode", "5", self.shm_part_dict[self.instrument]])
def append_options(parser):
"""!
Append generic data source options to an OptionParser object in order
......
......@@ -47,7 +47,6 @@ from gstlal import datasource
# FIXME: Find a better way than using global variables.
PSD_FFT_LENGTH = 32
PSD_DROP_TIME = 16 * PSD_FFT_LENGTH
NATIVE_RATE_CUTOFF = 128
#
......@@ -58,7 +57,7 @@ NATIVE_RATE_CUTOFF = 128
# =============================================================================
#
def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd = None, psd_fft_length = PSD_FFT_LENGTH, veto_segments = None, nxydump_segment = None, track_psd = True, block_duration = int(1 * Gst.SECOND), width = 64, channel_name = "hoft"):
def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd = None, psd_fft_length = PSD_FFT_LENGTH, veto_segments = None, nxydump_segment = None, track_psd = True, block_duration = 0.25 * Gst.SECOND, width = 64, channel_name = "hoft", min_rate = 128):
"""!
Build pipeline stage to whiten and downsample auxiliary channels.
......@@ -149,16 +148,16 @@ def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd
# construct whitener.
#
zero_pad = 0
zero_pad = psd_fft_length // 4
head = pipeparts.mktee(pipeline, head)
whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = 0, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lalwhiten" % (instrument, channel_name))
whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "%s_%s_lalwhiten" % (instrument, channel_name))
pipeparts.mkfakesink(pipeline, whiten)
#
# high pass filter
#
block_stride = block_duration * max_rate // Gst.SECOND
block_stride = int(block_duration * max_rate // Gst.SECOND)
if native_rate >= NATIVE_RATE_CUTOFF:
kernel = reference_psd.one_second_highpass_kernel(max_rate, cutoff = 12)
assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
......@@ -195,7 +194,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd
# Drop initial data to let the PSD settle
#
head = pipeparts.mkdrop(pipeline, head, drop_samples = PSD_DROP_TIME * max_rate)
head = pipeparts.mkdrop(pipeline, head, drop_samples = 16 * psd_fft_length * max_rate)
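# NOTE: the number of dropped samples now scales with the configurable psd_fft_length
# (the same factor of 16 used as PSD_DROP_FACTOR in the pipe generators) instead of the
# removed module-level PSD_DROP_TIME constant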
#
# enable/disable PSD tracking
......@@ -267,7 +266,7 @@ def mkwhitened_multirate_src(pipeline, src, rates, native_rate, instrument, psd
for rate in sorted(set(rates))[:-1]:
head[rate] = pipeparts.mkqueue(pipeline, tee, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 8)
head[rate] = pipeparts.mkaudioamplify(pipeline, head[rate], 1. / math.sqrt(pipeparts.audioresample_variance_gain(quality, max_rate, rate)))
head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkinterpolator(pipeline, head[rate]), caps = "audio/x-raw, rate=%d" % rate)
head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkinterpolator(pipeline, head[rate]), caps = "audio/x-raw, rate=%d" % max(min_rate, rate))
#
# return value is a dictionary of elements indexed by sample rate
......
......@@ -27,7 +27,9 @@
from collections import Counter, defaultdict, deque
import glob
import itertools
import logging
import operator
import os
import sys
import timeit
......@@ -36,6 +38,8 @@ import h5py
import numpy
from lal import gpstime
from lal.utils import CacheEntry
from gstlal import aggregator
......@@ -99,6 +103,14 @@ def create_new_dataset(path, base, data, name = 'data', group = None, tmp = Fals
return fname
def feature_dtype(columns):
"""
given a set of columns, returns the numpy dtypes associated with those
columns. Time-based columns are stored in double precision; all others
are stored in single precision.
"""
return [(column, numpy.float64) if 'time' in column else (column, numpy.float32) for column in columns]
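# e.g. feature_dtype(['time', 'snr']) -> [('time', numpy.float64), ('snr', numpy.float32)]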
#----------------------------------
### gps time utilities
......@@ -186,8 +198,6 @@ def latency_name(stage_name, stage_num, channel, rate=None):
#----------------------------------
### logging utilities
# FIXME: shamelessly copied from iDQ's logs module, until this dependency is added in to gstlal-iDQ proper.
def get_logger(logname, log_level=10, rootdir='.', verbose=False):
'''
standardize how we instantiate loggers
......@@ -204,19 +214,34 @@ def get_logger(logname, log_level=10, rootdir='.', verbose=False):
handlers.append( logging.StreamHandler() )
# add handlers to logger
formatter = gen_formatter()
formatter = logging.Formatter('%(asctime)s | %(name)s : %(levelname)s : %(message)s')
for handler in handlers:
handler.setFormatter( formatter )
logger.addHandler( handler )
return logger
def gen_formatter():
"""
standardizes formatting for loggers
returns an instance of logging.Formatter
"""
return logging.Formatter('%(asctime)s | %(name)s : %(levelname)s : %(message)s')
#----------------------------------
### cache utilities
def path2cache(rootdir, pathname):
"""
given a rootdir and a glob-compatible pathname that may contain shell-style wildcards,
finds all files that match and populates a Cache.
NOTE: this will only work with files that comply with the T050017 file convention.
"""
return [CacheEntry.from_T050017(file_) for file_ in glob.iglob(os.path.join(rootdir, pathname))]
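# e.g. path2cache('/data', 'H1-FEATURES-*/H1-FEATURES-*.h5') (hypothetical layout) returns
# lal.utils.CacheEntry objects whose .segment and .path attributes can be used to filter by GPS time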
#----------------------------------
### other utilities
def group_indices(indices):
"""
Given a list of indices, groups them into runs of contiguous values.
"""
for k, group in itertools.groupby(enumerate(indices), lambda (i,x):i-x):
yield map(operator.itemgetter(1), group)
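# e.g. list(group_indices([2, 3, 4, 8, 9])) -> [[2, 3, 4], [8, 9]]; consecutive indices share
# the same (position - value) key, so groupby() splits the list at each gap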
####################
#
......@@ -256,7 +281,7 @@ class HDF5TimeseriesFeatureData(FeatureData):
self.sample_rate = kwargs['sample_rate']
self.waveform = kwargs['waveform']
self.metadata = dict(**kwargs)
self.dtype = [(column, '<f8') for column in self.columns]
self.dtype = feature_dtype(self.columns)
self.feature_data = {key: numpy.empty((self.cadence * self.sample_rate,), dtype = self.dtype) for key in keys}
self.last_save_time = 0
self.clear()
......@@ -265,9 +290,18 @@ class HDF5TimeseriesFeatureData(FeatureData):
"""
Saves the current cadence of features to disk and clears out the data
"""
name = "%d_%d" % (start_time, self.cadence)
for key in self.keys:
create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp, metadata=self.metadata)
nonnan_indices = list(numpy.where(numpy.isfinite(self.feature_data[key]['time']))[0])
### split up and save datasets into contiguous segments
for idx_group in group_indices(nonnan_indices):
start_idx, end_idx = idx_group[0], idx_group[-1]
start = start_time + float(start_idx) / self.sample_rate
end = start_time + float(end_idx + 1) / self.sample_rate
name = "%.6f_%.6f" % (float(start), float(end - start))
create_new_dataset(path, base, self.feature_data[key][start_idx:end_idx], name=name, group=key, tmp=tmp, metadata=self.metadata)
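### NOTE: each contiguous run of non-NaN samples becomes its own dataset, named by its
### GPS start time and duration, instead of a single '<start>_<cadence>' dataset per channel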
### clear out current features
self.clear()
def append(self, timestamp, features):
......@@ -296,7 +330,7 @@ class HDF5ETGFeatureData(FeatureData):
self.cadence = kwargs['cadence']
self.waveform = kwargs['waveform']
self.metadata = dict(**kwargs)
self.dtype = [(column, '<f8') for column in self.columns]
self.dtype = feature_dtype(self.columns)
self.feature_data = {key: [] for key in keys}
self.clear()
......@@ -330,19 +364,19 @@ class TimeseriesFeatureQueue(object):
Example:
>>> # create the queue
>>> columns = ['trigger_time', 'snr']
>>> columns = ['time', 'snr']
>>> channels = ['channel1']
>>> queue = TimeseriesFeatureQueue(channels, columns, sample_rate=1, buffer_size=1)
>>> # add features
>>> queue.append(123450, 'channel1', {'trigger_time': 123450.3, 'snr': 3.0})
>>> queue.append(123451, 'channel1', {'trigger_time': 123451.7, 'snr': 6.5})
>>> queue.append(123452, 'channel1', {'trigger_time': 123452.4, 'snr': 5.2})
>>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 3.0})
>>> queue.append(123451, 'channel1', {'time': 123451.7, 'snr': 6.5})
>>> queue.append(123452, 'channel1', {'time': 123452.4, 'snr': 5.2})
>>> # get oldest feature
>>> row = queue.pop()
>>> row['timestamp']
123450
>>> row['features']['channel1']
[{'snr': 3.0, 'trigger_time': 123450.3}]
[{'snr': 3.0, 'time': 123450.3}]
"""
def __init__(self, channels, columns, **kwargs):
......@@ -364,7 +398,7 @@ class TimeseriesFeatureQueue(object):
self.counter[timestamp] += 1
### store row, aggregating if necessary
idx = self._idx(row['trigger_time'])
idx = self._idx(row['time'])
if not self.in_queue[timestamp][channel][idx] or (row['snr'] > self.in_queue[timestamp][channel][idx]['snr']):
self.in_queue[timestamp][channel][idx] = row
......
......@@ -105,7 +105,7 @@ class TemplateGenerator(object):
"""
return NotImplementedError
def generate_templates(self, rate, quadrature = True):
def generate_templates(self, rate, quadrature = True, sampling_rate = None):
"""
Generate all templates corresponding to a parameter range for a given sampling rate.
If quadrature is set, yield two templates corresponding to in-phase and quadrature components of the waveform.
......@@ -170,17 +170,20 @@ class HalfSineGaussianGenerator(TemplateGenerator):
"""
return 0.5 * (q/(2.*numpy.pi*f)) * numpy.log(1./self.tolerance)
def generate_templates(self, rate, quadrature = True):
def generate_templates(self, rate, quadrature = True, sampling_rate = None):
"""
generate all half sine gaussian templates corresponding to a parameter range and template duration
for a given sampling rate.
"""
if not sampling_rate:
sampling_rate = rate
for template in self.parameter_grid[rate]:
if quadrature:
for phase in self.phases:
yield self.waveform(rate, phase, template['frequency'], template['q'])
yield self.waveform(sampling_rate, phase, template['frequency'], template['q'])
else:
yield self.waveform(rate, self.phases[0], template['frequency'], template['q'])
yield self.waveform(sampling_rate, self.phases[0], template['frequency'], template['q'])
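# NOTE: sampling_rate defaults to rate, so existing callers are unchanged; the feature
# extractor passes sampling_rate=thisrate (see the firbank changes earlier in this diff)
# so templates binned at a low native rate are generated at the minimum downsample rate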
def waveform(self, rate, phase, f, q):
"""
......
# configuration file for monitoring
dashboard:
title: Online Feature Extraction Status
image: null
tabs:
- name: H1
url: https://ldas-jobs.ligo.caltech.edu/~patrick.godwin/cgi-bin/H1_feature_extraction_monitor
- name: L1
url: https://ldas-jobs.ligo.caltech.edu/~patrick.godwin/cgi-bin/L1_feature_extraction_monitor
- name: Docs
url: https://docs.ligo.org/lscsoft/gstlal/gstlal-burst/gstlal-burst.html
- name: Git
url: https://git.ligo.org/lscsoft/gstlal
GPS: -1
Duration: 600
plots:
# target SNR timeseries
- title: h(t) SNR
type: TimeSeries
measurement: target_snr
column: data
aggregate: max
params:
- - job
- synchronizer
layout:
yaxis:
type: log
title:
text: SNR
margin:
l: 40
r: 20
t: 10
b: 20
value: checked
# synchronizer latency timeseries
- title: Latency
type: TimeSeries
measurement: synchronizer_latency
column: data
aggregate: max
params:
- - job
- synchronizer
layout:
yaxis:
title:
text: Latency [s]
margin:
l: 40
r: 20
t: 10
b: 20
value: checked
# percent missed timeseries
- title: Percent channels missed
type: TimeSeries
measurement: percent_missed
column: data
aggregate: max
params:
- - job
- synchronizer
layout:
yaxis:
title:
text: percent channels missed in buffer
margin:
l: 40
r: 20
t: 10
b: 20
value: checked
# latency heatmap (by job)
- title: Latency by Job
type: TimeHeatMap
measurement: latency
column: data
aggregate: max
tag: job
value:
# snr heatmap (by channel)
- title: SNR by Channel
type: TimeHeatMap
measurement: snr
column: data
aggregate: max
tag: channel
value:
nagios:
### check for missing jobs from synchronizer
missed_jobs:
lookback: 300
measurement: missed_packets
column: data
aggregate: max
alert_type: threshold
alert_settings:
tags:
- - job
- synchronizer
tag_type: job
threshold: 1
threshold_units: missed_packets
### check for synchronizer latency spikes
latency:
lookback: 300
measurement: synchronizer_latency
column: data
aggregate: max
alert_type: threshold
alert_settings:
tags:
- - job
- synchronizer
tag_type: job
threshold: 10
threshold_units: seconds
### check for jobs not reporting
job_heartbeat:
lookback: 30
measurement: latency
column: data
aggregate: max
alert_type: heartbeat
alert_settings:
tag_type: job
tag_format: 4digit
num_tags: 43
database:
backend: influxdb
name: H1_gstlal_features
hostname: 10.14.0.100
port: 8086
......@@ -4,9 +4,9 @@ dashboard:
image: null
tabs:
- name: H1
url: https://ldas-jobs.ligo-wa.caltech.edu/~patrick.godwin/cgi-bin/feature_extraction_monitor.cgi
url: https://ldas-jobs.ligo.caltech.edu/~patrick.godwin/cgi-bin/H1_feature_extraction_monitor
- name: L1
url: https://ldas-jobs.ligo-la.caltech.edu/~idq/cgi-bin/fx_monitor.cgi
url: https://ldas-jobs.ligo.caltech.edu/~patrick.godwin/cgi-bin/L1_feature_extraction_monitor
- name: Docs
url: https://docs.ligo.org/lscsoft/gstlal/gstlal-burst/gstlal-burst.html
- name: Git
......@@ -24,17 +24,36 @@ plots:
params:
- - job
- synchronizer
layout:
yaxis:
type: log
title:
text: SNR
margin:
l: 40
r: 20
t: 10
b: 20
value: checked
# synchronizer latency timeseries
- title: Latency
type: TimeSeries
measurement: latency
measurement: synchronizer_latency