Commit 52bb35f2 authored by Vivien Raymond

LALInference_pipe update: default nparallel, random engine and --pegasus-submit option

Original: 1c4d14584d71516bab0f68db9680fec039b9612d
parent e823bab9
@@ -34,6 +34,7 @@ parser.add_option("--gid",action="store",type="string",default=None,help="GraceD
parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
parser.add_option("--pegasus-submit",action="store_true",default=False,help="Automatically submit the pegasus dax")
parser.add_option("-x", "--dax",action="store_true",default=False, help="Delete the ligo_data_find jobs and populate frame LFNs in the DAX")
parser.add_option("-G", "--grid-site",action="store",type="string",metavar="SITE", help="Specify remote site in conjunction with --dax option. e.g. --grid-site=creamce for Bologna cluster.\
Supported options are: creamce and local",default=None)
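
For orientation, the new flag slots in alongside the existing submit options; an invocation using it might look like the line below. This is an illustrative sketch only: the positional ini-file argument is assumed and is not shown in this hunk.

lalinference_pipe --dax --grid-site=creamce --pegasus-submit lalinference_pipe_example.ini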
@@ -130,3 +131,15 @@ if opts.condor_submit:
        print 'Submitted DAG file'
    else:
        print 'Unable to submit DAG file'
if opts.pegasus_submit:
    import subprocess
    from subprocess import Popen
    os.chdir(os.path.abspath(cp.get('paths','basedir')))
    x = subprocess.Popen('./pegasus_submit_dax')
    x.wait()
    if x.returncode==0:
        print 'Submitted DAX file'
    else:
        print 'Unable to submit DAX file'
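
The new block mirrors the condor path: change into the run directory, invoke the generated wrapper, and report success via the exit code. A minimal standalone sketch of the same pattern, assuming the pipeline has already written a pegasus_submit_dax script into paths.basedir:

import os
import subprocess

def submit_dax(basedir):
    # Run the generated pegasus_submit_dax wrapper from the run directory
    # and report whether the submission succeeded (exit code 0).
    os.chdir(os.path.abspath(basedir))
    proc = subprocess.Popen('./pegasus_submit_dax')
    proc.wait()
    return proc.returncode == 0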
@@ -25,7 +25,8 @@ ifos=['H1','L1','V1']
# select the engine to use: lalinferencenest, lalinferencemcmc or lalinferencebambimpi
engine=lalinferencenest
# Number of parallel runs for each event. Using parallel runs increases the number of samples, giving a better exploration of the parameter space. Should be set to 2 (or 3) for lalinferencemcmc, 1 for lalinference_bambi.
# Number of parallel runs for each event. Using parallel runs increases the number of samples, giving a better exploration of the parameter space.
# Defaults to 2 for lalinferencemcmc, 1 for lalinference_bambi, and 4 for lalinferencenest.
nparallel=5
# Turn on the coherence test?
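
Putting the two new pieces together, an [analysis] stanza exercising both behaviours might look like the following sketch (not part of the commit): omitting nparallel picks up the per-engine default, and engine=random defers the choice to get_engine_name below.

[analysis]
ifos=['H1','L1','V1']
# pick lalinferencenest or lalinferencemcmc at run time (see get_engine_name below)
engine=random
# nparallel omitted on purpose: the pipeline falls back to the per-engine default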
@@ -269,6 +269,20 @@ def chooseEngineNode(name):
        return LALInferenceBAMBINode
    return EngineNode

def get_engine_name(cp):
    name=cp.get('analysis','engine')
    if name=='random':
        engine_list=['lalinferencenest','lalinferencemcmc']#,'lalinferencebambimpi']
        if cp.has_option('input','gid'):
            gid=cp.get('input','gid')
            engine_number=int(''.join(i for i in gid if i.isdigit())) % 2
        else:
            engine_number=random.randint(0,1)
        return engine_list[engine_number]
    else:
        return name

def scan_timefile(timefile):
    import re
    p=re.compile('[\d.]+')
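
A quick illustration of the branch in get_engine_name above: when a GraceDB id is configured, the choice is deterministic, so resubmitting the same event always picks the same engine (note that random.randint relies on a module-level import random, which is outside this hunk). A sketch with a made-up id:

# 'G71086' is a made-up GraceDB id, used for illustration only.
gid = 'G71086'
engine_list = ['lalinferencenest','lalinferencemcmc']
# digits of the id -> 71086; 71086 % 2 == 0 -> index 0
engine_number = int(''.join(i for i in gid if i.isdigit())) % 2
print(engine_list[engine_number])   # lalinferencenest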
@@ -380,7 +394,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
    def __init__(self,cp,dax=False,site='local'):
        self.subfiles=[]
        self.config=cp
        self.engine=cp.get('analysis','engine')
        self.engine=get_engine_name(cp)
        self.EngineNode=chooseEngineNode(self.engine)
        self.site=site
        if cp.has_option('paths','basedir'):
@@ -703,7 +717,10 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
        evstring=str(event.event_id)
        if event.trig_time is not None:
            evstring=str(event.trig_time)+'-'+str(event.event_id)
        Npar=self.config.getint('analysis','nparallel')
        if self.config.has_option('analysis','nparallel'):
            Npar=self.config.getint('analysis','nparallel')
        else:
            Npar=4
        # Set up the parallel engine nodes
        enginenodes=[]
        for i in range(Npar):
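
The same has_option/getint fallback recurs in the next hunk with a default of 2 on the lalinferencemcmc path; the pattern could be factored into a small helper like the hypothetical sketch below (not part of the commit):

def get_nparallel(cp, default):
    # Honour an explicit [analysis] nparallel setting, otherwise use the
    # per-engine default (4 above, 2 in the hunk below).
    if cp.has_option('analysis','nparallel'):
        return cp.getint('analysis','nparallel')
    return default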
@@ -791,7 +808,10 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
        evstring=str(event.event_id)
        if event.trig_time is not None:
            evstring=str(event.trig_time)+'-'+str(event.event_id)
        Npar=self.config.getint('analysis','nparallel')
        if self.config.has_option('analysis','nparallel'):
            Npar=self.config.getint('analysis','nparallel')
        else:
            Npar=2
        enginenodes=[]
        for i in range(Npar):
            enginenodes.append(self.add_engine_node(event))
@@ -1110,7 +1130,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
class EngineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
    def __init__(self,cp,submitFile,logdir,ispreengine=False,dax=False,site=None):
        self.ispreengine=ispreengine
        self.engine=cp.get('analysis','engine')
        self.engine=get_engine_name(cp)
        basepath=cp.get('paths','basedir')
        if ispreengine is True:
            roqpath=os.path.join(basepath,'ROQdata')