Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_ll_inspiral_pipe 18.87 KiB
#!/usr/bin/env python
#
# Copyright (C) 2011  Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

### This program will create a HTCondor DAG to automate the running of
### low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe

"""
This program makes a dag for a gstlal inspiral low latency pipeline
"""

__author__ = 'Chad Hanna <channa@caltech.edu>'

#
# import standard modules and append the lalapps prefix to the python path
#

import itertools
import sys, os, stat
import shutil
import socket
from optparse import OptionParser

#
# import the modules we need to build the pipeline
#

from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
from lal.utils import CacheEntry

##
# ### Graph of the HTCondor DAG
#
# - gray boxes are optional and depend on the command line given
#
# @dot
# digraph G {
#       // graph properties
#
#       rankdir=LR;
#       compound=true;
#       node [shape=record fontsize=10 fontname="Verdana"];
#       edge [fontsize=8 fontname="Verdana"];
#	gstlal_inspiral [URL="\ref gstlal_inspiral"];
#	gstlal_llcbcsummary [URL="\ref gstlal_llcbcsummary"];
#	gstlal_llcbcnode [URL="\ref gstlal_llcbcnode"];
#	gstlal_inspiral_marginalize_likelihoods_online [URL="\ref gstlal_inspiral_marginalize_likelihoods_online"];
# }
# @enddot

#
# utility functions
#

def set_up_jobs(options):
	"""
	Create the HTCondor job objects needed by the DAG.

	The condor command dictionaries are built from
	options.inspiral_condor_command, options.non_inspiral_condor_command
	and options.local_condor_command.  options.inj_channel_dict and
	options.state_backup_destination switch on the optional injection and
	state backup jobs respectively.

	Returns a dictionary mapping a short job name to a dagparts.DAGJob.
	"""
	def graceful_defaults():
		# hand every caller its own dictionary.
		# NOTE(review): condor_command_dict_from_opts may modify the
		# dictionary it is given -- confirm before sharing a single one
		return {"want_graceful_removal": "True", "kill_sig": "15"}

	# condor commands for the three families of jobs
	inspiral_cmds = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, graceful_defaults())
	other_cmds = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command, graceful_defaults())
	local_cmds = dagparts.condor_command_dict_from_opts(options.local_condor_command)

	# the inspiral jobs; the injection flavour only exists when injection
	# channels were requested
	jobs = {}
	jobs['gstlalInspiral'] = dagparts.DAGJob('gstlal_inspiral', condor_commands = inspiral_cmds)
	if options.inj_channel_dict:
		jobs['gstlalInspiralInj'] = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_cmds)

	# the jobs that are always present, in the order they are created:
	# the local universe likelihood marginalization loop, the lvalert
	# listener, the aggregators and the DQ job
	always_present = (
		('marg', 'gstlal_inspiral_marginalize_likelihoods_online', {"universe": "local", "condor_commands": local_cmds}),
		('lvalertListen', 'gstlal_inspiral_lvalert_uberplotter', {"universe": "local", "condor_commands": local_cmds}),
		('agg', "gstlal_ll_inspiral_aggregator", {"condor_commands": other_cmds}),
		('aggLeader', "gstlal_ll_inspiral_aggregator", {"tag_base": "gstlal_ll_inspiral_aggregator_leader", "condor_commands": other_cmds}),
		('trigagg', "gstlal_ll_inspiral_trigger_aggregator", {"condor_commands": other_cmds}),
		('dq', "gstlal_ll_dq", {"condor_commands": other_cmds}),
	)
	for name, executable, keywords in always_present:
		jobs[name] = dagparts.DAGJob(executable, **keywords)

	# the state saving job is only wanted when there is somewhere to
	# back the state up to
	if options.state_backup_destination:
		jobs['state'] = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = local_cmds)

	return jobs


#
# Parse the command line
#


def _urls_from_cache(cache_filename):
	"""
	Return the list of urls of the entries in the cache file
	cache_filename, one CacheEntry being parsed from each line.
	"""
	# the with block closes the file as soon as the cache has been read
	with open(cache_filename) as cache_file:
		return [CacheEntry(line).url for line in cache_file]


