Commit 05a87301 authored by John Douglas Veitch, committed by Vivien Raymond

Clean up duplicate code in lalinference_pipe_utils

parent f224ac05
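
The commit replaces per-class copies of the condor boilerplate (accounting group, queue flag, requirements string) with a single base class, LALInferenceDAGJob, that every job class inherits. A minimal standalone sketch of the pattern, using illustrative names and an invented accounting tag rather than the real lalinference classes:

```python
from configparser import ConfigParser

class CommonSettingsJob:
    """Illustrative base class: apply shared condor options in one place."""
    def __init__(self, cp):
        self.requirements = []
        if cp.has_option('condor', 'accounting_group'):
            # Every subclass now inherits this instead of repeating the block
            self.add_condor_cmd('accounting_group', cp.get('condor', 'accounting_group'))

class ExampleJob(CommonSettingsJob):
    """Stand-in for one of the many job classes touched by this commit."""
    def add_condor_cmd(self, key, value):
        print('{0} = {1}'.format(key, value))

cp = ConfigParser()
cp['condor'] = {'accounting_group': 'ligo.dev.o3.cbc.pe.lalinference'}  # made-up tag
ExampleJob(cp)   # prints: accounting_group = ligo.dev.o3.cbc.pe.lalinference
```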
@@ -2108,7 +2108,54 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_node(node)
return node
class SingularityJob(pipeline.CondorDAGJob):
class LALInferenceDAGJob(pipeline.CondorDAGJob):
"""
Class to define DAG jobs for the lalinference pipeline.
Handles common condor settings such as requirements, accounting groups,
and queues.
Parameters:
cp : configparser object
"""
def __init__(self, cp=None):
self.requirements=[]
if not cp:
# Create dummy empty config
from configparser import ConfigParser
cp = ConfigParser()
# Add requirements from the configparser condor section
if cp.has_option('condor','requirements'):
self.add_requirement(cp.get('condor','requirements'))
# Add accounting group information if present
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
self.add_requirement('(TARGET.'+cp.get('condor','queue')+' =?= True)')
def add_requirement(self,requirement):
"""
Add a requirement to the condor submit file
"""
# Check that string isn't empty
if requirement:
self.requirements.append(requirement)
def write_sub_file(self):
"""
Override CondorDAGJob.write_sub_file to write the accumulated requirements
"""
if self.requirements:
self.add_condor_cmd('requirements','&&'.join('({0})'.format(r) for r in self.requirements))
# Call the parent method to do the rest
super(LALInferenceDAGJob,self).write_sub_file()
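
write_sub_file collects every fragment passed to add_requirement and joins them into a single ClassAd expression just before the submit file is written; the queue option likewise adds a `+<queue> = True` command plus a matching `TARGET.<queue> =?= True` requirement. A hedged sketch of the join, using made-up requirement strings (the queue name Online_CBC_PE is only an example):

```python
# Fragments as add_requirement would have collected them (values are illustrative)
requirements = ['TARGET.Online_CBC_PE =?= True', 'HAS_LIGO_FRAMES =?= TRUE']

# Same join as in write_sub_file above:
combined = '&&'.join('({0})'.format(r) for r in requirements)
print('requirements =', combined)
# -> requirements = (TARGET.Online_CBC_PE =?= True)&&(HAS_LIGO_FRAMES =?= TRUE)
```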
class SingularityJob(LALInferenceDAGJob):
"""
Wrapper class for running jobs via a singularity image
"""
@@ -2116,7 +2163,7 @@ class SingularityJob(pipeline.CondorDAGJob):
CVMFS_FRAMES="/cvmfs/oasis.opensciencegrid.org/ligo/frames/"
image=None
def __init__(self, cp, *args, **kwargs):
self.requirements=[]
super(SingularityJob, self).__init__(cp)
# Dir in which the DAG will run
# Execute from the basedir so all paths can be resolved
if cp.has_option('analysis','singularity'):
@@ -2151,33 +2198,24 @@ class SingularityJob(pipeline.CondorDAGJob):
extra_paths="--bind {cvmfs_frames}".format(cvmfs_frames = self.CVMFS_FRAMES)
self.add_condor_cmd('+SingularityBindCVMFS','True')
self.add_condor_cmd('use_x509userproxy','True')
self.requirements.append('HAS_LIGO_FRAMES =?= TRUE')
self.add_requirement('HAS_LIGO_FRAMES =?= TRUE')
if cp.has_option('analysis','roq') and cp.getboolean('analysis','roq'):
extra_paths+=" --bind {roqpath}".format(roqpath=cp.get('paths','roq_b_matrix_directory'))
if self.osg:
self.add_condor_cmd('+OpenScienceGrid','True')
self.requirements.append('IS_GLIDEIN=?=True')
self.add_requirement('IS_GLIDEIN=?=True')
# Add requested sites if specified
if cp.has_option('condor','desired-sites'):
self.add_condor_cmd('+DESIRED_Sites',cp.get('condor','desired-sites'))
if self.singularity:
self.requirements.append('HAS_SINGULARITY =?= TRUE')
self.add_requirement('HAS_SINGULARITY =?= TRUE')
self.add_condor_cmd('+SingularityImage','"{0}"'.format(self.image))
self.add_condor_cmd('transfer_executable','False')
# Add data transfer options
self.add_condor_cmd('should_transfer_files','YES')
self.add_condor_cmd('when_to_transfer_output','ON_EXIT_OR_EVICT')
def write_sub_file(self):
"""
Over-load CondorDAGJob.write_sub_file to write the requirements
"""
self.add_condor_cmd('requirements','&&'.join('({0})'.format(r) for r in self.requirements))
# Call the parent method to do the rest
super(SingularityJob,self).write_sub_file()
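
With the duplicated write_sub_file removed, the CVMFS, OSG, and Singularity settings above flow through the base class instead. A sketch of the submit commands this block produces, with a hypothetical image path (the real one comes from the [analysis] section of the ini file):

```python
# Hypothetical image path, for illustration only
image = '/cvmfs/singularity.opensciencegrid.org/lscsoft/lalsuite:latest'

submit_cmds = {
    '+SingularityBindCVMFS': 'True',
    'use_x509userproxy': 'True',
    '+SingularityImage': '"{0}"'.format(image),
    'transfer_executable': 'False',
    'should_transfer_files': 'YES',
    'when_to_transfer_output': 'ON_EXIT_OR_EVICT',
}
# HAS_LIGO_FRAMES and HAS_SINGULARITY go through add_requirement, so they
# end up in the single requirements expression written by LALInferenceDAGJob.
for key, value in submit_cmds.items():
    print('{0} = {1}'.format(key, value))
```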
class SingularityNode(pipeline.CondorDAGNode):
@@ -2205,7 +2243,7 @@ class SingularityNode(pipeline.CondorDAGNode):
else:
self.add_input_file(filename)
class EngineJob(SingularityJob,pipeline.AnalysisJob):
class EngineJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,engine,ispreengine=False,dax=False,site=None, *args, **kwargs):
self.ispreengine=ispreengine
self.engine=engine
@@ -2242,25 +2280,10 @@ class EngineJob(SingularityJob,pipeline.AnalysisJob):
self.add_condor_cmd('transfer_input_files','caches,engine,$(macroinput)')
self.add_condor_cmd('transfer_output_files','engine')
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
try:
hostname=socket.gethostbyaddr(socket.gethostname())[0]
except:
hostname='Unknown'
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
# Set grid site if needed
if cp.has_option('engine','resume'):
self.resume=True
else:
@@ -2293,7 +2316,7 @@ class EngineJob(SingularityJob,pipeline.AnalysisJob):
#self.add_condor_cmd('machine_count',machine_count)
#self.add_condor_cmd('environment','CONDOR_MPI_PATH=%s'%(openmpipath))
if hostname=='pcdev1.phys.uwm.edu':
self.add_condor_cmd('Requirements','CAN_RUN_MULTICORE')
self.add_requirement('CAN_RUN_MULTICORE')
self.add_condor_cmd('+RequiresMultipleCores','True')
self.add_condor_cmd('request_cpus',self.machine_count)
self.add_condor_cmd('request_memory',str(float(self.machine_count)*float(self.machine_memory)))
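
The multicore branch asks for request_cpus and a request_memory that appears to scale with the per-core figure. A quick worked example with invented numbers:

```python
# Illustrative values only: 8 cores at 4000 MB each
machine_count = 8
machine_memory = 4000.0
print(float(machine_count) * float(machine_memory))  # request_memory -> 32000.0
```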
@@ -2631,7 +2654,7 @@ class LALInferenceDataDumpNode(EngineNode):
def set_output_file(self,filename):
pass
class BayesWavePSDJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class BayesWavePSDJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class for a BayesWave job
@@ -2641,22 +2664,9 @@ class BayesWavePSDJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','bayeswave')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
SingularityJob.__init__(self, cp)
if cp.has_section('bayeswave'):
self.add_ini_opts(cp,'bayeswave')
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'bayeswavepsd-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'bayeswavepsd-$(cluster)-$(process).err'))
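
BayesWavePSDJob and the other reworked classes initialise their bases explicitly, CondorDAGJob and AnalysisJob first and then SingularityJob, so the shared settings are layered on an already-constructed Condor job exactly once. A minimal sketch of that call pattern with placeholder classes (not the lalsuite ones):

```python
class CondorDAGJobStub:
    def __init__(self, universe, exe):
        print('CondorDAGJob init:', universe, exe)

class SharedSettingsStub:
    def __init__(self, cp):
        print('shared condor settings applied from', cp)

class BayesWaveLikeJob(SharedSettingsStub, CondorDAGJobStub):
    def __init__(self, cp, exe):
        # Mirror the ordering in the diff: construct the Condor job first,
        # then apply the shared settings from the config.
        CondorDAGJobStub.__init__(self, 'vanilla', exe)
        SharedSettingsStub.__init__(self, cp)

BayesWaveLikeJob({'condor': {}}, 'bayeswave')
```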
@@ -2686,7 +2696,7 @@ class BayesWavePSDNode(EngineNode):
def set_output_file(self,filename):
pass
class BayesWavePostJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class BayesWavePostJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class for a BayesWavePost job
@@ -2696,22 +2706,9 @@ class BayesWavePostJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','bayeswavepost')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
SingularityJob.__init__(self, cp)
if cp.has_section('bayeswave'):
self.add_ini_opts(cp,'bayeswave')
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'bayeswavepost-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'bayeswavepost-$(cluster)-$(process).err'))
@@ -2739,7 +2736,7 @@ class BayesWavePostNode(EngineNode):
def set_output_file(self,filename):
pass
class PESummaryResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class PESummaryResultsPageJob(SingularityJob,pipeline.AnalysisJob):
"""Class to handle the creation of the summary page job using `PESummary`
"""
@@ -2747,20 +2744,7 @@ class PESummaryResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','resultspage')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self,cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
@@ -2768,25 +2752,12 @@ class PESummaryResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_condor_cmd('request_memory','2000')
class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class ResultsPageJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
exe=cp.get('condor','resultspage')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self, cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
@@ -2997,7 +2968,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
def set_ifos(self,ifos):
self.ifos=ifos
class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class CoherenceTestJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class defining the coherence test job to be run as part of a pipeline.
"""
@@ -3005,20 +2976,7 @@ class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','coherencetest')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self, cp)
self.add_opt('new-coherent-incoherent-noise','')
self.add_condor_cmd('getenv','True')
self.set_stdout_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).out'))
@@ -3059,7 +3017,7 @@ class CoherenceTestNode(pipeline.CondorDAGNode):
for inco in self.incoherent_parents:
self.add_file_arg(inco.get_pos_file())
class MergeJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class MergeJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class defining a job which merges several parallel nested sampling or MCMC jobs into a single file
Input arguments:
@@ -3076,20 +3034,7 @@ class MergeJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','mergeNSscript')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self, cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
@@ -3129,7 +3074,7 @@ class MergeNode(pipeline.CondorDAGNode):
def get_pos_file(self): return self.posfile
class CombineMCMCJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class CombineMCMCJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class defining a job which combines several parallel MCMC chains into a single hdf5 file
Input arguments:
@@ -3141,10 +3086,7 @@ class CombineMCMCJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','combinePTMCMCh5script')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
SingularityJob.__init__(self, cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'combine-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'combine-$(cluster)-$(process).err'))
@@ -3176,7 +3118,7 @@ class CombineMCMCNode(pipeline.CondorDAGNode):
def get_pos_file(self): return self.posfile
class GraceDBJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class GraceDBJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class for a gracedb job
"""
@@ -3184,10 +3126,7 @@ class GraceDBJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','gracedb')
pipeline.CondorDAGJob.__init__(self,"scheduler",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
SingularityJob.__init__(self, cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).err'))
@@ -3245,7 +3184,7 @@ class GraceDBNode(pipeline.CondorDAGNode):
self.__finalized=True
class ROMJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class ROMJob(SingularityJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class for a ROM compute weights job
"""
@@ -3258,20 +3197,7 @@ class ROMJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','computeroqweights')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self, cp)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'computeroqweights-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'computeroqweights-$(cluster)-$(process).err'))
@@ -3311,7 +3237,7 @@ class ROMNode(pipeline.CondorDAGNode):
return
self.__finalized=True
class BayesLineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class BayesLineJob(SingularityJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class for a BayesLine job
"""
@@ -3319,20 +3245,7 @@ class BayesLineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','bayesline')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
SingularityJob.__init__(self, cp)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'bayesline-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'bayesline-$(cluster)-$(process).err'))
@@ -3397,7 +3310,7 @@ class SkyMapNode(pipeline.CondorDAGNode):
self.add_var_opt('objid',self.objid)
super(SkyMapNode,self).finalize()
class SkyMapJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class SkyMapJob(SingularityJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Job to run ligo-skymap-from-samples
"""
@@ -3405,18 +3318,7 @@ class SkyMapJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','ligo-skymap-from-samples')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp)
requirements=[]
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements.append('(TARGET.'+cp.get('condor','queue')+' =?= True)')
if cp.has_option('condor','Requirements'):
requirements.append(cp.get('condor','Requirements'))
if requirements:
self.add_condor_cmd('Requirements','&&'.join(requirements))
SingularityJob.__init__(self, cp)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'samples2map-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'samples2map-$(cluster)-$(process).err'))
@@ -3428,7 +3330,7 @@ class SkyMapJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_opt('disable-distance-map','')
class PlotSkyMapJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
class PlotSkyMapJob(SingularityJob, pipeline.CondorDAGJob, pipeline.AnalysisJob):
"""
Job to run ligo-skymap-plot
"""
@@ -3436,18 +3338,7 @@ class PlotSkyMapJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
exe=cp.get('condor','ligo-skymap-plot')
pipeline.CondorDAGJob.__init__(self, "vanilla", exe)
pipeline.AnalysisJob.__init__(self, cp)
requirements=[]
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements.append('(TARGET.'+cp.get('condor','queue')+' =?= True)')
if cp.has_option('condor','Requirements'):
requirements.append(cp.get('condor','Requirements'))
if requirements:
self.add_condor_cmd('Requirements','&&'.join(requirements))
SingularityJob.__init__(self, cp)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'plotskymap-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'plotskymap-$(cluster)-$(process).err'))
@@ -3483,7 +3374,7 @@ class PlotSkyMapNode(pipeline.CondorDAGNode):
self.add_file_opt('output',self.outfile, file_is_output_file=True)
super(PlotSkyMapNode,self).finalize()
class PostRunInfoJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
class PostRunInfoJob(SingularityJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
self.isdefined=True
@@ -3493,8 +3384,7 @@ class PostRunInfoJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','gdbinfo')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('analysis','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('analysis','accounting_group'))
SingularityJob.__init__(self, cp)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).err'))
@@ -3541,3 +3431,4 @@ class PostRunInfoNode(pipeline.CondorDAGNode):
self.server=server
if server is not None:
self.add_var_arg('--server %s'%self.server)