Skip to content
Snippets Groups Projects
gstlal_inspiral 44.2 KiB
Newer Older
Adam Mercer's avatar
Adam Mercer committed
#!/usr/bin/env python
# Copyright (C) 2009-2014  Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
Kipp Cannon's avatar
Kipp Cannon committed
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""Stream-based inspiral analysis tool"""


#
# =============================================================================
#
kipp's avatar
kipp committed
#                                   Preamble
#
# =============================================================================
#

Chad Hanna's avatar
Chad Hanna committed

### A program to analyze gravitational wave data for compact binary coalescence in real time or in an offline mode
###
### .. graphviz::
###
###    digraph llpipe {
###    	labeljust = "r";
###    	label="gstlal_inspiral"
###    	rankdir=LR;
###    	graph [fontname="Roman", fontsize=24];
###    	edge [ fontname="Roman", fontsize=10 ];
###    	node [fontname="Roman", shape=box, fontsize=11];
###
###    	gracedb [label="GW\nCandidate\nDatabase", shape=oval, color=tomato3, style=filled];
###
###
###    	subgraph clusterNodeN {
###
###    		style=rounded;
###    		label="gstreamer pipeline";
###    		labeljust = "r";
###    		fontsize = 14;
###
###    		H1src [label="H1 data source:\n mkbasicsrc()", color=red4];
###    		L1src [label="L1 data source:\n mkbasicsrc()", color=green4];
###    		V1src [label="V1 data source:\n mkbasicsrc()", color=magenta4];
###
###    		H1multirate [label="H1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4];
###    		L1multirate [label="L1 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4];
###    		V1multirate [label="V1 whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4];
###
###    		H1LLOID [label="H1 LLOID filtering engine:\nmkLLOIDmulti()", color=red4];
###    		L1LLOID [label="L1 LLOID filtering engine:\nmkLLOIDmulti()", color=green4];
###    		V1LLOID [label="V1 LLOID filtering engine:\nmkLLOIDmulti()", color=magenta4];
###
###    		H1Trig1 [label="H1 Triggering:\nsub bank 1", color=red4];
###    		L1Trig1 [label="L1 Triggering:\nsub bank 1", color=green4];
###    		V1Trig1 [label="V1 Triggering:\nsub bank 1", color=magenta4];
###    		H1Trig2 [label="H1 Triggering:\nsub bank 2", color=red4];
###    		L1Trig2 [label="L1 Triggering:\nsub bank 2", color=green4];
###    		V1Trig2 [label="V1 Triggering:\nsub bank 2", color=magenta4];
###    		H1TrigN [label="H1 Triggering:\nsub bank N", color=red4];
###    		L1TrigN [label="L1 Triggering:\nsub bank N", color=green4];
###    		V1TrigN [label="V1 Triggering:\nsub bank N", color=magenta4];
###
###    		H1src -> H1multirate;
###    		L1src -> L1multirate;
###    		V1src -> V1multirate;
###
###    		H1multirate -> H1LLOID [label="h(t) 4096Hz"];
###    		L1multirate -> L1LLOID [label="h(t) 4096Hz"];
###    		V1multirate -> V1LLOID [label="h(t) 4096Hz"];
###    		H1multirate -> H1LLOID [label="h(t) 2048Hz"];
###    		L1multirate -> L1LLOID [label="h(t) 2048Hz"];
###    		V1multirate -> V1LLOID [label="h(t) 2048Hz"];
###    		H1multirate -> H1LLOID [label="h(t) Nth-pow-of-2 Hz"];
###    		L1multirate -> L1LLOID [label="h(t) Nth-pow-of-2 Hz"];
###    		V1multirate -> V1LLOID [label="h(t) Nth-pow-of-2 Hz"];
###
###    		H1LLOID -> H1Trig1 [label="SNRs sub bank 1"];
###    		L1LLOID -> L1Trig1 [label="SNRs sub bank 1"];
###    		V1LLOID -> V1Trig1 [label="SNRs sub bank 1"];
###    		H1LLOID -> H1Trig2 [label="SNRs sub bank 2"];
###    		L1LLOID -> L1Trig2 [label="SNRs sub bank 2"];
###    		V1LLOID -> V1Trig2 [label="SNRs sub bank 2"];
###    		H1LLOID -> H1TrigN [label="SNRs sub bank N"];
###    		L1LLOID -> L1TrigN [label="SNRs sub bank N"];
###    		V1LLOID -> V1TrigN [label="SNRs sub bank N"];
###    	}
###
###
###    	Coincidence [label="Coincidence\nO(1)s latency"];
###    	SigEst [label="Significance\nEstimation\nO(1)s latency"];
###    	Thresh [label="Thresholding\nO(1)s latency"];
###    	EventGen [label="Event\nGeneration\nO(1)s latency"];
###
###    	H1Trig1 -> Coincidence [label="Trigs sub bank 1"];
###    	L1Trig1 -> Coincidence [label="Trigs sub bank 1"];
###    	V1Trig1 -> Coincidence [label="Trigs sub bank 1"];
###    	H1Trig2 -> Coincidence [label="Trigs sub bank 2"];
###    	L1Trig2 -> Coincidence [label="Trigs sub bank 2"];
###    	V1Trig2 -> Coincidence [label="Trigs sub bank 2"];
###    	H1TrigN -> Coincidence [label="Trigs sub bank N"];
###    	L1TrigN -> Coincidence [label="Trigs sub bank N"];
###    	V1TrigN -> Coincidence [label="Trigs sub bank N"];
###
###    	Coincidence -> SigEst -> Thresh -> EventGen;
###
###    	EventGen -> gracedb;
###
###    }
###
### Review Status
### -------------
###
### +------------------------------------------------+---------------------------------------------+------------+
### | Names                                          | Hash                                        | Date       |
### +================================================+=============================================+============+
### | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 9074294d6b57f43651143b5f93210751de1fe55a    | 2014-05-02 |
### +------------------------------------------------+---------------------------------------------+------------+
###
try:
	from fpconst import PosInf
except ImportError:
	# fpconst is not part of the standard library and might not be
	# available
	PosInf = float("+inf")
import math
from optparse import OptionGroup, OptionParser
Kipp Cannon's avatar
Kipp Cannon committed
import os
import resource
import sys
import tempfile
Kipp Cannon's avatar
Kipp Cannon committed

Chad Hanna's avatar
Chad Hanna committed
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

import lal
from lal import LIGOTimeGPS
from lal.utils import CacheEntry
from ligo.gracedb.rest import DEFAULT_SERVICE_URL as gracedb_default_service_url
from ligo import segments
from ligo.segments import utils as segmentsUtils
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments
from gstlal import bottle
from gstlal import datasource
from gstlal import httpinterface
from gstlal import inspiral
from gstlal import lloidhandler
from gstlal import lloidparts
from gstlal import pipeparts
from gstlal import reference_psd
from gstlal import servicediscovery
from gstlal import simulation
from gstlal import svd_bank
GSTLAL_PROCESS_START_TIME = UTCToGPS(time.gmtime())

@lsctables.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass

#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
def setrlimit(res, lim):
	hard_lim = resource.getrlimit(res)[1]
	resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))

# set the number of processes and total set size up to hard limit and
# shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
# FIXME:  tests at CIT show that this next tweak has no effect.  it's
# possible that SL7 has lowered the default stack size from SL6 and we
# don't need to do this anymore.  remove?
setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread
Kipp Cannon's avatar
Kipp Cannon committed

def service_domain(gracedb_search, gracedb_pipeline):
	return "%s_%s.%s" % (gracedb_pipeline.lower(), gracedb_search.lower(), servicediscovery.DEFAULT_SERVICE_DOMAIN)


#
# =============================================================================
#
kipp's avatar
kipp committed
#                                 Command Line
#
# =============================================================================
#


def parse_command_line():
	parser = OptionParser(

	# append all the datasource specific options
	datasource.append_options(parser)

	group = OptionGroup(parser, "PSD Options", "Adjust noise spectrum estimation parameters")
	group.add_option("--psd-fft-length", metavar = "seconds", default = 32, type = "int", help = "The length of the FFT used to used to whiten the data (default is 32 s).")
	group.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
	group.add_option("--track-psd", action = "store_true", help = "Enable dynamic PSD tracking.  Always enabled if --reference-psd is not given.")
	parser.add_option_group(group)

	group = OptionGroup(parser, "Data Qualtiy", "Adjust data quality handling")
	group.add_option("--ht-gate-threshold", metavar = "sigma", type = "float", action = "append", default = [], help = "Set the threshold on whitened h(t) to excise a glitches in units of standard deviations (optional).  If given, exactly as many h(t) thresholds must be set as svd-bank files will be processed (see --svd-bank).")
	group.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
	group.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
	parser.add_option_group(group)

	group = OptionGroup(parser, "Trigger Generator", "Adjust trigger generator behaviour")
	group.add_option("--control-peak-time", metavar = "seconds", type = "int", help = "Set a time window in seconds to find peaks in the control signal (optional, default is to disable composite detection statistic).")
	group.add_option("--output", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).  Can be given multiple times.  Exactly as many output files must be specified as svd-bank files will be processed (see --svd-bank).")
	group.add_option("--output-cache", metavar = "filename", help = "Provide a cache of output files.  This can be used instead of giving multiple --output options.  Cannot be combined with --output.")
	group.add_option("--svd-bank", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file.  These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.  This option can be given multiple times, unless --data-source is lvshm or framexmit in which case it must be given exactly once.  If given multiple times, the banks will be processed one-by-one, in order.  At least one svd bank for at least 2 detectors is required, but see also --svd-bank-cache.")
	group.add_option("--svd-bank-cache", metavar = "filename", help = "Provide a cache file of svd-bank files.  This can be used instead of giving multiple --svd-bank options.  Cannot be combined with --svd-bank options.")
	# NOTE:  the clustering SQL scripts search for this option in the
	# process_params table to determine the threshold below which it
	# can delete uninteresting singles after the coincs are ranked.  if
	# the name of this option is changed, be sure to update
	# simplify_and_cluster.sql and derivatives
	group.add_option("--singles-threshold", metavar = "SNR", type = "float", default = PosInf, help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
	parser.add_option_group(group)

	group = OptionGroup(parser, "Ranking Statistic Options", "Adjust ranking statistic behaviour")
	group.add_option("--cap-singles", action = "store_true", help = "Cap singles to 1 / livetime if computing FAR. No effect otherwise")
	group.add_option("--FAR-trialsfactor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
	group.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
	group.add_option("--coincidence-threshold", metavar = "seconds", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005 s).  The light-travel time between instruments will be added automatically in the coincidence test.")
	group.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
	group.add_option("--ranking-stat-input", metavar = "url", help = "Set the URL from which to load a ranking statistic definition.  When this is enabled, signal candidates will have ranking statistic values assigned on-the-fly.  Required when --data-source is lvshm or framexmit;  must also set --likelihood-snapshot-interval.")
	group.add_option("--ranking-stat-output", metavar = "filename", action = "append", default = [], help = "Set the name of the file to which to write ranking statistic data collected from triggers (optional).  Can be given more than once.  If given, exactly as many must be provided as there are --svd-bank options and they will be writen to in order.")
	group.add_option("--ranking-stat-output-cache", metavar = "filename", help = "Provide a cache of ranking statistic output files.  This can be used instead of giving multiple --ranking-stat-output options.  Cannot be combined with --ranking-stat-output.")
	group.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to snapshot candidate and ranking statistic data to disk when running online.")
	group.add_option("--ranking-stat-pdf", metavar = "url", help = "Set the URL from which to load the ranking statistic PDF.  This is used to compute false-alarm probabilities and false-alarm rates and is required for online operation (when --data-source is framexmit or lvshm).  It is forbidden for offline operation (all other data sources)")
	group.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets (required).")
	group.add_option("--zerolag-rankingstat-pdf", metavar = "url", action = "append", help = "Record a histogram of the likelihood ratio ranking statistic values assigned to zero-lag candidates in this XML file.  This is used to construct the extinction model and set the overall false-alarm rate normalization during online running.  If it does not exist at start-up, a new file will be initialized, otherwise the counts will be added to the file's contents.  Required when --data-source is lvshm or framexmit;  optional otherwise.  If given, exactly as many must be provided as there are --svd-bank options and they will be used in order.")
	parser.add_option_group(group)

	group = OptionGroup(parser, "GracedB Options", "Adjust GracedB interaction")
	group.add_option("--gracedb-far-threshold", metavar = "Hertz", type = "float", help = "False-alarm rate threshold for gracedb uploads in Hertz (default = do not upload to gracedb).")
	group.add_option("--gracedb-group", metavar = "name", default = "Test", help = "Gracedb group to which to upload events (default is Test).")
	group.add_option("--gracedb-pipeline", metavar = "name", default = "gstlal", help = "Name of pipeline to provide in GracedB uploads (default is gstlal).")
	group.add_option("--gracedb-search", metavar = "name", default = "LowMass", help = "Name of search to provide in GracedB uploads (default is LowMass).")
	group.add_option("--gracedb-service-url", metavar = "url", default = gracedb_default_service_url, help = "Override default GracedB service url (optional, default is %s)." % gracedb_default_service_url)
	parser.add_option_group(group)

	group = OptionGroup(parser, "Program Behaviour")
	group.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently.  Has the effect of having hidden signals in the input data.  --injections must not be specified in this case")
	group.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
	group.add_option("--comment", metavar = "message", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
	group.add_option("--fir-stride", metavar = "seconds", type = "float", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
	group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node.  Should be 4 digits of the form 0001, 0002, etc.;  may not contain \".\" nor \"-\".")
	group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ")
	group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional).  The default is \":\", i.e. dump all time.")
	group.add_option("--reconstruction-segment", metavar = "start:stop", action = "append", help = "Only reconstruct the SNRs for this time interval (optional). Can be provided multiple times.")
	group.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file.  The database file will be worked on in this directory, and then moved to the final location when complete.  This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
	group.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
	group.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional).  The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
	group.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
	parser.add_option_group(group)
	missing_options = []
	#
	# do this before messing with options object
	#

	process_params = options.__dict__.copy()

	# extract data source configuration
	#

	detectors = datasource.GWDataSourceInfo(options)

	#
	# extract information about principle I/O files

	# FIXME Put all svd banks for different detectors in one file.
	if options.svd_bank:
		if options.svd_bank_cache:
			raise ValueError("cannot supply both --svd-bank and --svd-bank-cache")
			svd_banks = map(inspiral.parse_svdbank_string, options.svd_bank)
		except ValueError as e:
			print "Unable to parse --svd-bank"
			raise
	elif options.svd_bank_cache:
		svd_banks = []
		svd_bank_cache = map(CacheEntry, open(options.svd_bank_cache))
		if not svd_bank_cache:
			raise ValueError("--svd-bank-cache is empty")
		svd_bank_cache.sort(key = lambda cache_entry: cache_entry.description)
		for key, seq in itertools.groupby(svd_bank_cache, key = lambda cache_entry: cache_entry.description):
			svd_banks.append(dict((cache_entry.observatory, cache_entry.url) for cache_entry in seq))
	else:
		missing_options.append("either --svd-bank-cache or at least one --svd-bank")
	if options.output:
		if options.output_cache:
			raise ValueError("cannot supply both --output and --output-cache")
	elif options.output_cache:
		# do this out-of-place to preserve process_params' contents
		options.output = [CacheEntry(line).url for line in open(options.output_cache)]
		if not options.output:
			raise ValueError("--output-cache is empty")
	else:
		missing_options.append("either --output-cache or at least one --output")
	if options.ranking_stat_output:
		if options.ranking_stat_output_cache:
			raise ValueError("cannot supply both --ranking-stat-output and --ranking-stat-output-cache")
	elif options.ranking_stat_output_cache:
		# do this out-of-place to preserve process_params' contents
		options.ranking_stat_output = [CacheEntry(line).url for line in open(options.ranking_stat_output_cache)]
		if not options.ranking_stat_output:
			raise ValueError("--ranking-stat-output-cache is empty")
kipp's avatar
kipp committed

	if not options.time_slide_file:
		missing_options.append("--time-slide-file")

Kipp Cannon's avatar
Kipp Cannon committed
	if missing_options:
		raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))
	#
	# check sanity of the input and output file collections
	#

	if len(svd_banks) != len(options.output):
		raise ValueError("must supply exactly as many --svd-bank options as --output")
	if options.ranking_stat_output and len(options.ranking_stat_output) != len(options.output):
		raise ValueError("must supply either none or exactly as many --ranking-stat-output options as --output")
	if options.likelihood_snapshot_interval and not options.ranking_stat_output:
		raise ValueError("must set --ranking-stat-output when --likelihood-snapshot-interval is set")
	if options.ranking_stat_output is None or len(options.ranking_stat_output) == 0:
		options.ranking_stat_output = [None] * len(options.output)

	required_urls = [options.time_slide_file]
	for svd_bank_set in svd_banks:
		required_urls += svd_bank_set.values()
	for filename in (options.veto_segments_file, options.injections, options.blind_injections, options.reference_psd):
		if filename:
			required_urls.append(filename)
			required_urls[i] = ligolw_utils.local_path_from_url(required_urls[i])
	missing_files = [filename for filename in required_urls if filename is not None and not os.access(filename, os.R_OK)]
Kipp Cannon's avatar
Kipp Cannon committed
	if missing_files:
		raise ValueError("one or more required files cannot be found or are not readable:  %s" % ", ".join("'%s'" % filename for filename in sorted(missing_files)))
	# check sanity of overall configuration
	if options.blind_injections and options.injections:
		raise ValueError("cannot set both --blind-injections and --injections")
	if options.data_source in ("lvshm", "framexmit"):
		missing_options = []
		for option in ["job_tag", "ranking_stat_input", "ranking_stat_pdf", "zerolag_rankingstat_pdf"]:
			if getattr(options, option) is None:
				missing_options.append("--%s" %option.replace("_","-"))
		if missing_options:
			raise ValueError("missing required option(s) %s when --data-source is lvshm or framexmit" % ", ".join(missing_options))
		if "-" in options.job_tag or "." in options.job_tag:
			raise ValueError("invalid characters in --job-tag \"%s\"" % options.job_tag)

		if len(svd_banks) > 1:
			raise ValueError("more than one --svd-bank not allowed when --datasource is lvshm or framexmit, have %d" % len(svd_banks))
		# make an "infinite" extent segment
		detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))
		# this gets set so that if you log into a node you can find out what the job id is easily
		os.environ['GSTLAL_LL_JOB'] = options.job_tag

		# FIXME: this is ugly, but we have to protect against busted shared memory partitions
		if options.data_source == "lvshm":
			import subprocess
			for partition in detectors.shm_part_dict.values():
				subprocess.call(["smrepair", partition])
			if getattr(options, option) is not None:
				bad_options.append("--%s" % option.replace("_","-"))
			raise ValueError("cannot set %s when --data-source is not lvshm or framexmit " % ", ".join(bad_options))
	if options.reference_psd is None:
		options.track_psd = True
	if options.psd_fft_length < 4:
		raise ValueError("--psd-fft-length cannot be less than 4")
	if options.psd_fft_length < 32:
		warnings.warn("It is not advised to choose PSD lengths below 32s, since a loss in sensitivity may occur")
	if options.local_frame_caching and not options.data_source == "frames":
		raise ValueError("--local-frame-caching can only be used if --data-source is frames")
	if options.chisq_type not in ["autochisq", "timeslicechisq"]:
		raise ValueError("--chisq-type must be one of (autochisq|timeslicechisq), got %s" % (options.chisq_type))

	if options.likelihood_snapshot_interval is not None and options.likelihood_snapshot_interval <= 0.:
		raise ValueError("--likelihood-snapshot-interval cannot be <= 0")
	if options.ranking_stat_input:
		if not options.likelihood_snapshot_interval:
			raise ValueError("must set --likelihood-snapshot-interval when --ranking-stat-input is set")
	if not options.ht_gate_threshold:
		# default threshold is +inf = disable feature.
		options.ht_gate_threshold = [float("inf")] * len(svd_banks)
	elif len(options.ht_gate_threshold) != len(svd_banks):
		raise ValueError("must supply either none or exactly as many --ht-gate-threshold values options as --svd-bank")
	if not options.zerolag_rankingstat_pdf:
		options.zerolag_rankingstat_pdf = [None] * len(svd_banks)
	elif len(options.zerolag_rankingstat_pdf) != len(svd_banks):
		raise ValueError("must supply either none or exactly as many --zerolag-rankingstat-pdf options as --svd-bank")
	if options.min_instruments < 1:
		raise ValueError("--min-instruments must be >= 1")
	if options.min_instruments > len(detectors.channel_dict):
		raise ValueError("--min-instruments (%d) is greater than the number of --channel-name's (%d)" % (options.min_instruments, len(detectors.channel_dict)))
	#
	# Option checks complete
	#

	options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)

	if options.blind_injections is not None:
		detectors.injection_filename = options.blind_injections
	# Setup local caching
	if options.local_frame_caching:
		f, fname = tempfile.mkstemp(".cache")
		if options.verbose:
			print >> sys.stderr, "caching frame data locally to ", fname
		f = open(fname, "w")
		# FIXME:  should try to down-sample if possible.  there are
		# MDCs data sets floating around whose streams do not start
		# on integer second boundaries, however, and it's possible
		# to trigger a failure in the frame muxer if those get
		# down-sampled so for now we're not doing any resampling.
		# later, when we don't care about those MDCs, we can go
		# back to down-sampling.  if not being able to down-sample
		# is a problem in the meantime, I think the best way
		# forward is to clip the start of said streams using a drop
		# element and a (to be written) buffer probe that figures
		# out how many samples to clip off the start of the stream.
		# FIXME shouldn't use tempfile.gettempdir() directly, use
		# _CONDOR_SCRATCH_DIR like glue??
		# FIXME, note that at least for now condor sets TMPDIR to the
		# run scratch space so this *will* work properly
		detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), verbose = options.verbose)
		for cacheentry in detectors.local_cache_list:
			# Guarantee a lal cache complient file with only integer starts and durations.
			cacheentry.segment = segments.segment( int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])) )
			print >>f, str(cacheentry)
		detectors.frame_cache = fname

		# the injections are now present in the data so we don't want to do them twice
		detectors.injection_filename = None
	return options, filenames, process_params, svd_banks, detectors
#
# =============================================================================
#
#                                Signal Handler
#
# =============================================================================
#


class OneTimeSignalHandler(object):
	def __init__(self, pipeline):
		self.pipeline = pipeline
		self.count = 0

	def __call__(self, signum, frame):
		self.count += 1
		if self.count == 1:
			print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
			try:
				#FIXME how do I choose a timestamp?
				self.pipeline.get_bus().post(lloidhandler.message_new_checkpoint(self.pipeline, timestamp = inspiral.now().ns()))
				if not self.pipeline.send_event(Gst.Event.new_eos()):
					raise Exception("pipeline.send_event(EOS) returned failure")
			except Exception as e:
				print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
				os._exit(1)
		else:
			print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)


#
# =============================================================================
#
#                              Horizon Distances
#
# =============================================================================
#


#
# =============================================================================
#
#                                     Main
#
# =============================================================================
#


#
# parse command line
#
kipp's avatar
kipp committed

options, filenames, process_params, svd_banks, detectors = parse_command_line()
if not options.check_time_stamps:
	pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src

if options.veto_segments_file is not None:
	veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
else:
	veto_segments = None


#
# Load the time slide vectors, and use them to define the complete
# instrument list for the network
#


offsetvectors = lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(options.time_slide_file, contenthandler = LIGOLWContentHandler, verbose = options.verbose)).as_dict().values()
all_instruments = reduce(lambda a, b: a | set(b), offsetvectors, set())
if len(all_instruments) < options.min_instruments:
	raise ValueError("--time-slide-file \"%s\" names %s but we need at least %d instruments" % (options.time_slide_file, ", ".join(sorted(all_instruments)), options.min_instruments))
if not (all_instruments >= set(detectors.channel_dict)):
	raise ValueError("--time-slide-file names %s but have channel names for %s" % (", ".join(sorted(all_instruments)), ", ".join(sorted(detectors.channel_dict))))
# set up the PSDs
#
# There are three modes for psds in this program
# 1) --reference-psd without --track-psd - a fixed psd (provided by the user) will be used to whiten the data
# 2) --track-psd without --reference-psd - a psd will me measured and used on the fly
Kipp Cannon's avatar
Kipp Cannon committed
# 3) --track-psd with --reference-psd - a psd will be measured on the fly, but the first guess will come from the users provided psd
if options.reference_psd is not None:
Kipp Cannon's avatar
Kipp Cannon committed
	psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
	if set(psd) != set(detectors.channel_dict):
		raise ValueError("missing PSD(s) for %s" % ", ".join(sorted(set(detectors.channel_dict) - set(psd))))
	psd = dict.fromkeys(all_instruments, None)
Melissa Frei's avatar
Melissa Frei committed
#
for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, zerolag_rankingstat_pdf, ht_gate_threshold) in enumerate(zip(svd_banks, options.output, options.ranking_stat_output, options.zerolag_rankingstat_pdf, options.ht_gate_threshold)):
	#
	# Checkpointing only supported for gzip files in offline analysis
Kipp Cannon's avatar
Kipp Cannon committed
	# FIXME Implement a means by which to check for sqlite file
	# validity
	if options.data_source not in ("lvshm", "framexmit") and output_url.endswith('.gz'):
			# a single .read() would be easier but looping over
			# lines uses less memory
			for line in gzip.open(ligolw_utils.local_path_from_url(output_url)):
				pass
			if ranking_stat_output_url is not None:
				for line in gzip.open(ligolw_utils.local_path_from_url(ranking_stat_output_url)):
			# File is OK and there is no need to process it,
			# skip ahead in the loop
		except IOError:
			# File does not exist or is corrupted, need to
			# reprocess
			print >>sys.stderr, "Checkpoint: {0} of {1} files completed and continuing with {2}".format(output_file_number, len(options.output), os.path.basename(output_url))
	#
	# create a new, empty, Bottle application and make it the current
	# default, then start http server(s) to serve it up
	# uncomment the next line to show tracebacks when something fails
	# in the web server
	#bottle.app().catchall = False
	httpservers = httpinterface.HTTPServers(
		service_name = "%s.gstlal_inspiral" % (options.job_tag if options.job_tag is not None else base64.urlsafe_b64encode(uuid.uuid4().bytes)),
		service_domain = service_domain(options.gracedb_search, options.gracedb_pipeline),
		service_properties = {
			"cwd": os.getcwd(),
			"pid": str(os.getpid()),
		},
		verbose = options.verbose
	)


	#
	# Set up a registry of the resources that this job provides
	#


	@bottle.route("/")
	@bottle.route("/index.html")
	def index(job_tag = options.job_tag, instruments = all_instruments):
		# get the host and port to report in the links from the
		# request we've received so that the URLs contain the IP
		# address by which client has contacted us
		netloc = bottle.request.urlparts[1]
		server_address = "http://%s" % netloc
		yield "<html><body>\n<h3>%s %s %s %s</h3>\n<p>\n" % (job_tag, os.environ.get("GSTLAL_LL_JOB"), netloc, " ".join(sorted(instruments)))
		for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
			# don't create links back to this page
			if route.rule in ("/", "/index.html"):
			# only create links for GET methods
			if route.method != "GET":
				continue
			yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
		yield "</p>\n</body></html>"
	# FIXME:  get service-discovery working, then don't do this
	if "GSTLAL_LL_JOB" in os.environ:
		open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))


	#
	# parse SVD template bank and expose bank metadata
	#

	banks = inspiral.parse_bank_files(svd_bank_url_dict, verbose = options.verbose)
	# assume all instruments have the same templates, just extract them
	# from one of the instruments at random
	sngl_inspiral_table = banks.values()[0][0].sngl_inspiral_table.copy()
	for bank in banks.values()[0]:
		sngl_inspiral_table.extend(bank.sngl_inspiral_table)
		horizon_factors.update(bank.horizon_factors)
	@bottle.route("/template_bank.xml.gz")
	def get_template_bank_xml(sngl_inspiral_table = sngl_inspiral_table):
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		map(xmldoc.childNodes[-1].appendChild(sngl_inspiral_table.copy()).append, sngl_inspiral_table)
		output = StringIO.StringIO()
		ligolw_utils.write_fileobj(xmldoc, output, gz = True)
		outstr = output.getvalue()
		output.close()
		return outstr
	template_ids = frozenset(row.template_id for row in sngl_inspiral_table)
	assert set(horizon_factors) == template_ids, "horizon factors are not assigned for each template id"
	assert len(template_ids) == len(sngl_inspiral_table), "template IDs are not unique within the template bank"
	@bottle.route("/template_ids.txt")
	def get_template_ids(template_ids = sorted(template_ids)):
		return "\n".join("%d" % template_id for template_id in template_ids)

Kipp Cannon's avatar
Kipp Cannon committed
	# Choose to optionally reconstruct segments around injections (not
	# blind injections!)
		offset_padding = max(math.ceil(abs(row.end))+1 for bank in banks.items()[0][-1] for row in bank.sngl_inspiral_table)
		reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections, pad = offset_padding)
	elif options.reconstruction_segment:
		reconstruction_segment_list = segments.segmentlist()
		for reconstruction_segment in options.reconstruction_segment:
			t0, t1 = reconstruction_segment.split(":")
			reconstruction_segment_list.append(segments.segment(LIGOTimeGPS(int(t0)), LIGOTimeGPS(int(t1))))
	@bottle.route("/bank.txt")
	def get_filter_length_and_chirpmass(banks = banks):
		bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones
		yield '%.14g %.4g %.4g' % (float(inspiral.now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
Kipp Cannon's avatar
Kipp Cannon committed

	if options.verbose:
		print >>sys.stderr, "assembling pipeline ...",

Chad Hanna's avatar
Chad Hanna committed
	pipeline = Gst.Pipeline(name="gstlal_inspiral")
	mainloop = GObject.MainLoop()
	triggersrc = lloidparts.mkLLOIDmulti(
		pipeline,
		detectors = detectors,
		banks = banks,
		psd = psd,
		psd_fft_length = options.psd_fft_length,
		ht_gate_threshold = ht_gate_threshold,
		veto_segments = veto_segments,
		verbose = options.verbose,
		nxydump_segment = options.nxydump_segment,
		chisq_type = options.chisq_type,
		track_psd = options.track_psd,
		control_peak_time = options.control_peak_time,
		fir_stride = options.fir_stride,
		reconstruction_segment_list = reconstruction_segment_list
	)
	if options.verbose:
		print >>sys.stderr, "done"
	# Load/Initialize ranking statistic data.
	rankingstat = None
	if options.ranking_stat_input:
		try:
			xmldoc = ligolw_utils.load_url(options.ranking_stat_input, verbose = options.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)
		except IOError:
			print >>sys.stderr, "warning:  '%s' not found or cannot be loaded;  will fall back to newly-initialized ranking statistic" % options.ranking_stat_input
		else:
			rankingstat, _ = far.parse_likelihood_control_doc(xmldoc)
			if rankingstat is None:
				raise ValueError("\"%s\" does not contain parameter distribution data" % options.ranking_stat_input)
			if rankingstat.delta_t != options.coincidence_threshold:
				raise ValueError("\"%s\" is for delta_t=%g, we need %g" % (options.ranking_stat_input, rankingstat.denominator.delta_t, options.coincidence_threshold))
			if rankingstat.min_instruments != options.min_instruments:
				raise ValueError("\"%s\" is for min instruments = %d but we need %d" % (options.ranking_stat_input, rankingstat.denominator.min_instruments, options.min_instruments))
			if rankingstat.instruments != all_instruments:
				raise ValueError("\"%s\" is for %s but we need %s" % (options.ranking_stat_input, ", ".join(sorted(rankingstat.instruments)), ", ".join(sorted(all_instruments))))
			if rankingstat.template_ids is None:
				rankingstat.template_ids = template_ids
			elif rankingstat.template_ids != template_ids:
				raise ValueError("\"%s\" is for the wrong templates" % options.ranking_stat_input)
			rankingstat.numerator.set_horizon_factors(horizon_factors)
	if rankingstat is None:
		rankingstat = far.RankingStat(template_ids = template_ids, instruments = all_instruments, delta_t = options.coincidence_threshold, min_instruments = options.min_instruments, horizon_factors = horizon_factors)
Kipp Cannon's avatar
Kipp Cannon committed

	# construct and configure pipeline handler
		print >>sys.stderr, "initializing pipeline handler ..."
	handler = lloidhandler.Handler(
		mainloop = mainloop,
		pipeline = pipeline,
		coincs_document = inspiral.CoincsDocument(
			url = output_url or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.instrumentsproperty.set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
			process_params = process_params,
			process_start_time = GSTLAL_PROCESS_START_TIME,
			comment = options.comment,
			instruments = rankingstat.instruments,
			seg = detectors.seg,
			offsetvectors = offsetvectors,
			injection_filename = options.injections,
			tmp_path = options.tmp_space,
			replace_file = True,
			verbose = options.verbose
		),
		rankingstat = rankingstat,
		horizon_distance_func = banks.values()[0][0].horizon_distance_func,# they should all be the same
		gracedbwrapper = inspiral.GracedBWrapper(
			instruments = rankingstat.instruments,
			far_threshold = options.gracedb_far_threshold,
			min_instruments = options.min_instruments,
			group = options.gracedb_group,
			search = options.gracedb_search,
			pipeline = options.gracedb_pipeline,
			service_url = options.gracedb_service_url,
			verbose = options.verbose
		),
		ranking_stat_input_url = options.ranking_stat_input,
		ranking_stat_output_url = ranking_stat_output_url,
		rankingstatpdf_url = options.ranking_stat_pdf,
		zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf,
		likelihood_snapshot_interval = options.likelihood_snapshot_interval,
		sngls_snr_threshold = options.singles_threshold,
		kafka_server = options.output_kafka_server,
		cluster = True,#options.data_source in ("lvshm", "framexmit"),# If uncommented, we only cluster when running online
		FAR_trialsfactor = options.FAR_trialsfactor,
	if options.verbose:
		print >>sys.stderr, "... pipeline handler initialized"
	if options.verbose:
		print >>sys.stderr, "attaching appsinks to pipeline ...",
	appsync = pipeparts.AppSync(appsink_new_buffer = handler.appsink_new_buffer)
	appsinks = set(appsync.add_sink(pipeline, src, caps = Gst.Caps.from_string("application/x-lal-snglinspiral"), name = "bank_%s_sink" % bank_id) for bank_id, src in triggersrc.items())

	if options.verbose:
		print >>sys.stderr, "attached %d, done" % len(appsinks)
	#
	# if we request a dot graph of the pipeline, set it up
	#
	if options.write_pipeline is not None:
		pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
	if options.data_source in ("lvshm", "framexmit"):
		# setup sigint handler to shutdown pipeline.  this is how
		# the program stops gracefully, it is the only way to stop
		# it.  Otherwise it runs forever man.
		# this is only done in the online case because we want
		# offline jobs to just die and not write partial databases
		#

		signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
		signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))

	if options.verbose:
		print >>sys.stderr, "setting pipeline state to ready ..."
	if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline did not enter ready state")
	datasource.pipeline_seek_for_gps(pipeline, *detectors.seg)
	if options.verbose:
		print >>sys.stderr, "setting pipeline state to playing ..."
Chad Hanna's avatar
Chad Hanna committed
	if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline did not enter playing state")

	if options.write_pipeline is not None:
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)

	if options.verbose:
		print >>sys.stderr, "running pipeline ..."
	mainloop.run()
	handler.write_output_url(url = output_url or handler.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)
	for ifo in banks:
		for bank in banks[ifo]:
			if options.verbose:
				print >> sys.stderr, "removing file: ", bank.template_bank_filename
			os.remove(bank.template_bank_filename)

	#
	# Shutdown the web interface servers and garbage collect the Bottle
	# app.  This should release the references the Bottle app's routes
	# hold to the pipeline's data (like template banks and so on).
	#


	bottle.default_app.pop()


	#
	# Set pipeline state to NULL and garbage collect the handler
	#


Chad Hanna's avatar
Chad Hanna committed
	if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline could not be set to NULL")
	del handler.pipeline
	del bank
	del banks
Kipp Cannon's avatar
Kipp Cannon committed
# Cleanup local frame file cache
if options.local_frame_caching:
	if options.verbose:
		print >>sys.stderr, "deleting temporary cache file ", detectors.frame_cache
	os.remove(detectors.frame_cache)
	del detectors.local_cache_list

# done.  online pipeline always ends with an error code so that dagman does
# not mark the job "done" and the job will be restarted when the dag is
# restarted.
if options.data_source in ("lvshm", "framexmit"):
	sys.exit(1)