def parse_command_line():
	"""
	Parse the command line and derive the dictionaries used to build the DAG.

	The data source options are appended by datasource.append_options().
	After parsing, a number of derived attributes are stored on the
	options object: the parsed bank cache, the channel / state / DQ /
	framexmit / shared memory dictionaries for the regular and injection
	streams, inj_name_dict, inj_range_dict, the state and DQ vector bit
	dictionaries, and the lists of likelihood and zerolag likelihood urls
	read from the two cache files.

	Returns the tuple (options, filenames) where filenames are the
	positional arguments left over after option parsing.

	Raises ValueError if a required option is missing, if the data source
	is not one of the live sources, or if the injection channels are not
	given for the same set of detectors as the non-injection channels.
	"""
	parser = OptionParser(description = __doc__)

	# append all the datasource specific options
	datasource.append_options(parser)

	parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously.")
	parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")
	parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
	parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
	parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
	parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.")
	parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
	parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
	parser.add_option("--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
	parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
	parser.add_option("--inj-shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument.  Can be given multiple times as --inj-shared-memory-partition=IFO=PARTITION-NAME")
	parser.add_option("--inj-shared-memory-assumed-duration", type = "int", default = 4, help = "Set the assumed span of files in seconds. Default = 4.")
	parser.add_option("--inj-shared-memory-block-size", type = "int", default = 4096, help = "Set the byte size to read per buffer. Default = 4096.")
	parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
	parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
	parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
	parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
	parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
	parser.add_option("--activation-counts-file", metavar = "filename", help = "Set the name of the h5 file containing activation counts for multicomponent p-astro.")
	parser.add_option("--compress-ranking-stat", action = "store_true", help = "Choose whether to compress the ranking stat upon start up. Only used when --ranking-stat-input is set.")
	parser.add_option("--compress-ranking-stat-threshold", type = "float", default = 0.03, help = "Only keep horizon distance values that differ by this much, fractionally, from their neighbours (default = 0.03).")
	parser.add_option("--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
	parser.add_option("--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
	parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
	parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
	parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
	parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default Test")
	parser.add_option("--gracedb-service-url", default = "https://gracedb.ligo.org/api/", help = "GraceDb service url, default https://gracedb.ligo.org/api/")
	parser.add_option("--lvalert-server-url", default = "https://lvalert.cgca.uwm.edu", help = "lvalert server url, default https://lvalert.cgca.uwm.edu")
	parser.add_option("--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
	parser.add_option("--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
	parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
	parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
	parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
	parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
	parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
	parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
	parser.add_option("--inj-lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times (for injection stream)")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005).  The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
	parser.add_option("--non-inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
	parser.add_option("--local-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
	parser.add_option("--inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value for inspiral jobs can be given multiple times")
	parser.add_option("--injection-file", metavar = "filename", default = [], action = "append", help = "The injection xml files that corresponds to the low latency injections for given mass bins: only used for making missed found plots. 0000:0002:Injection_1.xml, 0002:0004:Injection_2.xml")
	parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
	parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
	parser.add_option("--far-trials-factor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
	# the help of the three influx options names the option defined just
	# above, --agg-data-backend
	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --agg-data-backend = influx.")
	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --agg-data-backend = influx.")
	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --agg-data-backend = influx.")
	parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
	parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")

	options, filenames = parser.parse_args()

	#
	# extract data source configuration
	#

	datasourceinfo = datasource.GWDataSourceInfo(options)

	# options that are used unconditionally below.  The two likelihood
	# caches are opened at the end of this function, so a missing one is
	# reported here together with the other missing options.
	# NOTE(review): --marginalized-likelihood-file is documented as
	# required but is not checked here
	fail = ""
	for option in ("bank_cache", "likelihood_cache", "zerolag_likelihood_cache"):
		if getattr(options, option) is None:
			fail += "must provide option %s\n" % (option)
	if fail: raise ValueError(fail)

	if options.injection_file:
		options.inj_name_dict = datasource.injection_dict_from_channel_list_with_node_range(options.injection_file)
	else:
		options.inj_name_dict = {}

	if options.data_source not in datasourceinfo.live_sources:
		raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")

	#FIXME add consistency check?
	options.bank_cache = inspiral_pipe.parse_cache_str(options.bank_cache)
	options.channel_dict = datasourceinfo.channel_dict
	options.state_channel_dict = datasourceinfo.state_channel_dict
	options.dq_channel_dict = datasourceinfo.dq_channel_dict
	options.framexmit_dict = datasourceinfo.framexmit_addr
	options.shm_part_dict = datasourceinfo.shm_part_dict
	options.inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
	options.inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
	options.inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
	options.inj_framexmit_dict = datasource.framexmit_dict_from_framexmit_list(options.inj_framexmit_addr)

	## A dictionary for injection shared memory partition
	options.inj_shm_part_dict = {"H1": "LHO_InjData", "L1": "LLO_InjData", "V1": "VIRGO_InjData"}
	if options.inj_shared_memory_partition is not None:
		options.inj_shm_part_dict.update( datasource.channel_dict_from_channel_list(options.inj_shared_memory_partition) )

	# invert inj_name_dict: map each of its values to the sorted list of
	# the keys that carry that value
	options.inj_range_dict = {}
	for tag, channel in options.inj_name_dict.items():
		options.inj_range_dict.setdefault(channel, []).append(tag)
	for tags in options.inj_range_dict.values():
		# the lists were created just above, sorting in place is safe
		tags.sort()

	if options.inj_channel_dict:
		# every node range must name exactly the detectors of the
		# non-injection stream
		detectors = set(options.channel_dict.keys())
		for inj_channels in options.inj_channel_dict.values():
			if set(inj_channels.keys()) != detectors:
				raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")

	options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
	options.dq_vector_on_off_dict = datasourceinfo.dq_vector_on_off_bits

	options.likelihood_files = _urls_from_cache(options.likelihood_cache)
	options.zerolag_likelihood_files = _urls_from_cache(options.zerolag_likelihood_cache)

	return options, filenames


#
# MAIN
#


options, filenames = parse_command_line()

# make the directories used by the analysis if they are not there yet
for dir_ in ['logs', 'gracedb', 'aggregator']:
	if not os.path.exists(dir_):
		os.mkdir(dir_)

# the analysis tag, when given, distinguishes the DAG name
if options.analysis_tag:
	dag = dagparts.DAG("trigger_pipe_%s" % options.analysis_tag)
else:
	dag = dagparts.DAG("trigger_pipe")

#
# setup the job classes
#

jobs = set_up_jobs(options)

#
# Setup the Node classes
#

# lvalert listener node
listenNode = dagparts.DAGNode(jobs['lvalertListen'], dag, [],
	opts = {"gracedb-service-url": options.gracedb_service_url, "lvalert-server-url": options.lvalert_server_url},
)


# dq with default options
dqNodes = inspiral_pipe.dq_monitor_layer(dag, jobs, options)

#
# loop over banks to run gstlal inspiral pre clustering and far computation
# FIXME by default the inspiral jobs advertise the current directory as their
# job tag, but this should be made to be more flexible
#

job_tags, inj_job_tags = inspiral_pipe.online_inspiral_layer(dag, jobs, options)

# the marginalization job is given the marginalized likelihood file followed
# by one "<job tag>_registry.txt" file name per inspiral job tag
margNode = dagparts.DAGNode(jobs['marg'], dag, [],
	opts = {},
	input_files = {"": [options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in job_tags]},
	output_files = {}
)

#
# set up aggregation jobs
#

aggNodes = inspiral_pipe.aggregator_layer(dag, jobs, options, job_tags)

# inspiral state backup

if options.state_backup_destination:
	stateNode = dagparts.DAGNode(jobs['state'], dag, [],
		opts = {},
		input_files = {"": [options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files},
		output_files = {},
	)


#
# Write out the dag and other files
#


dag.write_sub_files()

# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
# There are few enough total jobs that this really shouldn't bog down the
# scheduler. For now 10000 will be considered indefinite
for node in dag.get_nodes():
	# a plain loop: set_retry() is called for its effect only, there is
	# no result worth collecting in a list
	node.set_retry(10000)
dag.write_dag()
dag.write_script()
dag.write_cache()