Commit 75fb4aa7 authored by John Douglas Veitch

Nearly finished flow

Original: e794ce6b9dd46506b978b4f0b7f66e2459a345f5
parent 429ab68f
@@ -13,7 +13,22 @@ import pdb
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO']
class Event():
"""
Represents a unique event to run the analysis on
"""
def __init__(self,trig_time=None,SimInspiral=None,SnglInspiral=None,event_id=None,timeslide_dict=None):
self.trig_time=trig_time
self.injection=SimInspiral
self.sngltrigger=SnglInspiral
self.timeslides=timeslide_dict
if event_id is not None:
self.event_id=event_id
else:
self.event_id=id(self)
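# Editor's sketch (illustrative only, not part of this commit): the two ways
# an Event is expected to be constructed -- from a bare GPS trigger time, or
# from SimInspiral injection rows. The GPS time and the sim_rows iterable are
# hypothetical placeholders.
def _example_build_events(sim_rows):
    # Event pinned to a trigger time; event_id defaults to id(self)
    time_event = Event(trig_time=966388321.3)
    # One Event per injection row, numbered by its position in the table
    injection_events = [Event(SimInspiral=row, event_id=i)
                        for i, row in enumerate(sim_rows)]
    return [time_event] + injection_events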
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
def mkdirs(path):
"""
@@ -93,7 +108,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
self.datafind_job.set_sub_file(os.path.join(self.basepath,'datafind.sub'))
self.engine_job = EngineJob(self.config, os.path.join(self.basepath,'lalinference.sub'),self.logpath)
self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
self.merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath)
# Process the input to build list of analyses to do
self.times=[]
@@ -140,7 +155,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
def get_required_data(self,times):
"""
Calculate teh data that will be needed to process all events
Calculate the data that will be needed to process all events
"""
psdlength=self.config.getint('input','max-psd-length')
padding=self.config.getint('input','padding')
@@ -155,12 +170,35 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
for time in self.times:
self.add_full_analysis_time(time)
def add_full_analysis_lalinferencenest(self,event):
"""
Generate an end-to-end analysis of a given event (Event class)
For the LALInferenceNest engine. Uses parallel runs if specified
"""
evstring=str(event.event_id)
if event.trig_time is not None:
evstring=evstring+'-'+str(event.trig_time)
Npar=self.config.getint('analysis','nparallel')
# Set up the parallel engine nodes
enginenodes=[]
for i in range(Npar):
enginenodes.append(self.add_engine_node(event))
# Merge the results together
mergenode=MergeNSNode(self.merge_job)
map(mergenode.add_engine_parent, enginenodes)
mergedir=os.path.join(self.basepath,'nested_samples')
mkdirs(mergedir)
mergenode.set_output_file(os.path.join(mergedir,'outfile_%s.dat'%evstring))
self.add_node(mergenode)
def add_full_analysis_time(self,gpstime):
"""
Analyse a given GPS time
"""
ev=Event(trig_time=gpstime)
# datafindnode=self.get_datafind_node(gpstime)
enginenode=self.add_engine_node(gpstime)
enginenode=self.add_engine_node(ev)
ifos=reduce(lambda a,b:a+b,enginenode.ifos)
pagedir=os.path.join(self.basepath,str(gpstime)+'-'+'%x'%(id(enginenode)),ifos)
mkdirs(pagedir)
@@ -169,6 +207,10 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
def add_science_segments(self):
# Query the segment database for science segments and
# add them to the pool of segments
segmentdir=os.path.join(self.basepath,'segments')
mkdirs(segmentdir)
curdir=os.getcwd()
os.chdir(segmentdir)
for ifo in self.ifos:
(segFileName,dqVetoes)=inspiralutils.findSegmentsToAnalyze(self.config, ifo, self.veto_categories, generate_segments=True,\
use_available_data=self.use_available_data , data_quality_vetoes=False)
@@ -182,6 +224,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
df_node=self.get_datafind_node(ifo,self.frtypes[ifo],int(sciseg.start()),int(sciseg.end()))
sciseg.set_df_node(df_node)
self.segments[ifo].append(sciseg)
os.chdir(curdir)
def get_datafind_node(self,ifo,frtype,gpsstart,gpsend):
node=pipeline.LSCDataFindNode(self.datafind_job)
@@ -191,9 +234,14 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.set_end(gpsend)
#self.add_node(node)
return node
def add_engine_node(self,end_time,extra_options=None):
def add_engine_node(self,event,ifos=None,extra_options=None):
"""
Add an engine node to the dag. Will find the appropriate cache files automatically.
Will determine the data to be read and the output file.
"""
if ifos is None: ifos=self.ifos
node=self.EngineNode(self.engine_job)
end_time=event.trig_time
node.set_trig_time(end_time)
for ifo in self.ifos:
for seg in self.segments[ifo]:
@@ -219,14 +267,16 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.set_max_psdlength(self.config.getint('input','max-psd-length'))
out_dir=os.path.join(self.basepath,'engine')
mkdirs(out_dir)
node.set_output_file(os.path.join(out_dir,node.engine+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+'%x'%(id(node))))
node.set_output_file(os.path.join(out_dir,node.engine+'-'+str(event.event_id)+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+'%x'%(id(node))))
if event.injection is not None:
node.set_injection(event.injection,event.event_id)
return node
def add_results_page_node(self,outdir=None,parent=None,extra_options=None):
node=ResultsPageNode(self.results_page_job)
if parent is not None:
node.add_parent(parent)
infiles=parent.get_output_files()
infiles=[parent.get_pos_file()]
for infile in infiles:
node.add_var_arg(infile)
node.set_output_dir(outdir)
@@ -290,8 +340,15 @@ class EngineNode(pipeline.CondorDAGNode):
if event is not None:
self.__event=int(event)
self.add_var_opt('event',str(event))
get_trig_time = lambda self: self.__trigtime
def set_injection(self,injfile,event):
"""
Set a software injection to be performed.
"""
self.add_var_opt('inj',injfile)
self.set_event_number(event)
def get_trig_time(self): return self.__trigtime
def add_ifo_data(self,ifo,sciseg,channelname,timeslide=0):
self.ifos.append(ifo)
@@ -306,7 +363,7 @@ class EngineNode(pipeline.CondorDAGNode):
def _finalize_ifo_data(self):
"""
Add list of IFOs and data to analyse to command line arguments.
Add final list of IFOs and data to analyse to command line arguments.
"""
ifostring='['
cachestring='['
@@ -369,10 +426,17 @@ class LALInferenceNestNode(EngineNode):
self.outfilearg='outfile'
def set_output_file(self,filename):
self.add_file_opt(self.outfilearg,filename+'.dat',file_is_output_file=True)
self.nsfile=filename+'.dat'
self.add_file_opt(self.outfilearg,self.nsfile,file_is_output_file=True)
self.paramsfile=filename+'_params.txt'
self.Bfilename=filename+'_B.txt'
def get_B_file(self):
return self.Bfilename
def get_ns_file(self):
return self.nsfile
class LALInferenceMCMCNode(EngineNode):
def __init__(self,li_job):
EngineNode.__init__(self,li_job)
@@ -380,8 +444,12 @@ class LALInferenceMCMCNode(EngineNode):
self.outfilearg='outfile'
def set_output_file(self,filename):
self.posfile=filename+'.00'
self.add_file_opt(self.outfilearg,filename)
self.add_output_file(filename+'.00')
self.add_output_file(self.posfile)
def get_pos_file(self):
return self.posfile
class ResultsPageJob(pipeline.CondorDAGJob):
def __init__(self,cp,submitFile,logdir):
@@ -426,4 +494,150 @@ class ResultsPageNode(pipeline.CondorDAGNode):
def set_output_dir(self,dir):
self.add_var_opt('outpath',dir)
mkdirs(dir)
class CoherenceTestJob(pipeline.CondorDAGJob):
"""
Class defining the coherence test job to be run as part of a pipeline.
"""
def __init__(self,cp,submitFile,logdir):
exe=cp.get('condor','coherencetest')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
self.add_opt('coherent-incoherent-noise','')
self.add_condor_cmd('getenv','True')
self.set_stdout_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).err'))
self.set_sub_file(submitFile)
class CoherenceTestNode(pipeline.CondorDAGNode):
"""
Class defining the node for the coherence test
"""
def __init__(self,coherencetest_job,outfile=None):
pipeline.CondorDAGNode.__init__(self,coherencetest_job)
self.incoherent_parents=[]
self.coherent_parent=None
if outfile is not None:
self.set_output_file(outfile)
def add_coherent_parent(self,node):
"""
Add a parent node which is an engine node, and process its output files
"""
self.coherent_parent=node
self.add_parent(node)
def add_incoherent_parent(self,node):
"""
Add a parent node which provides one of the single-ifo evidence values
"""
self.incoherent_parents.append(node)
self.add_parent(node)
def finalize(self):
"""
Construct command line
"""
self.add_file_arg(self.coherent_parent.get_B_file())
for inco in self.incoherent_parents:
self.add_file_arg(inco.get_B_file())
def set_output_file(self,file):
"""
Set the output file
"""
self.add_file_opt('outsamp',file,file_is_output_file=True)
self.nsfile=file
def set_pos_output_file(self,file):
"""
Set the posterior output file
"""
self.add_file_opt('outpos',file,file_is_output_file=True)
self.posfile=file
def get_pos_file(self): return self.posfile
def get_ns_file(self): return self.nsfile
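# Editor's sketch (illustrative only, not part of this commit): wiring a
# coherence test from one coherent (multi-IFO) engine run plus one single-IFO
# run per detector. The dag, coherence_job, coherent_node, single_ifo_nodes
# and outfile names are hypothetical; the parents are assumed to be
# LALInferenceNestNode instances so that get_B_file() is available, and
# finalize() is assumed not to be invoked automatically by the DAG machinery.
def _example_add_coherence_test(dag, coherence_job, coherent_node,
                                single_ifo_nodes, outfile):
    node = CoherenceTestNode(coherence_job, outfile=outfile)
    node.add_coherent_parent(coherent_node)
    for single_node in single_ifo_nodes:
        node.add_incoherent_parent(single_node)
    # Build the command line: coherent B file first, then each incoherent one
    node.finalize()
    dag.add_node(node)
    return node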
class CombineZJob(pipeline.CondorDAGJob):
"""
Class defining a combineZ script job to be run as part of a pipeline
This job combines runs with adjacent prior areas and produces the posterior samples
Input Arguments:
cp - A ConfigParser object containing the combinez section
submitFile - Path to store the submit file
logdir - A directory to hold the stderr, stdout files of combineZ
"""
def __init__(self,cp,submitFile,logdir):
exe=cp.get('condor','combinez')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
self.add_opt('Nlive',str(int(cp.get('lalinferencenest','nlive'))*int(cp.get('analysis','nparallel'))))
self.set_stdout_file(os.path.join(logdir,'combineZ-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'combineZ-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.set_sub_file(submitFile)
class CombineZNode(pipeline.CondorDAGNode):
"""
Class defining a Condor DAG Node for combineZ jobs
Input Arguments:
combine_job - A CombineZJob object
"""
def __init__(self,combine_job):
pipeline.CondorDAGNode.__init__(self,combine_job)
def add_engine_parent(self,node):
self.add_parent(node)
self.add_file_arg(node.get_ns_file())
def set_output_file(self,file):
self.add_file_opt('outsamp',file,file_is_output_file=True)
self.nsfile=file
def set_pos_output_file(self,file):
self.add_file_opt('outpos',file,file_is_output_file=True)
self.posfile=file
def get_pos_file(self): return self.posfile
def get_ns_file(self): return self.nsfile
class MergeNSJob(pipeline.CondorDAGJob):
"""
Class defining a job which merges several parallel nested sampling jobs into a single file
Input arguments:
cp - A configparser object containing the setup of the analysis
submitFile - Path to store the submit file
logdir - A directory to hold the stderr, stdout files of the merge runs
"""
def __init__(self,cp,submitFile,logdir):
exe=cp.get('condor','mergescript')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
self.set_sub_file(submitFile)
self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
self.add_opt('Nlive',cp.get('lalinferencenest','nlive'))
self.add_condor_cmd('getenv','True')
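# Editor's sketch (illustrative only, not part of this commit): the config
# entries read by the merge, combineZ and coherence test jobs above. The
# executable paths and numerical values are hypothetical placeholders; the
# option and section names come from the classes in this module.
def _example_config():
    import ConfigParser
    cp = ConfigParser.ConfigParser()
    cp.add_section('condor')
    cp.set('condor', 'mergescript', '/path/to/merge_nested_sampling_runs')
    cp.set('condor', 'combinez', '/path/to/combine_evidence')
    cp.set('condor', 'coherencetest', '/path/to/coherence_test')
    cp.add_section('lalinferencenest')
    cp.set('lalinferencenest', 'nlive', '1024')
    cp.add_section('analysis')
    cp.set('analysis', 'nparallel', '4')
    return cp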
class MergeNSNode(pipeline.CondorDAGNode):
"""
Class defining the DAG node for a merge job
Input arguments:
merge_job - A MergeNSJob object
parents - iterable of parent nodes (each must have a get_ns_file() method)
"""
def __init__(self,merge_job,parents=None):
pipeline.CondorDAGNode.__init__(self,merge_job)
if parents is not None:
for parent in parents:
self.add_engine_parent(parent)
def add_engine_parent(self,parent):
self.add_parent(parent)
self.add_file_arg(parent.get_ns_file())
def set_output_file(self,file):
self.add_file_opt('out',file,file_is_output_file=True)
self.nsfile=file
def set_pos_output_file(self,file):
self.add_file_opt('posterior',file,file_is_output_file=True)
self.posfile=file
def get_pos_file(self): return self.posfile
def get_ns_file(self): return self.nsfile
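# Editor's sketch (illustrative only, not part of this commit): merging
# several parallel nested sampling runs for one event, mirroring
# add_full_analysis_lalinferencenest() above. The dag, merge_job,
# engine_nodes, outdir and evstring arguments are hypothetical; each engine
# node is assumed to be a LALInferenceNestNode whose set_output_file() has
# already been called, so get_ns_file() returns a valid path.
def _example_merge_parallel_runs(dag, merge_job, engine_nodes, outdir, evstring):
    mkdirs(outdir)
    merge_node = MergeNSNode(merge_job, parents=engine_nodes)
    merge_node.set_output_file(os.path.join(outdir, 'outfile_%s.dat' % evstring))
    merge_node.set_pos_output_file(os.path.join(outdir, 'posterior_%s.dat' % evstring))
    dag.add_node(merge_node)
    return merge_node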