Commit 089d71d9 authored by John Douglas Veitch

build a working dag for single jobs

Original: 1e7f143e3f7a64a82fb919c0d07a0ad7043fdf6e
parent 13e26624
@@ -10,6 +10,17 @@ basepath=/tmp/test/
webdir=/home/jveitch/public_html/
[input]
# User-specified length of the PSD. If not specified, it will be calculated automatically from segment availability
# psd-length=512
# User-specified psd start time
# psd-start-time=
# Maximum length to use when the PSD length is determined automatically
max-psd-length=1024
# Spacing between trigger times and the start of PSD estimation
padding=16
# Can manually override time limits here
#gps-start-time=
#gps-end-time=
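
These [input] options follow an optional-override pattern: padding and max-psd-length always apply, while psd-length and psd-start-time take precedence when uncommented. A minimal Python 2 sketch of how a ConfigParser consumer might read them (the ini string here is a trimmed stand-in for this file):

    import ConfigParser
    from StringIO import StringIO

    ini = "[input]\nmax-psd-length=1024\npadding=16\n"
    cp = ConfigParser.ConfigParser()
    cp.readfp(StringIO(ini))

    padding = cp.getint('input', 'padding')            # always present
    maxlength = cp.getint('input', 'max-psd-length')   # cap for auto mode
    if cp.has_option('input', 'psd-length'):
        psdlength = cp.getint('input', 'psd-length')   # explicit override
    else:
        psdlength = None  # determined later from segment availability
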
@@ -27,7 +38,7 @@ types={'H1':'H1_LDAS_C02_L2','L1':'L1_LDAS_C02_L2','V1':'HrecOnline'}
[data]
# S5 has LSC-STRAIN, S6 has LDAS-STRAIN
channels=['H1:LDAS-STRAIN','L1:LDAS-STRAIN','V1:h_16384Hz']
channels={'H1':'H1:LDAS-STRAIN','L1':'L1:LDAS-STRAIN','V1':'V1:h_16384Hz'}
[condor]
lalinferencenest=/home/jveitch/bin/lalinference_nest
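
The channels option becomes a per-IFO dict rather than a flat list, matching the ast.literal_eval parse added in the Python changes below. A self-contained sketch of that lookup:

    import ast

    raw = "{'H1':'H1:LDAS-STRAIN','L1':'L1:LDAS-STRAIN','V1':'V1:h_16384Hz'}"
    channels = ast.literal_eval(raw)   # safe literal parsing, no eval()
    print(channels['H1'])              # -> H1:LDAS-STRAIN
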
@@ -53,6 +64,9 @@ skyres=0.5
# --Nlive is set automatically from the lalinferencenest section
# --ns is set automatically
[lalinference]
seglen=32
[lalinferencenest]
nlive=1000
@@ -14,6 +14,14 @@ import ast
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO']
def mkdirs(path):
"""
Helper function. Make the given directory, creating intermediate
dirs if necessary, and don't complain about it already existing.
"""
if os.access(path,os.W_OK) and os.path.isdir(path): return
else: os.makedirs(path)
def chooseEngineNode(name):
if name=='lalinferencenest':
return LALInferenceNestNode
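
The mkdirs helper is called throughout the DAG setup below so every output directory can be created idempotently. A quick usage sketch (the path is arbitrary); note that if the directory exists but is not writable, the os.access check fails and os.makedirs will raise OSError:

    import os

    def mkdirs(path):
        # Create path and any intermediate dirs; tolerate an existing,
        # writable directory instead of raising.
        if os.access(path, os.W_OK) and os.path.isdir(path):
            return
        os.makedirs(path)

    mkdirs('/tmp/test/a/b/c')   # creates each missing level
    mkdirs('/tmp/test/a/b/c')   # second call is a no-op
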
@@ -48,17 +56,21 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
else:
self.basepath=os.getcwd()
print 'No basepath specified, using current directory: %s'%(self.basepath)
mkdirs(self.basepath)
daglogdir=cp.get('paths','daglogdir')
mkdirs(daglogdir)
self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
pipeline.CondorDAG.__init__(self,self.daglogfile,dax)
if cp.has_option('paths','cachedir'):
self.cachepath=cp.get('paths','cachedir')
else:
self.cachepath=os.path.join(self.basepath,'caches')
mkdirs(self.cachepath)
if cp.has_option('paths','logdir'):
self.logpath=cp.get('paths','logdir')
else:
self.logpath=os.path.join(self.basepath,'log')
mkdirs(self.logpath)
if cp.has_option('analysis','ifos'):
self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
else:
@@ -71,6 +83,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.segments[ifo]=[]
self.dq={}
self.frtypes=ast.literal_eval(cp.get('datafind','types'))
self.channels=ast.literal_eval(cp.get('data','channels'))
self.use_available_data=False
self.webdir=cp.get('paths','webdir')
# Set up necessary job files.
@@ -86,7 +99,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if cp.has_option('input','gps-time-file'):
times=scan_timefile(cp.get('input','gps-time-file'))
for time in times:
self.times.append(time)
self.times.append(float(time))
# SimInspiral Table
if cp.has_option('input','injection-file'):
from pylal import SimInspiralUtils
@@ -128,34 +141,30 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
"""
Calculate the data that will be needed to process all events
"""
psdlength=self.config.getint('input','psd-length')
buffer=self.config.getint('input','buffer')
psdlength=self.config.getint('input','max-psd-length')
padding=self.config.getint('input','padding')
# Assume that the PSD is estimated from the interval (end_time+ buffer , psdlength)
return (min(times),max(times)+buffer+psdlength)
# Also require padding before start time
return (min(times)-padding,max(times)+padding+psdlength)
def setup_from_times(self,times):
"""
Generate a DAG from a list of times
"""
for time in self.times:
self.add_full_analysis_time(str(time))
self.add_full_analysis_time(time)
def add_full_analysis_time(self,gpstime):
"""
Analyse a given GPS time
"""
datafindnode=self.get_datafind_node(gpstime)
# datafindnode=self.get_datafind_node(gpstime)
enginenode=self.add_engine_node(gpstime)
ifos=reduce(lambda a,b:a+b,enginenode.ifos)
pagedir=os.path.join(ifos,str(gpstime)+'-'+str(id(enginenode)))
pagedir=os.path.join(str(gpstime)+'-'+str(id(enginenode)),ifos)
mkdirs(pagedir)
self.add_results_page_node(outdir=pagedir,parent=enginenode)
def get_science_segment(self,ifo,gpstime):
# Check if time is in existing segment
for seg in self.segments[ifo]:
if gpstime in seg: return seg
raise pipeline.CondorDAGError('Unable to find time in segments')
def add_science_segments(self):
# Query the segment database for science segments and
# add them to the pool of segments
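
The window of data to fetch now pads both sides of the trigger times instead of only appending a buffer after the last one. A standalone sketch of the new span arithmetic, with hypothetical numbers:

    # Span returned by the new get_required_data(), in isolation.
    times = [966384015.0, 966384615.0]   # hypothetical trigger times
    padding = 16                         # [input] padding
    psdlength = 1024                     # [input] max-psd-length

    span = (min(times) - padding, max(times) + padding + psdlength)
    print(span)   # data fetched covers padding plus PSD estimation
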
@@ -184,18 +193,32 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
def add_engine_node(self,end_time,extra_options=None):
node=self.EngineNode(self.engine_job)
node.set_trig_time(end_time)
for ifo in self.ifos:
for seg in self.segments[ifo]:
if end_time > seg.start and end_time < seg.end:
node.add_ifo_data(ifo,seg)
print 'Looking at segment %s for ifo %s to see if it contains end time %f...'%(str(seg),str(ifo),end_time)
if end_time >= seg.start() and end_time < seg.end():
print ' Adding segment'
node.add_ifo_data(ifo,seg,self.channels[ifo])
if extra_options is not None:
for opt in extra_options.keys():
node.add_var_arg('--'+opt+' '+extra_options[opt])
node.add_var_arg('--'+opt+' '+extra_options[opt])
# Add the nodes it depends on
for dfnode in node.__parents:
if df_node not in self.__nodes:
self.add_node(dfnode)
for seg in node.scisegs.values():
dfnode=seg.get_df_node()
if dfnode not in self.get_nodes():
self.add_node(dfnode)
self.add_node(node)
# Add control options
node.set_seglen(self.config.getint('lalinference','seglen'))
if self.config.has_option('input','psd-length'):
node.set_psdlength(self.config.getint('input','psd-length'))
if self.config.has_option('input','psd-start-time'):
node.set_psdstart(self.config.getint('input','psd-start-time'))
node.set_max_psdlength(self.config.getint('input','max-psd-length'))
out_dir=os.path.join(self.basepath,'engine')
mkdirs(out_dir)
node.set_output_file(os.path.join(out_dir,node.engine+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+str(id(self))))
return node
def add_results_page_node(self,outdir=None,parent=None,extra_options=None):
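
The removed parent-wiring loop could never run: node.__parents is name-mangled, so it is invisible outside the node's own class, and df_node was a typo for dfnode. The replacement asks each science segment for its datafind node and adds it to the DAG at most once. A small demonstration of the mangling pitfall (class name is illustrative):

    class Node(object):
        def __init__(self):
            self.__parents = []   # actually stored as _Node__parents

    n = Node()
    print(hasattr(n, '_Node__parents'))   # True: the mangled name exists
    print(hasattr(n, '__parents'))        # False: unmangled name invisible
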
@@ -204,10 +227,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.add_parent(parent)
infiles=parent.get_output_files()
for infile in infiles:
node.add_var_arg(infile)
node.add_var_arg(infile)
node.set_output_dir(outdir)
self.add_node(node)
class EngineJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
@@ -219,20 +241,39 @@ class EngineJob(pipeline.CondorDAGJob):
self.add_ini_opts(cp,self.engine)
self.set_stdout_file(os.path.join(logdir,'lalinference-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process).err'))
class EngineNode(pipeline.CondorDAGNode):
def __init__(self,li_job):
pipeline.CondorDAGNode.__init__(self,li_job)
self.ifos=[]
self.scisegs={}
self.channels={}
self.timeslides={}
self.seglen=None
self.psdlength=None
self.maxlength=None
self.psdstart=None
def set_seglen(self,seglen):
self.seglen=seglen
def set_psdlength(self,psdlength):
self.psdlength=psdlength
def set_max_psdlength(self,psdlength):
self.maxlength=psdlength
def set_psdstart(self,psdstart):
self.psdstart=psdstart
def set_seed(self,seed):
self.add_var_opt('randomseed',seed)
def set_dataseed(self,seed):
self.add_var_opt('dataseed',seed)
def get_ifos(self):
return ''.join(map(str,self.__ifos))
return ''.join(map(str,self.ifos))
def set_trig_time(self,time):
"""
@@ -251,22 +292,21 @@ class EngineNode(pipeline.CondorDAGNode):
get_trig_time = lambda self: self.__trigtime
def add_ifo_data(self,ifo,sciseg,timeslide=0):
def add_ifo_data(self,ifo,sciseg,channelname,timeslide=0):
self.ifos.append(ifo)
self.channels[ifo]=channel
self.scisegs[ifo]=sciseg
self.add_parent(sciseg.get_df_node())
self.timeslides[ifo]=timeslide
self.channels[ifo]=channelname
def finalize(self):
self._finalize_ifo_data()
pipeline.CondorDAGNode.finalize()
pipeline.CondorDAGNode.finalize(self)
def _finalize_ifo_data(self):
"""
Add list of IFOs and data to analyse to command line arguments.
"""
cp = self.job().get_cp()
ifostring='['
cachestring='['
channelstring='['
@@ -280,7 +320,7 @@ class EngineNode(pipeline.CondorDAGNode):
self.add_parent(self.scisegs[ifo].get_df_node())
ifostring=ifostring+delim+ifo
cachestring=cachestring+delim+cache
channelstring=channelstring+delim+self.job().get_cp().get('data',ifo.lower()+'-channel')
channelstring=channelstring+delim+self.channels[ifo]
ifostring=ifostring+']'
cachestring=cachestring+']'
channelstring=channelstring+']'
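
The engine's command line takes bracketed comma-separated lists, and the channel entries now come from the dict stored on the node by add_ifo_data instead of per-IFO ini options. A self-contained sketch of the string building (the delim bookkeeping is assumed from the unshown context above):

    # Build '[H1,L1]'-style argument strings from per-IFO dicts.
    channels = {'H1': 'H1:LDAS-STRAIN', 'L1': 'L1:LDAS-STRAIN'}
    ifostring, channelstring, delim = '[', '[', ''
    for ifo in ['H1', 'L1']:
        ifostring = ifostring + delim + ifo
        channelstring = channelstring + delim + channels[ifo]
        delim = ','
    ifostring += ']'
    channelstring += ']'
    print(ifostring)       # [H1,L1]
    print(channelstring)   # [H1:LDAS-STRAIN,L1:LDAS-STRAIN]
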
@@ -300,36 +340,34 @@ class EngineNode(pipeline.CondorDAGNode):
# Now we need to adjust the start time and length to make sure the maximum data length
# is not exceeded.
trig_time=self.get_trig_time()
maxLength=float(cp.get('analysis','analysis-chunk-length'))
maxLength=self.maxlength
if(length > maxLength):
while(self.__GPSstart+maxLength<trig_time and self.__GPSstart+maxLength<self.__GPSend):
self.__GPSstart+=maxLength/2.0
# Override calculated start time if requested by user in ini file
if self.job().get_cp().has_option(self.engine,'psdstart'):
self.__GPSstart=self.job().get_cp().getfloat(self.engine,'psdstart')
if self.psdstart is not None:
self.__GPSstart=self.psdstart
print 'Over-riding start time to user-specified value %f'%(self.__GPSstart)
if self.__GPSstart<starttime or self.__GPSstart>endtime:
print 'ERROR: Over-ridden time lies outside of science segment!'
raise Exception('Bad psdstart specified')
else:
self.add_var_opt('psdstart',str(self.__GPSstart))
if self.job().get_cp().has_option(self.engine,'psdlength'):
length=self.job().get_cp().getfloat(self.engine,'psdlength')
print 'Over-riding PSD length to user-specified value %f'%(length)
else:
length=self.__GPSend-self.__GPSstart
if(length>maxLength):
length=maxLength
self.add_var_opt('PSDlength',str(int(length)))
self.add_var_opt('seglen',self.job().get_cp().get('analysis','psd-chunk-length'))
if self.psdlength is None:
self.psdlength=self.__GPSend-self.__GPSstart
if(self.psdlength>self.maxlength):
self.psdlength=self.maxlength
self.add_var_opt('psdlength',self.psdlength)
self.add_var_opt('seglen',self.seglen)
class LALInferenceNestNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
self.engine='lalinferencenest'
self.outfilearg='outfile'
def set_output_file(self,filename):
self.add_file_opt(self.outfilearg,filename,file_is_output_file=True)
self.add_file_opt(self.outfilearg+'.dat',filename,file_is_output_file=True)
self.paramsfile=filename+'_params.txt'
self.Bfilename=filename+'_B.txt'
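
When the science segment is longer than maxlength, the start time walks forward in half-maxlength steps until the analysis chunk reaches the trigger time, and the PSD length is capped at maxlength unless the user supplied one. A standalone sketch of that walk, with hypothetical GPS numbers:

    # Slide the start forward in maxlength/2 steps so the chunk still
    # covers trig_time without exceeding the segment end.
    gps_start, gps_end = 966383000.0, 966388000.0   # hypothetical segment
    trig_time = 966387000.0
    maxlength = 1024.0

    if gps_end - gps_start > maxlength:
        while (gps_start + maxlength < trig_time and
               gps_start + maxlength < gps_end):
            gps_start += maxlength / 2.0
    print(gps_start)   # first start whose chunk reaches trig_time
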
@@ -337,6 +375,11 @@ class LALInferenceMCMCNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
self.engine='lalinferencemcmc'
self.outfilearg='outfile'
def set_output_file(self,filename):
self.add_file_opt(self.outfilearg,filename)
self.add_output_file(filename+'.00')
class ResultsPageJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
@@ -352,9 +395,12 @@ class ResultsPageJob(pipeline.CondorDAGJob):
self.add_opt('skyres',cp.get('results','skyres'))
class ResultsPageNode(pipeline.CondorDAGNode):
def __init__(self,results_page_job):
def __init__(self,results_page_job,outpath=None):
pipeline.CondorDAGNode.__init__(self,results_page_job)
self.webpath=self.job().get_cp().get('paths','webdir')
if outpath is not None:
self.set_output_path(outpath)
def set_output_path(self,path):
self.webpath=path
def set_event_number(self,event):
"""
Set the event number in the injection XML.
@@ -377,5 +423,5 @@ class ResultsPageNode(pipeline.CondorDAGNode):
self.add_var_opt('lalinfmcmc','')
def set_output_dir(self,dir):
self.add_var_opt('outpath',dir)
inspiralutils.mkdir(dir)
mkdirs(dir)