dags/layers/inspiral.py: fix how many SVD bins are grouped per inspiral job so that max concurrency is used properly
inspiral.py
# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from collections.abc import Mapping
import itertools
import os
from typing import Iterable
import numpy
import yaml
from yaml.loader import SafeLoader
from lal import rate
from lal.utils import CacheEntry
from ligo.segments import segment
from gstlal import plugins
from gstlal import datafind
from gstlal.datafind import DataType, DataCache
from gstlal.dags import Argument, Option
from gstlal.dags import util as dagutil
from gstlal.dags.layers import Layer, Node
from gstlal.stats.inspiral_extrinsics import TimePhaseSNR
def split_bank_layer(config, dag, psd_cache, bank_cache):
layer = Layer(
"gstlal_inspiral_bank_splitter",
requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
split_bank_cache = DataCache.generate(DataType.SVD_BANK, config.all_ifos, config.span, svd_bins=config.svd.bins)
layer += Node(
arguments = [
Option("f-low", config.svd.f_low),
Option("f-final", config.svd.max_f_final),
Option("group-by-chi", config.svd.num_chi_bins),
Option("approximant", config.svd.approximant),
Option("overlap", config.svd.overlap),
Option("instrument", ifo_to_string(config.all_ifos)),
Option("n", config.svd.num_split_templates),
Option("sort-by", "template_duration"),
Option("num-banks", config.svd.num_banks),
],
inputs = [
Argument("template-bank", bank_cache.files, track=False),
Argument("psd-xml", psd_cache.files, track=False),
],
outputs = [
Option("output-path", str(split_bank_cache.name).lower()),
Option("output-stats-file", DataType.SVD_MANIFEST.filename(config.ifos)),
],
)
dag.attach(layer)
return split_bank_cache
def svd_bank_layer(config, dag, median_psd_cache, split_bank_cache=None):
layer = Layer(
"gstlal_inspiral_svd_bank",
requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# set up autocorrelation mapping
mchirp_to_ac_length = autocorrelation_length_map(config.svd.autocorrelation_length)
svd_cache = DataCache.generate(
DataType.SVD_BANK,
config.ifos,
config.span,
svd_bins=config.svd.bins,
root="filter",
)
split_banks = split_bank_cache.groupby("bin")
for (ifo, svd_bin), svd_banks in svd_cache.groupby("ifo", "bin").items():
# grab sub-bank specific configuration if available
if "bank_name" in config.svd.stats.bins[svd_bin]:
bank_name = config.svd.stats.bins[svd_bin]["bank_name"]
svd_config = config.svd.sub_banks[bank_name]
else:
svd_config = config.svd
bin_mchirp = config.svd.stats.bins[svd_bin]["mean_mchirp"]
arguments = [
Option("instrument-override", ifo),
Option("flow", svd_config.f_low),
Option("samples-min", svd_config.samples_min),
Option("samples-max-64", svd_config.samples_max_64),
Option("samples-max-256", svd_config.samples_max_256),
Option("samples-max", svd_config.samples_max),
Option("svd-tolerance", svd_config.tolerance),
Option("autocorrelation-length", mchirp_to_ac_length(bin_mchirp)),
]
if "sample_rate" in svd_config:
arguments.append(Option("sample-rate", svd_config.sample_rate))
layer += Node(
arguments = arguments,
inputs = [
Option("reference-psd", median_psd_cache.files),
Argument("split-banks", sorted(split_banks[svd_bin].files)),
],
outputs = Option("write-svd", svd_banks.files)
)
dag.attach(layer)
return svd_cache
def checkerboard_layer(config, dag, ref_psd_cache, svd_bank_cache):
layer = Layer(
"gstlal_svd_bank_checkerboard",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# set up arguments
arguments = [Option("in-place"), Option("reference-psd", ref_psd_cache.files)]
if config.svd.checkerboard == "even":
arguments.append(Option("even"))
chunk_size = 20
for svd_banks in svd_bank_cache.chunked(chunk_size):
layer += Node(
arguments = arguments,
inputs = Option("svd-files", svd_banks.files),
outputs = Argument("checkered-svd-files", svd_banks.files, suppress=True),
)
dag.attach(layer)
return svd_bank_cache
def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
layer = Layer(
"gstlal_inspiral",
requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
dynamic_memory=True,
)
dist_stat_cache = DataCache.generate(
DataType.DIST_STATS,
config.ifo_combos,
config.time_bins,
svd_bins=config.svd.bins,
root="filter",
)
trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
svd_bins=config.svd.bins,
root="filter",
)
common_opts = [
Option("track-psd"),
Option("data-source", "frames"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("frame-segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
]
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# checkpoint by grouping SVD bins together and enable local
# frame caching if there is more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
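# the group size grows by one for every 20 SVD bins in the analysis and is
# capped at max_concurrency, e.g. with 200 SVD bins each gstlal_inspiral job
# processes min(11, max_concurrency) bins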
num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
dist_stat_files = dagutil.flatten(
[dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", trigger_group.files),
Option("ranking-stat-output", dist_stat_files),
],
)
dag.attach(layer)
return trigger_cache, dist_stat_cache
def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
layer = Layer(
"gstlal_inspiral",
name="gstlal_inspiral_inj",
requirements={"request_cpus": 2, "request_memory": 5000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
dynamic_memory=True,
)
trigger_cache = DataCache(DataType.TRIGGERS)
for inj_name, inj_args in config.filter.injections.items():
min_mchirp, max_mchirp = map(float, inj_args["range"].split(":"))
svd_bins = mchirp_range_to_bins(min_mchirp, max_mchirp, config.svd.stats)
trigger_cache += DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
svd_bins=svd_bins,
subtype=inj_name,
root="filter",
)
common_opts = [
Option("track-psd"),
Option("data-source", "frames"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("frame-segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
]
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# checkpoint by grouping SVD bins together and enable local
# frame caching if there is more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
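# same SVD bin grouping rule as in filter_layer: the group size grows with
# the number of SVD bins and is capped at max_concurrency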
num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
injection_file = config.filter.injections[inj_type.lower()]["file"]
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs = Option("output", trigger_group.files),
)
dag.attach(layer)
return trigger_cache
def aggregate_layer(config, dag, trigger_cache, dist_stat_cache=None):
trg_layer = Layer(
"lalapps_run_sqlite",
name="cluster_triggers_by_snr",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# FIXME: find better way of discovering SQL file
share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
snr_cluster_sql_file = os.path.join(share_path, "snr_simplify_and_cluster.sql")
inj_snr_cluster_sql_file = os.path.join(share_path, "inj_snr_simplify_and_cluster.sql")
# cluster triggers by SNR
for (svd_bin, inj_type), triggers in trigger_cache.groupby("bin", "subtype").items():
trg_layer += Node(
arguments = [
Option("sql-file", inj_snr_cluster_sql_file if inj_type else snr_cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", triggers.files),
outputs = Argument("clustered-triggers", triggers.files, suppress=True),
)
dag.attach(trg_layer)
# if no dist stats files to marginalize, only return triggers
if not dist_stat_cache:
return trigger_cache
# marginalize dist stats across time
dist_layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="marginalize_dist_stats_across_time_filter",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
agg_dist_stat_cache = DataCache.generate(
DataType.DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="filter",
)
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, agg_dist_stats in agg_dist_stat_cache.groupby("bin").items():
dist_layer += Node(
arguments = Option("marginalize", "ranking-stat"),
inputs = Argument("dist-stats", dist_stats[svd_bin].files),
outputs = Option("output", agg_dist_stats.files)
)
dag.attach(dist_layer)
return trigger_cache, agg_dist_stat_cache
def create_prior_layer(config, dag, svd_bank_cache, median_psd_cache, dist_stat_cache=None):
layer = Layer(
"gstlal_inspiral_create_prior_diststats",
requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
if dist_stat_cache:
prior_cache = DataCache.generate(
DataType.PRIOR_DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
else:
prior_cache = DataCache.generate(
DataType.DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
)
svd_banks = svd_bank_cache.groupby("bin")
for svd_bin, prior in prior_cache.groupby("bin").items():
prior_inputs = [
Option("svd-file", svd_banks[svd_bin].files),
Option("mass-model-file", config.prior.mass_model),
Option("psd-xml", median_psd_cache.files)
]
if config.prior.idq_timeseries:
prior_inputs.append(Option("idq-file", config.prior.idq_timeseries))
if config.prior.dtdphi:
if isinstance(config.prior.dtdphi, Mapping):
sub_bank = config.svd.stats.bins[svd_bin]["bank_name"]
prior_inputs.append(Option("dtdphi-file", config.prior.dtdphi[sub_bank]))
else:
prior_inputs.append(Option("dtdphi-file", config.prior.dtdphi))
layer += Node(
arguments = [
Option("df", "bandwidth"),
Option("background-prior", 1),
Option("instrument", config.ifos),
Option("min-instruments", config.min_ifos),
Option("coincidence-threshold", config.filter.coincidence_threshold),
],
inputs = prior_inputs,
outputs = Option("write-likelihood", prior.files),
)
dag.attach(layer)
return prior_cache
def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="marginalize_dist_stats_across_time_rank",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
marg_dist_stat_cache = DataCache.generate(
DataType.MARG_DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
prior = prior_cache.groupby("bin")
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, marg_dist_stats in marg_dist_stat_cache.groupby("bin").items():
layer += Node(
arguments = Option("marginalize", "ranking-stat"),
inputs = [
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
Argument("dist-stats", dist_stats[svd_bin].files + prior[svd_bin].files),
],
outputs = Option("output", marg_dist_stats.files)
)
dag.attach(layer)
return marg_dist_stat_cache
def calc_pdf_layer(config, dag, dist_stat_cache):
# FIXME: expose this in configuration
num_cores = 4
layer = Layer(
"gstlal_inspiral_calc_rank_pdfs",
requirements={"request_cpus": num_cores, "request_memory": 3000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
pdf_cache = DataCache.generate(
DataType.DIST_STAT_PDFS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, pdfs in pdf_cache.groupby("bin").items():
layer += Node(
arguments = [
Option("ranking-stat-samples", config.rank.ranking_stat_samples),
Option("num-cores", num_cores),
],
inputs = [
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
Argument("dist-stats", dist_stats[svd_bin].files),
],
outputs = Option("output", pdfs.files)
)
dag.attach(layer)
return pdf_cache
def marginalize_pdf_layer(config, dag, pdf_cache):
layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="gstlal_inspiral_marginalize_pdfs",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
marg_pdf_cache = DataCache.generate(
DataType.DIST_STAT_PDFS,
config.all_ifos,
config.span,
root="rank",
)
layer += Node(
arguments = Option("marginalize", "ranking-stat-pdf"),
inputs = Argument("dist-stat-pdfs", pdf_cache.files),
outputs = Option("output", marg_pdf_cache.files)
)
dag.attach(layer)
return marg_pdf_cache
def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
layer = Layer(
"gstlal_inspiral_calc_likelihood",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# assign likelihood to triggers
calc_trigger_cache = trigger_cache.copy(root="rank")
calc_triggers = calc_trigger_cache.groupby("bin", "subtype", "dirname")
dist_stats = dist_stat_cache.groupby("bin")
for (svd_bin, inj_type, dirname), triggers in trigger_cache.groupby("bin", "subtype", "dirname").items():
calc_dirname = dirname.replace("filter", "rank")
arguments = [
Option("force"),
Option("copy"),
Option("tmp-space", dagutil.condor_scratch_space()),
]
# if file transfer is not enabled, specify the directory to copy
# triggers into, since file remaps aren't relevant here
if not config.condor.transfer_files:
arguments.append(Option("copy-dir", calc_dirname))
layer += Node(
arguments = arguments,
inputs = [
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
Argument("triggers", triggers.files),
Option("likelihood-url", dist_stats[svd_bin].files),
],
outputs = Argument(
"calc-triggers",
calc_triggers[(svd_bin, inj_type, calc_dirname)].files,
suppress_with_remap=True
),
)
dag.attach(layer)
return calc_trigger_cache
def cluster_layer(config, dag, trigger_cache):
# cluster triggers by likelihood
combine_layer = Layer(
"ligolw_add",
name="combine_triggers_across_bins",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
cluster_round1_layer = Layer(
"lalapps_run_sqlite",
name="cluster_triggers_by_likelihood_round_one",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
cluster_round2_layer = Layer(
"lalapps_run_sqlite",
name="cluster_triggers_by_likelihood_round_two",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
sqlite_layer = Layer(
"ligolw_sqlite",
name="convert_triggers_to_sqlite",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# set up data caches
inj_types = list(trigger_cache.groupby("subtype").keys())
combined_trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
subtype=inj_types,
root="rank",
)
trigger_db_cache = DataCache.generate(
DataType.TRIGGERS,
config.all_ifos,
config.time_boundaries,
subtype=inj_types,
extension="sqlite",
root="rank"
)
# FIXME: find better way of discovering SQL file
share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")
inj_cluster_sql_file = os.path.join(share_path, "inj_simplify_and_cluster.sql")
# combine/cluster triggers across SVD bins
# if triggers are from an injection job, also add in the injections
combined_triggers_across_bins = combined_trigger_cache.groupby("time", "subtype")
for (span, inj_type), triggers in trigger_cache.groupby("time", "subtype").items():
combined_triggers = combined_triggers_across_bins[(span, inj_type)]
combine_layer += Node(
inputs = Argument("inputs", triggers.files),
outputs = Option("output", combined_triggers.files),
)
# cluster by likelihood
cluster_round1_layer += Node(
arguments = [
Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", combined_triggers.files),
outputs = Argument("calc-triggers", combined_triggers.files, suppress=True),
)
dag.attach(combine_layer)
dag.attach(cluster_round1_layer)
# combine/cluster triggers across time
combined_triggers_by_time = combined_trigger_cache.groupby("subtype")
for inj_type, trigger_dbs_by_time in trigger_db_cache.groupby("subtype").items():
for span, trigger_dbs in trigger_dbs_by_time.groupby_bins("time", config.time_boundaries).items():
combined_triggers = combined_triggers_by_time[inj_type].groupby_bins("time", config.time_boundaries)
# add input files for sqlite jobs
xml_files = []
if inj_type:
injection_file = config.filter.injections[inj_type.lower()]["file"]
xml_files.append(injection_file)
xml_files.extend(combined_triggers[span].files)
xml_files.append(config.source.frame_segments_file)
# convert triggers to sqlite
sqlite_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("xml-files", xml_files),
outputs = Option("database", trigger_dbs.files),
)
# cluster by likelihood
cluster_round2_layer += Node(
arguments = [
Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", trigger_dbs.files),
outputs = Argument("calc-triggers", trigger_dbs.files, suppress=True),
)
dag.attach(sqlite_layer)
dag.attach(cluster_round2_layer)
return trigger_db_cache
def find_injections_layer(config, dag, trigger_db_cache):
inj_sqlite_to_xml_layer = Layer(
"ligolw_sqlite",
name="inj_sqlite_to_xml",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
injfind_layer = Layer(
"lalapps_inspinjfind",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
inj_xml_to_sqlite_layer = Layer(
"ligolw_sqlite",
name="inj_xml_to_sqlite",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# set up data caches
grouped_trigger_dbs = trigger_db_cache.groupby("subtype")
grouped_trigger_dbs.pop("")
inj_trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.all_ifos,
config.time_boundaries,
subtype=grouped_trigger_dbs.keys(),
root="rank",
)
# generate layers
for inj_type, triggers_by_time in inj_trigger_cache.groupby("subtype").items():
for span, triggers in triggers_by_time.groupby_bins("time", config.time_boundaries).items():
trigger_dbs = grouped_trigger_dbs[inj_type].groupby_bins("time", config.time_boundaries)
# convert triggers to XML
inj_sqlite_to_xml_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Option("database", trigger_dbs[span].files),
outputs = Option("extract", triggers.files),
)
# find injections
injfind_layer += Node(
arguments = Option("time-window", 0.9),
inputs = Argument("triggers", triggers.files),
outputs = Argument("inj-triggers", triggers.files, suppress=True),
)
# convert triggers back to sqlite
inj_xml_to_sqlite_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", triggers.files),
outputs = Option("database", trigger_dbs[span].files),
)
dag.attach(inj_sqlite_to_xml_layer)
dag.attach(injfind_layer)
dag.attach(inj_xml_to_sqlite_layer)
return trigger_db_cache, inj_trigger_cache
def compute_far_layer(config, dag, trigger_cache, pdf_cache):
layer = Layer(
"gstlal_compute_far_from_snr_chisq_histograms",
name="compute_far",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
post_pdf_cache = DataCache.generate(
DataType.POST_DIST_STAT_PDFS,
config.all_ifos,
config.span,
root="rank",
)
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
inj_triggers = inj_trigger_cache.groupby("time")
for span, noninj_triggers in noninj_trigger_cache.groupby("time").items():
databases = noninj_triggers.files
inputs = [
Option("non-injection-db", noninj_triggers.files),
Option("background-bins-file", pdf_cache.files),
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
]
if inj_triggers:
inputs.append(Option("injection-db", inj_triggers[span].files))
databases.extend(inj_triggers[span].files)
layer += Node(
arguments = Option("tmp-space", dagutil.condor_scratch_space()),
inputs = inputs,
outputs = [
Option("output-background-bins-file", post_pdf_cache.files),
Argument("databases", databases, suppress=True),
]
)
dag.attach(layer)
return trigger_cache, post_pdf_cache
def match_injections_layer(config, dag, injection_cache):
split_layer = Layer(
"gstlal_injsplitter",
name="split_injections",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
match_layer = Layer(
"gstlal_inspiral_injection_template_match",
name="match_injections",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
# roughly one file per 100000 seconds
num_splits = int(abs(config.span)) // 100000
inj_types = list(config.filter.injections.keys())
split_inj_cache = DataCache(DataType.SPLIT_INJECTIONS)
match_inj_cache = DataCache.generate(
DataType.MATCHED_INJECTIONS,
config.all_ifos,
config.span,
svd_bins=[f"{i:04d}" for i in range(num_splits)],
subtype=[inj_type.upper() for inj_type in inj_types],
root="rank",
)
# split injections up
for inj_type, injections in injection_cache.groupby("subtype").items():
inj_tag = inj_type.upper()
for inj_entry in injections.cache:
split_injections = DataCache.generate(
DataType.SPLIT_INJECTIONS,
inj_entry.observatory,
inj_entry.segment,
svd_bins=[f"{i:04d}" for i in range(num_splits)],
subtype=inj_tag,
root="rank",
)
split_inj_cache += split_injections
out_path = DataType.SPLIT_INJECTIONS.directory(root="rank", start=inj_entry.segment[0])
split_layer += Node(
arguments = [
Option("nsplit", num_splits),
Option("usertag", inj_tag),
],
inputs = Argument("injection-file", inj_entry.path),
outputs = [
Option("output-path", out_path, remap=False),
Argument("split-injections", split_injections.files, suppress=True),
],
)
dag.attach(split_layer)
# match injections to templates
matched_injections = match_inj_cache.groupby("subtype", "bin")
for (inj_type, split_bin), injections in split_inj_cache.groupby("subtype", "bin").items():
match_layer += Node(
inputs = [
Option("injection-file", injections.files),
Option("template-bank", config.data.template_bank),
],
outputs = Option("output", matched_injections[(inj_type, split_bin)].files),
)
dag.attach(match_layer)
return match_inj_cache
def measure_lnlr_cdf_layer(config, dag, dist_stats_cache, injection_cache):
layer = Layer(
"gstlal_inspiral_lnlrcdf_signal",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
lnlr_cdf_cache = DataCache(DataType.LNLR_SIGNAL_CDF)
for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
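# ranking statistics are processed in chunks of 20 files, producing one
# lnlr_cdf output file per (injection split, dist-stat chunk)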
for chunk_bin, dist_stats in enumerate(dist_stats_cache.chunked(20)):
lnlr_cdfs = DataCache.generate(
DataType.LNLR_SIGNAL_CDF,
config.all_ifos,
config.span,
svd_bins=f"{split_bin}_{chunk_bin:04d}",
subtype=inj_type,
root="rank",
)
layer += Node(
inputs = [
Option("injection-template-match-file", injections.files),
Option("likelihood-url", dist_stats.files),
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
],
outputs = Option("output-file", lnlr_cdfs.files),
)
lnlr_cdf_cache += lnlr_cdfs
dag.attach(layer)
return lnlr_cdf_cache
def plot_analytic_vt_layer(config, dag, trigger_cache, pdf_cache, lnlr_cdf_cache):
trg_layer = Layer(
"gstlal_inspiral_make_mc_vtplot",
name="plot_mc_vt_triggers",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
inj_layer = Layer(
"gstlal_inspiral_make_mc_vtplot",
name="plot_mc_vt_injections",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
triggers = trigger_cache.groupby("subtype")
triggers.pop("")
# make plots
for inj_type, lnlr_cdfs in lnlr_cdf_cache.groupby("subtype").items():
injection_file = config.filter.injections[inj_type.lower()]["file"]
trg_layer += Node(
arguments = [
Option("check-vt"),
Option("instrument", config.all_ifos),
],
inputs = [
Argument("lnlr-cdfs", lnlr_cdfs.files),
Option("ranking-stat-pdf", pdf_cache.files),
Option("injection-database", triggers[inj_type].files),
],
outputs = Option("output-dir", "plots"),
)
inj_layer += Node(
arguments = Option("instrument", config.all_ifos),
inputs = [
Argument("lnlr-cdfs", lnlr_cdfs.files),
Option("ranking-stat-pdf", pdf_cache.files),
Option("injection-files", injection_file),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(trg_layer)
dag.attach(inj_layer)
def plot_horizon_distance_layer(config, dag, marg_dist_stat_caches):
layer = Layer(
"gstlal_inspiral_plot_rankingstats_horizon",
name="plot_horizon_distance",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
)
layer += Node(
inputs = Argument("rankingstats", marg_dist_stat_caches.files),
outputs = Option("outdir", "plots"),
)
dag.attach(layer)
def plot_summary_layer(config, dag, trigger_cache, post_pdf_cache):
requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
# common plot options
common_plot_args = [
Option("segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("shrink-data-segments", 32.0),
Option("extend-veto-segments", 8.),
]
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
# plot summary job
layer = Layer(
"gstlal_inspiral_plotsummary",
name="summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
Option("remove-precession"),
*common_plot_args,
],
inputs = [
Argument("input-files", trigger_cache.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# precession plot summary job
layer = Layer(
"gstlal_inspiral_plotsummary",
name="summary_plots_precession",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
layer += Node(
arguments = [
Option("user-tag", "PRECESSION_COMBINED"),
Option("isolate-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", trigger_cache.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# single injection plot summary jobs
layer = Layer(
"gstlal_inspiral_plotsummary",
name="sngl_injection_summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTION"),
Option("remove-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# single injection precession plot summary jobs
layer = Layer(
"gstlal_inspiral_plotsummary",
name="sngl_injection_precession_summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTION_PRECESSION"),
Option("isolate-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
def plot_sensitivity_layer(config, dag, trigger_cache):
requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
layer = Layer(
"gstlal_inspiral_plot_sensitivity",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
# common options
common_args = [
Option("tmp-space", dagutil.condor_scratch_space()),
Option("veto-segments-name", "vetoes"),
Option("bin-by-source-type"),
Option("dist-bins", 200),
Option("data-segments-name", "datasegments"),
]
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
*common_args,
],
inputs = [
Option("zero-lag-database", noninj_trigger_cache.files),
Argument("injection-database", inj_trigger_cache.files),
],
outputs = Option("output-dir", "plots"),
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTIONS"),
*common_args,
],
inputs = [
Option("zero-lag-database", noninj_trigger_cache.files),
Argument("injection-database", inj_triggers.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
def plot_background_layer(config, dag, trigger_cache, post_pdf_cache):
requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
layer = Layer(
"gstlal_inspiral_plot_background",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
non_inj_triggers = trigger_cache.groupby("subtype")[""]
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
],
inputs = [
Option("database", non_inj_triggers.files),
Argument("post-marg-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
def filter_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
layer = Layer(
"gstlal_inspiral",
requirements={"request_cpus": 1, "request_memory": 5000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
# set up datasource options
if config.source.data_source == "framexmit":
datasource_opts = [
Option("data-source", "framexmit"),
Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
datasource_opts = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
# set up common options
common_opts = [
Option("track-psd"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
Option("analysis-tag", config.tag),
Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-label", config.upload.gracedb_label),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-trials-factor", config.upload.far_trials_factor),
Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
]
if config.services.kafka_server:
common_opts.append(Option("output-kafka-server", config.services.kafka_server)),
if config.upload.before_merger:
common_opts.append(Option("upload-time-before-merger"))
if config.upload.delay_uploads:
common_opts.append(Option("delay-uploads"))
if config.filter.cap_singles:
common_opts.append(Option("cap-singles"))
# set up activation counts if provided
if config.filter.activation_counts_file:
common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))
# compress ranking stat if requested
if config.filter.compress_ranking_stat:
common_opts.extend([
Option("compress-ranking-stat"),
Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
])
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
dist_stats = dist_stat_cache.groupby("bin")
zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
filter_opts = [
Option("job-tag", svd_bin),
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
]
filter_opts.extend(common_opts)
filter_opts.extend(datasource_opts)
layer += Node(
arguments = filter_opts,
inputs = [
Option("svd-bank", svd_banks.files),
Option("reference-psd", config.data.reference_psd),
Option("time-slide-file", config.filter.time_slide_file),
Option("ranking-stat-input", dist_stats[svd_bin].files),
Option("ranking-stat-pdf", marg_pdf_cache.files),
],
outputs = [
Option("output", "/dev/null"),
Option("ranking-stat-output", dist_stats[svd_bin].files),
Option("zerolag-rankingstat-pdf", zerolag_pdfs[svd_bin].files),
],
)
dag.attach(layer)
def filter_injections_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
layer = Layer(
"gstlal_inspiral",
name = "gstlal_inspiral_inj",
requirements={"request_cpus": 1, "request_memory": 5000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
if config.source.data_source == "framexmit":
datasource_opts = [
Option("data-source", "framexmit"),
Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
datasource_opts = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
common_opts = [
Option("track-psd"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
Option("analysis-tag", config.tag),
Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-label", config.upload.gracedb_label),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-trials-factor", config.upload.far_trials_factor),
Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
]
if config.services.kafka_server:
common_opts.append(Option("output-kafka-server", config.services.kafka_server))
if config.upload.before_merger:
common_opts.append(Option("upload-time-before-merger"))
if config.upload.delay_uploads:
common_opts.append(Option("delay-uploads"))
if config.filter.cap_singles:
common_opts.append(Option("cap-singles"))
if config.filter.activation_counts_file:
common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))
if config.filter.compress_ranking_stat:
common_opts.extend([
Option("compress-ranking-stat"),
Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
])
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
dist_stats = dist_stat_cache.groupby("bin")
zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
]
filter_opts.extend(common_opts)
filter_opts.extend(datasource_opts)
for inj_idx, inj_name in enumerate(config.filter.injections.keys(), start=1):
injection_file = config.filter.injections[inj_name.lower()]["file"]
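# injection job tags are offset by 2000 per injection set; the non-injection
# jobs in filter_online_layer use the bare SVD bin as their job tag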
inj_job_start = 2000 * inj_idx
inj_job_tag = f"{inj_job_start + int(svd_bin):04d}"
injection_opts = [
Option("job-tag", inj_job_tag),
*filter_opts,
]
layer += Node(
arguments = injection_opts,
inputs = [
Option("svd-bank", svd_banks.files),
Option("reference-psd", config.data.reference_psd),
Option("time-slide-file", config.filter.time_slide_file),
Option("ranking-stat-input", dist_stats[svd_bin].files),
Option("ranking-stat-pdf", marg_pdf_cache.files),
Option("injections", injection_file),
],
outputs = Option("output", "/dev/null"),
)
dag.attach(layer)
def marginalize_online_layer(config, dag, marg_pdf_cache):
layer = Layer(
"gstlal_inspiral_marginalize_likelihoods_online",
requirements={"request_cpus": 2, "request_memory": 4000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
inputs = list(marg_pdf_cache.files)
inputs.extend(f"{svd_bin}_registry.txt" for svd_bin in config.svd.bins)
layer += Node(inputs = Argument("inputs", inputs))
dag.attach(layer)
def upload_events_layer(config, dag):
layer = Layer(
"gstlal_ll_inspiral_event_uploader",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
layer += Node(
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-threshold", config.upload.aggregator_far_threshold),
Option("far-trials-factor", config.upload.aggregator_far_trials_factor),
Option("upload-cadence-type", config.upload.aggregator_cadence_type),
Option("upload-cadence-factor", config.upload.aggregator_cadence_factor),
Option("num-jobs", len(config.svd.bins)),
Option("tag", config.tag),
Option("input-topic", "events"),
Option("rootdir", "event_uploader"),
Option("verbose"),
],
)
dag.attach(layer)
def plot_events_layer(config, dag):
layer = Layer(
"gstlal_ll_inspiral_event_plotter",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
layer += Node(
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("tag", config.tag),
Option("verbose"),
],
)
dag.attach(layer)
def count_events_layer(config, dag, dist_stat_cache):
layer = Layer(
"gstlal_ll_inspiral_trigger_counter",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
zerolag_pdf = DataCache.generate(DataType.ZEROLAG_DIST_STAT_PDFS, config.all_ifos)
layer += Node(
arguments = [
Option("uri", f"kafka://{config.tag}-counter@{config.services.kafka_server}"),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("output-period", 300),
Option("topic", f"gstlal.{config.tag}.coinc"),
Option("bootstrap", dist_stat_cache.files[0]),
],
outputs = Option("output", zerolag_pdf.files),
)
dag.attach(layer)
def collect_metrics_layer(config, dag):
requirements = {"request_cpus": 1, "request_memory": 2000, **config.condor.submit}
event_layer = Layer(
"scald",
name="scald_event_collector",
requirements=requirements,
transfer_files=config.condor.transfer_files,
retries=1000,
)
metric_layer = Layer(
"scald",
name="scald_metric_collector",
requirements=requirements,
transfer_files=config.condor.transfer_files,
retries=1000,
)
metric_leader_layer = Layer(
"scald",
name="scald_metric_collector_leader",
requirements=requirements,
transfer_files=config.condor.transfer_files,
retries=1000,
)
# set up common options
common_opts = [
Argument("command", "aggregate"),
Option("config", config.metrics.scald_config),
Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
]
# define metrics used for aggregation jobs
snr_metrics = [f"{ifo}_snr_history" for ifo in config.ifos]
network_metrics = ["likelihood_history", "snr_history", "latency_history", "far_history"]
state_metrics = [f"{ifo}_strain_dropped" for ifo in config.ifos]
usage_metrics = ["ram_history"]
latency_metrics = [f"{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice")]
latency_metrics.append("all_itacac_latency")
agg_metrics = list(itertools.chain(snr_metrics, network_metrics, usage_metrics, state_metrics, latency_metrics))
gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
seg_metrics = [f"{ifo}_{gate}" for ifo in config.ifos for gate in gates]
# set up partitioning
# FIXME don't hard code the 1000
max_agg_jobs = 1000
num_jobs = len(config.svd.bins)
agg_job_bounds = list(range(0, num_jobs, max_agg_jobs))
# timeseries metrics
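# each scald aggregation node handles a group of metrics; the group size is
# derived from max_agg_jobs and the number of filter jobs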
agg_metrics = dagutil.groups(agg_metrics, max(max_agg_jobs // (4 * num_jobs), 1))
seg_metrics = dagutil.groups(seg_metrics, max(max_agg_jobs // (4 * num_jobs), 1))
for metrics in itertools.chain(agg_metrics, seg_metrics):
for i, _ in enumerate(agg_job_bounds):
arguments = list(common_opts)
arguments.extend([
Option("data-type", "timeseries"),
Option("topic", [f"gstlal.{config.tag}.{metric}" for metric in metrics]),
Option("schema", metrics),
])
# elect first metric collector as leader
if i == 0:
arguments.append(Option("across-jobs"))
metric_leader_layer += Node(arguments=arguments)
else:
metric_layer += Node(arguments=arguments)
# add optional test suite metrics
if config.metrics.test_suite_config:
with open(config.metrics.test_suite_config, 'r') as f:
testsuite_config = yaml.load(f, Loader=SafeLoader)
testsuite_metrics = [metric for metric in testsuite_config["schemas"]]
testsuite_metrics = dagutil.groups(testsuite_metrics, max(max_agg_jobs // (4 * num_jobs), 1))
for metrics in testsuite_metrics:
testsuite_arguments = [
Argument("command", "aggregate"),
Option("config", config.metrics.test_suite_config),
Option("uri", f"kafka://{config.tag}-testsuite@{config.services.kafka_server}"),
]
testsuite_arguments.extend([
Option("data-type", "timeseries"),
Option("topic", [f"gstlal.{config.tag}.testsuite.{metric}" for metric in metrics]),
Option("schema", metrics)
])
metric_layer += Node(arguments=testsuite_arguments)
if metric_layer.nodes:
dag.attach(metric_leader_layer)
dag.attach(metric_layer)
else:
dag.attach(metric_leader_layer)
# event metrics
event_arguments = list(common_opts)
event_arguments.extend([
Option("data-type", "triggers"),
Option("topic", f"gstlal.{config.tag}.coinc"),
Option("schema", "coinc"),
])
event_layer += Node(arguments=event_arguments)
dag.attach(event_layer)
def track_noise_layer(config, dag):
layer = Layer(
"gstlal_ll_dq",
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
for ifo in config.ifos:
# set up datasource options
if config.source.data_source == "framexmit":
arguments = [
Option("data-source", "framexmit"),
Option("framexmit-addr", f"{ifo}={config.source.framexmit_addr[ifo]}"),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
arguments = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", f"{ifo}={config.source.shared_memory_partition[ifo]}"),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
arguments.extend([
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", f"{ifo}={config.source.channel_name[ifo]}"),
Option("state-channel-name", f"{ifo}={config.source.state_channel_name[ifo]}"),
Option("dq-channel-name", f"{ifo}={config.source.dq_channel_name[ifo]}"),
Option("state-vector-on-bits", f"{ifo}={config.source.state_vector_on_bits[ifo]}"),
Option("state-vector-off-bits", f"{ifo}={config.source.state_vector_off_bits[ifo]}"),
Option("dq-vector-on-bits", f"{ifo}={config.source.dq_vector_on_bits[ifo]}"),
Option("dq-vector-off-bits", f"{ifo}={config.source.dq_vector_off_bits[ifo]}"),
Option("scald-config", config.metrics.scald_config),
Option("out-path", "aggregator"),
])
layer += Node(arguments = arguments)
dag.attach(layer)
def calc_gate_threshold(config, svd_bin, aggregate="max"):
"""
Given a configuration, svd bin and aggregate, this calculates
the h(t) gate threshold used for a given svd bin.
"""
if isinstance(config.filter.ht_gate_threshold, str):
bank_mchirp = config.svd.stats["bins"][svd_bin][f"{aggregate}_mchirp"]
min_mchirp, min_threshold, max_mchirp, max_threshold = [
float(y) for x in config.filter.ht_gate_threshold.split("-") for y in x.split(":")
]
gate_mchirp_ratio = (max_threshold - min_threshold) / (max_mchirp - min_mchirp)
return round(gate_mchirp_ratio * (bank_mchirp - min_mchirp) + min_threshold, 3)
else: # uniform threshold
return config.filter.ht_gate_threshold
def autocorrelation_length_map(ac_length_range):
"""
Given a list of autocorrelation length ranges, each formatted as
min_mchirp:max_mchirp:length (e.g. 0:15:701), or a single
autocorrelation length, returns a function that maps a given
chirp mass to an autocorrelation length.
"""
if isinstance(ac_length_range, str):
ac_length_range = [ac_length_range]
# handle case with AC length ranges
if isinstance(ac_length_range, Iterable):
ac_lengths = []
min_mchirps = []
max_mchirps = []
for this_range in ac_length_range:
min_mchirp, max_mchirp, ac_length = this_range.split(":")
min_mchirps.append(float(min_mchirp))
max_mchirps.append(float(max_mchirp))
ac_lengths.append(int(ac_length))
# sanity check inputs
for bound1, bound2 in zip(min_mchirps[1:], max_mchirps[:-1]):
assert bound1 == bound2, "gaps not allowed in autocorrelation length ranges"
# convert to binning
bins = rate.IrregularBins([min_mchirps[0]] + max_mchirps)
# handle single value case
else:
ac_lengths = [ac_length_range]
bins = rate.IrregularBins([0., numpy.inf])
# create mapping
def mchirp_to_ac_length(mchirp):
idx = bins[mchirp]
return ac_lengths[idx]
return mchirp_to_ac_length
def mchirp_range_to_bins(min_mchirp, max_mchirp, svd_metadata):
"""
Given a range of chirp masses and the SVD metadata, determine
and return the SVD bins that overlap.
"""
svd_bins = []
mchirp_range = segment(min_mchirp, max_mchirp)
for svd_bin, bin_metadata in svd_metadata["bins"].items():
bin_range = segment(bin_metadata["min_mchirp"], bin_metadata["max_mchirp"])
if mchirp_range.intersects(bin_range):
svd_bins.append(svd_bin)
return svd_bins
def ifo_to_string(ifos):
"""Given a list of ifos, converts this to a string.
"""
return "".join(sorted(list(ifos)))
@plugins.register
def layers():
return {
"split_bank": split_bank_layer,
"svd_bank": svd_bank_layer,
"checkerboard": checkerboard_layer,
"filter": filter_layer,
"filter_injections": filter_injections_layer,
"aggregate": aggregate_layer,
"create_prior": create_prior_layer,
"calc_pdf": calc_pdf_layer,
"marginalize": marginalize_layer,
"marginalize_pdf": marginalize_pdf_layer,
"calc_likelihood": calc_likelihood_layer,
"cluster": cluster_layer,
"compute_far": compute_far_layer,
"find_injections": find_injections_layer,
"match_injections": match_injections_layer,
"measure_lnlr_cdf": measure_lnlr_cdf_layer,
"plot_analytic_vt": plot_analytic_vt_layer,
"plot_horizon_distance": plot_horizon_distance_layer,
"plot_summary": plot_summary_layer,
"plot_background": plot_background_layer,
"plot_sensitivity": plot_sensitivity_layer,
"filter_online": filter_online_layer,
"filter_injections_online": filter_injections_online_layer,
"marginalize_online": marginalize_online_layer,
"upload_events": upload_events_layer,
"plot_events": plot_events_layer,
"count_events": count_events_layer,
"collect_metrics": collect_metrics_layer,
"track_noise": track_noise_layer,
}