diff --git a/gstlal-inspiral/bin/gstlal_inspiral_workflow b/gstlal-inspiral/bin/gstlal_inspiral_workflow
index 86e3d834dbbb660e4fcf0efdb4c1f79e8bb62754..3e0af124b0437763ba4dd783223b0b3503dee30c 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_workflow
+++ b/gstlal-inspiral/bin/gstlal_inspiral_workflow
@@ -20,29 +20,50 @@ import argparse
 
 from gstlal.config.inspiral import Config
-from gstlal.dags import util as dagutils
 from gstlal.dags.inspiral import DAG
+from gstlal.datafind import DataCache, DataType
 
 
 parser = argparse.ArgumentParser()
 parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
+parser.add_argument("-w", "--workflow", default="full", help="Sets the type of workflow to run.")
 parser.add_argument("--svd-manifest", help="Load SVD manifest from this path.")
 
 # load config
 args = parser.parse_args()
 config = Config.load(args.config)
-config.load_svd_manifest(args.svd_manifest)
+#config.load_svd_manifest(args.svd_manifest)
+config.load_svd_manifest(config.svd.manifest)
 
 # create dag
 dag = DAG(config)
 dag.create_log_dir()
 
 # generate dag layers
-ref_psd = dag.reference_psd()
-median_psd = dag.median_psd(ref_psd)
-svd_bank = dag.svd_bank(median_psd)
-triggers, dist_stats = dag.filter(ref_psd, svd_bank)
-triggers, dist_stats = dag.aggregate(triggers, dist_stats)
-
-dag.write_dag("trigger_pipe.dag")
-dag.write_script("trigger_pipe.sh")
+if args.workflow in set(("full", "filter")):
+    # generate filter dag layers
+    ref_psd = dag.reference_psd()
+    median_psd = dag.median_psd(ref_psd)
+    svd_bank = dag.svd_bank(median_psd)
+    triggers, dist_stats = dag.filter(ref_psd, svd_bank)
+    triggers, dist_stats = dag.aggregate(triggers, dist_stats)
+else:
+    # load filter data products
+    triggers = DataCache.find(DataType.TRIGGERS, root=config.rootdir)
+    dist_stats = DataCache.find(DataType.DIST_STATS, root=config.rootdir)
+    median_psd = DataCache.find(DataType.MEDIAN_PSD, root=config.rootdir)
+
+if args.workflow in set(("full", "rerank")):
+    # generate rerank dag layers
+    prior = dag.prior(median_psd, dist_stats)
+    dist_stats = dag.marginalize(prior, dist_stats)
+    pdfs = dag.calc_pdf(dist_stats)
+    pdfs = dag.marginalize_pdf(pdfs)
+    triggers = dag.calc_likelihood(triggers, dist_stats)
+    triggers = dag.cluster(triggers)
+    triggers = dag.compute_far(triggers, pdfs)
+
+# write dag/script to disk
+dag_name = f"{args.workflow}_inspiral_dag"
+dag.write_dag(f"{dag_name}.dag")
+dag.write_script(f"{dag_name}.sh")
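The new --workflow option lets the script generate the full workflow in one pass (the
default), or split it into a filter stage and a rerank stage, with the rerank stage
discovering the filter data products on disk via DataCache.find. Expected usage looks
something like the following (the config file name and the HTCondor submit commands are
illustrative, not part of this change; the DAG file names follow from the dag_name
f-string above):

    # full workflow: filter + rerank layers in a single DAG
    gstlal_inspiral_workflow -c config.yml

    # staged: generate and submit the filter DAG first...
    gstlal_inspiral_workflow -c config.yml -w filter
    condor_submit_dag filter_inspiral_dag.dag

    # ...then, after filtering completes, generate and submit the rerank DAG
    gstlal_inspiral_workflow -c config.yml -w rerank
    condor_submit_dag rerank_inspiral_dag.dag
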
diff --git a/gstlal-inspiral/python/config/inspiral.py b/gstlal-inspiral/python/config/inspiral.py
index 2458f26facf5c427ec843bbd631bf2817c6d48f5..84270d0245a931908ebbd04312dd001fc241642a 100644
--- a/gstlal-inspiral/python/config/inspiral.py
+++ b/gstlal-inspiral/python/config/inspiral.py
@@ -32,6 +32,8 @@ class Config(BaseConfig):
         self.psd = dotdict(replace_keys(kwargs["psd"]))
         self.svd = dotdict(replace_keys(kwargs["svd"]))
         self.filter = dotdict(replace_keys(kwargs["filter"]))
+        self.prior = dotdict(replace_keys(kwargs["prior"]))
+        self.rank = dotdict(replace_keys(kwargs["rank"]))
 
     def load_svd_manifest(self, manifest_file):
         with open(manifest_file, "r") as f:
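With these additions the workflow configuration must carry prior and rank sections
(Config raises a KeyError if either is missing), and, per the script change above, an
svd section providing the manifest path that replaces the --svd-manifest argument. A
minimal sketch of the new sections, with key names inferred from the attribute accesses
in the layer code below (config.prior.mass_model, config.prior.idq_timeseries,
config.rank.ranking_stat_samples) and values purely illustrative, assuming replace_keys
maps hyphenated keys to the underscored attribute names:

    prior:
      mass-model: mass_model.h5
      idq-timeseries: idq_series.h5   # optional; only passed through when set
    rank:
      ranking-stat-samples: 4194304
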
diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py
index 5b385cd0581d791adf911a7bfa99d7a41a401323..03e2adf3b74eeb1c1c19b9151478dfb14a722089 100644
--- a/gstlal-inspiral/python/dags/layers/inspiral.py
+++ b/gstlal-inspiral/python/dags/layers/inspiral.py
@@ -25,7 +25,7 @@ from gstlal.dags import util as dagutil
 
 
 def reference_psd_layer(config, dag):
-    requirements = {"request_cpu": 2, "request_memory": 2000, **config.condor}
+    requirements = {"request_cpus": 2, "request_memory": 2000, **config.condor}
     layer = Layer("gstlal_reference_psd", requirements=requirements)
 
     psd_cache = DataCache.generate(DataType.REFERENCE_PSD, config.ifo_combo, config.time_bins)
@@ -53,7 +53,7 @@ def reference_psd_layer(config, dag):
 
 
 def median_psd_layer(config, dag, ref_psd_cache):
-    requirements = {"request_cpu": 2, "request_memory": 2000, **config.condor}
+    requirements = {"request_cpus": 2, "request_memory": 2000, **config.condor}
     layer = Layer("gstlal_median_of_psds", parents="reference_psd", requirements=requirements)
 
     median_psd_cache = DataCache.generate(DataType.REFERENCE_PSD, config.ifo_combo, config.span)
@@ -68,7 +68,7 @@ def median_psd_layer(config, dag, ref_psd_cache):
 
 
 def svd_bank_layer(config, dag, median_psd_cache):
-    requirements = {"request_cpu": 1, "request_memory": 4000, **config.condor}
+    requirements = {"request_cpus": 1, "request_memory": 4000, **config.condor}
    layer = Layer("gstlal_inspiral_svd_bank", parents="median_psd", requirements=requirements)
 
     svd_cache = DataCache.generate(DataType.SVD_BANK, config.ifos, config.span, svd_bins=config.svd.bins)
@@ -100,7 +100,7 @@ def svd_bank_layer(config, dag, median_psd_cache):
 
 
 def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
-    requirements = {"request_cpu": 2, "request_memory": 4000, **config.condor}
+    requirements = {"request_cpus": 2, "request_memory": 4000, **config.condor}
     layer = Layer("gstlal_inspiral", parents=("reference_psd", "svd_bank"), requirements=requirements)
 
     trigger_cache = DataCache.generate(DataType.TRIGGERS, config.ifo_combo, config.time_bins, svd_bins=config.svd.bins)
@@ -168,7 +168,7 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
         "lalapps_run_sqlite",
         name="cluster_triggers_by_snr",
         parents="filter",
-        requirements={"request_cpu": 1, "request_memory": 2000, **config.condor}
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
     )
 
     # FIXME: find better way of discovering SQL file
@@ -189,9 +189,9 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
     # marginalize dist stats across time
     dist_layer = Layer(
         "gstlal_inspiral_marginalize_likelihood",
-        name="marginalize_dist_stats",
+        name="marginalize_dist_stats_across_time_filter",
         parents="filter",
-        requirements={"request_cpu": 1, "request_memory": 2000, **config.condor}
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
     )
 
     agg_dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)
@@ -212,7 +212,218 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
     return trigger_cache, agg_dist_stat_cache
 
 
+def prior_layer(config, dag, median_psd_cache, dist_stat_cache):
+    if "aggregate_dist_stats" in dag:
+        parents = ("median_psd", "aggregate_dist_stats")
+    else:
+        parents = None
+
+    layer = Layer(
+        "gstlal_inspiral_create_prior_diststats",
+        parents=parents,
+        requirements={"request_cpus": 2, "request_memory": 4000, **config.condor}
+    )
+
+    prior_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)
+
+    for svd_bin, prior in prior_cache.groupby("bin").items():
+        prior_inputs = [
+            Option("svd-file", config.svd.manifest),
+            Option("mass-model-file", config.prior.mass_model),
+            Option("psd-xml", median_psd_cache.files)
+        ]
+        if config.prior.idq_timeseries:
+            prior_inputs.append(Option("idq-file", config.prior.idq_timeseries))
+
+        parent_keys = {}
+        if "aggregate_dist_stats" in dag:
+            parent_keys["aggregate_dist_stats"] = [svd_bin]
+
+        layer += Node(
+            key = svd_bin,
+            parent_keys = parent_keys,
+            arguments = [
+                Option("df", "bandwidth"),
+                Option("svd-bin", svd_bin),
+                Option("background-prior", 1),
+                Option("instrument", config.ifos),
+                Option("min-instruments", config.filter.min_instruments),
+                Option("coincidence-threshold", config.filter.coincidence_threshold),
+            ],
+            inputs = prior_inputs,
+            outputs = Option("write-likelihood", prior.files),
+        )
+
+    dag["prior"] = layer
+    return prior_cache
+
+
+def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
+    if "aggregate_dist_stats" in dag:
+        parents = ("prior", "aggregate_dist_stats")
+    else:
+        parents = "prior"
+
+    layer = Layer(
+        "gstlal_inspiral_marginalize_likelihood",
+        name="marginalize_dist_stats_across_time_rank",
+        parents=parents,
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    marg_dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combo, config.span, svd_bins=config.svd.bins)
+
+    prior = prior_cache.groupby("bin")
+    dist_stats = dist_stat_cache.groupby("bin")
+    for svd_bin, marg_dist_stats in marg_dist_stat_cache.groupby("bin").items():
+        parent_keys = {"prior": [svd_bin]}
+        if "aggregate_dist_stats" in dag:
+            parent_keys["aggregate_dist_stats"] = [svd_bin]
+
+        layer += Node(
+            key = svd_bin,
+            parent_keys = parent_keys,
+            arguments = Option("marginalize", "ranking-stat"),
+            inputs = Argument("dist-stats", dist_stats[svd_bin].files + prior[svd_bin].files),
+            outputs = Option("output", marg_dist_stats.files)
+        )
+
+    dag["marginalize"] = layer
+    return marg_dist_stat_cache
+
+
+def calc_pdf_layer(config, dag, dist_stat_cache):
+    layer = Layer(
+        "gstlal_inspiral_calc_rank_pdfs",
+        parents="marginalize",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.ifo_combo, config.span, svd_bins=config.svd.bins)
+
+    dist_stats = dist_stat_cache.groupby("bin")
+    for svd_bin, pdfs in pdf_cache.groupby("bin").items():
+        layer += Node(
+            key = svd_bin,
+            parent_keys = {"marginalize": [svd_bin]},
+            arguments = Option("ranking-stat-samples", config.rank.ranking_stat_samples),
+            inputs = Argument("dist-stats", dist_stats[svd_bin].files),
+            outputs = Option("output", pdfs.files)
+        )
+
+    dag["calc_pdf"] = layer
+    return pdf_cache
+
+
+def marginalize_pdf_layer(config, dag, pdf_cache):
+    layer = Layer(
+        "gstlal_inspiral_marginalize_likelihood",
+        name="marginalize_pdfs",
+        parents="calc_pdf",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    marg_pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.ifo_combo, config.span)
+
+    layer += Node(
+        parent_keys = {"calc_pdf": list(config.svd.bins)},
+        arguments = Option("marginalize", "ranking-stat-pdf"),
+        inputs = Argument("dist-stat-pdfs", pdf_cache.files),
+        outputs = Option("output", marg_pdf_cache.files)
+    )
+
+    dag["marginalize_pdf"] = layer
+    return marg_pdf_cache
+
+
+def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
+    if "aggregate_dist_stats" in dag:
+        parents = ("marginalize", "aggregate_triggers", "aggregate_dist_stats")
+    else:
+        parents = "marginalize"
+
+    layer = Layer(
+        "gstlal_inspiral_calc_likelihood",
+        parents=parents,
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    dist_stats = dist_stat_cache.groupby("bin")
+    for svd_bin, triggers in trigger_cache.groupby("bin").items():
+        parent_keys = {"marginalize": [svd_bin]}
+        if "aggregate_dist_stats" in dag:
+            parent_keys["aggregate_triggers"] = [svd_bin]
+            parent_keys["aggregate_dist_stats"] = [svd_bin]
+
+        layer += Node(
+            parent_keys = parent_keys,
+            arguments = [
+                Option("force"),
+                Option("tmp-space", dagutil.condor_scratch_space()),
+            ],
+            inputs = [
+                Option("likelihood-url", dist_stats[svd_bin].files),
+                Argument("triggers", triggers.files),
+            ],
+            outputs = Argument("calc-triggers", triggers.files, include=False),
+        )
+
+    dag["calc_likelihood"] = layer
+    return trigger_cache
+
+
+def cluster_layer(config, dag, trigger_cache):
+    # cluster triggers by likelihood
+    layer = Layer(
+        "lalapps_run_sqlite",
+        name="cluster_triggers_by_likelihood",
+        parents="calc_likelihood",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    # FIXME: find better way of discovering SQL file
+    share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
+    cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")
+
+    for span, triggers in trigger_cache.groupby("time").items():
+        layer += Node(
+            key = span,
+            arguments = [
+                Option("sql-file", cluster_sql_file),
+                Option("tmp-space", dagutil.condor_scratch_space()),
+            ],
+            inputs = Argument("triggers", triggers.files),
+        )
+
+    dag["cluster"] = layer
+    return trigger_cache
+
+
+def compute_far_layer(config, dag, trigger_cache, pdf_cache):
+    layer = Layer(
+        "gstlal_compute_far_from_snr_chisq_histograms",
+        name="compute_far",
+        parents=("cluster", "marginalize_pdf"),
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor}
+    )
+
+    for span, triggers in trigger_cache.groupby("time").items():
+        layer += Node(
+            key = span,
+            parent_keys = {"cluster": [span]},
+            arguments = [
+                Option("tmp-space", dagutil.condor_scratch_space()),
+            ],
+            inputs = [
+                Option("non-injection-db", triggers.files),
+                Option("background-bins-file", pdf_cache.files),
+            ],
+        )
+
+    dag["compute_far"] = layer
+    return trigger_cache
+
+
 def calc_gate_threshold(config, svd_bin, aggregate="max"):
+    """
+    Given a configuration, an SVD bin and an aggregate, calculate the
+    h(t) gate threshold used for that SVD bin.
+    """
     if ":" in config.filter.ht_gate_threshold:
         bank_mchirp = config.svd.stats[svd_bin][f"{aggregate}_mchirp"]
         min_mchirp, min_threshold, max_mchirp, max_threshold = [
@@ -225,6 +436,12 @@ def calc_gate_threshold(config, svd_bin, aggregate="max"):
 
 
 def format_ifo_args(ifos, args):
+    """
+    Given a set of instruments and arguments keyed by instrument, this
+    creates a list of strings of the form {ifo}={arg}. This is suitable
+    for command-line options like --channel-name that expect this
+    particular format.
+    """
     if isinstance(ifos, str):
         ifos = [ifos]
     return [f"{ifo}={args[ifo]}" for ifo in ifos]
@@ -238,4 +455,11 @@
         "svd_bank": svd_bank_layer,
         "filter": filter_layer,
         "aggregate": aggregate_layer,
+        "prior": prior_layer,
+        "calc_pdf": calc_pdf_layer,
+        "marginalize": marginalize_layer,
+        "marginalize_pdf": marginalize_pdf_layer,
+        "calc_likelihood": calc_likelihood_layer,
+        "cluster": cluster_layer,
+        "compute_far": compute_far_layer,
     }
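As a quick illustration of the helper documented above, format_ifo_args accepts either
a single instrument or a list of them (the channel names here are purely illustrative):

    >>> format_ifo_args("H1", {"H1": "GDS-CALIB_STRAIN"})
    ['H1=GDS-CALIB_STRAIN']
    >>> format_ifo_args(["H1", "L1"], {"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"})
    ['H1=GDS-CALIB_STRAIN', 'L1=GDS-CALIB_STRAIN']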