diff --git a/gstlal-inspiral/bin/Makefile.am b/gstlal-inspiral/bin/Makefile.am
index aeb0af205a26c08b0b05e9760aba43896951df9f..9937a66f618d8e394ddf8f4b8c552da612fe7b83 100644
--- a/gstlal-inspiral/bin/Makefile.am
+++ b/gstlal-inspiral/bin/Makefile.am
@@ -62,6 +62,7 @@ dist_bin_SCRIPTS = \
     gstlal_inspiral_combine_injection_sets \
     gstlal_inspiral_svd_bank \
     gstlal_inspiral_svd_bank_pipe \
+    gstlal_inspiral_workflow \
     gstlal_ll_inspiral_calculate_range \
     gstlal_ll_inspiral_event_plotter \
     gstlal_ll_inspiral_event_uploader \
diff --git a/gstlal-inspiral/bin/gstlal_inspiral_config b/gstlal-inspiral/bin/gstlal_inspiral_config
index d9f3c8a9dde1b16f107b8156f8d65926c100630b..f2d3e58378c37bac0ab61a634b75c868dd763001 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_config
+++ b/gstlal-inspiral/bin/gstlal_inspiral_config
@@ -63,6 +63,10 @@ class gYAML(object):
             mc = float(self.svdstats[sbin]["median_mchirp"])
             return ac1 if mc < float(mct) else ac2

+        if "ifo-list" in arg:
+            ifostr = self("ifos")
+            return " ".join([ifostr[2*n:2*n+2] for n in range(len(ifostr) // 2)])
+
         if "svd.number-of-svd-bins" in arg:
             return len(self.svdstats)

diff --git a/gstlal-inspiral/bin/gstlal_inspiral_workflow b/gstlal-inspiral/bin/gstlal_inspiral_workflow
new file mode 100755
index 0000000000000000000000000000000000000000..819bd20a5878fb8335549e518cc2b1626a638ecd
--- /dev/null
+++ b/gstlal-inspiral/bin/gstlal_inspiral_workflow
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
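+#
+# Usage sketch (the YAML keys below are inferred from gstlal.config.Config
+# introduced in this patch, not a documented schema):
+#
+#   $ cat config.yml
+#   ifos: H1L1
+#   start: 1187000000
+#   duration: 86400
+#   source: {...}    # plus psd, svd, filter and condor sections
+#   $ gstlal_inspiral_workflow --config config.yml
+#   $ condor_submit_dag trigger_pipe.dag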
+
+
+import argparse
+
+from gstlal.config import Config
+from gstlal.dags import util as dagutils
+from gstlal.dags.inspiral import DAG
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
+
+# load config
+args = parser.parse_args()
+config = Config.load(args.config)
+
+# define bins
+time_bins = dagutils.partition_by_time(config.span, config.segments, config.ifos)
+#svd_bins = workflow.svd_bins(config.svd)
+svd_bins = ["%04X"%i for i in range(3)] # dummy until we wire up split bank/metadata
+
+# generate dag
+dag = DAG(config)
+
+dag.reference_psd(time_bins) \
+    .median_psd() \
+    .svd_bank(svd_bins) \
+    .filter(time_bins, svd_bins)
+
+dag.write("trigger_pipe.dag")
diff --git a/gstlal-inspiral/configure.ac b/gstlal-inspiral/configure.ac
index 3c226051293d10caeb2021809ab64580629f99ed..0af245e1b5c328fcdeb1a39a3f08133244473db2 100644
--- a/gstlal-inspiral/configure.ac
+++ b/gstlal-inspiral/configure.ac
@@ -26,8 +26,10 @@ AC_CONFIG_FILES([ \
     lib/Makefile \
     lib/skymap/Makefile \
     Makefile \
-    python/emcee/Makefile \
     python/Makefile \
+    python/dags/Makefile \
+    python/dags/layers/Makefile \
+    python/emcee/Makefile \
     python/plots/Makefile \
     python/stats/Makefile \
     share/Makefile \
diff --git a/gstlal-inspiral/python/Makefile.am b/gstlal-inspiral/python/Makefile.am
index 9b29339e042c8763ab22e109e101dd00409edbcf..04f758c9ee1e7c9ad04485147465087f0c5e2d24 100644
--- a/gstlal-inspiral/python/Makefile.am
+++ b/gstlal-inspiral/python/Makefile.am
@@ -1,6 +1,6 @@
 AM_CPPFLAGS = -I$(top_srcdir)/lib

-SUBDIRS = emcee plots stats
+SUBDIRS = dags emcee plots stats

 # This is a trick taken from the gst-python automake setup.
 # All of the Python scripts will be installed under the exec dir,
diff --git a/gstlal-inspiral/python/dags/Makefile.am b/gstlal-inspiral/python/dags/Makefile.am
new file mode 100644
index 0000000000000000000000000000000000000000..7200c9e092381876f16c0647c42fd8a5c3be713f
--- /dev/null
+++ b/gstlal-inspiral/python/dags/Makefile.am
@@ -0,0 +1,10 @@
+SUBDIRS = layers
+
+pkgpythondir = $(pkgpyexecdir)
+dagsdir = $(pkgpythondir)/dags
+
+dags_PYTHON = \
+    inspiral.py
+
+EXTRA_DIST = \
+    __init__.py
diff --git a/gstlal-inspiral/python/dags/inspiral.py b/gstlal-inspiral/python/dags/inspiral.py
new file mode 100644
index 0000000000000000000000000000000000000000..00e3673a72b97931faaa3b1484754714b3b2816c
--- /dev/null
+++ b/gstlal-inspiral/python/dags/inspiral.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+
+import pluggy
+
+from gstlal import plugins
+from gstlal.dags import DAG as BaseDAG
+
+
+class DAG(BaseDAG):
+    pass
+
+
+def _get_registered_layers():
+    """Get all registered DAG layers.
+    """
+    # set up plugin manager
+    manager = pluggy.PluginManager("gstlal")
+    manager.add_hookspecs(plugins)
+
+    # load layers
+    from gstlal.dags.layers import inspiral
+    manager.register(inspiral)
+
+    # add all registered plugins to registry
+    registered = {}
+    for plugin_name in manager.hook.layers():
+        for name, layer in plugin_name.items():
+            registered[name] = layer
+
+    return registered
+
+
+# register layers to DAG
+for layer_name, layer in _get_registered_layers().items():
+    DAG.register_layer(layer_name)(layer)
diff --git a/gstlal-inspiral/python/dags/layers/Makefile.am b/gstlal-inspiral/python/dags/layers/Makefile.am
new file mode 100644
index 0000000000000000000000000000000000000000..68dd9b5098b719a3d1ae22a76dbf412512b4aa3b
--- /dev/null
+++ b/gstlal-inspiral/python/dags/layers/Makefile.am
@@ -0,0 +1,8 @@
+pkgpythondir = $(pkgpyexecdir)
+layersdir = $(pkgpythondir)/dags/layers
+
+layers_PYTHON = \
+    inspiral.py
+
+EXTRA_DIST = \
+    __init__.py
diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py
new file mode 100644
index 0000000000000000000000000000000000000000..4c081970f669e0ca7a4e7d2ae6dd97ce0dbdb0f0
--- /dev/null
+++ b/gstlal-inspiral/python/dags/layers/inspiral.py
@@ -0,0 +1,176 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
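+#
+# Each function below builds one workflow layer. The calling convention
+# (described here for readers; it is enforced only by how DAG.register_layer
+# invokes these functions) is roughly:
+#
+#     def some_layer(config, dag, dag_layer, ...):
+#         layer = Layer("some_executable", requirements=config.condor.requirements)
+#         layer += Node(arguments=[...], inputs=[...], outputs=[...])
+#         dag["some_layer"] = layer
+#         return dag_layer.child_layer(**layer.config(), retries=3)
+#
+# where dag_layer is the previously-added htcondor.dags layer (or the DAG
+# itself for the first layer), and the returned layer becomes the parent of
+# whatever layer is registered next.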
+
+
+import os
+
+from htcondor.dags.node import BaseNode
+
+from gstlal import plugins
+from gstlal.config import Argument, Option
+from gstlal.dags.layers import Layer, Node
+from gstlal.dags import util as dagutils
+
+
+def reference_psd_layer(config, dag, dag_layer, time_bins):
+    layer = Layer("gstlal_reference_psd", requirements=config.condor.requirements)
+
+    for span in time_bins:
+        start, end = span
+        psd_path = os.path.join("psd", dagutils.gps_directory(start))
+        psd_file = dagutils.T050017_filename(config.ifos, "REFERENCE_PSD", span, '.xml.gz')
+        layer += Node(
+            arguments = [
+                Option("gps-start-time", int(start)),
+                Option("gps-end-time", int(end)),
+                Option("data-source", "frames"),
+                Option("channel-name", config_to_channels(config)),
+                Option("psd-fft-length", config.psd.fft_length),
+                Option("frame-segments-name", config.source.frame_segments_name)
+            ],
+            inputs = [
+                Option("frame-cache", config.source.frame_cache),
+                Option("frame-segments-file", config.source.frame_segments_file)
+            ],
+            outputs = [
+                Option("write-psd", os.path.join(psd_path, psd_file))
+            ],
+        )
+
+    dag["reference_psd"] = layer
+    if isinstance(dag_layer, BaseNode):
+        return dag_layer.child_layer(**layer.config(), retries=3)
+    else:
+        return dag.layer(**layer.config(), retries=3)
+
+
+def median_psd_layer(config, dag, dag_layer):
+    layer = Layer("gstlal_median_of_psds", requirements=config.condor.requirements)
+
+    median_path = os.path.join("median_psd", dagutils.gps_directory(config.start))
+    median_file = dagutils.T050017_filename(config.ifos, "REFERENCE_PSD", config.span, '.xml.gz')
+    layer += Node(
+        inputs = [Argument("psds", dag["reference_psd"].outputs["write-psd"])],
+        outputs = [Option("output-name", os.path.join(median_path, median_file))]
+    )
+
+    dag["median_psd"] = layer
+    return dag_layer.child_layer(**layer.config(), retries=3)
+
+
+def svd_bank_layer(config, dag, dag_layer, svd_bins):
+    layer = Layer("gstlal_inspiral_svd_bank", requirements=config.condor.requirements)
+
+    for svd_bin in svd_bins:
+        svd_path = os.path.join("svd_bank", dagutils.gps_directory(config.start))
+        svd_file = dagutils.T050017_filename(config.ifos, f"{svd_bin}_SVD", config.span, '.xml.gz')
+        layer += Node(
+            arguments = [
+                Option("svd-tolerance", config.svd.tolerance),
+                Option("flow", config.svd.f_low),
+                Option("sample-rate", config.svd.sample_rate),
+                Option("samples-min", config.svd.samples_min),
+                Option("samples-max-64", config.svd.samples_max_64),
+                Option("samples-max-256", config.svd.samples_max_256),
+                Option("samples-max", config.svd.samples_max),
+                Option("autocorrelation-length", config.svd.autocorrelation_length),
+                Option("bank-id", svd_bin),
+            ],
+            inputs = [Option("reference-psd", dag["median_psd"].outputs["output-name"])],
+            outputs = [Option("write-svd", os.path.join(svd_path, svd_file))],
+        )
+
+    dag["svd_bank"] = layer
+    return dag_layer.child_layer(**layer.config(), retries=3)
+
+
+def filter_layer(config, dag, dag_layer, time_bins, svd_bins):
+    layer = Layer("gstlal_inspiral", requirements=config.condor.requirements)
+
+    common_opts = [
+        Option("track-psd"),
+        Option("local-frame-caching"),
+        Option("data-source", "frames"),
+        Option("psd-fft-length", config.psd.fft_length),
+        Option("frame-segments-name", config.source.frame_segments_name),
+        Option("tmp-space", dagutils.condor_scratch_space()),
+        Option("control-peak-time", config.filter.control_peak_time),
+        Option("coincidence-threshold", config.filter.coincidence_threshold),
+        Option("singles-threshold", config.filter.singles_threshold),
+        Option("fir-stride", config.filter.fir_stride),
+        Option("min-instruments", config.filter.min_instruments),
+        Option("reference-likelihood-file", config.filter.reference_likelihood_file),
+        Option("channel-name", config_to_channels(config)),
+    ]
+
+    # disable service discovery if using singularity
+    if config.condor.singularity_image:
+        common_opts.append(Option("disable-service-discovery"))
+
+    for time_idx, span in enumerate(time_bins):
+        start, end = span
+        for svd_idx, svd_bin in enumerate(svd_bins):
+            filter_opts = [
+                #Option("ht-gate-threshold", config.filter.gate_threshold[svd_bin]),
+                Option("ht-gate-threshold", config.filter.ht_gate_threshold),
+                Option("gps-start-time", int(start)),
+                Option("gps-end-time", int(end)),
+            ]
+            filter_opts.extend(common_opts)
+
+            trigger_file = dagutils.T050017_filename(config.ifos, f"{svd_bin}_LLOID", span, '.xml.gz')
+            dist_stat_file = dagutils.T050017_filename(config.ifos, f"{svd_bin}_DIST_STATS", span, '.xml.gz')
+
+            layer += Node(
+                arguments = filter_opts,
+                inputs = [
+                    Option("frame-cache", config.source.frame_cache),
+                    Option("frame-segments-file", config.source.frame_segments_file),
+                    Option("veto-segments-file", config.filter.vetoes),
+                    Option("reference-psd", dag["reference_psd"].outputs["write-psd"][time_idx]),
+                    Option("svd-bank", dag["svd_bank"].outputs["write-svd"][svd_idx]),
+                    Option("time-slide-file", config.filter.time_slide_file),
+                ],
+                outputs = [
+                    Option("output", trigger_file),
+                    Option("ranking-stat-output", dist_stat_file),
+                ],
+            )
+
+    dag["filter"] = layer
+    return dag_layer.child_layer(**layer.config(), retries=3)
+
+
+def aggregate_layer(config, dag, dag_layer, time_bins):
+    layer = Layer("gstlal_inspiral_aggregate", requirements=config.condor.requirements)
+
+    dag["aggregate"] = layer
+    return dag_layer.child_layer(**layer.config(), retries=3)
+
+
+def config_to_channels(config):
+    return [f"{ifo}:{channel}" for ifo, channel in config.source.channel_name.items()]
+
+
+@plugins.register
+def layers():
+    return {
+        "reference_psd": reference_psd_layer,
+        "median_psd": median_psd_layer,
+        "svd_bank": svd_bank_layer,
+        "filter": filter_layer,
+        "aggregate": aggregate_layer,
+    }
diff --git a/gstlal/configure.ac b/gstlal/configure.ac
index ba11f9fe1ad4416691d4632e4e848eb9a34d7d3f..45de6398ffacc4ccdc21ae6a7480068d26df3394 100644
--- a/gstlal/configure.ac
+++ b/gstlal/configure.ac
@@ -22,6 +22,8 @@
     lib/gstlal/Makefile \
     python/__init__.py \
     python/Makefile \
+    python/dags/Makefile \
+    python/dags/layers/Makefile \
     python/pipeparts/Makefile \
     python/plots/Makefile \
     python/stats/Makefile \
diff --git a/gstlal/python/Makefile.am b/gstlal/python/Makefile.am
index f0c58b61b7bd807be075262704ccdcec4d3c67d1..7ab5d47185dece899c6cdd5a65be45eec4c4c4ff 100644
--- a/gstlal/python/Makefile.am
+++ b/gstlal/python/Makefile.am
@@ -1,4 +1,4 @@
-SUBDIRS = pipeparts plots stats
+SUBDIRS = dags pipeparts plots stats

 AM_CPPFLAGS = -I$(top_srcdir)/lib

@@ -11,9 +11,9 @@ pkgpythondir = $(pkgpyexecdir)
 pkgpython_PYTHON = \
     __init__.py \
     bottle.py \
+    config.py \
     coherent_null.py \
     dagfile.py \
-    dagparts.py \
     datasource.py \
     elements.py \
     httpinterface.py \
@@ -23,6 +23,7 @@ pkgpython_PYTHON = \
     pipeio.py \
     pipeline.py \
     pipeutil.py \
+    plugins.py \
     reference_psd.py \
     servicediscovery.py \
     simplehandler.py \
diff --git a/gstlal/python/config.py b/gstlal/python/config.py
new file mode 100644
index 0000000000000000000000000000000000000000..3c67fc90f259aa1cda0e47bbdea5b00c2f172042
--- /dev/null
+++ b/gstlal/python/config.py
@@ -0,0 +1,142 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+
+from collections.abc import Iterable
+from dataclasses import dataclass, field
+from typing import List, Tuple, Union
+
+import yaml
+
+from lal import LIGOTimeGPS
+from ligo.lw import utils as ligolw_utils
+from ligo.lw.utils import segments as ligolw_segments
+from ligo.segments import segment, segmentlist, segmentlistdict
+
+
+class Config:
+    """
+    Hold configuration used for analyses.
+    """
+    def __init__(self, **kwargs):
+        # general options
+        if isinstance(kwargs["ifos"], list):
+            self.ifos = kwargs["ifos"]
+        else:
+            self.ifos = parse_ifo_string(kwargs["ifos"])
+
+        # time options
+        self.start = LIGOTimeGPS(kwargs["start"])
+        if "stop" in kwargs:
+            self.stop = LIGOTimeGPS(kwargs["stop"])
+            self.duration = self.stop - self.start
+        else:
+            self.duration = kwargs["duration"]
+            self.stop = self.start + self.duration
+        self.span = segment(self.start, self.stop)
+
+        if "segments" in kwargs:
+            xmldoc = ligolw_utils.load_filename(
+                kwargs["segments"],
+                contenthandler=ligolw_segments.LIGOLWContentHandler
+            )
+            self.segments = ligolw_segments.segmenttable_get_by_name(xmldoc, "segments").coalesce()
+        else:
+            self.segments = segmentlistdict((ifo, segmentlist([self.span])) for ifo in self.ifos)
+
+        # section-specific options
+        self.source = dotdict(replace_keys(kwargs["source"]))
+        self.psd = dotdict(replace_keys(kwargs["psd"]))
+        self.svd = dotdict(replace_keys(kwargs["svd"]))
+        self.filter = dotdict(replace_keys(kwargs["filter"]))
+        self.condor = dotdict(replace_keys(kwargs["condor"]))
+
+    @classmethod
+    def load(cls, path):
+        """
+        Load configuration from a file given a file path.
+        """
+        with open(path, "r") as f:
+            return cls(**yaml.safe_load(f))
+
+
+@dataclass
+class Argument:
+    name: str
+    argument: Union[str, List]
+
+    def vars(self):
+        if isinstance(self.argument, Iterable) and not isinstance(self.argument, str):
+            return " ".join(self.argument)
+        else:
+            return f"{self.argument}"
+
+    def files(self):
+        if isinstance(self.argument, Iterable) and not isinstance(self.argument, str):
+            return ",".join(self.argument)
+        else:
+            return f"{self.argument}"
+
+
+@dataclass
+class Option:
+    name: str
+    argument: Union[None, str, List] = None
+
+    def vars(self):
+        if not self.argument:
+            return f"--{self.name}"
+        elif isinstance(self.argument, Iterable) and not isinstance(self.argument, str):
+            return " ".join([f"--{self.name} {arg}" for arg in self.argument])
+        else:
+            return f"--{self.name} {self.argument}"
+
+    def files(self):
+        if not self.argument:
+            return
+        elif isinstance(self.argument, Iterable) and not isinstance(self.argument, str):
+            return ",".join([f"{arg}" for arg in self.argument])
+        else:
+            return f"{self.argument}"
+
+
+class dotdict(dict):
+    """
+    A dictionary supporting dot notation.
+
+    Implementation from https://gist.github.com/miku/dc6d06ed894bc23dfd5a364b7def5ed8.
+
+    """
+    __getattr__ = dict.get
+    __setattr__ = dict.__setitem__
+    __delattr__ = dict.__delitem__
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        for k, v in self.items():
+            if isinstance(v, dict):
+                self[k] = dotdict(v)
+
+
+def parse_ifo_string(ifos):
+    """
+    Given a string of IFO pairs (e.g. H1L1), return a list of IFOs.
+    """
+    return [ifos[2*n:2*n+2] for n in range(len(ifos) // 2)]
+
+
+def replace_keys(dict_):
+    return {k.replace("-", "_"): v for k, v in dict_.items()}
diff --git a/gstlal/python/dags/Makefile.am b/gstlal/python/dags/Makefile.am
new file mode 100644
index 0000000000000000000000000000000000000000..5d0454f315976ca6bc3f23e7f13d7550a3a89d2e
--- /dev/null
+++ b/gstlal/python/dags/Makefile.am
@@ -0,0 +1,8 @@
+SUBDIRS = layers
+
+pkgpythondir = $(pkgpyexecdir)
+dagsdir = $(pkgpythondir)/dags
+
+dags_PYTHON = \
+    __init__.py \
+    util.py
diff --git a/gstlal/python/dags/__init__.py b/gstlal/python/dags/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..ceec68b471aa538c1648f13e13cacdf8d91c5560
--- /dev/null
+++ b/gstlal/python/dags/__init__.py
@@ -0,0 +1,101 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+
+import os
+from typing import Tuple
+
+import htcondor
+from htcondor import dags
+import pluggy
+
+from gstlal import plugins
+
+
+class DAG(dags.DAG):
+    def __init__(self, config, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config = config
+        self._current_layer = self
+        self._layers = {}
+
+    def __setitem__(self, key, value):
+        self._layers[key] = value
+
+    def __getitem__(self, key):
+        return self._layers[key]
+
+    def write(self, filename, path=None, **kwargs):
+        write_dag(self, dag_file_name=filename, dag_dir=path, **kwargs)
+
+    @classmethod
+    def register_layer(cls, layer_name):
+        """Register a layer to the DAG, making it callable.
+        """
+        def register(func):
+            def wrapped(self, *args, **kwargs):
+                self._current_layer = func(self.config, self, self._current_layer, *args, **kwargs)
+                return self
+            setattr(cls, layer_name, wrapped)
+        return register
+
+
+class HexFormatter(dags.SimpleFormatter):
+    """A hex-based node formatter that produces names like LayerName.000C.
+
+    """
+    def __init__(self, offset: int = 0):
+        self.separator = "."
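+        # node names are formed as "<layer name>.<4-digit hex index>";
+        # parse() below assumes this same separator when splitting them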
+        self.index_format = "{:04X}"
+        self.offset = offset
+
+    def parse(self, node_name: str) -> Tuple[str, int]:
+        layer, index = node_name.split(self.separator)
+        index = int(index, 16)
+        return layer, index - self.offset
+
+
+def create_log_dir(log_dir="logs"):
+    os.makedirs(log_dir, exist_ok=True)
+
+
+def write_dag(dag, dag_dir=None, node_name_formatter=None, **kwargs):
+    if not node_name_formatter:
+        node_name_formatter = HexFormatter()
+    if not dag_dir:
+        dag_dir = os.getcwd()
+    return htcondor.dags.write_dag(dag, dag_dir, node_name_formatter=node_name_formatter, **kwargs)
+
+
+def _get_registered_layers():
+    """Get all registered DAG layers.
+    """
+    # set up plugin manager
+    manager = pluggy.PluginManager("gstlal")
+    manager.add_hookspecs(plugins)
+
+    # add all registered plugins to registry
+    registered = {}
+    for plugin_name in manager.hook.layers():
+        for name, layer in plugin_name.items():
+            registered[name] = layer
+
+    return registered
+
+
+# register layers to DAG
+for layer_name, layer in _get_registered_layers().items():
+    DAG.register_layer(layer_name)(layer)
diff --git a/gstlal/python/dags/layers/Makefile.am b/gstlal/python/dags/layers/Makefile.am
new file mode 100644
index 0000000000000000000000000000000000000000..56dd59632d135ccf4c32941b7f84757a33033fae
--- /dev/null
+++ b/gstlal/python/dags/layers/Makefile.am
@@ -0,0 +1,5 @@
+pkgpythondir = $(pkgpyexecdir)
+layersdir = $(pkgpythondir)/dags/layers
+
+layers_PYTHON = \
+    __init__.py
diff --git a/gstlal/python/dags/layers/__init__.py b/gstlal/python/dags/layers/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..4e8b683d834d1215c541a609aa0d7d83e6c1278f
--- /dev/null
+++ b/gstlal/python/dags/layers/__init__.py
@@ -0,0 +1,142 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
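+#
+# A brief usage sketch (illustrative values, not a fixed API): a Layer groups
+# Nodes that share one executable and submit description, and Layer.config()
+# packages everything for htcondor.dags:
+#
+#     layer = Layer("gstlal_reference_psd", requirements={"request_memory": "2GB"})
+#     layer += Node(
+#         arguments = [Option("gps-start-time", 1187000000)],
+#         inputs = [Option("frame-cache", "frame.cache")],
+#         outputs = [Option("write-psd", "psd.xml.gz")],
+#     )
+#     dag.layer(**layer.config(), retries=3)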
+
+
+from collections.abc import Iterable
+from dataclasses import dataclass, field
+import os
+from typing import List, Tuple, Union
+
+import htcondor
+
+from gstlal.dags.util import which
+
+
+@dataclass
+class Layer:
+    executable: str
+    name: str = ""
+    universe: str = "vanilla"
+    log_dir: str = "logs"
+    requirements: dict = field(default_factory=dict)
+    inputs: dict = field(default_factory=dict)
+    outputs: dict = field(default_factory=dict)
+    nodes: list = field(default_factory=list)
+
+    def __post_init__(self):
+        if not self.name:
+            self.name = os.path.basename(self.executable)
+
+    def config(self):
+        # check that nodes are valid
+        self.validate()
+
+        # add base submit opts + requirements
+        submit_options = {
+            "universe": self.universe,
+            "executable": which(self.executable),
+            "arguments": self._arguments(),
+            **self.requirements,
+        }
+
+        # file submit opts
+        inputs = self._inputs()
+        outputs = self._outputs()
+        if inputs or outputs:
+            submit_options["should_transfer_files"] = "YES"
+            submit_options["when_to_transfer_output"] = "ON_SUCCESS"
+            submit_options["success_exit_code"] = 0
+            submit_options["preserve_relative_paths"] = True
+            if inputs:
+                submit_options["transfer_inputs"] = inputs
+            if outputs:
+                submit_options["transfer_outputs"] = outputs
+
+        # log submit opts
+        submit_options["output"] = f"{self.log_dir}/$(nodename)-$(cluster)-$(process).out"
+        submit_options["error"] = f"{self.log_dir}/$(nodename)-$(cluster)-$(process).err"
+
+        # extra boilerplate submit opts
+        submit_options["notification"] = "never"
+
+        return {
+            "name": self.name,
+            "submit_description": htcondor.Submit(submit_options),
+            "vars": self._vars(),
+        }
+
+    def append(self, node):
+        for input_ in node.inputs:
+            self.inputs.setdefault(input_.name, []).append(input_.argument)
+        for output in node.outputs:
+            self.outputs.setdefault(output.name, []).append(output.argument)
+        self.nodes.append(node)
+
+    def extend(self, nodes):
+        for node in nodes:
+            self.append(node)
+
+    def __iadd__(self, nodes):
+        if isinstance(nodes, Iterable):
+            self.extend(nodes)
+        else:
+            self.append(nodes)
+        return self
+
+    def validate(self):
+        assert self.nodes, "at least one node must be connected to this layer"
+
+        # check arg names across nodes are equal
+        args = [arg.name for arg in self.nodes[0].arguments]
+        for node in self.nodes[1:]:
+            assert args == [arg.name for arg in node.arguments]
+
+        # check input/output names across nodes are equal
+        inputs = [arg.name for arg in self.nodes[0].inputs]
+        for node in self.nodes[1:]:
+            assert inputs == [arg.name for arg in node.inputs]
+        outputs = [arg.name for arg in self.nodes[0].outputs]
+        for node in self.nodes[1:]:
+            assert outputs == [arg.name for arg in node.outputs]
+
+    def _arguments(self):
+        return " ".join([f"$({arg.name})" for arg in self.nodes[0].arguments])
+
+    def _inputs(self):
+        return ",".join([f"$(input_{arg.name})" for arg in self.nodes[0].inputs])
+
+    def _outputs(self):
+        return ",".join([f"$(output_{arg.name})" for arg in self.nodes[0].outputs])
+
+    def _vars(self):
+        allvars = []
+        for i, node in enumerate(self.nodes):
+            nodevars = {arg.name: arg.vars() for arg in node.arguments}
+            nodevars["nodename"] = f"{self.name}_{i:04X}"
+            if node.inputs:
+                nodevars.update({f"input_{arg.name}": arg.files() for arg in node.inputs})
+            if node.outputs:
+                nodevars.update({f"output_{arg.name}": arg.files() for arg in node.outputs})
+            allvars.append(nodevars)
+
+        return allvars
+
+
+@dataclass
+class Node:
+    arguments: list = field(default_factory=list)
+    inputs: list = field(default_factory=list)
+    outputs: list = field(default_factory=list)
diff --git a/gstlal/python/dagparts.py b/gstlal/python/dags/util.py
similarity index 85%
rename from gstlal/python/dagparts.py
rename to gstlal/python/dags/util.py
index 051bf50554b26fab48cc94d88adc82e212120314..6cebca9fd6ba0d3a814f8b9936ee38488ad26d7b 100644
--- a/gstlal/python/dagparts.py
+++ b/gstlal/python/dags/util.py
@@ -1,5 +1,6 @@
 # Copyright (C) 2010 Kipp Cannon (kipp.cannon@ligo.org)
-# Copyright (C) 2010 Chad Hanna (chad.hanna@ligo.org)
+# Copyright (C) 2010 Chad Hanna (chad.hanna@ligo.org)
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
 #
 # This program is free software; you can redistribute it and/or modify it under
 # the terms of the GNU General Public License as published by the Free Software
@@ -43,6 +44,8 @@
 import socket
 import subprocess
 import tempfile

+import numpy
+
 from ligo import segments
 from lal.utils import CacheEntry

@@ -344,6 +347,61 @@
         seglists[instrument] = newseglist


+def partition_by_time(span, segdict, ifos, min_ifos=1, max_livetime=14440, start_pad=512):
+    """!
+    Splits a time span roughly equally based on livetime.
+    """
+    # get segments for all ifo combinations requested and take union
+    segdict_by_combo = segments.segmentlistdict()
+    ifo_combos = flatten(itertools.combinations(ifos, n) for n in range(min_ifos, len(ifos) + 1))
+    ifo_combos = [frozenset(ifo_combo) for ifo_combo in ifo_combos]
+    for ifo_combo in ifo_combos:
+        segdict_by_combo[ifo_combo] = segdict.intersection(ifo_combo)
+    all_segs = segdict_by_combo.union(ifo_combos) & segments.segmentlist([span])
+
+    # split equally into bins
+    num_bins = int(numpy.ceil(float(abs(all_segs) / max_livetime)))
+    time_bins = [segments.segmentlist() for i in range(num_bins)]
+
+    # calculate livetime for each bin_, ensuring
+    # start, end edges fall on integer boundaries
+    small_bin, remainder = divmod(float(abs(all_segs)), num_bins)
+    big_bin = small_bin + remainder
+    bin_livetime = [big_bin if n == 0 else small_bin for n in range(num_bins)]
+
+    # determine bins
+    bin_ = 0
+    for seg in all_segs:
+        # add entire segment to current bin_ if livetime doesn't spill over
+        current_livetime = abs(time_bins[bin_])
+        if current_livetime + abs(segments.segmentlist([seg])) <= bin_livetime[bin_]:
+            time_bins[bin_] |= segments.segmentlist([seg])
+
+        # otherwise, split segment and put spill-over into next bin(s)
+        else:
+            diff_livetime = bin_livetime[bin_] - current_livetime
+            needed_seg = segments.segmentlist([segments.segment(seg[0], seg[0] + diff_livetime)])
+            time_bins[bin_] |= needed_seg
+
+            # if segment is still too big, keep splitting until it isn't
+            remainder = segments.segmentlist([segments.segment(seg[0] + diff_livetime, seg[1])])
+            while abs(remainder) > bin_livetime[bin_]:
+                remainder_start = remainder[0][0]
+                remainder_mid = remainder[0][0] + bin_livetime[bin_]
+                time_bins[bin_+1] |= segments.segmentlist([segments.segment(remainder_start, remainder_mid)])
+                remainder = segments.segmentlist([segments.segment(remainder_mid, seg[1])])
+                bin_ += 1
+
+            # divvy up final piece
+            if bin_ < num_bins - 1:
+                bin_ += 1
+                time_bins[bin_] |= remainder
+
+    # calculate start/end times from each bin and pad accordingly
+    half_pad = start_pad / 2
+    return [segs.extent().protract(half_pad).shift(-half_pad) for segs in time_bins]
+
+
 #
 # =============================================================================
 #
@@ -364,6 +422,14 @@ def cache_to_instruments(cache):
     return ''.join(sorted(list(observatories)))


+def gps_directory(gpstime):
+    """!
+    Given a gps time, returns the directory name where files corresponding
+    to this time will be written to, e.g. 1234567890 -> '12345'.
+    """
+    return str(int(gpstime))[:5]
+
+
 def T050017_filename(instruments, description, seg, extension, path = None):
     """!
     A function to generate a T050017 filename.
@@ -449,8 +515,3 @@ def flatten(lst):
     Flatten a list by one level of nesting.
     """
     return list(itertools.chain.from_iterable(lst))
-
-
-if __name__ == "__main__":
-    import doctest
-    doctest.testmod()
diff --git a/gstlal/python/plugins.py b/gstlal/python/plugins.py
new file mode 100644
index 0000000000000000000000000000000000000000..68b8acb60c527c5d5acfff56ec24f76ab20ec013
--- /dev/null
+++ b/gstlal/python/plugins.py
@@ -0,0 +1,32 @@
+# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+
+import pluggy
+
+
+# hooks for registering plugins
+register = pluggy.HookimplMarker("gstlal")
+specification = pluggy.HookspecMarker("gstlal")
+
+
+@specification
+def layers():
+    """
+    This plugin spec is used to return DAG layers in the form:
+        {"layer-name": layer_func}
+
+    """
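+
+
+# A hypothetical downstream plugin module would implement this hook as in the
+# sketch below (the module still has to be handed to a pluggy PluginManager,
+# as gstlal-inspiral's dags/inspiral.py does with manager.register):
+#
+#     from gstlal import plugins
+#
+#     @plugins.register
+#     def layers():
+#         return {"my_layer": my_layer_func}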