Commit 22699dfa authored by Patrick Godwin

gstlal-inspiral: add rerank layers to rerank DAG

parent 8f4d0d5c
1 merge request: !41 DAG Workflow Overhaul + OSG DAG support
@@ -20,29 +20,50 @@
import argparse
from gstlal.config.inspiral import Config
from gstlal.dags import util as dagutils
from gstlal.dags.inspiral import DAG
from gstlal.datafind import DataCache, DataType
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
parser.add_argument("-w", "--workflow", default="full", help="Sets the type of workflow to run.")
parser.add_argument("--svd-manifest", help="Load SVD manifest from this path.")
# load config
args = parser.parse_args()
config = Config.load(args.config)
config.load_svd_manifest(args.svd_manifest)
#config.load_svd_manifest(args.svd_manifest)
config.load_svd_manifest(config.svd.manifest)
# create dag
dag = DAG(config)
dag.create_log_dir()
# generate dag layers
ref_psd = dag.reference_psd()
median_psd = dag.median_psd(ref_psd)
svd_bank = dag.svd_bank(median_psd)
triggers, dist_stats = dag.filter(ref_psd, svd_bank)
triggers, dist_stats = dag.aggregate(triggers, dist_stats)
dag.write_dag("trigger_pipe.dag")
dag.write_script("trigger_pipe.sh")
if args.workflow in set(("full", "filter")):
    # generate filter dag layers
    ref_psd = dag.reference_psd()
    median_psd = dag.median_psd(ref_psd)
    svd_bank = dag.svd_bank(median_psd)
    triggers, dist_stats = dag.filter(ref_psd, svd_bank)
    triggers, dist_stats = dag.aggregate(triggers, dist_stats)
else:
    # load filter data products
    triggers = DataCache.find(DataType.TRIGGERS, root=config.rootdir)
    dist_stats = DataCache.find(DataType.DIST_STATS, root=config.rootdir)
    median_psd = DataCache.find(DataType.MEDIAN_PSD, root=config.rootdir)

if args.workflow in set(("full", "rerank")):
    # generate rerank dag layers
    prior = dag.prior(median_psd, dist_stats)
    dist_stats = dag.marginalize(prior, dist_stats)
    pdfs = dag.calc_pdf(dist_stats)
    pdfs = dag.marginalize_pdf(pdfs)
    triggers = dag.calc_likelihood(triggers, dist_stats)
    triggers = dag.cluster(triggers)
    triggers = dag.compute_far(triggers, pdfs)

# write dag/script to disk
dag_name = f"{args.workflow}_inspiral_dag"
dag.write_dag(f"{dag_name}.dag")
dag.write_script(f"{dag_name}.sh")
@@ -32,6 +32,8 @@ class Config(BaseConfig):
        self.psd = dotdict(replace_keys(kwargs["psd"]))
        self.svd = dotdict(replace_keys(kwargs["svd"]))
        self.filter = dotdict(replace_keys(kwargs["filter"]))
        self.prior = dotdict(replace_keys(kwargs["prior"]))
        self.rank = dotdict(replace_keys(kwargs["rank"]))

    def load_svd_manifest(self, manifest_file):
        with open(manifest_file, "r") as f:
@@ -25,7 +25,7 @@ from gstlal.dags import util as dagutil
def reference_psd_layer(config, dag):
    requirements = {"request_cpu": 2, "request_memory": 2000, **config.condor}
    requirements = {"request_cpus": 2, "request_memory": 2000, **config.condor}
    layer = Layer("gstlal_reference_psd", requirements=requirements)

    psd_cache = DataCache.generate(DataType.REFERENCE_PSD, config.ifo_combo, config.time_bins)
@@ -53,7 +53,7 @@ def reference_psd_layer(config, dag):
def median_psd_layer(config, dag, ref_psd_cache):
    requirements = {"request_cpu": 2, "request_memory": 2000, **config.condor}
    requirements = {"request_cpus": 2, "request_memory": 2000, **config.condor}
    layer = Layer("gstlal_median_of_psds", parents="reference_psd", requirements=requirements)

    median_psd_cache = DataCache.generate(DataType.REFERENCE_PSD, config.ifo_combo, config.span)
