#!/usr/bin/env python
#
# Copyright (C) 2011  Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

### This program creates an HTCondor DAG to automate the running of
### low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe

"""
This program makes a DAG for a low-latency gstlal inspiral pipeline
"""

__author__ = 'Chad Hanna <channa@caltech.edu>'

#
# import standard modules
#

import itertools
import sys, os, stat
import shutil
import socket
from optparse import OptionParser

#
# import the modules we need to build the pipeline
#

from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
from lal.utils import CacheEntry

##
# ### Graph of the HTCondor DAG
#
# - gray boxes are optional and depend on the command line given
#
# @dot
# digraph G {
#       // graph properties
#
#       rankdir=LR;
#       compound=true;
#       node [shape=record fontsize=10 fontname="Verdana"];     
#       edge [fontsize=8 fontname="Verdana"];
#	gstlal_inspiral [URL="\ref gstlal_inspiral"];
#	gstlal_llcbcsummary [URL="\ref gstlal_llcbcsummary"];
#	gstlal_llcbcnode [URL="\ref gstlal_llcbcnode"];
#	gstlal_inspiral_marginalize_likelihoods_online [URL="\ref gstlal_inspiral_marginalize_likelihoods_online"];
# }
# @enddot

class zookeeper_job(dagparts.DAGJob):
	"""
	A zookeeper job
	"""
	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}):
		"""
		"""
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(datadir)
		except OSError:
			pass
		f = open("zookeeper.properties", "w")
		f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
		""" % (datadir, port, maxclients))

		f.close()


class kafka_job(dagparts.DAGJob):
	"""
	A kafka job
	"""
	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", condor_commands = {}):
		"""
		"""
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(logdir)
		except OSError:
			pass
		f = open("kafka.properties", "w")
		f.write("""
broker.id=0
listeners = PLAINTEXT://%s
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
		""" % (host, logdir, zookeeperaddr))

		f.close()


#
# Parse the command line
#


def parse_command_line():
	parser = OptionParser(description = __doc__)

	# append all the datasource specific options
	datasource.append_options(parser)

	parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously.")
	parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")
	parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
	parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
	parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
	parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.")
	parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
	parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
	parser.add_option("--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
	parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
	parser.add_option("--inj-shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument.  Can be given multiple times as --inj-shared-memory-partition=IFO=PARTITION-NAME")
	parser.add_option("--inj-shared-memory-assumed-duration", type = "int", default = 4, help = "Set the assumed span of files in seconds. Default = 4.")
	parser.add_option("--inj-shared-memory-block-size", type = "int", default = 4096, help = "Set the byte size to read per buffer. Default = 4096.")
	parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
	parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
	parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")	
	parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")	
	parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")	
	parser.add_option("--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
	parser.add_option("--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
	parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
	parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
	parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
	parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default Test")
	parser.add_option("--gracedb-service-url", default = "https://gracedb.ligo.org/api/", help = "GraceDb service url, default https://gracedb.ligo.org/api/")
	parser.add_option("--lvalert-server-url", default = "https://lvalert.cgca.uwm.edu", help = "lvalert server url, default https://lvalert.cgca.uwm.edu")
	parser.add_option("--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
	parser.add_option("--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
	parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
	parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
	parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
	parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
	parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
	parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
	parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
	parser.add_option("--inj-lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times (for injection stream)")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005).  The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
	parser.add_option("--non-inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
	parser.add_option("--local-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
	parser.add_option("--inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value for inspiral jobs can be given multiple times")
	parser.add_option("--injection-file", metavar = "filename", default = [], action = "append", help = "The injection xml files that corresponds to the low latency injections for given mass bins: only used for making missed found plots. 0000:0002:Injection_1.xml, 0002:0004:Injection_2.xml")
	parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
	parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
	parser.add_option("--run-output-kafka", action = "store_true", help = "Actually launch a zookeeper and kafka job as part of the dag")
	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
	parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
	parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")

	options, filenames = parser.parse_args()

	#
	# extract data source configuration
	#

	datasourceinfo = datasource.GWDataSourceInfo(options)

	fail = ""
	for option in ("bank_cache",):
		if getattr(options, option) is None:
			fail += "must provide option %s\n" % (option)
	if fail: raise ValueError(fail)

	if options.injection_file:
		inj_name_dict = datasource.injection_dict_from_channel_list_with_node_range(options.injection_file)
	else:
		inj_name_dict = {}

	if options.data_source not in datasourceinfo.live_sources:
		raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")

	#FIXME add consistency check?
	bankcache = inspiral_pipe.parse_cache_str(options.bank_cache)
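	# the bank cache is given per instrument on the command line, e.g. --bank-cache H1=H1.cache,L1=L1.cache (illustrative file names)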
	channel_dict = datasourceinfo.channel_dict
	state_channel_dict = datasourceinfo.state_channel_dict
	dq_channel_dict = datasourceinfo.dq_channel_dict
	framexmit_dict = datasourceinfo.framexmit_addr
	shm_part_dict = datasourceinfo.shm_part_dict
	inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
	inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
	inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
	inj_framexmit_dict = datasource.framexmit_dict_from_framexmit_list(options.inj_framexmit_addr)
	# default shared memory partition names for the injection streams, keyed by instrument
	inj_shm_part_dict = {"H1": "LHO_InjData", "L1": "LLO_InjData", "V1": "VIRGO_InjData"}
	if options.inj_shared_memory_partition is not None:
		inj_shm_part_dict.update( datasource.channel_dict_from_channel_list(options.inj_shared_memory_partition) )

	inj_range_dict = {}
	for tag, channel in inj_name_dict.items():
		inj_range_dict.setdefault(channel, []).append(tag)
	for k,v in inj_range_dict.items():
		inj_range_dict[k] = sorted(v)
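	# inj_range_dict now maps each injection file to the sorted list of job tags that process it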

	if inj_channel_dict:
		for nodes in inj_channel_dict.keys():
			if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ):
				raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")

	options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
	options.dq_vector_on_off_dict = datasourceinfo.dq_vector_on_off_bits

	options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)]
	options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)]
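	# (each cache line is a LAL CacheEntry; .url gives the location of the corresponding likelihood file)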

	return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict


#
# MAIN
#


options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict = parse_command_line()

try: os.mkdir("logs")
except: pass
try: os.mkdir("gracedb")
except: pass

if options.analysis_tag:
	dag = dagparts.DAG("trigger_pipe_%s" % options.analysis_tag)
else:
	dag = dagparts.DAG("trigger_pipe")

#
# setup the job classes
#

gstlalInspiralJob = dagparts.DAGJob('gstlal_inspiral', condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
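# want_graceful_removal with kill_sig 15 (SIGTERM) lets the inspiral processes shut down cleanly when condor removes them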
if inj_channel_dict:
	gstlalInspiralInjJob = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))

# A local universe job that will run in a loop marginalizing all of the likelihoods
margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))

# an lvalert_listen job
listenJob = dagparts.DAGJob('gstlal_inspiral_lvalert_uberplotter', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))

# Zookeeper and Kafka jobs and nodes, which are only set up if a kafka server is specified and --run-output-kafka is given

if options.output_kafka_server is not None and options.run_output_kafka:
	zooJob = zookeeper_job("zookeeper-server-start.sh", tag_base = "zookeeper-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
	kafkaJob = kafka_job("kafka-server-start.sh", tag_base = "kafka-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
	zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
	kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"})


# aggregator job
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator",  condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))

# Summary page job
#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online",  universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))

# DQ job
dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))

if options.state_backup_destination:
	# State saving job 
	stateJob = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))

#
# Setup the Node classes
#

listenNode = dagparts.DAGNode(listenJob, dag, [], opts = {"gracedb-service-url": options.gracedb_service_url, "lvalert-server-url": options.lvalert_server_url})

# dq with default options
for ifo in channel_dict:
	outpath = "aggregator"
	try:
		os.makedirs(outpath)
	except OSError:
		pass

	# Data source dag options
	if (options.data_source == "framexmit"):
		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
			"framexmit-iface":options.framexmit_iface
			}
	else :
		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shm_part_dict[ifo]}),
			"shared-memory-block-size":options.shared_memory_block_size,
			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
			}

	common_opts = {"psd-fft-length":options.psd_fft_length,
		"channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}),
		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"),
		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"),
		"state-vector-on-bits":options.state_vector_on_bits,
		"state-vector-off-bits":options.state_vector_off_bits,
		"dq-vector-on-bits":options.dq_vector_on_bits,
		"dq-vector-off-bits":options.dq_vector_off_bits,
		"data-source":options.data_source,
		"out-path": outpath,
		"data-backend": options.agg_data_backend,
		}
	common_opts.update(datasource_opts)

	if options.agg_data_backend == 'influx':
		common_opts.update({
			"influx-database-name": options.influx_database_name,
			"influx-hostname": options.influx_hostname,
			"influx-port": options.influx_port,
		})
		if options.enable_auth:
			common_opts.update({"enable-auth": ""})
		if options.enable_https:
			common_opts.update({"enable-https": ""})

	dagparts.DAGNode(dqJob, dag, [], opts = common_opts)

#
# loop over banks to run gstlal inspiral pre clustering and far computation
#

jobTags = []
inj_jobTags = []

if options.ht_gate_threshold_linear is not None:
	# Linear scale specified
	template_mchirp_dict = inspiral_pipe.get_svd_bank_params_online(bank_cache.values()[0])
	mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
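	# e.g. "0.8:15.0-45.0:100.0" parses to mchirp_min = 0.8, ht_gate_threshold_min = 15.0, mchirp_max = 45.0, ht_gate_threshold_max = 100.0 (example values only)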

bank_groups = list(inspiral_pipe.build_bank_groups(bank_cache, [1], options.max_jobs - 1))
if len(options.likelihood_files) != len(bank_groups):
	raise ValueError("Likelihood files must correspond 1:1 with bank files")

for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)):
	svd_bank_string = ",".join([":".join([k, v[0]]) for k,v in svd_banks.items()])
	jobTags.append("%04d" % num_insp_nodes)

	# Calculate the appropriate ht-gate-threshold value
	threshold_values = None
	if options.ht_gate_threshold_linear is not None:
		# Linear scale specified
		# use max mchirp in a given svd bank to decide gate threshold
		bank_mchirps = [template_mchirp_dict["%04d" % int(os.path.basename(svd_file).split("-")[1].split("_")[3])][1] for svd_file in svd_banks.items()[0][1]]
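		# linearly interpolate the gate threshold in mchirp between (mchirp_min, ht_gate_threshold_min) and (mchirp_max, ht_gate_threshold_max)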
		threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
	else:
		if options.ht_gate_threshold is not None:
			threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given

	# Data source dag options
	if (options.data_source == "framexmit"):
		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
			"framexmit-iface":options.framexmit_iface
			}
	else :
		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(shm_part_dict, opt = "shared-memory-partition"),
			"shared-memory-block-size":options.shared_memory_block_size,
			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
			}

	common_opts = {"psd-fft-length":options.psd_fft_length,
		"reference-psd":options.reference_psd,
		"ht-gate-threshold":threshold_values,
		"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
		"state-vector-on-bits":options.state_vector_on_bits,
		"state-vector-off-bits":options.state_vector_off_bits,
		"dq-vector-on-bits":options.dq_vector_on_bits,
		"dq-vector-off-bits":options.dq_vector_off_bits,
		"svd-bank":svd_bank_string,
		"tmp-space":dagparts.condor_scratch_space(),
		"track-psd":"",
		"control-peak-time":options.control_peak_time,
		"coincidence-threshold":options.coincidence_threshold,
		"fir-stride":options.fir_stride,
		"data-source":options.data_source,
		"gracedb-far-threshold":options.gracedb_far_threshold,
		"gracedb-group":options.gracedb_group,
		"gracedb-pipeline":options.gracedb_pipeline,
		"gracedb-search":options.gracedb_search,
		"gracedb-service-url":options.gracedb_service_url,
		"job-tag":jobTags[-1],
		"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
		"min-instruments":options.min_instruments,
		"time-slide-file":options.time_slide_file,
		"output-kafka-server": options.output_kafka_server
		}
	common_opts.update(datasource_opts)

	inspNode = dagparts.DAGNode(gstlalInspiralJob, dag, [],
		opts = common_opts,
		input_files = {
			"ranking-stat-input":[likefile],
			"ranking-stat-pdf":options.marginalized_likelihood_file
		},
		output_files = {
			"output":"/dev/null",
			"ranking-stat-output":likefile,
			"zerolag-rankingstat-pdf":zerolikefile
		}
	)

	if str("%04d" %num_insp_nodes) in inj_channel_dict:
		# FIXME The node number for injection jobs currently follows the same
		# numbering system as non-injection jobs, except instead of starting at
		# 0000 the numbering starts at 1000. There is probably a better way to
		# do this in the future, this system was just the simplest to start
		# with
		inj_jobTags.append("%04d" % (num_insp_nodes + 1000))

		# Data source dag options
		if (options.data_source == "framexmit"):
			datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
				"framexmit-iface":options.framexmit_iface
				}
		else :
			datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(inj_shm_part_dict, opt = "shared-memory-partition"),
				"shared-memory-block-size":options.inj_shared_memory_block_size,
				"shared-memory-assumed-duration":options.inj_shared_memory_assumed_duration
				}

		common_opts = {"psd-fft-length":options.psd_fft_length,
			"reference-psd":options.reference_psd,
			"ht-gate-threshold":threshold_values,
			"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
			"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
			"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
			"state-vector-on-bits":options.inj_state_vector_on_bits,
			"state-vector-off-bits":options.inj_state_vector_off_bits,
			"dq-vector-on-bits":options.inj_dq_vector_on_bits,
			"dq-vector-off-bits":options.inj_dq_vector_off_bits,
			"svd-bank":svd_bank_string,
			"tmp-space":dagparts.condor_scratch_space(),
			"track-psd":"",
			"control-peak-time":options.control_peak_time,
			"coincidence-threshold":options.coincidence_threshold,
			"fir-stride":options.fir_stride,
			"data-source":options.data_source,
			"gracedb-far-threshold":options.inj_gracedb_far_threshold,
			"gracedb-group":options.inj_gracedb_group,
			"gracedb-pipeline":options.inj_gracedb_pipeline,
			"gracedb-search":options.inj_gracedb_search,
			"gracedb-service-url":options.inj_gracedb_service_url,
			"job-tag":inj_jobTags[-1],
			"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
			"min-instruments":options.min_instruments,
			"time-slide-file":options.time_slide_file
			}
		common_opts.update(datasource_opts)
		inspInjNode = dagparts.DAGNode(gstlalInspiralInjJob, dag, [],
			opts = common_opts,
			input_files = {
				"ranking-stat-input":[likefile],
				"ranking-stat-pdf":options.marginalized_likelihood_file
			},
			output_files = {
				"output":"/dev/null"
			}
		)

def groups(l, n):
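	"""Yield successive n-element chunks of sequence l; the final chunk may be shorter."""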
	for i in xrange(0, len(l), n):
		yield l[i:i+n]

margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
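# each inspiral job is expected to write a <job-tag>_registry.txt file; the marginalization job reads
# these registries together with the marginalized likelihood file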

#
# set up aggregation jobs
#

#
# FIXME by default the inspiral jobs advertise the current directory as their
# job tag, but this should be made to be more flexible
#

# set up common settings for aggregation jobs
agg_options = {
	"dump-period": 0,
	"base-dir": "aggregator",
	"job-tag": os.getcwd(),
	"num-jobs": len(jobTags),
	"num-threads": 2,
	"job-start": 0,
	"kafka-server": options.output_kafka_server,
	"data-backend": options.agg_data_backend,
}

if options.agg_data_backend == 'influx':
	agg_options.update({
		"influx-database-name": options.influx_database_name,
		"influx-hostname": options.influx_hostname,
		"influx-port": options.influx_port,
	})
	if options.enable_auth:
		agg_options.update({"enable-auth": ""})
	if options.enable_https:
		agg_options.update({"enable-https": ""})

# define routes used for aggregation jobs
snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
usage_routes = ["ram_history"]
agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
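# e.g. for an H1,L1 analysis snr_routes would be ["H1_snr_history", "L1_snr_history"]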

state_routes = []
for ifo in channel_dict.keys():
    state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
    state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
    state_routes.append("%s_strain_dropped" % ifo)

# analysis-based aggregation jobs
# FIXME don't hard code the 1000
max_agg_jobs = 1000
agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [len(jobTags)]
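# split the inspiral jobs into contiguous blocks, with one aggregator node per (route, block) pair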
for route in agg_routes:
	agg_options["route"] = route
	if route == "far_history":
		agg_options["data-type"] = "min"
	else:
		agg_options["data-type"] = "max"

	for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
		agg_options["job-start"] = aggstart
		agg_options["num-jobs"] = aggend - aggstart
		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)

# state-based aggregation jobs
for routes in groups(state_routes, 2):
	agg_options["route"] = routes
	agg_options["data-type"] = "max"
	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)

#
# summary page
#

#if options.injection_file:
#
#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
#
#	for injfile, jobrange in inj_range_dict.items():
#		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
#
#else:
#
#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})


if options.state_backup_destination:
	stateNode = dagparts.DAGNode(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {})


#
# Write out the dag and other files
#


dag.write_sub_files()
# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
# There are few enough total jobs that this really shouldn't bog down the
# scheduler. For now 10000 will be considered indefinite
for node in dag.get_nodes():
	node.set_retry(10000)
dag.write_dag()
dag.write_script()
dag.write_cache()