Skip to content
Snippets Groups Projects

Changes to BayesWaveCleanFrame (and bayeswave_pipe) to work with scitokens

3 files
+ 341
− 192
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -173,13 +173,19 @@ def x509_manipulation(workdir):
"""
Copy x509 proxy cert to the working directory and return its location
"""
print("---------------------------------------------------------\n")
print("Warning!!\n")
print("It looks like you're still using X509 proxy.\n")
print("Please consider using scitokens instead\n")
print("see: https://computing.docs.ligo.org/guide/auth/scitokens/")
print("---------------------------------------------------------\n")
try:
print("Trying to get X509 location from igwn_auth_utils")
x509 = igwn_auth_utils.find_x509_credentials(timeleft=600)
except:
traceback.print_exc(file=sys.stderr)
print("Warning: No X509 proxy found, please run ligo-proxy-init")
print("Warning: No X509 proxy found, please run ligo-proxy-init...")
# Copy X509 to workdir and return the new path
@@ -298,6 +304,21 @@ class eventTrigger:
self.hv_time_lag = hv_time_lag
self.lv_time_lag = lv_time_lag
self.total_time_lag = total_time_lag
# Keep track of actual GPS times in each IFO
ifo_list = ast.literal_eval(cp.get('input', 'ifo-list'))
self.perIFO_trigtime = {}
if 'H1' in ifo_list:
self.perIFO_trigtime['H1'] = trigger_time - total_time_lag
if 'L1' in ifo_list:
self.perIFO_trigtime['L1'] = trigger_time - hl_time_lag
if 'V1' in ifo_list:
if 'H1' in ifo_list:
self.perIFO_trigtime['V1'] = trigger_time - hv_time_lag
elif 'L1' in ifo_list:
self.perIFO_trigtime['V1'] = trigger_time - lv_time_lag
self.trigger_frequency = trigger_frequency
# Variable sample rate / window length [fixed TF volume]
@@ -638,7 +659,6 @@ class triggerList:
def parse_cwb_trigger_list(self, cp, cwb_trigger_file, rho_threshold=-1.0,
keep_frac=1.0):
# Get rho threshold
try:
rho_threshold = cp.getfloat('input', 'rho-threshold')
@@ -647,6 +667,10 @@ class triggerList:
# Determine network
ifo_list = ast.literal_eval(cp.get('input', 'ifo-list'))
if len(ifo_list) > 3:
# TODO: need to update for arbitrary number of ifos.
print("This functionality not currently supported beyond the HLV network.")
sys.exit(1)
if 'H1' in ifo_list:
network='H'
if 'L1' in ifo_list:
@@ -938,7 +962,7 @@ def condor_job_config(job_type, condor_job, config_parser):
* job_type='bayeswave_post'
* job_type='bayeswave_fpeak'
* job_type='bayeswave_clean_frame'
* job_type='megaplot.py'
* job_type='megaplot'
This identifies the site (OSG vs LDG) and set properties of the condor job
(file transfers, executable location etc) accordingly
@@ -989,8 +1013,9 @@ def condor_job_config(job_type, condor_job, config_parser):
if singularityImage[-1]!='"': singularityImage += '"'
condor_job.add_condor_cmd('+SingularityImage', singularityImage)
else:
condor_job.add_condor_cmd('getenv',True)
# No longer supported
# else:
# condor_job.add_condor_cmd('getenv',True)
#
# File Transfer configuration
@@ -1011,9 +1036,7 @@ def condor_job_config(job_type, condor_job, config_parser):
# Using +SuccessCheckpointExitCode (recommended approach)
if job_type == 'bayeswave':
condor_job.add_condor_cmd('+SuccessCheckpointExitCode', 77)
condor_job.add_condor_cmd('+WantFTOnCheckpoint', True)
condor_job.add_condor_cmd('checkpoint_exit_code', 77)
# "Working Around The Assumptions"
#
@@ -1047,14 +1070,15 @@ def condor_job_config(job_type, condor_job, config_parser):
#
# OSG specific configuration
#
if config_parser.getboolean('condor', 'osg-deploy'):
if config_parser.getboolean('condor', 'igwn-pool'):
# --- Force downstream jobs to run locally
if job_type in ['bayeswave_post', 'bayeswave_fpeak', 'bayeswave_clean_frame',
'megaplot.py']:
#requires.append("(GLIDEIN_SITE=?=undefined)")
print("jobtype=",job_type)
if job_type in ['bayeswave_post', 'bayeswave_fpeak',
'megaplot']:
condor_job.add_condor_cmd('+flock_local', 'True')
condor_job.add_condor_cmd('+DESIRED_Sites', '\"nogrid\"')
# bayeswave and bayeswave_clean_frame jobs should run non-locally since they need access to frames
else:
try:
condor_job.add_condor_cmd('+DESIRED_Sites',
@@ -1067,17 +1091,20 @@ def condor_job_config(job_type, condor_job, config_parser):
except configparser.NoOptionError:
pass
# Ensure LIGO data is present
if not config_parser.getboolean('datafind','sim-data'):
requires.append("(HAS_LIGO_FRAMES=?=TRUE)")
# only should have LIGO frames if we are not deploying on IGWN-pool
# # Ensure LIGO data is present
# if not config_parser.getboolean('datafind','sim-data'):
# requires.append("(HAS_LIGO_FRAMES=?=TRUE)")
#
# Accounting configurations
#
workdir = os.getcwd()
x509 = x509_manipulation(os.getcwd())
condor_job.add_condor_cmd('x509userproxy', x509)
if config_parser.getboolean('condor','igwn-scitoken'):
condor_job.add_condor_cmd('use_oauth_services','igwn')
# x509 = x509_manipulation(os.getcwd())
# condor_job.add_condor_cmd('x509userproxy', x509)
else: # add stuff for using scitokens
condor_job.add_condor_cmd('use_oauth_services','scitokens') # TODO add extra warning about x509 not supported any more
try:
condor_job.add_condor_cmd('accounting_group',
config_parser.get('condor', 'accounting-group'))
@@ -1192,7 +1219,11 @@ class bayeswaveJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
#
if cp.getboolean('condor', 'transfer-files'):
transferstring='datafind,$(macrooutputDir)'
# See if this works for individual caches
if cp.getboolean('condor','copy-frames'):
transferstring = '$(macrooutputDir)/datafind,$(macrooutputDir)'
else:
transferstring='datafind,$(macrooutputDir)'
# Generate a script for PreCmd to setup directory structure
if condor_precommand:
@@ -1363,10 +1394,12 @@ class bayeswaveNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
self.frames=[]
for ifo in framedict.keys():
for frame in framedict[ifo]:
self.frames.append(frame)
self.frames = ",".join(self.frames)
self.add_macro('macroframes', self.frames)
# TODO: would it be cleaner to have a single time slide variable as a dictionary?
def set_L1_timeslide(self, L1_timeslide):
    """Record the L1 time slide and pass it through as the 'L1-timeslide' condor variable option."""
    self.L1_timeslide = L1_timeslide
    self.add_var_opt('L1-timeslide', L1_timeslide)
@@ -1791,6 +1824,7 @@ class bayeswave_clean_frameJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
clean_suffix = cp.get('bayeswave_clean_frame_options', 'clean-suffix')
self.add_opt('clean-suffix', clean_suffix)
class bayeswave_clean_frameNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
def __init__(self, bayeswave_clean_frame_job):
@@ -1809,10 +1843,6 @@ class bayeswave_clean_frameNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
self.add_var_opt('seglen', seglen)
self.seglen = seglen
def set_frame_srate(self, frame_srate):
    """Record the output frame sample rate and pass it through as the 'frame-srate' condor variable option."""
    self.frame_srate = frame_srate
    self.add_var_opt('frame-srate', frame_srate)
def set_frame_start(self, frame_start):
    """Record the output frame start time and pass it through as the 'frame-start' condor variable option."""
    self.frame_start = frame_start
    self.add_var_opt('frame-start', frame_start)
@@ -1839,6 +1869,18 @@ class bayeswave_clean_frameNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
def set_cache_file(self, cache_file):
    """Point this node at a frame cache file via the 'cachefile' condor variable option."""
    self.add_var_opt('cachefile', cache_file)
def add_frame_transfer(self, framedict):
    """
    Flatten framedict (ifo -> list of frame files) into a comma-separated
    string, store it on self.frames, and expose it as $(macroframes) for
    condor file transfer.
    """
    frame_list = [frame for ifo in framedict for frame in framedict[ifo]]
    self.frames = ",".join(frame_list)
    self.add_macro('macroframes', self.frames)
#
@@ -2050,7 +2092,8 @@ class submitToGraceDB(pipeline.CondorDAGJob,pipeline.AnalysisJob):
if cp.has_option('condor', 'accounting-group'):
self.add_condor_cmd('accounting_group', cp.get('condor', 'accounting-group'))
self.add_condor_cmd('getenv', 'True')
# No longer supported
# self.add_condor_cmd('getenv', 'True')
self.set_stdout_file('gdb_submitter_$(cluster)-$(process)-$(node).out')
self.set_stderr_file('gdb_submitter_$(cluster)-$(process)-$(node).err')
Loading