Commit 56278f5a authored by John Douglas Veitch's avatar John Douglas Veitch
Browse files

Add GRACEDB logging dag node

Original: 0f1a8adb16306eed2ad06bef7cb7be035cc5e3d6
parent 321cb0f7
......@@ -27,12 +27,11 @@ parser.add_option("-g","--gps-time-file",action="store",type="string",default=No
parser.add_option("-t","--single-triggers",action="store",type="string",default=None,help="SnglInspiralTable trigger list",metavar="SNGL_FILE.xml")
parser.add_option("-C","--coinc-triggers",action="store",type="string",default=None,help="CoinInspiralTable trigger list",metavar="COINC_FILE.xml")
parser.add_option("-L","--lvalert",action="store",type="string",default=None,help="LVAlert coinc file",metavar="coinc_G0000.xml")
parser.add_option("--gid",action="store",type="string",default=None,help="Optional GraceDB ID for submitting results")
parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
(opts,args)=parser.parse_args()
if len(args)!=1:
......@@ -69,6 +68,9 @@ if opts.coinc_triggers is not None:
if opts.lvalert is not None:
cp.set('input','lvalert-file',opts.lvalert)
if opts.gid is not None:
cp.set('input','gid',opts.gid)
# Create the DAG from the configparser object
dag=pipe_utils.LALInferencePipelineDAG(cp)
dag.write_sub_files()
......
......@@ -48,7 +48,7 @@ class Event():
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
def readLValert(lvalertfile,SNRthreshold=0,GID=None):
def readLValert(lvalertfile,SNRthreshold=0,gid=None):
"""
Parse LV alert file, continaing coinc, sngl, coinc_event_map.
and create a list of Events as input for pipeline
......@@ -70,7 +70,7 @@ def readLValert(lvalertfile,SNRthreshold=0,GID=None):
these_sngls = [e for e in sngl_events if e.event_id in [c.event_id for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id] ]
dur = min([e.template_duration for e in these_sngls]) + 2 # Add 2s padding
srate = pow(2.0, ceil( log(max([e.f_final]), 2) ) ) # Round up to power of 2
ev=Event(CoincInspiral=coinc, GID=GID, ifos = ifos, duration = dur, srate = srate)
ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = dur, srate = srate)
if(coinc.snr>SNRthreshold): output.append(ev)
print "Found %d coinc events in table." % len(coinc_events)
......@@ -162,6 +162,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
self.merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath)
self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath)
self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath)
# Process the input to build list of analyses to do
self.events=self.setup_from_inputs()
self.times=[e.trig_time for e in self.events]
......@@ -237,11 +238,13 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
events=[Event(SnglInspiral=trig) for trig in trigTable]
if self.config.has_option('input','coinc-inspiral-file'):
from pylal import CoincInspiralUtils
coincTable = CoincInspiralUtils.ReadCoincInspiralFromFiles([self.config.get('input','coinc-inspiral-file')])
coincTable = CoincInspiralUtils.readCoincInspiralFromFiles([self.config.get('input','coinc-inspiral-file')])
events = [Event(CoincInspiral=coinc) for coinc in coincTable]
# LVAlert CoincInspiral Table
if self.config.has_option('input','gid'): gid=self.config.get('input','gid')
else: gid=None
if self.config.has_option('input','lvalert-file'):
events = readLValert(self.config.get('input','lvalert-file'))
events = readLValert(self.config.get('input','lvalert-file'),gid=gid)
# TODO: pipedown-database
# TODO: timeslides
return events
......@@ -271,6 +274,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_node(mergenode)
respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode)
respagenode.set_bayes_coherent_noise(mergenode.get_ns_file()+'_B.txt')
if event.GID is not None:
self.add_gracedb_log_node(respagenode,event.GID)
if self.config.getboolean('analysis','coherence-test') and len(enginenodes[0].ifos)>1:
mkdirs(os.path.join(self.basepath,'coherence_test'))
par_mergenodes=[]
......@@ -291,7 +296,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.add_node(coherence_node)
respagenode.add_parent(coherence_node)
respagenode.set_bayes_coherent_incoherent(coherence_node.get_output_files()[0])
def add_full_analysis_lalinferencemcmc(self,event):
"""
Generate an end-to-end analysis of a given event
......@@ -309,6 +314,8 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
mkdirs(pagedir)
respagenode=self.add_results_page_node(outdir=pagedir)
map(respagenode.add_engine_parent, enginenodes)
if event.GID is not None:
self.add_gracedb_log_node(respagenode,event.GID)
def add_science_segments(self):
# Query the segment database for science segments and
......@@ -403,7 +410,13 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.add_parent(parent)
infile=parent.get_pos_file()
node.add_var_arg(infile)
node.set_output_dir(outdir)
node.set_output_path(outdir)
self.add_node(node)
return node
def add_gracedb_log_node(self,respagenode,gid):
node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid)
node.add_parent(respagenode)
self.add_node(node)
return node
......@@ -641,6 +654,8 @@ class ResultsPageNode(pipeline.CondorDAGNode):
self.set_output_path(path)
def set_output_path(self,path):
self.webpath=path
self.add_var_opt('outpath',path)
mkdirs(path)
self.posfile=os.path.join(path,'posterior_samples.dat')
def set_event_number(self,event):
"""
......@@ -659,9 +674,6 @@ class ResultsPageNode(pipeline.CondorDAGNode):
if isinstance(node,LALInferenceMCMCNode):
self.add_var_opt('lalinfmcmc','')
def set_output_dir(self,dir):
self.add_var_opt('outpath',dir)
mkdirs(dir)
def get_pos_file(self): return self.posfile
def set_bayes_coherent_incoherent(self,bcifile):
self.add_var_arg('--bci '+bcifile)
......@@ -818,13 +830,39 @@ class GraceDBJob(pipeline.CondorDAGJob):
self.set_stderr_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.baseurl=cp.get('paths','baseurl')
self.basepath=cp.get('paths','webdir')
class GraceDBNode(pipeline.CondorDAGNode):
"""
Run the gracedb executable to report the results
"""
def __init__(self,gracedb_job,pagepath,parents=None):
def __init__(self,gracedb_job,gid=None,parent=None):
pipeline.CondorDAGNode.__init__(self,gracedb_job)
self.resultsurl=os.path.join(gracedb_job.baseurl,pagepath)
# TODO: Add the command line arguments for gracedb to update the event
self.resultsurl=""
if gid: self.set_gid(gid)
if parent: self.set_parent_resultspage(parent,gid)
def set_page_path(self,path):
"""
Set the path to the results page, after self.baseurl.
"""
self.resultsurl=os.path.join(self.job().baseurl,path)
def set_gid(self,gid):
"""
Set the GraceDB ID to log to
"""
self.gid=gid
def set_parent_resultspage(self,respagenode,gid):
"""
Setup to log the results from the given parent results page node
"""
res=respagenode
self.set_page_path(res.webpath.replace(self.job().basepath,self.job().baseurl))
self.set_gid(gid)
def finalize(self):
self.add_var_arg('log')
self.add_var_arg(str(self.gid))
self.add_var_arg('parameter estimation finished. '+self.resultsurl)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment