Commit 7a06e4c0 authored by John Douglas Veitch

More work on pipeline

Original: 191d4b3f20fff30c7f259d17bd11133225d366e7
parent 9f69d613
......@@ -4,11 +4,35 @@ engine=lalinferencenest
[paths]
basepath=/tmp/test/
#cachedir=
#logdir=
[datafind]
types={'H1':'H1_LDAS_C02_L2','L1':'L1_LDAS_C02_L2','V1':'HrecOnline'}
[data]
# S5 has LSC-STRAIN, S6 has LDAS-STRAIN
channels=['H1:LDAS-STRAIN','L1:LDAS-STRAIN','V1:h_16384Hz']
[condor]
lalinference=/home/jveitch/bin/lalinference_nest
lalinferencenest=/home/jveitch/bin/lalinference_nest
lalinferencemcmc=/home/jveitch/bin/lalinference_mcmc
segfind=/home/jveitch/bin/ligolw_segment_query
datafind=/home/jveitch/bin/ligo_data_find
[lalinferencenest]
nlive=1000
nmcmc=200
#ifo=[H1,L1,V1]
#cache=[LALLIGO,LALLIGO,LALVirgo]
#channel=[dummy,dummy,dummy]
[lalinferencemcmc]
[segfind]
segment-url=https://segdb.ligo.caltech.edu
[segments]
l1-analyze = L1:DMT-SCIENCE:2
h1-analyze = H1:DMT-SCIENCE:2
v1-analyze = V1:ITF_SCIENCEMODE:6
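The types and channels values above are Python literals stored as ini strings; a minimal sketch of how the pipeline side can parse them (assuming cp is a ConfigParser loaded from this file):

import ast
frtypes=ast.literal_eval(cp.get('datafind','types'))   # {'H1': 'H1_LDAS_C02_L2', ...}
channels=ast.literal_eval(cp.get('data','channels'))   # ['H1:LDAS-STRAIN', ...]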
......@@ -9,11 +9,22 @@ import os
import ast
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO']
def chooseEngineNode(name):
if name=='lalinferencenest':
return LALInferenceNestNode
if name=='lalinferencemcmc':
return LALInferenceMCMCNode
return EngineNode
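# Usage sketch for the factory above (engine names as read from the
# [analysis] section of the ini; 'somethingelse' is a made-up fallback case):
# NodeClass=chooseEngineNode('lalinferencenest')  # -> LALInferenceNestNode
# NodeClass=chooseEngineNode('somethingelse')     # -> generic EngineNode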
class LALInferencePipelineDAG(pipeline.CondorDAG):
def __init__(self,log,cp,dax=False):
pipeline.CondorDAG.__init__(self,log,dax)
self.subfiles=[]
self.config=cp
self.engine=cp.get('analysis','engine') # ConfigParser provides get(), not get_option()
self.EngineNode=chooseEngineNode(self.engine)
if cp.has_option('paths','basepath'):
self.basepath=cp.get('paths','basepath') # match the basepath= key in the [paths] section
else:
......@@ -36,10 +47,12 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.segments[ifo]=[]
self.dq={}
self.frtypes=ast.literal_eval(cp.get('datafind','types')) # parse the dict literal so self.frtypes[ifo] works below
self.use_available_data=False
# Set up necessary job files.
self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config)
self.datafind_job.add_opt('url-type','file')
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.engine_job = EngineJob(self.config, os.path.join(self.basepath,'lalinference.sub'),self.logpath)
def add_full_analysis_time(self,gpstime):
"""
......@@ -67,22 +80,34 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
segfile.close()
for i,seg in enumerate(segs): # enumerate avoids repeated list.index() lookups
sciseg=pipeline.ScienceSegment((i,seg[0],seg[1],seg[1]-seg[0]))
df_node=self.get_datafind_node(ifo,self.frtypes[ifo],int(sciseg.start()),int(sciseg.end()))
sciseg.set_df_node(df_node)
self.segments[ifo].append(sciseg)
def get_datafind_node(self,ifo,frtype,gpsstart,gpsend):
node=pipeline.LSCDataFindNode(self.datafind_job)
node.set_observatory(ifo[0])
node.set_type(frtype)
node.set_start(gpsstart)
node.set_end(gpsend)
#self.add_node(node) # the node is added to the DAG later, by add_engine_node, only when needed
return node
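# Note: get_datafind_node no longer inserts the node into the DAG itself;
# add_engine_node below adds a datafind node only when an engine node
# actually depends on it, so queries for unused segments never enter the DAG.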
def add_engine_node(self,end_time,extra_options=None):
node=self.EngineNode(self.engine_job) # instantiate the engine-specific node class chosen in __init__
for ifo in self.ifos:
for seg in self.segments[ifo]:
if end_time > seg.start() and end_time < seg.end(): # start()/end() are ScienceSegment methods
node.add_ifo_data(ifo,seg)
if extra_options is not None:
for opt in extra_options.keys():
node.add_var_arg('--'+opt+' '+extra_options[opt])
# Add the datafind nodes this node depends on, if not already in the DAG.
# (node.__parents would be name-mangled here, so use the base-class attribute names directly)
for dfnode in node._CondorDAGNode__parents:
if dfnode not in self._CondorDAG__nodes:
self.add_node(dfnode)
self.add_node(node)
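# Usage sketch (hypothetical GPS time; each extra_options key becomes a
# --key value argument on the node):
# dag.add_engine_node(966393725,extra_options={'srate':'4096'})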
class EngineJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
......@@ -91,9 +116,19 @@ class EngineJob(pipeline.CondorDAGJob):
pipeline.CondorDAGJob.__init__(self,"standard",exe)
# Set the options which are always used
self.set_sub_file(submitFile)
self.add_ini_opts(cp,self.engine)
self.__cp=cp # keep a reference so nodes can query the config via get_cp()
self.set_stdout_file(os.path.join(logdir,'lalinference-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process).err'))
def get_cp(self):
# Accessor used by EngineNode (via self.job().get_cp()) to read the analysis config
return self.__cp
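# With the [lalinferencenest] section of the ini above, add_ini_opts should
# expand nlive=1000 and nmcmc=200 into --nlive 1000 --nmcmc 200 arguments
# in the generated submit file.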
class EngineNode(pipeline.CondorDAGNode):
def __init__(self,li_job):
......@@ -124,7 +159,79 @@ class EngineNode(pipeline.CondorDAGNode):
self.add_var_opt('event',str(event))
def get_trig_time(self): return self.__trigtime
def add_ifo_data(self,ifo,sciseg,timeslide=0):
self.ifos.append(ifo)
self.channels[ifo]=self.job().get_cp().get('data',ifo.lower()+'-channel') # 'channel' was undefined; read it from [data] as _finalize_ifo_data does
self.scisegs[ifo]=sciseg
self.add_parent(sciseg.get_df_node())
self.timeslides[ifo]=timeslide
def finalize(self):
self._finalize_ifo_data()
pipeline.CondorDAGNode.finalize(self) # unbound base-class call needs the instance argument
def _finalize_ifo_data(self):
"""
Add list of IFOs and data to analyse to command line arguments.
"""
cp = self.job().get_cp()
ifostring='['
cachestring='['
channelstring='['
first=True
for ifo in self.ifos:
if first:
delim=''
first=False
else: delim=','
cache=self.scisegs[ifo].get_df_node().get_output_files()[0]
self.add_parent(self.scisegs[ifo].get_df_node())
ifostring=ifostring+delim+ifo
cachestring=cachestring+delim+cache
channelstring=channelstring+delim+self.job().get_cp().get('data',ifo.lower()+'-channel')
ifostring=ifostring+']'
cachestring=cachestring+']'
channelstring=channelstring+']'
self.add_var_arg('--IFO '+ifostring)
self.add_var_arg('--channel '+channelstring)
self.add_var_arg('--cache '+cachestring)
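# With two detectors this yields arguments of the form (illustrative values,
# assuming per-detector h1-channel/l1-channel keys in [data], which the ini
# above does not yet define):
# --IFO [H1,L1] --channel [H1:LDAS-STRAIN,L1:LDAS-STRAIN] --cache [H1.cache,L1.cache]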
# Start at earliest common time
# NOTE: We perform this arithmetic for all IFOs to ensure that a common data set is
# used when we are running the coherence test.
# Otherwise the noise evidence will differ.
starttime=max([int(self.scisegs[ifo].start()) for ifo in self.ifos])
endtime=min([int(self.scisegs[ifo].end()) for ifo in self.ifos])
self.__GPSstart=starttime
self.__GPSend=endtime
length=endtime-starttime
# Now we need to adjust the start time and length to make sure the maximum data length
# is not exceeded.
trig_time=self.get_trig_time()
maxLength=float(cp.get('analysis','analysis-chunk-length'))
if(length > maxLength):
while(self.__GPSstart+maxLength<trig_time and self.__GPSstart+maxLength<self.__GPSend):
self.__GPSstart+=maxLength/2.0
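# For example (illustrative numbers): with maxLength=256, a science segment
# [1000,2000] and trig_time=1800, the start slides 1000 -> 1128 -> 1256 ->
# 1384 -> 1512 -> 1640 and stops there, since 1640+256=1896 is no longer
# below the trigger; the analysed chunk [1640,1896] still contains it.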
# Override calculated start time if requested by user in ini file
if self.job().get_cp().has_option(self.engine,'psdstart'):
self.__GPSstart=self.job().get_cp().getfloat(self.engine,'psdstart')
print 'Over-riding start time to user-specified value %f'%(self.__GPSstart)
if self.__GPSstart<starttime or self.__GPSstart>endtime:
print 'ERROR: Over-ridden time lies outside of science segment!'
raise Exception('Bad psdstart specified')
else:
self.add_var_opt('psdstart',str(self.__GPSstart))
if self.job().get_cp().has_option(self.engine,'psdlength'):
length=self.job().get_cp().getfloat(self.engine,'psdlength')
print 'Over-riding PSD length to user-specified value %f'%(length)
else:
length=self.__GPSend-self.__GPSstart
if(length>maxLength):
length=maxLength
self.add_var_opt('PSDlength',str(int(length)))
self.add_var_opt('seglen',self.job().get_cp().get('analysis','psd-chunk-length'))
# Engine-specific node types, defined after EngineNode so the base class
# exists when these class statements are executed at import time.
class LALInferenceNestNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
self.engine='lalinferencenest'
class LALInferenceMCMCNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
self.engine='lalinferencemcmc'
class ResultsPageJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
exe=cp.get('condor','resultspage')
......