Commit 94c1b1a1 authored by Vivien Raymond's avatar Vivien Raymond
Browse files

LALInference_pipe update: default nparallel, random engine and --pegasus-submit option.

Also includes Salvatore's fits.patch modified for the master branch
Original: 5e88260d4e87cedaf2cdb0c5bfb3468becacd9e7
parent 165d062c
......@@ -88,7 +88,7 @@ def readLValert(SNRthreshold=0,gid=None,flow=40.0,gracedb="gracedb",savepsdpath=
import numpy as np
import subprocess
from subprocess import Popen, PIPE
print "gracedb download %s coinc.xml" % gid
print "%s download %s coinc.xml"%(gracedb,gid)
subprocess.call([gracedb,"download", gid ,"coinc.xml"])
xmldoc=utils.load_filename("coinc.xml",contenthandler = LIGOLWContentHandler)
coinctable = lsctables.CoincInspiralTable.get_table(xmldoc)
......@@ -498,7 +498,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
# Generate the DAG according to the config given
for event in self.events: self.add_full_analysis(event)
self.add_skyarea_followup()
self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
self.set_dag_file(self.dagfilename)
if self.is_dax():
......@@ -510,9 +511,11 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('condor','skyarea'):
self.skyareajob=SkyAreaJob(self.config,os.path.join(self.basepath,'skyarea.sub'),self.logpath,dax=self.is_dax())
respagenodes=filter(lambda x: isinstance(x,ResultsPageNode) ,self.get_nodes())
prefix='LALInference_'
for p in respagenodes:
skyareanode=SkyAreaNode(self.skyareajob)
skyareanode=SkyAreaNode(self.skyareajob,prefix=prefix)
skyareanode.add_resultspage_parent(p)
skyareanode.set_ifos(p.ifos)
self.add_node(skyareanode)
def add_full_analysis(self,event):
......@@ -522,7 +525,6 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
result=self.add_full_analysis_lalinferencemcmc(event)
elif self.engine=='lalinferencebambi' or self.engine=='lalinferencebambimpi':
result=self.add_full_analysis_lalinferencebambi(event)
self.add_skyarea_followup()
return result
......@@ -755,7 +757,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
zipfilename='postproc_'+evstring+'.tar.gz'
else:
zipfilename=None
respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=mergenode,gzip_output=zipfilename)
respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=mergenode,gzip_output=zipfilename,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
mkdirs(os.path.join(self.basepath,'coherence_test'))
......@@ -777,7 +779,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
presultsdir=os.path.join(pagedir,ifo)
mkdirs(presultsdir)
pzipfilename='postproc_'+evstring+'_'+ifo+'.tar.gz'
subresnode=self.add_results_page_node(outdir=presultsdir,parent=pmergenode, gzip_output=pzipfilename)
subresnode=self.add_results_page_node(outdir=presultsdir,parent=pmergenode, gzip_output=pzipfilename,ifos=ifo)
subresnode.set_psd_files(cotest_nodes[0].get_psd_files())
subresnode.set_snr_file(cotest_nodes[0].get_snr_file())
subresnode.set_bayes_coherent_noise(pmergenode.get_B_file())
......@@ -794,7 +796,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
zipfilename='postproc_'+evstring+'.tar.gz'
else:
zipfilename=None
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=zipfilename)
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=zipfilename,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
respagenode.set_bayes_coherent_noise(mergenode.get_B_file())
......@@ -834,7 +836,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
enginenodes[0].set_snr_file()
pagedir=os.path.join(self.webdir,evstring,myifos)
mkdirs(pagedir)
respagenode=self.add_results_page_node(outdir=pagedir)
respagenode=self.add_results_page_node(outdir=pagedir,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if self.config.has_option('input','injection-file') and event.event_id is not None:
......@@ -860,7 +862,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
enginenodes[0].set_snr_file()
pagedir=os.path.join(self.webdir,evstring,myifos)
mkdirs(pagedir)
respagenode=self.add_results_page_node(outdir=pagedir)
respagenode=self.add_results_page_node(outdir=pagedir,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
respagenode.set_bayes_coherent_noise(enginenodes[0].get_B_file())
......@@ -1107,7 +1109,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.add_var_opt(opt,arg)
return node
def add_results_page_node(self,resjob=None,outdir=None,parent=None,extra_options=None,gzip_output=None):
def add_results_page_node(self,resjob=None,outdir=None,parent=None,extra_options=None,gzip_output=None,ifos=None):
if resjob is None:
resjob=self.results_page_job
node=ResultsPageNode(resjob)
......@@ -1118,15 +1120,51 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.set_output_path(outdir)
if gzip_output is not None:
node.set_gzip_output(gzip_output)
if ifos is not None:
if isinstance(ifos,list):
pass
else:
ifos=[ifos]
node.set_ifos(ifos)
self.add_node(node)
return node
def add_gracedb_log_node(self,respagenode,gid):
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid)
node.add_parent(respagenode)
node.set_filename(respagenode.webpath+'/posterior_samples.dat')
resurl=respagenode.webpath.replace(self.gracedbjob.basepath,self.gracedbjob.baseurl)
node.set_message('Parameter estimation finished. '+resurl+'/posplots.html')
self.add_node(node)
return node
def add_gracedb_FITSskymap_upload(self,event,engine=None):
gid=event.GID
if gid is None:
return
if engine=='lalinferenceburst':
prefix='LIB'
elif engine is None:
prefix=""
else:
prefix='LALInference'
nodes=None
if self.config.has_option('condor','skyarea'):
skynodes=filter(lambda x: isinstance(x,SkyAreaNode) ,self.get_nodes())
nodes=[]
for sk in skynodes:
if len(sk.ifos)>1:
node=GraceDBNode(self.gracedbjob,parent=sk,gid=gid)
#for p in sk.__parents:
# if isinstance(p,ResultPageNode):
# resultpagenode=p
node.set_filename(sk.outdir+'/%s_skymap.fits.gz'%prefix)
node.set_message('%s FITS sky map'%prefix)
self.add_node(node)
nodes.append(node)
return nodes
def add_rom_weights_node(self,ifo,parent=None):
#try:
#node=self.romweightsnodes[ifo]
......@@ -1581,6 +1619,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
if outpath is not None:
self.set_output_path(path)
self.__event=0
self.ifos=None
self.injfile=None
def set_gzip_output(self,path):
self.add_file_opt('archive',path,file_is_output_file=True)
......@@ -1640,6 +1679,8 @@ class ResultsPageNode(pipeline.CondorDAGNode):
self.add_file_opt('bsn',bsnfile)
def set_header_file(self,headerfile):
self.add_var_arg('--header '+headerfile)
def set_ifos(self,ifos):
self.ifos=ifos
class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
......@@ -1767,41 +1808,35 @@ class GraceDBNode(pipeline.CondorDAGNode):
"""
Run the gracedb executable to report the results
"""
def __init__(self,gracedb_job,gid=None,parent=None):
def __init__(self,gracedb_job,gid=None,parent=None,message=None,upfile=None):
# Message need to be a string
# Upfile is the full path of the file to be uploaded
pipeline.CondorDAGNode.__init__(self,gracedb_job)
self.resultsurl=""
if gid: self.set_gid(gid)
if parent: self.set_parent_resultspage(parent,gid)
if parent: self.add_parent(parent)
self.message=message
self.filename=upfile
self.__finalized=False
def set_page_path(self,path):
"""
Set the path to the results page, after self.baseurl.
"""
self.resultsurl=os.path.join(self.job().baseurl,path)
def set_gid(self,gid):
"""
Set the GraceDB ID to log to
"""
self.gid=gid
def set_parent_resultspage(self,respagenode,gid):
"""
Setup to log the results from the given parent results page node
"""
res=respagenode
#self.set_page_path(res.webpath.replace(self.job().basepath,self.job().baseurl))
self.resultsurl=res.webpath.replace(self.job().basepath,self.job().baseurl)
self.webpath=res.webpath
self.set_gid(gid)
def set_message(self,message):
self.message=message
def set_filename(self,filename):
self.filename=filename
def finalize(self):
if self.__finalized:
return
self.add_var_arg('upload')
self.add_var_arg(str(self.gid))
#self.add_var_arg('"Parameter estimation finished. <a href=\"'+self.resultsurl+'/posplots.html\">'+self.resultsurl+'/posplots.html</a>"')
self.add_var_arg(self.webpath+'/posterior_samples.dat Parameter estimation finished. '+self.resultsurl+'/posplots.html')
self.add_var_arg(self.filename+' ')
self.add_var_arg(self.message)
self.__finalized=True
class ROMJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
......@@ -1850,18 +1885,29 @@ class SkyAreaNode(pipeline.CondorDAGNode):
"""
Node to run sky area code
"""
def __init__(self,skyarea_job,posfile=None,parent=None):
def __init__(self,skyarea_job,posfile=None,parent=None,prefix=None):
pipeline.CondorDAGNode.__init__(self,skyarea_job)
if parent:
self.add_parent(parent)
if posfile:
self.set_posterior_file(posfile)
self.ifos=None
self.outdir=None
self.prefix=prefix
def set_posterior_file(self,posfile):
self.add_file_opt('samples',posfile,file_is_output_file=False)
self.posfile=posfile
def set_outdir(self,outdir):
self.add_var_opt('outdir',outdir)
self.outdir=outdir
def get_outdir(self):
return self.outdir
def set_fits_name(self):
name='skymap.fits.gz'
if self.prefix is not None:
name=self.prefix+name
self.add_var_opt('fitsoutname',name)
def set_injection(self,injfile,eventnum):
if injfile is not None:
self.add_file_opt('inj',injfile)
......@@ -1872,7 +1918,10 @@ class SkyAreaNode(pipeline.CondorDAGNode):
self.set_posterior_file(resultspagenode.get_pos_file())
self.set_outdir(resultspagenode.get_output_path())
self.add_parent(resultspagenode)
self.set_fits_name()
self.set_injection(resultspagenode.get_injection(),resultspagenode.get_event_number())
def set_ifos(self,ifos=None):
self.ifos=ifos
class SkyAreaJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment