Commit a060cfe9 authored by Patrick Godwin, committed by ChiWai Chan

modify offline workflow to generate Makefile templates, removing need for separate repo

parent 0a3b1e0f
Showing 367 additions and 105 deletions
@@ -20,73 +20,132 @@
import argparse
import os
import shutil
import sys
from gstlal.config.inspiral import Config
from gstlal.dags.inspiral import DAG
from gstlal.datafind import DataCache, DataType
from gstlal.workflows import write_makefile
# set up command line options
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
parser.add_argument("-w", "--workflow", default="full", help="Sets the type of workflow to run.")
subparser = parser.add_subparsers(title="commands", metavar="<command>", dest="command")
subparser.required = True
p = subparser.add_parser("init", help="generate a Makefile based on configuration")
p.add_argument("-c", "--config", help="Sets the path to read configuration from.")
p.add_argument("-w", "--workflow", default="full", help="Sets the type of workflow to run.")
p.add_argument("-f", "--force", action="store_true", help="If set, overwrites the existing Makefile")
p = subparser.add_parser("create", help="create a workflow DAG")
p.add_argument("-c", "--config", help="Sets the path to read configuration from.")
p.add_argument("-w", "--workflow", default="full", help="Sets the type of workflow to run.")
# load config
args = parser.parse_args()
config = Config.load(args.config)
# create makefile
if args.command == "init":
	if os.path.exists(os.path.join(os.getcwd(), "Makefile")) and not args.force:
		print("Makefile already exists. To overwrite, run with --force", file=sys.stderr)
	else:
		write_makefile(config, "Makefile.offline_inspiral_gwosc_template", workflow=args.workflow)

# generate dag
elif args.command == "create":
	config.setup()
	dag = DAG(config)
	dag.create_log_dir()

	# common paths
	filter_dir = os.path.join(config.data.analysis_dir, "filter")
	rank_dir = os.path.join(config.data.rerank_dir, "rank")

	# generate workflow
	if args.workflow in set(("full", "filter", "rerank")):
		if args.workflow in set(("full", "filter")):
			# input data products
			split_bank = DataCache.find(DataType.SPLIT_BANK, svd_bins="*", subtype="*")

			# generate filter dag layers
			ref_psd = dag.reference_psd()
			median_psd = dag.median_psd(ref_psd)
			svd_bank = dag.svd_bank(median_psd, split_bank)
			triggers, dist_stats = dag.filter(ref_psd, svd_bank)
			if config.filter.injections:
				inj_triggers = dag.filter_injections(ref_psd, svd_bank)
				triggers += inj_triggers
			triggers, dist_stats = dag.aggregate(triggers, dist_stats)
		else:
			# load filter data products
			ref_psd = DataCache.find(DataType.REFERENCE_PSD, root=config.data.analysis_dir)
			median_psd = DataCache.find(DataType.MEDIAN_PSD, root=config.data.analysis_dir)
			injections = DataCache.find(DataType.MATCHED_INJECTIONS, root=filter_dir, svd_bins="*", subtype="*")
			svd_bank = DataCache.find(DataType.SVD_BANK, root=filter_dir, svd_bins="*")
			dist_stats = DataCache.find(DataType.DIST_STATS, root=filter_dir, svd_bins="*")
			triggers = DataCache.find(DataType.TRIGGERS, root=filter_dir, svd_bins="*", subtype="*")
			inj_triggers = DataCache.find(DataType.TRIGGERS, root=filter_dir, svd_bins="*")
			triggers += inj_triggers

		if args.workflow in set(("full", "rerank")):
			# generate rerank dag layers
			prior = dag.create_prior(svd_bank, median_psd, dist_stats)
			dist_stats = dag.marginalize(prior, dist_stats)
			pdfs = dag.calc_pdf(dist_stats)
			pdfs = dag.marginalize_pdf(pdfs)
			triggers = dag.calc_likelihood(triggers, dist_stats)
			triggers = dag.cluster(triggers)
			if config.filter.injections:
				triggers, injections = dag.find_injections(triggers)
				injections = dag.match_injections(injections)
				lnlr_cdfs = dag.measure_lnlr_cdf(dist_stats, injections)
			triggers, pdfs = dag.compute_far(triggers, pdfs)

			dag.plot_horizon_distance(ref_psd)
			dag.plot_summary(triggers, pdfs)
			dag.plot_background(triggers, pdfs)
			dag.plot_sensitivity(triggers)
			if config.filter.injections:
				dag.plot_analytic_vt(triggers, pdfs, lnlr_cdfs)

	elif args.workflow == "injection":
		# input data products
		ref_psd = DataCache.find(DataType.REFERENCE_PSD, root=config.data.analysis_dir)
		svd_bank = DataCache.find(DataType.SVD_BANK, root=filter_dir, svd_bins="*")
		dist_stats = DataCache.find(DataType.MARG_DIST_STATS, root=rank_dir, svd_bins="*")
		pdfs = DataCache.find(DataType.DIST_STAT_PDFS, root=rank_dir)
		orig_zl_triggers = DataCache.find(DataType.TRIGGER_DATABASE, root=rank_dir)

		# make a copy of zerolag triggers
		zerolag_triggers = orig_zl_triggers.copy(root="rank")
		for src_trg, dest_trg in zip(orig_zl_triggers.files, zerolag_triggers.files):
			shutil.copy2(src_trg, dest_trg)

		# generate injection-only dag layers
		injections = dag.match_injections()
		triggers = dag.filter_injections(ref_psd, svd_bank)
		triggers = dag.aggregate(triggers)
		triggers = dag.calc_likelihood(triggers, dist_stats)
		triggers = dag.cluster(triggers)
		triggers, injections = dag.find_injections(triggers)
		injections = dag.match_injections(injections)
		lnlr_cdfs = dag.measure_lnlr_cdf(dist_stats, injections)
		triggers += zerolag_triggers
		triggers, pdfs = dag.compute_far(triggers, pdfs)
@@ -94,48 +153,12 @@ if args.workflow in set(("full", "filter", "rerank")):
		dag.plot_horizon_distance(dist_stats)
		dag.plot_summary(triggers, pdfs)
		dag.plot_background(triggers, pdfs)
		dag.plot_sensitivity(triggers)
		dag.plot_analytic_vt(triggers, pdfs, lnlr_cdfs)

	else:
		raise ValueError(f"{args.workflow} is not a valid workflow option")

	# write dag/script to disk
	dag_name = f"{args.workflow}_inspiral_dag"
	dag.write_dag(f"{dag_name}.dag")
	dag.write_script(f"{dag_name}.sh")
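In short, the entry point now has two subcommands: init renders a Makefile from the packaged template, and create builds the Condor DAG, with segment loading deferred to config.setup(). A rough sketch of the same two-step flow driven from Python rather than the command line (the CLI above is the supported interface; the "config.yml" filename is an assumption mirroring the template's %_inspiral_dag.dag rule below):

from gstlal.config.inspiral import Config
from gstlal.dags.inspiral import DAG
from gstlal.workflows import write_makefile

config = Config.load("config.yml")

# "init": render the packaged Makefile template into the working directory
write_makefile(config, "Makefile.offline_inspiral_gwosc_template", workflow="full")

# "create": segment loading and time binning now happen in setup(), not load()
config.setup()
dag = DAG(config)
dag.create_log_dir()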
@@ -31,6 +31,7 @@ parser.add_argument("-w", "--workflow", default="inspiral", help="Sets the type
# load config
args = parser.parse_args()
config = Config.load(args.config)
config.setup()
# create dag
dag = DAG(config)
......
@@ -33,6 +33,8 @@ AC_CONFIG_FILES([ \
	python/emcee/Makefile \
	python/plots/Makefile \
	python/stats/Makefile \
	python/workflows/Makefile \
	python/workflows/templates/Makefile \
	share/Makefile \
	share/population_models/Makefile \
	share/population_models/O2/Makefile \
......
AM_CPPFLAGS = -I$(top_srcdir)/lib
SUBDIRS = config dags emcee plots stats workflows
# This is a trick taken from the gst-python automake setup.
# All of the Python scripts will be installed under the exec dir,
......
@@ -42,6 +42,20 @@ class Config(BaseConfig):
		self.metrics = dotdict(replace_keys(kwargs["metrics"]))
		if "services" in kwargs:
			self.services = dotdict(replace_keys(kwargs["services"]))
		if "summary" in kwargs:
			self.summary = dotdict(replace_keys(kwargs["summary"]))

		# set up analysis directories
		if not self.data.analysis_dir:
			self.data.analysis_dir = os.getcwd()
		if not self.data.rerank_dir:
			self.data.rerank_dir = self.data.analysis_dir

	def setup(self):
		"""
		Set up binning, load relevant analysis files.
		"""
		super().setup()

		# load manifest
		self.load_svd_manifest(self.svd.manifest)
@@ -54,12 +68,6 @@ class Config(BaseConfig):
		if self.span != segment(0, 0):
			self.create_time_bins(start_pad=self.filter.start_pad)

	def load_svd_manifest(self, manifest_file):
		with open(manifest_file, "r") as f:
			svd_stats = dotdict(replace_keys(json.load(f)))
......
SUBDIRS = templates
pkgpythondir = $(pkgpyexecdir)
workflowsdir = $(pkgpythondir)/workflows
#workflows_PYTHON = \
#	inspiral.py

EXTRA_DIST = \
	__init__.py
pkgpythondir = $(pkgpyexecdir)
templatesdir = $(pkgpythondir)/workflows/templates
templates_PYTHON = \
	Makefile.offline_inspiral_gwosc_template

EXTRA_DIST = \
	__init__.py
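The packaged template itself, Makefile.offline_inspiral_gwosc_template, follows. It is a Jinja2-templated Makefile: the {{ ... }} placeholders and {% ... %} blocks below are filled in from the analysis configuration when the init subcommand renders it.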
all : dag

.PHONY: launch
launch : {{ workflow }}_inspiral_dag.dag
	condor_submit_dag $<

.PHONY: dag
dag : {{ workflow }}_inspiral_dag.dag
	@echo ""

.PHONY: summary
summary :
	mkdir -p {{ config.summary.webdir }}
	gstlal_inspiral_summary_page \
		--title gstlal-{{ config.start }}-{{ config.stop }}-closed-box \
		--webserver-dir {{ config.summary.webdir }} \
		--output-user-tag ALL_COMBINED \
		--output-user-tag PRECESSION_COMBINED \
{% for inj_name in config.filter.injections.keys() %}
		--output-user-tag {{ inj_name.upper() }}_INJECTION \
		--output-user-tag {{ inj_name.upper() }}_INJECTION_PRECESSION \
{% endfor %}
		--glob-path plots

segments.xml.gz : CAT1_vetoes.xml.gz
	gstlal_query_gwosc_segments -o $@ {{ config.start }} {{ config.stop }}{% for instrument in config.ifos %} {{ instrument }}{% endfor %}
	gstlal_segments_operations --segment-name vetoes --output-segment-name datasegments --union --output-file CAT1_vetoes_renamed.xml.gz $< $<
	gstlal_segments_operations --diff --output-file $@ $@ CAT1_vetoes_renamed.xml.gz
	gstlal_segments_trim --trim 0 --gps-start-time {{ config.start }} --gps-end-time {{ config.stop }} --min-length 512 --output $@ $@
	rm CAT1_vetoes_renamed.xml.gz
	@echo ""

vetoes.xml.gz : CAT1_vetoes.xml.gz
	cp $< $@
	@echo ""

CAT1_vetoes.xml.gz :
	gstlal_query_gwosc_veto_segments -o $@ {{ config.start }} {{ config.stop }}{% for instrument in config.ifos %} {{ instrument }}{% endfor %} --category CAT1 --cumulative
	@echo ""

tisi.xml : inj_tisi.xml
	lalapps_gen_timeslides{% for instrument, slides in config.filter.time_slides.items() %} --instrument={{ instrument }}={{ slides }}{% endfor %} bg_tisi.xml
	ligolw_add --output $@ bg_tisi.xml $<
	@echo ""

inj_tisi.xml :
	lalapps_gen_timeslides{% for instrument in config.ifos %} --instrument={{ instrument }}=0:0:0{% endfor %} $@
	@echo ""

{% if config.data.template_bank is mapping %}
{{ config.svd.manifest }} :{% for bank_file in config.data.template_bank.values() %} {{ bank_file }}{% endfor %}
{% else %}
{{ config.svd.manifest }} : {{ config.data.template_bank }}
{% endif %}
	mkdir -p split_bank
{% if config.svd.sub_banks %}
{% for bank_name, params in config.svd.sub_banks.items() %}
	gstlal_inspiral_bank_splitter \
		--f-low {{ params.f_low }} \
		--group-by-chi {{ params.num_chi_bins }} \
		--output-path split_bank \
{% for approx in config.svd.approximant %}
		--approximant {{ approx }} \
{% endfor %}
		--overlap {{ params.overlap }} \
		--instrument {% for instrument in config.ifos %}{{ instrument }}{% endfor %} \
		--n {{ params.num_split_templates }} \
		--sort-by template_duration \
		--f-final {{ config.svd.max_f_final }} \
		--num-banks {{ params.num_banks }} \
		--stats-file $@ \
		{{ config.data.template_bank[bank_name] }}
{% endfor %}
{% else %}
	gstlal_inspiral_bank_splitter \
		--f-low {{ config.svd.f_low }} \
		--group-by-chi {{ config.svd.num_chi_bins }} \
		--output-path split_bank \
{% for approx in config.svd.approximant %}
		--approximant {{ approx }} \
{% endfor %}
		--overlap {{ config.svd.overlap }} \
		--instrument {% for instrument in config.ifos %}{{ instrument }}{% endfor %} \
		--n {{ config.svd.num_split_templates }} \
		--sort-by template_duration \
		--f-final {{ config.svd.max_f_final }} \
		--num-banks {{ config.svd.num_banks }} \
		--stats-file $@ \
		$<
{% endif %}
	@echo ""

%_inspiral_dag.dag : {{ config.svd.manifest }} vetoes.xml.gz segments.xml.gz tisi.xml x509_proxy plots{% for inj in config.filter.injections.values() %} {{ inj.file }}{% endfor %}
	gstlal_inspiral_workflow create -c config.yml --workflow $*

{% for inj_name, params in config.injections.items() %}
{{ params.file }} :
	lalapps_inspinj \
		--gps-start-time {{ config.start + params.time.shift }} \
		--gps-end-time {{ config.stop }} \
		--enable-spin \
		--aligned \
		--i-distr uniform \
		--l-distr random \
		--t-distr uniform \
		--dchirp-distr uniform \
		--m-distr {{ params.mass_distr }} \
{% for param in ['mass1', 'mass2', 'spin1', 'spin2', 'distance'] %}
{% for stat, val in params[param].items() %}
		--{{ stat }}-{{ param }} {{ val }} \
{% endfor %}
{% endfor %}
		--f-lower {{ params.f_low }} \
		--waveform {{ params.waveform }} \
		--time-step {{ params.time.step }} \
		--time-interval {{ params.time.interval }} \
		--taper-injection startend \
		--seed {{ params.seed }} \
		--output $@
	ligolw_no_ilwdchar $@
	@echo ""
{% endfor %}

x509_proxy :
	cp /tmp/x509up_u$(shell id -u $$USER) x509_proxy

plots :
	mkdir -p $@

clean :
	rm -rf segments.xml.gz svd_manifest.json *tisi.xml x509_proxy
	rm -rf split_bank *vetoes.xml.gz
	rm -rf reference_psd median_psd
	rm -rf filter rank plots
	rm -rf logs *inspiral_dag.dag* *inspiral_dag.sh *.sub _condor_stdout

clean-lite :
	rm -rf logs/* *inspiral_dag.dag* *inspiral_dag.sh *.sub
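For orientation, here are the configuration keys the template dereferences, shown as a Python mapping with purely hypothetical example values (the real config.yml is YAML with the same sections; the sub_banks variant and the separate top-level injections section consumed by the lalapps_inspinj rule are omitted for brevity):

config = {
	"start": 1187000000,   # GPS start time (hypothetical)
	"stop": 1187100000,    # GPS stop time (hypothetical)
	"ifos": ["H1", "L1"],
	"summary": {"webdir": "public_html/offline-run"},
	"data": {"template_bank": "gstlal_bank.xml.gz"},  # or a mapping of per-sub-bank files
	"svd": {
		"manifest": "svd_manifest.json",
		"f_low": 15.0,
		"num_chi_bins": 20,
		"num_split_templates": 200,
		"num_banks": 5,
		"overlap": 10,
		"max_f_final": 1024.0,
		"approximant": ["TaylorF2"],
	},
	"filter": {
		"injections": {"bns": {"file": "bns_injections.xml"}},
		"time_slides": {"H1": "0:0:0", "L1": "0:62.8318:62.8318"},
	},
}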
@@ -30,6 +30,7 @@ parser.add_argument("-c", "--config", help="Sets the path to read configuration
# load config
args = parser.parse_args()
config = Config.load(args.config)
config.setup()
config.create_time_bins(start_pad=0, overlap=0, one_ifo_only=True)
# create dag
......
@@ -29,6 +29,7 @@ parser.add_argument("-c", "--config", help="Sets the path to read configuration
# load config
args = parser.parse_args()
config = Config.load(args.config)
config.setup()
# create dag
dag = DAG(config)
......
@@ -30,6 +30,8 @@ AC_CONFIG_FILES([ \
	python/plots/Makefile \
	python/stats/Makefile \
	python/utilities/Makefile \
	python/workflows/Makefile \
	python/workflows/templates/Makefile \
	gst/Makefile \
	gst/debug/Makefile \
	gst/gst/Makefile \
......
SUBDIRS = config dags pipeparts plots stats utilities workflows
AM_CPPFLAGS = -I$(top_srcdir)/lib
......
@@ -73,18 +73,6 @@ class Config:
		self.stop = self.start + self.duration
		self.span = segment(self.start, self.stop)

		# section-specific options
		if "psd" in kwargs:
			self.psd = dotdict(replace_keys(kwargs["psd"]))
@@ -201,6 +189,22 @@ class Config:
			"file transfer can be disabled by setting transfer-files: false in the condor section"
		)

	def setup(self):
		"""
		Load segments and create time bins.
		"""
		if "frame_segments_file" in self.source:
			xmldoc = ligolw_utils.load_filename(
				self.source.frame_segments_file,
				contenthandler=ligolw_segments.LIGOLWContentHandler
			)
			self.segments = ligolw_segments.segmenttable_get_by_name(xmldoc, "datasegments").coalesce()
		else:
			self.segments = segmentlistdict((ifo, segmentlist([self.span])) for ifo in self.ifos)

		if self.span != segment(0, 0):
			self.create_time_bins(start_pad=512, min_instruments=self.min_ifos)

	@staticmethod
	def to_ifo_list(ifos):
		"""
......
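The effect of moving segment I/O out of the constructor, sketched below (the driver code is hypothetical; Config.load and setup() are from the diff): parsing the configuration stays cheap, and only DAG-building commands pay for segment loading.

from gstlal.config.inspiral import Config

config = Config.load("config.yml")   # parse options; set span and directories
# no segment files have been read yet, so `gstlal_inspiral_workflow init`
# can render a Makefile without any analysis inputs on disk
config.setup()                       # load frame_segments_file (or fall back
                                     # to the full span per ifo), create time bins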
SUBDIRS = templates
pkgpythondir = $(pkgpyexecdir)
workflowsdir = $(pkgpythondir)/workflows
workflows_PYTHON = \
	__init__.py
# Copyright (C) 2021 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
from typing import Optional

import jinja2

from gstlal.config import Config


def write_makefile(config: Config, template: str, path: Optional[str] = None, **kwargs) -> None:
	"""Generate a Makefile based on a configuration and template.

	Args:
		config:
			Config, a configuration to use to fill in Makefile template
		template:
			str, name of the Makefile template
		path:
			str, default $PWD, the path to write the Makefile to
		**kwargs:
			any extra key-value pairs used for the template

	"""
	if not path:
		path = os.getcwd()

	template_loader = jinja2.PackageLoader("gstlal.workflows", "templates")
	template_env = jinja2.Environment(loader=template_loader, trim_blocks=True, lstrip_blocks=True)
	template = template_env.get_template(template)
	makefile = template.render(config=config, **kwargs)

	with open(os.path.join(path, "Makefile"), "w") as f:
		f.write(makefile)
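A small, self-contained illustration of the two Environment flags used above: trim_blocks removes the newline after a {% %} tag and lstrip_blocks strips indentation before it, so control-flow scaffolding leaves no stray blank lines or leading spaces in the rendered Makefile (the mini-template here is hypothetical):

import jinja2

env = jinja2.Environment(trim_blocks=True, lstrip_blocks=True)
template = env.from_string(
	"plots :\n"
	"{% for d in dirs %}\n"
	"\tmkdir -p {{ d }}\n"
	"{% endfor %}\n"
)
print(template.render(dirs=["plots", "logs"]))
# plots :
# 	mkdir -p plots
# 	mkdir -p logs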
pkgpythondir = $(pkgpyexecdir)
templatesdir = $(pkgpythondir)/workflows/templates
templates_PYTHON = \
	__init__.py