Commit ad2da6d5 authored by Michalis Agathos's avatar Michalis Agathos
Browse files

Added TIGER pipeline infrastructure in:

 - lalinference
 - lalapps pipelines
 - lalapps inspinj
Original: abd5da0b8946bb05454b929dd986a64ae0590dfb
parent 6a3252e2
......@@ -420,6 +420,7 @@ extern int vrbflg;
ProcessParamsTable *next_process_param( const char *name, const char *type,
const char *fmt, ... );
void read_mass_data( char *filename );
void read_time_data( char *filename );
void read_nr_data( char* filename );
void read_source_data( char* filename );
void sourceComplete(void);
......@@ -475,6 +476,7 @@ char *massFileName = NULL;
char *nrFileName = NULL;
char *sourceFileName = NULL;
char *outputFileName = NULL;
char *injtimesFileName = NULL;
char *exttrigFileName = NULL;
char *IPNSkyPositionsFile = NULL;
......@@ -574,6 +576,9 @@ struct {
REAL8 mass2;
} *mass_data;
int n_times;
LIGOTimeGPS* inj_times;
struct FakeGalaxy{
char name[LIGOMETA_SOURCE_MAX];
REAL8 ra;
......@@ -1162,6 +1167,62 @@ read_mass_data( char* filename )
fclose( fp );
}
/*
 * Read a list of injection times from a plain-text file.
 *
 * Every non-blank line of `filename` must start with an integer GPS time in
 * seconds.  The parsed times are stored in the file-scope array `inj_times`
 * (allocated here with LALCalloc) with gpsNanoSeconds set to zero, and the
 * number of times actually parsed is stored in `n_times`.
 *
 * The program is terminated with exit( 1 ) when the file cannot be opened,
 * contains no injection times, contains a line that does not start with an
 * integer, contains a time earlier than GPS 441417609, or when the
 * allocation fails.
 */
