# Copyright (C) 2013, 2014, 2020--2022 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
"""Creates DAGs to run jobs that generates SFTs"""
import math
import argparse
import os
import re
from lalpulsar import git_version
__author__ = 'Evan Goetz <evan.goetz@ligo.org>, Greg Mendell'
__version__ = git_version.id
__date__ = git_version.date
# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
# PWD, with log files based on subLogPath and dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
# on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
# SFT description field in the filename.
# 12/28/05 gam; Add options --start-freq -F and --band -B options to enter
# these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
# Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
# Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
# use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version
# (1 = version 1 SFTs, the default; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
# file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
# to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
# SFTs, -Q option to give node path, and -R option for number of
# jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020 eag; Update script to conform to modern python3 and pep8
# 10/2020 kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022 kww; Deprecate options that have been removed from MakeSFTs
# 10/2022 kww; Parse window type as a string, parameter separated by colon
# 10/2022 kww; Merge -O and -o log path options to free up -O option
# 10/2022 kww; Implement public SFT file naming convention
# 11/2022 kww; -R command line option now used for --observing-revision
# instead of --output-jobs-per-node, which now uses -r
# 11/2022 kww; --datafind-path and --makesfts-path accept executable names
#
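# Example invocation (hypothetical paths, channel, and accounting tags;
# a segment file supplies the analysis times):
#   lalpulsar_MakeSFTDAG -O 4 -K RUN -R 1 -f makesfts.dag -G O4_H1_SFTs \
#       -d H1_HOFT_C00 -k 10 -T 1800 -p /home/user/sfts \
#       -N H1:GDS-CALIB_STRAIN -g segments.txt \
#       -A ligo.prod.o4.cw.explore.test -U albert.einstein
#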
# FUNCTION THAT WRITES ONE JOB TO THE DAG FILE
#
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args):
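    """Write one gw_data_find job and one MakeSFTs job to the DAG file.

    dagFID            -- open file object for the DAG file being written
    nodeCount         -- index of this job, used to name its DAG nodes
    startTimeThisNode -- GPS start time of the SFTs made by this job
    endTimeThisNode   -- GPS end time of the SFTs made by this job
    site              -- one-letter observatory prefix from the channel name
    args              -- parsed command-line arguments
    """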
datafind = f'datafind_{nodeCount}'
MakeSFTs = f'MakeSFTs_{nodeCount}'
startTimeDatafind = startTimeThisNode - args.extra_datafind_time
endTimeDatafind = endTimeThisNode + args.extra_datafind_time
tagStringOut = f'{args.tag_string}_{nodeCount}'
if args.cache_file:
cacheFile = args.cache_file
else:
cacheFile = f'{args.cache_path}/{site}-{startTimeDatafind}-{endTimeDatafind}.cache'
argList = []
argList.append(f'-O {args.observing_run}')
if args.observing_run > 0:
argList.append(f'-K {args.observing_kind}')
argList.append(f'-R {args.observing_revision}')
elif args.misc_desc:
argList.append(f'-X {args.misc_desc}')
argList.append(f'-f {args.filter_knee_freq}')
argList.append(f'-t {args.time_baseline}')
argList.append(f'-p {args.output_sft_path}')
argList.append(f'-C {cacheFile}')
argList.append(f'-s {startTimeThisNode}')
argList.append(f'-e {endTimeThisNode}')
argList.append(f'-N {args.channel_name}')
argList.append(f'-F {args.start_freq}')
argList.append(f'-B {args.band}')
if args.comment_field:
argList.append(f'-c {args.comment_field}')
if args.window_type:
if ':' in args.window_type:
window_type, window_param = args.window_type.split(':')
argList.append(f'-w {window_type} -r {window_param}')
else:
argList.append(f'-w {args.window_type}')
if args.overlap_fraction:
argList.append(f'-P {args.overlap_fraction}')
argStr = ' '.join(argList)
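    # argStr now holds the full lalpulsar_MakeSFTs argument list; with
    # hypothetical values it might read:
    #   -O 4 -K RUN -R 1 -f 10 -t 1800 -p ./sfts
    #   -C cache/H-1000000000-1000001860.cache -s 1000000000 -e 1000001860
    #   -N H1:GDS-CALIB_STRAIN -F 10 -B 1990 -w tukey -r 0.001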
# gw_data_find job
if not args.cache_file:
dagFID.write(f'JOB {datafind} datafind.sub\n')
dagFID.write(f'RETRY {datafind} 10\n')
dagFID.write(f'VARS {datafind} gpsstarttime="{startTimeDatafind}" gpsendtime="{endTimeDatafind}" observatory="{site}" inputdatatype="{args.input_data_type}" tagstring="{tagStringOut}"\n')
# MakeSFT job
dagFID.write(f'JOB {MakeSFTs} MakeSFTs.sub\n')
dagFID.write(f'RETRY {MakeSFTs} 5\n')
dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" tagstring="{tagStringOut}"\n')
if not args.cache_file:
dagFID.write(f'PARENT {datafind} CHILD {MakeSFTs}\n')
#
# MAIN CODE STARTS HERE
#
parser = argparse.ArgumentParser(
description='This script creates datafind.sub, MakeSFTs.sub, and a dag \
file that generates SFTs based on the options given.',
fromfile_prefix_chars='@')
parser.add_argument('-O', '--observing-run', required=True, type=int,
                    help='For public SFTs, the observing run from which the \
                    SFTs are generated, or (in the case of mock data \
                    challenge data) the observing run on which the data is \
                    most closely based')
parser.add_argument('-K', '--observing-kind', type=str, choices=['RUN', 'AUX', 'SIM', 'DEV'],
help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
"AUX" for SFTs of non-h(t) channels; \
"SIM" for mock data challenge or other simulated data; or \
"DEV" for development/testing purposes')
parser.add_argument('-R', '--observing-revision', type=int,
help='For public SFTs: revision number starts at 1, and should be incremented once \
SFTs have been widely distributed across clusters, advertised \
as being ready for use, etc. For example, if mistakes are found \
in the initial SFT production run after they have been published, \
regenerated SFTs should have a revision number of at least 2')
parser.add_argument('-X', '--misc-desc', type=str,
help='For private SFTs, miscellaneous part of the SFT \
description field in the filename')
parser.add_argument('-a', '--analysis-start-time', type=int,
help='GPS start time of data from which to generate \
SFTs (optional and unused if a segment file is given)')
parser.add_argument('-b', '--analysis-end-time', type=int,
help='GPS end time of data from which to generate SFTs \
(optional and unused if a segment file is given)')
parser.add_argument('-f', '--dag-file', required=True, type=str,
help='filename for .dag file (should end in .dag)')
parser.add_argument('-G', '--tag-string', required=True, type=str,
help='tag string used in names of various files unique to \
jobs that will run under the DAG')
parser.add_argument('-d', '--input-data-type', required=True, type=str,
help='input data type for use with the gw_data_find --type \
option')
parser.add_argument('-x', '--extra-datafind-time', type=int, default=0,
                    help='extra time to subtract from the gw_data_find \
                    start time and add to its end time')
parser.add_argument('-M', '--datafind-match', type=str,
help='string to use with the gw_data_find --match option')
parser.add_argument('-y', '--synchronize-start', action='store_true',
help='synchronize the start times of the SFTs so that the \
start times are synchronized when there are gaps in the \
data')
parser.add_argument('-k', '--filter-knee-freq', required=True, type=int,
help='high pass filter knee frequency used on time domain \
data before generating SFTs')
parser.add_argument('-T', '--time-baseline', required=True, type=int,
help='time baseline of SFTs (e.g., 60 or 1800 seconds)')
parser.add_argument('-p', '--output-sft-path', required=True, type=str,
help='path to output SFTs')
parser.add_argument('-C', '--cache-path', type=str, default='cache',
help='path to cache files that will be produced by \
gw_data_find (default is $PWD/cache; this directory is \
created if it does not exist and must agree with that \
given in .sub files)')
parser.add_argument('-e', '--cache-file', type=str,
help='path and filename to frame cache file to use instead \
of gw_data_find')
parser.add_argument('-o', '--log-path', type=str, default='logs',
help='path to log, output, and error files (default \
is $PWD/logs; this directory is created if it does not \
exist and usually should be under a local file system)')
parser.add_argument('-N', '--channel-name', required=True, type=str,
help='name of input time-domain channel to read from \
frames')
parser.add_argument('-c', '--comment-field', type=str,
help='comment for SFT header')
parser.add_argument('-F', '--start-freq', type=int, default=10,
help='start frequency of the SFTs')
parser.add_argument('-B', '--band', type=int, default=1990,
help='frequency band of the SFTs')
parser.add_argument('-w', '--window-type', type=str,
help='type of windowing of time-domain to do \
before generating SFTs, e.g. "rectangular", \
"hann", "tukey:<beta in [0,1], required>"; \
if unspecified use lalpulsar_MakeSFTs defaults')
parser.add_argument('-P', '--overlap-fraction', type=float, default=0,
help='overlap fraction (for use with windows; e.g., use \
--overlap-fraction 0.5 with --window-type hann windows)')
parser.add_argument('-m', '--max-num-per-node', type=int, default=1,
help='maximum number of SFTs to generate on one node')
parser.add_argument('-L', '--max-length-all-jobs', type=int,
help='maximum total amount of data to process, in seconds \
(optional and unused if a segment file is given)')
parser.add_argument('-g', '--segment-file', type=str,
help='alternative file with segments to use, rather than \
the input times')
parser.add_argument('-l', '--min-seg-length', type=int, default=0,
help='minimum length segments to process in seconds (used \
only if a segment file is given)')
parser.add_argument('-q', '--list-of-nodes', type=str,
help='file with list of nodes on which to output SFTs')
parser.add_argument('-Q', '--node-path', type=str,
help='path to nodes to output SFTs; the node name is \
appended to this path, followed by path given by the -p \
                    option; for example, if -q points to a file with the list \
node1 node2 ... and the -Q /data/ -p /frames/S5/sfts/LHO \
options are given, the first output file will go into \
/data/node1/frames/S5/sfts/LHO; the next node in the list \
is used in constructing the path when the number of jobs \
                    given by the -r option is reached, and so on')
parser.add_argument('-r', '--output-jobs-per-node', type=int, default=0,
help='number of jobs to output per node in the list of \
nodes given with the -q option')
parser.add_argument('-j', '--datafind-path', type=str,
help='string specifying the gw_data_find executable, \
or a path to it; if not set, will use \
LSC_DATAFIND_PATH env variable or system default (in \
that order)')
parser.add_argument('-J', '--makesfts-path', type=str,
help='string specifying the lalpulsar_MakeSFTs executable, \
or a path to it; if not set, will use \
MAKESFTS_PATH env variable or system default (in that \
order)')
parser.add_argument('-Y', '--request-memory', type=int, default=2048,
help='memory allocation in MB to request from condor for \
lalpulsar_MakeSFTs step')
parser.add_argument('-s', '--request-disk', type=int, default=1024,
help='disk space allocation in MB to request from condor \
for lalpulsar_MakeSFTs step')
parser.add_argument('-A', '--accounting-group', required=True, type=str,
help='Condor tag for the production of SFTs')
parser.add_argument('-U', '--accounting-group-user', required=True, type=str,
help='albert.einstein username (do not add @LIGO.ORG)')
##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
parser.error(f'Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs')
parser.add_argument('-u', '--frame-struct-type', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer required; \
the frame channel type is determined automatically')
parser.add_argument('-H', '--use-hot', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer required; \
the frame channel type is determined automatically')
parser.add_argument('-i', '--ifo', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer required; \
the detector prefix is deduced from the channel name')
parser.add_argument('-D', '--make-gps-dirs', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer supported')
parser.add_argument('-Z', '--make-tmp-file', nargs=0, action=DeprecateAction,
help='DEPRECATED. Default behaviour')
parser.add_argument('-v', '--sft-version', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer supported')
parser.add_argument('-S', '--use-single', nargs=0, action=DeprecateAction,
help='DEPRECATED. No longer supported')
args = parser.parse_args()
# Some basic argument value checking
if args.observing_run < 0:
    parser.error('--observing-run must be >= 0')
if args.observing_run > 0 and not args.observing_kind:
    parser.error('--observing-run requires --observing-kind')
if args.observing_run > 0 and not args.observing_revision:
    parser.error('--observing-run requires --observing-revision')
if args.observing_revision and args.observing_revision <= 0:
    parser.error('--observing-revision must be > 0')
if args.observing_run > 0 and args.misc_desc:
    parser.error(f'--observing-run={args.observing_run} incompatible with --misc-desc')
if args.misc_desc and not re.match(r'^[A-Za-z0-9]+$', args.misc_desc):
    parser.error('--misc-desc may only contain A-Z, a-z, 0-9 characters')
if args.extra_datafind_time < 0:
    parser.error('--extra-datafind-time must be >= 0')
if args.filter_knee_freq < 0:
    parser.error('--filter-knee-freq must be >= 0')
if args.time_baseline <= 0:
    parser.error('--time-baseline must be > 0')
if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    parser.error('--overlap-fraction must be in the range [0,1)')
if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    parser.error('--start-freq must be in the range [0,7192)')
if args.band <= 0 or args.band >= 8192.0:
    parser.error('--band must be in the range (0,8192)')
if args.start_freq + args.band >= 8192.0:
    parser.error('--start-freq + --band must be < 8192')
if args.max_num_per_node <= 0:
    parser.error('--max-num-per-node must be > 0')
# Set the data find executable and lalpulsar_MakeSFTs executable
dataFindExe = 'gw_data_find'
if args.datafind_path:
if os.path.isfile(args.datafind_path):
dataFindExe = args.datafind_path
else:
dataFindExe = os.path.join(args.datafind_path, dataFindExe)
elif 'LSC_DATAFIND_PATH' in os.environ:
dataFindExe = os.path.join('$ENV(LSC_DATAFIND_PATH)', dataFindExe)
else:
dataFindExe = os.path.join('/usr/bin', dataFindExe)
makeSFTsExe = 'lalpulsar_MakeSFTs'
if args.makesfts_path:
if os.path.isfile(args.makesfts_path):
makeSFTsExe = args.makesfts_path
else:
makeSFTsExe = os.path.join(args.makesfts_path, makeSFTsExe)
elif 'MAKESFTS_PATH' in os.environ:
makeSFTsExe = os.path.join('$ENV(MAKESFTS_PATH)', makeSFTsExe)
else:
makeSFTsExe = os.path.join('@LALSUITE_BINDIR@', makeSFTsExe)
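# E.g. (hypothetical paths): --makesfts-path /opt/lalsuite/bin/lalpulsar_MakeSFTs
# selects that file directly, while --makesfts-path /opt/lalsuite/bin resolves
# to /opt/lalsuite/bin/lalpulsar_MakeSFTs; --datafind-path behaves the same way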
# make the directories to store the cache files and job logs, if needed
os.makedirs(args.log_path, exist_ok=True)
if not args.cache_file:
    os.makedirs(args.cache_path, exist_ok=True)
# Check if list of nodes is given, on which to output SFTs.
nodeList = []
useNodeList = False
savedOutputSFTPath = None
if args.list_of_nodes is not None:
    if args.node_path is None:
        parser.error('Node file list given, but no node path specified')
    if args.output_jobs_per_node < 1:
        parser.error(
            'Node file list given, but invalid output jobs per node specified')
with open(args.list_of_nodes) as fp_nodelist:
for idx, line in enumerate(fp_nodelist):
splitLine = line.split()
nodeList.append(splitLine[0])
if len(nodeList) < 1:
        raise ValueError(
            f'No nodes found in node list file: {args.list_of_nodes}')
# Set flag to use list of nodes in constructing output files
useNodeList = True
savedOutputSFTPath = args.output_sft_path
# END if (args.list_of_nodes != None)
# Check if segment file was given, else set up one segment from the command line
segList = []
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        parser.error('--min-seg-length must be >= 0')
    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True
with open(args.segment_file) as fp_segfile:
for idx, line in enumerate(fp_segfile):
splitLine = line.split()
oneSeg = []
oneSeg.append(int(splitLine[0]))
oneSeg.append(int(splitLine[1]))
if (oneSeg[1] - oneSeg[0]) >= args.min_seg_length:
segList.append(oneSeg)
if len(segList) < 1:
        raise ValueError(
            f'No segments found in segment file: {args.segment_file}')
else:
    if args.analysis_start_time is None:
        parser.error('--analysis-start-time must be specified if no '
                     'segment file is given')
    if args.analysis_end_time is None:
        parser.error('--analysis-end-time must be specified if no '
                     'segment file is given')
    if args.max_length_all_jobs is None:
        parser.error('--max-length-all-jobs must be specified if no '
                     'segment file is given')
    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time +
                                 args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + \
            args.max_length_all_jobs
    segList.append([args.analysis_start_time, args.analysis_end_time])
# END if (args.segment_file != None)
# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0]
# initialize count of nodes
nodeCount = 0
# Create .sub files
path_to_dag_file = os.path.dirname(args.dag_file)
dag_filename = os.path.basename(args.dag_file)
datafind_sub = os.path.join(path_to_dag_file, 'datafind.sub')
makesfts_sub = os.path.join(path_to_dag_file, 'MakeSFTs.sub')
# create datafind.sub
if not args.cache_file:
with open(datafind_sub, 'w') as datafindFID:
datafindLogFile = f'{args.log_path}/datafind_{dag_filename}.log'
datafindFID.write('universe = vanilla\n')
datafindFID.write(f'executable = {dataFindExe}\n')
if not args.datafind_match:
dataFindMatchString = ''
else:
dataFindMatchString = f'--match {args.datafind_match}'
datafindFID.write('arguments = -r $ENV(LIGO_DATAFIND_SERVER) ')
datafindFID.write('--observatory $(observatory) --url-type file ')
datafindFID.write('--gps-start-time $(gpsstarttime) ')
datafindFID.write('--gps-end-time $(gpsendtime) --lal-cache --gaps ')
datafindFID.write(f'--type $(inputdatatype) {dataFindMatchString}\n')
datafindFID.write('getenv = True\n')
datafindFID.write('request_disk = 5MB\n')
datafindFID.write(f'accounting_group = {args.accounting_group}\n')
datafindFID.write(f'accounting_group_user = {args.accounting_group_user}\n')
datafindFID.write(f'log = {datafindLogFile}\n')
datafindFID.write(f'error = {args.log_path}/datafind_$(tagstring).err\n')
datafindFID.write(f'output = {args.cache_path}/')
datafindFID.write('$(observatory)-$(gpsstarttime)-$(gpsendtime).cache\n')
datafindFID.write('notification = never\n')
datafindFID.write('queue 1\n')
# create MakeSFTs.sub
with open(makesfts_sub, 'w') as MakeSFTsFID:
    MakeSFTsLogFile = f'{args.log_path}/MakeSFTs_{dag_filename}.log'
    MakeSFTsFID.write('universe = vanilla\n')
    MakeSFTsFID.write(f'executable = {makeSFTsExe}\n')
    MakeSFTsFID.write('arguments = $(argList)\n')
    MakeSFTsFID.write('getenv = True\n')
    MakeSFTsFID.write(f'accounting_group = {args.accounting_group}\n')
    MakeSFTsFID.write(f'accounting_group_user = {args.accounting_group_user}\n')
    MakeSFTsFID.write(f'log = {MakeSFTsLogFile}\n')
    MakeSFTsFID.write(f'error = {args.log_path}/MakeSFTs_$(tagstring).err\n')
    MakeSFTsFID.write(f'output = {args.log_path}/MakeSFTs_$(tagstring).out\n')
    MakeSFTsFID.write('notification = never\n')
    MakeSFTsFID.write(f'request_memory = {args.request_memory}MB\n')
    MakeSFTsFID.write(f'request_disk = {args.request_disk}MB\n')
    MakeSFTsFID.write('RequestCpus = 1\n')
    MakeSFTsFID.write('queue 1\n')
# create the DAG file with the jobs to run
with open(args.dag_file, 'w') as dagFID:
startTimeAllNodes = None
firstSFTstartTime = 0 # need this for the synchronized start option
nodeListIndex = 0
# Loop over the segment list to generate the SFTs for each segment
for seg in segList:
# Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
numThisNode = 0
numThisSeg = 0
# Case 1: a segment file was given but the SFTs do not need their
# start times to be synchronized
if (adjustSegExtraTime and not args.synchronize_start):
segStartTime = seg[0]
segEndTime = seg[1]
# First we figure out how much extra time is in the segment so that
# SFTs are fit within the segment:
# |..<SFT><SFT><SFT>..|
# where the .. represent the extra time in the segment
# The amount of extra time in a segment is given as the remainder
# of (total segment time) / (SFT time baseline)
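            # For example (hypothetical numbers): a 5000 s segment filled
            # with 1800 s SFTs leaves 5000 % 1800 = 1400 s of extra time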
segExtraTime = (segEndTime - segStartTime) % args.time_baseline
# If there is overlap of SFTs requested, then we compute the extra
# time as:
# the remainder of (end - start - Tsft) / (non-overlap time)
# provided there was at least one SFT that is in the segment
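            # For example (hypothetical numbers): with 1800 s SFTs and
            # --overlap-fraction 0.5 the stride is 900 s, so a 5000 s
            # segment leaves (5000 - 1800) % 900 = 500 s of extra time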
if args.overlap_fraction != 0.0:
if (segEndTime - segStartTime) > args.time_baseline:
segExtraTime = \
(segEndTime - segStartTime - args.time_baseline) \
% int((1.0 - args.overlap_fraction)*args.time_baseline)
# We'll add half the extra time to the start of the SFTs to be
# created in this segment and half at the end
segExtraStart = int(segExtraTime / 2)
segExtraEnd = segExtraTime - segExtraStart
args.analysis_start_time = segStartTime + segExtraStart
# This shift may have pushed past the end time of the segment. In
# that case, just fix the start time to the end time of the segment
if args.analysis_start_time > segEndTime:
args.analysis_start_time = segEndTime
# shifting the end time by the other portion of the extra time
# amount ...
args.analysis_end_time = segEndTime - segExtraEnd
# Again, this shift could have pushed the end time beyond the start
# of the segment, so just fix the end time to the segment start
if args.analysis_end_time < segStartTime:
args.analysis_end_time = segStartTime
# Case 2: SFTs need a synchronized start. This is a special case for
# methods like TwoSpect, where signal periodicity spacing must be
# maintained
elif (args.synchronize_start):
segStartTime = seg[0]
segEndTime = seg[1]
# If we haven't set the first SFT start time, then set it equal to
# the start time of the first segment
if firstSFTstartTime == 0:
firstSFTstartTime = segStartTime
            # This is a tricky bit of math to set the start time based on
            # the first SFT start time over all of the segments
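            # i.e., round (segStartTime - firstSFTstartTime) up to the next
            # multiple of the SFT stride (1 - overlap_fraction) * Tsft, then
            # offset from firstSFTstartTime. For example (hypothetical
            # numbers): with 1800 s SFTs, no overlap, and a segment starting
            # 500 s after firstSFTstartTime, the first SFT is placed 1800 s
            # after firstSFTstartTime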
args.analysis_start_time = \
int(round(math.ceil((segStartTime - firstSFTstartTime)/((
1.0 - args.overlap_fraction)*args.time_baseline))*(
1.0 - args.overlap_fraction)*args.time_baseline)) + \
firstSFTstartTime
# This shift may have pushed past the end time of the segment. In
# that case, just fix the start time to the end time of the segment
if args.analysis_start_time > segEndTime:
args.analysis_start_time = segEndTime
            # This is a tricky bit of math to set the end time based on
            # the first SFT start time over all of the segments
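            # i.e., fit as many stride-aligned SFTs as possible such that the
            # last SFT still ends within the segment. For example
            # (hypothetical numbers): with 1800 s SFTs, no overlap, and a
            # segment ending 7200 s after the synchronized start time, the
            # end time lands exactly 4 SFTs (7200 s) after the start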
args.analysis_end_time = \
int(round(math.floor((
segEndTime - args.analysis_start_time -
args.time_baseline)/((
1.0 - args.overlap_fraction)*args.time_baseline))*\
(1.0 - args.overlap_fraction)*\
args.time_baseline)) + args.time_baseline + \
args.analysis_start_time
# Again, this shift could have pushed the end time beyond the start
# of the segment, so just fix the end time to the segment start
if args.analysis_end_time < segStartTime:
args.analysis_end_time = segStartTime
# If no segment file given and no synchronized starts, just set the
# start time and end time to the segment start and end
else:
args.analysis_start_time = seg[0]
args.analysis_end_time = seg[1]
# Loop through the analysis time; make sure no more than
# args.max_num_per_node SFTs are produced by any one node
startTimeThisNode = args.analysis_start_time
endTimeThisNode = args.analysis_start_time
endTimeAllNodes = args.analysis_start_time
while (endTimeAllNodes < args.analysis_end_time):
# increment endTimeAllNodes by the args.time_baseline until we get
# past the args.analysis_end_time
if args.overlap_fraction != 0.0:
# handle overlap
if numThisSeg == 0:
endTimeAllNodes = endTimeAllNodes + args.time_baseline
else:
endTimeAllNodes = endTimeAllNodes + \
int((1.0 - args.overlap_fraction)*args.time_baseline)
else:
# default case, no overlap
endTimeAllNodes = endTimeAllNodes + args.time_baseline
if (endTimeAllNodes <= args.analysis_end_time):
# increment the number of SFTs output from this node, and
                # update the end time for this node.
numThisNode = numThisNode + 1
numThisSeg = numThisSeg + 1
endTimeThisNode = endTimeAllNodes
if (numThisNode < args.max_num_per_node):
continue
else:
# write jobs to dag for this node
nodeCount = nodeCount + 1
if (useNodeList):
args.output_sft_path = args.node_path + \
nodeList[nodeListIndex] + savedOutputSFTPath
if ((nodeCount % args.output_jobs_per_node) == 0):
nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0)
# END if (useNodeList)
if (nodeCount == 1):
startTimeAllNodes = startTimeThisNode
writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args)
# Update for next node
numThisNode = 0
if args.overlap_fraction != 0.0:
# handle overlap
startTimeThisNode = endTimeThisNode - \
int((args.overlap_fraction)*args.time_baseline)
else:
# default case, no overlap
startTimeThisNode = endTimeThisNode
else:
# we are at or past the args.analysis_end_time; output job for last
# node if needed.
if (numThisNode > 0):
# write jobs to dag for this node
nodeCount = nodeCount + 1
if (useNodeList):
args.output_sft_path = args.node_path + \
nodeList[nodeListIndex] + savedOutputSFTPath
if ((nodeCount % args.output_jobs_per_node) == 0):
nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0)
# END if (useNodeList)
if (nodeCount == 1):
startTimeAllNodes = startTimeThisNode
writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args)
# END while (endTimeAllNodes < args.analysis_end_time)
# END for seg in segList
# Close the DAG file
# Update the actual end time of the last job and print the GPS times spanned by all jobs:
endTimeAllNodes = endTimeThisNode
if startTimeAllNodes is None:
    raise Exception('startTimeAllNodes is None; the DAG file contains no jobs!')
if endTimeAllNodes <= startTimeAllNodes:
    raise Exception('endTimeAllNodes <= startTimeAllNodes; '
                    'the DAG file contains no jobs!')
print(startTimeAllNodes, endTimeAllNodes)