@@ -68,7 +68,7 @@ def median_psd_layer(config, dag, ref_psd_cache):
def svd_bank_layer(config, dag, median_psd_cache):
    requirements = {"request_cpu": 1, "request_memory": 4000, **config.condor}
    requirements = {"request_cpus": 1, "request_memory": 4000, **config.condor}
    layer = Layer("gstlal_inspiral_svd_bank", parents="median_psd", requirements=requirements)

    svd_cache = DataCache.generate(DataType.SVD_BANK, config.ifos, config.span, svd_bins=config.svd.bins)
@@ -100,7 +100,7 @@ def svd_bank_layer(config, dag, median_psd_cache):
def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
    requirements = {"request_cpu": 2, "request_memory": 4000, **config.condor}
    requirements = {"request_cpus": 2, "request_memory": 4000, **config.condor}
    layer = Layer("gstlal_inspiral", parents=("reference_psd", "svd_bank"), requirements=requirements)

    trigger_cache = DataCache.generate(DataType.TRIGGERS, config.ifo_combo, config.time_bins, svd_bins=config.svd.bins)
@@ -168,7 +168,7 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
"lalapps_run_sqlite",
name="cluster_triggers_by_snr",
parents="filter",
requirements={"request_cpu": 1, "request_memory": 2000, **config.condor}
requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
)
# FIXME: find better way of discovering SQL file
@@ -189,9 +189,9 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
    # marginalize dist stats across time
    dist_layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="marginalize_dist_stats",
        name="marginalize_dist_stats_across_time_filter",
        parents="filter",
        requirements={"request_cpu": 1, "request_memory": 2000, **config.condor}
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    agg_dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)
@@ -212,7 +212,218 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
    return trigger_cache, agg_dist_stat_cache


def prior_layer(config, dag, median_psd_cache, dist_stat_cache):
    if "aggregate_dist_stats" in dag:
        parents = ("median_psd", "aggregate_dist_stats")
    else:
        parents = None

    layer = Layer(
        "gstlal_inspiral_create_prior_diststats",
        parents=parents,
        requirements={"request_cpus": 2, "request_memory": 4000, **config.condor}
    )

    prior_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)

    for svd_bin, prior in prior_cache.groupby("bin").items():
        prior_inputs = [
            Option("svd-file", config.svd.manifest),
            Option("mass-model-file", config.prior.mass_model),
            Option("psd-xml", median_psd_cache.files)
        ]
        if config.prior.idq_timeseries:
            # prior_inputs is a list, so append an Option rather than assigning by key
            prior_inputs.append(Option("idq-file", config.prior.idq_timeseries))
        layer += Node(
            key = svd_bin,
            parent_keys = {"aggregate_dist_stats": [svd_bin]},
            arguments = [
                Option("df", "bandwidth"),
                Option("svd-bin", svd_bin),
                Option("background-prior", 1),
                Option("instrument", config.ifos),
                Option("min-instruments", config.filter.min_instruments),
                Option("coincidence-threshold", config.filter.coincidence_threshold),
            ],
            inputs = prior_inputs,
            outputs = Option("write-likelihood", prior.files),
        )

    dag["prior"] = layer

    return prior_cache

def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
    if "aggregate_dist_stats" in dag:
        parents = ("prior", "aggregate_dist_stats")
    else:
        parents = "prior"

    layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="marginalize_dist_stats_across_time_rank",
        parents=parents,
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    marg_dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)

    prior = prior_cache.groupby("bin")
    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, marg_dist_stats in marg_dist_stat_cache.groupby("bin").items():
        parent_keys = {"prior": [svd_bin]}
        if "aggregate_dist_stats" in dag:
            parent_keys["aggregate_dist_stats"] = [svd_bin]
        layer += Node(
            key = svd_bin,
            parent_keys = parent_keys,
            arguments = Option("marginalize", "ranking-stat"),
            inputs = Argument("dist-stats", dist_stats[svd_bin].files + prior[svd_bin].files),
            outputs = Option("output", marg_dist_stats.files)
        )

    dag["marginalize"] = layer

    return marg_dist_stat_cache

def calc_pdf_layer(config, dag, dist_stat_cache):
    layer = Layer(
        "gstlal_inspiral_calc_rank_pdfs",
        parents="marginalize",
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.ifo_combo, config.span, svd_bins=config.svd.bins)

    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, pdfs in pdf_cache.groupby("bin").items():
        layer += Node(
            key = svd_bin,
            parent_keys = {"marginalize": [svd_bin]},
            arguments = Option("ranking-stat-samples", config.rank.ranking_stat_samples),
            inputs = Argument("dist-stats", dist_stats[svd_bin].files),
            outputs = Option("output", pdfs.files)
        )

    dag["calc_pdf"] = layer

    return pdf_cache