void read_time_data( char* filename)
{
  char line[256];
  FILE *fp;
  int n = 0;
  int parsed_time = 0;

  fp = fopen( filename, "r" );
  if ( ! fp )
  {
    perror( "read_time_data" );
    fprintf( stderr,
        "Error while trying to open file %s\n",
        filename );
    exit( 1 );
  }

  /* first pass: the number of lines is an upper bound on the number of times */
  n_times = 0;
  while ( fgets( line, sizeof( line ), fp ) )
    ++n_times;

  /* callers index inj_times[0] unconditionally, so an empty list is an error */
  if ( n_times == 0 )
  {
    fprintf( stderr, "No injection times found in file %s\n", filename );
    exit( 1 );
  }

  /* alloc space for the data */
  inj_times = LALCalloc( n_times, sizeof(*inj_times) );
  if ( !inj_times )
  {
    fprintf( stderr, "Allocation error for inj_times\n" );
    exit( 1 );
  }

  /* second pass: parse the times */
  rewind( fp );
  while ( fgets( line, sizeof( line ), fp ) )
  {
    /* tolerate blank lines (e.g. a trailing newline at end of file) */
    if ( line[strspn( line, " \t\r\n" )] == '\0' )
      continue;

    /* without this check a malformed line would silently repeat the
     * previously parsed time */
    if ( sscanf( line, "%d", &parsed_time ) != 1 )
    {
      fprintf( stderr, "Cannot parse injection time %d in file %s:\n%s\n",
          n, filename, line );
      exit( 1 );
    }

    if ( parsed_time < 441417609 )
    {
      fprintf( stderr, "invalid injection time %d:\n"
          "GPS start time is prior to "
          "Jan 01, 1994 00:00:00 UTC:\n"
          "(%d specified)\n",
          n, parsed_time );
      exit( 1 );
    }

    inj_times[n].gpsSeconds = parsed_time;
    inj_times[n].gpsNanoSeconds = 0;
    n++;
  }

  /* close the file */
  fclose( fp );

  if ( n == 0 )
  {
    fprintf( stderr, "No injection times found in file %s\n", filename );
    exit( 1 );
  }

  /* report only the entries that were actually filled in */
  n_times = n;
}
void
read_nr_data( char* filename )
{
......@@ -1882,6 +1943,7 @@ int main( int argc, char *argv[] )
{"t-distr", required_argument, 0, '('},
{"time-step", required_argument, 0, 't'},
{"time-interval", required_argument, 0, 'i'},
{"time-file", required_argument, 0, 1035},
{"seed", required_argument, 0, 's'},
{"waveform", required_argument, 0, 'w'},
{"amp-order", required_argument, 0, 'q'},
......@@ -2044,6 +2106,15 @@ int main( int argc, char *argv[] )
"%s", LALoptarg );
break;
case 1035:
LALoptarg_len = strlen( LALoptarg ) + 1;
injtimesFileName = calloc( 1, LALoptarg_len * sizeof(char) );
memcpy( injtimesFileName, LALoptarg, LALoptarg_len * sizeof(char) );
this_proc_param = this_proc_param->next =
next_process_param( long_options[option_index].name, "string",
"%s", LALoptarg );
break;
case 'E':
LALoptarg_len = strlen( LALoptarg ) + 1;
exttrigFileName = calloc( 1, LALoptarg_len * sizeof(char) );
......@@ -2136,6 +2207,10 @@ int main( int argc, char *argv[] )
{
tDistr=LALINSPIRAL_EXPONENTIAL_TIME_DIST;
}
else if (!strcmp(dummy, "file"))
{
tDistr=LALINSPIRAL_FILE_TIME_DIST;
}
else
{
tDistr=LALINSPIRAL_UNKNOWN_TIME_DIST;
......@@ -3141,7 +3216,7 @@ int main( int argc, char *argv[] )
exit( 1 );
}
if (gpsStartTime.gpsSeconds==-1 || gpsEndTime.gpsSeconds==-1)
if ( (gpsStartTime.gpsSeconds==-1 || gpsEndTime.gpsSeconds==-1) && tDistr != LALINSPIRAL_FILE_TIME_DIST)
{
fprintf( stderr,
"Must specify both --gps-start-time and --gps-end-time.\n");
......@@ -3853,20 +3928,43 @@ int main( int argc, char *argv[] )
meanTimeStep = mean_time_step_sfr(maxZ, localRate);
}
if (meanTimeStep<=0)
if (meanTimeStep<=0 && tDistr != LALINSPIRAL_FILE_TIME_DIST)
{
fprintf( stderr,
"Minimum time step value must be larger than zero\n" );
exit( 1 );
}
if (timeInterval > 0. && tDistr == LALINSPIRAL_EXPONENTIAL_TIME_DIST)
if (!injtimesFileName && tDistr == LALINSPIRAL_FILE_TIME_DIST)
{
fprintf(stderr, "No filename for injection GPStimes is given. Use --time-file.\n");
}
if ( injtimesFileName && tDistr != LALINSPIRAL_FILE_TIME_DIST )
{
fprintf( stderr,
"Cannot specify an injection times file for your choice of --t-distr.\n" );
exit( 1 );
}
if (timeInterval > 0. && (tDistr == LALINSPIRAL_EXPONENTIAL_TIME_DIST || tDistr == LALINSPIRAL_FILE_TIME_DIST) )
{
fprintf( stderr,
"time interval must be zero\n" );
exit( 1 );
}
if ( injtimesFileName && tDistr == LALINSPIRAL_FILE_TIME_DIST)
{
if (meanTimeStep > 0.)
{
fprintf(stderr, "Minimum time step value must be larger than zero\n" );
exit(1);
}
// printf("Reading injection times from file %s\n", injtimesFileName);
read_time_data(injtimesFileName);
}
if ( userTag && outCompress )
{
snprintf( fname, sizeof(fname), "HL-INJECTIONS_%d_%s-%d-%ld.xml.gz",
......@@ -3935,6 +4033,11 @@ int main( int argc, char *argv[] )
ninj = 0;
ncount = 0;
currentGpsTime = gpsStartTime;
if (tDistr == LALINSPIRAL_FILE_TIME_DIST){
currentGpsTime.gpsSeconds = inj_times[0].gpsSeconds;
currentGpsTime.gpsNanoSeconds = inj_times[0].gpsNanoSeconds;
}
while ( 1 )
{
/* increase counter */
......@@ -4381,12 +4484,19 @@ int main( int argc, char *argv[] )
{
XLALGPSAdd( &currentGpsTime, -(REAL8)meanTimeStep * log( XLALUniformDeviate(randParams) ) );
}
else if (tDistr == LALINSPIRAL_FILE_TIME_DIST)
{
if (ninj >= (size_t) n_times)
break;
currentGpsTime.gpsSeconds = inj_times[ninj].gpsSeconds;
currentGpsTime.gpsNanoSeconds = inj_times[ninj].gpsNanoSeconds;
}
else
{
currentGpsTime = gpsStartTime;
XLALGPSAdd( &currentGpsTime, ninj * meanTimeStep );
}
if ( XLALGPSCmp( &currentGpsTime, &gpsEndTime ) >= 0 )
if ( XLALGPSCmp( &currentGpsTime, &gpsEndTime ) >= 0 && tDistr!=LALINSPIRAL_FILE_TIME_DIST )
break;
/* allocate and go to next SimInspiralTable */
......
......@@ -54,6 +54,8 @@ pybin_scripts = \
lalapps_merge_nested_sampling_runs \
lalinference_burst_pp_pipe \
lalinference_pp_pipe \
lalinference_multi_pipe \
lalinference_tiger_pipe \
lalapps_compute_roq_weights \
lalapps_merge_posteriors \
lalapps_test_run_lalinference_pipe_example
......
#!/usr/bin/env @PYTHONPROG@
# DAG generation code for running LALInference pipeline
# (C) 2012 John Veitch
# 2013 Salvatore Vitale: extended to work with several ini files
from lalapps import lalinference_pipe_utils as pipe_utils
import ConfigParser
from optparse import OptionParser,OptionValueError
import sys
# Usage/help text passed to OptionParser() further down; optparse substitutes
# the program name for %prog when it prints the message.
usage=""" %prog [options] config1.ini config2.ini ... configN.ini
Setup a Condor DAG file to run the LALInference pipeline based on
N config.ini files.
The user must specify either an injection file to analyse, with the --inj option,
a list of SnglInspiralTable or CoincInspiralTable triggers with the --<x>-triggers options,
or an ASCII list of GPS times with the --gps-time-file option.
The user must also specify and ini file which will contain the main analysis config.
"""
import os
def vararg_callback(option, opt_str, value, parser):
    """optparse callback that collects a variable number of values for one option.

    Consumes arguments from ``parser.rargs`` until the next option-like token
    and stores them, as a list of strings, on ``parser.values`` under
    ``option.dest``.  The consumed arguments are removed from ``parser.rargs``.

    option  -- the Option being processed; only ``option.dest`` is read.
    opt_str -- the option string seen on the command line (unused).
    value   -- must be None, i.e. the option itself declares no argument.
    parser  -- the OptionParser driving the parse.
    """
    assert value is None

    def is_number(text):
        # "-3" and "-3.0" are values, not options, so they must not stop the scan
        try:
            float(text)
        except ValueError:
            return False
        return True

    collected = []
    for arg in parser.rargs:
        # stop on --foo like options
        if arg[:2] == "--" and len(arg) > 2:
            break
        # stop on -a, but not on -3 or -3.0
        if arg[:1] == "-" and len(arg) > 1 and not is_number(arg):
            break
        collected.append(arg)

    # drop exactly the arguments that were consumed above
    del parser.rargs[:len(collected)]
    setattr(parser.values, option.dest, collected)
# Command-line interface: every option defaults to None (or False for the
# store_true flags); the positional arguments left in `args` are the ini files.
parser=OptionParser(usage)
# Run and log directories.
parser.add_option("-r","--run-path",default=None,action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
# Alternative sources of analysis times / triggers.
parser.add_option("-g","--gps-time-file",action="store",type="string",default=None,help="Text file containing list of GPS times to analyse",metavar="TIMES.txt")
parser.add_option("-t","--single-triggers",action="store",type="string",default=None,help="SnglInspiralTable trigger list",metavar="SNGL_FILE.xml")
parser.add_option("-C","--coinc-triggers",action="store",type="string",default=None,help="CoinInspiralTable trigger list",metavar="COINC_FILE.xml")
parser.add_option("-L","--lvalert",action="store",type="string",default=None,help="LVAlert coinc file",metavar="coinc_G0000.xml")
parser.add_option("--gid",action="store",type="string",default=None,help="Optional GraceDB ID for submitting results")
parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
# -F takes a variable number of values, gathered by vararg_callback into opts.fnames.
parser.add_option("-F","--folder-names",dest="fnames",action="callback", callback=vararg_callback,help="Space separated list of folders that will be created, corresponding to the TIGER parameters that are being tested or GR. The order has to be the same used with the ini files!",default=None,metavar="GR phi1")
parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
# NOTE(review): opts.dax is parsed here but not referenced in the rest of this script as shown.
parser.add_option("-x", "--dax",action="store_true",default=False, help="Delete the ligo_data_find jobs and populate frame LFNs in the DAX -- WARNING: not tested in multi_pipe! Don't assume it will work.")
parser.add_option("-G", "--grid-site",action="store",type="string",metavar="SITE", help="Specify remote site in conjunction with --dax option. e.g. --grid-site=creamce for Bologna cluster.\
Supported options are: creamce and local",default=None)
(opts,args)=parser.parse_args()
# Validate the positional ini files against the -F/--folder-names list.
# Without any ini file no DAG is ever built, so fail early with the usage text.
if not args:
    parser.error("at least one ini file must be given")
if len(args)>1:
    print('Using %s ini files\n'%len(args))
else:
    inifile=args[0]
inits=args
ninits=len(inits)
# opts.fnames stays None when -F is omitted; treat that as an empty list so the
# count check below reports the mismatch instead of len(None) raising TypeError.
fnames=opts.fnames if opts.fnames is not None else []
nfnames=len(fnames)
if not ninits==nfnames:
    print("You seem to be using %d parser files and %d foldernames. These two numbers must be the same. Exiting...\n"%(ninits,nfnames))
    sys.exit(1)
# Map ini-file position -> folder name.  enumerate() keeps one entry per
# position even when a folder name is repeated (list.index() would only ever
# return the first occurrence).
fnames_dic=dict((position,str(fname)) for position,fname in enumerate(fnames))
glob_hyp=fnames
# Space-separated list of the folder names, with a leading and trailing blank.
hyp_str=" "+"".join(hy+" " for hy in glob_hyp)
# Build one LALInferencePipelineDAG per ini file; every DAG after the first is
# constructed with previous_dag set to the one before it, and the last DAG
# built is left in both `dag` and `dag2`.
cp=ConfigParser.ConfigParser()
cp.optionxform = str
first_dag=True
common_path=opts.run_path
# NOTE(review): the same ConfigParser instance is re-used for every ini file,
# so options read from an earlier file remain set unless a later file
# overrides them -- confirm this is intended.
for ini_position,inifile in enumerate(inits):
    # close the ini file as soon as it has been parsed
    with open(inifile) as ini_fp:
        cp.readfp(ini_fp)
    if opts.daglog_path is not None:
        cp.set('paths','daglogdir',opts.daglog_path)
    elif opts.run_path is not None:
        # fall back to the run directory, but never store None in the config:
        # without -p and -r the value from the ini file is kept
        cp.set('paths','daglogdir',opts.run_path)
    if opts.gps_time_file is not None:
        cp.set('input','gps-time-file',opts.gps_time_file)
    if opts.single_triggers is not None:
        cp.set('input','sngl-inspiral-file',opts.single_triggers)
    if opts.injections is not None:
        cp.set('input','injection-file',opts.injections)
    if opts.coinc_triggers is not None:
        cp.set('input','coinc-inspiral-file',opts.coinc_triggers)
    if opts.lvalert is not None:
        cp.set('input','lvalert-file',opts.lvalert)
    if opts.gid is not None:
        cp.set('input','gid',opts.gid)
    if opts.pipedown_db is not None:
        cp.set('input','pipedown-db',opts.pipedown_db)
    if opts.run_path is not None:
        # each ini file runs in its own sub-folder of the common run path;
        # the folder is looked up by position so that passing the same ini
        # file twice still selects the right folder name
        cp.set('paths','basedir',os.path.join(str(common_path),str(fnames_dic[ini_position])))
    # Create the DAG from the configparser object
    if first_dag:
        dag=pipe_utils.LALInferencePipelineDAG(cp,site=opts.grid_site)
        first_dag=False
        dag.write_sub_files()
        dag2=dag
    else:
        dag2=pipe_utils.LALInferencePipelineDAG(cp,first_dag=False,previous_dag=dag,site=opts.grid_site)
        dag2.write_sub_files()
        dag=dag2
# Write the combined DAG file and its shell script, then optionally submit it.
# --run-path is documented as defaulting to $PWD, so fall back to the current
# directory instead of passing None to os.path.join.
dag_dir=common_path if common_path is not None else os.getcwd()
dag2.set_dag_file(os.path.join(dag_dir,'common_dag'))
dag2.write_dag()
dag2.write_script()
# End of program
print('Successfully created DAG file.')
print('Now run condor_submit_dag %s\n'%(dag2.get_dag_file()))
if opts.condor_submit:
    import subprocess
    # submit the DAG that was just written and wait for condor_submit_dag to finish
    x = subprocess.Popen(['condor_submit_dag',dag2.get_dag_file()])
    x.wait()
    if x.returncode==0:
        print('Submitted DAG file')
    else:
        print('Unable to submit DAG file')
......@@ -445,7 +445,7 @@ def create_pfn_tuple(filename,protocol='file://',site='local'):
return( (os.path.basename(filename),protocol+os.path.abspath(filename),site) )
class LALInferencePipelineDAG(pipeline.CondorDAG):
def __init__(self,cp,dax=False,site='local'):
def __init__(self,cp,dax=False,first_dag=True,previous_dag=None,site='local'):
self.subfiles=[]
self.config=cp
self.engine=get_engine_name(cp)
......@@ -461,10 +461,18 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
os.chdir(self.basepath)
self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
mkdirs(self.posteriorpath)
daglogdir=cp.get('paths','daglogdir')
mkdirs(daglogdir)
self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
pipeline.CondorDAG.__init__(self,self.daglogfile,dax=dax)
if first_dag:
daglogdir=cp.get('paths','daglogdir')
mkdirs(daglogdir)
self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
pipeline.CondorDAG.__init__(self,self.daglogfile,dax=dax)
elif not first_dag and previous_dag is not None:
daglogdir=cp.get('paths','daglogdir')
mkdirs(daglogdir)
self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
pipeline.CondorDAG.__init__(self,self.daglogfile,dax=dax)
for node in previous_dag.get_nodes():
self.add_node(node)
if cp.has_option('paths','cachedir'):
self.cachepath=cp.get('paths','cachedir')
else:
......
from optparse import OptionParser
import argparse
import io
import os
import ast
import sys
import getpass
import ConfigParser
from lalapps import inspiralutils
user=getpass.getuser()
## 2013 Salvatore Vitale, Michalis Agathos. Python to setup both injection and init files for TIGER runs.
## Will create the injection, the configs, the folders, and call the pipeline exec file.
## 2014 User can pass pre-existing xml table.
## 2014 Will look for science and veto segments and generate times for unvetoed injections.
## 2014 All options moved to configuration file.
################################################################################
#
# DEFINE CONSTANTS
#
################################################################################
# Help text handed to argparse.ArgumentParser() further down in this script.
usage='''%prog [options] config.ini
Setup a batch of runs for TIGER and invoke lalinference_multi_pipe based on the config.ini file provided as command-line argument.
A pre-existing injection file may be used with the -I option, otherwise one will be generated automatically.
The base directory for standard output and the output directory for post-processing should also be provided using the -O and -P options respectively.
Additionally, a local directory for log output may be specified with the -L option and a scratch directory with the -S option.
'''
# The bare triple-quoted string below is an expression statement with no effect:
# MGparams_approx_dic is never actually defined by it (disabled code).
'''
MGparams_approx_dic = {
"TaylorF2Test":["dphi0","dphi1", "dphi2", "dphi3", "dphi4", "dphi5", "dphi5l", "dphi6", "dphi6l", "dphi7"],
"TaylorT4Test":["dchi0","dchi1", "dchi2", "dchi3", "dchi4", "dchi5", "dchi5l", "dchi6", "dchi6l", "dchi7"],
"SpinTaylorT4Test":["dchi0","dchi1", "dchi2", "dchi3", "dchi4", "dchi5", "dchi5l", "dchi6", "dchi6l", "dchi7"]
}
'''
################################################################################
#
# DEFINE FUNCTIONS
#
################################################################################
def combinations(iterable, r):
    """Yield every length-r combination of the items of `iterable` as a tuple.

    Combinations are produced in the order of the input positions, e.g.
        combinations('ABCD', 2) --> AB AC AD BC BD CD
        combinations(range(4), 3) --> 012 013 023 123
    Nothing is yielded when r exceeds the number of items.
    """
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    # must be a real list: the positions are updated in place below, which a
    # lazy range object does not support
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        # find the right-most position that has not reached its final value
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            # every position is at its maximum: all combinations were produced
            return
        indices[i] += 1
        # reset everything to the right of i to the smallest increasing run
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)
def createCombinationsFromList(list):
    """Return ["GR"] followed by every non-empty combination of `list`.

    The first element is the plain string "GR"; all further elements are
    tuples produced by combinations(), ordered by increasing size (all
    1-tuples first, then all 2-tuples, and so on up to the full set).

    The parameter name shadows the builtin `list`; it is kept unchanged so
    that existing keyword callers keep working.
    """
    outputlist = ["GR"]
    for size in range(1, len(list) + 1):
        outputlist.extend(combinations(list, size))
    return outputlist
def write_pipe_init(dic, cp):
    """Overlay `dic` onto the config `cp` and write the result to pipeline.ini.

    dic -- mapping {section: {option: value}}; values that are not already
           `str` are converted with str() before being stored.
    cp  -- ConfigParser instance used as the base configuration.

    The file "pipeline.ini" is (over)written in the current working directory.
    Sections named in `dic` that `cp` does not have yet are created instead of
    raising NoSectionError.

    Note: `cp` itself is modified, no copy is made.
    """
    #FIXME: deep copy -- cp_temp is only an alias, so the caller's cp is changed
    cp_temp = cp
    #FIXME: cp_temp.read_dict(dic) instead!
    for sec, options in dic.items():
        if not cp_temp.has_section(sec):
            cp_temp.add_section(sec)
        for opt, val in options.items():
            if not type(val)==str:
                val=str(val)
            cp_temp.set(sec, opt, val)
    with open("pipeline.ini", "w") as ofile:
        cp_temp.write(ofile)
def ensure_dir(f):
    """Create directory `f`, including missing parents, if it does not exist.

    Succeeds silently when the directory is already there.  Raises OSError
    when it cannot be created, e.g. because `f` exists but is not a directory.
    """
    # Try first and inspect the failure afterwards: checking isdir() before
    # makedirs() leaves a window in which another process can create the
    # directory and make makedirs() fail.
    try:
        os.makedirs(f)
    except OSError:
        if not os.path.isdir(f):
            raise
# Deep copy ConfigParser instance
def cp_ConfigParser(config):
    """Return an independent RawConfigParser holding the same data as `config`.

    The copy is made by writing `config` to an in-memory buffer and parsing
    that buffer again, so later changes to either object do not affect the
    other.  The option-name transform (`optionxform`) of `config` is carried
    over; otherwise the copy would lower-case every option name while parsing.
    """
    try:
        # Python 2: ConfigParser.write() emits byte strings, which only the
        # StringIO module's buffer accepts
        from StringIO import StringIO
    except ImportError:
        from io import StringIO
    buf = StringIO()
    config.write(buf)
    # We must reset the buffer ready for reading.
    buf.seek(0)
    new_config = ConfigParser.RawConfigParser()
    new_config.optionxform = config.optionxform
    # read_file() only exists in newer parser versions; readfp() is the older name
    if hasattr(new_config, 'read_file'):
        new_config.read_file(buf)
    else:
        new_config.readfp(buf)
    return new_config
################################################################################
#
# PARSE COMMAND LINE OPTIONS
#
################################################################################
# Command-line interface.  The first positional parameter of ArgumentParser is
# `prog`, so the help text has to be passed explicitly as usage=; argparse
# expands %(prog)s (not optparse's %prog), hence the replacement.
parser = argparse.ArgumentParser(usage=usage.replace('%prog', '%(prog)s'))
parser.add_argument('config', metavar='CONFIG_FILE', type=str, nargs='+', help='A (list of) configuration file(s) containing sections and options for the entire pipeline')
parser.add_argument("-O", dest='basedir', type=str, help="Path to base directory")
parser.add_argument("-P", dest='postproc', type=str, help="Path to post-processing output (optional)", default=None)
parser.add_argument("-I", dest='injfile', type=str, help="Path to a pre-existing injection .xml file (optional)", default=None)
parser.add_argument("-L", dest='logdir', type=str, help="Path to log directory (optional)", default=None)
parser.add_argument("-S", dest='scratchdir', type=str, help="Path to scratch directory (optional)", default=None)
args = parser.parse_args()
# Turn the parsed command-line values into absolute paths.  Optional values
# that were not given are represented by None.
config_file = args.config

# The base directory is the only mandatory option.
if args.basedir is None:
    print('Error: No base directory provided! Exiting...')
    sys.exit(1)
basefolder = os.path.abspath(args.basedir)

webdir = os.path.abspath(args.postproc) if args.postproc is not None else None

# A user-supplied injection file must already exist.
if args.injfile is None:
    injfile = None
else:
    injfile = os.path.abspath(args.injfile)
    if not os.path.exists(injfile):
        print('Error: Cannot find xml file for injections: ' + injfile)
        sys.exit(1)
    print('TIGER: Reading injections from file')

logdir = os.path.abspath(args.logdir) if args.logdir is not None else None
scratchdir = os.path.abspath(args.scratchdir) if args.scratchdir is not None else None
################################################################################
#
# READ TIGER VARIABLES FROM CONFIG FILE
#
################################################################################
# Load the configuration file(s); option names keep their case.
cp = ConfigParser.RawConfigParser()
cp.optionxform = str
# FIXME: bring all defaults here
cp.read(config_file)

# Nothing can be set up without a [tiger] section.
if not cp.has_section('tiger'):
    print('Invalid configuration file! No "tiger" section found.')
    sys.exit(1)

dic_engine = {}

# The inspinj seed.  Unless [analysis] dataseed is given explicitly, the data
# seed is derived from it by appending three zeros (e.g. 7001 --> 7001000).
inspinj_seed = cp.getint('tiger', 'seed')
if cp.has_option('analysis', 'dataseed'):
    dataseed = cp.getint('analysis', 'dataseed')
else:
    dataseed = 1000*inspinj_seed
dic_analysis = {"dataseed": dataseed}

# A descriptive name of what is being run. Will be the name of the
# postprocessing result folder and a part of the injection file, for the records
tiger_tag = cp.get('tiger', 'tiger-tag')

# Output goes below <base directory>/<tiger-tag>/<seed>
basefolder = os.path.join(basefolder, tiger_tag, str(inspinj_seed))