......@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-burst],[0.1.0],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_INIT([gstlal-burst],[0.1.1],[gstlal-discuss@ligo.org],[gstlal-burst])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
......
gstlal-burst (0.1.1) unstable; urgency=low
* Updated gstlal_feature_aggregator, gstlal_feature_monitor to deal with
ligo-scald API change
-- Patrick Godwin <patrick.godwin@ligo.org> Sun, 03 Mar 2019 21:27:15 -0500
gstlal-burst (0.1.0) unstable; urgency=low
* Add feature extraction toolkit
......
......@@ -249,6 +249,8 @@ def parse_command_line():
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")
options, filenames = parser.parse_args()
......@@ -346,6 +348,7 @@ if options.output_kafka_server is not None:
# aggregator job
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
......@@ -401,7 +404,13 @@ for ifo in channel_dict:
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
"enable-auth": options.enable_auth,
"enable-https": options.enable_https,
})
if options.enable_auth:
common_opts.update({"enable-auth": ""})
if options.enable_https:
common_opts.update({"enable-https": ""})
dagparts.DAGNode(dqJob, dag, [], opts = common_opts)
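As an aside on the pattern above: boolean flags are inserted into the node's option dictionary with an empty string value so that, when the options are rendered onto the job's command line as "--key value" pairs, an empty value produces a bare "--enable-auth" / "--enable-https" flag. A minimal sketch of that rendering, using a hypothetical render_opts helper (not the actual dagparts code):

def render_opts(opts):
    # render an option dict as command-line arguments; empty values become bare flags
    args = []
    for key, value in sorted(opts.items()):
        args.append("--%s" % key)
        if value != "":
            args.append(str(value))
    return " ".join(args)

example_opts = {"influx-port": 8086}
example_opts["enable-auth"] = ""   # mirrors common_opts.update({"enable-auth": ""}) above
print(render_opts(example_opts))   # -> --enable-auth --influx-port 8086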
......@@ -582,6 +591,10 @@ if options.agg_data_backend == 'influx':
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
})
if options.enable_auth:
agg_options.update({"enable-auth": ""})
if options.enable_https:
agg_options.update({"enable-https": ""})
# define routes used for aggregation jobs
snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
......@@ -603,6 +616,25 @@ for routes in groups(agg_routes, 1):
agg_options["data-type"] = "max"
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
# Trigger aggregation
trigagg_options = {
"dump-period": 0,
"base-dir": "aggregator",
"job-tag": os.getcwd(),
"num-jobs": len(jobTags),
"num-threads": 2,
"job-start": 0,
"kafka-server": options.output_kafka_server,
"data-backend": options.agg_data_backend,
}
if options.agg_data_backend == 'influx':
trigagg_options.update({
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
})
aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)
# state-based aggregation jobs
for routes in groups(state_routes, 2):
agg_options["route"] = routes
......
......@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-inspiral],[1.6.1],[gstlal-discuss@ligo.org],[gstlal-inspiral])
AC_INIT([gstlal-inspiral],[1.6.2],[gstlal-discuss@ligo.org],[gstlal-inspiral])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
......
gstlal-inspiral (1.6.2-1) unstable; urgency=low
* Enforce that appended zeros in subthreshold trigger generation have same
dtype as snr time series
-- Alexander Pace <alexander.pace@ligo.org> Sun, 03 Mar 2019 21:51:57 -0500
gstlal-inspiral (1.6.1-1) unstable; urgency=low
* Packaging differences for rpms: disabling mass model.
......
......@@ -10,6 +10,8 @@
dist_bin_SCRIPTS = \
gstlal_harmonic_mean_psd \
gstlal_mock_data_server \
gstlal_ilwdify \
gstlal_inspiral_best_coinc_file \
gstlal_inspiral_treebank \
gstlal_inspiral_treebank_dag \
gstlal_inspiral_bankviz \
......@@ -25,9 +27,14 @@ dist_bin_SCRIPTS = \
gstlal_recolor_frames_pipe \
gstlal_inj_frames \
gstlal_cache_to_segments \
gstlal_ligolw_add_without_reassign \
gstlal_ll_inspiral_aggregator \
gstlal_ll_inspiral_trigger_aggregator \
gstlal_ll_dq \
gstlal_condor_top \
gstlal_injsplitter \
gstlal_kafka_dag \
gstlal_reduce_dag \
gstlal_dag_run_time
gstlal_dag_run_time \
gstlal_svd_bank_checkerboard \
gstlal_get_online_coinc_times
#!/usr/bin/env python
#
# Copyright (C) 2018 Ryan Magee
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
'''
A program to estimate the coincident data collected across each IFO pair
'''
from optparse import OptionParser
import itertools
import subprocess
import glob
from ligo import segments
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments
from ligo.lw.utils import process as ligolw_process
from lal import LIGOTimeGPS
import sys
present = int(subprocess.Popen('lalapps_tconvert', stdout = subprocess.PIPE).communicate()[0])
def parse_command_line():
parser = OptionParser()
parser.add_option("--start", type = "float", default = 1235750418.0, help = "Set the start time for collection of data. Defaults to ER14 start time. Required.")
parser.add_option("--end", type = "float", default = present, help = "Set the end time for collection of data. Defaults to present day. Required.")
parser.add_option("--channels", default = ['H1:DMT-ANALYSIS_READY:1','L1:DMT-ANALYSIS_READY:1','V1:ITF_SCIENCE'], help = "Set channels to be compared. Comma separated list with entires of form 'H1:DMT-ANALYSIS_READY:1'")
options, filenames = parser.parse_args()
return options, filenames
options, filenames = parse_command_line()
# Get segments for each channel
for channel in options.channels:
segcommand = "ligolw_segment_query_dqsegdb --segment-url=https://segments.ligo.org -q --gps-start-time %f --gps-end-time %f --include-segments %s --result-name=datasegments -o %s.intermediate" % (options.start, options.end, channel, channel.split(':')[0])
segs = subprocess.Popen(segcommand.split(), stdout = subprocess.PIPE)
segs.wait()
# Combine segments into a single file
intermediate_files = glob.glob('*.intermediate')
combined = ['ligolw_add','--ilwd','-o','segments.xml.gz']
combined.extend(["%s" % x for x in intermediate_files])
combine = subprocess.Popen(combined, stdout = subprocess.PIPE)
combine.wait()
rm_intermediates = subprocess.Popen(['rm']+["%s" % x for x in intermediate_files], stdout = subprocess.PIPE)
rm_intermediates.wait()
# Character formatting/compatibility
ligolw_no_ilwdchar = "ligolw_no_ilwdchar segments.xml.gz"
ilwd = subprocess.Popen(ligolw_no_ilwdchar.split(), stdout = subprocess.PIPE)
ilwd.wait()
datafname = "segments.xml.gz"
# Shamelessly borrowed definition from the itertools documentation
# (https://docs.python.org/3/library/itertools.html)
def powerset(iterable):
s = list(iterable)
return itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1))
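As a quick usage note (an editorial aside, not part of the file): for three detectors the powerset enumerates every IFO combination, including the empty set and the singletons, which the loop further below skips via len(ifo_combo) > 1.

print(list(powerset(["H1", "L1", "V1"])))
# -> [(), ('H1',), ('L1',), ('V1',), ('H1', 'L1'), ('H1', 'V1'), ('L1', 'V1'), ('H1', 'L1', 'V1')]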
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
lsctables.use_in(LIGOLWContentHandler)
def extract_segs(fname, segname):
llwsegments = ligolw_segments.LigolwSegments(ligolw_utils.load_filename(fname, verbose = True, contenthandler = LIGOLWContentHandler))
return llwsegments.get_by_name(segname).coalesce()
data = extract_segs(datafname, "datasegments")
ifos = data.keys()
# Iterate over all IFO combos
for ifo_combo in powerset(ifos):
if len(ifo_combo) > 1:
print "%s"*len(ifo_combo) % (ifo_combo),"time:", abs(data.intersection(ifo_combo)), 'seconds, ', abs(data.intersection(ifo_combo)) / 86400., 'days'
#!/usr/bin/env python
import sys
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import array
from ligo.lw import param
from ligo.lw import utils as ligolw_utils
from gstlal import ilwdify
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
lsctables.use_in(LIGOLWContentHandler)
param.use_in(LIGOLWContentHandler)
array.use_in(LIGOLWContentHandler)
fname = sys.argv[1]
xmldoc = ligolw_utils.load_filename(fname, verbose = True, contenthandler = LIGOLWContentHandler)
xmldoc = ilwdify.do_it_to(xmldoc)
ligolw_utils.write_filename(xmldoc, fname, gz = fname.endswith('gz'), verbose = True)
#!/usr/bin/python
import sys
from ligo.lw import ligolw
from ligo.lw import array as ligolw_array
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
#from glue.ligolw import ligolw
#from glue.ligolw import array as ligolw_array
#from glue.ligolw import lsctables
#from glue.ligolw import utils as ligolw_utils
@ligolw_array.use_in
@lsctables.use_in
class ContentHandler(ligolw.LIGOLWContentHandler):
pass
def get_snr(fname):
xmldoc = ligolw_utils.load_filename(fname, verbose = False, contenthandler = ContentHandler)
coinc_inspiral_table = lsctables.CoincInspiralTable.get_table(xmldoc)
assert len(coinc_inspiral_table) == 1
return coinc_inspiral_table[0].snr
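# scan the candidate files and keep the one whose single coinc_inspiral row has the largest SNR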
best = sys.argv[1]
bestsnr = get_snr(sys.argv[1])
for fname in sys.argv[2:]:
snr = get_snr(fname)
if snr > bestsnr:
bestsnr = snr
best = fname
print best
......@@ -31,7 +31,7 @@ from gstlal import far
def parse_command_line():
parser = OptionParser(usage="%prog [options] database.sqlite")
parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Path of marginalized likelihood file")
parser.add_option("--post-marginalized-likelihood-file", metavar = "filename", help = "Path of post marginalized likelihood file")
options, database = parser.parse_args()
return options, database
......@@ -39,9 +39,9 @@ def parse_command_line():
options, database = parse_command_line()
_, rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_filename(options.marginalized_likelihood_file, contenthandler=far.RankingStat.LIGOLWContentHandler))
_, rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_filename(options.post_marginalized_likelihood_file, contenthandler=far.RankingStat.LIGOLWContentHandler))
fapfar = far.FAPFAR(rankingstatpdf)
fapfar = far.FAPFAR(rankingstatpdf.new_with_extinction())
livetime = fapfar.livetime
print "%s: livetime used in making IFAR plots\n" % (livetime,)
......
#!/usr/bin/env python
#
# Copyright (C) 2018--2019 Chad Hanna, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program will create an HTCondor DAG to automate the running of
### low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe
"""
This program makes a dag for persistent kafka/zookeeper services
"""
__author__ = 'Chad Hanna <channa@caltech.edu>, Patrick Godwin <patrick.godwin@ligo.org>'
#
# import standard modules
#
import os
from optparse import OptionParser
#
# import the modules we need to build the pipeline
#
from gstlal import aggregator
from gstlal import dagparts
#
# configuration file templates
#
ZOOKEEPER_TEMPLATE = """
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
"""
KAFKA_TEMPLATE = """
broker.id=0
listeners = PLAINTEXT://%s:%d
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
"""
KAFKA_ENV_TEMPLATE = """
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
KAFKA_HEAP_OPTS="-Xms8G -Xmx8G"
export KAFKA_HEAP_OPTS KAFKA_JVM_PERFORMANCE_OPTS
"""
#
# job classes
#
class ZookeeperJob(dagparts.DAGJob):
"""
A zookeeper job
"""
def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server", rootdir = dagparts.log_path(), tag = "", port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
if tag:
zoodir = os.path.join(rootdir, tag, "zookeeper")
else:
zoodir = os.path.join(rootdir, "zookeeper")
aggregator.makedir(zoodir)
with open("zookeeper.properties", "w") as f:
f.write(ZOOKEEPER_TEMPLATE%(zoodir, port, maxclients))
class KafkaJob(dagparts.DAGJob):
"""
A kafka job
"""
def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server", rootdir = dagparts.log_path(), tag = "", hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
if tag:
kafkadir = os.path.join(rootdir, tag, "kafka")
else:
kafkadir = os.path.join(rootdir, "kafka")
aggregator.makedir(kafkadir)
with open("kafka.properties", "w") as f:
f.write(KAFKA_TEMPLATE%(hostname, port, kafkadir, zookeeperaddr))
#
# Parse the command line
#
def parse_command_line():
parser = OptionParser(description = __doc__)
parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously and to avoid filename clashes.")
parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname in which kafka/zookeeper will be running at.")
parser.add_option("--kafka-port", type = "int", metavar = "number", help = "Set the kafka port. default: 9092", default = 9092)
parser.add_option("--condor-universe", default = "local", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = local")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
options, filenames = parser.parse_args()
return options, filenames
#
# MAIN
#
options, filenames = parse_command_line()
aggregator.makedir("logs")
if options.analysis_tag:
dag = dagparts.DAG("kafka_broker_%s" % options.analysis_tag)
else:
dag = dagparts.DAG("kafka_broker")
#
# setup kafka/zookeeper jobs and nodes
#
condor_options = {
"want_graceful_removal": "True",
"kill_sig": "15"
}
if options.condor_universe == 'vanilla':
condor_options.update({
"request_memory": "10GB",
"request_cpus": 2,
})
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
zookeeper_job = ZookeeperJob(
"zookeeper-server-start.sh",
tag_base = "zookeeper-server-start",
condor_commands = condor_commands,
tag = options.analysis_tag,
universe = options.condor_universe,
port = options.zookeeper_port
)
kafka_job = KafkaJob(
"kafka-server-start.sh",
tag_base = "kafka-server-start",
condor_commands = condor_commands,
tag = options.analysis_tag,
hostname = options.kafka_hostname,
port = options.kafka_port,
universe = options.condor_universe,
zookeeperaddr = "localhost:%d" % options.zookeeper_port
)
zookeeper_node = dagparts.DAGNode(zookeeper_job, dag, [], opts = {"":"zookeeper.properties"})
kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})
#
# Write out the dag and other files
#
dag.write_sub_files()
# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
[node.set_retry(10000) for node in dag.get_nodes()]
dag.write_dag()
dag.write_script()
with open('kafka_env.sh', 'w') as f:
f.write(KAFKA_ENV_TEMPLATE)
print('source kafka_env.sh before submitting dag')
#!/usr/bin/python
#
# Copyright (C) 2005--2009,2012,2014,2015,2017 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
Add (merge) LIGO LW XML files containing LSC tables.
"""
import sys
from optparse import OptionParser
from lal.utils.cache import CacheEntry
from ligo.lw import __version__, __date__
from ligo.lw import ligolw
from ligo.lw import array as ligolw_array
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import ligolw_add
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>"
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
def parse_command_line():
"""
Parse the command line, return an options object and a list of URLs.
"""
parser = OptionParser(
version = "Name: %%prog\n%s" % __version__,
usage = "%prog [options] [url ...]",
description = "Combines one or more LIGO Light Weight XML files into a single output file. The output is written to stdout or to the filename specified by --output. In addition to regular files, many common URL types can be read such as http:// and ftp://. Input documents that are gzip-compressed are automatically detected and decompressed. If the output file's name ends in \".gz\", the output document will be gzip-compressed. Table elements contained in the document will be merged so that there is not more than one table of any given name in the output. To accomplish this, any tables in the input documents that share the same name must have compatible columns, meaning the same column names with matching types (but not necessarily in the same order)."
)
parser.add_option("-i", "--input-cache", metavar = "filename", action = "append", default = [], help = "Get input files from the LAL cache named filename.")
parser.add_option("--non-lsc-tables-ok", action = "store_true", help = "OK to merge documents containing non-LSC tables.")
parser.add_option("-o", "--output", metavar = "filename", help = "Write output to filename (default = stdout).")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--remove-input", action = "store_true", help = "Remove input files after writing output (an attempt is made to not delete the output file in the event that it overwrote one of the input files).")
parser.add_option("--remove-input-except", metavar = "filename", action = "append", default = [], help = "When deleting input files, do not delete this file.")
options, urls = parser.parse_args()
urls += [CacheEntry(line).url for cache in options.input_cache for line in open(cache)]
if len(urls) < 1:
raise ValueError("no input files!")
return options, urls
#
# =============================================================================
#
# Main
#
# =============================================================================
#
#
# Command line
#
options, urls = parse_command_line()
#
# Input
#
@ligolw_array.use_in
@lsctables.use_in
class ContentHandler(ligolw.LIGOLWContentHandler):
pass
def _ligolw_add(xmldoc, urls, non_lsc_tables_ok = False, verbose = False, contenthandler = ContentHandler, reassign_ids = True):
"""
An implementation of the LIGO LW add algorithm. urls is a list of
URLs (or filenames) to load, xmldoc is the XML document tree to
which they should be added.
"""
# Input
for n, url in enumerate(urls, 1):
if verbose:
sys.stderr.write("%d/%d:" % (n, len(urls)))
ligolw_utils.load_url(url, verbose = verbose, xmldoc = xmldoc, contenthandler = contenthandler)
# ID reassignment
if not non_lsc_tables_ok and lsctables.HasNonLSCTables(xmldoc):
raise ValueError("non-LSC tables found. Use --non-lsc-tables-ok to force")
if reassign_ids:
ligolw_add.reassign_ids(xmldoc, verbose = verbose)
# Document merge
if verbose:
sys.stderr.write("merging elements ...\n")
ligolw_add.merge_ligolws(xmldoc)
ligolw_add.merge_compatible_tables(xmldoc)
return xmldoc
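# build the combined document without reassigning row IDs; keeping the original IDs intact is the point of this gstlal_ligolw_add variant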
xmldoc = _ligolw_add(
ligolw.Document(),
urls,
non_lsc_tables_ok = options.non_lsc_tables_ok,
verbose = options.verbose,
contenthandler = ContentHandler,
reassign_ids = False
)
#
# Output
#
ligolw_utils.write_filename(
xmldoc,
options.output,
verbose = options.verbose,
gz = (options.output or "stdout").endswith(".gz")
)
#
# Remove input
#
if options.remove_input:
ligolw_add.remove_input(urls, [options.output] + options.remove_input_except, options.verbose)
......@@ -65,6 +65,8 @@ def parse_command_line():
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")
options, filenames = parser.parse_args()
......@@ -131,7 +133,7 @@ class PSDHandler(simplehandler.Handler):
### store and reduce noise / range history
for route in self.routes:
agg_sink.store_columns(route, data[route], 'data', tags='ifo', aggregate="max")
agg_sink.store_columns(route, data[route], aggregate="max")
### flush buffers
self.timedeq.clear()
......@@ -140,7 +142,7 @@ class PSDHandler(simplehandler.Handler):
# Save a "latest" psd
# NOTE: The PSD is special, we just record it. No min/median/max
thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime))
thisdir = os.path.join(self.out_path, io.common.gps_to_leaf_directory(buftime))
aggregator.makedir(thisdir)
psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
......@@ -178,9 +180,13 @@ if __name__ == '__main__':
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name, auth=options.enable_auth, https=options.enable_https)
else: ### hdf5 data backend
agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.out_path, num_processes=options.num_threads)
agg_sink = io.hdf5.Aggregator(rootdir=options.out_path, num_processes=options.num_threads)
# register measurement schemas for aggregators
for route in ('noise', 'range_history'):
agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')
# parse the generic "source" options; consistency checks are done inside
# the class init method
......
......@@ -21,7 +21,6 @@
import argparse
import json
import logging
from multiprocessing import Pool
import sys, os
import time
import timeit
......@@ -60,6 +59,9 @@ def parse_command_line():
parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
args = parser.parse_args()
......@@ -89,20 +91,41 @@ if __name__ == '__main__':
logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
pool = Pool(options.num_threads)
# We instantiate a Kafka consumer subscribed to all of our routes
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
consumer = KafkaConsumer(
*routes,
bootstrap_servers=[options.kafka_server],
key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]),
auto_offset_reset='latest',
max_poll_interval_ms = 60000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
reconnect_backoff_ms=5000,
reconnect_backoff_max_ms=30000
)
else:
consumer = None
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=options.across_jobs
)
else: ### hdf5 data backend
agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
# register measurement schemas for aggregators
for route in routes:
agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')
# start an infinite loop to keep updating and aggregating data
while True:
......@@ -113,7 +136,7 @@ if __name__ == '__main__':
# this is not threadsafe!
logging.info("retrieving data from kafka")
start = timeit.default_timer()
datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, max_records = 2 * len(jobs) * len(routes))
datadata = io.kafka.retrieve_timeseries(consumer, routes, max_records = 2 * len(jobs) * len(routes))
elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % elapsed)
else:
......@@ -125,7 +148,7 @@ if __name__ == '__main__':
for route in routes:
logging.info("storing and reducing timeseries for measurement: %s" % route)
for aggregate in options.data_type:
agg_sink.store_columns(route, datadata[route], 'data', tags='job', aggregate=aggregate)
agg_sink.store_columns(route, datadata[route], aggregate=aggregate)
elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
......
#!/usr/bin/env python
#
# Copyright (C) 2016 Kipp Cannon, Chad Hanna
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import argparse
import json
import logging
from multiprocessing import Pool
import sys, os
import time
import timeit
import numpy
from ligo.scald import aggregator
from ligo.scald import io
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
def retrieve_triggers(consumer, jobs, route_name = 'coinc', timeout = 1000, max_records = 1000):
"""!
A function to pull triggers from kafka for a set of jobs (topics) and
route_name (key in the incoming json messages)
"""
triggers = []
### retrieve timeseries for all routes and topics
msg_pack = consumer.poll(timeout_ms = timeout, max_records = max_records)
for tp, messages in msg_pack.items():
for message in messages:
try:
triggers.extend(message.value)
except KeyError: ### no route in message
pass
return triggers
# Read command line options
def parse_command_line():
parser = argparse.ArgumentParser(description="Online data aggregator")
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
parser.add_argument("--route", action="store", default="coinc", help="Specify the route where triggers are stored in.")
parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.")
parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
args = parser.parse_args()
assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
return args
#
# =============================================================================
#
# Main
#
# =============================================================================
#
if __name__ == '__main__':
options = parse_command_line()
# FIXME don't hardcode some of these?
jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
pool = Pool(options.num_threads)
# We instantiate a Kafka consumer subscribed to the trigger route
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
options.route,
bootstrap_servers=[options.kafka_server],
key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='%s_trigger_aggregator' % jobs[0],
auto_offset_reset='latest',
max_poll_interval_ms = 60000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
reconnect_backoff_ms=5000,
reconnect_backoff_max_ms=30000
)
else:
consumer = None
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https
)
else: ### hdf5 data backend
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
# start an infinite loop to keep updating and aggregating data
while True:
logging.info("sleeping for %.1f s" % options.dump_period)
time.sleep(options.dump_period)
if consumer:
# this is not threadsafe!
logging.info("retrieving data from kafka")
start = timeit.default_timer()
#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
triggers = retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % elapsed)
else:
logging.info("retrieving data from bottle routes")
triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = options.route, num_threads=options.num_threads)
# filter out triggers that don't have a far assigned yet
triggers = [trg for trg in triggers if 'combined_far' in trg]
# store and reduce data for each job
if triggers:
start = timeit.default_timer()
logging.info("storing and reducing triggers")
agg_sink.store_triggers('triggers', triggers, far_key = 'combined_far', time_key = 'end')
elapsed = timeit.default_timer() - start
logging.info("time to store/reduce triggers: %.1f s" % elapsed)
else:
logging.info("no triggers to process")
# close connection to consumer if using kafka
if consumer:
consumer.close()
#
# always end on an error so that condor won't think we're done and will
# restart us
#
sys.exit(1)
#!/usr/bin/env python
#
# Copyright (C) 2019 Duncan Meacher
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Split SVD bank into 'odd' or 'even' banks"""
#
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import os
import numpy as np
import itertools
from optparse import OptionParser
from lal.utils import CacheEntry
from gstlal import svd_bank
from gstlal import inspiral
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import array as ligolw_array
from ligo.lw import param as ligolw_param
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
@ligolw_array.use_in
@ligolw_param.use_in
@lsctables.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
#
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
parser = OptionParser(description = __doc__)
parser.add_option("--svd-files", metavar = "filename", help = "A LIGO light-weight xml / xml.gz file containing svd bank information (require). Can give multiple files, seperated with a ','." )
parser.add_option("--outdir", metavar = "directory", type = "str", default = ".", help = "Output directory for modified SVD files.")
parser.add_option("--even", action = "store_true", help = "Output even rows pf reconstruction matrix. Default is odd.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
options, filenames = parser.parse_args()
if options.svd_files is None:
raise ValueError("SVD file must be selected with, seperated with a ','. --svd-files bank1.xml,bank2.xml,bank3.xml")
svd_bank_files = options.svd_files.split(',')
#
#
# =============================================================================
#
# Main
#
# =============================================================================
#
# Loop over all SVD files given in command line
for bank_file in svd_bank_files:
# read in SVD bank file
banks = svd_bank.read_banks(bank_file, contenthandler = LIGOLWContentHandler, verbose = options.verbose)
# Loop over each bank within SVD file
for bank in banks:
# Loop over each bank_fragment within each bank
for n, frag in enumerate(bank.bank_fragments):
# Extract odd/even rows of chifacs and mix_matrix from each bank fragment
if options.even:
chifacs_re = bank.bank_fragments[n].chifacs[2::4]
chifacs_im = bank.bank_fragments[n].chifacs[3::4]
mix_mat_re = bank.bank_fragments[n].mix_matrix[:, 2::4]
mix_mat_im = bank.bank_fragments[n].mix_matrix[:, 3::4]
else:
chifacs_re = bank.bank_fragments[n].chifacs[0::4]
chifacs_im = bank.bank_fragments[n].chifacs[1::4]
mix_mat_re = bank.bank_fragments[n].mix_matrix[:, 0::4]
mix_mat_im = bank.bank_fragments[n].mix_matrix[:, 1::4]
bank.bank_fragments[n].chifacs = np.array(list(itertools.chain(*zip(chifacs_re, chifacs_im))))
mix_mat_new = np.empty((mix_mat_re.shape[0],mix_mat_re.shape[1]+mix_mat_im.shape[1]))
mix_mat_new[:,0::2] = mix_mat_re
mix_mat_new[:,1::2] = mix_mat_im
bank.bank_fragments[n].mix_matrix = mix_mat_new
# delete even/odd entries from sngl_inspiral table
if options.even:
del bank.sngl_inspiral_table[0::2]
else:
del bank.sngl_inspiral_table[1::2]
# Set output path
if options.even:
out_path = options.outdir+"/even/"
else:
out_path = options.outdir+"/odd/"
# Check output path exists
if not os.path.isdir(out_path):
os.makedirs(out_path)
# Write out checkerboard SVD bank
svd_bank.write_bank(out_path+bank_file.split('/')[-1], banks, cliplefts = [0]*len(banks), cliprights = [0]*len(banks), verbose = options.verbose)
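For illustration, a tiny self-contained numpy sketch (an editorial aside with toy values, not part of the diff) of the interleaved odd/even selection performed on chifacs above:

import itertools
import numpy as np

chifacs = np.arange(8.)  # [re0, im0, re1, im1, re2, im2, re3, im3] for 4 templates
odd  = np.array(list(itertools.chain(*zip(chifacs[0::4], chifacs[1::4]))))
even = np.array(list(itertools.chain(*zip(chifacs[2::4], chifacs[3::4]))))
# odd  -> [0., 1., 4., 5.]  (the re/im pairs of templates 0 and 2)
# even -> [2., 3., 6., 7.]  (the re/im pairs of templates 1 and 3)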
......@@ -3,7 +3,7 @@
#
AC_INIT([gstlal-ugly],[1.6.0],[gstlal-discuss@ligo.org],[gstlal-ugly])
AC_INIT([gstlal-ugly],[1.6.4],[gstlal-discuss@ligo.org],[gstlal-ugly])
AC_COPYRIGHT([Copyright (C) The authors (see source code for details)])
# a file whose existence can be used to check that we are in the
# top-level directory of the source tree
......
gstlal-ugly (1.6.4-1) unstable; urgency=low
* add options to enable auth, https for inspiral aggregators
* add gstlal_ll_inspiral_trigger_aggregator to gstlal-ugly, add job to
gstlal_ll_inspiral_pipe
-- Alexander Pace <alexander.pace@ligo.org> Wed, 05 Jun 2019 12:24:02 -0700
gstlal-ugly (1.6.3-1) unstable; urgency=low
* gstlal_kafka_dag: add condor universe option into kafka/zookeeper jobs
* rough tool to estimate background collected among ifo combos
-- Alexander Pace <alexander.pace@ligo.org> Thu, 28 Mar 2019 07:44:55 -0700
gstlal-ugly (1.6.2-1) unstable; urgency=low
* gstlal_svd_bank_checkerboard: Added SVD bank checkerboarding code
* some new temp programs
* construct_skymap_test_dag: update
* gstlal_kafka_dag: add tag information so that multiple analyses do not
have file clashes
* Bug fixes and performance improvements
-- Alexander Pace <alexander.pace@ligo.org> Sun, 17 Mar 2019 09:37:07 -0700
gstlal-ugly (1.6.1-1) unstable; urgency=low
* interpolator: fix bug where interpolator was pushing out nongap buffers
when it received gap buffers
* gstlal_ll_dq: reduce the upper frequency for the horizon distance
calculation
* gstlal_ll_dq, gstlal_ll_inspiral_aggregator: update to deal with API
changes from ligo-scald
-- Alexander Pace <alexander.pace@ligo.org> Sun, 03 Mar 2019 21:30:40 -0500
gstlal-ugly (1.6.0-1) unstable; urgency=low
* Pre-ER14 Release
......