def marginalize_pdf_layer(config, dag, pdf_cache):
    layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="gstlal_inspiral_marginalize_pdfs",
        parents="calc_pdf",
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    marg_pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.ifo_combo, config.span)

    layer += Node(
        parent_keys = {"calc_pdf": [svd_bin for svd_bin in config.svd.bins]},
        arguments = Option("marginalize", "ranking-stat-pdf"),
        inputs = Argument("dist-stat-pdfs", pdf_cache.files),
        outputs = Option("output", marg_pdf_cache.files)
    )

    dag["marginalize_pdf"] = layer

    return marg_pdf_cache

def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
    if "aggregate_dist_stats" in dag:
        parents = ("marginalize", "aggregate_triggers", "aggregate_dist_stats")
    else:
        parents = "marginalize"

    layer = Layer(
        "gstlal_inspiral_calc_likelihood",
        parents=parents,
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, triggers in trigger_cache.groupby("bin").items():
        layer += Node(
            parent_keys = {
                "marginalize": [svd_bin],
                "aggregate_triggers": [svd_bin],
                "aggregate_dist_stats": [svd_bin],
            },
            arguments = [
                Option("force"),
                Option("tmp-space", dagutil.condor_scratch_space()),
            ],
            inputs = [
                Option("likelihood-url", dist_stats[svd_bin].files),
                Argument("triggers", triggers.files),
            ],
            outputs = Argument("calc-triggers", triggers.files, include=False),
        )

    dag["calc_likelihood"] = layer

    return trigger_cache

def cluster_layer(config, dag, trigger_cache):
    # cluster triggers by likelihood
    layer = Layer(
        "lalapps_run_sqlite",
        name="cluster_triggers_by_likelihood",
        parents="calc_likelihood",
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    # FIXME: find better way of discovering SQL file
    share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
    cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")

    for span, triggers in trigger_cache.groupby("time").items():
        layer += Node(
            key = span,
            arguments = [
                Option("sql-file", cluster_sql_file),
                Option("tmp-space", dagutil.condor_scratch_space()),
            ],
            inputs = Argument("triggers", triggers.files),
        )

    dag["cluster"] = layer

    return trigger_cache

def compute_far_layer(config, dag, trigger_cache, pdf_cache):
    layer = Layer(
        "gstlal_compute_far_from_snr_chisq_histograms",
        name="compute_far",
        parents=("cluster", "marginalize_pdf"),
        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
    )

    for span, triggers in trigger_cache.groupby("time").items():
        layer += Node(
            key = span,
            parent_keys = {"cluster": [span]},
            arguments = [
                Option("tmp-space", dagutil.condor_scratch_space()),
            ],
            inputs = [
                Option("non-injection-db", triggers.files),
                Option("background-bins-file", pdf_cache.files),
            ],
        )

    dag["compute_far"] = layer

    return trigger_cache

def calc_gate_threshold(config, svd_bin, aggregate="max"):
    """
    Given a configuration, svd bin and aggregate, this calculates
    the h(t) gate threshold used for a given svd bin.
    """
if ":" in config.filter.ht_gate_threshold:
bank_mchirp = config.svd.stats[svd_bin][f"{aggregate}_mchirp"]
min_mchirp, min_threshold, max_mchirp, max_threshold = [
@@ -225,6 +436,12 @@ def calc_gate_threshold(config, svd_bin, aggregate="max"):
def format_ifo_args(ifos, args):
    """
    Given a set of instruments and arguments keyed by instruments, this
    creates a list of strings in the form {ifo}={arg}. This is suitable
    for command line options like --channel-name which expects this
    particular format.
    """
    if isinstance(ifos, str):
        ifos = [ifos]
    return [f"{ifo}={args[ifo]}" for ifo in ifos]
@@ -238,4 +455,11 @@ def layers():
"svd_bank": svd_bank_layer,
"filter": filter_layer,
"aggregate": aggregate_layer,
"prior": prior_layer,
"calc_pdf": calc_pdf_layer,
"marginalize": marginalize_layer,
"marginalize_pdf": marginalize_pdf_layer,
"calc_likelihood": calc_likelihood_layer,
"cluster": cluster_layer,
"compute_far": compute_far_layer,
}