Commit 742bb4bf authored by John Douglas Veitch

Working DAG!

Original: e8ee0496b1182bf3912056c240f0c4e3f59eccc6
parent 75fb4aa7
[analysis]
ifos=['H1','L1','V1']
engine=lalinferencenest
nparallel=3
[paths]
basepath=/tmp/test/
@@ -32,6 +33,7 @@ padding=16
#coinc-inspiral-file=
#pipedown-database=
#ignore-science-segments=True
[datafind]
types={'H1':'H1_LDAS_C02_L2','L1':'L1_LDAS_C02_L2','V1':'HrecOnline'}
@@ -47,6 +49,7 @@ segfind=/home/jveitch/bin/ligolw_segment_query
datafind=/home/jveitch/bin/ligo_data_find
resultspage=/home/jveitch/bin/cbcBayesPostProc.py
ligolw_print=/home/jveitch/bin/ligolw_print
mergescript=/home/jveitch/bin/lalapps_merge_nested_sampling_runs
[resultspage]
skyres=0.5
@@ -66,14 +69,13 @@ skyres=0.5
[lalinference]
seglen=32
#fake-cache={'H1':'LALLIGO','L1':'LALLIGO','V1':'LALVirgo'}
[lalinferencenest]
nlive=1000
nmcmc=200
#ifo=[H1,L1,V1]
#cache=[LALLIGO,LALLIGO,LALVirgo]
#channel=[dummy,dummy,dummy]
[lalinferencemcmc]
downsample=1000
......
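The ini fragment above is what drives the pipeline code below. As a rough, minimal sketch of how such a file is consumed (Python 2; the filename example.ini is hypothetical, the section and option names come from the fragment), ConfigParser handles the scalar options while ast.literal_eval parses the list- and dict-valued ones, the same trick the DAG code uses for fake-cache:

# Minimal sketch (Python 2): parsing the ini fragment above.
# 'example.ini' is a hypothetical filename.
import ast
import ConfigParser

cp = ConfigParser.ConfigParser()
cp.read('example.ini')
ifos = ast.literal_eval(cp.get('analysis', 'ifos'))      # ['H1', 'L1', 'V1']
npar = cp.getint('analysis', 'nparallel')                # 3
types = ast.literal_eval(cp.get('datafind', 'types'))    # frame type per IFO
print 'Analysing %s with %d parallel runs' % (','.join(ifos), npar)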
# DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Kiersten Ruisard, Kan Wang
import itertools
import glue
from glue import pipeline,segmentsUtils
import os
@@ -13,20 +14,26 @@ import pdb
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
class Event():
"""
Represents a unique event to run on
"""
new_id=itertools.count().next
def __init__(self,trig_time=None,SimInspiral=None,SnglInspiral=None,event_id=None,timeslide_dict=None):
self.trigtime=trig_time
self.trig_time=trig_time
self.injection=SimInspiral
self.sngltrigger=SnglInspiral
self.timeslides=timeslide_dict
if event_id is not None:
self.event_id=event_id
else:
self.event_id=id(self)
self.event_id=Event.new_id()
if self.injection is not None:
self.trig_time=self.injection.get_end()
self.event_id=self.injection.simulation_id
if self.sngltrigger is not None:
self.trig_time=self.sngltrigger.get_end()
self.event_id=self.sngltrigger.event_id
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
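The Event class above swaps id(self) for a class-level counter built from the Python 2 idiom itertools.count().next, so default event IDs are small, sequential integers rather than memory addresses. A self-contained illustration of the idiom:

# Illustration (Python 2): a count().next method bound at class definition
# hands out one sequential ID per instance.
import itertools

class Demo(object):
    new_id = itertools.count().next
    def __init__(self):
        self.event_id = Demo.new_id()

print Demo().event_id, Demo().event_id, Demo().event_id   # 0 1 2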
@@ -73,6 +80,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.basepath=os.getcwd()
print 'No basepath specified, using current directory: %s'%(self.basepath)
mkdirs(self.basepath)
self.posteriorpath=os.path.join(self.basepath,'posteriors')
mkdirs(self.posteriorpath)
daglogdir=cp.get('paths','daglogdir')
mkdirs(daglogdir)
self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
@@ -108,14 +117,15 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.engine_job = EngineJob(self.config, os.path.join(self.basepath,'lalinference.sub'),self.logpath)
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
self.merge_job = MergeNSJob(self.config,os.path.join(self,basepath,'merge_runs.sub',self.logpath)
self.merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath)
# Process the input to build list of analyses to do
self.times=[]
self.events=[]
if cp.has_option('input','gps-time-file'):
times=scan_timefile(cp.get('input','gps-time-file'))
for time in times:
self.times.append(float(time))
self.times.append(float(time))
self.events.append(Event(trig_time=float(time)))
# SimInspiral Table
if cp.has_option('input','injection-file'):
from pylal import SimInspiralUtils
@@ -168,7 +178,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
Generate a DAG from a list of times
"""
for time in self.times:
self.add_full_analysis_time(time)
self.add_full_analysis_lalinferencenest(Event(trig_time=time))
def add_full_analysis_lalinferencenest(self,event):
"""
@@ -177,36 +187,35 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
"""
evstring=str(event.event_id)
if event.trig_time is not None:
evstring=evstring+'-'+str(event.gps_time)
Npar=self.config.get('analysis','nparallel')
evstring=str(event.trig_time)+'-'+str(event.event_id)
Npar=self.config.getint('analysis','nparallel')
# Set up the parallel engine nodes
enginenodes=[]
for i in range(Npar):
enginenodes.append(self.add_engine_node(event))
# Merge the results together
mergenode=MergeNSNode(self.merge_job)
map(mergenode.add_engine_parent, enginenodes)
mergenode=MergeNSNode(self.merge_job,parents=enginenodes)
mergedir=os.path.join(self.basepath,'nested_samples')
mergenode.set_output_file(os.path.join(mergedir,'outfile_%s.dat'%evstring))
mergenode.set_output_file(os.path.join(mergedir,'outfile_%s.dat'%(evstring)))
mergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s.dat'%(evstring)))
self.add_node(mergenode)
def add_full_analysis_time(self,gpstime):
"""
Analyse a given GPS time
"""
ev=Event(trig_time=gpstime)
# datafindnode=self.get_datafind_node(gpstime)
enginenode=self.add_engine_node(ev)
ifos=reduce(lambda a,b:a+b,enginenode.ifos)
pagedir=os.path.join(self.basepath,str(gpstime)+'-'+'%x'%(id(enginenode)),ifos)
pagedir=os.path.join(self.webdir,evstring,enginenodes[0].get_ifos())
mkdirs(pagedir)
self.add_results_page_node(outdir=pagedir,parent=enginenode)
self.add_results_page_node(outdir=pagedir,parent=mergenode)
def add_science_segments(self):
# Query the segment database for science segments and
# add them to the pool of segments
if self.config.has_option('input','ignore-science-segments'):
if self.config.getboolean('input','ignore-science-segments'):
start=self.config.getfloat('input','gps-start-time')
end=self.config.getfloat('input','gps-end-time')
i=0
for ifo in self.ifos:
self.segments[ifo].append(pipeline.ScienceSegment((i,start,end,end-start)))
i+=1
return
# Look up science segments as required
segmentdir=os.path.join(self.basepath,'segments')
mkdirs(segmentdir)
curdir=os.getcwd()
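With ignore-science-segments set, add_science_segments now short-circuits the database query and fabricates one ScienceSegment per IFO covering the whole [gps-start-time, gps-end-time] span. A small sketch of that object together with the half-open containment test add_engine_node applies later (assumes glue is available; the GPS times are made up):

# Sketch: a fabricated segment spanning the analysis range, and the
# end_time >= start, end_time < end test from add_engine_node.
from glue import pipeline

start, end = 966384015, 966499999          # illustrative GPS times
seg = pipeline.ScienceSegment((0, start, end, end - start))
trig_time = 966388888.5
print trig_time >= seg.start() and trig_time < seg.end()   # True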
@@ -234,20 +243,21 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.set_end(gpsend)
#self.add_node(node)
return node
def add_engine_node(self,event,ifos=self.ifos,extra_options=None):
def add_engine_node(self,event,ifos=None,extra_options=None):
"""
Add an engine node to the dag. Will find the appropriate cache files automatically.
Will determine the data to be read and the output file.
Will use all IFOs known to the DAG, unless otherwise specified as a list of strings
"""
if ifos is None:
ifos=self.ifos
node=self.EngineNode(self.engine_job)
end_time=event.trig_time
node.set_trig_time(end_time)
for ifo in self.ifos:
for ifo in ifos:
for seg in self.segments[ifo]:
print 'Looking at segment %s for ifo %s to see if it contains end time %f...'%(str(seg),str(ifo),end_time)
if end_time >= seg.start() and end_time < seg.end():
print ' Adding segment'
node.add_ifo_data(ifo,seg,self.channels[ifo])
if extra_options is not None:
for opt in extra_options.keys():
@@ -255,7 +265,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
# Add the nodes it depends on
for seg in node.scisegs.values():
dfnode=seg.get_df_node()
if dfnode not in self.get_nodes():
if dfnode is not None and dfnode not in self.get_nodes():
self.add_node(dfnode)
self.add_node(node)
# Add control options
@@ -265,9 +275,11 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('input','psd-start-time'):
node.set_psdstart(self.config.getint('input','psd-start-time'))
node.set_max_psdlength(self.config.getint('input','max-psd-length'))
if self.config.has_option('lalinference','fake-cache'):
node.cachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
out_dir=os.path.join(self.basepath,'engine')
mkdirs(out_dir)
node.set_output_file(os.path.join(out_dir,node.engine+'-'+str(event.event_id)+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+'%x'%(id(node))))
node.set_output_file(os.path.join(out_dir,node.engine+'-'+str(event.event_id)+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+str(node.id)))
if event.injection is not None:
node.set_injection(event.injection,event.event_id)
return node
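Note the output filename now ends in the node's sequential id instead of '%x'%(id(node)), a hex memory address that differs on every run; with the EngineNode counter added below, engine filenames become reproducible. A tiny contrast (values illustrative):

# Contrast: memory address vs. sequential node id (illustrative stub class).
class N(object):
    pass

node = N()
node.id = 2
print '%x' % id(node)   # e.g. 7f3a2c1b90 - changes from run to run
print str(node.id)      # 2 - stable across runs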
@@ -276,9 +288,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node=ResultsPageNode(self.results_page_job)
if parent is not None:
node.add_parent(parent)
infiles=parent.get_pos_file()
for infile in infiles:
node.add_var_arg(infile)
infile=parent.get_pos_file()
node.add_var_arg(infile)
node.set_output_dir(outdir)
self.add_node(node)
@@ -294,6 +305,7 @@ class EngineJob(pipeline.CondorDAGJob):
self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process).err'))
class EngineNode(pipeline.CondorDAGNode):
new_id = itertools.count().next
def __init__(self,li_job):
pipeline.CondorDAGNode.__init__(self,li_job)
self.ifos=[]
@@ -304,6 +316,8 @@ class EngineNode(pipeline.CondorDAGNode):
self.psdlength=None
self.maxlength=None
self.psdstart=None
self.cachefiles={}
self.id=EngineNode.new_id()
def set_seglen(self,seglen):
self.seglen=seglen
@@ -353,7 +367,11 @@ class EngineNode(pipeline.CondorDAGNode):
def add_ifo_data(self,ifo,sciseg,channelname,timeslide=0):
self.ifos.append(ifo)
self.scisegs[ifo]=sciseg
self.add_parent(sciseg.get_df_node())
parent=sciseg.get_df_node()
if parent is not None:
self.add_parent(parent)
self.cachefiles[ifo]=parent.get_output_files()[0]
self.add_input_file(self.cachefiles[ifo])
self.timeslides[ifo]=timeslide
self.channels[ifo]=channelname
@@ -374,11 +392,8 @@ class EngineNode(pipeline.CondorDAGNode):
delim=''
first=False
else: delim=','
cache=self.scisegs[ifo].get_df_node().get_output_files()[0]
self.add_input_file(cache)
self.add_parent(self.scisegs[ifo].get_df_node())
ifostring=ifostring+delim+ifo
cachestring=cachestring+delim+cache
cachestring=cachestring+delim+self.cachefiles[ifo]
channelstring=channelstring+delim+self.channels[ifo]
ifostring=ifostring+']'
cachestring=cachestring+']'
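The loop above assembles bracketed, comma-delimited command-line strings such as [H1,L1,V1], now pulling cache paths from the per-IFO cachefiles dict populated in add_ifo_data. The first/delim bookkeeping is equivalent to a join; a compact sketch with illustrative values:

# Join-based equivalent of the delimiter loop above (values illustrative).
ifos = ['H1', 'L1', 'V1']
cachefiles = {'H1': 'H1.cache', 'L1': 'L1.cache', 'V1': 'V1.cache'}
channels = {'H1': 'LDAS-STRAIN', 'L1': 'LDAS-STRAIN', 'V1': 'h_16384Hz'}
ifostring = '[' + ','.join(ifos) + ']'
cachestring = '[' + ','.join(cachefiles[i] for i in ifos) + ']'
channelstring = '[' + ','.join(channels[i] for i in ifos) + ']'
print ifostring, cachestring, channelstring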
@@ -471,6 +486,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
self.set_output_path(path)
def set_output_path(self,path):
self.webpath=path
self.posfile=os.path.join(path,'posterior_samples.dat')
def set_event_number(self,event):
"""
Set the event number in the injection XML.
@@ -484,16 +500,14 @@ class ResultsPageNode(pipeline.CondorDAGNode):
And automatically set options accordingly
"""
self.add_parent(node)
for infile in node.get_output_files():
self.add_file_arg(infile)
if isinstance(node, LALInferenceNestNode):
self.add_var_opt('ns','')
self.add_file_arg(node.get_pos_file())
if isinstance(node,LALInferenceMCMCNode):
self.add_var_opt('lalinfmcmc','')
def set_output_dir(self,dir):
self.add_var_opt('outpath',dir)
mkdirs(dir)
def get_pos_file(self): return self.posfile
class CoherenceTestJob(pipeline.CondorDAGJob):
"""
@@ -628,8 +642,8 @@ class MergeNSNode(pipeline.CondorDAGNode):
self.add_engine_parent(parent)
def add_engine_parent(self,parent):
self.add_parent(node)
self.add_file_arg(node.get_ns_file())
self.add_parent(parent)
self.add_file_arg(parent.get_ns_file())
def set_output_file(self,file):
self.add_file_opt('out',file,file_is_output_file=True)
......
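The fix above replaces two references to an undefined name node with the parent argument, which previously raised a NameError the first time a merge node gained an engine parent. A stubbed, self-contained sketch of the fan-in this enables, with nparallel=3 runs feeding one merge (class names here are stand-ins, not the pipeline's):

# Stub sketch of the merge fan-in: each engine parent contributes its
# nested-sample file as an argument to the single merge node.
class NodeStub(object):
    def __init__(self):
        self.parents, self.args = [], []
    def add_parent(self, p):
        self.parents.append(p)
    def add_file_arg(self, f):
        self.args.append(f)

class EngineStub(object):
    def __init__(self, f):
        self.nsfile = f
    def get_ns_file(self):
        return self.nsfile

merge = NodeStub()
for p in [EngineStub('run_%d.dat' % i) for i in range(3)]:
    merge.add_parent(p)
    merge.add_file_arg(p.get_ns_file())
print merge.args   # ['run_0.dat', 'run_1.dat', 'run_2.dat']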