
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (61)
Showing changes with 22 additions and 2875 deletions
-FROM containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+ARG lalsuite_version_tag
+FROM containers.ligo.org/gstlal/gstlal-dev:$lalsuite_version_tag
 # Labeling/packaging stuff:
 LABEL name="GstLAL Runtime Package, RL8" \
......
 image: docker:latest
 variables:
+  # Set the gstlal-dev version tag, which corresponds to a tagged
+  # version of lalsuite. A list of available tags is available in the
+  # gstlal-dev container registry:
+  #
+  #   https://git.ligo.org/gstlal/gstlal-dev/container_registry/809
+  LALSUITE_VERSION_TAG: "lalsuite-v7.13-x86_64"
+  # Docker pipeline variables:
   DOCKER_DRIVER: overlay
   DOCKER_BRANCH: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME
   DOCKER_LATEST: $CI_REGISTRY_IMAGE:latest
@@ -12,6 +21,7 @@ variables:
   # don't need git history
   GIT_DEPTH: 1
+  # GstLAL pipeline-specific runtime variables.
   TMPDIR: /tmp
   GSTLAL_FIR_WHITEN: 0
@@ -46,7 +56,7 @@ stages:
 .levelN:rpm: &levelN-rpm-package
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   variables:
     GIT_STRATEGY: fetch
     RPM_BUILD_CPUS: 4
@@ -115,7 +125,7 @@ level2:rpm:gstlal-burst:
     # Build the container:
     - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
-    - docker build --pull -t $DOCKER_BRANCH --file .gitlab-ci.Dockerfile.rl8 .
+    - docker build --pull -t $DOCKER_BRANCH --build-arg lalsuite_version_tag="${LALSUITE_VERSION_TAG}" --file .gitlab-ci.Dockerfile.rl8 .
     - docker push $DOCKER_BRANCH
   only:
     - schedules
@@ -183,7 +193,7 @@ latest_image:
 # test stages (rl8)
 test:gstlal:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-gstlal
   needs:
     - level0:rpm:gstlal
@@ -205,7 +215,7 @@ test:gstlal:rl8:
 test:gstlal-full-build:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-gstlal-full-build
   needs:
     - level0:rpm:gstlal
@@ -226,7 +236,7 @@ test:gstlal-full-build:rl8:
 test:gstlal-inspiral:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-inspiral
   needs:
     - level0:rpm:gstlal
@@ -246,7 +256,7 @@ test:gstlal-inspiral:rl8:
 test:gstlal-ugly:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-gstlal-ugly
   needs:
     - level0:rpm:gstlal
@@ -266,7 +276,7 @@ test:gstlal-ugly:rl8:
 test:gstlal-burst:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-burst
   needs:
     - level0:rpm:gstlal
@@ -285,7 +295,7 @@ test:gstlal-burst:rl8:
 test:offline:rl8:
   interruptible: true
-  image: containers.ligo.org/gstlal/gstlal-dev:lalsuite-master-x86_64
+  image: containers.ligo.org/gstlal/gstlal-dev:${LALSUITE_VERSION_TAG}
   stage: test-offline
   needs:
     - level0:rpm:gstlal
......
@@ -7,17 +7,4 @@ dist_bin_SCRIPTS = \
 	gstlal_cherenkov_plot_summary \
 	gstlal_cherenkov_zl_rank_pdfs \
 	gstlal_cs_triggergen \
-	gstlal_impulse_inj \
-	gstlal_snax_aggregate \
-	gstlal_snax_bank_overlap \
-	gstlal_snax_combine \
-	gstlal_snax_dag_offline \
-	gstlal_snax_dag_online \
-	gstlal_snax_extract \
-	gstlal_snax_generate \
-	gstlal_snax_monitor \
-	gstlal_snax_sink \
-	gstlal_snax_synchronize \
-	gstlal_snax_whiten \
-	gstlal_snax_workflow \
-	gstlal_snax_workflow_online
+	gstlal_impulse_inj
#!/usr/bin/env python3
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_aggregate [--options]"
__description__ = "an executable to aggregate and generate job metrics for streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import defaultdict, deque
import json
import logging
import optparse
import numpy
from ligo.scald import utils
from ligo.scald.io import influx
from gstlal import events
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Aggregator Options", "General settings for configuring the online aggregator.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and sqlite database lives")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the aggregator acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
group.add_option("--jobs", action="append", help="Specify jobs to process. Can be given multiple times.")
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
	group.add_option("--data-type", metavar = "string", default = "max", help = "Specify the datatype to aggregate, one of 'min', 'max', 'median'. Default = max.")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamAggregator(events.EventProcessor):
"""
Ingests and aggregates incoming streaming features, collects job metrics.
"""
_name = 'aggregator'
def __init__(self, options):
logging.info('setting up feature aggregator...')
self.jobs = options.jobs
input_topics = ['%s_%s'%(options.input_topic_basename, job) for job in self.jobs]
events.EventProcessor.__init__(
self,
process_cadence=options.processing_cadence,
request_timeout=options.request_timeout,
num_messages=len(self.jobs),
kafka_server=options.kafka_server,
input_topic=input_topics,
tag='aggregator_%s_%s'%(options.tag, self.jobs[0])
)
### other aggregator options
self.data_type = options.data_type
self.last_save = utils.gps_now()
self.sample_rate = options.sample_rate
### initialize 30 second queue for incoming buffers
self.feature_queue = {job: deque(maxlen = 30 * self.sample_rate) for job in self.jobs}
### set up aggregator
logging.info("setting up aggregator")
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
### define measurements to be stored from aggregators
self.agg_sink.register_schema(
'latency',
columns='data',
column_key='data',
tags='job',
tag_key='job'
)
self.agg_sink.register_schema(
'snr',
columns='data',
column_key='data',
tags=('channel', 'subsystem'),
tag_key='channel'
)
def ingest(self, message):
"""
parse a message containing feature data
"""
_, job = message.topic().rsplit('_', 1)
feature_subset = json.loads(message.value())
self.feature_queue[job].appendleft((
feature_subset['timestamp'],
feature_subset['features']
))
def handle(self):
"""
process and aggregate features from feature extraction jobs
"""
if utils.in_new_epoch(utils.gps_now(), self.last_save, 1):
self.last_save = utils.gps_now()
### format incoming packets into metrics and timeseries
feature_packets = [(job, self.feature_queue[job].pop()) for job in self.jobs for i in range(len(self.feature_queue[job]))]
all_timeseries, all_metrics = self.packets_to_timeseries(feature_packets)
### store and aggregate metrics
metric_data = {job: {'time': metrics['time'], 'fields': {'data': metrics['latency']}} for job, metrics in all_metrics.items()}
self.agg_sink.store_columns('latency', metric_data, aggregate=self.data_type)
### store and aggregate features
timeseries_data = {(channel, self._channel_to_subsystem(channel)): {'time': timeseries['time'], 'fields': {'data': timeseries['snr']}} for channel, timeseries in all_timeseries.items()}
self.agg_sink.store_columns('snr', timeseries_data, aggregate=self.data_type)
try:
max_latency = max(max(metrics['latency']) for metrics in all_metrics.values())
logging.info('processed features at time %d, highest latency is %.3f' % (self.last_save, max_latency))
			except ValueError:
logging.info('no features to process at time %d' % self.last_save)
def packets_to_timeseries(self, packets):
"""
splits up a series of packets into ordered timeseries, keyed by channel
"""
metrics = defaultdict(lambda: {'time': [], 'latency': []})
### process each packet sequentially and split rows by channel
channel_rows = defaultdict(list)
for job, packet in packets:
timestamp, features = packet
metrics[job]['time'].append(timestamp)
metrics[job]['latency'].append(utils.gps_to_latency(timestamp))
for channel, row in features.items():
channel_rows[channel].extend(row)
### break up rows into timeseries
timeseries = {}
for channel, rows in channel_rows.items():
timeseries[channel] = {column: [row[column] for row in rows] for column in rows[0].keys()}
return timeseries, metrics
@staticmethod
def _channel_to_subsystem(channel):
"""
given a channel, returns the subsystem the channel lives in
"""
return channel.split(':')[1].split('-')[0]
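# For reference, a minimal sketch of the transformation performed by
# StreamAggregator.packets_to_timeseries(), using hypothetical job IDs and channel names:
#
#   packets = [('0001', (1234567890, {'H1:PEM-EX_MIC': [{'time': 1234567890.0, 'snr': 5.2}]}))]
#   timeseries, metrics = aggregator.packets_to_timeseries(packets)
#   # timeseries -> {'H1:PEM-EX_MIC': {'time': [1234567890.0], 'snr': [5.2]}}
#   # metrics    -> {'0001': {'time': [1234567890], 'latency': [seconds behind real time]}}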
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
options, args = parse_command_line()
### set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s | snax_aggregate : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# start up aggregator
aggregator = StreamAggregator(options)
aggregator.start()
#!/usr/bin/env python3
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program that measures template overlaps and how templates are spread out in the parameter space
for gstlal_snax_extract
"""
####################
#
# preamble
#
####################
import itertools
from optparse import OptionParser
import os
import random
import sys
from urllib.parse import urljoin
import numpy
import lal
from ligo.scald import report
from gstlal import aggregator
from gstlal.plots import util as plotutil
from gstlal.snax import utils
import matplotlib
matplotlib.use('Agg')
from mpl_toolkits.axes_grid1 import make_axes_locatable
from matplotlib.colorbar import Colorbar
from matplotlib import pyplot as plt
from matplotlib import ticker as tkr
import matplotlib.cm as cm
matplotlib.rcParams.update({
"font.size": 13.0,
"axes.titlesize": 13.0,
"axes.labelsize": 13.0,
"xtick.labelsize": 13.0,
"ytick.labelsize": 13.0,
"legend.fontsize": 13.0,
"figure.dpi": 300,
"savefig.dpi": 300,
"text.usetex": False,
"path.simplify": True
})
colors = ['#2c7fb8', '#e66101', '#5e3c99', '#d01c8b']
####################
#
# functions
#
####################
def plot_waveform(time, waveform, waveform_type='', waveform_params=None):
fig, axes = plt.subplots()
axes.plot(time, waveform, color = '#2c7fb8', alpha = 0.7, lw=2)
axes.set_title(r"%s waveform" % (waveform_type))
axes.set_ylabel(r"Amplitude [arb. unit]")
axes.set_xlabel(r"Time [seconds]")
axes.set_xlim(time[0], time[-1])
if waveform_params:
axes.text(0.96 * max(time), 0.98 * min(waveform), r"%s" % repr(waveform_params), size=10, ha='right')
return fig
def plot_waveforms(times, waveforms, waveform_type='', waveform_params=None):
"""
Plots multiple waveforms in one plot (up to 4 at one time)
"""
assert len(times) <= 4
# determine waveform limits
amp_min = min(numpy.min(waveform) for waveform in waveforms)
amp_max = max(numpy.max(waveform) for waveform in waveforms)
fig, axes = plt.subplots(len(times), sharex=True, sharey=True)
	for ax, time, waveform, color in zip(axes, times, waveforms, colors):
		ax.plot(time, waveform, color = color, alpha = 0.7, lw=2)
		ax.set_ylim(amp_min, amp_max)
		ax.set_xlim(time[0], time[-1])
		ax.set_xlabel(r"Time [seconds]")
		if waveform_params:
			ax.text(0.98 * max(time), 0.97 * amp_min, r"%s" % repr(waveform_params), size=10, ha='right')
axes[0].set_title(r"Waveforms")
fig.text(0.04, 0.5, r"Amplitude [arb. unit]", ha='center', va='center', rotation='vertical')
fig.subplots_adjust(hspace=0)
plt.setp([a.get_xticklabels() for a in fig.axes[:-1]], visible=False)
return fig
def plot_template_bank(waveform_param1, waveform_param2, overlaps, waveform_type='', waveform_params=None):
fig, axes = plt.subplots()
axes.scatter(waveform_param1, waveform_param2, c = overlaps, cmap = cm.coolwarm, alpha = 0.8, lw=0)
norm = matplotlib.colors.Normalize(vmin=min(overlaps), vmax=numpy.max(overlaps), clip=True)
axes.set_title(r"Template Bank Placement for %s" % (waveform_type))
axes.set_xlabel(r"%s" % waveform_params[0])
axes.set_ylabel(r"%s" % waveform_params[1])
axes.set_xlim(min(waveform_param1) - 0.1 * min(waveform_param1), 1.1 * max(waveform_param1))
axes.set_ylim(min(waveform_param2) - 0.1 * min(waveform_param2), 1.1 * max(waveform_param2))
axes.loglog()
# set up colorbar
divider = make_axes_locatable(axes)
cax = divider.append_axes( "right", size="5%", pad=0.1)
cbl = matplotlib.colorbar.ColorbarBase(cax, cmap = cm.coolwarm, norm=norm, orientation="vertical")
cbl.set_label(r"Overlap")
plt.tight_layout()
return fig
def generate_html_file(plot_paths, output_dir, waveform_type=''):
if options.verbose:
print("Creating html report...", file=sys.stderr)
channels = set()
page = report.Report('SNAX Bank')
# header
summary = report.Tab('Summary')
summary += report.Header(f'Waveform Report for {waveform_type}')
# plots
for plot in sorted(plot_paths):
summary += report.Image(os.path.join(output_dir, plot))
page += summary
# generate page
page.save(output_dir)
if options.verbose:
print("done.", file=sys.stderr)
###############################
#
# command line parser
#
###############################
def parse_command_line():
parser = OptionParser(usage = '%prog [options]', description = __doc__)
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.05, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.05.")
parser.add_option("-q", "--qhigh", type = "float", default = 100, help = "Q high value for half sine-gaussian waveforms. Default = 100.")
parser.add_option("--output-dir", metavar = "filename", default = ".", help = "Set the location of the output (plots, etc).")
parser.add_option("--cluster", help = "Set the cluster that this script is being run on (for proper public_html linking)")
parser.add_option("--waveform", default = "sine_gaussian", help = "Set the type of waveform to plot. options=[sine_gaussian, half_sine_gaussian].")
# parse the arguments and sanity check
options, args = parser.parse_args()
return options, args
####################
#
# main
#
####################
if __name__ == '__main__':
options, args = parse_command_line()
# create directory if it doesn't exist
aggregator.makedir(options.output_dir)
# common parameters we will use throughout
max_samp_rate = 8192
min_samp_rate = 32
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
if options.verbose:
print("Creating templates...", file=sys.stderr)
# generate templates for each rate considered
rates = [min_samp_rate*2**i for i in range(n_rates)]
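	# with the values above: n_rates = log2(8192 / 32) + 1 = 9, so rates = [32, 64, ..., 8192] Hz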
downsample_factor = 0.8
qhigh = options.qhigh
qlow = 3.3166
fhigh = max_samp_rate / 2.
flow = min_samp_rate / 4.
if options.waveform == 'sine_gaussian':
waveforms = utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch = options.mismatch, downsample_factor=downsample_factor)
elif options.waveform == 'half_sine_gaussian':
waveforms = utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch = options.mismatch, downsample_factor=downsample_factor)
else:
raise NotImplementedError
basis_params = waveforms.parameter_grid
templates = {rate: [waveform for waveform in waveforms.generate_templates(rate, quadrature=False)] for rate in rates}
# get all templates and params
all_templates = list(itertools.chain(*templates.values()))
all_params = list(itertools.chain(*basis_params.values()))
if options.verbose:
print("Creating template overlaps...", file=sys.stderr)
# zero pad templates to make them the same length
max_sample_pts = max(len(template) for template in all_templates)
	all_templates = [numpy.pad(template, ((max_sample_pts - len(template)) // 2, (max_sample_pts - len(template) + 1) // 2), 'constant') for template in all_templates]  # pad to a common length (extra sample on the right when the difference is odd)
# calculate overlap for each template and find maximum
overlaps = []
for this_template in all_templates:
overlaps.append(max([numpy.dot(this_template,template) for template in all_templates if not numpy.array_equal(template, this_template)]))
print("total number of templates: %d" % len(all_templates), file=sys.stderr)
print("min overlap specified: %f" % (1 - options.mismatch), file=sys.stderr)
print("max template overlap: %f" % max(overlaps), file=sys.stderr)
print("min template overlap: %f" % min(overlaps), file=sys.stderr)
# generate template plots
plot_paths = []
# cast params to a nicer format
# FIXME: should really be passing a dict of params instead
param_names = ['f', 'Q', 'duration']
waveform_type = options.waveform.replace('_', ' ').title()
# limit the number of waveforms plotted per frequency band
num_samples = 3
if options.verbose:
print("Creating waveform plots...", file=sys.stderr)
for rate in rates:
#for template_id, template in enumerate(random.sample(templates[rate], num_samples)):
		for template_id in random.sample(range(len(templates[rate])), num_samples):
waveform_params = ["%s: %.3f" % (name, param) for param, name in zip(basis_params[rate][template_id], param_names)]
template = templates[rate][template_id]
if options.verbose:
print("\tCreating waveform plot with parameters: %s" % repr(waveform_params), file=sys.stderr)
series_fig = plot_waveform(waveforms.times[rate], template, waveform_type, waveform_params)
fname = 'plot_%s_%s-timeseries.png' % (str(rate).zfill(4), str(template_id).zfill(4))
plot_paths.append(fname)
series_fig.savefig(os.path.join(options.output_dir, fname))
			plt.close(series_fig)
# generate template overlap map
freqs = [param[0] for param in all_params]
Qs = [param[1] for param in all_params]
if options.verbose:
print("Creating template overlap plot...", file=sys.stderr)
overlap_fig = plot_template_bank(freqs, Qs, overlaps, waveform_type, param_names[:2])
fname = 'plot-template_overlap.png'
plot_paths.append(fname)
overlap_fig.savefig(os.path.join(options.output_dir, fname))
	plt.close(overlap_fig)
# generate html page
generate_html_file(plot_paths, options.output_dir, waveform_type=waveform_type)
#!/usr/bin/env python3
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_combine [--options]"
__description__ = "an executable to combine features from the batch pipeline to provide a more user-friendly output"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
# =============================
#
# preamble
#
# =============================
from collections import defaultdict
import itertools
import optparse
import os
import sys
import shutil
import h5py
import numpy
from ligo.segments import infinity, segment, segmentlist
from gstlal import aggregator
from gstlal.datafind import DataCache
from gstlal.snax import utils
from gstlal.snax.feature_extractor import DataType
# =============================================================================
#
# FUNCTIONS
#
# =============================================================================
def parse_command_line():
"""
Parse command line inputs.
"""
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Combiner Options", "General settings for configuring the file combiner.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--start-time", type = "int", help = "Set the start time to combine features.")
group.add_option("--end-time", type = "int", help = "Set the end time to combine features.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--outdir", metavar = "path", help = "If set, chooses an alternate directory to save the features to. Default = --rootdir")
parser.add_option_group(group)
opts, args = parser.parse_args()
return opts, args
# ===================
#
# main
#
# ===================
if __name__ == "__main__":
options, args = parse_command_line()
### set up logging
logger = utils.get_logger('snax_combine', verbose=options.verbose)
### define gps bounds to grab features
start_time = options.start_time if options.start_time else -infinity()
end_time = options.end_time if options.end_time else infinity()
file_segs = segmentlist([segment(start_time, end_time)])
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
### build cache of hdf5-formatted features, grouped by segment
cache = DataCache.find(DataType.SNAX_FEATURES, root=options.rootdir, svd_bins="*", segments=file_segs)
### combine features in each stride
for seg, feature_files in cache.groupby("time").items():
logger.info(f'combining features within times: {seg[0]} - {seg[1]}')
features = defaultdict(dict)
### assume filenames, metadata is the same in each group
metadata = {}
with h5py.File(feature_files.cache[0].path, 'r') as f:
metadata['waveform'] = f.attrs.get('waveform')
metadata['sample_rate'] = f.attrs.get('sample_rate')
### load features
for entry in feature_files.cache:
with h5py.File(entry.path, 'r') as f:
channels = f.keys()
for channel in channels:
dsets = f[channel].keys()
for dset in dsets:
features[channel][dset] = numpy.array(f[channel][dset])
### save combined features to disk
filename = os.path.splitext(DataType.SNAX_FEATURES.filename(options.instrument, span=seg))[0]
for channel in features.keys():
for dset in features[channel].keys():
utils.create_new_dataset(tmp_dir, filename, features[channel][dset], name=dset, group=channel, tmp=True, metadata=metadata)
### determine final location for features
if options.outdir:
start_time = seg[0]
base_path = os.path.join(options.outdir, str(start_time)[:5])
aggregator.makedir(base_path)
final_path = os.path.join(base_path, filename)+".h5"
else:
			final_path = os.path.join(options.rootdir, filename)+".h5"
tmp_path = os.path.join(tmp_dir, filename)+".h5.tmp"
logger.info('saving features to: {}'.format(final_path))
shutil.move(tmp_path, final_path)
#!/usr/bin/env python3
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a dag to run offline gstlal_snax_extract batch jobs
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
# =============================
#
# preamble
#
# =============================
import os
import optparse
import lal
from ligo import segments
from gstlal import aggregator
from gstlal import dagparts
from gstlal.snax import feature_extractor
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
PSD_DROP_FACTOR = 16
# =============================
#
# functions
#
# =============================
def seglist_range(start, stop, stride):
b = start
while b <= stop:
seg = segments.segment(int(b), min(utils.floor_div(int(b) + stride, stride), stop))
b = utils.floor_div(int(b) + stride, stride)
yield seg
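# e.g. (assuming utils.floor_div rounds down to a multiple of stride)
# seglist_range(100, 250, 100) yields the segments (100, 200) and (200, 250)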
def analysis_segments(ifo, allsegs, boundary_seg, segment_length, psd_drop_time, max_template_length = 30):
"""
get a dictionary of all the analysis segments
"""
segsdict = segments.segmentlistdict()
# start pad to allow whitener to settle + the maximum template_length
start_pad = psd_drop_time + max_template_length
segsdict[ifo] = segments.segmentlist([boundary_seg])
segsdict[ifo] = segsdict[ifo].protract(start_pad)
segsdict[ifo] = dagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
if not segsdict[ifo]:
del segsdict[ifo]
return segsdict
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
"""
get a dictionary of all the channels per gstlal_snax_extract job
"""
feature_extractor_nodes = {}
# parallelize jobs by channel subsets
for ii, channel_subset in enumerate(data_source_info.channel_subsets):
print("Creating feature extractor jobs for channel subset %d" % ii)
# parallelize jobs by segments
for seg in segsdict[ifo]:
# define analysis times
gps_start_time = int(seg[0])
feature_start_time = gps_start_time + options.psd_drop_time + max_template_length
feature_end_time = min(int(seg[1]), options.gps_end_time)
feature_seg = segments.segment(feature_start_time, feature_end_time)
# only produce jobs where the analysis runtime after applying segments is nonzero
if not data_source_info.frame_segments[ifo].intersects_segment(feature_seg):
if options.verbose:
print(" Skipping segment (%d, %d) for channel subset %d since there is no analyzable data here" % (int(feature_seg[0]), int(feature_seg[1]), ii))
continue
# set maximum number of jobs reading concurrently from the same frame file to prevent I/O locks
if ii // options.concurrency == 0:
dep_nodes = parent_nodes
else:
dep_nodes = [feature_extractor_nodes[(ii - options.concurrency, seg)]]
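			# e.g. with --concurrency 4 (the default), channel subsets 0-3 start immediately and
			# subset N waits on the node for subset N-4 covering the same segment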
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
# FIXME: hacky way of getting options to get passed correctly for channels
channels[0] = channels[0].split('=')[1]
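			# e.g. with hypothetical channels, the list is now
			# ["H1:PEM-EX_MIC:256", "--channel-name=H1:SUS-ETMY_L2_WIT:512", ...]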
outpath = os.path.join(options.out_path, "gstlal_snax_extract")
feature_opts = {
"gps-start-time": gps_start_time,
"gps-end-time": feature_end_time,
"feature-start-time": feature_start_time,
"feature-end-time": feature_end_time,
"data-source": "frames",
"sample-rate": options.sample_rate,
"mismatch": options.mismatch,
"waveform": options.waveform,
"qhigh": options.qhigh,
"psd-fft-length": options.psd_fft_length,
"channel-name": ' '.join(channels),
"job-id": str(ii + 1).zfill(4),
"cadence": options.cadence,
"persist-cadence": options.persist_cadence,
"max-streams": options.max_serial_streams,
"frame-segments-name": options.frame_segments_name,
"save-format": options.save_format,
}
if options.verbose:
feature_opts["verbose"] = ""
if options.local_frame_caching:
feature_opts["local-frame-caching"] = ""
feature_extractor_nodes[(ii, seg)] = \
dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = dep_nodes,
opts = feature_opts,
input_files = {
"frame-cache": options.frame_cache,
"frame-segments-file": options.frame_segments_file
},
output_files = {"out-path": outpath}
)
if options.verbose:
print(" Creating node for channel subset %d, gps range %d - %d" % (ii, feature_start_time, feature_end_time))
return feature_extractor_nodes
# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# generic data source options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# DAG architecture options
parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks")
parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")
# Condor commands
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
	parser.add_option("--request-disk", default = "30GB", metavar = "integer", help = "set the requested node local scratch space size needed, default = 30GB")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
parser.add_option("--singularity-image", metavar = "filename", help = "If set, uses the Singularity image provided as the build environment and sets Singularity-specific condor options.")
# Feature saving options
parser.add_option("--features-path", metavar = "path", help = "If set, chooses an alternate directory to save the features to. Default = --out-path")
options, filenames = parser.parse_args()
# set max parallel streams to options.max_streams for use in data_source_info for splitting up channel lists to process in parallel
options.max_streams = options.max_parallel_streams
# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check
# calculate psd drop time based on fft length
options.psd_drop_time = options.psd_fft_length * PSD_DROP_FACTOR
# sanity check to enforce a minimum segment length
# Minimum segment length chosen so that the overlap is a ~33% hit in run time
min_segment_length = int(4 * options.psd_drop_time)
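	# e.g. a (hypothetical) psd_fft_length of 8 s gives psd_drop_time = 8 * 16 = 128 s
	# and a minimum segment length of 512 s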
assert options.segment_length >= min_segment_length
return options, filenames
# =============================
#
# main
#
# =============================
#
# parsing and setting up core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
# FIXME Work out better way to determine max template length
max_template_length = 30
#
# create directories if needed
#
listdir = os.path.join(options.out_path, "gstlal_snax_extract/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")
#
# set up dag and job classes
#
dag = dagparts.DAG("feature_extractor_pipe")
common_condor_options = {
"want_graceful_removal": "True",
"kill_sig": "15",
}
if options.singularity_image:
common_condor_options["+SingularityImage"] = '"{}"'.format(options.singularity_image)
extract_condor_options = {
"request_memory": options.request_memory,
"request_cpus": options.request_cpu,
"request_disk": options.request_disk,
}
extract_condor_options.update(common_condor_options)
extract_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, extract_condor_options)
feature_extractor_job = dagparts.DAGJob("gstlal_snax_extract", condor_commands = extract_condor_commands)
segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, options.psd_drop_time, max_template_length=max_template_length)
combiner_condor_options = {
"request_memory": "4GB",
"request_cpus": 1,
}
combiner_condor_options.update(common_condor_options)
combiner_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, combiner_condor_options)
feature_combiner_job = dagparts.DAGJob("gstlal_snax_combine", condor_commands = combiner_condor_commands)
#
# set up jobs
#
feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], segsdict, ifo, options, data_source_info, max_template_length=max_template_length)
feature_combiner_options = {
"rootdir": os.path.join(options.out_path, "gstlal_snax_extract"),
"basename": options.description,
"instrument": ifo,
"tag": "offline",
}
if options.features_path:
feature_combiner_options["outdir"] = options.features_path
if options.verbose:
feature_combiner_options["verbose"] = ""
for seg in seglist_range(data_source_info.seg[0], data_source_info.seg[1], 50000):
parent_nodes = [node for (i, job_seg), node in feature_extractor_nodes.items() if seg.intersects(job_seg)]
these_options = dict(feature_combiner_options)
these_options.update({"start-time": seg[0], "end-time": seg[1]})
feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = parent_nodes, opts = these_options)
#
# write out dag and sub files
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
#!/usr/bin/env python3
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a dag to run a series of gstlal_snax_extract jobs online
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
# =============================
#
# preamble
#
# =============================
import optparse
import os
from gstlal import aggregator
from gstlal import dagparts
from gstlal.snax import feature_extractor
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
# =============================
#
# functions
#
# =============================
def generate_options(options):
"""
Generates a list of command line options to pass into DAG nodes.
"""
# data source options
if options.data_source == 'lvshm':
data_source_options = {
"data-source": options.data_source,
"shared-memory-partition": options.shared_memory_partition,
"shared-memory-assumed-duration": options.shared_memory_assumed_duration
}
elif options.data_source == 'framexmit':
data_source_options = {"data-source": options.data_source}
# waveform options
waveform_options = {
"waveform": options.waveform,
"mismatch": options.mismatch,
"qhigh": options.qhigh
}
# data transfer options
if options.save_format == 'kafka':
save_options = {
"save-format": options.save_format,
"sample-rate": options.sample_rate,
"kafka-partition": options.kafka_partition,
"kafka-topic": options.kafka_topic,
"kafka-server": options.kafka_server
}
elif options.save_format == 'hdf5':
save_options = {
"description": options.description,
"save-format": options.save_format,
"sample-rate": options.sample_rate,
"cadence": options.cadence,
"persist-cadence": options.persist_cadence
}
else:
raise NotImplementedError("not an available option for online jobs at this time")
# program behavior options
program_options = {"psd-fft-length": options.psd_fft_length}
if options.verbose:
program_options.update({"verbose": options.verbose})
# gobble options together
out_options = {}
out_options.update(data_source_options)
out_options.update(waveform_options)
out_options.update(save_options)
out_options.update(program_options)
return out_options
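# For reference, a sketch of the dictionary returned for a (hypothetical) hdf5 run, with
# values taken from the corresponding command line options:
#   {"data-source": "framexmit", "waveform": ..., "mismatch": ..., "qhigh": ...,
#    "description": ..., "save-format": "hdf5", "sample-rate": ..., "cadence": ...,
#    "persist-cadence": ..., "psd-fft-length": ..., "verbose": ...}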
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
feature_extractor_nodes = {}
channel_list = []
# generate common command line options
command_line_options = generate_options(options)
# parallelize jobs by channel subsets
for ii, channel_subset in enumerate(data_source_info.channel_subsets):
if options.verbose:
print("Creating node for channel subset %d"%ii)
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
channels[0] = channels[0].split('=')[1] # this is done to peel off --channel-name option off first channel
channel_list.extend([(channel, int(data_source_info.channel_dict[channel]['fsamp'])) for channel in channel_subset])
# create specific options for each channel subset
subset_options = {
"max-streams": options.max_streams * 2, # FIXME: done to force all channels to be processed in parallel, but should be handled upstream more gracefully
"job-id": str(ii + 1).zfill(4),
"channel-name":' '.join(channels)
}
subset_options.update(command_line_options)
feature_extractor_nodes[ii] = \
dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = parent_nodes,
opts = subset_options,
output_files = {"out-path": os.path.join(options.out_path, "gstlal_snax_extract")}
)
num_channels = len(channel_list)
print("Writing channel list of all channels processed")
listpath = os.path.join(options.out_path, "full_channel_list.txt")
with open(listpath, 'w') as f:
for channel, rate in channel_list:
f.write('%s\t%d\n'%(channel, rate))
return feature_extractor_nodes, num_channels
# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# generic data source and feature extraction options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# Condor commands
group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
group.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
group.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
group.add_option("--disable-agg-jobs", action = "store_true", help = "If set, do not launch aggregation jobs to process and aggregate incoming features.")
group.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
group.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
group.add_option("--auxiliary-request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for auxiliary processes, default = 2")
group.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
parser.add_option_group(group)
# Synchronizer/File Sink commands
group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
group.add_option("--features-path", metavar = "path", default = ".", help = "Write features to this path. Default = .")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option_group(group)
# Aggregation/Monitoring commands
group = optparse.OptionGroup(parser, "Aggregator Options", "Adjust parameters used for aggregation and monitoring of features.")
group.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
group.add_option("--num-agg-jobs", type = "int", default = 4, help = "Number of aggregator jobs to aggregate incoming features. Default = 4.")
group.add_option("--num-agg-processes-per-job", type = "int", default = 2, help = "Number of processes per aggregator job to aggregate incoming features. Used if --agg-data-backend = hdf5. Default = 2.")
group.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --agg-data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --agg-data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --agg-data-backend = influx.")
parser.add_option_group(group)
options, filenames = parser.parse_args()
return options, filenames
# =============================
#
# main
#
# =============================
#
# parsing and setting up core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
#
# create directories if needed
#
for dir_ in ('features', 'synchronizer', 'monitor', 'aggregator', 'logs'):
aggregator.makedir(dir_)
#
# set up dag and job classes
#
dag = dagparts.DAG("%s_feature_extraction_pipe" % ifo)
# feature extractor job
if options.condor_universe == 'local':
condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = dagparts.DAGJob("gstlal_snax_extract", condor_commands = condor_commands, universe = options.condor_universe)
feature_extractor_nodes, num_channels = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
# auxiliary jobs
if options.save_format == 'kafka':
if options.condor_universe == 'local':
auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
auxiliary_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
synchronizer_job = dagparts.DAGJob("gstlal_snax_synchronize", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
hdf5_sink_job = dagparts.DAGJob("gstlal_snax_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
monitor_job = dagparts.DAGJob("gstlal_snax_monitor", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
# aggregator jobs
if not options.disable_agg_jobs:
aggregator_job = dagparts.DAGJob("gstlal_snax_aggregate", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
#
# set up options for auxiliary jobs
#
common_options = {
"verbose": options.verbose,
"tag": options.tag,
"processing-cadence": options.processing_cadence,
"request-timeout": options.request_timeout,
"kafka-server": options.kafka_server
}
synchronizer_options = {
"latency-timeout": options.latency_timeout,
"sample-rate": options.sample_rate,
"input-topic-basename": options.kafka_topic,
"output-topic-basename": '_'.join(['synchronizer', options.tag])
}
if options.no_drop:
synchronizer_options.update({"no-drop": options.no_drop})
monitor_options = {
"instrument": ifo,
"target-channel": options.target_channel,
"sample-rate": options.sample_rate,
"input-topic-basename": '_'.join(['synchronizer', options.tag]),
"num-channels": num_channels,
"data-backend": options.agg_data_backend,
"data-type": "max",
}
hdf5_sink_options = {
"instrument": ifo,
"channel-list": options.channel_list,
"features-path": options.features_path,
"basename": options.description,
"waveform": options.waveform,
"sample-rate": options.sample_rate,
"write-cadence": options.cadence,
"persist-cadence": options.persist_cadence,
"input-topic-basename": '_'.join(['synchronizer', options.tag])
}
extra_hdf5_channel_options = {
"section-include": options.section_include,
"safety-include": list(options.safety_include),
"fidelity-exclude": list(options.fidelity_exclude),
"safe-channel-include": options.safe_channel_include,
"unsafe-channel-include": options.unsafe_channel_include,
}
aggregator_options = {
"sample-rate": options.sample_rate,
"input-topic-basename": options.kafka_topic,
"data-backend": options.agg_data_backend,
"data-type": "max",
}
if options.agg_data_backend == 'influx':
backend_options = {
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
}
else:
backend_options = {"num-processes": options.num_agg_processes_per_job}
aggregator_options.update(backend_options)
monitor_options.update(backend_options)
### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
for option_name, option in extra_hdf5_channel_options.items():
if option:
hdf5_sink_options[option_name] = option
synchronizer_options.update(common_options)
hdf5_sink_options.update(common_options)
aggregator_options.update(common_options)
monitor_options.update(common_options)
monitor_options.update({"channel-list": os.path.join(options.out_path, "full_channel_list.txt")})
#
# set up jobs
#
if options.save_format == 'kafka':
synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "synchronizer")})
hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "features")})
monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "monitor")})
### aggregator jobs
if not options.disable_agg_jobs:
all_fx_jobs = [(str(ii).zfill(4), channel_subset) for ii, channel_subset in enumerate(data_source_info.channel_subsets)]
for job_subset in dagparts.groups(all_fx_jobs, options.num_agg_jobs):
jobs, channels = zip(*job_subset)
job_channel_options = {"jobs": jobs}
job_channel_options.update(aggregator_options)
agg_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
#
# write out dag and sub files
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
#!/usr/bin/env python3
# Copyright (C) 2017-2018 Sydney J. Chamberlin, Patrick Godwin, Chad Hanna, Duncan Meacher
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program to extract features from auxiliary channel data in real time or in offline mode
"""
# =============================
#
# preamble
#
# =============================
import math
import optparse
import os
import resource
import socket
import sys
import tempfile
import h5py
import numpy
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from lal import LIGOTimeGPS
from lal.rate import IrregularBins
from ligo import segments
from ligo.segments import utils as segmentsUtils
from gstlal import aggregator
from gstlal import datasource
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal.snax import auxcache
from gstlal.snax import feature_extractor
from gstlal.snax import multichannel_datasource
from gstlal.snax import pipeparts as snaxparts
from gstlal.snax import utils
from gstlal.snax import waveforms as fxwaveforms
#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
#
def setrlimit(res, lim):
hard_lim = resource.getrlimit(res)[1]
resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))
if sys.platform == "linux":
# set the number of processes and total set size up to hard limit and
# shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
# FIXME: tests at CIT show that this next tweak has no effect. it's
# possible that SL7 has lowered the default stack size from SL6 and we
# don't need to do this anymore. remove?
setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread
# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage='%prog [options]', description=__doc__)
# First append datasource and feature extraction common options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# parse the arguments
options, filenames = parser.parse_args()
# Sanity check the options
# set gps ranges for live and offline sources
if options.data_source in ("framexmit", "lvshm", "white_live"):
if options.data_source in ("framexmit", "lvshm"):
options.gps_start_time = int(aggregator.now())
else:
# NOTE: set start time for 'fake' live sources to zero,
# since seeking doesn't work with 'is_live' option
options.gps_start_time = 0
# set the gps end time to be "infinite"
options.gps_end_time = 2000000000
if options.feature_start_time is None:
options.feature_start_time = int(options.gps_start_time)
if options.feature_end_time is None:
options.feature_end_time = int(options.gps_end_time)
# check if input sample rate is sensible
assert options.sample_rate == 1 or options.sample_rate % 2 == 0
assert options.min_downsample_rate % 2 == 0
# check if persist and save cadence times are sensible
assert options.persist_cadence >= options.cadence
assert (options.persist_cadence % options.cadence) == 0
# check if there are any segments to dump to disk
if options.nxydump_segment:
options.nxydump_segment, = segmentsUtils.from_range_strings(
[options.nxydump_segment],
boundtype=LIGOTimeGPS
)
return options, filenames
# =============================
#
# main
#
# =============================
#
# parsing and setting up some core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
waveforms = {}
bins = {}
#
# set up logging
#
logger = utils.get_logger('snax_extract', verbose=options.verbose)
#
# set up local frame caching, if specified
#
if options.local_frame_caching:
# get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
# create local frame directory
local_path = os.path.join(tmp_dir, 'local_frames/')
aggregator.makedir(local_path)
# save local frame cache
logger.info("caching frame data locally to %s" % local_path)
fd, fname = tempfile.mkstemp(".cache")
os.close(fd)
f = open(fname, "w")
data_source_info.local_cache_list = auxcache.cache_aux(
data_source_info,
logger,
output_path=local_path,
verbose=options.verbose
)
for cacheentry in data_source_info.local_cache_list:
# guarantee a lal cache compliant file with
# only integer starts and durations
cacheentry.segment = segments.segment(
int(cacheentry.segment[0]),
int(math.ceil(cacheentry.segment[1]))
)
print(str(cacheentry), file=f)
f.close()
data_source_info.frame_cache = fname
#
# process channel subsets in serial
#
for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
#
# building the event loop and pipeline
#
logger.info("assembling pipeline...")
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
# generate multiple channel sources, and link up pipeline
head = snaxparts.mkmultisrc(pipeline, data_source_info, channel_subset, verbose=options.verbose)
src = {}
for channel in channel_subset:
# define sampling rates used
samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
max_rate = min(data_source_info.max_sample_rate, samp_rate)
min_rate = min(data_source_info.min_sample_rate, max_rate)
n_rates = int(numpy.log2(max_rate / min_rate) + 1)
rates = [min_rate * 2**i for i in range(n_rates)]
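# e.g. (illustrative): a 16384 Hz channel processed with
# data_source_info.max_sample_rate = 2048 and min_sample_rate = 32 gives
# max_rate = 2048, min_rate = 32, n_rates = 7 and
# rates = [32, 64, 128, 256, 512, 1024, 2048]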
# choose range of basis parameters
# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
downsample_factor = 0.8
qlow = 3.3166
if data_source_info.extension == 'ini':
flow = max(data_source_info.channel_dict[channel]['flow'], min_rate / 4.)
fhigh = min(data_source_info.channel_dict[channel]['fhigh'], max_rate / 2.)
qhigh = min(data_source_info.channel_dict[channel]['qhigh'], options.q_high)
else:
flow = min_rate / 4.
fhigh = max_rate / 2.
qhigh = options.q_high
# generate frequency bins
frequency_breakpoints = [0] + options.frequency_bin
if fhigh > frequency_breakpoints[-1]:
frequency_breakpoints.append(fhigh)
frequency_bins = IrregularBins(frequency_breakpoints)
bins[channel] = frequency_bins
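# e.g. (illustrative): with options.frequency_bin = [256, 1024] and
# fhigh = 2048, the breakpoints become [0, 256, 1024, 2048], so features are
# later assigned to the bins [0, 256), [256, 1024), [1024, 2048)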
# generate templates
if 'sine_gaussian' in options.waveform:
parameter_range = {'frequency': (flow, fhigh), 'q': (qlow, qhigh)}
if options.waveform == 'half_sine_gaussian':
waveforms[channel] = fxwaveforms.HalfSineGaussianGenerator(
parameter_range,
rates,
frequency_bins,
mismatch=options.mismatch,
downsample_factor=downsample_factor
)
elif options.waveform == 'sine_gaussian':
waveforms[channel] = fxwaveforms.SineGaussianGenerator(
parameter_range,
rates,
frequency_bins,
mismatch=options.mismatch,
downsample_factor=downsample_factor
)
elif options.waveform == 'tapered_sine_gaussian':
waveforms[channel] = fxwaveforms.TaperedSineGaussianGenerator(
parameter_range,
rates,
frequency_bins,
mismatch=options.mismatch,
downsample_factor=downsample_factor,
max_latency=options.max_latency
)
else:
raise NotImplementedError
if options.latency_output:
head[channel] = pipeparts.mklatency(
pipeline,
head[channel],
name=utils.latency_name('beforewhitening', 2, channel)
)
# whiten auxiliary channel data
head[channel] = snaxparts.mkcondition(
pipeline,
head[channel],
max(rates),
samp_rate,
instrument,
channel_name=channel,
width=32,
td_whiten=options.data_source in ("framexmit", "lvshm"),
nxydump_segment=options.nxydump_segment,
psd_fft_length=options.psd_fft_length,
)
# split whitened data into multiple frequency bands
multiband = snaxparts.mkmultiband(
pipeline,
head[channel],
rates,
samp_rate,
instrument,
min_rate=options.min_downsample_rate
)
for rate, band in multiband.items():
if options.latency_output:
thishead = pipeparts.mklatency(
pipeline,
band,
name=utils.latency_name('afterwhitening', 3, channel, rate)
)
# extract features
features = snaxparts.mkextract(
pipeline,
band,
instrument,
channel,
rate,
waveforms[channel],
frequency_bins,
snr_threshold=options.snr_threshold,
feature_sample_rate=options.sample_rate,
min_downsample_rate=options.min_downsample_rate,
nxydump_segment=options.nxydump_segment,
feature_mode=options.feature_mode,
latency_output=options.latency_output
)
for bin_idx, trg_head in features.items():
if options.latency_output:
thishead = pipeparts.mklatency(
pipeline,
trg_head,
name=utils.latency_name('aftertrigger', 5, '%s_%s'%(channel, bin_idx), rate)
)
# link to src for processing by appsync
src[(channel, rate, bin_idx)] = trg_head
# define structures to synchronize output streams and extract triggers from buffer
logger.info("setting up pipeline handler...")
handler = feature_extractor.MultiChannelHandler(
mainloop,
pipeline,
logger,
data_source_info,
options,
channels=channel_subset,
waveforms=waveforms,
bins=bins,
num_streams=len(src.keys()),
subset_id=subset_id
)
logger.info("attaching appsinks to pipeline...")
appsync = feature_extractor.LinkedAppSync(appsink_new_buffer=handler.bufhandler)
for channel, rate, bin_idx in src.keys():
appsync.add_sink(pipeline, src[(channel, rate, bin_idx)], name="sink_%s_%s_%s" % (rate, bin_idx, channel))
logger.info("attached %d appsinks to pipeline." % len(src.keys()))
# Allow Ctrl+C or SIGTERM to gracefully shut down the program for online
# sources; otherwise the signal would just kill the process outright
if data_source_info.data_source in data_source_info.live_sources: # what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
# Seek
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if data_source_info.data_source not in data_source_info.live_sources: # what about nds online?
datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)
#
# Run pipeline
#
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
logger.info("running pipeline...")
mainloop.run()
# save remaining triggers
logger.info("persisting features to disk...")
handler.flush_and_save_features()
#
# Shut down pipeline
#
logger.info("shutting down pipeline...")
#
# Set pipeline state to NULL and garbage collect the handler
#
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
del handler.pipeline
del handler
#
# Cleanup local frame file cache and related frames
#
if options.local_frame_caching:
logger.info("deleting temporary cache file and frames...")
# remove frame cache
os.remove(data_source_info.frame_cache)
# remove local frames
for cacheentry in data_source_info.local_cache_list:
os.remove(cacheentry.path)
del data_source_info.local_cache_list
#
# close program manually if data source is live
#
if options.data_source in data_source_info.live_sources:
sys.exit(0)
#!/usr/bin/env python3
# Copyright (C) 2020 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_generate [--options]"
__description__ = "an executable to generate synthetic low-latency features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
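# Example invocation (illustrative only; values are placeholders):
#   gstlal_snax_generate \
#       --kafka-server localhost:9092 \
#       --output-topic synthetic_features \
#       --channel-list channels.ini \
#       --sample-rate 1 \
#       --tag test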
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import deque
import itertools
import json
import logging
import optparse
import os
import shutil
import h5py
import numpy
import scipy.stats
from gstlal import aggregator
from gstlal import events
from ligo.scald import utils
from gstlal.snax import multichannel_datasource
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Generator Options", "General settings for configuring the generator.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the generator processes data. Default = 0.1 seconds.")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--output-topic", metavar = "string", help = "Sets the output kafka topic name. Required.")
parser.add_option_group(group)
group = optparse.OptionGroup(parser, "Channel Options", "Settings used for deciding which auxiliary channels to generate.")
parser.add_option("--target-channel", metavar = "string", help = "Sets the target channel name. Required.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE")
group.add_option("--section-include", default=[], type="string", action="append", help="Set the channel sections to be included from the INI file. Can be given multiple times. Pass in spaces as underscores instead. If not specified, assumed to include all sections")
group.add_option("--safety-include", default=["safe"], type="string", action="append", help="Set the safety values for channels to be included from the INI file. Can be given multiple times. Default = 'safe'.")
group.add_option("--fidelity-exclude", default=[], type="string", action="append", help="Set the fidelity values for channels to be excluded from the INI file. Can supply multiple values by repeating this argument. Each must be on of (add here)")
group.add_option("--safe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file (requires exact match). Can be repeated. If not specified, assume to include all channels.")
group.add_option("--unsafe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file, disregarding safety information (requires exact match). Can be repeated.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class FeatureGenerator(events.EventProcessor):
"""
Handles the low-latency generation of synthetic features.
"""
_name = 'generator'
def __init__(self, options):
logging.info('setting up feature generator...')
events.EventProcessor.__init__(
self,
process_cadence=options.processing_cadence,
kafka_server=options.kafka_server,
tag=options.tag
)
### set up channels needed to do processing
if options.channel_list:
self.channels = multichannel_datasource.channel_dict_from_channel_ini(options)
elif options.channel_name:
self.channels = multichannel_datasource.channel_dict_from_channel_list(options.channel_name)
self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
### iDQ saving properties
self.timestamp = None
self.last_write_time = None
self.sample_rate = options.sample_rate
self.write_cadence = 1. / options.sample_rate
self.output_topic = options.output_topic
### set up distributions for sampling
### FIXME: currently uses a single set of distributions shared by all
### channels; would want to generalize to per-channel distributions
self.dists = {
'snr': {'type': scipy.stats.pareto, 'kwargs': {'b': 2, 'loc': 3}},
'frequency': {'type': scipy.stats.uniform, 'kwargs': {'loc': 32, 'scale': (2048 - 32)}},
'q': {'type': scipy.stats.uniform, 'kwargs': {'loc': 5, 'scale': (100 - 5)}},
'phase': {'type': scipy.stats.uniform, 'kwargs': {'loc': -numpy.pi, 'scale': (2 * numpy.pi)}},
}
def handle(self):
"""
determine if new features need to be generated and push to Kafka
"""
timestamp = utils.floor_div(utils.gps_now(), self.write_cadence)
if not self.last_write_time or utils.in_new_epoch(timestamp, self.last_write_time, self.write_cadence):
features = self.generate(timestamp)
self.push(timestamp, features)
self.last_write_time = timestamp
def generate(self, timestamp):
"""
generate synthetic features for a given timestamp
"""
times = scipy.stats.uniform.rvs(size=len(self.channels.keys()), loc=timestamp, scale=1./self.sample_rate)
features = {channel: [] for channel in self.channels.keys()}
for i, channel in enumerate(features.keys()):
row = {col: float(self.dists[col]['type'].rvs(size=1, **self.dists[col]['kwargs'])) for col in self.dists.keys()}
row['timestamp'] = timestamp
row['time'] = times[i]
features[channel].append(row)
return features
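# e.g. (illustrative) the structure returned by generate() above looks like:
#   {'H1:AUX-CHANNEL_NAME': [{'timestamp': 1234567890, 'time': 1234567890.4,
#     'snr': 5.2, 'frequency': 103.4, 'q': 17.1, 'phase': 0.6}], ...}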
def push(self, timestamp, features):
"""
push features to Kafka
"""
logging.info(
'generating features with timestamp {:f}, '
'latency is {:.3f}'.format(timestamp, utils.gps_to_latency(timestamp))
)
feature_packet = {'timestamp': timestamp, 'features': features}
self.producer.produce(
timestamp=timestamp,
topic=self.output_topic,
value=json.dumps(feature_packet)
)
self.producer.poll(0)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
options, args = parse_command_line()
### set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s | snax_generate : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# start up feature generator
generator = FeatureGenerator(options)
generator.start()
#!/usr/bin/env python3
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_monitor [--options]"
__description__ = "an executable to collect and monitor streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
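# Example invocation (illustrative only; values are placeholders):
#   gstlal_snax_monitor \
#       --kafka-server localhost:9092 \
#       --input-topic-basename synchronized_features \
#       --channel-list channels.ini \
#       --num-channels 500 \
#       --influx-hostname influx.example.com \
#       --influx-port 8086 \
#       --influx-database-name snax_monitoring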
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import defaultdict, deque
import json
import logging
import optparse
from ligo.scald import utils
from ligo.scald.io import influx
from gstlal import events
from gstlal.snax import multichannel_datasource
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "Monitor Options", "General settings for configuring the monitor.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--target-channel", metavar = "string", help = "Sets the target channel to view.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Location where log messages and sqlite database lives")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--num-channels", type = "int", help = "Set the full number of channels being processed upstream, used for monitoring purposes.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the monitor acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
group.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
group.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
group.add_option("--enable-auth", default=False, action="store_true", help = "If set, enables authentication for the influx aggregator.")
group.add_option("--enable-https", default=False, action="store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
group.add_option("--data-type", metavar="string", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Default = max")
group.add_option("--num-processes", type = "int", default = 2, help = "Number of processes to use concurrently, default 2.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamMonitor(events.EventProcessor):
"""
Listens to incoming streaming features, collects metrics and pushes relevant metrics to a data store.
"""
_name = 'feature_monitor'
def __init__(self, options):
logging.info('setting up feature monitor...')
events.EventProcessor.__init__(
self,
process_cadence=options.processing_cadence,
request_timeout=options.request_timeout,
num_messages=options.sample_rate,
kafka_server=options.kafka_server,
input_topic=options.input_topic_basename,
tag=options.tag
)
### initialize queues
self.sample_rate = options.sample_rate
self.feature_queue = deque(maxlen = 60 * self.sample_rate)
### other settings
if options.target_channel:
self.target_channel = options.target_channel
else:
self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument
self.num_channels = options.num_channels
self.data_type = options.data_type
### keep track of last timestamp processed and saved
self.last_save = None
self.timestamp = None
### set up aggregator
logging.info("setting up monitor")
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=False,
)
### determine channels to be processed
name, _ = options.channel_list.rsplit('.', 1)
self.channels = set(multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys())
### define measurements to be stored
for metric in ('target_snr', 'synchronizer_latency', 'percent_missed'):
self.agg_sink.register_schema(metric, columns='data', column_key='data', tags='job', tag_key='job')
def ingest(self, message):
"""
parse a message containing feature data
"""
features = json.loads(message.value())
self.feature_queue.appendleft((
features['timestamp'],
features['features']
))
self.timestamp = features['timestamp']
def handle(self):
"""
process features and generate metrics from synchronizer on a regular cadence
"""
if self.timestamp:
if not self.last_save or utils.in_new_epoch(self.timestamp, self.last_save, 1):
### check for missing channels
missing_channels = set()
metrics = defaultdict(list)
while len(self.feature_queue) > 0:
### remove data with oldest timestamp and process
timestamp, features = self.feature_queue.pop()
latency = utils.gps_to_latency(timestamp)
### check for missing channels
these_channels = set(features.keys())
missing_channels = self.channels - these_channels
### generate metrics
metrics['time'].append(timestamp)
metrics['synchronizer_latency'].append(latency)
metrics['percent_missed'].append(100 * (float(self.num_channels - len(features.keys())) / self.num_channels))
if self.target_channel in features:
metrics['target_time'].append(timestamp)
metrics['target_snr'].append(features[self.target_channel][0]['snr'])
### store and aggregate features
for metric in ('synchronizer_latency', 'percent_missed'):
data = {'time': metrics['time'], 'fields': {'data': metrics[metric]}}
self.agg_sink.store_columns(metric, {'synchronizer': data}, aggregate=self.data_type)
if len(metrics['target_time']) > 0:
data = {'time': metrics['target_time'], 'fields': {'data': metrics['target_snr']}}
self.agg_sink.store_columns('target_snr', {'synchronizer': data}, aggregate=self.data_type)
self.last_save = timestamp
logging.info(
'processed features up to timestamp {:.3f}, '
'max latency = {:.3f} s, '
'percent missing channels = {:.3f}'.format(
timestamp,
max(metrics['synchronizer_latency']),
max(metrics['percent_missed'])
)
)
if missing_channels:
logging.info('channels missing @ timestamp={:.3f}: {}'.format(timestamp, repr(list(missing_channels))))
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
options, args = parse_command_line()
### set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s | snax_monitor : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# start up monitor
monitor = StreamMonitor(options=options)
monitor.start()
#!/usr/bin/env python3
# Copyright (C) 2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_sink [--options]"
__description__ = "an executable to dump streaming data to disk via hdf5"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
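# Example invocation (illustrative only; values are placeholders):
#   gstlal_snax_sink \
#       --kafka-server localhost:9092 \
#       --input-topic-basename synchronized_features \
#       --channel-list channels.ini \
#       --features-path /path/to/features \
#       --instrument H1 \
#       --write-cadence 100 \
#       --persist-cadence 10000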
#-------------------------------------------------
# Preamble
#-------------------------------------------------
from collections import deque
import itertools
import json
import logging
import optparse
import os
import shutil
import h5py
import numpy
from ligo import segments
from gstlal import aggregator
# NOTE: assumed location of the DataType enum providing the SNAX_FEATURES
# file naming conventions used in set_hdf_file_properties() below
from gstlal.datafind import DataType
from gstlal import events
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = optparse.OptionParser(usage=__usage__, description=__description__)
group = optparse.OptionGroup(parser, "File Sink Options", "General settings for configuring the file sink.")
group.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
group.add_option("--features-path", metavar = "path", default = ".", help = "Write features to this path. Default = .")
group.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
group.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
group.add_option("--waveform", type="string", default = "sine_gaussian", help = "Set the waveform used for producing features. Default = sine_gaussian.")
group.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
group.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
group.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
group.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename. Required.")
parser.add_option_group(group)
group = optparse.OptionGroup(parser, "Channel Options", "Settings used for deciding which auxiliary channels to process.")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE")
group.add_option("--section-include", default=[], type="string", action="append", help="Set the channel sections to be included from the INI file. Can be given multiple times. Pass in spaces as underscores instead. If not specified, assumed to include all sections")
group.add_option("--safety-include", default=["safe"], type="string", action="append", help="Set the safety values for channels to be included from the INI file. Can be given multiple times. Default = 'safe'.")
group.add_option("--fidelity-exclude", default=[], type="string", action="append", help="Set the fidelity values for channels to be excluded from the INI file. Can supply multiple values by repeating this argument. Each must be on of (add here)")
group.add_option("--safe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file (requires exact match). Can be repeated. If not specified, assume to include all channels.")
group.add_option("--unsafe-channel-include", default=[], action="append", type="string", help="Include this channel when reading the INI file, disregarding safety information (requires exact match). Can be repeated.")
parser.add_option_group(group)
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class HDF5StreamSink(events.EventProcessor):
"""
Handles the processing of incoming streaming features, saving datasets to disk in hdf5 format.
"""
_name = 'hdf5_sink'
def __init__(self, options):
logging.info('setting up hdf5 stream sink...')
events.EventProcessor.__init__(
self,
process_cadence=options.processing_cadence,
request_timeout=options.request_timeout,
kafka_server=options.kafka_server,
input_topic=options.input_topic_basename,
tag=options.tag
)
### initialize queues
self.feature_queue = deque(maxlen = 300)
### set up keys needed to do processing
name, extension = options.channel_list.rsplit('.', 1)
if extension == 'ini':
self.keys = multichannel_datasource.channel_dict_from_channel_ini(options).keys()
else:
self.keys = multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys()
### iDQ saving properties
self.timestamp = None
self.last_save_time = None
self.last_persist_time = None
self.rootdir = options.rootdir
self.base_features_path = options.features_path
self.sample_rate = options.sample_rate
self.write_cadence = options.write_cadence
self.persist_cadence = options.persist_cadence
self.waveform = options.waveform
self.instrument = options.instrument
self.columns = ['time', 'frequency', 'q', 'snr', 'phase', 'duration']
self.feature_data = utils.HDF5TimeseriesFeatureData(
self.columns,
keys = self.keys,
cadence = self.write_cadence,
sample_rate = self.sample_rate,
waveform = self.waveform
)
### get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
self.tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
self.tmp_dir = os.environ['TMPDIR']
def set_hdf_file_properties(self, start_time, duration):
"""
Set the file name and the temporary and permanent directories where features
will live, given the current gps time and a gps duration. Also takes care of
creating new directories as needed and removing any leftover temporary files.
"""
# set/update file names and directories with new gps time and duration
span = segments.segment(start_time, start_time + duration)
filename = DataType.SNAX_FEATURES.filename(self.instrument, span=span)
self.feature_name = os.path.splitext(filename)[0]
self.feature_path = DataType.SNAX_FEATURES.directory(root=self.base_features_path, start=start_time)
self.tmp_path = DataType.SNAX_FEATURES.directory(root=self.tmp_dir, start=start_time)
# create temp and output directories if they don't exist
aggregator.makedir(self.feature_path)
aggregator.makedir(self.tmp_path)
# delete leftover temporary files
tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
if os.path.isfile(tmp_file):
os.remove(tmp_file)
def ingest(self, message):
"""
parse an incoming message and add its features to the feature queue
"""
features = json.loads(message.value())
self.feature_queue.appendleft((
features['timestamp'],
features['features']
))
def handle(self):
"""
takes data from the queue and adds to datasets, periodically persisting to disk
"""
while self.feature_queue:
### remove data with oldest timestamp and process
self.timestamp, features = self.feature_queue.pop()
logging.info('processing features for timestamp %f' % self.timestamp)
# set save times and initialize specific saving properties if not already set
if self.last_save_time is None:
self.last_save_time = self.timestamp
self.last_persist_time = self.timestamp
duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
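# e.g. (illustrative, assuming utils.floor_div rounds down to the nearest
# multiple of its second argument): with persist_cadence = 10000 and a first
# timestamp of 1234567890, duration = 1234570000 - 1234567890 = 2110, so the
# first file is shortened to end on a persist-cadence boundary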
self.set_hdf_file_properties(self.timestamp, duration)
# Save triggers once per cadence if saving to disk
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
logging.info("saving features to disk at timestamp = %f" % self.timestamp)
save_time = utils.floor_div(self.last_save_time, self.write_cadence)
self.feature_data.dump(self.tmp_path, self.feature_name, save_time, tmp = True)
self.last_save_time = self.timestamp
# persist triggers once per persist cadence if using hdf5 format
if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
logging.info("persisting features to disk for gps range %f - %f" % (self.timestamp-self.persist_cadence, self.timestamp))
self.persist_to_disk()
self.last_persist_time = self.timestamp
self.set_hdf_file_properties(self.timestamp, self.persist_cadence)
### add new feature vector to dataset
self.feature_data.append(self.timestamp, features)
def persist_to_disk(self):
"""
moves a file from its temporary to final position
"""
final_path = os.path.join(self.feature_path, self.feature_name)+".h5"
tmp_path = os.path.join(self.tmp_path, self.feature_name)+".h5.tmp"
shutil.move(tmp_path, final_path)
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
options, args = parse_command_line()
### set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s | snax_sink : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# start up hdf5 sink
sink = HDF5StreamSink(options)
sink.start()
#!/usr/bin/env python3
# Copyright (C) 2017-2018 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_snax_synchronize [--options]"
__description__ = "an executable to synchronize incoming feature streams and send downstream"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
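# Example invocation (illustrative only; values are placeholders):
#   gstlal_snax_synchronize \
#       --kafka-server localhost:9092 \
#       --input-topic-basename snax_features \
#       --output-topic-basename synchronized_features \
#       --num-topics 10 \
#       --sample-rate 1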
#-------------------------------------------------
# Preamble
#-------------------------------------------------
import heapq
import json
import logging
from collections import deque
from queue import PriorityQueue
from optparse import OptionParser
from ligo.scald import utils
from gstlal import events
#-------------------------------------------------
# Functions
#-------------------------------------------------
def parse_command_line():
parser = OptionParser(usage=__usage__, description=__description__)
parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.")
parser.add_option("--input-topic-basename", metavar = "string", help = "Sets the input kafka topic basename, i.e. {basename}_%02d. Required.")
parser.add_option("--output-topic-basename", metavar = "string", help = "Sets the output kafka topic name. Required.")
parser.add_option("--num-topics", type = "int", help = "Sets the number of input kafka topics to read from. Required.")
options, args = parser.parse_args()
return options, args
#-------------------------------------------------
# Classes
#-------------------------------------------------
class StreamSynchronizer(events.EventProcessor):
"""
Handles the synchronization of several incoming streams, populating data queues
and pushing feature vectors to a queue for downstream processing.
"""
_name = 'synchronizer'
def __init__(self, options):
logging.info('setting up stream synchronizer...')
self.num_topics = options.num_topics
self.topics = ['%s_%s' % (options.input_topic_basename, str(i).zfill(4)) for i in range(1, self.num_topics + 1)]
events.EventProcessor.__init__(
self,
process_cadence=options.processing_cadence,
request_timeout=options.request_timeout,
num_messages=self.num_topics,
kafka_server=options.kafka_server,
input_topic=self.topics,
tag=options.tag
)
### synchronizer settings
self.sample_rate = options.sample_rate
self.latency_timeout = options.latency_timeout
self.producer_name = options.output_topic_basename
self.no_drop = options.no_drop
### initialize queues
self.last_timestamp = 0
# 30 second queue for incoming buffers
self.feature_queue = PriorityQueue(maxsize = 30 * self.sample_rate * self.num_topics)
# 5 minute queue for outgoing buffers
self.feature_buffer = deque(maxlen = 300)
def ingest(self, message):
"""
parse a new message from a feature extractor,
and add to the feature queue
"""
### decode json and parse data
feature_subset = json.loads(message.value())
### add to queue if timestamp is within timeout
if self.no_drop or (feature_subset['timestamp'] >= self.max_timeout()):
self.feature_queue.put((
feature_subset['timestamp'],
feature_subset['features']
))
def handle(self):
"""
combines subsets from the feature queue at a given timestamp,
and sends the resulting data downstream
"""
### clear out queue of any stale data
while not self.feature_queue.empty() and self.last_timestamp >= self.feature_queue.queue[0][0]:
self.feature_queue.get()
### inspect timestamps in front of queue
num_elems = min(self.num_topics, self.feature_queue.qsize())
timestamps = [block[0] for block in heapq.nsmallest(num_elems, self.feature_queue.queue)]
### check if either all timestamps are identical, or if the timestamps
### are old enough to process regardless. if so, process elements from queue
if timestamps:
if timestamps[0] <= self.max_timeout() or (len(set(timestamps)) == 1 and num_elems == self.num_topics):
### find number of elements to remove from queue
if timestamps[0] <= self.max_timeout():
num_subsets = len([timestamp for timestamp in timestamps if timestamp == timestamps[0]])
else:
num_subsets = num_elems
### remove data with oldest timestamp and process
subsets = [self.feature_queue.get() for i in range(num_subsets)]
logging.info(
'combining {:d} / {:d} feature subsets '
'for timestamp {:f}'.format(len(subsets), self.num_topics, timestamps[0])
)
features = self.combine_subsets(subsets)
self.feature_buffer.appendleft((timestamps[0], features))
self.last_timestamp = timestamps[0]
### push combined features downstream
while self.feature_buffer:
self.push_features()
def combine_subsets(self, subsets):
"""
combine subsets of features from multiple streams in a sensible way
"""
datum = [subset[1] for subset in subsets]
return {ch: rows for channel_subsets in datum for ch, rows in channel_subsets.items()}
def push_features(self):
"""
pushes any features that have been combined downstream in an outgoing topic
"""
# push full feature vector to producer if buffer isn't empty
if self.feature_buffer:
timestamp, features = self.feature_buffer.pop()
logging.info(
'pushing features with timestamp {:f} downstream, '
'latency is {:.3f}'.format(timestamp, utils.gps_to_latency(timestamp))
)
feature_packet = {'timestamp': timestamp, 'features': features}
self.producer.produce(
timestamp=timestamp,
topic=self.producer_name,
value=json.dumps(feature_packet)
)
self.producer.poll(0)
def max_timeout(self):
"""
calculates the oldest timestamp allowed for incoming data
"""
return utils.gps_now() - self.latency_timeout
#-------------------------------------------------
# Main
#-------------------------------------------------
if __name__ == '__main__':
options, args = parse_command_line()
### set up logging
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s | snax_synchronize : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)
# start up synchronizer
synchronizer = StreamSynchronizer(options=options)
synchronizer.start()
#!/usr/bin/env python3
# Copyright (C) 2017 Sydney J. Chamberlin, Patrick Godwin, Chad Hanna, Duncan Meacher
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
A program that whitens timeseries
"""
from optparse import OptionParser
import os
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from gstlal import aggregator
from gstlal import datasource
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal.snax import multichannel_datasource
from gstlal.snax import pipeparts as snaxparts
from gstlal.snax import utils
def parse_command_line():
parser = OptionParser(usage = '%prog [options]', description = __doc__)
# First append the datasource common options
multichannel_datasource.append_options(parser)
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
parser.add_option("--high-pass", action = "store_true", default = False, help = "Add a high-pass filter to the pipeline")
parser.add_option("--high-pass-cutoff", type = int, default = 12, help = "Set the high-pass filter cutoff, default = 12 Hz.")
parser.add_option("--psd-fft-length", metavar = "seconds", default = 32, type = "int", help = "The length of the FFT used to used to whiten the data (default is 32 s).")
parser.add_option("--save-format", metavar = "format", default = "gwf", help = "Set the save format for whitened timeseries (gwf/txt). Default = gwf.")
parser.add_option("--out-frame-type", metavar = "name", default = "WHITEN", help = "Set the frame type. Default = WHITEN.")
parser.add_option("--frame-duration", metavar = "s", default = 16, type = "int", help = "Set the duration of the output frames. The duration of the frame file will be multiplied by --frames-per-file. Default: 16s")
parser.add_option("--frames-per-file", metavar = "n", default = 4, type = "int", help = "Set the number of frames per file. Default: 4")
# parse the arguments and sanity check
options, filenames = parser.parse_args()
return options, filenames
####################
#
# main
#
####################
#
# parsing and setting up some core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
# create output directory if it doesn't already exist
aggregator.makedir(options.out_path)
# set up logging
logger = utils.get_logger('snax_whiten', verbose=options.verbose)
#
# building the event loop and pipeline
#
logger.info("assembling pipeline...")
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
handler = simplehandler.Handler(mainloop, pipeline)
# generate multiple channel sources, and link up pipeline
head = snaxparts.mkmultisrc(pipeline, data_source_info, channels, verbose=options.verbose)
for channel in channels:
# define whitening params
samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
max_rate = min(2048, samp_rate)
# whiten data
head[channel] = snaxparts.mkcondition(
pipeline,
head[channel],
max_rate,
samp_rate,
instrument,
channel_name=channel,
width=32,
psd_fft_length=options.psd_fft_length,
high_pass=options.high_pass,
high_pass_cutoff=options.high_pass_cutoff,
)
# dump timeseries to disk
if options.save_format == "gwf":
# set tags for output frames
for channel in channels:
ifo, channel_name = channel.split(":")
tagstr = "units=none,channel-name=%s,instrument=%s" % (channel_name, ifo)
head[channel] = pipeparts.mktaginject(pipeline, head[channel], tagstr)
head[channel] = snaxparts.mktimequeue(pipeline, head[channel], max_time=0)
# create frames
head = pipeparts.mkframecppchannelmux(
pipeline,
head,
frame_duration=options.frame_duration,
frames_per_file=options.frames_per_file
)
# write the frames to disk
head = pipeparts.mkframecppfilesink(
pipeline,
head,
frame_type=options.out_frame_type,
path=options.out_path
)
# Put O(100000 s) frames in each directory
frame_dir_prefix = (options.out_path, 5)
head.connect(
"notify::timestamp",
pipeparts.framecpp_filesink_ldas_path_handler,
frame_dir_prefix
)
elif options.save_format == "txt":
for channel in channels:
pipeparts.mknxydumpsink(
pipeline,
head[channel],
filename=os.path.join(options.out_path, "whitenedtimeseries_%d_%s.txt" % (samp_rate, channel))
)
# Allow Ctrl+C or SIGTERM to gracefully shut down the program for online
# sources; otherwise the signal would just kill the process outright
if data_source_info.data_source in ("lvshm", "framexmit"):# what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
# Seek
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if data_source_info.data_source not in ("lvshm", "framexmit"):# what about nds online?
datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)
#
# Run pipeline
#
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
logger.info("running pipeline...")
mainloop.run()
#
# Shut down pipeline
#
logger.info("shutting down pipeline...")
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
#
# close program manually if data source is live
#
if options.data_source in data_source_info.live_sources:
sys.exit(0)
#!/usr/bin/env python3
#
# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import argparse
from gstlal.snax.config import Config
from gstlal.snax.dags import DAG
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
# load config
args = parser.parse_args()
config = Config.load(args.config)
# create dag
dag = DAG(config)
dag.create_log_dir()
# generate dag layers
features = dag.extract()
dag.combine(features)
# write dag/script to disk
dag_name = "snax_offline_dag"
dag.write_dag(f"{dag_name}.dag")
dag.write_script(f"{dag_name}.sh")
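# The resulting workflow can then be submitted with HTCondor, e.g.
# (illustrative): condor_submit_dag snax_offline_dag.dag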
#!/usr/bin/env python3
#
# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import argparse
from gstlal.snax.config import Config
from gstlal.snax.dags import DAG
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
# load config
args = parser.parse_args()
config = Config.load(args.config)
# create dag
dag = DAG(config)
dag.create_log_dir()
# generate dag layers
features = dag.extract_online()
dag.synchronize(features)
dag.save(features)
# write dag/script to disk
dag_name = "snax_online_dag"
dag.write_dag(f"{dag_name}.dag")
dag.write_script(f"{dag_name}.sh")
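# The resulting workflow can then be submitted with HTCondor, e.g.
# (illustrative): condor_submit_dag snax_online_dag.dag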
...
@@ -25,8 +25,6 @@ AC_CONFIG_FILES([ \
 lib/gstlal-burst/Makefile \
 python/Makefile \
 python/cherenkov/Makefile \
-python/snax/Makefile \
-python/snax/dags/Makefile \
 share/Makefile
 ])
...
@@ -260,7 +258,6 @@ AC_SUBST([MIN_LIGO_SEGMENTS_VERSION], [1.2.0])
 AX_PYTHON_LIGO_SEGMENTS([$MIN_LIGO_SEGMENTS_VERSION])
 AC_SUBST([MIN_LIGO_LW_VERSION], [1.8.3])
 AX_PYTHON_LIGO_LW([$MIN_LIGO_LW_VERSION])
-AC_SUBST([MIN_LIGO_SCALD_VERSION], [0.7.2])
 #
...
...
@@ -28,11 +28,9 @@ Depends: ${shlibs:Depends}, ${misc:Depends}, ${python3:Depends},
 python3 (>= @MIN_PYTHON_VERSION@),
 python3-gi,
 python3-gst-1.0,
-python3-h5py,
 python3-lal (>= @MIN_LAL_VERSION@),
 python3-lalburst (>= @MIN_LALBURST_VERSION@),
 python3-ligo-segments (>= @MIN_LIGO_SEGMENTS_VERSION@),
-python3-ligo-scald (>= @MIN_LIGO_SCALD_VERSION@),
 python3-matplotlib,
 python3-numpy,
 python3-scipy
...
...
@@ -24,12 +24,10 @@ Requires: lal >= @MIN_LAL_VERSION@
 Requires: lalmetaio >= @MIN_LALMETAIO_VERSION@
 Requires: lalburst >= @MIN_LALBURST_VERSION@
 Requires: python%{python3_pkgversion}-ligo-segments >= @MIN_LIGO_SEGMENTS_VERSION@
-Requires: python%{python3_pkgversion}-ligo-scald >= @MIN_LIGO_SCALD_VERSION@
 # --- python package requirements --- #
 Requires: python3 >= @MIN_PYTHON_VERSION@
 Requires: python%{python3_pkgversion}-%{gstreamername}
-Requires: python%{python3_pkgversion}-h5py
 Requires: python%{python3_pkgversion}-numpy
 Requires: python%{python3_pkgversion}-scipy
...
 AM_CPPFLAGS = -I$(top_srcdir)/lib -I$(top_builddir)/lib
-SUBDIRS = cherenkov snax
+SUBDIRS = cherenkov
 pkgpythondir = $(pkgpyexecdir)
...