Commit 5d4230a4 authored by Vivien Raymond's avatar Vivien Raymond
Browse files

Added ROM in the pipeline, first try...

Original: 54648d12f714473e54760e191da5eb3d97bae506
parent 9c28d2e3
#!/usr/bin/env @PYTHONPROG@
# DAG generation code for running LALInference pipeline
# (C) 2012 John Veitch
# (C) 2012 John Veitch, Vivien Raymond
from lalapps import lalinference_pipe_utils as pipe_utils
import ConfigParser
......@@ -14,6 +14,7 @@ the config.ini file.
The user must specify either an injection file to analyse, with the --inj option,
a list of SnglInspiralTable or CoincInspiralTable triggers with the --<x>-triggers options,
a GraceDB ID with the --gid option,
or an ASCII list of GPS times with the --gps-time-file option.
The user must also specify and ini file which will contain the main analysis config.
......
#flow DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Kiersten Ruisard, Kan Wang
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang
import itertools
import glue
......@@ -391,6 +391,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
else: self.veto_categories=[]
for ifo in self.ifos:
self.segments[ifo]=[]
self.romweightsnodes={}
self.dq={}
self.frtypes=ast.literal_eval(cp.get('datafind','types'))
self.channels=ast.literal_eval(cp.get('data','channels'))
......@@ -405,6 +406,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.datafind_job.set_universe('vanilla')
self.datafind_job.add_opt('url-type','file')
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,ispreengine=True)
self.romweights_job = ROMJob(self.config,os.path.join(self.basepath,'romweights.sub'),self.logpath)
self.engine_job = EngineJob(self.config, os.path.join(self.basepath,'lalinference.sub'),self.logpath)
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
self.merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath)
......@@ -740,14 +743,29 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
ifos=event.ifos
if ifos is None:
ifos=self.ifos
prenode={}
romweightsnode={}
for ifo in ifos:
prenode[ifo]=self.EngineNode(self.preengine_job)
node=self.EngineNode(self.engine_job)
end_time=event.trig_time
node.set_trig_time(end_time)
node.set_seed(random.randint(1,2**31))
if event.srate: node.set_srate(event.srate)
if event.trigSNR: node.set_trigSNR(event.trigSNR)
for ifo in ifos:
prenode[ifo].set_trig_time(end_time)
randomseed=random.randint(1,2**31)
node.set_seed(randomseed)
for ifo in ifos:
prenode[ifo].set_seed(randomseed)
if event.srate:
node.set_srate(event.srate)
for ifo in ifos:
prenode[ifo].set_srate(event.srate)
if event.trigSNR:
node.set_trigSNR(event.trigSNR)
if self.dataseed:
node.set_dataseed(self.dataseed+event.event_id)
for ifo in ifos:
prenode[ifo].set_dataseed(self.dataseed+event.event_id)
gotdata=0
for ifo in ifos:
if event.timeslides.has_key(ifo):
......@@ -756,7 +774,12 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
slide=0
for seg in self.segments[ifo]:
if end_time >= seg.start() and end_time < seg.end():
gotdata+=node.add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
prenode[ifo].add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
romweightsnode[ifo]=self.add_rom_weights_node(ifo,prenode[ifo])
if self.config.has_option('lalinference','roq'):
gotdata+=node.add_ifo_romweights(ifo,seg,self.channels[ifo],timeslide=slide,weightsnode=romweightsnode[ifo])
else:
gotdata+=node.add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
if self.config.has_option('lalinference','fake-cache'):
node.cachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
node.channels=ast.literal_eval(self.config.get('data','channels'))
......@@ -773,31 +796,44 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('input','gid'):
if os.path.isfile(os.path.join(self.basepath,'psd.xml.gz')):
psdpath=os.path.join(self.basepath,'psd.xml.gz')
node.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=None)
node.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=None)
for ifo in ifos:
prenode[ifo].psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=None)
if self.config.has_option('lalinference','flow'):
node.flows=ast.literal_eval(self.config.get('lalinference','flow'))
for ifo in ifos:
prenode[ifo].flows=node.flows[ifo]
if event.fhigh:
for ifo in ifos:
node.fhighs[ifo]=str(event.fhigh)
if self.config.has_option('lalinference','ER2-cache'):
node.cachefiles=ast.literal_eval(self.config.get('lalinference','ER2-cache'))
node.channels=ast.literal_eval(self.config.get('data','channels'))
node.psds=ast.literal_eval(self.config.get('lalinference','psds'))
for ifo in ifos:
node.add_input_file(os.path.join(self.basepath,node.cachefiles[ifo]))
node.cachefiles[ifo]=os.path.join(self.basepath,node.cachefiles[ifo])
node.add_input_file(os.path.join(self.basepath,node.psds[ifo]))
node.psds[ifo]=os.path.join(self.basepath,node.psds[ifo])
if len(ifos)==0: node.ifos=node.cachefiles.keys()
else: node.ifos=ifos
node.timeslides=dict([ (ifo,0) for ifo in node.ifos])
gotdata=1
else:
# Add the nodes it depends on
for seg in node.scisegs.values():
dfnode=seg.get_df_node()
if dfnode is not None and dfnode not in self.get_nodes():
self.add_node(dfnode)
prenode[ifo].fhighs[ifo]=str(event.fhigh)
for ifo in ifos:
prenode[ifo].set_max_psdlength(self.config.getint('input','max-psd-length'))
prenode[ifo].set_padding(self.config.getint('input','padding'))
#prenode[ifo].set_output_file('/dev/null')
prenode[ifo].add_var_arg('--Niter 1')
prenode[ifo].add_var_arg('--data-dump')
if self.config.has_option('lalinference','seglen'):
prenode[ifo].set_seglen(self.config.getint('lalinference','seglen'))
elif self.config.has_option('engine','seglen'):
prenode[ifo].set_seglen(self.config.getint('engine','seglen'))
else:
prenode[ifo].set_seglen(event.duration)
# Add the nodes it depends on
for seg in node.scisegs.values():
dfnode=seg.get_df_node()
if dfnode is not None and dfnode not in self.get_nodes():
self.add_node(dfnode)
if self.config.has_option('lalinference','roq'):
ifo_index=str(dfnode.get_observatory())+'1'
self.add_node(prenode[ifo_index])
self.add_node(romweightsnode[ifo_index])
prenode[ifo_index].add_output_file(os.path.join(self.basepath,ifo_index+'-freqDataWithInjection.dat'))
prenode[ifo_index].add_output_file(os.path.join(self.basepath,ifo_index+'-PDS.dat'))
romweightsnode[ifo_index].add_var_arg('-d '+os.path.join(self.basepath,ifo_index+'-freqDataWithInjection.dat'))
romweightsnode[ifo_index].add_input_file(os.path.join(self.basepath,ifo_index+'-freqDataWithInjection.dat'))
romweightsnode[ifo_index].add_var_arg('-p '+os.path.join(self.basepath,ifo_index+'-PSD.dat'))
romweightsnode[ifo_index].add_input_file(os.path.join(self.basepath,ifo_index+'-PSD.dat'))
if gotdata:
self.add_node(node)
else:
......@@ -805,7 +841,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
return None
if extra_options is not None:
for opt in extra_options.keys():
node.add_var_arg('--'+opt+' '+extra_options[opt])
node.add_var_arg('--'+opt+' '+extra_options[opt])
# Add control options
if self.config.has_option('input','injection-file'):
node.set_injection(self.config.get('input','injection-file'),event.event_id)
......@@ -824,6 +860,20 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
out_dir=os.path.join(self.basepath,'engine')
mkdirs(out_dir)
node.set_output_file(os.path.join(out_dir,node.engine+'-'+str(event.event_id)+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+str(node.id)))
if self.config.has_option('lalinference','roq'):
for ifo in ifos:
node.add_var_arg('--'+ifo+'-roqweights '+os.path.join(self.basepath,'weights_'+ifo+'.dat'))
node.add_input_file(os.path.join(self.basepath,'weights_'+ifo+'.dat'))
node.add_var_arg('--roqtime_steps '+os.path.join(self.basepath,'Num_tc_sub_domains.dat'))
node.add_input_file(os.path.join(self.basepath,'Num_tc_sub_domains.dat'))
if self.config.has_option('paths','rom_nodes'):
nodes_path=self.config.get('paths','rom_nodes')
node.add_var_arg('--roqnodes '+nodes_path)
node.add_input_file(nodes_path)
else:
print 'No nodes specified for ROM likelihood'
return None
return node
def add_results_page_node(self,outdir=None,parent=None,extra_options=None):
......@@ -842,27 +892,45 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_node(node)
return node
def add_rom_weights_node(self,ifo,parent=None):
try:
node=self.romweightsnodes[ifo]
except KeyError:
node=ROMNode(self.romweights_job,ifo)
self.romweightsnodes[ifo]=node
if parent is not None:
node.add_parent(parent)
#self.add_node(node)
return node
class EngineJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
def __init__(self,cp,submitFile,logdir,ispreengine=False):
self.ispreengine=ispreengine
self.engine=cp.get('analysis','engine')
basepath=cp.get('paths','basedir')
snrpath=os.path.join(basepath,'SNR')
self.snrpath=snrpath
mkdirs(snrpath)
if self.engine=='lalinferencemcmc':
exe=cp.get('condor','mpirun')
self.binary=cp.get('condor',self.engine)
#universe="parallel"
universe="vanilla"
self.write_sub_file=self.__write_sub_file_mcmc_mpi
elif self.engine=='lalinferencebambimpi':
exe=cp.get('condor','mpirun')
self.binary=cp.get('condor','lalinferencebambi')
universe="vanilla"
self.write_sub_file=self.__write_sub_file_mcmc_mpi
if ispreengine is False:
if self.engine=='lalinferencemcmc':
exe=cp.get('condor','mpirun')
self.binary=cp.get('condor',self.engine)
#universe="parallel"
universe="vanilla"
self.write_sub_file=self.__write_sub_file_mcmc_mpi
elif self.engine=='lalinferencebambimpi':
exe=cp.get('condor','mpirun')
self.binary=cp.get('condor','lalinferencebambi')
universe="vanilla"
self.write_sub_file=self.__write_sub_file_mcmc_mpi
else:
exe=cp.get('condor',self.engine)
universe="standard"
else:
self.engine=='lalinferencemcmc'
exe=cp.get('condor',self.engine)
universe="standard"
universe="vanilla"
pipeline.CondorDAGJob.__init__(self,universe,exe)
# Set the options which are always used
self.set_sub_file(submitFile)
......@@ -886,9 +954,11 @@ class EngineJob(pipeline.CondorDAGJob):
self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
self.add_condor_cmd('Requirements','(TARGET.'+cp.get('condor','queue')+' =?= True)')
if cp.has_section(self.engine):
self.add_ini_opts(cp,self.engine)
if ispreengine is False:
self.add_ini_opts(cp,self.engine)
if cp.has_section('engine'):
self.add_ini_opts(cp,'engine')
if ispreengine is False:
self.add_ini_opts(cp,'engine')
self.add_opt('snrpath',snrpath)
self.set_stdout_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).out'))
self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).err'))
......@@ -932,7 +1002,8 @@ class EngineNode(pipeline.CondorDAGNode):
self.maxlength=None
self.psdstart=None
self.cachefiles={}
self.id=EngineNode.new_id()
if li_job.ispreengine is False:
self.id=EngineNode.new_id()
self.__finaldata=False
self.snrpath=None
......@@ -1007,7 +1078,20 @@ class EngineNode(pipeline.CondorDAGNode):
self.channels[ifo]=channelname
return 1
else: return 0
def add_ifo_romweights(self,ifo,sciseg,channelname,timeslide=0,weightsnode=None):
self.ifos.append(ifo)
self.scisegs[ifo]=sciseg
df_node=sciseg.get_df_node()
if df_node is not None and weightsnode is not None:
self.add_parent(weightsnode)
self.cachefiles[ifo]=df_node.get_output_files()[0]
self.add_input_file(self.cachefiles[ifo])
self.timeslides[ifo]=timeslide
self.channels[ifo]=channelname
return 1
else: return 0
def finalize(self):
if not self.__finaldata:
self._finalize_ifo_data()
......@@ -1359,3 +1443,35 @@ class GraceDBNode(pipeline.CondorDAGNode):
#self.add_var_arg('"Parameter estimation finished. <a href=\"'+self.resultsurl+'/posplots.html\">'+self.resultsurl+'/posplots.html</a>"')
self.add_var_arg('Parameter estimation finished. '+self.resultsurl+'/posplots.html')
self.__finalized=True
class ROMJob(pipeline.CondorDAGJob):
"""
Class for a ROM compute weights job
"""
def __init__(self,cp,submitFile,logdir):
exe=cp.get('condor','romweights')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'romweights-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'romweights-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.add_arg('-B '+str(cp.get('paths','rom_basis')))
self.add_arg('-V '+str(cp.get('paths','rom_invV')))
self.add_arg('-t 0.1')
self.add_arg('-s 32')
self.add_arg('-f 40')
self.add_arg('-T 0.0001')
class ROMNode(pipeline.CondorDAGNode):
"""
Run the ROM compute weights script
"""
def __init__(self,romweights_job,ifo):
pipeline.CondorDAGNode.__init__(self,romweights_job)
self.__finalized=False
self.add_var_arg('-i '+ifo)
def finalize(self):
if self.__finalized:
return
self.__finalized=True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment