Verified Commit d6ee3804 authored by Soichiro Morisaki's avatar Soichiro Morisaki Committed by Adam Mercer
Browse files

lalinference_pipe_utils.py: reindentation

(cherry picked from commit 96d29392)
parent 9a9e0c1c
......@@ -849,9 +849,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
if self.config.has_option('condor','gdbinfo') and self.config.has_option('analysis','ugid') and self.config.getboolean('analysis','upload-to-gracedb'):
if self.config.has_option('gracedbinfo','server'):
gdb_srv=self.config.get('gracedbinfo','server')
gdb_srv=self.config.get('gracedbinfo','server')
else:
gdb_srv=None
gdb_srv=None
self.add_gracedb_info_node(None,event.GID,analysis='LIB',issky=True,server=gdb_srv)
......@@ -1215,7 +1215,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
enginenodes[0].set_snr_file()
if self.config.getboolean('analysis','coherence-test') and len(enginenodes[0].ifos)>1:
if self.site!='local':
zipfilename='postproc_'+evstring+'.tar.gz'
zipfilename='postproc_'+evstring+'.tar.gz'
else:
zipfilename=None
par_mergenodes=[]
......@@ -1227,7 +1227,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
cot_node,bwpsdnodes,bwpostnodes=self.add_engine_node(event,bwpsd=bwpsdnodes,bwpost=bwpostnodes,ifos=[ifo],co_test=True)
if cot_node is not None:
if i>0:
cot_node.add_var_arg('--dont-dump-extras')
cot_node.add_var_arg('--dont-dump-extras')
cotest_nodes.append(cot_node)
if len(cotest_nodes)==0:
return False
......@@ -1287,9 +1287,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if "summarypages" in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=True)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=True)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
else:
if self.site!='local':
zipfilename='postproc_'+evstring+'.tar.gz'
......@@ -1298,9 +1298,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
# Note: Disabled gzip_output for now. Possibly need it for future Pegasus use
if "summarypages" in self.config.get('condor','resultspage'):
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=False)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos,
evstring=evstring, coherence=False)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
else:
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
respagenode.set_psd_files(enginenodes[0].get_psd_files())
......@@ -1318,9 +1318,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('analysis','upload-to-gracedb'):
if self.config.has_option('gracedbinfo','server'):
gdb_srv=self.config.get('gracedbinfo','server')
gdb_srv=self.config.get('gracedbinfo','server')
else:
gdb_srv=None
gdb_srv=None
if self.config.getboolean('analysis','upload-to-gracedb') and event.GID is not None:
self.add_gracedb_start_node(event.GID,'LALInference',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
......@@ -1335,13 +1335,13 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_gracedb_start_node(ugid,'LIB',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
self.add_gracedb_log_node(respagenode,ugid,burst=True,server=gdb_srv)
if self.config.has_option('resultspage','email'):
emailto=self.config.get('resultspage','email')
emailto=self.config.get('resultspage','email')
else:
emailto=None
emailto=None
if self.config.has_option('gracedbinfo','server'):
gdb_srv=self.config.get('gracedbinfo','server')
gdb_srv=self.config.get('gracedbinfo','server')
else:
gdb_srv=None
gdb_srv=None
temp_node=self.add_gracedb_info_node(respagenode,ugid,analysis='LIB',email=emailto,server=gdb_srv)
if self.config.has_option('condor','ligo-skymap-plot') and self.config.has_option('condor','ligo-skymap-from-samples'):
......@@ -1424,9 +1424,9 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('analysis','upload-to-gracedb'):
if self.config.getboolean('analysis','upload-to-gracedb'):
if self.config.has_option('gracedbinfo','server'):
gdb_srv=self.config.get('gracedbinfo','server')
gdb_srv=self.config.get('gracedbinfo','server')
else:
gdb_srv=None
gdb_srv=None
self.add_gracedb_start_node(event.GID,'LALInference',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
self.add_gracedb_log_node(respagenode,event.GID,server=gdb_srv)
if self.config.has_option('condor','ligo-skymap-plot') and self.config.has_option('condor','ligo-skymap-from-samples'):
......@@ -1707,7 +1707,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
bayeswavepostnode[ifo].set_psdlength(bw_seglen)
bayeswavepostnode[ifo].set_srate(bw_srate)
bayeswavepost_fakecache = 'interp:'+bwPSDpath+ifo+'_fairdraw_asd.dat'
if event.timeslides.has_key(ifo):
slide=event.timeslides[ifo]
else:
......@@ -1719,7 +1719,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
#else:
#fakecachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
bayeswavepostnode[ifo].add_fake_ifo_data(ifo,seg,bayeswavepost_fakecache,self.channels[ifo],timeslide=slide)
if self.config.has_option('lalinference','flow'):
bayeswavepostnode[ifo].flows[ifo]=np.power(2,np.floor(np.log2(ast.literal_eval(self.config.get('lalinference','flow'))[ifo])))
bayeswavepostnode[ifo].set_seed(randomseed)
......@@ -1932,20 +1932,20 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.getboolean('analysis','add-lvem-tag'):
tag='pe,lvem'
if burst is False:
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag=tag)
node.set_filename(respagenode.webpath+'/corner/extrinsic.png')
self.add_node(node)
nodes.append(node)
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag=tag)
node.set_filename(respagenode.webpath+'/corner/extrinsic.png')
self.add_node(node)
nodes.append(node)
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
node.set_filename(respagenode.webpath+'/corner/intrinsic.png')
self.add_node(node)
nodes.append(node)
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
node.set_filename(respagenode.webpath+'/corner/intrinsic.png')
self.add_node(node)
nodes.append(node)
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
node.set_filename(respagenode.webpath+'/corner/sourceFrame.png')
self.add_node(node)
nodes.append(node)
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
node.set_filename(respagenode.webpath+'/corner/sourceFrame.png')
self.add_node(node)
nodes.append(node)
return nodes
......@@ -1983,50 +1983,50 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
def add_gracedb_info_node(self,respagenode,gid,analysis='LALInference',issky=False,prefix="LIB",email=None,server=None):
# if issky=True, this node will upload the FITS file into GDB. BCI and BSN will be used to decide which tags to use
# Otherwise, this node will upload information about parameters and bayes factors
if respagenode is not None:
samples=respagenode.posfile
hdf5samples=respagenode.get_in_files()[0] # This should only be called by LIB, which only uses one file
else:
# Try to find it
resnodes=filter(lambda x: isinstance(x,ResultsPageNode) ,self.get_nodes())
for rs in resnodes:
if len(rs.ifos)>1:
respagenode=rs
# if issky=True, this node will upload the FITS file into GDB. BCI and BSN will be used to decide which tags to use
# Otherwise, this node will upload information about parameters and bayes factors
if respagenode is not None:
samples=respagenode.posfile
hdf5samples=respagenode.get_in_files()[0] # This should only be called by LIB, which only uses one file
if self.postruninfojob.isdefined is False:
return None
if issky is False:
node=PostRunInfoNode(self.postruninfojob,parent=respagenode,gid=gid,samples=hdf5samples,server=server)
if email is not None:
node.set_email(email)
else:
skynodes=filter(lambda x: isinstance(x,SkyMapNode) ,self.get_nodes())
for sk in skynodes:
skymap=sk.outdir+'/%s.fits'%prefix
message=' %s FITS sky map'%prefix
node=PostRunInfoNode(self.postruninfojob,parent=sk,gid=gid,samples=None,server=server)
node.set_skymap(skymap)
node.set_message(message)
bci=respagenode.get_bcifile()
if bci is not None:
node.set_bci(bci)
bsn=respagenode.get_bsnfile()
if bsn is not None:
node.set_bsn(bsn)
node.set_analysis(analysis)
node.finalize()
self.add_node(node)
return node
else:
# Try to find it
resnodes=filter(lambda x: isinstance(x,ResultsPageNode) ,self.get_nodes())
for rs in resnodes:
if len(rs.ifos)>1:
respagenode=rs
samples=respagenode.posfile
hdf5samples=respagenode.get_in_files()[0] # This should only be called by LIB, which only uses one file
if self.postruninfojob.isdefined is False:
return None
if issky is False:
node=PostRunInfoNode(self.postruninfojob,parent=respagenode,gid=gid,samples=hdf5samples,server=server)
if email is not None:
node.set_email(email)
else:
skynodes=filter(lambda x: isinstance(x,SkyMapNode) ,self.get_nodes())
for sk in skynodes:
skymap=sk.outdir+'/%s.fits'%prefix
message=' %s FITS sky map'%prefix
node=PostRunInfoNode(self.postruninfojob,parent=sk,gid=gid,samples=None,server=server)
node.set_skymap(skymap)
node.set_message(message)
bci=respagenode.get_bcifile()
if bci is not None:
node.set_bci(bci)
bsn=respagenode.get_bsnfile()
if bsn is not None:
node.set_bsn(bsn)
node.set_analysis(analysis)
node.finalize()
self.add_node(node)
return node
def add_rom_weights_node(self,ifo,parent=None):
#try:
#node=self.computeroqweightsnodes[ifo]
#try:
#node=self.computeroqweightsnodes[ifo]
#except KeyError:
node=ROMNode(self.computeroqweights_job,ifo,parent.seglen,parent.flows[ifo])
self.computeroqweightsnode[ifo]=node
......@@ -2615,10 +2615,10 @@ class BayesWavePSDJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.ispreengine = False
self.add_condor_cmd('stream_output', True)
self.add_condor_cmd('stream_error', True)
self.add_condor_cmd('+CheckpointExitBySignal', False) #
self.add_condor_cmd('+CheckpointExitBySignal', False) #
self.add_condor_cmd('+CheckpointExitSignal', '"SIGTERM"')
self.add_condor_cmd('+CheckpointExitCode', 130)
self.add_condor_cmd('+SuccessCheckpointExitBySignal', False) #
self.add_condor_cmd('+SuccessCheckpointExitBySignal', False) #
self.add_condor_cmd('+SuccessCheckpointExitSignal', '"SIGTERM"')
self.add_condor_cmd('+SuccessCheckpointExitCode', 130)
self.add_condor_cmd('+WantFTOnCheckpoint', True)
......@@ -2717,7 +2717,7 @@ class PESummaryResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.add_condor_cmd('request_memory','2000')
class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
......@@ -2766,9 +2766,9 @@ class PESummaryResultsPageNode(pipeline.CondorDAGNode):
self.webpath=path
option = self.determine_webdir_or_existing_webdir(path)
if option == "existing":
self.add_var_opt('existing_webdir',path)
self.add_var_opt('existing_webdir',path)
else:
self.add_var_opt('webdir',path)
self.add_var_opt('webdir',path)
def get_output_path(self):
return self.webpath
......@@ -2837,8 +2837,8 @@ class PESummaryResultsPageNode(pipeline.CondorDAGNode):
def set_ifos(self,ifos):
self.ifos=ifos
class ResultsPageNode(pipeline.CondorDAGNode):
def __init__(self,results_page_job,outpath=None):
......@@ -3428,22 +3428,22 @@ class PlotSkyMapNode(pipeline.CondorDAGNode):
super(PlotSkyMapNode,self).finalize()
class PostRunInfoJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
self.isdefined=True
if not cp.has_option('condor','gdbinfo'):
self.isdefined=False
return
exe=cp.get('condor','gdbinfo')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('analysis','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('analysis','accounting_group'))
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.add_condor_cmd('RequestMemory','1000')
def __init__(self,cp,submitFile,logdir,dax=False):
self.isdefined=True
if not cp.has_option('condor','gdbinfo'):
self.isdefined=False
return
exe=cp.get('condor','gdbinfo')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax) # Job always runs locally
if cp.has_option('analysis','accounting_group'):
self.add_condor_cmd('accounting_group',cp.get('analysis','accounting_group'))
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.add_condor_cmd('RequestMemory','1000')
class PostRunInfoNode(pipeline.CondorDAGNode):
def __init__(self,post_run_info_job,gid=None,parent=None,samples=None,server=None):
......@@ -3457,31 +3457,31 @@ class PostRunInfoNode(pipeline.CondorDAGNode):
self.set_gid(gid)
self.server=None
if server is not None:
self.set_server(server)
self.set_server(server)
def finalize(self):
self.add_var_opt('analysis',self.analysis)
self.__finalized=True
def set_parent(self,parentnode):
self.add_parent(parentnode)
self.add_parent(parentnode)
def set_samples(self,samples):
if samples is not None:
self.add_var_arg('--samples %s'%samples)
if samples is not None:
self.add_var_arg('--samples %s'%samples)
def set_skymap(self,skymap):
self.add_var_arg('--skymap %s'%skymap)
def set_message(self,message):
self.add_var_arg("--message '%s'"%message)
def set_email(self,email):
self.add_var_arg('--email %s'%email)
self.add_var_arg('--email %s'%email)
def set_gid(self,gid):
self.add_var_opt('gid',gid)
self.add_var_opt('gid',gid)
def set_bci(self,bci):
self.add_file_opt('bci',bci)
self.add_file_opt('bci',bci)
def set_bsn(self,bsn):
self.add_file_opt('bsn',bsn)
self.add_file_opt('bsn',bsn)
def set_analysis(self,analysis):
self.analysis=analysis
self.analysis=analysis
def set_server(self,server):
self.server=server
if server is not None:
self.add_var_arg('--server %s'%self.server)
self.server=server
if server is not None:
self.add_var_arg('--server %s'%self.server)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment