Commit 68082295 authored by John Douglas Veitch's avatar John Douglas Veitch
Browse files

Add some tweaks for x509proxy to lalinference_pipe_utils

parent fb0a4ff8
Pipeline #143467 passed with stages
in 108 minutes and 16 seconds
......@@ -2179,9 +2179,11 @@ class LALInferenceDAGJob(pipeline.CondorDAGJob):
cp : configparser object
sharedfs: If False, will map files to local paths on execute node
"""
def __init__(self, cp=None, sharedfs=False, requires_frames=True):
def __init__(self, cp=None, sharedfs=False, requires_frames=False):
self.transfer_files=not sharedfs
self.requirements=[]
self.requires_frames=requires_frames
self.x509path=None
if not cp:
# Create dummy empty config
from configparser import ConfigParser
......@@ -2233,20 +2235,34 @@ class LALInferenceDAGJob(pipeline.CondorDAGJob):
if self.osg: # Job will run on OSG nodes
# Remote site must have LIGO frames
if requires_frames:
self.add_requirement('HAS_LIGO_FRAMES =?= True')
# This line forces the job to run on OSG (remove after finished testing to allow local running
# self.add_requirement('IS_GLIDEIN=?=True')
pass
else: # Job will run on local cluster
# DESIRED_Sites="nogrid" forces to not run on OSG
self.add_condor_cmd('+DESIRED_Sites','"nogrid"')
# FlockLocal allows running on local cluster if submitted from an ldas-osg node
self.add_condor_cmd('+flock_local',True)
# self.add_requirement('TARGET.FileSystemDomain=="ligo"')
self.add_condor_cmd('use_x509userproxy','True')
def set_x509path(self, path):
"""
Copy the x509 proxy file to the path given, and
use it in the DAG. Requires ligo-proxy-init to be run
before generating DAG.
"""
from shutil import copyfile
from subprocess import check_output, CalledProcessError
try:
res = check_output(['grid-proxy-info','-path'])
except CalledProcessError as e:
print(e.output)
print('Error: x509proxy file not found. Please run ligo-proxy-init before creating DAG')
sys.exit(1)
tmp_path = res.strip()
print('Using x509 proxy from ',tmp_path)
copyfile(tmp_path, path)
self.x509path = path
def add_requirement(self,requirement):
"""
......@@ -2260,6 +2276,15 @@ class LALInferenceDAGJob(pipeline.CondorDAGJob):
"""
Over-load CondorDAGJob.write_sub_file to write the requirements
"""
if self.requires_frames and self.osg:
self.add_requirement('HAS_LIGO_FRAMES =?= True')
# If using frames, set the x509 path to a permanant location
self.set_x509path(os.path.join(
self.get_config('paths','basedir'),'x509proxy')
)
if self.x509path is not None:
self.add_condor_cmd('use_x509userproxy','True')
self.add_condor_cmd('x509userproxy',self.x509path)
if self.requirements:
self.add_condor_cmd('requirements','&&'.join('({0})'.format(r) for r in self.requirements))
......@@ -2319,7 +2344,8 @@ class LALInferenceDAGNode(pipeline.CondorDAGNode):
class EngineJob(LALInferenceDAGJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,engine,ispreengine=False,sharedfs=False, *args, **kwargs):
def __init__(self,cp,submitFile,logdir,engine,ispreengine=False,sharedfs=False,
requires_frames=False, *args, **kwargs):
self.ispreengine=ispreengine
self.engine=engine
basepath=cp.get('paths','basedir')
......@@ -2340,7 +2366,7 @@ class EngineJob(LALInferenceDAGJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
pipeline.CondorDAGJob.__init__(self,universe,exe)
pipeline.AnalysisJob.__init__(self,cp)
LALInferenceDAGJob.__init__(self, cp, sharedfs=sharedfs)
LALInferenceDAGJob.__init__(self, cp, sharedfs=sharedfs, requires_frames=requires_frames)
if cp.has_option('engine','resume'):
self.resume=True
......@@ -2547,6 +2573,7 @@ class EngineNode(LALInferenceDAGNode):
self.set_cache(df_output,ifo)
self.timeslides[ifo]=timeslide
self.channels[ifo]=channelname
self.job().requires_frames=True
return 1
else: return 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment