From d2f24c8b9ff0aef39bba1591cd0fbb31fe7ec35c Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sun, 22 Sep 2019 08:22:23 -0700
Subject: [PATCH] gstlal_ll_inspiral_pipe, inspiral_pipe.py: remove unused
 code, refactor; gstlal_inspiral: modify far-trials-factor option name

---
 gstlal-inspiral/bin/gstlal_inspiral         |   4 +-
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe | 516 ++++----------------
 gstlal-inspiral/python/inspiral_pipe.py     | 256 ++++++++++
 3 files changed, 352 insertions(+), 424 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral
index 991862d2ce..a04a5b7acd 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral
+++ b/gstlal-inspiral/bin/gstlal_inspiral
@@ -258,7 +258,7 @@ def parse_command_line():
 
 	group = OptionGroup(parser, "Ranking Statistic Options", "Adjust ranking statistic behaviour")
 	group.add_option("--cap-singles", action = "store_true", help = "Cap singles to 1 / livetime if computing FAR. No effect otherwise")
-	group.add_option("--FAR-trialsfactor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
+	group.add_option("--far-trials-factor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
 	group.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
 	group.add_option("--coincidence-threshold", metavar = "seconds", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005 s).  The light-travel time between instruments will be added automatically in the coincidence test.")
 	group.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
@@ -848,7 +848,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
 		kafka_server = options.output_kafka_server,
 		cluster = True,#options.data_source in ("lvshm", "framexmit"),# If uncommented, we only cluster when running online
 		cap_singles = options.cap_singles,
-		FAR_trialsfactor = options.FAR_trialsfactor,
+		FAR_trialsfactor = options.far_trials_factor,
 		verbose = options.verbose
 	)
 	if options.verbose:
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 41fa5f2232..d3401e3b20 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -56,7 +56,7 @@ from lal.utils import CacheEntry
 #
 #       rankdir=LR;
 #       compound=true;
-#       node [shape=record fontsize=10 fontname="Verdana"];     
+#       node [shape=record fontsize=10 fontname="Verdana"];
 #       edge [fontsize=8 fontname="Verdana"];
 #	gstlal_inspiral [URL="\ref gstlal_inspiral"];
 #	gstlal_llcbcsummary [URL="\ref gstlal_llcbcsummary"];
@@ -65,72 +65,42 @@ from lal.utils import CacheEntry
 # }
 # @enddot
 
-class zookeeper_job(dagparts.DAGJob):
-	"""
-	A zookeeper job
-	"""
-	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}):
-		"""
-		"""
-		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)
-
-		try:
-			os.mkdir(datadir)
-		except OSError:
-			pass
-		f = open("zookeeper.properties", "w")
-		f.write("""
-# the directory where the snapshot is stored.
-dataDir=%s
-# the port at which the clients will connect
-clientPort=%d
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=%d
-		""" % (datadir, port, maxclients))
-
-		f.close()
-
-
-class kafka_job(dagparts.DAGJob):
-	"""
-	A kafka job
-	"""
-	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", condor_commands = {}):
-		"""
-		"""
-		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)
-
-		try:
-			os.mkdir(logdir)
-		except OSError:
-			pass
-		f = open("kafka.properties", "w")
-		f.write("""
-broker.id=0
-listeners = PLAINTEXT://%s
-num.network.threads=3
-num.io.threads=8
-socket.send.buffer.bytes=102400
-socket.receive.buffer.bytes=102400
-socket.request.max.bytes=104857600
-log.dirs=%s
-num.partitions=1
-num.recovery.threads.per.data.dir=1
-offsets.topic.replication.factor=1
-transaction.state.log.replication.factor=1
-transaction.state.log.min.isr=1
-log.flush.interval.messages=10000
-log.flush.interval.ms=1000
-log.retention.ms=100000
-log.roll.ms = 1000000
-log.segment.bytes=1073741824
-log.retention.check.interval.ms=300000
-zookeeper.connect=%s
-zookeeper.connection.timeout.ms=6000
-group.initial.rebalance.delay.ms=0
-		""" % (host, logdir, zookeeperaddr))
-
-		f.close()
+#
+# utility functions
+#
+
+def set_up_jobs(options):
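+	"""Construct the Condor DAGJob objects used by the online DAG, keyed by short job names."""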
+	jobs = {}
+
+	# condor commands
+	inspiral_condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal": "True", "kill_sig": "15"})
+	non_inspiral_condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command, {"want_graceful_removal": "True", "kill_sig": "15"})
+	local_condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command)
+
+	# set up jobs
+	jobs['gstlalInspiral'] = dagparts.DAGJob('gstlal_inspiral', condor_commands = inspiral_condor_commands)
+	if options.inj_channel_dict:
+		jobs['gstlalInspiralInj'] = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_condor_commands)
+
+	# A local universe job that will run in a loop marginalizing all of the likelihoods
+	jobs['marg'] = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = local_condor_commands)
+
+	# an lvalert_listen job
+	jobs['lvalertListen'] = dagparts.DAGJob('gstlal_inspiral_lvalert_uberplotter', universe = "local", condor_commands = local_condor_commands)
+
+	# aggregator job
+	jobs['agg'] = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = non_inspiral_condor_commands)
+	jobs['aggLeader'] = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", tag_base = "gstlal_ll_inspiral_aggregator_leader", condor_commands = non_inspiral_condor_commands)
+	jobs['trigagg'] = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = non_inspiral_condor_commands)
+
+	# DQ job
+	jobs['dq'] = dagparts.DAGJob("gstlal_ll_dq", condor_commands = non_inspiral_condor_commands)
+
+	if options.state_backup_destination:
+		# State saving job
+		jobs['state'] = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = local_condor_commands)
+
+	return jobs
 
 
 #
@@ -160,9 +130,9 @@ def parse_command_line():
 	parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
 	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
 	parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
-	parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")	
-	parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")	
-	parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")	
+	parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
+	parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
+	parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
 	parser.add_option("--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
 	parser.add_option("--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
 	parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
@@ -193,16 +163,15 @@ def parse_command_line():
 	parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
 	parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
 	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
+	parser.add_option("--far-trials-factor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
 	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
 	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
-	parser.add_option("--run-output-kafka", action = "store_true", help = "Actually launch a zookeeper and kafka job as part of the dag")
 	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
 	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
 	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
 	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
 	parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
 	parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")
-	parser.add_option("--FAR-trialsfactor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
 
 	options, filenames = parser.parse_args()
 
@@ -216,41 +185,42 @@ def parse_command_line():
 	for option in ("bank_cache",):
 		if getattr(options, option) is None:
 			fail += "must provide option %s\n" % (option)
-	if fail: raise ValueError, fail
+	if fail: raise ValueError(fail)
 
 	if options.injection_file:
-		inj_name_dict = datasource.injection_dict_from_channel_list_with_node_range(options.injection_file)
+		options.inj_name_dict = datasource.injection_dict_from_channel_list_with_node_range(options.injection_file)
 	else:
-		inj_name_dict = {}
+		options.inj_name_dict = {}
 
-	if options.data_source not in datasourceinfo.live_sources :
+	if options.data_source not in datasourceinfo.live_sources:
 		raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")
 
 	#FIXME add consistency check?
-	bankcache = inspiral_pipe.parse_cache_str(options.bank_cache)
-	channel_dict = datasourceinfo.channel_dict
-	state_channel_dict = datasourceinfo.state_channel_dict
-	dq_channel_dict = datasourceinfo.dq_channel_dict
-	framexmit_dict = datasourceinfo.framexmit_addr
-	shm_part_dict = datasourceinfo.shm_part_dict
-	inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
-	inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
-	inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
-	inj_framexmit_dict = datasource.framexmit_dict_from_framexmit_list(options.inj_framexmit_addr)
+	options.bank_cache = inspiral_pipe.parse_cache_str(options.bank_cache)
+	options.channel_dict = datasourceinfo.channel_dict
+	options.state_channel_dict = datasourceinfo.state_channel_dict
+	options.dq_channel_dict = datasourceinfo.dq_channel_dict
+	options.framexmit_dict = datasourceinfo.framexmit_addr
+	options.shm_part_dict = datasourceinfo.shm_part_dict
+	options.inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
+	options.inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
+	options.inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
+	options.inj_framexmit_dict = datasource.framexmit_dict_from_framexmit_list(options.inj_framexmit_addr)
+
 	## A dictionary for injection shared memory partition
-	inj_shm_part_dict = {"H1": "LHO_InjData", "L1": "LLO_InjData", "V1": "VIRGO_InjData"}
+	options.inj_shm_part_dict = {"H1": "LHO_InjData", "L1": "LLO_InjData", "V1": "VIRGO_InjData"}
 	if options.inj_shared_memory_partition is not None:
-		inj_shm_part_dict.update( datasource.channel_dict_from_channel_list(options.inj_shared_memory_partition) )
+		options.inj_shm_part_dict.update( datasource.channel_dict_from_channel_list(options.inj_shared_memory_partition) )
 
-	inj_range_dict = {}
-	for tag, channel in inj_name_dict.items():
-		inj_range_dict.setdefault(channel, []).append(tag)
-	for k,v in inj_range_dict.items():
-		inj_range_dict[k] = sorted(v)
+	options.inj_range_dict = {}
+	for tag, channel in options.inj_name_dict.items():
+		options.inj_range_dict.setdefault(channel, []).append(tag)
+	for k,v in options.inj_range_dict.items():
+		options.inj_range_dict[k] = sorted(v)
 
-	if inj_channel_dict:
-		for nodes in inj_channel_dict.keys():
-			if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ):
+	if options.inj_channel_dict:
+		for nodes in options.inj_channel_dict.keys():
+			if not ( set(options.inj_channel_dict[nodes].keys()) == set(options.channel_dict.keys()) ):
 				raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")
 
 	options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
@@ -259,7 +229,7 @@ def parse_command_line():
 	options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)]
 	options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)]
 
-	return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict
+	return options, filenames
 
 
 #
@@ -267,12 +237,12 @@ def parse_command_line():
 #
 
 
-options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict = parse_command_line()
+options, filenames = parse_command_line()
 
-try: os.mkdir("logs")
-except: pass
-try: os.mkdir("gracedb")
-except: pass
+# make directories
+for dir_ in ['logs', 'gracedb', 'aggregator']:
+	if not os.path.exists(dir_):
+		os.mkdir(dir_)
 
 if options.analysis_tag:
 	dag = dagparts.DAG("trigger_pipe_%s" % options.analysis_tag)
@@ -283,347 +253,48 @@ else:
 # setup the job classes
 #
 
-gstlalInspiralJob = dagparts.DAGJob('gstlal_inspiral', condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
-if inj_channel_dict:
-	gstlalInspiralInjJob = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
-
-# A local universe job that will run in a loop marginalizing all of the likelihoods
-margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
-
-# an lvalert_listen job
-listenJob = dagparts.DAGJob('gstlal_inspiral_lvalert_uberplotter', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
-
-# Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
-
-if options.output_kafka_server is not None and options.run_output_kafka:
-	zooJob = zookeeper_job("zookeeper-server-start.sh", tag_base = "zookeeper-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
-	kafkaJob = kafka_job("kafka-server-start.sh", tag_base = "kafka-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
-	zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
-	kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"})
-
-
-# aggregator job
-aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-aggLeaderJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", tag_base = "gstlal_ll_inspiral_aggregator_leader", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-
-# Summary page job
-#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online",  universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
-
-# DQ job
-dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-
-if options.state_backup_destination:
-	# State saving job 
-	stateJob = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
+jobs = set_up_jobs(options)
 
 #
 # Setup the Node classes
 #
 
-listenNode = dagparts.DAGNode(listenJob, dag, [], opts = {"gracedb-service-url": options.gracedb_service_url, "lvalert-server-url": options.lvalert_server_url})
+listenNode = dagparts.DAGNode(jobs['lvalertListen'], dag, [],
+	opts = {"gracedb-service-url": options.gracedb_service_url, "lvalert-server-url": options.lvalert_server_url},
+)
+
 
 # dq with default options
-for ifo in channel_dict:
-	outpath = "aggregator"
-	try:
-		os.makedirs(outpath)
-	except OSError:
-		pass
-
-	# Data source dag options
-	if (options.data_source == "framexmit"):
-		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
-			"framexmit-iface":options.framexmit_iface
-			}
-	else :
-		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shm_part_dict[ifo]}),
-			"shared-memory-block-size":options.shared_memory_block_size,
-			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
-			}
-
-	common_opts = {"psd-fft-length":options.psd_fft_length,
-		"channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}),
-		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"),
-		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"),
-		"state-vector-on-bits":options.state_vector_on_bits,
-		"state-vector-off-bits":options.state_vector_off_bits,
-		"dq-vector-on-bits":options.dq_vector_on_bits,
-		"dq-vector-off-bits":options.dq_vector_off_bits,
-		"data-source":options.data_source,
-		"out-path": outpath,
-		"data-backend": options.agg_data_backend,
-		}
-	common_opts.update(datasource_opts)
-
-	if options.agg_data_backend == 'influx':
-		common_opts.update({
-			"influx-database-name": options.influx_database_name,
-			"influx-hostname": options.influx_hostname,
-			"influx-port": options.influx_port,
-		})
-		if options.enable_auth:
-			common_opts.update({"enable-auth": ""})
-		if options.enable_https:
-			common_opts.update({"enable-https": ""})
-
-	dagparts.DAGNode(dqJob, dag, [], opts = common_opts)
+dqNodes = inspiral_pipe.dq_monitor_layer(dag, jobs, options)
 
 #
 # loop over banks to run gstlal inspiral pre clustering and far computation
-#
-
-jobTags = []
-inj_jobTags = []
-
-if options.ht_gate_threshold_linear is not None:
-	# Linear scale specified
-	template_mchirp_dict = inspiral_pipe.get_svd_bank_params_online(bank_cache.values()[0])
-	mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
-
-bank_groups = list(inspiral_pipe.build_bank_groups(bank_cache, [1], options.max_jobs - 1))
-if len(options.likelihood_files) != len(bank_groups):
-	raise ValueError("Likelihood files must correspond 1:1 with bank files")
-
-for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)):
-	svd_bank_string = ",".join([":".join([k, v[0]]) for k,v in svd_banks.items()])
-	jobTags.append("%04d" % num_insp_nodes)
-
-	# Calculate the appropriate ht-gate-threshold value
-	threshold_values = None
-	if options.ht_gate_threshold_linear is not None:
-		# Linear scale specified
-		# use max mchirp in a given svd bank to decide gate threshold
-		bank_mchirps = [template_mchirp_dict["%04d" % int(os.path.basename(svd_file).split("-")[1].split("_")[3])][1] for svd_file in svd_banks.items()[0][1]]
-		threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
-	else:
-		if options.ht_gate_threshold is not None:
-			threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given
-
-	# Data source dag options
-	if (options.data_source == "framexmit"):
-		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
-			"framexmit-iface":options.framexmit_iface
-			}
-	else :
-		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(shm_part_dict, opt = "shared-memory-partition"),
-			"shared-memory-block-size":options.shared_memory_block_size,
-			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
-			}
-
-	common_opts = {"psd-fft-length":options.psd_fft_length,
-		"reference-psd":options.reference_psd,
-		"ht-gate-threshold":threshold_values,
-		"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
-		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
-		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
-		"state-vector-on-bits":options.state_vector_on_bits,
-		"state-vector-off-bits":options.state_vector_off_bits,
-		"dq-vector-on-bits":options.dq_vector_on_bits,
-		"dq-vector-off-bits":options.dq_vector_off_bits,
-		"svd-bank":svd_bank_string,
-		"tmp-space":dagparts.condor_scratch_space(),
-		"track-psd":"",
-		"control-peak-time":options.control_peak_time,
-		"coincidence-threshold":options.coincidence_threshold,
-		"fir-stride":options.fir_stride,
-		"data-source":options.data_source,
-		"gracedb-far-threshold":options.gracedb_far_threshold,
-		"gracedb-group":options.gracedb_group,
-		"gracedb-pipeline":options.gracedb_pipeline,
-		"gracedb-search":options.gracedb_search,
-		"gracedb-service-url":options.gracedb_service_url,
-		"job-tag":jobTags[-1],
-		"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
-		"FAR-trialsfactor":options.FAR_trialsfactor,
-		"min-instruments":options.min_instruments,
-		"time-slide-file":options.time_slide_file,
-		"output-kafka-server": options.output_kafka_server
-		}
-	common_opts.update(datasource_opts)
-
-	inspNode = dagparts.DAGNode(gstlalInspiralJob, dag, [],
-		opts = common_opts,
-		input_files = {
-			"ranking-stat-input":[likefile],
-			"ranking-stat-pdf":options.marginalized_likelihood_file
-		},
-		output_files = {
-			"output":"/dev/null",
-			"ranking-stat-output":likefile,
-			"zerolag-rankingstat-pdf":zerolikefile
-		}
-	)
-
-	if str("%04d" %num_insp_nodes) in inj_channel_dict:
-		# FIXME The node number for injection jobs currently follows the same
-		# numbering system as non-injection jobs, except instead of starting at
-		# 0000 the numbering starts at 1000. There is probably a better way to
-		# do this in the future, this system was just the simplest to start
-		# with
-		inj_jobTags.append("%04d" % (num_insp_nodes + 1000))
-
-		# Data source dag options
-		if (options.data_source == "framexmit"):
-			datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
-				"framexmit-iface":options.framexmit_iface
-				}
-		else :
-			datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(inj_shm_part_dict, opt = "shared-memory-partition"),
-				"shared-memory-block-size":options.inj_shared_memory_block_size,
-				"shared-memory-assumed-duration":options.inj_shared_memory_assumed_duration
-				}
-
-		common_opts = {"psd-fft-length":options.psd_fft_length,
-			"reference-psd":options.reference_psd,
-			"ht-gate-threshold":threshold_values,
-			"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
-			"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
-			"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
-			"state-vector-on-bits":options.inj_state_vector_on_bits,
-			"state-vector-off-bits":options.inj_state_vector_off_bits,
-			"dq-vector-on-bits":options.inj_dq_vector_on_bits,
-			"dq-vector-off-bits":options.inj_dq_vector_off_bits,
-			"svd-bank":svd_bank_string,
-			"tmp-space":dagparts.condor_scratch_space(),
-			"track-psd":"",
-			"control-peak-time":options.control_peak_time,
-			"coincidence-threshold":options.coincidence_threshold,
-			"fir-stride":options.fir_stride,
-			"data-source":options.data_source,
-			"gracedb-far-threshold":options.inj_gracedb_far_threshold,
-			"gracedb-group":options.inj_gracedb_group,
-			"gracedb-pipeline":options.inj_gracedb_pipeline,
-			"gracedb-search":options.inj_gracedb_search,
-			"gracedb-service-url":options.inj_gracedb_service_url,
-			"job-tag":inj_jobTags[-1],
-			"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
-			"FAR-trialsfactor":options.FAR_trialsfactor,
-			"min-instruments":options.min_instruments,
-			"time-slide-file":options.time_slide_file
-			}
-		common_opts.update(datasource_opts)
-		inspInjNode = dagparts.DAGNode(gstlalInspiralInjJob, dag, [],
-			opts = common_opts,
-			input_files = {
-				"ranking-stat-input":[likefile],
-				"ranking-stat-pdf":options.marginalized_likelihood_file
-			},
-			output_files = {
-				"output":"/dev/null"
-			}
-		)
-
-margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
-
-#
-# set up aggregation jobs
-#
-
-#
 # FIXME by default the inspiral jobs advertise the current directory as their
 # job tag, but this should be made to be more flexible
 #
 
-# set up common settings for aggregation jobs
-agg_options = {
-	"dump-period": 0,
-	"base-dir": "aggregator",
-	"job-tag": os.getcwd(),
-	"num-jobs": len(jobTags),
-	"num-threads": 2,
-	"job-start": 0,
-	"kafka-server": options.output_kafka_server,
-	"data-backend": options.agg_data_backend,
-}
-
-if options.agg_data_backend == 'influx':
-	agg_options.update({
-		"influx-database-name": options.influx_database_name,
-		"influx-hostname": options.influx_hostname,
-		"influx-port": options.influx_port,
-	})
-	if options.enable_auth:
-		agg_options.update({"enable-auth": ""})
-	if options.enable_https:
-		agg_options.update({"enable-https": ""})
-
-# define routes used for aggregation jobs
-snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
-network_routes = ["likelihood_history", "snr_history", "latency_history"]
-usage_routes = ["ram_history"]
-
-state_routes = []
-for ifo in channel_dict.keys():
-    state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-    state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-    state_routes.append("%s_strain_dropped" % ifo)
-agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))
-
-# analysis-based aggregation jobs
-# FIXME don't hard code the 1000
-max_agg_jobs = 1000
-agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
-agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(jobTags)), 1))) + ["far_history"]
-for routes in agg_routes:
-	these_options = dict(agg_options)
-	these_options["route"] = routes
-	if routes == "far_history":
-		these_options["data-type"] = "min"
-	else:
-		these_options["data-type"] = "max"
-
-	for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
-		these_options["job-start"] = aggstart
-		these_options["num-jobs"] = aggend - aggstart
-		if ii == 0: ### elect first aggregator per route as leader
-			these_options["across-jobs"] = ""
-			aggNode = dagparts.DAGNode(aggLeaderJob, dag, [], opts = these_options)
-		else:
-			aggNode = dagparts.DAGNode(aggJob, dag, [], opts = these_options)
-
-# Trigger aggregation
-trigagg_options = {
-	"dump-period": 0,
-	"base-dir": "aggregator",
-	"job-tag": os.getcwd(),
-	"num-jobs": len(jobTags),
-	"num-threads": 2,
-	"job-start": 0,
-	"kafka-server": options.output_kafka_server,
-	"data-backend": options.agg_data_backend,
-}
-if options.agg_data_backend == 'influx':
-	trigagg_options.update({
-		"influx-database-name": options.influx_database_name,
-		"influx-hostname": options.influx_hostname,
-		"influx-port": options.influx_port,
-	})
-	if options.enable_auth:
-		trigagg_options.update({"enable-auth": ""})
-	if options.enable_https:
-		trigagg_options.update({"enable-https": ""})
-
-aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)
+job_tags, inj_job_tags = inspiral_pipe.online_inspiral_layer(dag, jobs, options)
 
-#
-# summary page
-#
+margNode = dagparts.DAGNode(jobs['marg'], dag, [],
+	opts = {},
+	input_files = {"": [options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in job_tags]},
+	output_files = {}
+)
 
-#if options.injection_file:
-#
-#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
 #
-#	for injfile, jobrange in inj_range_dict.items():
-#		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
-#
-#else:
+# set up aggregation jobs
 #
-#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
 
+aggNodes = inspiral_pipe.aggregator_layer(dag, jobs, options, job_tags)
+
+# inspiral state backup
 
 if options.state_backup_destination:
-	stateNode = dagparts.DAGNode(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {})
+	stateNode = dagparts.DAGNode(jobs['state'], dag, [],
+		opts = {},
+		input_files = {"": [options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files},
+		output_files = {},
+	)
 
 
 #
@@ -632,6 +303,7 @@ if options.state_backup_destination:
 
 
 dag.write_sub_files()
+
 # we probably want these jobs to retry indefinitely on dedicated nodes. A user
 # can intervene and fix a problem without having to bring the dag down and up.
 # There are few enough total jobs that this really shouldn't bog down the
diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py
index 44ac157c0d..4f2c130bd2 100644
--- a/gstlal-inspiral/python/inspiral_pipe.py
+++ b/gstlal-inspiral/python/inspiral_pipe.py
@@ -78,6 +78,262 @@ lsctables.use_in(LIGOLWContentHandler)
 #
 
 
+def online_inspiral_layer(dag, jobs, options):
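+	"""Set up one online gstlal_inspiral node per SVD bank group, plus matching injection nodes when injection channels are configured, and return the (job_tags, inj_job_tags) assigned to them."""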
+	job_tags = []
+	inj_job_tags = []
+
+	if options.ht_gate_threshold_linear is not None:
+		# Linear scale specified
+		template_mchirp_dict = get_svd_bank_params(options.bank_cache.values()[0], online = True)
+		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
+
+	bank_groups = list(build_bank_groups(options.bank_cache, [1], options.max_jobs - 1))
+	if len(options.likelihood_files) != len(bank_groups):
+		raise ValueError("Likelihood files must correspond 1:1 with bank files")
+
+	for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)):
+		svd_bank_string = ",".join([":".join([k, v[0]]) for k,v in svd_banks.items()])
+		job_tags.append("%04d" % num_insp_nodes)
+
+		# Calculate the appropriate ht-gate-threshold value
+		threshold_values = None
+		if options.ht_gate_threshold_linear is not None:
+			# Linear scale specified
+			# use max mchirp in a given svd bank to decide gate threshold
+			bank_mchirps = [template_mchirp_dict["%04d" % int(os.path.basename(svd_file).split("-")[1].split("_")[3])][1] for svd_file in svd_banks.items()[0][1]]
+			threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
+		elif options.ht_gate_threshold is not None:
+			threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given
+
+		# Data source dag options
+		if (options.data_source == "framexmit"):
+			datasource_opts = {
+				"framexmit-addr": datasource.framexmit_list_from_framexmit_dict(options.framexmit_dict),
+				"framexmit-iface": options.framexmit_iface
+			}
+		else:
+			datasource_opts = {
+				"shared-memory-partition": datasource.pipeline_channel_list_from_channel_dict(options.shm_part_dict, opt = "shared-memory-partition"),
+				"shared-memory-block-size": options.shared_memory_block_size,
+				"shared-memory-assumed-duration": options.shared_memory_assumed_duration
+			}
+
+		common_opts = {
+			"psd-fft-length": options.psd_fft_length,
+			"reference-psd": options.reference_psd,
+			"ht-gate-threshold": threshold_values,
+			"channel-name": datasource.pipeline_channel_list_from_channel_dict(options.channel_dict),
+			"state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.state_channel_dict, opt = "state-channel-name"),
+			"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.dq_channel_dict, opt = "dq-channel-name"),
+			"state-vector-on-bits": options.state_vector_on_bits,
+			"state-vector-off-bits": options.state_vector_off_bits,
+			"dq-vector-on-bits": options.dq_vector_on_bits,
+			"dq-vector-off-bits": options.dq_vector_off_bits,
+			"svd-bank": svd_bank_string,
+			"tmp-space": dagparts.condor_scratch_space(),
+			"track-psd": "",
+			"control-peak-time": options.control_peak_time,
+			"coincidence-threshold": options.coincidence_threshold,
+			"fir-stride": options.fir_stride,
+			"data-source": options.data_source,
+			"gracedb-far-threshold": options.gracedb_far_threshold,
+			"gracedb-group": options.gracedb_group,
+			"gracedb-pipeline": options.gracedb_pipeline,
+			"gracedb-search": options.gracedb_search,
+			"gracedb-service-url": options.gracedb_service_url,
+			"job-tag": job_tags[-1],
+			"likelihood-snapshot-interval": options.likelihood_snapshot_interval,
+			"far-trials-factor": options.far_trials_factor,
+			"min-instruments": options.min_instruments,
+			"time-slide-file": options.time_slide_file,
+			"output-kafka-server": options.output_kafka_server
+		}
+		common_opts.update(datasource_opts)
+
+		inspNode = dagparts.DAGNode(jobs['gstlalInspiral'], dag, [],
+			opts = common_opts,
+			input_files = {
+				"ranking-stat-input": [likefile],
+				"ranking-stat-pdf": options.marginalized_likelihood_file
+			},
+			output_files = {
+				"output": "/dev/null",
+				"ranking-stat-output": likefile,
+				"zerolag-rankingstat-pdf": zerolikefile
+			}
+		)
+
+		if str("%04d" %num_insp_nodes) in options.inj_channel_dict:
+			# FIXME The node number for injection jobs currently follows the same
+			# numbering system as non-injection jobs, except instead of starting at
+			# 0000 the numbering starts at 1000. There is probably a better way to
+			# do this in the future, this system was just the simplest to start
+			# with
+			inj_job_tags.append("%04d" % (num_insp_nodes + 1000))
+
+			injection_opts = {
+				"channel-name": datasource.pipeline_channel_list_from_channel_dict_with_node_range(options.inj_channel_dict, node = job_tags[-1]),
+				"state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_state_channel_dict, opt = "state-channel-name"),
+				"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_dq_channel_dict, opt = "dq-channel-name"),
+				"state-vector-on-bits": options.inj_state_vector_on_bits,
+				"state-vector-off-bits": options.inj_state_vector_off_bits,
+				"dq-vector-on-bits": options.inj_dq_vector_on_bits,
+				"dq-vector-off-bits": options.inj_dq_vector_off_bits,
+				"gracedb-far-threshold": options.inj_gracedb_far_threshold,
+				"gracedb-group": options.inj_gracedb_group,
+				"gracedb-pipeline": options.inj_gracedb_pipeline,
+				"gracedb-search": options.inj_gracedb_search,
+				"gracedb-service-url": options.inj_gracedb_service_url,
+				"job-tag": inj_job_tags[-1],
+				"likelihood-snapshot-interval": options.likelihood_snapshot_interval,
+				"far-trials-factor": options.far_trials_factor,
+				"min-instruments": options.min_instruments,
+				"time-slide-file": options.time_slide_file
+			}
+
+			common_opts.update(injection_opts)
+			inspInjNode = dagparts.DAGNode(jobs['gstlalInspiralInj'], dag, [],
+				opts = common_opts,
+				input_files = {
+					"ranking-stat-input": [likefile],
+					"ranking-stat-pdf": options.marginalized_likelihood_file
+				},
+				output_files = {
+					"output": "/dev/null"
+				}
+			)
+
+	return job_tags, inj_job_tags
+
+
+def aggregator_layer(dag, jobs, options, job_tags):
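+	"""Set up gstlal_ll_inspiral_aggregator nodes over groups of monitoring routes, electing the first aggregator per route group as leader, plus a trigger aggregator node."""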
+	# set up common settings for aggregation jobs
+	agg_options = {
+		"dump-period": 0,
+		"base-dir": "aggregator",
+		"job-tag": os.getcwd(),
+		"num-jobs": len(job_tags),
+		"num-threads": 2,
+		"job-start": 0,
+		"kafka-server": options.output_kafka_server,
+		"data-backend": options.agg_data_backend,
+	}
+
+	if options.agg_data_backend == 'influx':
+		agg_options.update({
+			"influx-database-name": options.influx_database_name,
+			"influx-hostname": options.influx_hostname,
+			"influx-port": options.influx_port,
+		})
+		if options.enable_auth:
+			agg_options.update({"enable-auth": ""})
+		if options.enable_https:
+			agg_options.update({"enable-https": ""})
+
+	# define routes used for aggregation jobs
+	snr_routes = ["%s_snr_history" % ifo for ifo in options.channel_dict]
+	network_routes = ["likelihood_history", "snr_history", "latency_history"]
+	usage_routes = ["ram_history"]
+
+	state_routes = []
+	for ifo in options.channel_dict.keys():
+		state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
+		state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
+		state_routes.append("%s_strain_dropped" % ifo)
+	agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))
+
+	# analysis-based aggregation jobs
+	# FIXME don't hard code the 1000
+	max_agg_jobs = 1000
+	agg_job_bounds = range(0, len(job_tags), max_agg_jobs) + [max_agg_jobs]
+	agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(job_tags)), 1))) + ["far_history"]
+	for routes in agg_routes:
+		these_options = dict(agg_options)
+		these_options["route"] = routes
+		if routes == "far_history":
+			these_options["data-type"] = "min"
+		else:
+			these_options["data-type"] = "max"
+
+		for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
+			these_options["job-start"] = aggstart
+			these_options["num-jobs"] = aggend - aggstart
+			if ii == 0: ### elect first aggregator per route as leader
+				these_options["across-jobs"] = ""
+				aggNode = dagparts.DAGNode(jobs['aggLeader'], dag, [], opts = these_options)
+			else:
+				aggNode = dagparts.DAGNode(jobs['agg'], dag, [], opts = these_options)
+
+	# Trigger aggregation
+	trigagg_options = {
+		"dump-period": 0,
+		"base-dir": "aggregator",
+		"job-tag": os.getcwd(),
+		"num-jobs": len(job_tags),
+		"num-threads": 2,
+		"job-start": 0,
+		"kafka-server": options.output_kafka_server,
+		"data-backend": options.agg_data_backend,
+	}
+	if options.agg_data_backend == 'influx':
+		trigagg_options.update({
+			"influx-database-name": options.influx_database_name,
+			"influx-hostname": options.influx_hostname,
+			"influx-port": options.influx_port,
+		})
+		if options.enable_auth:
+			trigagg_options.update({"enable-auth": ""})
+		if options.enable_https:
+			trigagg_options.update({"enable-https": ""})
+
+	return dagparts.DAGNode(jobs['trigagg'], dag, [], opts = trigagg_options)
+
+
+def dq_monitor_layer(dag, jobs, options):
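+	"""Set up one gstlal_ll_dq monitoring node per instrument in the channel dict and return the nodes."""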
+	nodes = []
+	outpath = 'aggregator'
+	for ifo in options.channel_dict:
+		# Data source dag options
+		if (options.data_source == "framexmit"):
+			datasource_opts = {
+				"framexmit-addr": datasource.framexmit_list_from_framexmit_dict({ifo: options.framexmit_dict[ifo]}),
+				"framexmit-iface": options.framexmit_iface
+			}
+		else:
+			datasource_opts = {
+				"shared-memory-partition": datasource.pipeline_channel_list_from_channel_dict({ifo: options.shm_part_dict[ifo]}),
+				"shared-memory-block-size": options.shared_memory_block_size,
+				"shared-memory-assumed-duration": options.shared_memory_assumed_duration
+			}
+
+		common_opts = {
+			"psd-fft-length": options.psd_fft_length,
+			"channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.channel_dict[ifo]}),
+			"state-channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.state_channel_dict[ifo]}, opt = "state-channel-name"),
+			"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.dq_channel_dict[ifo]}, opt = "dq-channel-name"),
+			"state-vector-on-bits": options.state_vector_on_bits,
+			"state-vector-off-bits": options.state_vector_off_bits,
+			"dq-vector-on-bits": options.dq_vector_on_bits,
+			"dq-vector-off-bits": options.dq_vector_off_bits,
+			"data-source": options.data_source,
+			"out-path": outpath,
+			"data-backend": options.agg_data_backend,
+		}
+		common_opts.update(datasource_opts)
+
+		if options.agg_data_backend == 'influx':
+			common_opts.update({
+				"influx-database-name": options.influx_database_name,
+				"influx-hostname": options.influx_hostname,
+				"influx-port": options.influx_port,
+			})
+			if options.enable_auth:
+				common_opts.update({"enable-auth": ""})
+			if options.enable_https:
+				common_opts.update({"enable-https": ""})
+
+		nodes.append(dagparts.DAGNode(jobs['dq'], dag, [], opts = common_opts))
+
+	return nodes
+
+
 def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
 	psd_nodes = {}
 	for ifos in segsdict:
-- 
GitLab