Skip to content
Snippets Groups Projects
Commit 73c99064 authored by Kipp Cannon's avatar Kipp Cannon
Browse files

gstlal: add gstlal_dagfile_rerun_relatives

parent ac4d0d75
No related branches found
No related tags found
No related merge requests found
Pipeline #88446 passed with warnings
dist_bin_SCRIPTS = \
gstlal_asd_txt_from_psd_xml \
gstlal_dagfile_rerun_relatives \
gstlal_fake_frames \
gstlal_fake_frames_pipe \
gstlal_launch \
......
#!/usr/bin/python
#
# Copyright (C) 2019 Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
from __future__ import print_function
from optparse import OptionParser
import sys
from gstlal import __version__, __date__
from gstlal import dagfile
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>"
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
def parse_command_line():
parser = OptionParser(
version = "Name: %%prog\n%s" % __version__,
usage = "%prog [options] node1 [node2 ...] <old_dag >new_dag",
description = "%prog constructs a DAG to re-run relatives of DAG nodes named on the command line. The scenario this is intended to address is the situation in which one or more of the parents of a job malfunction but exit with a success code (0). This can occur, for example, during a filesystem malfunction, where the job believes it wrote its output files to disk successfully, it exits with a success code, but the files never show up on the server. Because the job exited with a success code, dagman marks it done, and submitting the rescue DAG will not retry the job. This tool can be used to automate the process of rerunning one or more selected jobs, as well as their ancestor and/or descendent graphs. The old DAG is read from stdin. The new one is written to stdout."
)
parser.add_option("--themselves", "-t", action = "store_true", help = "Rerun the named nodes.")
parser.add_option("--ancestors-of", "-a", action = "store_true", help = "Rerun the ancestors of the named nodes.")
parser.add_option("--descendants-of", "-d", action = "store_true", help = "Rerun the descendents of the named nodes.")
parser.add_option("--verbose", "-v", action = "store_true", help = "Be verbose.")
options, nodenames = parser.parse_args()
#
# check that there's something to do
#
if not (options.themselves or options.ancestors_of or options.descendants_of):
raise ValueError("nothing to do!")
if options.ancestors_of and options.descendants_of and not options.themselves:
raise ValueError("cowardly refusing to rerun both the parents and children of the named nodes without also rerunning the named nodes themselves. must include --themselves when both --ancestors-of and --descendants-of have been selected.")
#
# uniqueify the node names
#
nodenames = set(nodenames)
#
# done
#
return options, nodenames
#
# =============================================================================
#
# Process DAG
#
# =============================================================================
#
#
# command line
#
options, nodenames = parse_command_line()
#
# read original dag from stdin
#
if options.verbose:
def progress(f, n, done):
print("reading original dag from stdin ... %d lines\r" % n, end=' ', file=sys.stderr)
if done:
print(file=sys.stderr)
else:
progress = None
dag = dagfile.DAG.parse(sys.stdin, progress = progress)
if not nodenames.issubset(set(dag.nodes)):
raise ValueError("node(s) %s not found in dag" % ", ".join(sorted(nodenames - set(dag.nodes))))
#
# extract graph
#
if options.verbose:
print("extracting graph ...", file=sys.stderr)
names_to_rerun = set()
if options.ancestors_of:
names_to_rerun |= dag.get_all_parent_names(nodenames)
if options.descendants_of:
names_to_rerun |= dag.get_all_child_names(nodenames)
if options.themselves:
names_to_rerun |= nodenames
assert names_to_rerun # must not be empty
dag = dagfile.DAG.select_nodes_by_name(dag, names_to_rerun)
#
# set nodes to not done
#
if options.verbose:
print("setting job states to not-done ...", file=sys.stderr)
for nodename, node in dag.nodes.items():
node.done = False
#
# write new dag to stdout
#
if options.verbose:
def progress(f, n, done):
print("writing new dag to stdout ... %d lines\r" % n, end=' ', file=sys.stderr)
if done:
print(file=sys.stderr)
else:
progress = None
dag.write(sys.stdout, progress = progress)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment