Commit 0f163ca9 authored by John Douglas Veitch

Add LALInference PP analysis DAG generator script to lalsuite, for lalinference review

Original: 3acc2559aae92997e69dae9a43731b6b802f6031
parent 76536f26
@@ -67,7 +67,8 @@ bin_SCRIPTS = \
lalapps_vo_nest2pos \
lalinference_pipe \
lalapps_nest2pos \
lalapps_merge_nested_sampling_runs
lalapps_merge_nested_sampling_runs \
lalinference_pp_pipe
pkgpython_DATA = \
nest_utils.py \
combine_evidence.py \
@@ -123,6 +124,9 @@ lalapps_merge_nested_sampling_runs: $(srcdir)/merge_nested_sampling_runs.in pyth
$(AM_V_GEN)sed -f python_config.sed $(srcdir)/merge_nested_sampling_runs.in >lalapps_merge_nested_sampling_runs
@chmod a+x lalapps_merge_nested_sampling_runs
lalinference_pp_pipe: $(srcdir)/lalinference_pp_pipe.in python_config.sed
$(AM_V_GEN)sed -f python_config.sed $(srcdir)/lalinference_pp_pipe.in > lalinference_pp_pipe
@chmod a+x lalinference_pp_pipe
CLEANFILES = \
$(bin_SCRIPTS) \
......
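The new target mirrors the existing script rules: the `.in` template is filtered through `python_config.sed` and the result is made executable. A minimal sketch of what that filtering amounts to, assuming the sed script only rewrites the `@PYTHONPROG@` token used in the template's shebang (the interpreter path here is a placeholder, and the real mapping lives in python_config.sed):

# Rough Python equivalent of the sed step that produces lalinference_pp_pipe
# (interpreter path is a placeholder; the actual substitution is defined by python_config.sed)
template = open('lalinference_pp_pipe.in').read()
with open('lalinference_pp_pipe', 'w') as out:
    out.write(template.replace('@PYTHONPROG@', '/usr/bin/python'))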
@@ -62,6 +62,7 @@ events=all
# a stretch of time, i.e. when no other input files are given
#gps-start-time=
#gps-end-time=
analyse-all-time=False
# Can manually specify input files here or over-ride on the command line
#gps-time-file=
@@ -95,6 +96,9 @@ mpirun=/home/albert.einstein/bin/mpirun
gracedb=/home/albert.einstein/bin/gracedb
# Queue information if needed to run on specific nodes
#queue=
# Script to convert posterior samples to an injection file (only for the prior PP test)
pos_to_sim_inspiral=/home/albert.einstein/bin/cbcBayesPosToSimInspiral.py
ppanalysis=/home/albert.einstein/bin/cbcBayesPPAnalysis.py
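lalinference_pp_pipe reads these two entries from the [condor] section to locate the posterior-to-injection converter and the PP analysis executable (see prior_cp.get('condor','pos_to_sim_inspiral') and prior_cp.get('condor','ppanalysis') in the script below). A minimal sketch of that lookup, assuming an ini file laid out like this example (the file name is a placeholder):

# Sketch: reading the new [condor] entries used by the PP pipeline
import ConfigParser
cp = ConfigParser.ConfigParser()
cp.readfp(open('config.ini'))                         # placeholder ini file name
pos_to_sim = cp.get('condor', 'pos_to_sim_inspiral')  # cbcBayesPosToSimInspiral.py
pp_analysis = cp.get('condor', 'ppanalysis')          # cbcBayesPPAnalysis.py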
#####################################################################################
# Section used by the datafind jobs (not used with simulated noise)
......
@@ -418,7 +418,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
# Set up necessary job files.
self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
self.datafind_job.add_opt('url-type','file')
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
# Need to create a job file for each IFO combination
self.engine_jobs={}
ifocombos=[]
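This and the similar changes below wrap each job's Condor submit-file path in os.path.abspath(). A small sketch of why, under the assumption that the pipeline DAG will now also be run as a sub-DAG of the PP test's outer DAG, whose DAGMan jobs are submitted from a different working directory (paths are placeholders):

# Sketch: a relative submit-file path only resolves from the directory where the
# DAG was generated; an absolute path resolves from any working directory.
import os
relative_sub = os.path.join('prior', 'datafind.sub')  # e.g. 'prior/datafind.sub'
absolute_sub = os.path.abspath(relative_sub)          # e.g. '/home/user/run/prior/datafind.sub'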
@@ -439,14 +439,14 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.gracedbjob.set_grid_site('local')
# Process the input to build list of analyses to do
self.events=self.setup_from_inputs()
self.times=[e.trig_time for e in self.events]
# Sanity checking
if len(self.events)==0:
print 'No input events found, please check your config. Will generate an empty DAG'
print 'No input events found, please check your config if you expect some events'
self.times=[e.trig_time for e in self.events]
# Set up the segments
if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')):
if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
(mintime,maxtime)=self.get_required_data(self.times)
if not self.config.has_option('input','gps-start-time'):
self.config.set('input','gps-start-time',str(int(floor(mintime))))
@@ -550,6 +550,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
in the [input] section of the ini file.
And process the events found therein
"""
events=[]
gpsstart=None
gpsend=None
if self.config.has_option('input','gps-start-time'):
@@ -559,7 +560,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
inputnames=['gps-time-file','injection-file','sngl-inspiral-file','coinc-inspiral-file','pipedown-db','gid']
ReadInputFromList=sum([ 1 if self.config.has_option('input',name) else 0 for name in inputnames])
if ReadInputFromList!=1 and (gpsstart is None or gpsend is None):
print 'Plese specify only one input file'
return []
print 'Please specify only one input file'
print 'Or specify gps-start-time and gps-end-time in the ini file'
sys.exit(1)
if self.config.has_option('input','events'):
@@ -573,7 +575,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
else:
selected_events=None
# No input file given, analyse the entire time stretch between gpsstart and gpsend
if ReadInputFromList!=1:
if self.config.has_option('input','analyse-all-time') and self.config.getboolean('input','analyse-all-time'):
seglen=self.config.getfloat('engine','seglen')
if(self.config.has_option('input','segment-overlap')):
overlap=self.config.getfloat('input','segment-overlap')
@@ -979,7 +981,7 @@ class EngineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
if site!='local':
self.set_executable_installed(False)
# Set the options which are always used
self.set_sub_file(submitFile)
self.set_sub_file(os.path.abspath(submitFile))
if self.engine=='lalinferencemcmc' or self.engine=='lalinferencebambimpi':
#openmpipath=cp.get('condor','openmpi')
self.machine_count=cp.get('mpi','machine-count')
@@ -1299,7 +1301,7 @@ class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','resultspage')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
self.set_sub_file(submitFile)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
@@ -1370,7 +1372,7 @@ class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_condor_cmd('getenv','True')
self.set_stdout_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).err'))
self.set_sub_file(submitFile)
self.set_sub_file(os.path.abspath(submitFile))
class CoherenceTestNode(pipeline.CondorDAGNode):
"""
@@ -1418,7 +1420,7 @@ class MergeNSJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
exe=cp.get('condor','mergescript')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
self.set_sub_file(submitFile)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
@@ -1467,7 +1469,7 @@ class GraceDBJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
#pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.CondorDAGJob.__init__(self,"scheduler",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
self.set_sub_file(submitFile)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
......
#!/usr/bin/env @PYTHONPROG@
# End-to-end LALInference test pipeline
# (C) 2014 John Veitch
from lalapps import lalinference_pipe_utils as pipe_utils
from lalapps.lalinference_pipe_utils import mkdirs
from lalapps import inspiralutils
from glue import pipeline
import ConfigParser
from optparse import OptionParser,OptionValueError
import sys
import ast
import os
import uuid
usage=""" %prog [options] config.ini
Set up a DAG to run an end-to-end lalinference test:
1) Generate samples from the prior
2) Analyse a set of injections drawn from the prior
3) Run a P-P test on the results
"""
parser=OptionParser(usage)
parser.add_option("-r","--run-path",default='./',action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
parser.add_option("-x", "--dax",action="store_true",default=False, help="Delete the ligo_data_find jobs and populate frame LFNs in the DAX")
parser.add_option("-G", "--grid-site",action="store",type="string",metavar="SITE", help="Specify remote site in conjunction with --dax option. e.g. --grid-site=creamce for Bologna cluster.\
Supported options are: creamce and local",default=None)
parser.add_option('-N','--trials',action='store',type='int',metavar='NUM',help='Number of prior samples to analyse')
(opts,args)=parser.parse_args()
inifile=args[0]
# Set up the configuration for the sub-dags
prior_cp=ConfigParser.ConfigParser()
prior_cp.optionxform = str
prior_cp.readfp(open(inifile))
main_cp=ConfigParser.ConfigParser()
main_cp.optionxform = str
main_cp.readfp(open(inifile))
if opts.run_path is not None:
    rundir=os.path.abspath(opts.run_path)
if opts.daglog_path is not None:
    prior_cp.set('paths','daglogdir',os.path.join(os.path.abspath(opts.daglog_path),'prior'))
    main_cp.set('paths','daglogdir',os.path.join(os.path.abspath(opts.daglog_path),'main'))
    dagdir=os.path.abspath(opts.daglog_path)
elif opts.run_path is not None:
    prior_cp.set('paths','daglogdir',os.path.join(os.path.abspath(opts.run_path),'prior'))
    main_cp.set('paths','daglogdir',os.path.join(os.path.abspath(opts.run_path),'main'))
    dagdir=os.path.abspath(opts.run_path)
priordir=os.path.join(rundir,'prior')
maindir=os.path.join(rundir,'main')
priorwebdir=os.path.join(priordir,'resultspage')
mainwebdir=os.path.join(maindir,'resultspage')
prior_cp.set('paths','basedir',priordir)
main_cp.set('paths','basedir',maindir)
prior_cp.set('paths','webdir',priorwebdir)
main_cp.set('paths','webdir',mainwebdir)
outerlogdir=os.path.join(dagdir,'log')
mkdirs(outerlogdir)
mkdirs(priordir)
mkdirs(maindir)
mkdirs(priorwebdir)
mkdirs(mainwebdir)
# Add the prior options to the sub dag
if prior_cp.get('analysis','engine')=='lalinferencenest':
    prior_cp.set('engine','sampleprior',str(opts.trials))
    prior_cp.set('engine','zeroLogLike','')
    prior_cp.set('engine','nlive',str(opts.trials))
elif prior_cp.get('analysis','engine')=='lalinferencemcmc':
    prior_cp.set('engine','Neff',str(opts.trials))
    prior_cp.set('engine','zeroLogLike','')
elif prior_cp.get('analysis','engine')=='lalinferencebambi':
    prior_cp.set('engine','zeroLogLike','')
    prior_cp.set('engine','nlive',str(opts.trials))
local_work_dir=dagdir
mkdirs(local_work_dir)
# Create a DAG to contain the other scripts
outerdaglog=os.path.join(local_work_dir,'lalinference_injection_test_'+str(uuid.uuid1())+'.log')
outerdag=pipeline.CondorDAG(outerdaglog,dax=opts.dax)
outerdag.set_dag_file(os.path.join(dagdir,'priortest'))
# Run code with prior sampling
trig_time=1085855789
fake_event=pipe_utils.Event(trig_time=trig_time)
tfpath=os.path.join(local_work_dir,'time.txt')
tfile=open(tfpath,'w')
print >>tfile,'%i\n'%(trig_time)
tfile.close()
prior_cp.set('input','gps-time-file',tfpath)
priordag=pipe_utils.LALInferencePipelineDAG(prior_cp,dax=opts.dax,site=opts.grid_site)
priordag.set_dag_file(os.path.join(priordir,'lalinference_priorsample'))
priordagjob=pipeline.CondorDAGManJob(priordag.get_dag_file(),dir=priordir)
priordagnode=pipeline.CondorDAGManNode(priordagjob)
outerdag.add_node(priordagnode)
# Find the output file
pagenode=filter(lambda n:isinstance(n,pipe_utils.ResultsPageNode), priordag.get_nodes())[0]
priorfile=pagenode.get_pos_file()
# Convert prior samples to injections
convertsub=os.path.join(rundir,'samples2injections.sub')
converterr=os.path.join(outerlogdir,'samples2injection-$(cluster)-$(process)-$(node).err')
convertout=os.path.join(outerlogdir,'samples2injection-$(cluster)-$(process)-$(node).out')
injfile=os.path.join(rundir,'priorsamples.xml')
approx=prior_cp.get('engine','approx')
prior2injexe=prior_cp.get('condor','pos_to_sim_inspiral')
prior2injjob=pipeline.CondorDAGJob('vanilla',prior2injexe)
prior2injjob.set_sub_file(convertsub)
prior2injjob.set_stderr_file(converterr)
prior2injjob.set_stdout_file(convertout)
prior2injjob.add_condor_cmd('getenv','True')
prior2injnode=pipeline.CondorDAGNode(prior2injjob)
prior2injnode.add_var_opt('output',injfile)
prior2injnode.add_var_opt('num-of-injs',str(opts.trials))
prior2injnode.add_var_opt('approx',approx)
flow=str(40)
if prior_cp.has_option('engine','amporder'):
    amporder=prior_cp.get('engine','amporder')
else:
    amporder='0'
prior2injnode.add_var_opt('flow',flow) # TODO: Read from somewhere
prior2injnode.add_var_opt('amporder',amporder)
prior2injnode.add_var_arg(priorfile)
prior2injnode.add_parent(priordagnode)
outerdag.add_node(prior2injnode)
# Create the pipeline based on the injections
#main_cp.set('input','injection-file',injfile)
main_cp.set('input','gps-start-time',str(trig_time-1000))
main_cp.set('input','gps-end-time',str(trig_time+1000))
maindag=pipe_utils.LALInferencePipelineDAG(main_cp,dax=opts.dax,site=opts.grid_site)
maindag.set_dag_file(os.path.join(maindir,'lalinference_pipeline'))
maindagjob=pipeline.CondorDAGManJob(maindag.get_dag_file(),dir=maindir)
maindagnode=pipeline.CondorDAGManNode(maindagjob)
maindagnode.add_parent(prior2injnode)
maindag.config.set('input','injection-file',injfile)
for i in range(int(opts.trials)):
    ev=pipe_utils.Event(trig_time=trig_time,event_id=i)
    e=maindag.add_full_analysis(ev)
outerdag.add_node(maindagnode)
# Get a list of posterior samples files
resultspagenodes=filter(lambda n: isinstance(n, pipe_utils.ResultsPageNode), maindag.get_nodes())
posteriorfiles=[n.get_pos_file() for n in resultspagenodes]
# Analyse results of injection runs to generate PP plot
ppsub=os.path.join(rundir,'ppanalysis.sub')
pperr=os.path.join(outerlogdir,'ppanalysis-$(cluster)-$(process)-$(node).err')
ppout=os.path.join(outerlogdir,'ppanalysis-$(cluster)-$(process)-$(node).out')
ppexe=prior_cp.get('condor','ppanalysis')
ppjob=pipeline.CondorDAGJob('vanilla',ppexe)
ppjob.set_sub_file(ppsub)
ppjob.set_stderr_file(pperr)
ppjob.set_stdout_file(ppout)
ppjob.add_condor_cmd('getenv','True')
ppnode=pipeline.CondorDAGNode(ppjob)
ppnode.add_var_opt('injXML',injfile)
outdir=os.path.join(rundir,'ppanalysis')
mkdirs(outdir)
ppnode.add_var_opt('outdir',outdir)
for f in posteriorfiles:
    ppnode.add_var_arg(f)
ppnode.add_parent(maindagnode)
outerdag.add_node(ppnode)
if(opts.dax):
    # Create a text file with the frames listed
    pfnfile = outerdag.create_frame_pfn_file()
    peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
else:
    peg_frame_cache = '/dev/null'
execdir=os.path.join(local_work_dir,'lalinference_pegasus_'+str(uuid.uuid1()))
olddir=os.getcwd()
#os.chdir(local_work_dir)
if opts.grid_site is not None:
    site='local,'+opts.grid_site
else:
    site=None
# Create the DAX scripts
if opts.dax:
    # Prepare the Pegasus DAX for the outer DAG
    outerdag.prepare_dax(tmp_exec_dir=execdir,grid_site=site,peg_frame_cache=peg_frame_cache)
outerdag.write_sub_files()
outerdag.write_dag()
outerdag.write_script()
#os.chdir(olddir)
priordag.write_sub_files()
priordag.write_dag()
priordag.write_script()
maindag.write_sub_files()
maindag.write_dag()
maindag.write_script()
# End of program
print 'Successfully created DAG file.'
print 'Now run condor_submit_dag %s\n'%(outerdag.get_dag_file())
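For reference, a hypothetical invocation of the generator, driven from Python purely for illustration (in practice it is run directly from the shell; all paths and the trial count are placeholders):

# Hypothetical invocation of lalinference_pp_pipe (placeholder paths and values)
import subprocess
subprocess.check_call([
    'lalinference_pp_pipe',
    '--run-path', '/home/albert.einstein/pp_test',     # -r: directory to build the DAGs in
    '--daglog-path', '/usr1/albert.einstein/daglogs',  # -p: should be local to the submit node
    '--trials', '100',                                 # -N: number of prior samples to analyse
    'config.ini',                                      # ini file shared by the prior and main runs
])

Once the script finishes, the printed condor_submit_dag command submits the outer DAG, which then manages the prior sub-DAG, the sample-to-injection conversion, the main injection sub-DAG, and the final PP analysis job in order.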