#!/usr/bin/env python
# Copyright (C) 2009-2014 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Stream-based inspiral analysis tool"""
#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#
### A program to analyze gravitational wave data for compact binary coalescence in real time or in an offline mode
###
### .. graphviz::
###
### digraph llpipe {
### labeljust = "r";
### label="gstlal_inspiral"
### rankdir=LR;
### graph [fontname="Roman", fontsize=24];
### edge [ fontname="Roman", fontsize=10 ];
### node [fontname="Roman", shape=box, fontsize=11];
###
### gracedb [label="GW\nCandidate\nDatabase", shape=oval, color=tomato3, style=filled];
###
###
### subgraph clusterNodeN {
###
### style=rounded;
### label="gstreamer pipeline";
### labeljust = "r";
### fontsize = 14;
###
### H1src [label="H1 data source:\n mkbasicsrc()", color=red4];
### L1src [label="L1 data source:\n mkbasicsrc()", color=green4];
### V1src [label="V1 data source:\n mkbasicsrc()", color=magenta4];
###
### H1multirate [label="H1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4];
### L1multirate [label="L1 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4];
### V1multirate [label="V1 whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4];
###
### H1LLOID [label="H1 LLOID filtering engine:\nmkLLOIDmulti()", color=red4];
### L1LLOID [label="L1 LLOID filtering engine:\nmkLLOIDmulti()", color=green4];
### V1LLOID [label="V1 LLOID filtering engine:\nmkLLOIDmulti()", color=magenta4];
###
### H1Trig1 [label="H1 Triggering:\nsub bank 1", color=red4];
### L1Trig1 [label="L1 Triggering:\nsub bank 1", color=green4];
### V1Trig1 [label="V1 Triggering:\nsub bank 1", color=magenta4];
### H1Trig2 [label="H1 Triggering:\nsub bank 2", color=red4];
### L1Trig2 [label="L1 Triggering:\nsub bank 2", color=green4];
### V1Trig2 [label="V1 Triggering:\nsub bank 2", color=magenta4];
### H1TrigN [label="H1 Triggering:\nsub bank N", color=red4];
### L1TrigN [label="L1 Triggering:\nsub bank N", color=green4];
### V1TrigN [label="V1 Triggering:\nsub bank N", color=magenta4];
###
### H1src -> H1multirate;
### L1src -> L1multirate;
### V1src -> V1multirate;
###
### H1multirate -> H1LLOID [label="h(t) 4096Hz"];
### L1multirate -> L1LLOID [label="h(t) 4096Hz"];
### V1multirate -> V1LLOID [label="h(t) 4096Hz"];
### H1multirate -> H1LLOID [label="h(t) 2048Hz"];
### L1multirate -> L1LLOID [label="h(t) 2048Hz"];
### V1multirate -> V1LLOID [label="h(t) 2048Hz"];
### H1multirate -> H1LLOID [label="h(t) Nth-pow-of-2 Hz"];
### L1multirate -> L1LLOID [label="h(t) Nth-pow-of-2 Hz"];
### V1multirate -> V1LLOID [label="h(t) Nth-pow-of-2 Hz"];
###
### H1LLOID -> H1Trig1 [label="SNRs sub bank 1"];
### L1LLOID -> L1Trig1 [label="SNRs sub bank 1"];
### V1LLOID -> V1Trig1 [label="SNRs sub bank 1"];
### H1LLOID -> H1Trig2 [label="SNRs sub bank 2"];
### L1LLOID -> L1Trig2 [label="SNRs sub bank 2"];
### V1LLOID -> V1Trig2 [label="SNRs sub bank 2"];
### H1LLOID -> H1TrigN [label="SNRs sub bank N"];
### L1LLOID -> L1TrigN [label="SNRs sub bank N"];
### V1LLOID -> V1TrigN [label="SNRs sub bank N"];
### }
###
###
### Coincidence [label="Coincidence\nO(1)s latency"];
### SigEst [label="Significance\nEstimation\nO(1)s latency"];
### Thresh [label="Thresholding\nO(1)s latency"];
### EventGen [label="Event\nGeneration\nO(1)s latency"];
###
### H1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### L1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### V1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### H1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### L1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### V1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### H1TrigN -> Coincidence [label="Trigs sub bank N"];
### L1TrigN -> Coincidence [label="Trigs sub bank N"];
### V1TrigN -> Coincidence [label="Trigs sub bank N"];
###
### Coincidence -> SigEst -> Thresh -> EventGen;
###
### EventGen -> gracedb;
###
### }
###
### Review Status
### -------------
###
### +------------------------------------------------+---------------------------------------------+------------+
### | Names | Hash | Date |
### +================================================+=============================================+============+
### | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 9074294d6b57f43651143b5f93210751de1fe55a | 2014-05-02 |
### +------------------------------------------------+---------------------------------------------+------------+
###
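### Example Usage
### -------------
###
### A minimal offline invocation might look like the following. The file,
### channel, and GPS values are purely illustrative; the full option
### listing is constructed in parse_command_line() below, and the data
### source options (--data-source, --frame-cache, --gps-start-time,
### --gps-end-time, --channel-name) are appended by
### datasource.append_options().
###
### .. code-block:: bash
###
###     gstlal_inspiral \
###         --data-source frames \
###         --frame-cache H1L1.cache \
###         --gps-start-time 874000000 \
###         --gps-end-time 874020000 \
###         --channel-name H1=LSC-STRAIN \
###         --channel-name L1=LSC-STRAIN \
###         --svd-bank H1:H1_svd_bank.xml.gz,L1:L1_svd_bank.xml.gz \
###         --reference-psd psd.xml.gz \
###         --time-slide-file tisi.xml \
###         --output triggers.xml.gz
###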
try:
	from fpconst import PosInf
except ImportError:
	# fpconst is not part of the standard library and might not be
	# available
	PosInf = float("+inf")
from optparse import OptionGroup, OptionParser

import base64
import gzip
import itertools
import math
import os
import resource
import signal
import socket
import StringIO
import sys
import tempfile
import time
import uuid
import warnings
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from lal import LIGOTimeGPS

from lal import UTCToGPS
from lal.utils import CacheEntry
import lal.series
from ligo.gracedb.rest import DEFAULT_SERVICE_URL as gracedb_default_service_url
from ligo import segments
from ligo.segments import utils as segmentsUtils
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments
from gstlal import bottle
from gstlal import datasource
from gstlal import far

from gstlal import hoftcache
from gstlal import httpinterface
from gstlal import inspiral
from gstlal import lloidhandler
from gstlal import lloidparts
from gstlal import pipeparts
from gstlal import reference_psd
from gstlal import servicediscovery
from gstlal import simulation

GSTLAL_PROCESS_START_TIME = UTCToGPS(time.gmtime())


@lsctables.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass


#
# Make sure we have sufficient resources. We allocate far more memory than
# we need, so this is okay.
#
def setrlimit(res, lim):
	hard_lim = resource.getrlimit(res)[1]
	resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))


# set the number of processes, address space, and resident set size up to
# the hard limit, and shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
# FIXME: tests at CIT show that this next tweak has no effect. it's
# possible that SL7 has lowered the default stack size from SL6 and we
# don't need to do this anymore. remove?
setrlimit(resource.RLIMIT_STACK, 1024 * 1024)	# 1 MiB per thread


def service_domain(gracedb_search, gracedb_pipeline):
	return "%s_%s.%s" % (gracedb_pipeline.lower(), gracedb_search.lower(), servicediscovery.DEFAULT_SERVICE_DOMAIN)


#
# =============================================================================
#
#                                 Command Line
#
# =============================================================================
#


def parse_command_line():
	parser = OptionParser(
		description = __doc__
	)

	# append all the datasource specific options
	datasource.append_options(parser)

group = OptionGroup(parser, "PSD Options", "Adjust noise spectrum estimation parameters")
group.add_option("--psd-fft-length", metavar = "seconds", default = 32, type = "int", help = "The length of the FFT used to used to whiten the data (default is 32 s).")
group.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
group.add_option("--track-psd", action = "store_true", help = "Enable dynamic PSD tracking. Always enabled if --reference-psd is not given.")
parser.add_option_group(group)
group = OptionGroup(parser, "Data Qualtiy", "Adjust data quality handling")
group.add_option("--ht-gate-threshold", metavar = "sigma", type = "float", action = "append", default = [], help = "Set the threshold on whitened h(t) to excise a glitches in units of standard deviations (optional). If given, exactly as many h(t) thresholds must be set as svd-bank files will be processed (see --svd-bank).")
group.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
group.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option_group(group)
group = OptionGroup(parser, "Trigger Generator", "Adjust trigger generator behaviour")
group.add_option("--control-peak-time", metavar = "seconds", type = "int", help = "Set a time window in seconds to find peaks in the control signal (optional, default is to disable composite detection statistic).")
group.add_option("--output", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required). Can be given multiple times. Exactly as many output files must be specified as svd-bank files will be processed (see --svd-bank).")
group.add_option("--output-cache", metavar = "filename", help = "Provide a cache of output files. This can be used instead of giving multiple --output options. Cannot be combined with --output.")
group.add_option("--svd-bank", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file. These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times, unless --data-source is lvshm or framexmit in which case it must be given exactly once. If given multiple times, the banks will be processed one-by-one, in order. At least one svd bank for at least 2 detectors is required, but see also --svd-bank-cache.")
group.add_option("--svd-bank-cache", metavar = "filename", help = "Provide a cache file of svd-bank files. This can be used instead of giving multiple --svd-bank options. Cannot be combined with --svd-bank options.")
# NOTE: the clustering SQL scripts search for this option in the
# process_params table to determine the threshold below which it
# can delete uninteresting singles after the coincs are ranked. if
# the name of this option is changed, be sure to update
# simplify_and_cluster.sql and derivatives
group.add_option("--singles-threshold", metavar = "SNR", type = "float", default = PosInf, help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
parser.add_option_group(group)
group = OptionGroup(parser, "Ranking Statistic Options", "Adjust ranking statistic behaviour")

group.add_option("--cap-singles", action = "store_true", help = "Cap singles to 1 / livetime if computing FAR. No effect otherwise")
group.add_option("--FAR-trialsfactor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
group.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
group.add_option("--coincidence-threshold", metavar = "seconds", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005 s). The light-travel time between instruments will be added automatically in the coincidence test.")
group.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
group.add_option("--ranking-stat-input", metavar = "url", help = "Set the URL from which to load a ranking statistic definition. When this is enabled, signal candidates will have ranking statistic values assigned on-the-fly. Required when --data-source is lvshm or framexmit; must also set --likelihood-snapshot-interval.")
group.add_option("--ranking-stat-output", metavar = "filename", action = "append", default = [], help = "Set the name of the file to which to write ranking statistic data collected from triggers (optional). Can be given more than once. If given, exactly as many must be provided as there are --svd-bank options and they will be writen to in order.")
group.add_option("--ranking-stat-output-cache", metavar = "filename", help = "Provide a cache of ranking statistic output files. This can be used instead of giving multiple --ranking-stat-output options. Cannot be combined with --ranking-stat-output.")
group.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to snapshot candidate and ranking statistic data to disk when running online.")
group.add_option("--ranking-stat-pdf", metavar = "url", help = "Set the URL from which to load the ranking statistic PDF. This is used to compute false-alarm probabilities and false-alarm rates and is required for online operation (when --data-source is framexmit or lvshm). It is forbidden for offline operation (all other data sources)")
group.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets (required).")
group.add_option("--zerolag-rankingstat-pdf", metavar = "url", action = "append", help = "Record a histogram of the likelihood ratio ranking statistic values assigned to zero-lag candidates in this XML file. This is used to construct the extinction model and set the overall false-alarm rate normalization during online running. If it does not exist at start-up, a new file will be initialized, otherwise the counts will be added to the file's contents. Required when --data-source is lvshm or framexmit; optional otherwise. If given, exactly as many must be provided as there are --svd-bank options and they will be used in order.")
parser.add_option_group(group)
group = OptionGroup(parser, "GracedB Options", "Adjust GracedB interaction")
group.add_option("--gracedb-far-threshold", metavar = "Hertz", type = "float", help = "False-alarm rate threshold for gracedb uploads in Hertz (default = do not upload to gracedb).")
group.add_option("--gracedb-group", metavar = "name", default = "Test", help = "Gracedb group to which to upload events (default is Test).")
group.add_option("--gracedb-pipeline", metavar = "name", default = "gstlal", help = "Name of pipeline to provide in GracedB uploads (default is gstlal).")
group.add_option("--gracedb-search", metavar = "name", default = "LowMass", help = "Name of search to provide in GracedB uploads (default is LowMass).")
group.add_option("--gracedb-service-url", metavar = "url", default = gracedb_default_service_url, help = "Override default GracedB service url (optional, default is %s)." % gracedb_default_service_url)
parser.add_option_group(group)
group = OptionGroup(parser, "Program Behaviour")
group.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case")
group.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
group.add_option("--comment", metavar = "message", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
group.add_option("--fir-stride", metavar = "seconds", type = "float", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.; may not contain \".\" nor \"-\".")
group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ")
group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
group.add_option("--reconstruction-segment", metavar = "start:stop", action = "append", help = "Only reconstruct the SNRs for this time interval (optional). Can be provided multiple times.")
group.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
group.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
group.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
parser.add_option_group(group)

	options, filenames = parser.parse_args()

	#
	# do this before messing with options object
	#

	process_params = options.__dict__.copy()

	#
	# extract data source configuration
	#

	detectors = datasource.GWDataSourceInfo(options)

	#
	# extract information about principal I/O files
	# FIXME: put all svd banks for different detectors in one file
	#

	missing_options = []

	if options.svd_bank:
		if options.svd_bank_cache:
			raise ValueError("cannot supply both --svd-bank and --svd-bank-cache")
		try:
			svd_banks = map(inspiral.parse_svdbank_string, options.svd_bank)
		except ValueError:
			print >>sys.stderr, "Unable to parse --svd-bank"
			raise
	elif options.svd_bank_cache:
		svd_banks = []
		svd_bank_cache = map(CacheEntry, open(options.svd_bank_cache))
		if not svd_bank_cache:
			raise ValueError("--svd-bank-cache is empty")
		svd_bank_cache.sort(key = lambda cache_entry: cache_entry.description)
		# group the cache entries by description so that each group
		# provides one svd bank file per instrument
		for key, seq in itertools.groupby(svd_bank_cache, key = lambda cache_entry: cache_entry.description):
			svd_banks.append(dict((cache_entry.observatory, cache_entry.url) for cache_entry in seq))
	else:
		missing_options.append("either --svd-bank-cache or at least one --svd-bank")

	if options.output:
		if options.output_cache:
			raise ValueError("cannot supply both --output and --output-cache")
	elif options.output_cache:
		# do this out-of-place to preserve process_params' contents
		options.output = [CacheEntry(line).url for line in open(options.output_cache)]
		if not options.output:
			raise ValueError("--output-cache is empty")
	else:
		missing_options.append("either --output-cache or at least one --output")

	if options.ranking_stat_output:
		if options.ranking_stat_output_cache:
			raise ValueError("cannot supply both --ranking-stat-output and --ranking-stat-output-cache")
	elif options.ranking_stat_output_cache:
		# do this out-of-place to preserve process_params' contents
		options.ranking_stat_output = [CacheEntry(line).url for line in open(options.ranking_stat_output_cache)]
		if not options.ranking_stat_output:
			raise ValueError("--ranking-stat-output-cache is empty")

	if not options.time_slide_file:
		missing_options.append("--time-slide-file")
	if missing_options:
		raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))

	#
	# check sanity of the input and output file collections
	#

	if len(svd_banks) != len(options.output):
		raise ValueError("must supply exactly as many --svd-bank options as --output")
	if options.ranking_stat_output and len(options.ranking_stat_output) != len(options.output):
		raise ValueError("must supply either none or exactly as many --ranking-stat-output options as --output")
	if options.likelihood_snapshot_interval and not options.ranking_stat_output:
		raise ValueError("must set --ranking-stat-output when --likelihood-snapshot-interval is set")

	if options.ranking_stat_output is None or len(options.ranking_stat_output) == 0:
		options.ranking_stat_output = [None] * len(options.output)

	required_urls = [options.time_slide_file]
	for svd_bank_set in svd_banks:
		required_urls += svd_bank_set.values()
	for filename in (options.veto_segments_file, options.injections, options.blind_injections, options.reference_psd):
		if filename:
			required_urls.append(filename)
	for i in range(len(required_urls)):
		try:
			required_urls[i] = ligolw_utils.local_path_from_url(required_urls[i])
		except ValueError:
			# not a local file, we can't check that it's readable
			required_urls[i] = None
	missing_files = [filename for filename in required_urls if filename is not None and not os.access(filename, os.R_OK)]
	if missing_files:
		raise ValueError("one or more required files cannot be found or are not readable: %s" % ", ".join("'%s'" % filename for filename in sorted(missing_files)))
	#
	# check sanity of the overall configuration
	#

	if options.blind_injections and options.injections:
		raise ValueError("cannot set both --blind-injections and --injections")

	if options.data_source in ("lvshm", "framexmit"):
		missing_options = []
		for option in ["job_tag", "ranking_stat_input", "ranking_stat_pdf", "zerolag_rankingstat_pdf"]:
			if getattr(options, option) is None:
				missing_options.append("--%s" % option.replace("_", "-"))
		if missing_options:
			raise ValueError("missing required option(s) %s when --data-source is lvshm or framexmit" % ", ".join(missing_options))
		if "-" in options.job_tag or "." in options.job_tag:
			raise ValueError("invalid characters in --job-tag \"%s\"" % options.job_tag)
		if len(svd_banks) > 1:
			raise ValueError("more than one --svd-bank not allowed when --data-source is lvshm or framexmit, have %d" % len(svd_banks))

		# make an "infinite" extent segment
		detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))

		# this gets set so that if you log into a node you can
		# easily find out what the job id is
		os.environ['GSTLAL_LL_JOB'] = options.job_tag

		# FIXME: this is ugly, but we have to protect against
		# busted shared memory partitions
		if options.data_source == "lvshm":
			import subprocess
			for partition in detectors.shm_part_dict.values():
				subprocess.call(["smrepair", partition])
	else:
		bad_options = []
		for option in ["job_tag"]:
			if getattr(options, option) is not None:
				bad_options.append("--%s" % option.replace("_", "-"))
		if bad_options:
			raise ValueError("cannot set %s when --data-source is not lvshm or framexmit" % ", ".join(bad_options))

	if options.reference_psd is None:
		options.track_psd = True
	if options.psd_fft_length < 4:
		raise ValueError("--psd-fft-length cannot be less than 4")
	if options.psd_fft_length < 32:
		warnings.warn("it is not advised to choose PSD FFT lengths below 32 s, since a loss in sensitivity may occur")
	if options.local_frame_caching and not options.data_source == "frames":
		raise ValueError("--local-frame-caching can only be used if --data-source is frames")
	if options.chisq_type not in ["autochisq", "timeslicechisq"]:
		raise ValueError("--chisq-type must be one of (autochisq|timeslicechisq), got %s" % options.chisq_type)
	if options.likelihood_snapshot_interval is not None and options.likelihood_snapshot_interval <= 0.:
		raise ValueError("--likelihood-snapshot-interval cannot be <= 0")
	if options.ranking_stat_input:
		if not options.likelihood_snapshot_interval:
			raise ValueError("must set --likelihood-snapshot-interval when --ranking-stat-input is set")
	if not options.ht_gate_threshold:
		# default threshold is +inf, i.e. the feature is disabled
		options.ht_gate_threshold = [float("inf")] * len(svd_banks)
	elif len(options.ht_gate_threshold) != len(svd_banks):
		raise ValueError("must supply either none or exactly as many --ht-gate-threshold values as --svd-bank options")
	if not options.zerolag_rankingstat_pdf:
		options.zerolag_rankingstat_pdf = [None] * len(svd_banks)
	elif len(options.zerolag_rankingstat_pdf) != len(svd_banks):
		raise ValueError("must supply either none or exactly as many --zerolag-rankingstat-pdf options as --svd-bank")
	if options.min_instruments < 1:
		raise ValueError("--min-instruments must be >= 1")
	if options.min_instruments > len(detectors.channel_dict):
		raise ValueError("--min-instruments (%d) is greater than the number of --channel-name's (%d)" % (options.min_instruments, len(detectors.channel_dict)))

	#
	# Option checks complete
	#

	options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)
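	# e.g. --nxydump-segment "874000000:874000100" dumps only that GPS
	# interval; the default ":" spans all time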

	if options.blind_injections is not None:
		detectors.injection_filename = options.blind_injections

	# set up local frame caching, if requested
	if options.local_frame_caching:
		f, fname = tempfile.mkstemp(".cache")
		if options.verbose:
			print >>sys.stderr, "caching frame data locally to", fname
		f = open(fname, "w")
		# FIXME: should try to down-sample if possible. there are
		# MDC data sets floating around whose streams do not start
		# on integer second boundaries, however, and it's possible
		# to trigger a failure in the frame muxer if those get
		# down-sampled so for now we're not doing any resampling.
		# later, when we don't care about those MDCs, we can go
		# back to down-sampling. if not being able to down-sample
		# is a problem in the meantime, I think the best way
		# forward is to clip the start of said streams using a drop
		# element and a (to be written) buffer probe that figures
		# out how many samples to clip off the start of the stream.
		# FIXME: shouldn't use tempfile.gettempdir() directly, use
		# _CONDOR_SCRATCH_DIR like glue?
		# FIXME: note that at least for now condor sets TMPDIR to the
		# run scratch space so this *will* work properly
		detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), verbose = options.verbose)
		for cacheentry in detectors.local_cache_list:
			# guarantee a lal-cache-compliant file with only
			# integer starts and durations
			cacheentry.segment = segments.segment(int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])))
			print >>f, str(cacheentry)
		detectors.frame_cache = fname

		# the injections are now present in the data, so we don't
		# want to do them twice
		detectors.injection_filename = None

	# we're done
	return options, filenames, process_params, svd_banks, detectors
#
# =============================================================================
#
# Signal Handler
#
# =============================================================================
#
class OneTimeSignalHandler(object):
	def __init__(self, pipeline):
		self.pipeline = pipeline
		self.count = 0

	def __call__(self, signum, frame):
		self.count += 1
		if self.count == 1:
			print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
			try:
				# FIXME: how do I choose a timestamp?
				self.pipeline.get_bus().post(lloidhandler.message_new_checkpoint(self.pipeline, timestamp = inspiral.now().ns()))
				if not self.pipeline.send_event(Gst.Event.new_eos()):
					raise Exception("pipeline.send_event(EOS) returned failure")
			except Exception as e:
				print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
				os._exit(1)
		else:
			print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)
#
# =============================================================================
#
# Horizon Distances
#
# =============================================================================
#
#
# =============================================================================
#
# Main
#
# =============================================================================
#


#
# parse the command line
#

options, filenames, process_params, svd_banks, detectors = parse_command_line()

if not options.check_time_stamps:
	# disable timestamp checking by replacing the element factory
	# function with a pass-through
	pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src

#
# parse the veto segments file(s) if provided
#

if options.veto_segments_file is not None:
	veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
else:
	veto_segments = None

#
# load the time slide vectors, and use them to define the complete
# instrument list for the network
#

offsetvectors = lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(options.time_slide_file, contenthandler = LIGOLWContentHandler, verbose = options.verbose)).as_dict().values()
all_instruments = reduce(lambda a, b: a | set(b), offsetvectors, set())
if len(all_instruments) < options.min_instruments:
	raise ValueError("--time-slide-file \"%s\" names %s but we need at least %d instruments" % (options.time_slide_file, ", ".join(sorted(all_instruments)), options.min_instruments))
if not (all_instruments >= set(detectors.channel_dict)):
	raise ValueError("--time-slide-file names %s but we have channel names for %s" % (", ".join(sorted(all_instruments)), ", ".join(sorted(detectors.channel_dict))))

#
# set up the PSDs
#
# there are three modes for PSDs in this program:
#
# 1) --reference-psd without --track-psd: a fixed PSD (provided by the
#    user) will be used to whiten the data
#
# 2) --track-psd without --reference-psd: a PSD will be measured and used
#    on the fly
#
# 3) --track-psd with --reference-psd: a PSD will be measured on the fly,
#    but the first guess will come from the user-provided PSD
#

if options.reference_psd is not None:
	psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
	if set(psd) != set(detectors.channel_dict):
		raise ValueError("missing PSD(s) for %s" % ", ".join(sorted(set(detectors.channel_dict) - set(psd))))
else:
	# modes 2 and 3: the whitener measures the PSD on the fly
	psd = None

#
# process the svd banks in serial
#

for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, zerolag_rankingstat_pdf, ht_gate_threshold) in enumerate(zip(svd_banks, options.output, options.ranking_stat_output, options.zerolag_rankingstat_pdf, options.ht_gate_threshold)):
	#
	# checkpointing is only supported for gzip files in the offline
	# analysis
	# FIXME: implement a means by which to check for sqlite file
	# validity
	#

	if options.data_source not in ("lvshm", "framexmit") and output_url.endswith('.gz'):
		try:
			# a single .read() would be easier but looping over
			# lines uses less memory
			for line in gzip.open(ligolw_utils.local_path_from_url(output_url)):
				pass
			if ranking_stat_output_url is not None:
				for line in gzip.open(ligolw_utils.local_path_from_url(ranking_stat_output_url)):
					pass
			# files are OK and there is no need to process them,
			# skip ahead in the loop
			continue
		except IOError:
			# file does not exist or is corrupted, need to
			# reprocess
			print >>sys.stderr, "Checkpoint: {0} of {1} files completed and continuing with {2}".format(output_file_number, len(options.output), os.path.basename(output_url))
	#
	# create a new, empty, Bottle application and make it the current
	# default, then start http server(s) to serve it up
	#

	bottle.default_app.push()
	# uncomment the next line to show tracebacks when something fails
	# in the web server
	#bottle.app().catchall = False
	httpservers = httpinterface.HTTPServers(
		service_name = "%s.gstlal_inspiral" % (options.job_tag if options.job_tag is not None else base64.urlsafe_b64encode(uuid.uuid4().bytes)),
		service_domain = service_domain(options.gracedb_search, options.gracedb_pipeline),
		service_properties = {
			"cwd": os.getcwd(),
			"pid": str(os.getpid()),
		},
		verbose = options.verbose
	)
	#
	# set up a registry of the resources that this job provides
	#

	@bottle.route("/")
	@bottle.route("/index.html")
	def index(job_tag = options.job_tag, instruments = all_instruments):
		# get the host and port to report in the links from the
		# request we've received so that the URLs contain the IP
		# address by which the client has contacted us
		netloc = bottle.request.urlparts[1]
		server_address = "http://%s" % netloc
		yield "<html><body>\n<h3>%s %s %s %s</h3>\n<p>\n" % (job_tag, os.environ.get("GSTLAL_LL_JOB"), netloc, " ".join(sorted(instruments)))
		for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
			if route.rule in ("/", "/index.html"):
				# don't create links back to this page
				continue
			if route.method != "GET":
				# only create links for GET routes
				continue
			yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
		yield "</p>\n</body></html>"

	# FIXME: get service-discovery working, then don't do this
	if "GSTLAL_LL_JOB" in os.environ:
		open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
	#
	# parse SVD template bank and expose bank metadata
	#

	banks = inspiral.parse_bank_files(svd_bank_url_dict, verbose = options.verbose)

	# assume all instruments have the same templates, just extract them
	# from one of the instruments at random
	sngl_inspiral_table = banks.values()[0][0].sngl_inspiral_table.copy()
	horizon_factors = {}
	for bank in banks.values()[0]:
		sngl_inspiral_table.extend(bank.sngl_inspiral_table)
		horizon_factors.update(bank.horizon_factors)

	@bottle.route("/template_bank.xml.gz")
	def get_template_bank_xml(sngl_inspiral_table = sngl_inspiral_table):
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		map(xmldoc.childNodes[-1].appendChild(sngl_inspiral_table.copy()).append, sngl_inspiral_table)
		output = StringIO.StringIO()
		ligolw_utils.write_fileobj(xmldoc, output, gz = True)
		outstr = output.getvalue()
		output.close()
		return outstr

	template_ids = frozenset(row.template_id for row in sngl_inspiral_table)
	assert set(horizon_factors) == template_ids, "horizon factors are not assigned for each template id"
	assert len(template_ids) == len(sngl_inspiral_table), "template IDs are not unique within the template bank"

	@bottle.route("/template_ids.txt")
	def get_template_ids(template_ids = sorted(template_ids)):
		return "\n".join("%d" % template_id for template_id in template_ids)
	# optionally reconstruct the SNRs only in segments around
	# injections (not blind injections!)
	if options.injections:
		offset_padding = max(math.ceil(abs(row.end)) + 1 for bank in banks.items()[0][-1] for row in bank.sngl_inspiral_table)
		reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections, pad = offset_padding)
	elif options.reconstruction_segment:
		reconstruction_segment_list = segments.segmentlist()
		for reconstruction_segment in options.reconstruction_segment:
			t0, t1 = reconstruction_segment.split(":")
			reconstruction_segment_list.append(segments.segment(LIGOTimeGPS(int(t0)), LIGOTimeGPS(int(t1))))
	else:
		reconstruction_segment_list = None

	@bottle.route("/bank.txt")
	def get_filter_length_and_chirpmass(banks = banks):
		bank = banks.values()[0][0]	# FIXME: maybe shouldn't just take the first ones
		yield '%.14g %.4g %.4g' % (float(inspiral.now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)

	#
	# build pipeline
	#

	if options.verbose:
		print >>sys.stderr, "assembling pipeline ...",
	pipeline = Gst.Pipeline(name = "gstlal_inspiral")
	mainloop = GObject.MainLoop()

	triggersrc = lloidparts.mkLLOIDmulti(
		pipeline,
		detectors = detectors,
		banks = banks,
		psd = psd,
		psd_fft_length = options.psd_fft_length,
		ht_gate_threshold = ht_gate_threshold,
		veto_segments = veto_segments,
		verbose = options.verbose,
		nxydump_segment = options.nxydump_segment,
		chisq_type = options.chisq_type,
		track_psd = options.track_psd,
		control_peak_time = options.control_peak_time,
		fir_stride = options.fir_stride,
		reconstruction_segment_list = reconstruction_segment_list
	)

	if options.verbose:
		print >>sys.stderr, "done"

	#
	# load/initialize ranking statistic data
	#

	rankingstat = None
	if options.ranking_stat_input:
		try:
			xmldoc = ligolw_utils.load_url(options.ranking_stat_input, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)
		except IOError:
			print >>sys.stderr, "warning: '%s' not found or cannot be loaded; will fall back to newly-initialized ranking statistic" % options.ranking_stat_input
		else:
			rankingstat, _ = far.parse_likelihood_control_doc(xmldoc)
			if rankingstat is None:
				raise ValueError("\"%s\" does not contain parameter distribution data" % options.ranking_stat_input)
			if rankingstat.delta_t != options.coincidence_threshold:
				raise ValueError("\"%s\" is for delta_t=%g, we need %g" % (options.ranking_stat_input, rankingstat.denominator.delta_t, options.coincidence_threshold))
			if rankingstat.min_instruments != options.min_instruments:
				raise ValueError("\"%s\" is for min instruments = %d but we need %d" % (options.ranking_stat_input, rankingstat.denominator.min_instruments, options.min_instruments))
			if rankingstat.instruments != all_instruments:
				raise ValueError("\"%s\" is for %s but we need %s" % (options.ranking_stat_input, ", ".join(sorted(rankingstat.instruments)), ", ".join(sorted(all_instruments))))
			if rankingstat.template_ids is None:
				rankingstat.template_ids = template_ids
			elif rankingstat.template_ids != template_ids:
				raise ValueError("\"%s\" is for the wrong templates" % options.ranking_stat_input)
			rankingstat.numerator.set_horizon_factors(horizon_factors)
	if rankingstat is None:
		rankingstat = far.RankingStat(template_ids = template_ids, instruments = all_instruments, delta_t = options.coincidence_threshold, min_instruments = options.min_instruments, horizon_factors = horizon_factors)

	#
	# construct and configure pipeline handler
	#

	if options.verbose:
		print >>sys.stderr, "initializing pipeline handler ..."
	handler = lloidhandler.Handler(
		mainloop = mainloop,
		pipeline = pipeline,
		coincs_document = inspiral.CoincsDocument(
			url = output_url or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.instrumentsproperty.set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
			process_params = process_params,
			process_start_time = GSTLAL_PROCESS_START_TIME,
			comment = options.comment,
			instruments = rankingstat.instruments,
			offsetvectors = offsetvectors,
			injection_filename = options.injections,
			tmp_path = options.tmp_space,
			replace_file = True,
			verbose = options.verbose
		),
		rankingstat = rankingstat,
		horizon_distance_func = banks.values()[0][0].horizon_distance_func,	# they should all be the same
		gracedbwrapper = inspiral.GracedBWrapper(
			instruments = rankingstat.instruments,
			far_threshold = options.gracedb_far_threshold,
			min_instruments = options.min_instruments,
			group = options.gracedb_group,
			search = options.gracedb_search,
			pipeline = options.gracedb_pipeline,
			service_url = options.gracedb_service_url,
			upload_auxiliary_data = True,
			verbose = options.verbose
		),
		ranking_stat_input_url = options.ranking_stat_input,
		ranking_stat_output_url = ranking_stat_output_url,
		rankingstatpdf_url = options.ranking_stat_pdf,
		zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf,
		likelihood_snapshot_interval = options.likelihood_snapshot_interval,
		sngls_snr_threshold = options.singles_threshold,
		tag = options.job_tag,
		kafka_server = options.output_kafka_server,
		cluster = True,	# switch to options.data_source in ("lvshm", "framexmit") to cluster only when running online
		cap_singles = options.cap_singles,
		FAR_trialsfactor = options.FAR_trialsfactor,
		verbose = options.verbose
	)
	if options.verbose:
		print >>sys.stderr, "... pipeline handler initialized"

	if options.verbose:
		print >>sys.stderr, "attaching appsinks to pipeline ...",
	appsync = pipeparts.AppSync(appsink_new_buffer = handler.appsink_new_buffer)
	appsinks = set(appsync.add_sink(pipeline, src, caps = Gst.Caps.from_string("application/x-lal-snglinspiral"), name = "bank_%s_sink" % bank_id) for bank_id, src in triggersrc.items())
	if options.verbose:
		print >>sys.stderr, "attached %d, done" % len(appsinks)

	#
	# if we requested a dot graph of the pipeline, set it up
	#

	if options.write_pipeline is not None:
		pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)

	#
	# run pipeline
	#

	if options.data_source in ("lvshm", "framexmit"):
		#
		# setup sigint handler to shutdown pipeline. this is how
		# the program stops gracefully; it is the only way to
		# stop it, otherwise it runs forever.
		#
		# this is only done in the online case because we want
		# offline jobs to just die and not write partial
		# databases
		#

		signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
		signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))

	if options.verbose:
		print >>sys.stderr, "setting pipeline state to ready ..."
	if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline did not enter ready state")

	datasource.pipeline_seek_for_gps(pipeline, *detectors.seg)

	if options.verbose:
		print >>sys.stderr, "setting pipeline state to playing ..."
	if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline did not enter playing state")

	if options.write_pipeline is not None:
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)

	if options.verbose:
		print >>sys.stderr, "running pipeline ..."
	mainloop.run()
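	# NOTE: mainloop.run() blocks here until the pipeline reaches EOS:
	# the end of the data in the offline case, or the EOS injected by
	# the signal handler in the online case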

	#
	# write output file
	#

	handler.write_output_url(url = output_url or handler.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)

	#
	# clean up template bank temp files
	#

	for ifo in banks:
		for bank in banks[ifo]:
			if options.verbose:
				print >>sys.stderr, "removing file:", bank.template_bank_filename
			os.remove(bank.template_bank_filename)

	#
	# shutdown the web interface servers and garbage collect the Bottle
	# app. this should release the references the Bottle app's routes
	# hold to the pipeline's data (like template banks and so on).
	#

	del httpservers
	bottle.default_app.pop()

	#
	# set pipeline state to NULL and garbage collect the handler
	#

	if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline could not be set to NULL")


if options.local_frame_caching:
	if options.verbose:
		print >>sys.stderr, "deleting temporary cache file", detectors.frame_cache
	os.remove(detectors.frame_cache)
	del detectors.local_cache_list

#
# done. the online pipeline always exits with an error code so that dagman
# does not mark the job "done" and the job will be restarted when the dag
# is restarted.
#

if options.data_source in ("lvshm", "framexmit"):
	sys.exit(1)