Commit 1d782b93 authored by Salvatore Vitale

LALInferencePipe to take a list of approximants and samplers. Tested with Condor

Original: ae78604bca491f499e4b87bc1b4dd028a9b79ce7
parent 4bc11754
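For reference, a minimal sketch of how the new feature would be driven (paths and values below are placeholders; the section, option and flag names are taken from the script and example ini in this diff):

    [analysis]
    engine=lalinferencenest,lalinferencemcmc
    [engine]
    approx=TaylorF2threePointFivePN,IMRPhenomPv2

    $ lalinference_pipe --run-path /home/albert/multi_run config.ini
    $ condor_submit_dag /home/albert/multi_run/multidag.dag

One sub-directory and sub-DAG should be created per (sampler, approximant) pair under the run path, with a top-level dag (multidag, per the set_dag_file call below) submitting them all.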
@@ -8,6 +8,7 @@ from optparse import OptionParser,OptionValueError
import sys
import ast
import os
from glue import pipeline
usage=""" %prog [options] config.ini
Setup a Condor DAG file to run the LALInference pipeline based on
@@ -50,145 +51,230 @@ if len(args)!=1:
inifile=args[0]
cp=ConfigParser.ConfigParser()
fp=open(inifile)
cp.optionxform = str
cp.readfp(fp)
single_approx=True
approx='approx'
if cp.has_option('engine','approx'):
    pass
elif cp.has_option('engine','approximant'):
    approx='approximant'
else:
    print "Error: was expecting an 'approx' field in the [engine] section\n"
    sys.exit(1)
apps=cp.get('engine',approx)
if ',' in apps:
    single_approx=False
    apps=apps.split(',')
else:
    apps=[apps]
single_sampler=True
samps=cp.get('analysis','engine')
if ',' in samps:
    single_sampler=False
    samps=samps.split(',')
else:
    samps=[samps]
rundir_root=os.path.abspath(opts.run_path)
if single_sampler is False or single_approx is False:
    import uuid
    if opts.daglog_path is not None:
        daglogdir=os.path.abspath(opts.daglog_path)
    elif opts.run_path is not None:
        daglogdir=os.path.abspath(opts.run_path)
    else:
        daglogdir=os.path.abspath(cp.get('paths','basedir'))
    outerdaglog=os.path.join(daglogdir,'lalinference_multi_'+str(uuid.uuid1())+'.log')
    outerdag=pipeline.CondorDAG(outerdaglog,dax=opts.dax)
    outerdag.set_dag_file(os.path.join(rundir_root,'multidag'))
fp.close()
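# Loop over every (sampler, approximant) combination; each iteration re-reads the ini,
# overrides the engine/approx entries, and sets up a self-contained run directory and DAG.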
for sampler in samps:
    for app in apps:
        if not os.path.isdir(os.path.join(rundir_root,sampler,app)):
            os.makedirs(os.path.join(rundir_root,sampler,app))
        opts.run_path=os.path.abspath(os.path.join(rundir_root,sampler,app))
        inifile=args[0]
        cp=ConfigParser.ConfigParser()
        fp=open(inifile)
        cp.optionxform = str
        cp.readfp(fp)
        cp.set('engine',approx,app)
        cp.set('analysis','engine',sampler)
        for p in dict(cp.items('paths')).keys():
            # current value
            out=cp.get('paths',p)
            # append sampler/approximant suffix
            cp.set('paths',p,os.path.join(out,sampler,app))
        if opts.condor_submit and opts.pegasus_submit:
            print 'Error: Please only specify one of --condor-submit or --pegasus-submit'
            sys.exit(1)
        if opts.run_path is not None:
            cp.set('paths','basedir',os.path.abspath(opts.run_path))
        if not cp.has_option('paths','basedir'):
            print 'Warning: No --run-path specified, using %s'%(os.getcwd())
            cp.set('paths','basedir',os.path.abspath(os.getcwd()))
        if opts.daglog_path is not None:
            cp.set('paths','daglogdir',os.path.abspath(opts.daglog_path))
        elif opts.run_path is not None:
            cp.set('paths','daglogdir',os.path.abspath(opts.run_path))
        else:
            cp.set('paths','daglogdir',os.path.abspath(cp.get('paths','basedir')))
        local_work_dir=cp.get('paths','daglogdir')
        if opts.gps_time_file is not None:
            cp.set('input','gps-time-file',os.path.abspath(opts.gps_time_file))
        if opts.single_triggers is not None:
            cp.set('input','sngl-inspiral-file',os.path.abspath(opts.single_triggers))
        if opts.injections is not None:
            cp.set('input','injection-file',os.path.abspath(opts.injections))
        if opts.burst_injections is not None:
            if opts.injections is not None:
                print "ERROR: cannot pass both inspiral and burst tables for injection\n"
                sys.exit(1)
            cp.set('input','burst-injection-file',os.path.abspath(opts.burst_injections))
        if opts.coinc_triggers is not None:
            cp.set('input','coinc-inspiral-file',os.path.abspath(opts.coinc_triggers))
        #if opts.lvalert is not None:
        #    cp.set('input','lvalert-file',os.path.abspath(opts.lvalert))
        if opts.gid is not None:
            cp.set('input','gid',opts.gid)
        if opts.pipedown_db is not None:
            cp.set('input','pipedown-db',os.path.abspath(opts.pipedown_db))
        # Create the DAG from the configparser object
        dag=pipe_utils.LALInferencePipelineDAG(cp,dax=opts.dax,site=opts.grid_site)
        if((opts.dax) and not cp.has_option('lalinference','fake-cache')):
            # Create a text file with the frames listed
            pfnfile = dag.create_frame_pfn_file()
            peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
        else:
            peg_frame_cache = '/dev/null'
        # A directory to store the DAX temporary files
        import uuid
        execdir=os.path.join(local_work_dir,'lalinference_pegasus_'+str(uuid.uuid1()))
        olddir=os.getcwd()
        os.chdir(cp.get('paths','basedir'))
        if opts.grid_site is not None:
            site='local,'+opts.grid_site
        else:
            site=None
        # Create the DAX scripts
        if opts.dax:
            dag.prepare_dax(tmp_exec_dir=execdir,grid_site=site,peg_frame_cache=peg_frame_cache)
            # Ugly hack to replace pegasus.transfer.links=true in the pegasus.properties file created by pipeline.py.
            # It turns off the creation of links for files on the local file system; we use pegasus.transfer.links=false
            # to make sure we have a copy of the data in the running directory (useful when the data come from a temporary
            # low-latency buffer).
            if cp.has_option('analysis','pegasus.transfer.links'):
                if cp.get('analysis','pegasus.transfer.links')=='false':
                    lines=[]
                    with open('pegasus.properties') as fin:
                        for line in fin:
                            line = line.replace('pegasus.transfer.links=true', 'pegasus.transfer.links=false')
                            lines.append(line)
                    with open('pegasus.properties','w') as fout:
                        for line in lines:
                            fout.write(line)
            if cp.has_option('analysis','accounting_group'):
                lines=[]
                with open('sites.xml') as fin:
                    for line in fin:
                        if '<profile namespace="condor" key="getenv">True</profile>' in line:
                            line=line+' <profile namespace="condor" key="accounting_group">'+cp.get('analysis','accounting_group')+'</profile>\n'
                        lines.append(line)
                with open('sites.xml','w') as fout:
                    for line in lines:
                        fout.write(line)
            if cp.has_option('analysis','accounting_group_user'):
                lines=[]
                with open('sites.xml') as fin:
                    for line in fin:
                        if '<profile namespace="condor" key="getenv">True</profile>' in line:
                            line=line+' <profile namespace="condor" key="accounting_group_user">'+cp.get('analysis','accounting_group_user')+'</profile>\n'
                        lines.append(line)
                with open('sites.xml','w') as fout:
                    for line in lines:
                        fout.write(line)
        if single_sampler is False or single_approx is False:
            full_dag_path=os.path.join(cp.get('paths','basedir'),dag.get_dag_file())
            dagjob=pipeline.CondorDAGManJob(full_dag_path,dir=rundir_root)
            dagnode=pipeline.CondorDAGManNode(dagjob)
            outerdag.add_node(dagnode)
        dag.write_sub_files()
        dag.write_dag()
        dag.write_script()
        os.chdir(olddir)
        # Tell user about output, and submit it if requested
        print 'Successfully created DAG file.'
        fulldagpath=os.path.join(cp.get('paths','basedir'),dag.get_dag_file())
        if not opts.dax and not (single_sampler is False or single_approx is False):
            print 'Now run condor_submit_dag %s\n'%(fulldagpath)
        if opts.condor_submit and not (single_sampler is False or single_approx is False):
            import subprocess
            from subprocess import Popen
            x = subprocess.Popen(['condor_submit_dag',fulldagpath])
            x.wait()
            if x.returncode==0:
                print 'Submitted DAG file'
            else:
                print 'Unable to submit DAG file'
        if opts.pegasus_submit and not (single_sampler is False or single_approx is False):
            import subprocess
            from subprocess import Popen
            os.chdir(os.path.abspath(cp.get('paths','basedir')))
            x = subprocess.Popen('./pegasus_submit_dax')
            x.wait()
            if x.returncode==0:
                print 'Submitted DAX file'
            else:
                print 'Unable to submit DAX file'
        fp.close()
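# For multi-sampler or multi-approximant runs, write the top-level DAG that submits
# each per-combination sub-DAG through CondorDAGMan.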
if single_sampler is False or single_approx is False:
    if(opts.dax):
        # Create a text file with the frames listed
        pfnfile = outerdag.create_frame_pfn_file()
        peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
    outerdag.write_sub_files()
    outerdag.write_dag()
    outerdag.write_script()
    # End of program
    print 'Successfully created DAG file.'
    print 'Now run condor_submit_dag %s\n'%(outerdag.get_dag_file())
@@ -23,6 +23,8 @@ ifos=['H1','L1','V1']
#accounting_group=ligo.dev.o1.cbc.pe.lalinference
# select the engine to use, lalinferencenest, lalinferencemcmc or lalinferencebambimpi
# A comma-separated list can be provided; in that case a dag for each sampler will be created, along with a top-level dag to run them all, e.g.
# engine=lalinferencenest,lalinferencemcmc
engine=lalinferencenest
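# With a list here (and/or a comma separated approx in [engine]), each combination is set up under its own
# sub-directory of the run path, e.g. <run-path>/lalinferencenest/<approx> (placeholder layout, following the
# loop in lalinference_pipe), and the top-level dag submits them all.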
# Number of parallel runs for each event. Using parallel runs increases the number of samples, giving a better exploration of the parameter space.
@@ -199,10 +201,11 @@ resume=
# lalinference_bambi automatically resumes; use this if you want to force a start from scratch
#noresume=
# approx is used to specify an approximant; it can be either a single approximant or a comma-separated list, e.g.
# approx=TaylorF2threePointFivePN,IMRPhenomPv2
# If a list is given, a dag for each approximant will be created and an upper-level dag will control them.
# All the dirs (including webdir) will be given an appropriate suffix.
approx=TaylorF2threePointFivePN
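# For example (hypothetical paths): with approx=TaylorF2threePointFivePN,IMRPhenomPv2 and basedir=/home/user/run,
# the sub-runs would live in /home/user/run/<sampler>/TaylorF2threePointFivePN and
# /home/user/run/<sampler>/IMRPhenomPv2, each with its own dag.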
# Control the amplitude order (default: max available)
#amporder=0