Commit 76952f1a authored by Carl-Johan Haster

Add ThermoIntEvidence (combine HDF5 files) into lalinference pipeline, one mergescript

Original: ac90314706884c5b3fd7b84c12e6f29dfe2ea996
parent d947fdca
@@ -317,6 +317,10 @@ INT4 init_ptmcmc(LALInferenceRunState *runState) {
     /* Establish the random state across MPI threads */
     init_mpi_randomstate(runState);
 
+    /* Add common fixed parameters to output */
+    LALInferenceAddPTMCMCMetaInfo(runState);
+
     /* Give a new set of proposal args to each thread */
     for (i=0; i<runState->nthreads; i++) {
         thread = runState->threads[i];
......
@@ -456,6 +456,8 @@ void record_likelihoods(LALInferenceThreadState *thread) {
     LALInferenceAddVariable(thread->currentParams, "logprior", &(thread->currentPrior), LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_OUTPUT);
     LALInferenceAddVariable(thread->currentParams, "logl", &(thread->currentLikelihood), LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_OUTPUT);
     LALInferenceAddVariable(thread->currentParams, "deltalogl", &deltalogl, LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_OUTPUT);
+    LALInferenceAddVariable(thread->currentParams, "temperature", &(thread->temperature), LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_OUTPUT);
+    LALInferenceAddVariable(thread->currentParams, "nullLogL", &(thread->nullLikelihood), LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_OUTPUT);
 
     LALInferenceIFOData *headIFO = thread->parent->data;
     char name[256];
@@ -1884,3 +1886,44 @@ void LALInferenceMCMCResumeRead(LALInferenceThreadState *thread, FILE *resumeFile) {
     return;
 }
+
+void LALInferenceAddPTMCMCMetaInfo(LALInferenceRunState *runState) {
+    UINT4 nIFO = 0;
+    INT4 i, MPIsize, n_local_threads, ntemps, randomseed;
+    REAL8 seglen, epoch;
+    LALInferenceIFOData *ifodata1;
+    LALInferenceThreadState *thread;
+
+    MPI_Comm_size(MPI_COMM_WORLD, &MPIsize);
+
+    n_local_threads = runState->nthreads;
+    ntemps = MPIsize*n_local_threads;
+
+    randomseed = LALInferenceGetINT4Variable(runState->algorithmParams, "random_seed");
+
+    ifodata1 = runState->data;
+    while (ifodata1) {
+        nIFO++;
+        ifodata1 = ifodata1->next;
+    }
+
+    REAL8 SampleRate = 4096.0; // default value of the sample rate from LALInferenceReadData()
+    if (LALInferenceGetProcParamVal(runState->commandLine, "--srate"))
+        SampleRate = atof(LALInferenceGetProcParamVal(runState->commandLine, "--srate")->value);
+
+    seglen = atof(LALInferenceGetProcParamVal(runState->commandLine, "--seglen")->value);
+    epoch = XLALGPSGetREAL8(&(runState->data->epoch));
+
+    for (i=0; i<runState->nthreads; i++) {
+        thread = runState->threads[i];
+
+        LALInferenceAddVariable(thread->currentParams, "nIFO", &nIFO, LALINFERENCE_UINT4_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "nLocalTemps", &n_local_threads, LALINFERENCE_UINT4_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "nTemps", &ntemps, LALINFERENCE_UINT4_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "randomSeed", &randomseed, LALINFERENCE_UINT4_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "sampleRate", &SampleRate, LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "segmentLength", &seglen, LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_FIXED);
+        LALInferenceAddVariable(thread->currentParams, "segmentStart", &epoch, LALINFERENCE_REAL8_t, LALINFERENCE_PARAM_FIXED);
+    }
+}
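For orientation: the PARAM_FIXED metadata recorded above travels with each chain into the PTMCMC output HDF5 file, alongside the new per-sample 'temperature' and 'nullLogL' columns from record_likelihoods(). A minimal h5py sketch of inspecting a result; the file name and group path are assumptions about the usual lalinference layout, not something this commit defines:

    import h5py

    # Hypothetical file name and group path, assuming the usual
    # lalinference HDF5 layout.
    with h5py.File('lalinferencemcmc-0-H1L1-1126259462.39-0.hdf5', 'r') as f:
        samples = f['lalinference/lalinference_mcmc/posterior_samples']
        print(samples.dtype.names)   # per-sample columns, now including
                                     # 'temperature' and 'nullLogL'
        print(dict(samples.attrs))   # run-level metadata (nTemps, randomSeed,
                                     # sampleRate, ...) commonly lands here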
@@ -89,6 +89,7 @@ void LALInferenceNameOutputs(LALInferenceRunState *runState);
 void LALInferenceCheckpointMCMC(LALInferenceRunState *runState);
 void LALInferenceResumeMCMC(LALInferenceRunState *runState);
 void LALInferenceReadMCMCCheckpoint(LALInferenceRunState *runState);
+void LALInferenceAddPTMCMCMetaInfo(LALInferenceRunState *runState);
 
 /** Reads final parameter values from the given output file, and
     stores them in the current params to try to continue the run. */
......
@@ -124,7 +124,9 @@ ignore-science-segments=True
 
 # Replace with your own executable locations
 datafind=/home/albert.einstein/bin/gw_data_find
-mergescript=/home/albert.einstein/bin/lalapps_nest2pos
+mergeNSscript=/home/albert.einstein/bin/lalapps_nest2pos
+mergeMCMCscript=/home/albert.einstein/bin/cbcBayesMCMC2pos.py
+combinePTMCMCh5script=/home/albert.einstein/bin/cbcBayesCombinePTMCMCh5s.py
 resultspage=/home/albert.einstein/bin/cbcBayesPostProc.py
 segfind=/home/albert.einstein/bin/ligolw_segment_query
 ligolw_print=/home/albert.einstein/bin/ligolw_print
......
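The three [condor] keys above are what the new job classes look up when building their submit files (see MergeJob and CombineMCMCJob further down). A minimal sketch of that selection logic; 'example_analysis.ini' is a hypothetical path:

    from ConfigParser import ConfigParser  # Python 2, matching the pipeline

    cp = ConfigParser()
    cp.read('example_analysis.ini')

    def merge_executable(cp, engine='nest'):
        # Mirrors MergeJob.__init__: MCMC output is merged with
        # cbcBayesMCMC2pos.py, nested sampling output with lalapps_nest2pos.
        if engine == 'mcmc':
            return cp.get('condor', 'mergeMCMCscript')
        return cp.get('condor', 'mergeNSscript')

    # Parallel PTMCMC chain files are first combined into one HDF5 file:
    combine_executable = cp.get('condor', 'combinePTMCMCh5script')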
@@ -682,8 +682,14 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
         self.results_page_job.set_grid_site('local')
         self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
         self.cotest_results_page_job.set_grid_site('local')
-        self.merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax())
-        self.merge_job.set_grid_site('local')
+        if self.engine=='lalinferencemcmc':
+            self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
+            self.combine_job.set_grid_site('local')
+            self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
+            self.merge_job.set_grid_site('local')
+        else:
+            self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
+            self.merge_job.set_grid_site('local')
         self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
         self.coherence_test_job.set_grid_site('local')
         self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
@@ -1044,7 +1050,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
         pagedir=os.path.join(self.webdir,evstring,myifos)
         #pagedir=os.path.join(self.basepath,evstring,myifos)
         mkdirs(pagedir)
-        mergenode=MergeNSNode(self.merge_job,parents=enginenodes)
+        mergenode=MergeNode(self.merge_job,parents=enginenodes,engine='nest')
         mergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(myifos,evstring)))
         self.add_node(mergenode)
         # Call finalize to build final list of available data
@@ -1064,7 +1070,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
         mkdirs(os.path.join(self.basepath,'coherence_test'))
         par_mergenodes=[]
         for ifo in enginenodes[0].ifos:
-            co_merge_job = MergeNSJob(self.config,os.path.join(self.basepath,'merge_runs_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
+            co_merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs_%s.sub'%(ifo)),self.logpath,dax=self.is_dax(),engine='nest')
             co_merge_job.set_grid_site('local')
             cotest_nodes=[]
             for i in range(Npar):
@@ -1083,7 +1089,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
                 else:
                     co.set_psd_files()
                     co.set_snr_file()
-            pmergenode=MergeNSNode(co_merge_job,parents=cotest_nodes)
+            pmergenode=MergeNode(co_merge_job,parents=cotest_nodes,engine='nest')
             pmergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(ifo,evstring)))
             self.add_node(pmergenode)
             par_mergenodes.append(pmergenode)
@@ -1165,7 +1171,27 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
         enginenodes[0].set_snr_file()
         pagedir=os.path.join(self.webdir,evstring,myifos)
         mkdirs(pagedir)
-        respagenode=self.add_results_page_node(outdir=pagedir,ifos=enginenodes[0].ifos)
+        combinenodes=[]
+        for i in range(Npar):
+            combinenodes.append(CombineMCMCNode(self.combine_job,parents=[enginenodes[i]]))
+            input_file = combinenodes[i].get_parent_posfile(enginenodes[i])
+            input_file_split_index = input_file.find('lalinferencemcmc-')
+            combinenodes[i].set_pos_output_file(input_file[:input_file_split_index]+'combine_'+input_file[input_file_split_index:])
+            combinenodes[i].add_var_arg(input_file)
+            number_of_mpi_jobs = self.config.getint('mpi','mpi_task_count')
+            for j in xrange(1,number_of_mpi_jobs):
+                combinenodes[i].add_var_arg(input_file+".%02d" % j)
+            self.add_node(combinenodes[i])
+        mergenode=MergeNode(self.merge_job,parents=combinenodes,engine='mcmc')
+        mergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(myifos,evstring)))
+        if self.config.has_option('resultspage','deltaLogP'):
+            mergenode.add_var_arg('--deltaLogP '+str(self.config.getfloat('resultspage','deltaLogP')))
+        if self.config.has_option('resultspage','downsample'):
+            mergenode.add_var_arg('--downsample '+str(self.config.getint('resultspage','downsample')))
+        if self.config.has_option('resultspage','fixedBurnin'):
+            mergenode.add_var_arg('--fixedBurnin '+str(self.config.getint('resultspage','fixedBurnin')))
+        self.add_node(mergenode)
+        respagenode=self.add_results_page_node(outdir=pagedir,parent=mergenode,gzip_output=None,ifos=enginenodes[0].ifos)
         respagenode.set_psd_files(enginenodes[0].get_psd_files())
         respagenode.set_snr_file(enginenodes[0].get_snr_file())
         if os.path.exists(self.basepath+'/coinc.xml'):
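Taken together, the hunk above gives MCMC runs a two-stage post-processing chain: each parallel run's per-MPI-task chain files are combined into one HDF5 file, and the combined files are then merged into a single posterior that feeds the results page. A condensed sketch of the wiring; 'dag' stands in for the LALInferencePipelineDAG instance and the output name is a placeholder:

    combinenodes = []
    for engine_node in enginenodes:
        node = CombineMCMCNode(dag.combine_job, parents=[engine_node])
        base = node.get_parent_posfile(engine_node)
        # One argument per MPI task's chain file: <base>, <base>.01, <base>.02, ...
        node.add_var_arg(base)
        for j in xrange(1, dag.config.getint('mpi', 'mpi_task_count')):
            node.add_var_arg(base + '.%02d' % j)
        dag.add_node(node)
        combinenodes.append(node)

    # A single merge over all combined runs produces the final posterior file.
    mergenode = MergeNode(dag.merge_job, parents=combinenodes, engine='mcmc')
    mergenode.set_pos_output_file('posterior_H1L1_1126259462.hdf5')
    dag.add_node(mergenode)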
@@ -1174,7 +1200,6 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
             respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
         if self.config.has_option('input','burst-injection-file') and event.event_id is not None:
             respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
-        map(respagenode.add_engine_parent, enginenodes)
         if event.GID is not None:
             if self.config.has_option('analysis','upload-to-gracedb'):
                 if self.config.getboolean('analysis','upload-to-gracedb'):
@@ -2218,8 +2243,10 @@ class ResultsPageNode(pipeline.CondorDAGNode):
         self.__event=0
         self.ifos=None
         self.injfile=None
+    def set_gzip_output(self,path):
+        self.add_file_opt('archive',path,file_is_output_file=True)
     def set_output_path(self,path):
         self.webpath=path
         #self.add_file_opt('outpath',path,file_is_output_file=True)
@@ -2228,14 +2255,18 @@ class ResultsPageNode(pipeline.CondorDAGNode):
         mkdirs(path)
         self.posfile=os.path.join(path,'posterior_samples.dat')
         self.add_output_file(self.posfile)
+    def get_output_path(self):
+        return self.webpath
     def set_injection(self,injfile,eventnumber):
         self.injfile=injfile
         self.add_file_opt('inj',injfile)
         self.set_event_number(eventnumber)
+    def get_injection(self):
+        return self.injfile
     def set_event_number(self,event):
         """
         Set the event number in the injection XML.
@@ -2246,6 +2277,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
     def get_event_number(self):
         return self.__event
 
     def set_psd_files(self,st):
         if st is None:
             return
@@ -2272,12 +2304,16 @@ class ResultsPageNode(pipeline.CondorDAGNode):
             self.add_file_arg(node.get_pos_file())
     def get_pos_file(self): return self.posfile
 
     def set_bayes_coherent_incoherent(self,bcifile):
         self.add_file_opt('bci',bcifile)
 
     def set_bayes_coherent_noise(self,bsnfile):
         self.add_file_opt('bsn',bsnfile)
 
+    def set_header_file(self,headerfile):
+        self.add_file_opt('header',headerfile)
+
     def set_ifos(self,ifos):
         self.ifos=ifos
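A quick usage sketch of the accessors added to ResultsPageNode above; 'respage' is a hypothetical node instance and the paths are placeholders:

    respage.set_gzip_output('/web/event0/archive.tar.gz')  # wired to the 'archive' file option
    respage.set_header_file('/run/posterior_header.txt')   # wired to the 'header' file option
    respage.set_injection('injections.xml', 0)             # also sets the event number
    print(respage.get_injection())    # 'injections.xml'
    print(respage.get_output_path())  # whatever set_output_path() stored in self.webpath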
@@ -2343,16 +2379,21 @@ class CoherenceTestNode(pipeline.CondorDAGNode):
             for inco in self.incoherent_parents:
                 self.add_file_arg(inco.get_pos_file())
 
-class MergeNSJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
+class MergeJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
     """
-    Class defining a job which merges several parallel nested sampling jobs into a single file
+    Class defining a job which merges several parallel nested sampling or MCMC jobs into a single file
     Input arguments:
     cp - A configparser object containing the setup of the analysis
     submitFile - Path to store the submit file
     logdir - A directory to hold the stderr, stdout files of the merge runs
     dax - Is the job to be configured as a pegasus job, if so dax=True
+    engine - Set to either 'nest' or 'mcmc' for the appropriate behaviour
     """
-    def __init__(self,cp,submitFile,logdir,dax=False):
-        exe=cp.get('condor','mergescript')
+    def __init__(self,cp,submitFile,logdir,dax=False,engine='nest'):
+        if engine == 'mcmc':
+            exe=cp.get('condor','mergeMCMCscript')
+        else:
+            exe=cp.get('condor','mergeNSscript')
         pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
         pipeline.AnalysisJob.__init__(self,cp,dax=dax)
         if cp.has_option('analysis','accounting_group'):
@@ -2373,33 +2414,88 @@ class MergeNSJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
         self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
         self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
         self.add_condor_cmd('getenv','True')
-        if cp.has_option('merge','npos'):
+        if cp.has_option('merge','npos') and engine == 'nest':
             self.add_opt('npos',cp.get('merge','npos'))
 
-class MergeNSNode(pipeline.CondorDAGNode):
+class MergeNode(pipeline.CondorDAGNode):
     """
-    Class defining the DAG node for a merge job
+    Class defining the DAG node for a merge job (nested sampling or MCMC)
     Input arguments:
     merge_job = A MergeJob object
-    parents = iterable of parent LALInferenceNest nodes (must have get_ns_file() method)
+    parents = iterable of parent nodes: LALInferenceNest nodes with a get_ns_file() method for 'nest', or CombineMCMCNode nodes with a get_pos_file() method for 'mcmc'
+    engine - Set to either 'nest' or 'mcmc' for the appropriate behaviour
     """
-    def __init__(self,merge_job,parents=None):
+    def __init__(self,merge_job,parents=None,engine='nest'):
         pipeline.CondorDAGNode.__init__(self,merge_job)
         if parents is not None:
             for parent in parents:
+                if engine == 'nest':
                     self.add_engine_parent(parent)
+                else:
+                    self.add_combine_parent(parent)
 
     def add_engine_parent(self,parent):
         self.add_parent(parent)
         self.add_file_arg(parent.get_ns_file())
 
+    def add_combine_parent(self,parent):
+        self.add_parent(parent)
+        self.add_file_arg(parent.get_pos_file())
+
     def set_pos_output_file(self,file):
         self.add_file_opt('pos',file,file_is_output_file=True)
         self.posfile=file
 
     def get_pos_file(self): return self.posfile
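The engine flag passed to MergeNode must agree with the one used to build the MergeJob, since it selects both the executable (above) and the accessor the parent nodes are expected to provide. A small sketch under that assumption; the job and parent objects are hypothetical:

    # Nested sampling: parents expose get_ns_file().
    ns_merge = MergeNode(nest_merge_job, parents=ns_runs, engine='nest')

    # MCMC: parents are CombineMCMCNode instances exposing get_pos_file().
    mcmc_merge = MergeNode(mcmc_merge_job, parents=combine_runs, engine='mcmc')
    mcmc_merge.set_pos_output_file('posterior.hdf5')  # emitted via the 'pos' option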
+class CombineMCMCJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
+    """
+    Class defining a job which combines several parallel MCMC chains into a single HDF5 file
+    Input arguments:
+    cp - A configparser object containing the setup of the analysis
+    submitFile - Path to store the submit file
+    logdir - A directory to hold the stderr, stdout files of the merge runs
+    dax - Is the job to be configured as a pegasus job, if so dax=True
+    """
+    def __init__(self,cp,submitFile,logdir,dax=False):
+        exe=cp.get('condor','combinePTMCMCh5script')
+        pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
+        pipeline.AnalysisJob.__init__(self,cp,dax=dax)
+        if cp.has_option('analysis','accounting_group'):
+            self.add_condor_cmd('accounting_group',cp.get('analysis','accounting_group'))
+        if cp.has_option('analysis','accounting_group_user'):
+            self.add_condor_cmd('accounting_group_user',cp.get('analysis','accounting_group_user'))
+        self.set_sub_file(os.path.abspath(submitFile))
+        self.set_stdout_file(os.path.join(logdir,'combine-$(cluster)-$(process).out'))
+        self.set_stderr_file(os.path.join(logdir,'combine-$(cluster)-$(process).err'))
+        self.add_condor_cmd('getenv','True')
+
+class CombineMCMCNode(pipeline.CondorDAGNode):
+    """
+    Class defining the DAG node for an MCMC combine job
+    Input arguments:
+    combine_job = A CombineMCMCJob object
+    parents = iterable of parent LALInferenceMCMC nodes (must have get_pos_file() method)
+    """
+    def __init__(self,combine_job,parents=None):
+        pipeline.CondorDAGNode.__init__(self,combine_job)
+        if parents is not None:
+            for parent in parents:
+                self.add_engine_parent(parent)
+
+    def add_engine_parent(self,parent):
+        self.add_parent(parent)
+        self.add_file_arg(parent.get_pos_file())
+
+    def get_parent_posfile(self,parent):
+        return parent.get_pos_file()
+
+    def set_pos_output_file(self,file):
+        self.add_file_opt('outfile',file,file_is_output_file=True)
+        self.posfile=file
+
+    def get_pos_file(self): return self.posfile
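For reference, the combined-file naming used when CombineMCMCNode is wired into the DAG (see the combinenodes loop earlier) splices a 'combine_' prefix in front of the 'lalinferencemcmc-' stem of the engine file. A worked example with a made-up path:

    input_file = '/run/engine/lalinferencemcmc-0-H1L1-1126259462.39-0.hdf5'
    i = input_file.find('lalinferencemcmc-')
    combined = input_file[:i] + 'combine_' + input_file[i:]
    # combined == '/run/engine/combine_lalinferencemcmc-0-H1L1-1126259462.39-0.hdf5'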
 class GraceDBJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
     """
     Class for a gracedb job
......