Commit 4daf21b0 authored by Charlie Hoy's avatar Charlie Hoy Committed by Vivien Raymond
Browse files

Adding PESummary to lalinference_pipe

parent 12e73f87
......@@ -21,6 +21,7 @@ import random
from itertools import permutations
import shutil
import numpy as np
from glob import glob
import math
from six.moves import range
from six import next
......@@ -743,7 +744,10 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.frtypes=ast.literal_eval(cp.get('datafind','types'))
self.channels=ast.literal_eval(cp.get('data','channels'))
self.use_available_data=False
self.webdir=cp.get('paths','webdir')
if cp.has_option("paths","webdir"):
self.webdir=cp.get('paths','webdir')
if cp.has_option("paths","existing_webdir"):
self.webdir=cp.get('paths','existing_webdir')
if cp.has_option('analysis','dataseed'):
self.dataseed=cp.getint('analysis','dataseed')
else:
......@@ -782,10 +786,14 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
ifocombos.append(a)
for ifos in ifocombos:
self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
self.results_page_job.set_grid_site('local')
self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
self.cotest_results_page_job.set_grid_site('local')
if "summarypages" in self.config.get('condor','resultspage'):
self.results_page_job = PESummaryResultsPageJob(self.config, os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
self.results_page_job.set_grid_site('local')
else:
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
self.results_page_job.set_grid_site('local')
self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
self.cotest_results_page_job.set_grid_site('local')
if self.engine=='lalinferencemcmc':
self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
self.combine_job.set_grid_site('local')
......@@ -1191,19 +1199,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
enginenodes[0].set_snr_file()
if self.config.getboolean('analysis','coherence-test') and len(enginenodes[0].ifos)>1:
if self.site!='local':
zipfilename='postproc_'+evstring+'.tar.gz'
zipfilename='postproc_'+evstring+'.tar.gz'
else:
zipfilename=None
respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=mergenode,gzip_output=zipfilename,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
mkdirs(os.path.join(self.basepath,'coherence_test'))
par_mergenodes=[]
for ifo in enginenodes[0].ifos:
co_merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs_%s.sub'%(ifo)),self.logpath,dax=self.is_dax(),engine='nest')
......@@ -1213,7 +1211,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
cot_node,bwpsdnodes=self.add_engine_node(event,bwpsdnodes,ifos=[ifo],co_test=True)
if cot_node is not None:
if i>0:
cot_node.add_var_arg('--dont-dump-extras')
cot_node.add_var_arg('--dont-dump-extras')
cotest_nodes.append(cot_node)
if len(cotest_nodes)==0:
return False
......@@ -1235,45 +1233,72 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
pzipfilename='postproc_'+evstring+'_'+ifo+'.tar.gz'
else:
pzipfilename=None
subresnode=self.add_results_page_node(outdir=presultsdir,parent=pmergenode, gzip_output=pzipfilename,ifos=ifo)
subresnode.set_psd_files(cotest_nodes[0].get_psd_files())
subresnode.set_snr_file(cotest_nodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
subresnode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
subresnode.set_injection(self.config.get('input','injection-file'),event.event_id)
elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
subresnode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
coherence_node=CoherenceTestNode(self.coherence_test_job,outfile=os.path.join(self.basepath,'coherence_test','coherence_test_%s_%s.dat'%(myifos,evstring)))
coherence_node.add_coherent_parent(mergenode)
for parmergenode in par_mergenodes:
coherence_node.add_incoherent_parent(parmergenode)
self.add_node(coherence_node)
respagenode.add_parent(coherence_node)
respagenode.set_bayes_coherent_incoherent(coherence_node.get_output_files()[0])
if "summarypages" not in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=mergenode,gzip_output=zipfilename,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
mkdirs(os.path.join(self.basepath,'coherence_test'))
respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=mergenode,gzip_output=zipfilename,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
respagenode.set_coinc_file(self.basepath+'/coinc.xml',self.config.get('input','gid'))
subresnode=self.add_results_page_node(outdir=presultsdir,parent=pmergenode, gzip_output=pzipfilename,ifos=ifo)
subresnode.set_psd_files(cotest_nodes[0].get_psd_files())
subresnode.set_snr_file(cotest_nodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
subresnode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
subresnode.set_injection(self.config.get('input','injection-file'),event.event_id)
elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
subresnode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
coherence_node=CoherenceTestNode(self.coherence_test_job,outfile=os.path.join(self.basepath,'coherence_test','coherence_test_%s_%s.dat'%(myifos,evstring)))
coherence_node.add_coherent_parent(mergenode)
for parmergenode in par_mergenodes:
coherence_node.add_incoherent_parent(parmergenode)
if "summarypages" in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=True)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
else:
if self.site!='local':
zipfilename='postproc_'+evstring+'.tar.gz'
else:
zipfilename=None
# Note: Disabled gzip_output for now. Possibly need it for future Pegasus use
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
if "summarypages" in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=False)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
else:
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
if self.config.has_option('analysis','upload-to-gracedb'):
if self.config.getboolean('analysis','upload-to-gracedb') and event.GID is not None:
......@@ -1350,19 +1375,24 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('resultspage','fixedBurnin'):
mergenode.add_var_arg('--fixedBurnin '+str(self.config.getint('resultspage','fixedBurnin')))
self.add_node(mergenode)
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
if self.config.has_option('input','burst-injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
if "summarypages" in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos, evstring=evstring, coherence=self.config.getboolean('analysis','coherence-test'))
respagenode.set_psd_files(enginenodes[0].get_psd_files())
else:
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode.set_snr_file(enginenodes[0].get_snr_file())
if os.path.exists(self.basepath+'/coinc.xml'):
try:
gid = self.config.get('input','gid')
except:
gid = None
respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
if self.config.has_option('input','injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
if self.config.has_option('input','burst-injection-file') and event.event_id is not None:
respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
if event.GID is not None:
if self.config.has_option('analysis','upload-to-gracedb'):
if self.config.getboolean('analysis','upload-to-gracedb'):
......@@ -1745,6 +1775,36 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
prenode.add_parent(bayeswavepsdnode[ifo])
return node,bayeswavepsdnode
def add_results_page_node_pesummary(self, resjob=None, outdir=None, parent=None, extra_options=None, gzip_output=None, ifos=None, evstring=None, coherence=False):
if resjob is None:
resjob=self.results_page_job
node=PESummaryResultsPageNode(resjob)
if parent is not None:
node.add_parent(parent)
if coherence:
infiles = [os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(ifo,evstring)) for ifo in ifos]
infiles.append(parent.get_pos_file())
else:
infiles = [parent.get_pos_file()]
inifiles = ["config.ini"]*len(infiles)
node.add_file_opt("samples", " ".join(infiles))
node.add_file_opt("config", " ".join(inifiles))
approximant = self.config.get("engine", "approx")
if "pseudo" in approximant:
approximant = approximant.split("pseudo")[0]
node.add_file_opt('approximant', " ".join([approximant]*len(infiles)))
calibration = []
for ifo in ifos:
try:
calibration.append(self.config.get("engine", "%s-spcal-envelope" %(ifo)))
except:
pass
if len(calibration) > 0:
node.add_file_opt("calibration", " ".join(calibration))
node.set_output_path(outdir)
self.add_node(node)
return node
def add_results_page_node(self,resjob=None,outdir=None,parent=None,extra_options=None,gzip_output=None,ifos=None):
if resjob is None:
resjob=self.results_page_job
......@@ -2148,6 +2208,7 @@ class EngineNode(SingularityNode):
self.psdstart=None
self.snrfile=None
self.psdfiles=None
self.calibrationfiles=None
self.cachefiles={}
if li_job.ispreengine is False:
self.id=next(EngineNode.new_id)
......@@ -2468,6 +2529,35 @@ class BayesWavePSDNode(EngineNode):
def set_output_file(self,filename):
pass
class PESummaryResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""Class to handle the creation of the summary page job using `PESummary`
"""
def __init__(self, cp, submitFile, logdir, dax=False):
exe=cp.get('condor','resultspage')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('condor','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
if cp.has_option('condor','accounting_group_user'):
self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
requirements=''
if cp.has_option('condor','queue'):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
if cp.has_option('condor','Requirements'):
if requirements!='':
requirements=requirements+' && '
requirements=requirements+cp.get('condor','Requirements')
if requirements!='':
self.add_condor_cmd('Requirements',requirements)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.add_condor_cmd('request_memory','2000')
class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
exe=cp.get('condor','resultspage')
......@@ -2497,6 +2587,98 @@ class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
if cp.has_option('results','skyres'):
self.add_opt('skyres',cp.get('results','skyres'))
class PESummaryResultsPageNode(pipeline.CondorDAGNode):
def __init__(self, results_page_job, outpath=None):
super(PESummaryResultsPageNode,self).__init__(results_page_job)
if outpath is not None:
self.set_output_path(path)
@staticmethod
def determine_webdir_or_existing_webdir(path):
entries = glob(path+"/*")
if "%s/home.html" %(path) in entries:
return "existing"
return "new"
def set_output_path(self,path):
self.webpath=path
option = self.determine_webdir_or_existing_webdir(path)
if option == "existing":
self.add_var_opt('existing_webdir',path)
else:
self.add_var_opt('webdir',path)
def get_output_path(self):
return self.webpath
def set_injection(self,injfile,eventnumber):
self.injfile=injfile
self.add_file_opt('inj',injfile)
self.set_event_number(eventnumber)
def get_injection(self):
return self.injfile
def set_event_number(self,event):
"""
Set the event number in the injection XML.
"""
if event is not None:
self.__event=int(event)
self.add_var_arg('--eventnum '+str(event))
def get_event_number(self):
return self.__event
def set_psd_files(self,st):
if st is None:
return
psds = " ".join(st.split(','))
self.add_var_arg('--psd %s'%psds)
def set_calibration_files(self,st):
if st is None:
return
calibration = " ".join(st.split(','))
self.add_var_arg('--calibration %s' %calibration)
def set_snr_file(self,st):
if st is None:
return
def set_coinc_file(self,coinc,gid=None):
if gid:
self.__event=gid
if coinc is None:
return
def add_engine_parent(self,node):
"""
Add a parent node which is one of the engine nodes
And automatically set options accordingly
"""
self.add_parent(node)
self.add_file_arg(node.get_pos_file())
self.infiles.append(node.get_pos_file())
def get_pos_file(self):
return self.posfile
def set_bayes_coherent_incoherent(self,bcifile):
self.add_file_opt('bci',bcifile)
def set_bayes_coherent_noise(self,bsnfile):
self.add_file_opt('bsn',bsnfile)
def set_header_file(self,headerfile):
self.add_file_opt('header',headerfile)
def set_ifos(self,ifos):
self.ifos=ifos
class ResultsPageNode(pipeline.CondorDAGNode):
def __init__(self,results_page_job,outpath=None):
super(ResultsPageNode,self).__init__(results_page_job)
......
......@@ -128,7 +128,13 @@ datafind=%(lalsuite-install)s/bin/gw_data_find
mergeNSscript=%(lalsuite-install)s/bin/lalinference_nest2pos
mergeMCMCscript=%(lalsuite-install)s/bin/cbcBayesMCMC2pos
combinePTMCMCh5script=%(lalsuite-install)s/bin/cbcBayesCombinePTMCMCh5s
# You can use either cbcBayesPostProc or PESummary to generate the summarypages.
# PESummary code from https://git.ligo.org/lscsoft/pesummary
# To install PESummary see https://docs.ligo.org/lscsoft/pesummary/installation.html
resultspage=%(lalsuite-install)s/bin/cbcBayesPostProc
# resultspage=%(lalsuite-install)s/bin/summarypages
segfind=%(lalsuite-install)s/bin/ligolw_segment_query
ligolw_print=%(lalsuite-install)s/bin/ligolw_print
coherencetest=%(lalsuite-install)s/bin/lalinference_coherence_test
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment