Skip to content
Snippets Groups Projects
inspiral.py 57.42 KiB
# Copyright (C) 2020  Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from collections.abc import Mapping
import itertools
import os
from typing import Iterable

import numpy
import yaml
from yaml.loader import SafeLoader

from lal import rate
from lal.utils import CacheEntry
from ligo.segments import segment

from gstlal import plugins
from gstlal import datafind
from gstlal.datafind import DataType, DataCache
from gstlal.dags import Argument, Option
from gstlal.dags import util as dagutil
from gstlal.dags.layers import Layer, Node
from gstlal.stats.inspiral_extrinsics import TimePhaseSNR


def split_bank_layer(config, dag, psd_cache, bank_cache):
	"""
	Attach a gstlal_inspiral_bank_splitter layer to the DAG.

	A single node is created which takes the files in bank_cache and
	psd_cache as untracked inputs and is configured from config.svd.

	Returns the DataCache of SVD_BANK entries generated over
	config.all_ifos, config.span and config.svd.bins.
	"""
	splitter = Layer(
		"gstlal_inspiral_bank_splitter",
		requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	split_bank_cache = DataCache.generate(
		DataType.SVD_BANK,
		config.all_ifos,
		config.span,
		svd_bins=config.svd.bins,
	)

	# options controlling how the template bank gets split
	splitter_opts = [
		Option("f-low", config.svd.f_low),
		Option("f-final", config.svd.max_f_final),
		Option("group-by-chi", config.svd.num_chi_bins),
		Option("approximant", config.svd.approximant),
		Option("overlap", config.svd.overlap),
		Option("instrument", ifo_to_string(config.all_ifos)),
		Option("n", config.svd.num_split_templates),
		Option("sort-by", "template_duration"),
		Option("num-banks", config.svd.num_banks),
	]

	# input files are passed with track=False
	bank_inputs = [
		Argument("template-bank", bank_cache.files, track=False),
		Argument("psd-xml", psd_cache.files, track=False),
	]

	# the output path is the lower-cased name of the split bank cache
	split_outputs = [
		Option("output-path", str(split_bank_cache.name).lower()),
		Option("output-stats-file", DataType.SVD_MANIFEST.filename(config.ifos)),
	]

	splitter += Node(
		arguments=splitter_opts,
		inputs=bank_inputs,
		outputs=split_outputs,
	)

	dag.attach(splitter)
	return split_bank_cache


def svd_bank_layer(config, dag, median_psd_cache, split_bank_cache=None):
	"""
	Attach a gstlal_inspiral_svd_bank layer to the DAG.

	One node is added for every (ifo, SVD bin) pair of the generated
	SVD bank cache. Each node reads the split banks of its bin together
	with the files in median_psd_cache.

	NOTE: split_bank_cache defaults to None but is used unconditionally
	below, so callers have to supply a cache.

	Returns the DataCache of SVD banks written by this layer.
	"""
	svd_layer = Layer(
		"gstlal_inspiral_svd_bank",
		requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# called below with a bin's mean chirp mass to pick the
	# autocorrelation length for that bin
	ac_length_for = autocorrelation_length_map(config.svd.autocorrelation_length)

	svd_cache = DataCache.generate(
		DataType.SVD_BANK,
		config.ifos,
		config.span,
		svd_bins=config.svd.bins,
		root="filter",
	)

	banks_by_bin = split_bank_cache.groupby("bin")
	for (ifo, svd_bin), bin_svd_banks in svd_cache.groupby("ifo", "bin").items():
		bin_stats = config.svd.stats.bins[svd_bin]

		# use the sub-bank specific configuration when the bin names
		# one, otherwise fall back to the top-level SVD configuration
		if "bank_name" in bin_stats:
			bank_cfg = config.svd.sub_banks[bin_stats["bank_name"]]
		else:
			bank_cfg = config.svd

		mean_mchirp = bin_stats["mean_mchirp"]

		svd_opts = [
			Option("instrument-override", ifo),
			Option("flow", bank_cfg.f_low),
			Option("samples-min", bank_cfg.samples_min),
			Option("samples-max-64", bank_cfg.samples_max_64),
			Option("samples-max-256", bank_cfg.samples_max_256),
			Option("samples-max", bank_cfg.samples_max),
			Option("svd-tolerance", bank_cfg.tolerance),
			Option("autocorrelation-length", ac_length_for(mean_mchirp)),
		]
		# the sample rate is optional in the configuration
		if "sample_rate" in bank_cfg:
			svd_opts.append(Option("sample-rate", bank_cfg.sample_rate))

		svd_inputs = [
			Option("reference-psd", median_psd_cache.files),
			Argument("split-banks", sorted(banks_by_bin[svd_bin].files)),
		]

		svd_layer += Node(
			arguments=svd_opts,
			inputs=svd_inputs,
			outputs=Option("write-svd", bin_svd_banks.files),
		)

	dag.attach(svd_layer)
	return svd_cache


def checkerboard_layer(config, dag, ref_psd_cache, svd_bank_cache):
	"""
	Attach a gstlal_svd_bank_checkerboard layer to the DAG.

	The SVD bank files are processed in place, in chunks of 20 files per
	node. The same cache object passed in as svd_bank_cache is returned.
	"""
	checker_layer = Layer(
		"gstlal_svd_bank_checkerboard",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# options common to every node; "even" is only passed when the
	# configuration asks for it
	checker_opts = [
		Option("in-place"),
		Option("reference-psd", ref_psd_cache.files),
	]
	if config.svd.checkerboard == "even":
		checker_opts.append(Option("even"))

	files_per_node = 20
	for bank_chunk in svd_bank_cache.chunked(files_per_node):
		chunk_files_in = Option("svd-files", bank_chunk.files)
		# outputs are the same files as the inputs since banks are
		# modified in place
		chunk_files_out = Argument("checkered-svd-files", bank_chunk.files, suppress=True)
		checker_layer += Node(
			arguments=checker_opts,
			inputs=chunk_files_in,
			outputs=chunk_files_out,
		)

	dag.attach(checker_layer)
	return svd_bank_cache


def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
	"""
	Attach the gstlal_inspiral filtering layer to the DAG.

	For every (ifo combination, time span) of the generated trigger
	cache, the triggers are chunked into groups of SVD bins and one node
	is added per group.

	Returns a (trigger_cache, dist_stat_cache) tuple with the caches of
	the files written by the layer.
	"""
	inspiral_layer = Layer(
		"gstlal_inspiral",
		requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		dynamic_memory=True,
	)

	# output caches, one entry per ifo combination, time bin and SVD bin
	dist_stat_cache = DataCache.generate(
		DataType.DIST_STATS,
		config.ifo_combos,
		config.time_bins,
		svd_bins=config.svd.bins,
		root="filter",
	)
	trigger_cache = DataCache.generate(
		DataType.TRIGGERS,
		config.ifo_combos,
		config.time_bins,
		svd_bins=config.svd.bins,
		root="filter",
	)

	# options shared by every node in the layer
	shared_opts = [
		Option("track-psd"),
		Option("data-source", "frames"),
		Option("control-peak-time", 0),
		Option("psd-fft-length", config.psd.fft_length),
		Option("frame-segments-name", config.source.frame_segments_name),
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("coincidence-threshold", config.filter.coincidence_threshold),
		Option("fir-stride", config.filter.fir_stride),
		Option("min-instruments", config.min_ifos),
	]

	# disable service discovery if using singularity
	if config.condor.singularity_image:
		shared_opts.append(Option("disable-service-discovery"))

	# checkpoint by grouping SVD bins together + enable local
	# frame caching if there is more than one SVD bin per group
	max_concurrency = 10 if config.condor.transfer_files else 20
	bins_per_job = min(1 + len(config.svd.bins) // 20, max_concurrency)
	if bins_per_job > 1:
		shared_opts.append(Option("local-frame-caching"))

	psds_by_key = ref_psd_cache.groupby("ifo", "time")
	banks_by_key = svd_bank_cache.groupby("ifo", "bin")
	stats_by_key = dist_stat_cache.groupby("ifo", "time", "bin")
	for (ifo_combo, span), span_triggers in trigger_cache.groupby("ifo", "time").items():
		ifos = config.to_ifo_list(ifo_combo)
		gps_start, gps_end = span

		# options specific to this ifo combination and time span
		span_opts = [
			Option("gps-start-time", int(gps_start)),
			Option("gps-end-time", int(gps_end)),
			Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
		]

		# read frames from a frame cache when one is configured,
		# otherwise locate them through the data find server
		if config.source.frame_cache:
			span_opts += [
				Option("frame-cache", config.source.frame_cache, track=False),
			]
		else:
			span_opts += [
				Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
				Option("data-find-server", config.source.data_find_server),
			]

		span_opts += shared_opts

		for group in span_triggers.chunked(bins_per_job):
			group_bins = group.groupby("bin").keys()

			# one gate threshold per SVD bin in the group
			gate_thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in group_bins]
			node_opts = [Option("ht-gate-threshold", gate_thresholds), *span_opts]

			bank_files = dagutil.flatten(
				[banks_by_key[(ifo, svd_bin)].files for ifo in ifos for svd_bin in group_bins]
			)
			stat_files = dagutil.flatten(
				[stats_by_key[(ifo_combo, span, svd_bin)].files for svd_bin in group_bins]
			)

			node_inputs = [
				Option("frame-segments-file", config.source.frame_segments_file),
				Option("veto-segments-file", config.filter.veto_segments_file),
				Option("reference-psd", psds_by_key[(ifo_combo, span)].files),
				Option("time-slide-file", config.filter.time_slide_file),
				Option("svd-bank", bank_files),
			]
			node_outputs = [
				Option("output", group.files),
				Option("ranking-stat-output", stat_files),
			]

			inspiral_layer += Node(
				arguments=node_opts,
				inputs=node_inputs,
				outputs=node_outputs,
			)

	dag.attach(inspiral_layer)

	return trigger_cache, dist_stat_cache


def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
	"""
	Attach the gstlal_inspiral injection filtering layer to the DAG.

	A trigger cache is built for every injection set listed in
	config.filter.injections, restricted to the SVD bins selected from
	the chirp mass range of that set. Nodes are then added per
	(ifo combination, time span, injection set), chunked by SVD bin.

	Returns the DataCache of injection triggers written by the layer.
	"""
	inj_layer = Layer(
		"gstlal_inspiral",
		name="gstlal_inspiral_inj",
		requirements={"request_cpus": 2, "request_memory": 5000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		dynamic_memory=True,
	)

	# accumulate the triggers of every injection set into one cache
	trigger_cache = DataCache(DataType.TRIGGERS)
	for inj_name, inj_args in config.filter.injections.items():
		# "range" is a "min:max" chirp mass string
		mchirp_low, mchirp_high = (float(bound) for bound in inj_args["range"].split(":"))
		inj_bins = mchirp_range_to_bins(mchirp_low, mchirp_high, config.svd.stats)
		trigger_cache += DataCache.generate(
			DataType.TRIGGERS,
			config.ifo_combos,
			config.time_bins,
			svd_bins=inj_bins,
			subtype=inj_name,
			root="filter",
		)

	# options shared by every node in the layer
	shared_opts = [
		Option("track-psd"),
		Option("data-source", "frames"),
		Option("control-peak-time", 0),
		Option("psd-fft-length", config.psd.fft_length),
		Option("frame-segments-name", config.source.frame_segments_name),
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("coincidence-threshold", config.filter.coincidence_threshold),
		Option("fir-stride", config.filter.fir_stride),
		Option("min-instruments", config.min_ifos),
	]

	# disable service discovery if using singularity
	if config.condor.singularity_image:
		shared_opts.append(Option("disable-service-discovery"))

	# checkpoint by grouping SVD bins together + enable local
	# frame caching if there is more than one SVD bin per group
	max_concurrency = 10 if config.condor.transfer_files else 20
	bins_per_job = min(1 + len(config.svd.bins) // 20, max_concurrency)
	if bins_per_job > 1:
		shared_opts.append(Option("local-frame-caching"))

	psds_by_key = ref_psd_cache.groupby("ifo", "time")
	banks_by_key = svd_bank_cache.groupby("ifo", "bin")
	triggers_by_key = trigger_cache.groupby("ifo", "time", "subtype")
	for (ifo_combo, span, inj_type), inj_triggers in triggers_by_key.items():
		ifos = config.to_ifo_list(ifo_combo)
		gps_start, gps_end = span

		# options specific to this ifo combination and time span
		span_opts = [
			Option("gps-start-time", int(gps_start)),
			Option("gps-end-time", int(gps_end)),
			Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
		]

		# read frames from a frame cache when one is configured,
		# otherwise locate them through the data find server
		if config.source.frame_cache:
			span_opts += [
				Option("frame-cache", config.source.frame_cache, track=False),
			]
		else:
			span_opts += [
				Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
				Option("data-find-server", config.source.data_find_server),
			]
		span_opts += shared_opts

		# injection sets are keyed by lower-case name in the configuration
		injection_file = config.filter.injections[inj_type.lower()]["file"]

		for group in inj_triggers.chunked(bins_per_job):
			group_bins = group.groupby("bin").keys()

			# one gate threshold per SVD bin in the group
			gate_thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in group_bins]
			node_opts = [Option("ht-gate-threshold", gate_thresholds), *span_opts]

			bank_files = dagutil.flatten(
				[banks_by_key[(ifo, svd_bin)].files for ifo in ifos for svd_bin in group_bins]
			)

			node_inputs = [
				Option("frame-segments-file", config.source.frame_segments_file),
				Option("veto-segments-file", config.filter.veto_segments_file),
				Option("reference-psd", psds_by_key[(ifo_combo, span)].files),
				Option("time-slide-file", config.filter.time_slide_file),
				Option("svd-bank", bank_files),
				Option("injections", injection_file),
			]

			inj_layer += Node(
				arguments=node_opts,
				inputs=node_inputs,
				outputs=Option("output", group.files),
			)

	dag.attach(inj_layer)

	return trigger_cache


def aggregate_layer(config, dag, trigger_cache, dist_stat_cache=None):
	"""
	Cluster triggers by SNR and, optionally, marginalize dist stats
	across time.

	A lalapps_run_sqlite node is added per (SVD bin, injection type)
	group of trigger_cache; the trigger files are clustered in place.

	If dist_stat_cache is given (and non-empty), a second layer running
	gstlal_inspiral_marginalize_likelihood is attached which combines the
	dist stats of each SVD bin into one file per bin.

	Returns trigger_cache alone when no dist stats were given, otherwise
	a (trigger_cache, agg_dist_stat_cache) tuple.
	"""
	trg_layer = Layer(
		"lalapps_run_sqlite",
		name="cluster_triggers_by_snr",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# FIXME: find better way of discovering SQL file
	# the SQL files are looked up in <prefix>/share/gstlal, where the
	# executable is found in <prefix>/bin. step up one directory rather
	# than string-replacing "bin", which would also rewrite any other
	# "bin" substring appearing in the installation path
	bin_dir = os.path.dirname(dagutil.which("gstlal_inspiral"))
	share_path = os.path.join(os.path.dirname(bin_dir), "share", "gstlal")
	snr_cluster_sql_file = os.path.join(share_path, "snr_simplify_and_cluster.sql")
	inj_snr_cluster_sql_file = os.path.join(share_path, "inj_snr_simplify_and_cluster.sql")

	# cluster triggers by SNR
	for (svd_bin, inj_type), triggers in trigger_cache.groupby("bin", "subtype").items():
		# injection triggers (non-empty subtype) use their own SQL file
		sql_file = inj_snr_cluster_sql_file if inj_type else snr_cluster_sql_file
		trg_layer += Node(
			arguments = [
				Option("sql-file", sql_file),
				Option("tmp-space", dagutil.condor_scratch_space()),
			],
			inputs = Argument("triggers", triggers.files),
			outputs = Argument("clustered-triggers", triggers.files, suppress=True),
		)

	dag.attach(trg_layer)

	# if no dist stats files to marginalize, only return triggers
	if not dist_stat_cache:
		return trigger_cache

	# marginalize dist stats across time
	dist_layer = Layer(
		"gstlal_inspiral_marginalize_likelihood",
		name="marginalize_dist_stats_across_time_filter",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	agg_dist_stat_cache = DataCache.generate(
		DataType.DIST_STATS,
		config.all_ifos,
		config.span,
		svd_bins=config.svd.bins,
		root="filter",
	)

	dist_stats = dist_stat_cache.groupby("bin")
	for svd_bin, agg_dist_stats in agg_dist_stat_cache.groupby("bin").items():
		dist_layer += Node(
			arguments = Option("marginalize", "ranking-stat"),
			inputs = Argument("dist-stats", dist_stats[svd_bin].files),
			outputs = Option("output", agg_dist_stats.files)
		)

	dag.attach(dist_layer)

	return trigger_cache, agg_dist_stat_cache


def create_prior_layer(config, dag, svd_bank_cache, median_psd_cache, dist_stat_cache=None):
	"""
	Attach a gstlal_inspiral_create_prior_diststats layer to the DAG.

	One node is added per SVD bin. When dist_stat_cache is given the
	output is a PRIOR_DIST_STATS cache rooted at "rank"; otherwise a
	DIST_STATS cache is generated without an explicit root.

	Returns the DataCache of prior files written by the layer.
	"""
	prior_layer = Layer(
		"gstlal_inspiral_create_prior_diststats",
		requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# pick the output data type (and root) based on whether dist stats
	# were provided
	if dist_stat_cache:
		prior_type = DataType.PRIOR_DIST_STATS
		cache_kwargs = {"svd_bins": config.svd.bins, "root": "rank"}
	else:
		prior_type = DataType.DIST_STATS
		cache_kwargs = {"svd_bins": config.svd.bins}
	prior_cache = DataCache.generate(prior_type, config.all_ifos, config.span, **cache_kwargs)

	banks_by_bin = svd_bank_cache.groupby("bin")
	for svd_bin, bin_prior in prior_cache.groupby("bin").items():
		node_inputs = [
			Option("svd-file", banks_by_bin[svd_bin].files),
			Option("mass-model-file", config.prior.mass_model),
			Option("psd-xml", median_psd_cache.files)
		]

		# optional iDQ timeseries
		if config.prior.idq_timeseries:
			node_inputs.append(Option("idq-file", config.prior.idq_timeseries))

		# optional dtdphi file: either a single file, or a mapping
		# from sub-bank name to file
		dtdphi = config.prior.dtdphi
		if dtdphi:
			if isinstance(dtdphi, Mapping):
				bank_name = config.svd.stats.bins[svd_bin]["bank_name"]
				dtdphi_file = dtdphi[bank_name]
			else:
				dtdphi_file = dtdphi
			node_inputs.append(Option("dtdphi-file", dtdphi_file))

		node_opts = [
			Option("df", "bandwidth"),
			Option("background-prior", 1),
			Option("instrument", config.ifos),
			Option("min-instruments", config.min_ifos),
			Option("coincidence-threshold", config.filter.coincidence_threshold),
		]

		prior_layer += Node(
			arguments=node_opts,
			inputs=node_inputs,
			outputs=Option("write-likelihood", bin_prior.files),
		)

	dag.attach(prior_layer)
	return prior_cache


def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
	"""
	Attach a layer combining dist stats and priors for each SVD bin.

	One gstlal_inspiral_marginalize_likelihood node is added per SVD bin,
	taking the bin's dist stat files followed by its prior files.

	Returns the DataCache of MARG_DIST_STATS files written by the layer.
	"""
	marg_layer = Layer(
		"gstlal_inspiral_marginalize_likelihood",
		name="marginalize_dist_stats_across_time_rank",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	marg_dist_stat_cache = DataCache.generate(
		DataType.MARG_DIST_STATS,
		config.all_ifos,
		config.span,
		svd_bins=config.svd.bins,
		root="rank",
	)

	priors_by_bin = prior_cache.groupby("bin")
	stats_by_bin = dist_stat_cache.groupby("bin")
	for svd_bin, bin_output in marg_dist_stat_cache.groupby("bin").items():
		# dist stats come first, then the prior for the same bin
		files_to_combine = stats_by_bin[svd_bin].files + priors_by_bin[svd_bin].files
		node_inputs = [
			Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
			Argument("dist-stats", files_to_combine),
		]
		marg_layer += Node(
			arguments=Option("marginalize", "ranking-stat"),
			inputs=node_inputs,
			outputs=Option("output", bin_output.files),
		)

	dag.attach(marg_layer)
	return marg_dist_stat_cache


def calc_pdf_layer(config, dag, dist_stat_cache):
	"""
	Attach a gstlal_inspiral_calc_rank_pdfs layer to the DAG.

	One node is added per SVD bin, reading the dist stats of that bin.

	Returns the DataCache of DIST_STAT_PDFS files written by the layer.
	"""
	# FIXME: expose this in configuration
	# used both for the condor CPU request and the --num-cores option
	num_cores = 4

	pdf_layer = Layer(
		"gstlal_inspiral_calc_rank_pdfs",
		requirements={"request_cpus": num_cores, "request_memory": 3000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	pdf_cache = DataCache.generate(
		DataType.DIST_STAT_PDFS,
		config.all_ifos,
		config.span,
		svd_bins=config.svd.bins,
		root="rank",
	)

	stats_by_bin = dist_stat_cache.groupby("bin")
	for svd_bin, bin_pdfs in pdf_cache.groupby("bin").items():
		node_opts = [
			Option("ranking-stat-samples", config.rank.ranking_stat_samples),
			Option("num-cores", num_cores),
		]
		node_inputs = [
			Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
			Argument("dist-stats", stats_by_bin[svd_bin].files),
		]
		pdf_layer += Node(
			arguments=node_opts,
			inputs=node_inputs,
			outputs=Option("output", bin_pdfs.files),
		)

	dag.attach(pdf_layer)
	return pdf_cache


def marginalize_pdf_layer(config, dag, pdf_cache):
	"""
	Attach a layer combining all ranking stat PDFs into one output.

	A single gstlal_inspiral_marginalize_likelihood node takes every file
	in pdf_cache as input.

	Returns the DataCache of the marginalized DIST_STAT_PDFS output.
	"""
	marg_pdf_layer = Layer(
		"gstlal_inspiral_marginalize_likelihood",
		name="gstlal_inspiral_marginalize_pdfs",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# no SVD bins are passed here, unlike the per-bin PDF cache
	marg_pdf_cache = DataCache.generate(
		DataType.DIST_STAT_PDFS,
		config.all_ifos,
		config.span,
		root="rank",
	)

	marginalize_opt = Option("marginalize", "ranking-stat-pdf")
	all_pdfs = Argument("dist-stat-pdfs", pdf_cache.files)
	marg_output = Option("output", marg_pdf_cache.files)
	marg_pdf_layer += Node(
		arguments=marginalize_opt,
		inputs=all_pdfs,
		outputs=marg_output,
	)

	dag.attach(marg_pdf_layer)
	return marg_pdf_cache


def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
	"""
	Attach a gstlal_inspiral_calc_likelihood layer to the DAG.

	One node is added per (SVD bin, injection type, directory) group of
	trigger_cache. Outputs are taken from a copy of the trigger cache
	rooted at "rank".

	Returns the "rank"-rooted copy of the trigger cache.
	"""
	likelihood_layer = Layer(
		"gstlal_inspiral_calc_likelihood",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# assign likelihood to triggers
	calc_trigger_cache = trigger_cache.copy(root="rank")
	ranked_by_key = calc_trigger_cache.groupby("bin", "subtype", "dirname")
	stats_by_bin = dist_stat_cache.groupby("bin")
	filtered_by_key = trigger_cache.groupby("bin", "subtype", "dirname")
	for (svd_bin, inj_type, dirname), bin_triggers in filtered_by_key.items():
		# directory of the corresponding entries in the "rank" copy
		rank_dirname = dirname.replace("filter", "rank")

		node_opts = [
			Option("force"),
			Option("copy"),
			Option("tmp-space", dagutil.condor_scratch_space()),
		]

		# if file transfer not enabled, need to specify directory to
		# copy triggers into as remaps aren't relevant here
		if not config.condor.transfer_files:
			node_opts.append(Option("copy-dir", rank_dirname))

		node_inputs = [
			Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
			Argument("triggers", bin_triggers.files),
			Option("likelihood-url", stats_by_bin[svd_bin].files),
		]
		ranked_files = ranked_by_key[(svd_bin, inj_type, rank_dirname)].files

		likelihood_layer += Node(
			arguments=node_opts,
			inputs=node_inputs,
			outputs=Argument("calc-triggers", ranked_files, suppress_with_remap=True),
		)

	dag.attach(likelihood_layer)
	return calc_trigger_cache


def cluster_layer(config, dag, trigger_cache):
	"""
	Combine triggers and cluster them by likelihood in two rounds.

	Four layers are attached to the DAG, in this order:

	1. ligolw_add: combine triggers across SVD bins for every
	   (time span, injection type) group.
	2. lalapps_run_sqlite: cluster the combined triggers in place.
	3. ligolw_sqlite: convert the combined triggers within each of
	   config.time_boundaries to a sqlite database, adding the injection
	   file (for injection types) and the frame segments file.
	4. lalapps_run_sqlite: cluster the databases in place.

	Returns the DataCache of sqlite trigger databases.
	"""
	# cluster triggers by likelihood
	combine_layer = Layer(
		"ligolw_add",
		name="combine_triggers_across_bins",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	cluster_round1_layer = Layer(
		"lalapps_run_sqlite",
		name="cluster_triggers_by_likelihood_round_one",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	cluster_round2_layer = Layer(
		"lalapps_run_sqlite",
		name="cluster_triggers_by_likelihood_round_two",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	sqlite_layer = Layer(
		"ligolw_sqlite",
		name="convert_triggers_to_sqlite",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# set up data caches
	inj_types = list(trigger_cache.groupby("subtype").keys())
	combined_trigger_cache = DataCache.generate(
		DataType.TRIGGERS,
		config.ifo_combos,
		config.time_bins,
		subtype=inj_types,
		root="rank",
	)
	trigger_db_cache = DataCache.generate(
		DataType.TRIGGERS,
		config.all_ifos,
		config.time_boundaries,
		subtype=inj_types,
		extension="sqlite",
		root="rank"
	)

	# FIXME: find better way of discovering SQL file
	# the SQL files are looked up in <prefix>/share/gstlal, where the
	# executable is found in <prefix>/bin. step up one directory rather
	# than string-replacing "bin", which would also rewrite any other
	# "bin" substring appearing in the installation path
	bin_dir = os.path.dirname(dagutil.which("gstlal_inspiral"))
	share_path = os.path.join(os.path.dirname(bin_dir), "share", "gstlal")
	cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")
	inj_cluster_sql_file = os.path.join(share_path, "inj_simplify_and_cluster.sql")

	# combine/cluster triggers across SVD bins
	combined_triggers_across_bins = combined_trigger_cache.groupby("time", "subtype")
	for (span, inj_type), triggers in trigger_cache.groupby("time", "subtype").items():
		combined_triggers = combined_triggers_across_bins[(span, inj_type)]

		combine_layer += Node(
			inputs = Argument("inputs", triggers.files),
			outputs = Option("output", combined_triggers.files),
		)

		# cluster by likelihood; injection triggers (non-empty
		# subtype) use their own SQL file
		cluster_round1_layer += Node(
			arguments = [
				Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
				Option("tmp-space", dagutil.condor_scratch_space()),
			],
			inputs = Argument("triggers", combined_triggers.files),
			outputs = Argument("calc-triggers", combined_triggers.files, suppress=True),
		)

	dag.attach(combine_layer)
	dag.attach(cluster_round1_layer)

	# combine/cluster triggers across time
	# if triggers are from an injection job, also add in the injections
	combined_triggers_by_time = combined_trigger_cache.groupby("subtype")
	for inj_type, trigger_dbs_by_time in trigger_db_cache.groupby("subtype").items():
		# binning of the combined triggers only depends on the injection
		# type, so compute it once rather than for every span
		combined_triggers = combined_triggers_by_time[inj_type].groupby_bins("time", config.time_boundaries)

		for span, trigger_dbs in trigger_dbs_by_time.groupby_bins("time", config.time_boundaries).items():
			# add input files for sqlite jobs
			xml_files = []
			if inj_type:
				# injection sets are keyed by lower-case name
				injection_file = config.filter.injections[inj_type.lower()]["file"]
				xml_files.append(injection_file)
			xml_files.extend(combined_triggers[span].files)
			xml_files.append(config.source.frame_segments_file)

			# convert triggers to sqlite
			sqlite_layer += Node(
				arguments = [
					Option("replace"),
					Option("tmp-space", dagutil.condor_scratch_space()),
				],
				inputs = Argument("xml-files", xml_files),
				outputs = Option("database", trigger_dbs.files),
			)

			# cluster by likelihood
			cluster_round2_layer += Node(
				arguments = [
					Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
					Option("tmp-space", dagutil.condor_scratch_space()),
				],
				inputs = Argument("triggers", trigger_dbs.files),
				outputs = Argument("calc-triggers", trigger_dbs.files, suppress=True),
			)

	dag.attach(sqlite_layer)
	dag.attach(cluster_round2_layer)

	return trigger_db_cache


def find_injections_layer(config, dag, trigger_db_cache):
	"""
	Attach the layers that run injection finding on injection databases.

	For each injection type and time bin, three nodes are added: the
	sqlite database is extracted to XML, lalapps_inspinjfind is run on
	the XML file in place, and the XML file is converted back into the
	same database.

	Returns a (trigger_db_cache, inj_trigger_cache) tuple, the first
	element being the cache passed in.
	"""
	to_xml_layer = Layer(
		"ligolw_sqlite",
		name="inj_sqlite_to_xml",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	injfind_layer = Layer(
		"lalapps_inspinjfind",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	to_sqlite_layer = Layer(
		"ligolw_sqlite",
		name="inj_xml_to_sqlite",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# set up data caches, dropping the group with an empty subtype so
	# only injection databases remain
	dbs_by_inj_type = trigger_db_cache.groupby("subtype")
	dbs_by_inj_type.pop("")
	inj_trigger_cache = DataCache.generate(
		DataType.TRIGGERS,
		config.all_ifos,
		config.time_boundaries,
		subtype=dbs_by_inj_type.keys(),
		root="rank",
	)

	# generate layers
	for inj_type, inj_xml_cache in inj_trigger_cache.groupby("subtype").items():
		xml_by_span = inj_xml_cache.groupby_bins("time", config.time_boundaries)
		for span, xml_triggers in xml_by_span.items():
			dbs_by_span = dbs_by_inj_type[inj_type].groupby_bins("time", config.time_boundaries)
			span_dbs = dbs_by_span[span]

			# convert triggers to XML
			to_xml_layer += Node(
				arguments=[
					Option("replace"),
					Option("tmp-space", dagutil.condor_scratch_space()),
				],
				inputs=Option("database", span_dbs.files),
				outputs=Option("extract", xml_triggers.files),
			)

			# find injections
			injfind_layer += Node(
				arguments=Option("time-window", 0.9),
				inputs=Argument("triggers", xml_triggers.files),
				outputs=Argument("inj-triggers", xml_triggers.files, suppress=True),
			)

			# convert triggers back to sqlite
			to_sqlite_layer += Node(
				arguments=[
					Option("replace"),
					Option("tmp-space", dagutil.condor_scratch_space()),
				],
				inputs=Argument("triggers", xml_triggers.files),
				outputs=Option("database", span_dbs.files),
			)

	dag.attach(to_xml_layer)
	dag.attach(injfind_layer)
	dag.attach(to_sqlite_layer)

	return trigger_db_cache, inj_trigger_cache


def compute_far_layer(config, dag, trigger_cache, pdf_cache):
	"""
	Attach a gstlal_compute_far_from_snr_chisq_histograms layer.

	One node is added per time group of the non-injection trigger
	databases; injection databases for the same time are added when any
	are present.

	Returns a (trigger_cache, post_pdf_cache) tuple, the first element
	being the cache passed in.
	"""
	far_layer = Layer(
		"gstlal_compute_far_from_snr_chisq_histograms",
		name="compute_far",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	post_pdf_cache = DataCache.generate(
		DataType.POST_DIST_STAT_PDFS,
		config.all_ifos,
		config.span,
		root="rank",
	)

	# split trigger cache into injections and non-injections; the
	# non-injection group is the one with an empty subtype
	by_subtype = trigger_cache.groupby("subtype")
	noninj_trigger_cache = by_subtype.pop("")
	inj_entries = list(itertools.chain.from_iterable(
		[inj_cache.cache for inj_cache in by_subtype.values()]
	))
	inj_trigger_cache = DataCache(trigger_cache.name, inj_entries)

	inj_by_time = inj_trigger_cache.groupby("time")
	for span, noninj_dbs in noninj_trigger_cache.groupby("time").items():
		# every database touched by this node, listed as an output
		all_dbs = noninj_dbs.files
		node_inputs = [
			Option("non-injection-db", noninj_dbs.files),
			Option("background-bins-file", pdf_cache.files),
			Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
		]
		if inj_by_time:
			node_inputs.append(Option("injection-db", inj_by_time[span].files))
			all_dbs.extend(inj_by_time[span].files)

		node_outputs = [
			Option("output-background-bins-file", post_pdf_cache.files),
			Argument("databases", all_dbs, suppress=True),
		]

		far_layer += Node(
			arguments=Option("tmp-space", dagutil.condor_scratch_space()),
			inputs=node_inputs,
			outputs=node_outputs,
		)

	dag.attach(far_layer)
	return trigger_cache, post_pdf_cache


def match_injections_layer(config, dag, injection_cache):
	"""
	Split injection files and match the injections to templates.

	Two layers are attached to the DAG:

	1. gstlal_injsplitter: every entry of injection_cache is split into
	   num_splits files.
	2. gstlal_inspiral_injection_template_match: one node per
	   (injection type, split bin) matching the split injections against
	   config.data.template_bank.

	Returns the DataCache of MATCHED_INJECTIONS files.
	"""
	split_layer = Layer(
		"gstlal_injsplitter",
		name="split_injections",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	match_layer = Layer(
		"gstlal_inspiral_injection_template_match",
		name="match_injections",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# roughly one file per 100000 seconds, but always at least one so
	# that spans shorter than 100000 seconds don't end up with zero
	# split files and no matching jobs
	num_splits = max(1, int(abs(config.span)) // 100000)
	split_bins = [f"{i:04d}" for i in range(num_splits)]

	inj_types = list(config.filter.injections.keys())
	split_inj_cache = DataCache(DataType.SPLIT_INJECTIONS)
	match_inj_cache = DataCache.generate(
		DataType.MATCHED_INJECTIONS,
		config.all_ifos,
		config.span,
		svd_bins=split_bins,
		subtype=[inj_type.upper() for inj_type in inj_types],
		root="rank",
	)

	# split injections up
	for inj_type, injections in injection_cache.groupby("subtype").items():
		# the upper-cased injection type is used as subtype and user tag
		inj_tag = inj_type.upper()
		for inj_entry in injections.cache:
			split_injections = DataCache.generate(
				DataType.SPLIT_INJECTIONS,
				inj_entry.observatory,
				inj_entry.segment,
				svd_bins=split_bins,
				subtype=inj_tag,
				root="rank",
			)
			split_inj_cache += split_injections
			out_path = DataType.SPLIT_INJECTIONS.directory(root="rank", start=inj_entry.segment[0])

			split_layer += Node(
				arguments = [
					Option("nsplit", num_splits),
					Option("usertag", inj_tag),
				],
				inputs = Argument("injection-file", inj_entry.path),
				outputs = [
					Option("output-path", out_path, remap=False),
					Argument("split-injections", split_injections.files, suppress=True),
				],
			)
	dag.attach(split_layer)

	# match injections to templates
	matched_injections = match_inj_cache.groupby("subtype", "bin")
	for (inj_type, split_bin), injections in split_inj_cache.groupby("subtype", "bin").items():
		match_layer += Node(
			inputs = [
				Option("injection-file", injections.files),
				Option("template-bank", config.data.template_bank),
			],
			outputs = Option("output", matched_injections[(inj_type, split_bin)].files),
		)
	dag.attach(match_layer)

	return match_inj_cache


def measure_lnlr_cdf_layer(config, dag, dist_stats_cache, injection_cache):
	"""
	Attach a gstlal_inspiral_lnlrcdf_signal layer to the DAG.

	For every (injection type, split bin) group of injection_cache, the
	dist stats are processed in chunks of 20 files, one node per chunk.

	Returns the DataCache accumulating all LNLR_SIGNAL_CDF outputs.
	"""
	cdf_layer = Layer(
		"gstlal_inspiral_lnlrcdf_signal",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	lnlr_cdf_cache = DataCache(DataType.LNLR_SIGNAL_CDF)
	matched_by_key = injection_cache.groupby("subtype", "bin")
	for (inj_type, split_bin), matched in matched_by_key.items():
		for chunk_index, stats_chunk in enumerate(dist_stats_cache.chunked(20)):
			# label outputs by both the split bin and the chunk number
			bin_label = f"{split_bin}_{chunk_index:04d}"
			chunk_cdfs = DataCache.generate(
				DataType.LNLR_SIGNAL_CDF,
				config.all_ifos,
				config.span,
				svd_bins=bin_label,
				subtype=inj_type,
				root="rank",
			)

			node_inputs = [
				Option("injection-template-match-file", matched.files),
				Option("likelihood-url", stats_chunk.files),
				Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
			]
			cdf_layer += Node(
				inputs=node_inputs,
				outputs=Option("output-file", chunk_cdfs.files),
			)

			lnlr_cdf_cache += chunk_cdfs

	dag.attach(cdf_layer)

	return lnlr_cdf_cache


def plot_analytic_vt_layer(config, dag, trigger_cache, pdf_cache, lnlr_cdf_cache):
	"""
	Attach two gstlal_inspiral_make_mc_vtplot layers to the DAG.

	For every injection type in lnlr_cdf_cache, one node is added to
	each layer: the first is given the injection trigger databases and
	the "check-vt" option, the second the injection file itself. Both
	write into the "plots" directory.
	"""
	db_plot_layer = Layer(
		"gstlal_inspiral_make_mc_vtplot",
		name="plot_mc_vt_triggers",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)
	inj_plot_layer = Layer(
		"gstlal_inspiral_make_mc_vtplot",
		name="plot_mc_vt_injections",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
	)

	# drop the group with an empty subtype, leaving injection triggers
	inj_dbs_by_type = trigger_cache.groupby("subtype")
	inj_dbs_by_type.pop("")

	# make plots
	for inj_type, type_cdfs in lnlr_cdf_cache.groupby("subtype").items():
		# injection sets are keyed by lower-case name in the configuration
		injection_file = config.filter.injections[inj_type.lower()]["file"]

		db_plot_opts = [
			Option("check-vt"),
			Option("instrument", config.all_ifos),
		]
		db_plot_inputs = [
			Argument("lnlr-cdfs", type_cdfs.files),
			Option("ranking-stat-pdf", pdf_cache.files),
			Option("injection-database", inj_dbs_by_type[inj_type].files),
		]
		db_plot_layer += Node(
			arguments=db_plot_opts,
			inputs=db_plot_inputs,
			outputs=Option("output-dir", "plots"),
		)

		inj_plot_inputs = [
			Argument("lnlr-cdfs", type_cdfs.files),
			Option("ranking-stat-pdf", pdf_cache.files),
			Option("injection-files", injection_file),
		]
		inj_plot_layer += Node(
			arguments=Option("instrument", config.all_ifos),
			inputs=inj_plot_inputs,
			outputs=Option("output-dir", "plots"),
		)

	dag.attach(db_plot_layer)
	dag.attach(inj_plot_layer)


def plot_horizon_distance_layer(config, dag, marg_dist_stat_caches):
	"""
	Attach a single-node layer that runs
	gstlal_inspiral_plot_rankingstats_horizon over every file in
	marg_dist_stat_caches and writes into the "plots" directory.
	"""
	resources = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	horizon_layer = Layer(
		"gstlal_inspiral_plot_rankingstats_horizon",
		name="plot_horizon_distance",
		requirements=resources,
		transfer_files=config.condor.transfer_files,
	)

	ranking_stats = Argument("rankingstats", marg_dist_stat_caches.files)
	plot_dir = Option("outdir", "plots")
	horizon_layer += Node(inputs=ranking_stats, outputs=plot_dir)

	dag.attach(horizon_layer)


def plot_summary_layer(config, dag, trigger_cache, post_pdf_cache):
	"""
	Attach the four gstlal_inspiral_plotsummary layers to the DAG, in
	this order: the combined summary, the combined precession summary,
	the per-injection-type summaries and the per-injection-type
	precession summaries. The per-injection layers get one node per
	injection subtype, each fed the non-injection triggers plus the
	triggers for that subtype.
	"""
	requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}

	# options passed to every plotting node
	common_plot_args = [
		Option("segments-name", config.source.frame_segments_name),
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("shrink-data-segments", 32.0),
		Option("extend-veto-segments", 8.),
	]

	# separate the non-injection ("" subtype) triggers from injection ones
	by_subtype = trigger_cache.groupby("subtype")
	noninj_trigger_cache = by_subtype.pop("")
	inj_trigger_cache = DataCache(
		trigger_cache.name,
		list(itertools.chain.from_iterable(group.cache for group in by_subtype.values())),
	)

	def summary_layer(layer_name):
		# all four layers share the executable and resource request
		return Layer(
			"gstlal_inspiral_plotsummary",
			name=layer_name,
			requirements=requirements,
			transfer_files=config.condor.transfer_files,
		)

	def summary_node(leading_args, trigger_files):
		# leading_args come first on the command line, then the common ones
		return Node(
			arguments=[*leading_args, *common_plot_args],
			inputs=[
				Argument("input-files", trigger_files),
				Option("likelihood-file", post_pdf_cache.files),
			],
			outputs=Option("output-dir", "plots"),
		)

	# plot summary job
	combined_layer = summary_layer("summary_plots")
	combined_layer += summary_node(
		[
			Option("user-tag", "ALL_COMBINED"),
			Option("remove-precession"),
		],
		trigger_cache.files,
	)
	dag.attach(combined_layer)

	# precession plot summary job
	precession_layer = summary_layer("summary_plots_precession")
	precession_layer += summary_node(
		[
			Option("user-tag", "PRECESSION_COMBINED"),
			Option("isolate-precession"),
			Option("plot-group", 1),
		],
		trigger_cache.files,
	)
	dag.attach(precession_layer)

	# single injection plot summary jobs
	sngl_layer = summary_layer("sngl_injection_summary_plots")
	for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
		sngl_layer += summary_node(
			[
				Option("user-tag", f"{inj_type}_INJECTION"),
				Option("remove-precession"),
				Option("plot-group", 1),
			],
			noninj_trigger_cache.files + inj_triggers.files,
		)
	dag.attach(sngl_layer)

	# single injection precession plot summary jobs
	sngl_precession_layer = summary_layer("sngl_injection_precession_summary_plots")
	for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
		sngl_precession_layer += summary_node(
			[
				Option("user-tag", f"{inj_type}_INJECTION_PRECESSION"),
				Option("isolate-precession"),
				Option("plot-group", 1),
			],
			noninj_trigger_cache.files + inj_triggers.files,
		)
	dag.attach(sngl_precession_layer)


def plot_sensitivity_layer(config, dag, trigger_cache):
	"""
	Attach the gstlal_inspiral_plot_sensitivity layer. It holds one node
	covering all injection triggers combined, followed by one node per
	injection subtype; every node is also given the non-injection
	triggers as the zero-lag database.
	"""
	requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	sensitivity_layer = Layer(
		"gstlal_inspiral_plot_sensitivity",
		requirements=requirements,
		transfer_files=config.condor.transfer_files,
	)

	# options shared by every node in this layer
	common_args = [
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("veto-segments-name", "vetoes"),
		Option("bin-by-source-type"),
		Option("dist-bins", 200),
		Option("data-segments-name", "datasegments"),
	]

	# separate the non-injection ("" subtype) triggers from injection ones
	by_subtype = trigger_cache.groupby("subtype")
	noninj_trigger_cache = by_subtype.pop("")
	inj_trigger_cache = DataCache(
		trigger_cache.name,
		list(itertools.chain.from_iterable(group.cache for group in by_subtype.values())),
	)

	def sensitivity_node(user_tag, injection_files):
		return Node(
			arguments=[Option("user-tag", user_tag), *common_args],
			inputs=[
				Option("zero-lag-database", noninj_trigger_cache.files),
				Argument("injection-database", injection_files),
			],
			outputs=Option("output-dir", "plots"),
		)

	# all injection sets together
	sensitivity_layer += sensitivity_node("ALL_COMBINED", inj_trigger_cache.files)

	# each injection set on its own
	for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
		sensitivity_layer += sensitivity_node(f"{inj_type}_INJECTIONS", inj_triggers.files)

	dag.attach(sensitivity_layer)


def plot_background_layer(config, dag, trigger_cache, post_pdf_cache):
	"""
	Attach a single-node gstlal_inspiral_plot_background layer that is
	given the non-injection trigger databases and the files in
	post_pdf_cache, and writes into the "plots" directory.
	"""
	requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	background_layer = Layer(
		"gstlal_inspiral_plot_background",
		requirements=requirements,
		transfer_files=config.condor.transfer_files,
	)

	# the "" subtype holds the triggers that are not from an injection run
	by_subtype = trigger_cache.groupby("subtype")
	non_inj_triggers = by_subtype[""]

	background_node = Node(
		arguments=[Option("user-tag", "ALL_COMBINED")],
		inputs=[
			Option("database", non_inj_triggers.files),
			Argument("post-marg-file", post_pdf_cache.files),
		],
		outputs=Option("output-dir", "plots"),
	)
	background_layer += background_node

	dag.attach(background_layer)


def filter_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
	"""
	Attach the online gstlal_inspiral filtering layer to the DAG, with
	one node per SVD bin found in svd_bank_cache.

	Each node reads the SVD banks for its bin, the reference PSD, the
	time slide file, the per-bin ranking statistic file and the
	marginalized ranking statistic PDF. The per-bin ranking statistic
	file is used both as input and as output, the zerolag PDF for the
	bin is an output, and the trigger output is sent to /dev/null.

	Raises ValueError if config.source.data_source is neither
	"framexmit" nor "lvshm".
	"""
	layer = Layer(
		"gstlal_inspiral",
		requirements={"request_cpus": 1, "request_memory": 5000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	# set up datasource options
	if config.source.data_source == "framexmit":
		datasource_opts = [
			Option("data-source", "framexmit"),
			Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
			Option("framexmit-iface", config.source.framexmit_iface),
		]
	elif config.source.data_source == "lvshm":
		datasource_opts = [
			Option("data-source", "lvshm"),
			Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
			Option("shared-memory-block-size", config.source.shared_memory_block_size),
			Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
		]
	else:
		raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

	# set up common options
	common_opts = [
		Option("track-psd"),
		Option("control-peak-time", 0),
		Option("psd-fft-length", config.psd.fft_length),
		Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
		Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
		Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
		Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
		Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
		Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
		Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("coincidence-threshold", config.filter.coincidence_threshold),
		Option("fir-stride", config.filter.fir_stride),
		Option("min-instruments", config.min_ifos),
		Option("analysis-tag", config.tag),
		Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
		Option("gracedb-group", config.upload.gracedb_group),
		Option("gracedb-pipeline", config.upload.gracedb_pipeline),
		Option("gracedb-search", config.upload.gracedb_search),
		Option("gracedb-label", config.upload.gracedb_label),
		Option("gracedb-service-url", config.upload.gracedb_service_url),
		Option("far-trials-factor", config.upload.far_trials_factor),
		Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
	]

	# optional settings, only passed when enabled in the configuration
	if config.services.kafka_server:
		common_opts.append(Option("output-kafka-server", config.services.kafka_server))

	if config.upload.before_merger:
		common_opts.append(Option("upload-time-before-merger"))

	if config.upload.delay_uploads:
		common_opts.append(Option("delay-uploads"))

	if config.filter.cap_singles:
		common_opts.append(Option("cap-singles"))

	# set up activation counts if provided
	if config.filter.activation_counts_file:
		common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))

	# compress ranking stat if requested
	if config.filter.compress_ranking_stat:
		common_opts.extend([
			Option("compress-ranking-stat"),
			Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
		])

	# disable service discovery if using singularity
	if config.condor.singularity_image:
		common_opts.append(Option("disable-service-discovery"))

	# one filtering job per SVD bin; per-bin options come first
	dist_stats = dist_stat_cache.groupby("bin")
	zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
	for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
		filter_opts = [
			Option("job-tag", svd_bin),
			Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
			*common_opts,
			*datasource_opts,
		]

		layer += Node(
			arguments = filter_opts,
			inputs = [
				Option("svd-bank", svd_banks.files),
				Option("reference-psd", config.data.reference_psd),
				Option("time-slide-file", config.filter.time_slide_file),
				Option("ranking-stat-input", dist_stats[svd_bin].files),
				Option("ranking-stat-pdf", marg_pdf_cache.files),
			],
			outputs = [
				Option("output", "/dev/null"),
				# same per-bin file as ranking-stat-input above
				Option("ranking-stat-output", dist_stats[svd_bin].files),
				Option("zerolag-rankingstat-pdf", zerolag_pdfs[svd_bin].files),
			],
		)

	dag.attach(layer)


def filter_injections_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
	"""
	Attach the online injection filtering layer ("gstlal_inspiral_inj")
	to the DAG, with one node per combination of SVD bin and injection
	set listed in config.filter.injections.

	Each node's job tag is 2000 times the 1-based index of the injection
	set plus the integer value of the SVD bin, zero-padded to four
	digits. Nodes read the per-bin ranking statistic file and the
	marginalized ranking statistic PDF, and send their trigger output
	to /dev/null.

	zerolag_pdf_cache is not used by this layer; the parameter is
	retained so existing callers keep working.

	Raises ValueError if config.source.data_source is neither
	"framexmit" nor "lvshm".
	"""
	layer = Layer(
		"gstlal_inspiral",
		name = "gstlal_inspiral_inj",
		requirements={"request_cpus": 1, "request_memory": 5000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	# set up datasource options
	if config.source.data_source == "framexmit":
		datasource_opts = [
			Option("data-source", "framexmit"),
			Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
			Option("framexmit-iface", config.source.framexmit_iface),
		]
	elif config.source.data_source == "lvshm":
		datasource_opts = [
			Option("data-source", "lvshm"),
			Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
			Option("shared-memory-block-size", config.source.shared_memory_block_size),
			Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
		]
	else:
		raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

	# set up common options
	common_opts = [
		Option("track-psd"),
		Option("control-peak-time", 0),
		Option("psd-fft-length", config.psd.fft_length),
		Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
		Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
		Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
		Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
		Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
		Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
		Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
		Option("tmp-space", dagutil.condor_scratch_space()),
		Option("coincidence-threshold", config.filter.coincidence_threshold),
		Option("fir-stride", config.filter.fir_stride),
		Option("min-instruments", config.min_ifos),
		Option("analysis-tag", config.tag),
		Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
		Option("gracedb-group", config.upload.gracedb_group),
		Option("gracedb-pipeline", config.upload.gracedb_pipeline),
		Option("gracedb-search", config.upload.gracedb_search),
		Option("gracedb-label", config.upload.gracedb_label),
		Option("gracedb-service-url", config.upload.gracedb_service_url),
		Option("far-trials-factor", config.upload.far_trials_factor),
		Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
	]

	# optional settings, only passed when enabled in the configuration
	if config.services.kafka_server:
		common_opts.append(Option("output-kafka-server", config.services.kafka_server))

	if config.upload.before_merger:
		common_opts.append(Option("upload-time-before-merger"))

	if config.upload.delay_uploads:
		common_opts.append(Option("delay-uploads"))

	if config.filter.cap_singles:
		common_opts.append(Option("cap-singles"))

	# set up activation counts if provided
	if config.filter.activation_counts_file:
		common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))

	# compress ranking stat if requested
	if config.filter.compress_ranking_stat:
		common_opts.extend([
			Option("compress-ranking-stat"),
			Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
		])

	# disable service discovery if using singularity
	if config.condor.singularity_image:
		common_opts.append(Option("disable-service-discovery"))

	dist_stats = dist_stat_cache.groupby("bin")
	for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
		filter_opts = [
			Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
			*common_opts,
			*datasource_opts,
		]

		# one job per injection set for this bin
		for inj_idx, inj_name in enumerate(config.filter.injections.keys(), start=1):
			injection_file = config.filter.injections[inj_name.lower()]["file"]

			# offset the tag by 2000 per injection set (1-based) so each
			# (injection set, bin) pair gets its own job tag
			inj_job_start = 2000 * inj_idx
			inj_job_tag = f"{inj_job_start + int(svd_bin):04d}"
			injection_opts = [
				Option("job-tag", inj_job_tag),
				*filter_opts,
			]

			layer += Node(
				arguments = injection_opts,
				inputs = [
					Option("svd-bank", svd_banks.files),
					Option("reference-psd", config.data.reference_psd),
					Option("time-slide-file", config.filter.time_slide_file),
					Option("ranking-stat-input", dist_stats[svd_bin].files),
					Option("ranking-stat-pdf", marg_pdf_cache.files),
					Option("injections", injection_file),
				],
				outputs = Option("output", "/dev/null"),
			)
	dag.attach(layer)


def marginalize_online_layer(config, dag, marg_pdf_cache):
	"""
	Attach a single-node layer running
	gstlal_inspiral_marginalize_likelihoods_online. Its positional
	inputs are the files in marg_pdf_cache followed by one
	"<svd_bin>_registry.txt" entry per SVD bin in the configuration.
	"""
	marg_layer = Layer(
		"gstlal_inspiral_marginalize_likelihoods_online",
		requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	# PDF files first, then the per-bin registry files
	pdf_files = list(marg_pdf_cache.files)
	registries = [f"{svd_bin}_registry.txt" for svd_bin in config.svd.bins]
	marg_layer += Node(inputs=Argument("inputs", pdf_files + registries))

	dag.attach(marg_layer)


def upload_events_layer(config, dag):
	"""
	Attach a single-node layer running gstlal_ll_inspiral_event_uploader,
	configured from the kafka server, the GraceDB settings and the
	aggregator upload settings in the configuration.
	"""
	resources = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	uploader_layer = Layer(
		"gstlal_ll_inspiral_event_uploader",
		requirements=resources,
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	upload = config.upload
	uploader_opts = [
		Option("kafka-server", config.services.kafka_server),
		Option("gracedb-group", upload.gracedb_group),
		Option("gracedb-pipeline", upload.gracedb_pipeline),
		Option("gracedb-search", upload.gracedb_search),
		Option("gracedb-service-url", upload.gracedb_service_url),
		Option("far-threshold", upload.aggregator_far_threshold),
		Option("far-trials-factor", upload.aggregator_far_trials_factor),
		Option("upload-cadence-type", upload.aggregator_cadence_type),
		Option("upload-cadence-factor", upload.aggregator_cadence_factor),
		# number of jobs is the number of SVD bins in the configuration
		Option("num-jobs", len(config.svd.bins)),
		Option("tag", config.tag),
		Option("input-topic", "events"),
		Option("rootdir", "event_uploader"),
		Option("verbose"),
	]
	uploader_layer += Node(arguments=uploader_opts)

	dag.attach(uploader_layer)


def plot_events_layer(config, dag):
	"""
	Attach a single-node layer running gstlal_ll_inspiral_event_plotter,
	configured from the kafka server, the GraceDB settings and the
	analysis tag in the configuration.
	"""
	resources = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	plotter_layer = Layer(
		"gstlal_ll_inspiral_event_plotter",
		requirements=resources,
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	upload = config.upload
	plotter_opts = [
		Option("kafka-server", config.services.kafka_server),
		Option("gracedb-group", upload.gracedb_group),
		Option("gracedb-pipeline", upload.gracedb_pipeline),
		Option("gracedb-search", upload.gracedb_search),
		Option("gracedb-service-url", upload.gracedb_service_url),
		Option("tag", config.tag),
		Option("verbose"),
	]
	plotter_layer += Node(arguments=plotter_opts)

	dag.attach(plotter_layer)


def count_events_layer(config, dag, dist_stat_cache):
	"""
	Attach a single-node layer running gstlal_ll_inspiral_trigger_counter.
	The node is bootstrapped from the first file in dist_stat_cache and
	writes to a zerolag PDF file generated for all configured ifos.
	"""
	resources = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
	counter_layer = Layer(
		"gstlal_ll_inspiral_trigger_counter",
		requirements=resources,
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	# output cache for the zerolag PDF written by the counter
	zerolag_pdf = DataCache.generate(DataType.ZEROLAG_DIST_STAT_PDFS, config.all_ifos)

	counter_opts = [
		Option("uri", f"kafka://{config.tag}-counter@{config.services.kafka_server}"),
		Option("gracedb-pipeline", config.upload.gracedb_pipeline),
		Option("gracedb-search", config.upload.gracedb_search),
		Option("output-period", 300),
		Option("topic", f"gstlal.{config.tag}.coinc"),
		# only the first ranking statistic file is used for bootstrapping
		Option("bootstrap", dist_stat_cache.files[0]),
	]
	counter_layer += Node(
		arguments=counter_opts,
		outputs=Option("output", zerolag_pdf.files),
	)

	dag.attach(counter_layer)


def collect_metrics_layer(config, dag):
	"""
	Attach the scald aggregation layers to the DAG: a leader metric
	collector, the remaining metric collectors (only if any were
	created) and an event collector, in that order.

	Timeseries metrics are split into groups and each group gets one
	job per entry in the job partitioning; the first job of each group
	is placed in the leader layer with the "across-jobs" option. If a
	test suite configuration is set, its schemas are aggregated by
	additional jobs in the metric collector layer.
	"""
	requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}

	def scald_layer(layer_name):
		# all collectors run scald with the same resources and retries
		return Layer(
			"scald",
			name=layer_name,
			requirements=requirements,
			transfer_files=config.condor.transfer_files,
			retries=1000,
		)

	event_layer = scald_layer("scald_event_collector")
	metric_layer = scald_layer("scald_metric_collector")
	metric_leader_layer = scald_layer("scald_metric_collector_leader")

	# options shared by the metric and event collectors
	common_opts = [
		Argument("command", "aggregate"),
		Option("config", config.metrics.scald_config),
		Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
	]

	# metrics to aggregate, in the order they are grouped into jobs
	snr_metrics = [f"{ifo}_snr_history" for ifo in config.ifos]
	network_metrics = ["likelihood_history", "snr_history", "latency_history", "far_history"]
	state_metrics = [f"{ifo}_strain_dropped" for ifo in config.ifos]
	usage_metrics = ["ram_history"]
	stages = ("datasource", "whitening", "snrSlice")
	latency_metrics = [f"{ifo}_{stage}_latency" for ifo in config.ifos for stage in stages]
	latency_metrics.append("all_itacac_latency")

	all_agg_metrics = [
		*snr_metrics,
		*network_metrics,
		*usage_metrics,
		*state_metrics,
		*latency_metrics,
	]

	# per-ifo segment metrics, one for each gate
	gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
	all_seg_metrics = [f"{ifo}_{gate}" for ifo in config.ifos for gate in gates]

	# set up partitioning
	# FIXME don't hard code the 1000
	max_agg_jobs = 1000
	num_jobs = len(config.svd.bins)
	agg_job_bounds = list(range(0, num_jobs, max_agg_jobs))

	# number of metrics handled by a single collector job (at least one)
	group_size = max(max_agg_jobs // (4 * num_jobs), 1)

	# timeseries metrics
	agg_groups = dagutil.groups(all_agg_metrics, group_size)
	seg_groups = dagutil.groups(all_seg_metrics, group_size)

	for metrics in itertools.chain(agg_groups, seg_groups):
		for job_idx, _ in enumerate(agg_job_bounds):
			arguments = [
				*common_opts,
				Option("data-type", "timeseries"),
				Option("topic", [f"gstlal.{config.tag}.{metric}" for metric in metrics]),
				Option("schema", metrics),
			]

			# every job after the first is a regular collector
			if job_idx > 0:
				metric_layer += Node(arguments=arguments)
				continue

			# elect first metric collector as leader
			arguments.append(Option("across-jobs"))
			metric_leader_layer += Node(arguments=arguments)

	# add optional test suite metrics
	if config.metrics.test_suite_config:
		with open(config.metrics.test_suite_config) as config_file:
			testsuite_config = yaml.load(config_file, Loader=SafeLoader)

		testsuite_groups = dagutil.groups(list(testsuite_config["schemas"]), group_size)

		for metrics in testsuite_groups:
			testsuite_arguments = [
				Argument("command", "aggregate"),
				Option("config", config.metrics.test_suite_config),
				Option("uri", f"kafka://{config.tag}-testsuite@{config.services.kafka_server}"),
				Option("data-type", "timeseries"),
				Option("topic", [f"gstlal.{config.tag}.testsuite.{metric}" for metric in metrics]),
				Option("schema", metrics),
			]
			metric_layer += Node(arguments=testsuite_arguments)

	# the leader is always attached; the other collectors only if present
	dag.attach(metric_leader_layer)
	if metric_layer.nodes:
		dag.attach(metric_layer)

	# event metrics
	event_arguments = [
		*common_opts,
		Option("data-type", "triggers"),
		Option("topic", f"gstlal.{config.tag}.coinc"),
		Option("schema", "coinc"),
	]
	event_layer += Node(arguments=event_arguments)

	dag.attach(event_layer)


def track_noise_layer(config, dag):
	"""
	Attach the gstlal_ll_dq layer, with one node per ifo in config.ifos.
	Per-ifo settings are passed in "<ifo>=<value>" form.

	Raises ValueError if config.source.data_source is neither
	"framexmit" nor "lvshm".
	"""
	def for_ifo(ifo, per_ifo_values):
		# format a per-ifo setting as "<ifo>=<value>"
		return f"{ifo}={per_ifo_values[ifo]}"

	noise_layer = Layer(
		"gstlal_ll_dq",
		requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
		transfer_files=config.condor.transfer_files,
		retries=1000,
	)

	for ifo in config.ifos:
		source = config.source

		# datasource options come first on the command line
		if source.data_source == "framexmit":
			arguments = [
				Option("data-source", "framexmit"),
				Option("framexmit-addr", for_ifo(ifo, source.framexmit_addr)),
				Option("framexmit-iface", source.framexmit_iface),
			]
		elif source.data_source == "lvshm":
			arguments = [
				Option("data-source", "lvshm"),
				Option("shared-memory-partition", for_ifo(ifo, source.shared_memory_partition)),
				Option("shared-memory-block-size", source.shared_memory_block_size),
				Option("shared-memory-assumed-duration", source.shared_memory_assumed_duration),
			]
		else:
			raise ValueError(f"data source = {source.data_source} not valid for online jobs")

		# channel, state/DQ vector and output options
		arguments += [
			Option("psd-fft-length", config.psd.fft_length),
			Option("channel-name", for_ifo(ifo, source.channel_name)),
			Option("state-channel-name", for_ifo(ifo, source.state_channel_name)),
			Option("dq-channel-name", for_ifo(ifo, source.dq_channel_name)),
			Option("state-vector-on-bits", for_ifo(ifo, source.state_vector_on_bits)),
			Option("state-vector-off-bits", for_ifo(ifo, source.state_vector_off_bits)),
			Option("dq-vector-on-bits", for_ifo(ifo, source.dq_vector_on_bits)),
			Option("dq-vector-off-bits", for_ifo(ifo, source.dq_vector_off_bits)),
			Option("scald-config", config.metrics.scald_config),
			Option("out-path", "aggregator"),
		]
		noise_layer += Node(arguments=arguments)

	dag.attach(noise_layer)


def calc_gate_threshold(config, svd_bin, aggregate="max"):
	"""
	Return the h(t) gate threshold to use for the given svd bin.

	If the configured threshold is not a string it is returned as-is
	(uniform threshold). Otherwise it is parsed as
	"min_mchirp:min_threshold-max_mchirp:max_threshold" and the
	threshold is linearly interpolated at the bin's "<aggregate>_mchirp"
	value from the SVD stats, rounded to three decimal places.
	"""
	# uniform threshold
	if not isinstance(config.filter.ht_gate_threshold, str):
		return config.filter.ht_gate_threshold

	bank_mchirp = config.svd.stats["bins"][svd_bin][f"{aggregate}_mchirp"]

	# flatten "a:b-c:d" into the four numbers a, b, c, d
	endpoints = []
	for endpoint in config.filter.ht_gate_threshold.split("-"):
		for field in endpoint.split(":"):
			endpoints.append(float(field))
	min_mchirp, min_threshold, max_mchirp, max_threshold = endpoints

	# slope of the line through the two (mchirp, threshold) points
	gate_mchirp_ratio = (max_threshold - min_threshold) / (max_mchirp - min_mchirp)
	threshold = gate_mchirp_ratio * (bank_mchirp - min_mchirp) + min_threshold
	return round(threshold, 3)


def autocorrelation_length_map(ac_length_range):
	"""
	Given autocorrelation length ranges (e.g. 0:15:701)
	or a single autocorrelation value, returns a function that
	maps a given chirp mass to an autocorrelation length.

	A range is a string of the form "min_mchirp:max_mchirp:ac_length".
	Either a single range string or an iterable of range strings may be
	given; consecutive ranges must share a boundary, i.e. each range's
	minimum chirp mass must equal the previous range's maximum.
	Any non-iterable value is used as the autocorrelation length for
	all chirp masses.

	Raises AssertionError if consecutive ranges do not share a boundary.
	"""
	# treat a single range string like a one-element list of ranges
	if isinstance(ac_length_range, str):
		ac_length_range = [ac_length_range]

	# handle case with AC length ranges
	if isinstance(ac_length_range, Iterable):
		ac_lengths = []
		min_mchirps = []
		max_mchirps = []
		for this_range in ac_length_range:
			min_mchirp, max_mchirp, ac_length = this_range.split(":")
			min_mchirps.append(float(min_mchirp))
			max_mchirps.append(float(max_mchirp))
			ac_lengths.append(int(ac_length))

		# sanity check inputs. raised explicitly rather than through an
		# assert statement so the check is not stripped under python -O;
		# the exception type is kept for backwards compatibility
		for bound1, bound2 in zip(min_mchirps[1:], max_mchirps[:-1]):
			if bound1 != bound2:
				raise AssertionError("gaps not allowed in autocorrelation length ranges")

		# convert to binning
		bins = rate.IrregularBins([min_mchirps[0]] + max_mchirps)

	# handle single value case
	else:
		ac_lengths = [ac_length_range]
		bins = rate.IrregularBins([0., numpy.inf])

	# create mapping
	def mchirp_to_ac_length(mchirp):
		# index the binning by chirp mass, then pick the matching length
		idx = bins[mchirp]
		return ac_lengths[idx]

	return mchirp_to_ac_length


def mchirp_range_to_bins(min_mchirp, max_mchirp, svd_metadata):
	"""
	Return the list of SVD bins whose chirp mass range, as recorded
	under "min_mchirp" and "max_mchirp" in the SVD metadata, intersects
	the requested chirp mass range.
	"""
	requested = segment(min_mchirp, max_mchirp)
	return [
		svd_bin
		for svd_bin, bin_metadata in svd_metadata["bins"].items()
		if requested.intersects(segment(bin_metadata["min_mchirp"], bin_metadata["max_mchirp"]))
	]


def ifo_to_string(ifos):
	"""
	Concatenate the given ifos, in sorted order, into a single string.
	"""
	ordered_ifos = sorted(ifos)
	return "".join(ordered_ifos)


@plugins.register
def layers():
	"""
	Return the mapping from layer name to the function that builds
	that layer, as registered with the plugin system.
	"""
	registry = dict(
		split_bank=split_bank_layer,
		svd_bank=svd_bank_layer,
		checkerboard=checkerboard_layer,
		filter=filter_layer,
		filter_injections=filter_injections_layer,
		aggregate=aggregate_layer,
		create_prior=create_prior_layer,
		calc_pdf=calc_pdf_layer,
		marginalize=marginalize_layer,
		marginalize_pdf=marginalize_pdf_layer,
		calc_likelihood=calc_likelihood_layer,
		cluster=cluster_layer,
		compute_far=compute_far_layer,
		find_injections=find_injections_layer,
		match_injections=match_injections_layer,
		measure_lnlr_cdf=measure_lnlr_cdf_layer,
		plot_analytic_vt=plot_analytic_vt_layer,
		plot_horizon_distance=plot_horizon_distance_layer,
		plot_summary=plot_summary_layer,
		plot_background=plot_background_layer,
		plot_sensitivity=plot_sensitivity_layer,
		filter_online=filter_online_layer,
		filter_injections_online=filter_injections_online_layer,
		marginalize_online=marginalize_online_layer,
		upload_events=upload_events_layer,
		plot_events=plot_events_layer,
		count_events=count_events_layer,
		collect_metrics=collect_metrics_layer,
		track_noise=track_noise_layer,
	)
	return registry