Commit a91dfc87 authored by John Douglas Veitch

More work on pipeline script

Original: 685fb5011ecf988e91fb5851a105c56917e0fb98
parent 7a06e4c0
@@ -3,3 +3,44 @@
# DAG generation code for running LALInference pipeline
# (C) 2012 John Veitch
from lalapps import lalinference_pipe_utils as pipe_utils
import ConfigParser
from optparse import OptionParser,OptionValueError
usage=""" %prog [options]
Setup a Condor DAG file to run the LALInference pipeline
The user must specify either an injection file to analyse, with the --inj option,
a list of SnglInspiralTable or CoincInspiralTable triggers with the --<x>-triggers options,
or an ASCII list of GPS times with the --gps-time-file option.
The user must also specify an ini file which will contain the main analysis config.
"""
parser=OptionParser(usage)
parser.add_option("-i","--ini-file",default=None,action="store",type="string",help="ini-file for pipeline",metavar="CONFIG.ini")
parser.add_option("-r","--run-path",default=None,action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
parser.add_option("-g","--gps-time-file",action="store",type="string",default=None,help="Text file containing list of GPS times to analyse",metavar="TIMES.txt")
(opts,args)=parser.parse_args()
cp=ConfigParser.ConfigParser()
cp.readfp(open(opts.ini_file))
if opts.run_path is not None:
cp.set('paths','basedir',opts.run_path)
if opts.daglog_path is not None:
cp.set('paths','daglogdir',opts.daglog_path)
else:
cp.set('paths','daglogdir',opts.run_path)
if opts.gps_time_file is not None:
cp.set('input','gps-time-file',opts.gps_time_file)
# Create the DAG from the configparser object
dag=pipe_utils.LALInferencePipelineDAG(cp)
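# A minimal sketch of writing the DAG to disk, assuming the standard
# glue.pipeline.CondorDAG interface (not part of this commit):
#   dag.set_dag_file(os.path.join(cp.get('paths','basedir'),'lalinference_pipeline'))
#   dag.write_sub_files()
#   dag.write_dag()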
@@ -6,6 +6,21 @@ engine=lalinferencenest
basepath=/tmp/test/
#cachedir=
#logdir=
#webdir is the base output dir for results pages
webdir=/home/jveitch/public_html/
[input]
# Can manually override time limits here
#gps-start-time=
#gps-end-time=
# Can manually specify input files here or override on the command line
#gps-time-file=
#injection-file=
#sngl-inspiral-file=
#coinc-inspiral-file=
#pipedown-database=
[datafind]
types={'H1':'H1_LDAS_C02_L2','L1':'L1_LDAS_C02_L2','V1':'HrecOnline'}
@@ -20,6 +35,23 @@ lalinferencemcmc=/home/jveitch/bin/lalinference_mcmc
segfind=/home/jveitch/bin/ligolw_segment_query
datafind=/home/jveitch/bin/ligo_data_find
[resultspage]
skyres=0.5
# Additional options for the results page
# --event is set automatically
# LALInferenceMCMC options
# --lalinfmcmc is set automatically
#downsample=1000
#deltaLogL=5
#fixedBurnin=100000
#oldMassConvention
# LALInferenceNest options
# --Nlive is set automatically from the lalinferencenest section
# --ns is set automatically
[lalinferencenest]
nlive=1000
nmcmc=200
@@ -28,6 +60,8 @@ nmcmc=200
#channel=[dummy,dummy,dummy]
[lalinferencemcmc]
downsample=1000
deltalogl=5
[segfind]
segment-url=https://segdb.ligo.caltech.edu
@@ -36,3 +70,7 @@ segment-url=https://segdb.ligo.caltech.edu
l1-analyze = L1:DMT-SCIENCE:2
h1-analyze = H1:DMT-SCIENCE:2
v1-analyze = V1:ITF_SCIENCEMODE:6
[injections]
# options to specify software injections
#injection-file=/path/to/file.xml
@@ -4,6 +4,7 @@
import glue
from glue import pipeline
import os
from lalapps import inspiralutils
# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
@@ -19,8 +20,7 @@ def chooseEngineNode(name):
return EngineNode
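# e.g. chooseEngineNode('lalinferencenest') is expected to return the
# LALInferenceNestNode class; unrecognised names fall back to the generic EngineNode.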
class LALInferencePipelineDAG(pipeline.CondorDAG):
def __init__(self,log,cp,dax=False):
pipeline.CondorDAG.__init__(self,log,dax)
def __init__(self,cp,dax=False):
self.subfiles=[]
self.config=cp
self.engine=cp.get('analysis','engine')
@@ -30,6 +30,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
else:
self.basepath=os.getcwd()
print 'No basepath specified, using current directory: %s'%(self.basepath)
self.daglogfile=os.path.join(cp.get('paths','daglogdir'),'lalinference_pipeline-'+str(id(self))+'.log')
pipeline.CondorDAG.__init__(self,self.daglogfile,dax)
if cp.has_option('paths','cachedir'):
self.cachepath=cp.get('paths','cachedir')
else:
@@ -48,11 +50,66 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.dq={}
self.frtypes=cp.get('datafind','types')
self.use_available_data=False
self.webdir=cp.get('paths','webdir')
# Set up necessary job files.
self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config)
self.datafind_job.add_opt('url-type','file')
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.engine_job = EngineJob(self.config, os.path.join(self.basepath,'lalinference.sub'),self.logpath)
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
# Process the input to build list of analyses to do
self.times=[]
if cp.has_option('input','gps-time-file'):
times=self.scan_timefile(cp.get('input','gps-time-file'))
for time in times:
self.times.append(time)
# SimInspiral Table
if cp.has_option('input','injection-file'):
from pylal import SimInspiralUtils
injTable=SimInspiralUtils.ReadSimInspiralFromFiles([cp.get('input','injection-file')])
map(self.times.append, [inj.get_end() for inj in injTable])
# SnglInspiral Table
if cp.has_option('input','sngl-inspiral-file'):
from pylal import SnglInspiralUtils
trigTable=SnglInspiralUtils.ReadSnglInspiralFromFiles([cp.get('input','sngl-inspiral-file')])
map(self.times.append,[trig.get_end() for trig in trigTable])
# CoincInspiralTable
# Pipedown database
# Set up the segments
if not self.config.has_option('input','gps-start-time'):
self.config.set('input','gps-start-time',str(min(self.times)))
if not self.config.has_option('input','gps-end-time'):
self.config.set('input','gps-end-time',str(max(self.times)))
self.add_science_segments()
def scan_timefile(self,timefile):
import re
p=re.compile('[\d.]+')
times=[]
timefilehandle=open(timefile,'r')
for time in timefilehandle:
if not p.match(time):
continue
if float(time) in times:
print 'Skipping duplicate time %s'%(time)
continue
print 'Read time %s'%(time)
times.append(float(time))
timefilehandle.close()
return times
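# The time file is expected to hold one GPS time per line, e.g.
# (illustrative values):
#   966383954.0
#   966388103.5
# Lines whose start does not match the [\d.]+ regex above are skipped.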
def setup_from_times(self,times):
"""
Generate a DAG from a list of times
"""
for time in times:
self.add_full_analysis_time(str(time))
def add_full_analysis_time(self,gpstime):
"""
@@ -60,7 +117,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
"""
datafindnode=self.add_datafind(gpstime)
enginenode=self.add_engine(gpstime)
self.add_resultspage(enginenode.posteriorfile)
ifos=''.join(enginenode.ifos)
pagedir=os.path.join(ifos,str(gpstime)+'-'+str(id(enginenode)))
self.add_results_page_node(outdir=pagedir,parent=enginenode)
def get_science_segment(self,ifo,gpstime):
# Check if time is in existing segment
@@ -107,6 +166,17 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if df_node not in self.__nodes:
self.add_node(df_node)
self.add_node(node)
return node
def add_results_page_node(self,outdir=None,parent=None,extra_options=None):
node=ResultsPageNode(self.results_page_job)
if parent is not None:
node.add_parent(parent)
infiles=parent.get_output_files()
for infile in infiles:
node.add_var_arg(infile)
node.set_output_dir(outdir)
self.add_node(node)
class EngineJob(pipeline.CondorDAGJob):
@@ -124,6 +194,11 @@ class LALInferenceNestNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
self.engine='lalinferencenest'
def set_output_file(self,filename):
self.add_file_opt(self.outfilearg,filename,file_is_output_file=True)
self.paramsfile=filename+'_params.txt'
self.Bfilename=filename+'_B.txt'
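# Note: the nested sampling engine is assumed to write its sampled parameter
# names to <outfile>_params.txt and the Bayes factor to <outfile>_B.txt
# alongside the main output file; the attribute names above mirror that convention.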
class LALInferenceMCMCNode(EngineNode):
def __init__(self,li_job):
@@ -133,7 +208,7 @@ class LALInferenceMCMCNode(EngineNode):
class EngineNode(pipeline.CondorDAGNode):
def __init__(self,li_job):
pipeline.CondorDAGNode.__init__(self,li_job)
def set_seed(self,seed):
self.add_var_opt('randomseed',seed)
@@ -244,3 +319,32 @@ class ResultsPageJob(pipeline.CondorDAGJob):
if cp.has_option('resultspage','skyres'):
self.add_opt('skyres',cp.get('resultspage','skyres'))
class ResultsPageNode(pipeline.CondorDAGNode):
def __init__(self,results_page_job):
pipeline.CondorDAGNode.__init__(self,results_page_job)
self.webpath=self.job().get_cp().get('paths','webdir')
def set_event_number(self,event):
"""
Set which event to analyse from the injection XML file.
"""
if event is not None:
self.__event=int(event)
self.add_var_arg('--eventnum '+str(event))
def add_engine_parent(self,node):
"""
Add a parent node which is one of the engine nodes,
and automatically set options accordingly.
"""
self.add_parent(node)
for infile in node.get_output_files():
self.add_file_arg(infile)
if isinstance(node,LALInferenceNestNode):
self.add_var_opt('ns','')
if isinstance(node,LALInferenceMCMCNode):
self.add_var_opt('lalinfmcmc','')
def set_output_dir(self,dir):
self.add_var_opt('outpath',dir)
inspiralutils.mkdir(dir)
\ No newline at end of file