-
Kipp Cannon authored
- the template IDs are stored in the Gamma0 column, which is a float. the ID values must be ints or the ranking statistic barfs.
Kipp Cannon authored- the template IDs are stored in the Gamma0 column, which is a float. the ID values must be ints or the ranking statistic barfs.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_inspiral 42.35 KiB
#!/usr/bin/env python
#
# Copyright (C) 2009-2014 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Stream-based inspiral analysis tool"""
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
### A program to analyze gravitational wave data for compact binary coalescence in real time or in an offline mode
###
### .. graphviz::
###
### digraph llpipe {
### labeljust = "r";
### label="gstlal_inspiral"
### rankdir=LR;
### graph [fontname="Roman", fontsize=24];
### edge [ fontname="Roman", fontsize=10 ];
### node [fontname="Roman", shape=box, fontsize=11];
###
### gracedb [label="GW\nCandidate\nDatabase", shape=oval, color=tomato3, style=filled];
###
###
### subgraph clusterNodeN {
###
### style=rounded;
### label="gstreamer pipeline";
### labeljust = "r";
### fontsize = 14;
###
### H1src [label="H1 data source:\n mkbasicsrc()", color=red4];
### L1src [label="L1 data source:\n mkbasicsrc()", color=green4];
### V1src [label="V1 data source:\n mkbasicsrc()", color=magenta4];
###
### H1multirate [label="H1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4];
### L1multirate [label="L1 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4];
### V1multirate [label="V1 whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4];
###
### H1LLOID [label="H1 LLOID filtering engine:\nmkLLOIDmulti()", color=red4];
### L1LLOID [label="L1 LLOID filtering engine:\nmkLLOIDmulti()", color=green4];
### V1LLOID [label="V1 LLOID filtering engine:\nmkLLOIDmulti()", color=magenta4];
###
### H1Trig1 [label="H1 Triggering:\nsub bank 1", color=red4];
### L1Trig1 [label="L1 Triggering:\nsub bank 1", color=green4];
### V1Trig1 [label="V1 Triggering:\nsub bank 1", color=magenta4];
### H1Trig2 [label="H1 Triggering:\nsub bank 2", color=red4];
### L1Trig2 [label="L1 Triggering:\nsub bank 2", color=green4];
### V1Trig2 [label="V1 Triggering:\nsub bank 2", color=magenta4];
### H1TrigN [label="H1 Triggering:\nsub bank N", color=red4];
### L1TrigN [label="L1 Triggering:\nsub bank N", color=green4];
### V1TrigN [label="V1 Triggering:\nsub bank N", color=magenta4];
###
### H1src -> H1multirate;
### L1src -> L1multirate;
### V1src -> V1multirate;
###
### H1multirate -> H1LLOID [label="h(t) 4096Hz"];
### L1multirate -> L1LLOID [label="h(t) 4096Hz"];
### V1multirate -> V1LLOID [label="h(t) 4096Hz"];
### H1multirate -> H1LLOID [label="h(t) 2048Hz"];
### L1multirate -> L1LLOID [label="h(t) 2048Hz"];
### V1multirate -> V1LLOID [label="h(t) 2048Hz"];
### H1multirate -> H1LLOID [label="h(t) Nth-pow-of-2 Hz"];
### L1multirate -> L1LLOID [label="h(t) Nth-pow-of-2 Hz"];
### V1multirate -> V1LLOID [label="h(t) Nth-pow-of-2 Hz"];
###
### H1LLOID -> H1Trig1 [label="SNRs sub bank 1"];
### L1LLOID -> L1Trig1 [label="SNRs sub bank 1"];
### V1LLOID -> V1Trig1 [label="SNRs sub bank 1"];
### H1LLOID -> H1Trig2 [label="SNRs sub bank 2"];
### L1LLOID -> L1Trig2 [label="SNRs sub bank 2"];
### V1LLOID -> V1Trig2 [label="SNRs sub bank 2"];
### H1LLOID -> H1TrigN [label="SNRs sub bank N"];
### L1LLOID -> L1TrigN [label="SNRs sub bank N"];
### V1LLOID -> V1TrigN [label="SNRs sub bank N"];
### }
###
###
### Coincidence [label="Coincidence\nO(1)s latency"];
### SigEst [label="Significance\nEstimation\nO(1)s latency"];
### Thresh [label="Thresholding\nO(1)s latency"];
### EventGen [label="Event\nGeneration\nO(1)s latency"];
###
### H1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### L1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### V1Trig1 -> Coincidence [label="Trigs sub bank 1"];
### H1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### L1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### V1Trig2 -> Coincidence [label="Trigs sub bank 2"];
### H1TrigN -> Coincidence [label="Trigs sub bank N"];
### L1TrigN -> Coincidence [label="Trigs sub bank N"];
### V1TrigN -> Coincidence [label="Trigs sub bank N"];
###
### Coincidence -> SigEst -> Thresh -> EventGen;
###
### EventGen -> gracedb;
###
### }
###
### Review Status
### -------------
###
### +------------------------------------------------+---------------------------------------------+------------+
### | Names | Hash | Date |
### +================================================+=============================================+============+
### | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 9074294d6b57f43651143b5f93210751de1fe55a | 2014-05-02 |
### +------------------------------------------------+---------------------------------------------+------------+
###
import base64
try:
from fpconst import PosInf
except ImportError:
# fpconst is not part of the standard library and might not be
# available
PosInf = float("+inf")
import gzip
import itertools
import math
from optparse import OptionGroup, OptionParser
import os
import resource
import signal
import socket
import StringIO
import sys
import tempfile
import uuid
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
import lal
from lal import LIGOTimeGPS
from lal.utils import CacheEntry
from ligo.gracedb.rest import DEFAULT_SERVICE_URL as gracedb_default_service_url
from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import segments as ligolw_segments
from gstlal import bottle
from gstlal import datasource
from gstlal import lloidparts
from gstlal import far
from gstlal import httpinterface
from gstlal import hoftcache
from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import pipeparts
from gstlal import simulation
@lsctables.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
#
def setrlimit(res, lim):
hard_lim = resource.getrlimit(res)[1]
resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))
# set the number of processes and total set size up to hard limit and
# shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
# FIXME: tests at CIT show that this next tweak has no effect. it's
# possible that SL7 has lowered the default stack size from SL6 and we
# don't need to do this anymore. remove?
setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
def parse_command_line():
parser = OptionParser(
description = __doc__
)
# append all the datasource specific options
datasource.append_options(parser)
group = OptionGroup(parser, "PSD Options", "Adjust noise spectrum estimation parameters")
group.add_option("--psd-fft-length", metavar = "seconds", default = 32, type = "int", help = "The length of the FFT used to used to whiten the data (default is 32 s).")
group.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
group.add_option("--track-psd", action = "store_true", help = "Enable dynamic PSD tracking. Always enabled if --reference-psd is not given.")
parser.add_option_group(group)
group = OptionGroup(parser, "Data Qualtiy", "Adjust data quality handling")
group.add_option("--ht-gate-threshold", metavar = "sigma", type = "float", action = "append", default = [], help = "Set the threshold on whitened h(t) to excise a glitches in units of standard deviations (optional). If given, exactly as many h(t) thresholds must be set as svd-bank files will be processed (see --svd-bank).")
group.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
group.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option_group(group)
group = OptionGroup(parser, "Trigger Generator", "Adjust trigger generator behaviour")
group.add_option("--control-peak-time", metavar = "seconds", type = "int", help = "Set a time window in seconds to find peaks in the control signal (optional, default is to disable composite detection statistic).")
group.add_option("--output", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required). Can be given multiple times. Exactly as many output files must be specified as svd-bank files will be processed (see --svd-bank).")
group.add_option("--output-cache", metavar = "filename", help = "Provide a cache of output files. This can be used instead of giving multiple --output options. Cannot be combined with --output.")
group.add_option("--svd-bank", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file. These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times, unless --data-source is lvshm or framexmit in which case it must be given exactly once. If given multiple times, the banks will be processed one-by-one, in order. At least one svd bank for at least 2 detectors is required, but see also --svd-bank-cache.")
group.add_option("--svd-bank-cache", metavar = "filename", help = "Provide a cache file of svd-bank files. This can be used instead of giving multiple --svd-bank options. Cannot be combined with --svd-bank options.")
# NOTE: the clustering SQL scripts search for this option in the
# process_params table to determine the threshold below which it
# can delete uninteresting singles after the coincs are ranked. if
# the name of this option is changed, be sure to update
# simplify_and_cluster.sql and derivatives
group.add_option("--singles-threshold", metavar = "SNR", type = "float", default = PosInf, help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
parser.add_option_group(group)
group = OptionGroup(parser, "Ranking Statistic Options", "Adjust ranking statistic behaviour")
group.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
group.add_option("--coincidence-threshold", metavar = "seconds", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005 s). The light-travel time between instruments will be added automatically in the coincidence test.")
group.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
group.add_option("--min-log-L", metavar = "log likelihood ratio", type = "float", help = "Discard candidates that get assigned log likelihood ratios below this threshold (default = keep all). When used without --ranking-stat-input, the cut is decided based on an internal approximate ranking statistic.")
group.add_option("--ranking-stat-input", metavar = "url", help = "Set the URL from which to load a ranking statistic definition. When this is enabled, signal candidates will have ranking statistic values assigned on-the-fly, and the --min-log-L cut will be applied based on the assigned values. Required when --data-source is lvshm or framexmit; must also set --likelihood-snapshot-interval.")
group.add_option("--ranking-stat-output", metavar = "filename", action = "append", default = [], help = "Set the name of the file to which to write ranking statistic data collected from triggers (optional). Can be given more than once. If given, exactly as many must be provided as there are --svd-bank options and they will be writen to in order.")
group.add_option("--ranking-stat-output-cache", metavar = "filename", help = "Provide a cache of ranking statistic output files. This can be used instead of giving multiple --ranking-stat-output options. Cannot be combined with --ranking-stat-output.")
group.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to snapshot candidate and ranking statistic data to disk when running online.")
group.add_option("--ranking-stat-pdf", metavar = "url", help = "Set the URL from which to load the ranking statistic PDF. This is used to compute false-alarm probabilities and false-alarm rates and is required for online operation (when --data-source is framexmit or lvshm). It is forbidden for offline operation (all other data sources)")
group.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets (required).")
group.add_option("--zerolag-rankingstat-pdf", metavar = "url", action = "append", help = "Record a histogram of the likelihood ratio ranking statistic values assigned to zero-lag candidates in this XML file. This is used to construct the extinction model and set the overall false-alarm rate normalization during online running. If it does not exist at start-up, a new file will be initialized, otherwise the counts will be added to the file's contents. Required when --data-source is lvshm or framexmit; optional otherwise. If given, exactly as many must be provided as there are --svd-bank options and they will be used in order.")
parser.add_option_group(group)
group = OptionGroup(parser, "GracedB Options", "Adjust GracedB interaction")
group.add_option("--gracedb-far-threshold", metavar = "Hertz", type = "float", help = "False-alarm rate threshold for gracedb uploads in Hertz (default = do not upload to gracedb).")
group.add_option("--gracedb-group", metavar = "name", default = "Test", help = "Gracedb group to which to upload events (default is Test).")
group.add_option("--gracedb-pipeline", metavar = "name", default = "gstlal", help = "Name of pipeline to provide in GracedB uploads (default is gstlal).")
group.add_option("--gracedb-search", metavar = "name", default = "LowMass", help = "Name of search to provide in GracedB uploads (default is LowMass).")
group.add_option("--gracedb-service-url", metavar = "url", default = gracedb_default_service_url, help = "Override default GracedB service url (optional, default is %s)." % gracedb_default_service_url)
parser.add_option_group(group)
group = OptionGroup(parser, "Program Behaviour")
group.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case")
group.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
group.add_option("--comment", metavar = "message", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
group.add_option("--fir-stride", metavar = "seconds", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.; may not contain \".\" nor \"-\".")
group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ")
group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
group.add_option("--thinca-interval", metavar = "seconds", type = "float", default = 30.0, help = "Set the thinca interval (default = 30 s).")
group.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
group.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
parser.add_option_group(group)
options, filenames = parser.parse_args()
missing_options = []
#
# do this before messing with options object
#
process_params = options.__dict__.copy()
#
# extract data source configuration
#
detectors = datasource.GWDataSourceInfo(options)
#
# extract information about principle I/O files
#
# FIXME Put all svd banks for different detectors in one file.
if options.svd_bank:
if options.svd_bank_cache:
raise ValueError("cannot supply both --svd-bank and --svd-bank-cache")
try:
svd_banks = map(inspiral.parse_svdbank_string, options.svd_bank)
except ValueError as e:
print "Unable to parse --svd-bank"
raise
elif options.svd_bank_cache:
svd_banks = []
svd_bank_cache = map(CacheEntry, open(options.svd_bank_cache))
if not svd_bank_cache:
raise ValueError("--svd-bank-cache is empty")
svd_bank_cache.sort(key = lambda cache_entry: cache_entry.description)
for key, seq in itertools.groupby(svd_bank_cache, key = lambda cache_entry: cache_entry.description):
svd_banks.append(dict((cache_entry.observatory, cache_entry.url) for cache_entry in seq))
else:
missing_options.append("either --svd-bank-cache or at least one --svd-bank")
if options.output:
if options.output_cache:
raise ValueError("cannot supply both --output and --output-cache")
elif options.output_cache:
# do this out-of-place to preserve process_params' contents
options.output = [CacheEntry(line).url for line in open(options.output_cache)]
if not options.output:
raise ValueError("--output-cache is empty")
else:
missing_options.append("either --output-cache or at least one --output")
if options.ranking_stat_output:
if options.ranking_stat_output_cache:
raise ValueError("cannot supply both --ranking-stat-output and --ranking-stat-output-cache")
elif options.ranking_stat_output_cache:
# do this out-of-place to preserve process_params' contents
options.ranking_stat_output = [CacheEntry(line).url for line in open(options.ranking_stat_output_cache)]
if not options.ranking_stat_output:
raise ValueError("--ranking-stat-output-cache is empty")
if not options.time_slide_file:
missing_options.append("--time-slide-file")
if missing_options:
raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))
#
# check sanity of the input and output file collections
#
if len(svd_banks) != len(options.output):
raise ValueError("must supply exactly as many --svd-bank options as --output")
if options.ranking_stat_output and len(options.ranking_stat_output) != len(options.output):
raise ValueError("must supply either none or exactly as many --ranking-stat-output options as --output")
if options.likelihood_snapshot_interval and not options.ranking_stat_output:
raise ValueError("must set --ranking-stat-output when --likelihood-snapshot-interval is set")
if options.ranking_stat_output is None or len(options.ranking_stat_output) == 0:
options.ranking_stat_output = [None] * len(options.output)
required_urls = [options.time_slide_file]
for svd_bank_set in svd_banks:
required_urls += svd_bank_set.values()
for filename in (options.veto_segments_file, options.injections, options.blind_injections, options.reference_psd):
if filename:
required_urls.append(filename)
for i in range(len(required_urls)):
try:
required_urls[i] = ligolw_utils.local_path_from_url(required_urls[i])
except ValueError:
required_urls[i] = None
missing_files = [filename for filename in required_urls if filename is not None and not os.access(filename, os.R_OK)]
if missing_files:
raise ValueError("one or more required files cannot be found or are not readable: %s" % ", ".join("'%s'" % filename for filename in sorted(missing_files)))
#
# check sanity of overall configuration
#
if options.blind_injections and options.injections:
raise ValueError("cannot set both --blind-injections and --injections")
if options.data_source in ("lvshm", "framexmit"):
missing_options = []
for option in ["job_tag", "ranking_stat_input", "ranking_stat_pdf", "zerolag_rankingstat_pdf"]:
if getattr(options, option) is None:
missing_options.append("--%s" %option.replace("_","-"))
if missing_options:
raise ValueError("missing required option(s) %s when --data-source is lvshm or framexmit" % ", ".join(missing_options))
if "-" in options.job_tag or "." in options.job_tag:
raise ValueError("invalid characters in --job-tag \"%s\"" % options.job_tag)
if len(svd_banks) > 1:
raise ValueError("more than one --svd-bank not allowed when --datasource is lvshm or framexmit, have %d" % len(svd_banks))
# make an "infinite" extent segment
detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))
# this gets set so that if you log into a node you can find out what the job id is easily
os.environ['GSTLAL_LL_JOB'] = options.job_tag
else:
bad_options = []
for option in ["job_tag", "ranking_stat_pdf", "likelihood_snapshot_interval"]:
if getattr(options, option) is not None:
bad_options.append("--%s" % option.replace("_","-"))
if bad_options:
raise ValueError("cannot set %s when --data-source is not lvshm or framexmit " % ", ".join(bad_options))
if options.reference_psd is None:
options.track_psd = True
if options.psd_fft_length < 32:
raise ValueError("--psd-fft-length cannot be less than 32")
if options.local_frame_caching and not options.data_source == "frames":
raise ValueError("--local-frame-caching can only be used if --data-source is frames")
if options.chisq_type not in ["autochisq", "timeslicechisq"]:
raise ValueError("--chisq-type must be one of (autochisq|timeslicechisq), got %s" % (options.chisq_type))
if options.likelihood_snapshot_interval is not None and options.likelihood_snapshot_interval <= 0.:
raise ValueError("--likelihood-snapshot-interval cannot be <= 0")
if options.ranking_stat_input:
if not options.likelihood_snapshot_interval:
raise ValueError("must set --likelihood-snapshot-interval when --ranking-stat-input is set")
if not options.ht_gate_threshold:
# default threshold is +inf = disable feature.
options.ht_gate_threshold = [float("inf")] * len(svd_banks)
elif len(options.ht_gate_threshold) != len(svd_banks):
raise ValueError("must supply either none or exactly as many --ht-gate-threshold values options as --svd-bank")
if not options.zerolag_rankingstat_pdf:
options.zerolag_rankingstat_pdf = [None] * len(svd_banks)
elif len(options.zerolag_rankingstat_pdf) != len(svd_banks):
raise ValueError("must supply either none or exactly as many --zerolag-rankingstat-pdf options as --svd-bank")
if options.min_instruments < 1:
raise ValueError("--min-instruments must be >= 1")
if options.min_instruments > len(detectors.channel_dict):
raise ValueError("--min-instruments (%d) is greater than the number of --channel-name's (%d)" % (options.min_instruments, len(detectors.channel_dict)))
#
# Option checks complete
#
options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)
if options.blind_injections is not None:
detectors.injection_filename = options.blind_injections
# Setup local caching
if options.local_frame_caching:
f, fname = tempfile.mkstemp(".cache")
if options.verbose:
print >> sys.stderr, "caching frame data locally to ", fname
f = open(fname, "w")
# FIXME: should try to down-sample if possible. there are
# MDCs data sets floating around whose streams do not start
# on integer second boundaries, however, and it's possible
# to trigger a failure in the frame muxer if those get
# down-sampled so for now we're not doing any resampling.
# later, when we don't care about those MDCs, we can go
# back to down-sampling. if not being able to down-sample
# is a problem in the meantime, I think the best way
# forward is to clip the start of said streams using a drop
# element and a (to be written) buffer probe that figures
# out how many samples to clip off the start of the stream.
# FIXME shouldn't use tempfile.gettempdir() directly, use
# _CONDOR_SCRATCH_DIR like glue??
# FIXME, note that at least for now condor sets TMPDIR to the
# run scratch space so this *will* work properly
detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), verbose = options.verbose)
for cacheentry in detectors.local_cache_list:
# Guarantee a lal cache complient file with only integer starts and durations.
cacheentry.segment = segments.segment( int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])) )
print >>f, str(cacheentry)
detectors.frame_cache = fname
# the injections are now present in the data so we don't want to do them twice
detectors.injection_filename = None
# we're done
return options, filenames, process_params, svd_banks, detectors
#
# =============================================================================
#
# Signal Handler
#
# =============================================================================
#
class OneTimeSignalHandler(object):
def __init__(self, pipeline):
self.pipeline = pipeline
self.count = 0
def __call__(self, signum, frame):
self.count += 1
if self.count == 1:
print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
try:
#FIXME how do I choose a timestamp?
self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp = inspiral.now().ns()))
if not self.pipeline.send_event(Gst.Event.new_eos()):
raise Exception("pipeline.send_event(EOS) returned failure")
except Exception as e:
print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
os._exit(1)
else:
print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)
#
# =============================================================================
#
# Main
#
# =============================================================================
#
#
# parse command line
#
options, filenames, process_params, svd_banks, detectors = parse_command_line()
if not options.check_time_stamps:
pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src
#
# Parse the vetos segments file(s) if provided
#
if options.veto_segments_file is not None:
veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
else:
veto_segments = None
#
# Load the time slide vectors, and use them to define the complete
# instrument list for the network
#
offsetvectors = lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(options.time_slide_file, contenthandler = LIGOLWContentHandler, verbose = options.verbose)).as_dict().values()
all_instruments = reduce(lambda a, b: a | set(b), offsetvectors, set())
if len(all_instruments) < options.min_instruments:
raise ValueError("--time-slide-file \"%s\" names %s but we need at least %d instruments" % (options.time_slide_file, ", ".join(sorted(all_instruments)), options.min_instruments))
if not (all_instruments >= set(detectors.channel_dict)):
raise ValueError("--time-slide-file names %s but have channel names for %s" % (", ".join(sorted(all_instruments)), ", ".join(sorted(detectors.channel_dict))))
#
# set up the PSDs
#
# There are three modes for psds in this program
# 1) --reference-psd without --track-psd - a fixed psd (provided by the user) will be used to whiten the data
# 2) --track-psd without --reference-psd - a psd will me measured and used on the fly
# 3) --track-psd with --reference-psd - a psd will be measured on the fly, but the first guess will come from the users provided psd
#
if options.reference_psd is not None:
psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
if set(psd) != set(detectors.channel_dict):
raise ValueError("missing PSD(s) for %s" % ", ".join(sorted(set(detectors.channel_dict) - set(psd))))
else:
psd = dict.fromkeys(all_instruments, None)
#
# Process svd banks in serial
#
for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, zerolag_rankingstat_pdf, ht_gate_threshold) in enumerate(zip(svd_banks, options.output, options.ranking_stat_output, options.zerolag_rankingstat_pdf, options.ht_gate_threshold)):
#
# Checkpointing only supported for gzip files in offline analysis
# FIXME Implement a means by which to check for sqlite file
# validity
#
if options.data_source not in ("lvshm", "framexmit") and output_url.endswith('.gz'):
try:
# a single .read() would be easier but looping over
# lines uses less memory
for line in gzip.open(ligolw_utils.local_path_from_url(output_url)):
pass
if ranking_stat_output_url is not None:
for line in gzip.open(ligolw_utils.local_path_from_url(ranking_stat_output_url)):
pass
# File is OK and there is no need to process it,
# skip ahead in the loop
continue
except IOError:
# File does not exist or is corrupted, need to
# reprocess
print >>sys.stderr, "Checkpoint: {0} of {1} files completed and continuing with {2}".format(output_file_number, len(options.output), os.path.basename(output_url))
pass
#
# create a new, empty, Bottle application and make it the current
# default, then start http server(s) to serve it up
#
bottle.default_app.push()
# uncomment the next line to show tracebacks when something fails
# in the web server
#bottle.app().catchall = False
httpservers = httpinterface.HTTPServers(
service_name = "%s.gstlal_inspiral" % (options.job_tag if options.job_tag is not None else base64.urlsafe_b64encode(uuid.uuid4().bytes)),
service_properties = {
"cwd": os.getcwd(),
"pid": str(os.getpid()),
},
verbose = options.verbose
)
#
# Set up a registry of the resources that this job provides
#
@bottle.route("/")
@bottle.route("/index.html")
def index(job_tag = options.job_tag, instruments = all_instruments):
# get the host and port to report in the links from the
# request we've received so that the URLs contain the IP
# address by which client has contacted us
netloc = bottle.request.urlparts[1]
server_address = "http://%s" % netloc
yield "<html><body>\n<h3>%s %s %s %s</h3>\n<p>\n" % (job_tag, os.environ.get("GSTLAL_LL_JOB"), netloc, " ".join(sorted(instruments)))
for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
# don't create links back to this page
if route.rule in ("/", "/index.html"):
continue
# only create links for GET methods
if route.method != "GET":
continue
yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
yield "</p>\n</body></html>"
# FIXME: get service-discovery working, then don't do this
if "GSTLAL_LL_JOB" in os.environ:
open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
#
# parse SVD template bank and expose bank metadata
#
banks = inspiral.parse_bank_files(svd_bank_url_dict, verbose = options.verbose)
# assume all instruments have the same templates, just extract them
# from one of the instruments at random
sngl_inspiral_table = banks.values()[0][0].sngl_inspiral_table.copy()
for bank in banks.values()[0]:
sngl_inspiral_table.extend(bank.sngl_inspiral_table)
@bottle.route("/template_bank.xml.gz")
def get_template_bank_xml(sngl_inspiral_table = sngl_inspiral_table):
xmldoc = ligolw.Document()
xmldoc.appendChild(ligolw.LIGO_LW())
map(xmldoc.childNodes[-1].appendChild(sngl_inspiral_table.copy()).append, sngl_inspiral_table)
output = StringIO.StringIO()
ligolw_utils.write_fileobj(xmldoc, output, gz = True)
outstr = output.getvalue()
output.close()
return outstr
# FIXME: don't use Gamma0, switch to a proper column
template_ids = frozenset(map(int, sngl_inspiral_table.getColumnByName("Gamma0")))
@bottle.route("/template_ids.txt")
def get_template_ids(template_ids = sorted(template_ids)):
return "\n".join("%d" % template_id for template_id in template_ids)
# Choose to optionally reconstruct segments around injections (not
# blind injections!)
if options.injections:
offset_padding = max([int(abs(float(offset)))+2 for bank in banks.items()[0][-1] for offset in bank.sngl_inspiral_table.get_end()]) # Add 2 to achieve rounding up and adding 1
reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections, pad = offset_padding)
else:
reconstruction_segment_list = None
@bottle.route("/bank.txt")
def get_filter_length_and_chirpmass(banks = banks):
bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones
yield '%.14g %.4g %.4g' % (float(inspiral.now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
#
# Build pipeline
#
if options.verbose:
print >>sys.stderr, "assembling pipeline ...",
pipeline = Gst.Pipeline(name="gstlal_inspiral")
mainloop = GObject.MainLoop()
triggersrc = lloidparts.mkLLOIDmulti(
pipeline,
detectors = detectors,
banks = banks,
psd = psd,
psd_fft_length = options.psd_fft_length,
ht_gate_threshold = ht_gate_threshold,
veto_segments = veto_segments,
verbose = options.verbose,
nxydump_segment = options.nxydump_segment,
chisq_type = options.chisq_type,
track_psd = options.track_psd,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
reconstruction_segment_list = reconstruction_segment_list
)
if options.verbose:
print >>sys.stderr, "done"
#
# Load/Initialize ranking statistic data.
#
rankingstat = None
if options.ranking_stat_input:
try:
xmldoc = ligolw_utils.load_url(options.ranking_stat_input, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)
except IOError:
print >>sys.stderr, "warning: '%s' not found or cannot be loaded; will fall back to newly-initialized ranking statistic" % options.ranking_stat_input
else:
rankingstat, _ = far.parse_likelihood_control_doc(xmldoc)
if rankingstat is None:
raise ValueError("\"%s\" does not contain parameter distribution data" % options.ranking_stat_input)
if rankingstat.delta_t != options.coincidence_threshold:
raise ValueError("\"%s\" is for delta_t=%g, we need %g" % (options.ranking_stat_input, rankingstat.denominator.delta_t, options.coincidence_threshold))
if rankingstat.min_instruments != options.min_instruments:
raise ValueError("\"%s\" is for min instruments = %d but we need %d" % (options.ranking_stat_input, rankingstat.denominator.min_instruments, options.min_instruments))
if rankingstat.instruments != all_instruments:
raise ValueError("\"%s\" is for %s but we need %s" % (options.ranking_stat_input, ", ".join(sorted(rankingstat.instruments)), ", ".join(sorted(all_instruments))))
if rankingstat.template_ids is None:
rankingstat.template_ids = template_ids
elif rankingstat.template_ids != template_ids:
raise ValueError("\"%s\" is for the wrong templates" % options.ranking_stat_input)
if rankingstat is None:
rankingstat = far.RankingStat(template_ids = template_ids, instruments = all_instruments, delta_t = options.coincidence_threshold, min_instruments = options.min_instruments)
rankingstat.numerator.add_signal_model(df = 40)
#
# build output document
#
if options.verbose:
print >>sys.stderr, "initializing output document ..."
output = inspiral.Data(
inspiral.CoincsDocument(
url = output_url or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.instrumentsproperty.set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
process_params = process_params,
comment = options.comment,
instruments = rankingstat.instruments,
seg = detectors.seg,
offsetvectors = offsetvectors,
injection_filename = options.injections,
tmp_path = options.tmp_space,
replace_file = True,
verbose = options.verbose
),
pipeline = pipeline,
rankingstat = rankingstat,
ranking_stat_input_url = options.ranking_stat_input,
ranking_stat_output_url = ranking_stat_output_url,
rankingstatpdf_url = options.ranking_stat_pdf,
zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf,
likelihood_snapshot_interval = options.likelihood_snapshot_interval,
thinca_interval = options.thinca_interval,
min_log_L = options.min_log_L,
sngls_snr_threshold = options.singles_threshold,
gracedb_far_threshold = options.gracedb_far_threshold,
gracedb_min_instruments = options.min_instruments,
gracedb_group = options.gracedb_group,
gracedb_search = options.gracedb_search,
gracedb_pipeline = options.gracedb_pipeline,
gracedb_service_url = options.gracedb_service_url,
upload_auxiliary_data_to_gracedb = (options.gracedb_service_url == gracedb_default_service_url),
verbose = options.verbose
)
if options.verbose:
print >>sys.stderr, "... output document initialized"
handler = lloidparts.Handler(mainloop, pipeline,
output,
instruments = rankingstat.instruments,
tag = options.job_tag,
verbose = options.verbose
)
if options.verbose:
print >>sys.stderr, "attaching appsinks to pipeline ...",
appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, src, caps = Gst.Caps.from_string("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
if options.verbose:
print >>sys.stderr, "attached %d, done" % len(appsinks)
#
# if we request a dot graph of the pipeline, set it up
#
if options.write_pipeline is not None:
pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
#
# Run pipeline
#
if options.data_source in ("lvshm", "framexmit"):
#
# setup sigint handler to shutdown pipeline. this is how
# the program stops gracefully, it is the only way to stop
# it. Otherwise it runs forever man.
#
# this is only done in the online case because we want
# offline jobs to just die and not write partial databases
#
signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
if options.verbose:
print >>sys.stderr, "setting pipeline state to ready ..."
if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter ready state")
datasource.pipeline_seek_for_gps(pipeline, *detectors.seg)
if options.verbose:
print >>sys.stderr, "setting pipeline state to playing ..."
if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter playing state")
if options.write_pipeline is not None:
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)
if options.verbose:
print >>sys.stderr, "running pipeline ..."
mainloop.run()
#
# write output file
#
output.write_output_url(url = output_url or output.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)
#
# Cleanup template bank temp files
#
for ifo in banks:
for bank in banks[ifo]:
if options.verbose:
print >> sys.stderr, "removing file: ", bank.template_bank_filename
os.remove(bank.template_bank_filename)
#
# Shutdown the web interface servers and garbage collect the Bottle
# app. This should release the references the Bottle app's routes
# hold to the pipeline's data (like template banks and so on).
#
del httpservers
bottle.default_app.pop()
#
# Set pipeline state to NULL and garbage collect the handler
#
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
del handler.pipeline
del output.pipeline
del handler
del bank
del banks
#
# Cleanup local frame file cache
#
if options.local_frame_caching:
if options.verbose:
print >>sys.stderr, "deleting temporary cache file ", detectors.frame_cache
os.remove(detectors.frame_cache)
del detectors.local_cache_list
#
# done. online pipeline always ends with an error code so that dagman does
# not mark the job "done" and the job will be restarted when the dag is
# restarted.
#
if options.data_source in ("lvshm", "framexmit"):
sys.exit(1)