Commit 86d5e2ce authored by John Douglas Veitch's avatar John Douglas Veitch Committed by Duncan Brown

Changes to pipeline.py, lalinference_pipe, LALInference, and inspiral.c to add...

Changes to pipeline.py, lalinference_pipe, LALInference, and inspiral.c to add support for targeting Grid sites using Pegasus
Original: 40d097059da695780979ff844cef5fef5288fda9
parent 01d5c9b3
......@@ -343,7 +343,6 @@ int main( int argc, char *argv[] )
/* frame input data */
LALCache *frInCache = NULL;
LALCache *frGlobCache = NULL;
LALCache *calCache = NULL;
LALFrStream *frStream = NULL;
FrChanIn frChan;
......@@ -683,40 +682,29 @@ int main( int argc, char *argv[] )
if ( globFrameData )
{
CHAR ifoRegExPattern[6];
CHAR globPattern[8];
if ( vrbflg ) fprintf( stdout, "globbing for *.gwf frame files from %c "
"of type %s in current directory\n", fqChanName[0], frInType );
if ( vrbflg ) fprintf( stdout, "globbing for %c-*.gwf frame files from %c "
"of type %s in current directory\n", fqChanName[0], fqChanName[0], frInType );
/* FIXME: This filters for the right detector by looking at the first
* character of the filename. Cannot distinguish between H1 and H2 this way!
*/
snprintf(globPattern,sizeof(globPattern),"%c-*.gwf",fqChanName[0]);
frGlobCache = NULL;
frInCache = NULL;
/* create a frame cache by globbing all *.gwf files in the pwd */
frGlobCache = XLALCacheGlob(NULL, NULL);
frInCache = XLALCacheGlob(NULL, globPattern);
/* check we globbed at least one frame file */
if ( ! frGlobCache->length )
{
fprintf( stderr, "error: no frame file files of type %s found\n",
frInType );
exit( 1 );
}
/* sieve out the requested data type */
snprintf( ifoRegExPattern,
sizeof(ifoRegExPattern) / sizeof(*ifoRegExPattern), ".*%c.*",
fqChanName[0] );
frInCache = XLALCacheDuplicate(frGlobCache);
XLALCacheSieve(frInCache, 0, 0, ifoRegExPattern, frInType, NULL);
/* check we got at least one frame file back after the sieve */
if ( ! frInCache->length )
{
fprintf( stderr, "error: no frame files of type %s globbed as input\n",
fprintf( stderr, "error: no frame file files of type %s found\n",
frInType );
exit( 1 );
}
XLALDestroyCache( frGlobCache );
}
else
{
......
......@@ -4,9 +4,12 @@
# (C) 2012 John Veitch
from lalapps import lalinference_pipe_utils as pipe_utils
from lalapps import inspiralutils
import ConfigParser
from optparse import OptionParser,OptionValueError
import sys
import ast
import os
usage=""" %prog [options] config.ini
Setup a Condor DAG file to run the LALInference pipeline based on
......@@ -19,7 +22,6 @@ or an ASCII list of GPS times with the --gps-time-file option.
The user must also specify and ini file which will contain the main analysis config.
"""
parser=OptionParser(usage)
parser.add_option("-r","--run-path",default=None,action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
......@@ -30,7 +32,9 @@ parser.add_option("--gid",action="store",type="string",default=None,help="GraceD
parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
parser.add_option("-x", "--dax",action="store_true",default=False, help="Delete the ligo_data_find jobs and populate frame LFNs in the DAX")
parser.add_option("-G", "--grid-site",action="store",type="string",metavar="SITE", help="Specify remote site in conjunction with --dax option. e.g. --grid-site=creamce for Bologna cluster.\
Supported options are: creamce and local",default=None)
(opts,args)=parser.parse_args()
......@@ -46,36 +50,63 @@ cp.optionxform = str
cp.readfp(open(inifile))
# Command-line options override the corresponding entries of the ini file.
# NOTE(review): this span is a diff rendering with indentation stripped; each
# old cp.set(...) line appears directly above the os.path.abspath(...) line
# that replaces it. Note that the script changes into basedir further down.
if opts.run_path is not None:
cp.set('paths','basedir',opts.run_path)
cp.set('paths','basedir',os.path.abspath(opts.run_path))
# daglogdir: explicit --daglog-path first, then --run-path, then the ini
# file's basedir.
if opts.daglog_path is not None:
cp.set('paths','daglogdir',opts.daglog_path)
cp.set('paths','daglogdir',os.path.abspath(opts.daglog_path))
elif opts.run_path is not None:
cp.set('paths','daglogdir',opts.run_path)
cp.set('paths','daglogdir',os.path.abspath(opts.run_path))
else:
cp.set('paths','daglogdir',os.path.abspath(cp.get('paths','basedir')))
# The DAG log directory is also used below as the parent of the temporary
# DAX execution directory.
local_work_dir=cp.get('paths','daglogdir')
if opts.gps_time_file is not None:
cp.set('input','gps-time-file',opts.gps_time_file)
cp.set('input','gps-time-file',os.path.abspath(opts.gps_time_file))
if opts.single_triggers is not None:
cp.set('input','sngl-inspiral-file',opts.single_triggers)
cp.set('input','sngl-inspiral-file',os.path.abspath(opts.single_triggers))
if opts.injections is not None:
cp.set('input','injection-file',opts.injections)
cp.set('input','injection-file',os.path.abspath(opts.injections))
if opts.coinc_triggers is not None:
cp.set('input','coinc-inspiral-file',opts.coinc_triggers)
cp.set('input','coinc-inspiral-file',os.path.abspath(opts.coinc_triggers))
#if opts.lvalert is not None:
# cp.set('input','lvalert-file',os.path.abspath(opts.lvalert))
# The gid value is stored exactly as given (no path expansion is applied).
if opts.gid is not None:
cp.set('input','gid',opts.gid)
if opts.pipedown_db is not None:
cp.set('input','pipedown-db',opts.pipedown_db)
cp.set('input','pipedown-db',os.path.abspath(opts.pipedown_db))
# Create the DAG from the configparser object
# NOTE(review): the first constructor call below is the pre-change line; the
# second passes the --dax flag and the --grid-site value through to the DAG.
dag=pipe_utils.LALInferencePipelineDAG(cp)
dag=pipe_utils.LALInferencePipelineDAG(cp,dax=opts.dax,site=opts.grid_site)
# With --dax, build a frame cache from the frame list (presumably in Pegasus
# cache format, judging by the helper's name - TODO confirm); without it,
# '/dev/null' is passed as the frame cache.
if(opts.dax):
# Create a text file with the frames listed
pfnfile = dag.create_frame_pfn_file()
peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
else:
peg_frame_cache = '/dev/null'
# Create the dax scripts
# A directory to store the DAX temporary files
# (the uuid1 suffix is generated afresh on every run)
import uuid
execdir=os.path.join(local_work_dir,'lalinference_pegasus_'+str(uuid.uuid1()))
# The output files are written from inside basedir; the original working
# directory is restored afterwards.
olddir=os.getcwd()
os.chdir(cp.get('paths','basedir'))
# When a grid site is requested, 'local' is placed ahead of it in the
# comma-separated site list; otherwise no site is passed.
if opts.grid_site is not None:
site='local,'+opts.grid_site
else:
site=None
dag.prepare_dax(tmp_exec_dir=execdir,grid_site=site,peg_frame_cache=peg_frame_cache)
dag.write_sub_files()
dag.write_dag()
dag.write_script()
os.chdir(olddir)
# End of program
print 'Successfully created DAG file.'
print 'Now run condor_submit_dag %s\n'%(dag.get_dag_file())
......
......@@ -176,27 +176,62 @@ static int FindTimeSeriesStartAndEnd (
static const LALUnit strainPerCount={0,{0,0,0,0,0,1,-1},{0,0,0,0,0,0,0}};
static REAL8TimeSeries *readTseries(CHAR *cachefile, CHAR *channel, LIGOTimeGPS start, REAL8 length);
static REAL8TimeSeries *readTseries(LALCache *cache, CHAR *channel, LIGOTimeGPS start, REAL8 length);
static void makeWhiteData(LALInferenceIFOData *IFOdata);
static void PrintSNRsToFile(LALInferenceIFOData *IFOdata , SimInspiralTable *inj_table);
static REAL8TimeSeries *readTseries(CHAR *cachefile, CHAR *channel, LIGOTimeGPS start, REAL8 length)
static LALCache *GlobFramesPWD( char *ifo);
/**
 * Build a frame cache from the *.gwf files in the current working directory
 * whose names match "<c>-*.gwf", where <c> is the first character of \a ifo.
 *
 * A sieved duplicate (source field matching ".*<c>.*") is built purely to
 * report on stderr what the sieve would keep; it is destroyed before
 * returning. The cache handed back is the un-sieved glob result.
 *
 * \param ifo  Detector name; only its first character is used.
 * \return     The globbed cache. The caller is responsible for releasing it
 *             with XLALDestroyCache().
 *
 * Exits the process with status 1 if \a ifo is NULL or empty, if globbing
 * fails, or if no matching file is found.
 */
static LALCache *GlobFramesPWD(char *ifo)
{
    char globPattern[8];      /* "<c>-*.gwf": 7 characters plus the NUL */
    CHAR ifoRegExPattern[6];  /* ".*<c>.*":   5 characters plus the NUL */
    LALCache *frGlobCache = NULL;
    LALCache *frInCache = NULL;

    if ( !ifo || !ifo[0] )
    {
        fprintf( stderr, "GlobFramesPWD : no IFO name given\n");
        exit( 1 );
    }

    /* create a frame cache by globbing all *.gwf files in the pwd */
    /* FIXME: This should really open all the files and see if the desired channel is in there */
    snprintf( globPattern, sizeof(globPattern), "%c-*.gwf", ifo[0] );
    frGlobCache = XLALCacheGlob(NULL, globPattern);

    /* check the glob succeeded and found at least one frame file */
    if ( !frGlobCache || !frGlobCache->length )
    {
        fprintf( stderr, "error: no frame file files found\n");
        exit( 1 );
    }

    fprintf(stderr,"GlobFramesPWD : Found unseived src files:\n");
    for (UINT4 i = 0; i < frGlobCache->length; i++)
    {
        fprintf(stderr,"(%s,%s,%s)\n",frGlobCache->list[i].src,frGlobCache->list[i].dsc,frGlobCache->list[i].url);
    }

    /* sieve a copy by detector, for diagnostics only */
    snprintf( ifoRegExPattern,
              sizeof(ifoRegExPattern) / sizeof(*ifoRegExPattern), ".*%c.*",
              ifo[0] );
    frInCache = XLALCacheDuplicate(frGlobCache);
    if ( frInCache )
    {
        XLALCacheSieve(frInCache, 0, 0, ifoRegExPattern, NULL, NULL);
        fprintf(stderr,"GlobFramesPWD : Sieved frames with pattern %s. Found src files:\n",ifoRegExPattern);
        for (UINT4 i = 0; i < frInCache->length; i++)
        {
            fprintf(stderr,"(%s,%s,%s)\n",frInCache->list[i].src,frInCache->list[i].dsc,frInCache->list[i].url);
        }
        /* the sieved copy is not returned, so release it here */
        XLALDestroyCache(frInCache);
    }

    return frGlobCache;
}
/*
 * readTseries: read `channel` from the frame files described by `cache`,
 * starting at GPS time `start` and spanning `length` (presumably seconds,
 * judging by the "data duration" wording of the error message - TODO confirm).
 *
 * Opens a frame stream on the cache, reads the channel as a REAL8 time
 * series, closes the stream and returns the series. The process exits if the
 * stream cannot be opened; if the read fails an error is printed and the
 * NULL result is returned to the caller.
 *
 * NOTE(review): this span is a diff rendering that shows removed and added
 * lines together (the local `cache` declaration, XLALCacheImport(cachefile),
 * XLALDestroyCache(cache) and the messages naming `cachefile` sit beside the
 * lines that use the new `cache` parameter) - confirm which lines survive.
 */
static REAL8TimeSeries *readTseries(LALCache *cache, CHAR *channel, LIGOTimeGPS start, REAL8 length)
{
LALStatus status;
memset(&status,0,sizeof(status));
LALCache *cache = NULL;
LALFrStream *stream = NULL;
REAL8TimeSeries *out = NULL;
cache = XLALCacheImport( cachefile );
int err;
err = *XLALGetErrnoPtr();
if(cache==NULL) {fprintf(stderr,"ERROR: Unable to import cache file \"%s\",\n XLALError: \"%s\".\n",cachefile, XLALErrorString(err)); exit(-1);}
/* Unlike the check above, this one only reports the NULL cache: there is no
 * exit, so execution continues into XLALFrStreamCacheOpen below. */
if(cache==NULL) fprintf(stderr,"readTseries ERROR: Received NULL pointer for channel %s\n",channel);
stream = XLALFrStreamCacheOpen( cache );
if(stream==NULL) {fprintf(stderr,"ERROR: Unable to open stream from frame cache file\n"); exit(-1);}
if(stream==NULL) {fprintf(stderr,"readTseries ERROR: Unable to open stream from frame cache file\n"); exit(-1);}
out = XLALFrStreamInputREAL8TimeSeries( stream, channel, &start, length , 0 );
if(out==NULL) fprintf(stderr,"ERROR: unable to read channel %s from %s at time %i\nCheck the specified data duration is not too long\n",channel,cachefile,start.gpsSeconds);
XLALDestroyCache(cache);
/* A failed read is reported but not fatal: the stream is still closed and
 * `out` (NULL) is returned. */
if(out==NULL) fprintf(stderr,"readTseries ERROR: unable to read channel %s at time %i\nCheck the specified data duration is not too long\n",channel,start.gpsSeconds);
LALFrClose(&status,&stream);
return out;
}
......@@ -241,14 +276,19 @@ static INT4 getDataOptionsByDetectors(ProcessParamsTable *commandLine, char ***i
*flows=XLALCalloc(*N,sizeof(REAL8));
*fhighs=XLALCalloc(*N,sizeof(REAL8));
*timeslides=XLALCalloc(*N,sizeof(REAL8));
int globFrames=!!LALInferenceGetProcParamVal(commandLine,"--glob-frame-data");
/* For each IFO, fetch the other options if available */
for(i=0;i<*N;i++)
{
/* Cache */
sprintf(tmp,"--%s-cache",(*ifos)[i]);
this=LALInferenceGetProcParamVal(commandLine,tmp);
if(!this){fprintf(stderr,"ERROR: Must specify a cache file for %s with --%s-cache\n",(*ifos)[i],(*ifos)[i]); exit(1);}
(*caches)[i]=XLALStringDuplicate(this->value);
if(!globFrames){
sprintf(tmp,"--%s-cache",(*ifos)[i]);
this=LALInferenceGetProcParamVal(commandLine,tmp);
if(!this){fprintf(stderr,"ERROR: Must specify a cache file for %s with --%s-cache\n",(*ifos)[i],(*ifos)[i]); exit(1);}
(*caches)[i]=XLALStringDuplicate(this->value);
}
/* Channel */
sprintf(tmp,"--%s-channel",(*ifos)[i]);
......@@ -322,6 +362,7 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
SimInspiralTable *injTable=NULL;
RandomParams *datarandparam;
UINT4 event=0;
int globFrames=0; // 0 = no, 1 = will search for frames in PWD
char *chartmp=NULL;
char **channels=NULL;
char **caches=NULL;
......@@ -341,11 +382,13 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
struct fvec *interp;
int interpFlag=0;
if(LALInferenceGetProcParamVal(commandLine,"--glob-frame-data")) globFrames=1;
/* Check if the new style command line arguments are used */
INT4 dataOpts=getDataOptionsByDetectors(commandLine, &IFOnames, &caches, &channels, &fLows , &fHighs, &timeslides, &Nifo);
/* Check for options if not given in the new style */
if(!dataOpts){
if(!LALInferenceGetProcParamVal(commandLine,"--cache")||!(LALInferenceGetProcParamVal(commandLine,"--IFO")||LALInferenceGetProcParamVal(commandLine,"--ifo")))
if(!(globFrames||LALInferenceGetProcParamVal(commandLine,"--cache"))||!(LALInferenceGetProcParamVal(commandLine,"--IFO")||LALInferenceGetProcParamVal(commandLine,"--ifo")))
{fprintf(stderr,USAGE); return(NULL);}
if(LALInferenceGetProcParamVal(commandLine,"--channel")){
LALInferenceParseCharacterOptionString(LALInferenceGetProcParamVal(commandLine,"--channel")->value,&channels,&Nchannel);
......@@ -734,15 +777,15 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
/* Check to see if an interpolation file is specified */
interpFlag=0;
interp=NULL;
if(strstr(caches[i],"interp:")==caches[i]){
/* Extract the file name */
char *interpfilename=&(caches[i][7]);
printf("Looking for interpolation file %s\n",interpfilename);
interpFlag=1;
interp=interpFromFile(interpfilename);
}
if( (globFrames)?0:strstr(caches[i],"interp:")==caches[i]){
/* Extract the file name */
char *interpfilename=&(caches[i][7]);
printf("Looking for interpolation file %s\n",interpfilename);
interpFlag=1;
interp=interpFromFile(interpfilename);
}
/* Check if fake data is requested */
if(interpFlag || (!(strcmp(caches[i],"LALLIGO") && strcmp(caches[i],"LALVirgo") && strcmp(caches[i],"LALGEO") && strcmp(caches[i],"LALEGO") && strcmp(caches[i],"LALSimLIGO") && strcmp(caches[i],"LALSimAdLIGO") && strcmp(caches[i],"LALSimVirgo") && strcmp(caches[i],"LALSimAdVirgo") && strcmp(caches[i],"LALAdLIGO"))))
if( (globFrames)?0:(interpFlag || (!(strcmp(caches[i],"LALLIGO") && strcmp(caches[i],"LALVirgo") && strcmp(caches[i],"LALGEO") && strcmp(caches[i],"LALEGO") && strcmp(caches[i],"LALSimLIGO") && strcmp(caches[i],"LALSimAdLIGO") && strcmp(caches[i],"LALSimVirgo") && strcmp(caches[i],"LALSimAdVirgo") && strcmp(caches[i],"LALAdLIGO")))))
{
//FakeFlag=1; - set but not used
if (!LALInferenceGetProcParamVal(commandLine,"--dataseed")){
......@@ -810,6 +853,22 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
XLALDestroyRandomParams(datarandparam);
}
else{ /* Not using fake data, load the data from a cache file */
LALCache *cache=NULL;
if(!globFrames)
{
cache = XLALCacheImport( caches[i] );
int err;
err = *XLALGetErrnoPtr();
if(cache==NULL) {fprintf(stderr,"ERROR: Unable to import cache file \"%s\",\n XLALError: \"%s\".\n",caches[i], XLALErrorString(err)); exit(-1);}
}
else
{
printf("Looking for frames for %s in PWD\n",IFOnames[i]);
cache= GlobFramesPWD(IFOnames[i]);
}
if(!cache) {fprintf(stderr,"ERROR: Cannot find any frame data!\n"); exit(1);}
if (LALInferenceGetProcParamVal(commandLine, "--psd")){
interp=NULL;
char *interpfilename=&(psds[i][0]);
......@@ -826,7 +885,7 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
}
}else{
fprintf(stderr,"Estimating PSD for %s using %i segments of %i samples (%lfs)\n",IFOnames[i],nSegs,(int)seglen,SegmentLength);
PSDtimeSeries=readTseries(caches[i],channels[i],GPSstart,PSDdatalength);
PSDtimeSeries=readTseries(cache,channels[i],GPSstart,PSDdatalength);
if(!PSDtimeSeries) {XLALPrintError("Error reading PSD data for %s\n",IFOnames[i]); XLAL_ERROR_NULL(XLAL_EFUNC);}
XLALResampleREAL8TimeSeries(PSDtimeSeries,1.0/SampleRate);
PSDtimeSeries=(REAL8TimeSeries *)XLALShrinkREAL8TimeSeries(PSDtimeSeries,(size_t) 0, (size_t) seglen*nSegs);
......@@ -1040,7 +1099,7 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
XLALGPSAdd(&segStart, deltaT);
fprintf(stderr,"Slid %s by %f s from %10.10lf to %10.10lf\n",IFOnames[i],deltaT,truesegstart.gpsSeconds+1e-9*truesegstart.gpsNanoSeconds,segStart.gpsSeconds+1e-9*segStart.gpsNanoSeconds);
}
IFOdata[i].timeData=readTseries(caches[i],channels[i],segStart,SegmentLength);
IFOdata[i].timeData=readTseries(cache,channels[i],segStart,SegmentLength);
segStart=truesegstart;
if(Ntimeslides) IFOdata[i].timeData->epoch=truesegstart;
/* FILE *out; */
......@@ -1069,6 +1128,8 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
IFOdata[i].freqData->data->data[j] /= sqrt(IFOdata[i].window->sumofsquares / IFOdata[i].window->data->length);
IFOdata[i].windowedTimeData->data->data[j] /= sqrt(IFOdata[i].window->sumofsquares / IFOdata[i].window->data->length);
}
XLALDestroyCache(cache); // Clean up cache
} /* End of data reading process */
// /* Now that the PSD is set up, make the TDW. */
......@@ -1177,7 +1238,6 @@ LALInferenceIFOData *LALInferenceReadData(ProcessParamsTable *commandLine)
fclose(out);
}
}
for (i=0;i<Nifo;i++) IFOdata[i].SNR=0.0; //SNR of the injection ONLY IF INJECTION. Set to 0.0 by default.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment