Commit f3144e37 authored by James Clark's avatar James Clark

Merge branch '103-add-options-to-bayeswave_pipe-to-include-glitch-cleaning' into 'master'

Add BayesWaveCleanFrame pipe functionality

Closes #103

See merge request !192
parents 55d44b8a af983e91
Pipeline #168845 passed with stages
in 7 minutes and 23 seconds
......@@ -891,6 +891,7 @@ def condor_job_config(job_type, condor_job, config_parser):
* job_type='bayeswave'
* job_type='bayeswave_post'
* job_type='bayeswave_fpeak'
* job_type='bayeswave_clean_frame'
* job_type='megaplot.py'
* job_type='megasky.py'
......@@ -898,7 +899,7 @@ def condor_job_config(job_type, condor_job, config_parser):
(file transfers, executable location etc) accordingly
"""
valid_job_types=['bayeswave','bayeswave_post','bayeswave_fpeak','megasky','megaplot']
valid_job_types=['bayeswave','bayeswave_post','bayeswave_fpeak','bayeswave_clean_frame','megasky','megaplot']
try:
job_index = valid_job_types.index(job_type)
except ValueError:
......@@ -1594,6 +1595,108 @@ class bayeswave_fpeakNode(bayeswave_postNode):
pipeline.AnalysisNode.__init__(self)
class bayeswave_clean_frameJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
def __init__(self, cp, cacheFiles, injfile=None, numrel_data=None, dax=False):
condor_job_config('bayeswave_clean_frame',self,cp)
workdir = os.getcwd()
self.add_condor_cmd('initialdir', workdir)
self.set_sub_file(os.path.join(workdir, 'bayeswave_clean_frame.sub'))
self.set_stdout_file(os.path.join(workdir, 'logs',
'BayesWaveCleanFrame_$(macrooutputDir)-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(workdir, 'logs',
'BayesWaveCleanFrame_$(macrooutputDir)-$(cluster)-$(process).err'))
self.set_log_file(os.path.join(workdir, 'logs',
'BayesWaveCleanFrame_$(macrooutputDir)-$(cluster)-$(process).log'))
if cp.has_option('condor','arch'):
self.add_condor_cmd('+arch',cp.get('condor','arch'))
if cp.has_option('condor', 'bayeswave_clean_frame-request-memory'):
self.add_condor_cmd('request_memory',
cp.get('condor', 'bayeswave_clean_frame-request-memory'))
if cp.has_option('condor', 'bayeswave_clean_frame-request-disk'):
self.add_condor_cmd('request_disk',
cp.get('condor', 'bayeswave_clean_frame-request-disk'))
if cp.has_option('condor', 'bayeswave_clean_frame-cit-nodes'):
self.add_condor_cmd('+BayesWaveCgroup', 'True')
self.add_condor_cmd('Rank', '(TARGET.BayesWaveCgroup =?= True)')
#
# File Transfers
#
if cp.getboolean('condor', 'transfer-files'):
# File transfers
transferstring='$(macrooutputDir),datafind'
if cp.getboolean('condor','copy-frames'):
transferstring += ',$(macroframes)'
self.add_condor_cmd('transfer_input_files', transferstring)
self.add_condor_cmd('transfer_output_files', '$(macrooutputDir)')
ifo_clean = ast.literal_eval(cp.get('bayeswave_clean_frame_options', 'ifo'))
self.add_opt('ifo', ifo_clean)
channel_list = ast.literal_eval(cp.get('datafind', 'channel-list'))
frame_list = ast.literal_eval(cp.get('datafind', 'frtype-list'))
self.add_opt('channel', channel_list[ifo_clean])
self.add_opt('frame-type', frame_list[ifo_clean])
for item in cp.items('bayeswave_clean_frame_options'):
# Add any option and value which exists
self.add_opt(item[0], item[1])
class bayeswave_clean_frameNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
def __init__(self, bayeswave_clean_frame_job):
pipeline.CondorDAGNode.__init__(self, bayeswave_clean_frame_job)
pipeline.AnalysisNode.__init__(self)
def set_trigtime(self, trigtime):
self.add_var_opt('trigtime', '%.9f'%trigtime)
self.trigtime = trigtime
def set_segment_start(self, segment_start):
self.add_var_opt('segment-start', '%.9f'%segment_start)
def set_seglen(self, seglen):
self.add_var_opt('seglen', seglen)
self.seglen = seglen
def set_frame_srate(self, frame_srate):
self.add_var_opt('frame-srate', frame_srate)
self.frame_srate = frame_srate
def set_frame_start(self, frame_start):
self.add_var_opt('frame-start', frame_start)
self.frame_start = frame_start
def set_frame_length(self, frame_length):
self.add_var_opt('frame-length', frame_length)
self.frame_length = frame_length
def set_glitch_param_file(self, glitch_param_file):
self.add_var_opt('glitch-model',glitch_param_file)
def set_outdir(self, outdir):
self.add_var_opt('outdir', outdir)
#
# skymap job
#
......
......@@ -192,6 +192,7 @@ def parser():
parser.add_argument("--separate-post-dag", default=False, action="store_true")
parser.add_argument("--fpeak-analysis", default=False, action="store_true")
parser.add_argument("--trigger-time-delta", type=float, default=0.0)
parser.add_argument("--bayeswave-clean-frame", default=False, action="store_true")
parser.add_argument("--condor-submit", default=False, action="store_true")
# Advanced condor options
......@@ -270,6 +271,16 @@ if cp.has_option('bayeswave_fpeak_options', 'fpeak-analysis'):
if cp.has_option('bayeswave_fpeak_options','fpeak-flow'):
fpeak_flow=cp.getfloat('bayeswave_fpeak_options','fpeak-flow')
if cp.has_option('bayeswave_clean_frame_options', 'ifo'):
opts.bayeswave_clean_frame=True
clean_frame_ifo=cp.get('bayeswave_clean_frame_options', 'ifo')
if opts.bayeswave_clean_frame:
if cp.has_option('bayeswave_options', 'signalOnly'):
print('Cannot run glitch Cleaning with signalOnly model')
opts.bayeswave_clean_frame=False
# Make local copies of necessary input files
shutil.copy(opts.configfile, os.path.join(workdir, 'config.ini'))
......@@ -590,6 +601,9 @@ if (opts.cwb_trigger_list is not None) \
for ifo in ifo_list:
if cp.getboolean('datafind','sim-data'):
if opts.bayeswave_clean_frame:
print("Cannot clean frame in simulated data")
opts.bayeswave_clean_frame=False
print("Simulating noise", file=sys.stdout)
if 'interp' in frtype_list[ifo]:
......@@ -698,7 +712,23 @@ for ifo in ifo_list:
print("No matching segments for %s"%ifo, file=sys.stderr)
sys.exit()
os.chdir(curdir)
os.chdir(curdiir)
# Get the sampling rate of the frame
if opts.bayeswave_clean_frame:
try:
import lalframe.frread as fr
clean_ifo = ast.literal_eval(cp.get('bayeswave_clean_frame_options', 'ifo'))
trigtime = int(trigger_list.triggers[0].trigger_time)
channel_names = ast.literal_eval(cp.get('datafind','channel-list'))
example_ts = fr.read_timeseries(cache_files[clean_ifo], channel_names[clean_ifo], trigtime-1,2.0 )
clean_frame_srate = 1./example_ts.deltaT
print("Getting sampling rate of cleaned frame from input frame: %.2f Hz"%(clean_frame_srate))
except Exception as e:
clean_frame_srate = 16384.0
print("Setting clean frame srate to default: %.2f"%(clean_frame_srate))
# Set up cache files to point to local copies of frames in the working
# directory
......@@ -744,23 +774,27 @@ bayesline_medianpsd_dagname = os.path.join(workdir, os.path.basename(workdir)+'_
dagname = os.path.join(workdir, os.path.basename(workdir))
postdagname = os.path.join(workdir, os.path.basename(workdir)+'_post')
fpeakdagname = os.path.join(workdir, os.path.basename(workdir)+'_pfeak')
cleanframedagname = os.path.join(workdir, os.path.basename(workdir)+'_cleanframe')
bayesline_medianpsd_dag = pipeline.CondorDAG(log=bayesline_medianpsd_dagname+'.log')
dag = pipeline.CondorDAG(log=dagname+'.log')
postdag = pipeline.CondorDAG(log=postdagname+'.log')
fpeakdag = pipeline.CondorDAG(log=fpeakdagname+'.log')
cleanframedag = pipeline.CondorDAG(log=cleanframedagname+'.log')
# Set the name of the file that will contain the DAG.
bayesline_medianpsd_dag.set_dag_file(bayesline_medianpsd_dagname)
dag.set_dag_file(dagname)
postdag.set_dag_file(postdagname)
fpeakdag.set_dag_file(fpeakdagname)
cleanframedag.set_dag_file(cleanframedagname)
# Create DAG jobs
# bayesline: median bayesline spectral estimation
# bayeswave: main bayeswave analysis
# bayeswave_post: normal post-processing
# bayeswave_fpeak: Spectral analysis post-processing (typically for BNS)
# bayeswave_clean_frame: Produce glitch cleaned frames
# megasky: skymap job
# megaplot: remaining plots & webpage generation
# submitToGraceDB: upload skymap & PE to graceDB (optional)
......@@ -853,6 +887,11 @@ if opts.fpeak_analysis:
injfile=injfile, numrel_data=numrel_data)
bayeswave_fpeak_log = bayeswave_fpeak_job._CondorJob__log_file
if opts.bayeswave_clean_frame:
# The clean frame job is an instance of the BayesWaveCleanFrame job
bayeswave_clean_frame_job = pipe_utils.bayeswave_clean_frameJob(cp, cache_files, injfile=injfile, numrel_data=numrel_data)
bayeswave_clean_frame_log = bayeswave_clean_frame_job._CondorJob__log_file
megasky_job = pipe_utils.megaskyJob(cp, injfile)
megasky_log = megasky_job._CondorJob__log_file
megaplot_job = pipe_utils.megaplotJob(cp)
......@@ -1140,6 +1179,34 @@ for t,trigger in enumerate(trigger_list.triggers):
if cp.has_option('bayeswave_options','BW-inject'):
bayeswave_fpeak_node.set_BW_event(trigger.BW_event)
# ------------------------------------------------------------------
# BAYESWAVE CLEAN FRAME NODE
#
# Add options for bayeswave_clean_frame node
#
if opts.bayeswave_clean_frame:
clean_frame_outdir = os.path.join(outputDir, 'clean_frame')
if not os.path.exists(clean_frame_outdir):
os.makedirs(clean_frame_outdir)
bayeswave_clean_frame_node = pipe_utils.bayeswave_clean_frameNode(bayeswave_clean_frame_job)
bayeswave_clean_frame_node.set_trigtime(trigger.trigger_time)
clean_frame_duration = 4.0*trigger.seglen
bayeswave_clean_frame_node.set_frame_start(int(trigger.trigger_time - clean_frame_duration/2.0))
bayeswave_clean_frame_node.set_frame_srate(clean_frame_srate)
bayeswave_clean_frame_node.set_seglen(trigger.seglen)
if cp.has_option('bayeswave_options', 'fullOnly'):
model_type = 'full'
else:
model_type = 'glitch'
clean_ifo = ast.literal_eval(cp.get('bayeswave_clean_frame_options', 'ifo'))
glitch_param_file = os.path.join(outputDir, 'chains', '%s_params_%s.dat'%(model_type, clean_ifo))
bayeswave_clean_frame_node.set_glitch_param_file(glitch_param_file)
bayeswave_clean_frame_node.set_outdir(clean_frame_outdir)
# ------------------------------------------------------------------
# MEGAPY NODES
......@@ -1182,6 +1249,8 @@ for t,trigger in enumerate(trigger_list.triggers):
bayeswave_post_node.add_parent(bayeswave_node)
if opts.fpeak_analysis:
bayeswave_fpeak_node.add_parent(bayeswave_node)
if opts.bayeswave_clean_frame:
bayeswave_clean_frame_node.add_parent(bayeswave_node)
if not opts.skip_megapy:
megasky_node.add_parent(bayeswave_node)
megaplot_node.add_parent(bayeswave_post_node)
......@@ -1195,6 +1264,8 @@ for t,trigger in enumerate(trigger_list.triggers):
dag.add_node(bayeswave_psd_post_node)
dag.add_node(bayeswave_node)
if opts.bayeswave_clean_frame:
dag.add_node(bayeswave_clean_frame_node)
if not opts.skip_post and not opts.separate_post_dag:
dag.add_node(bayeswave_post_node)
......@@ -1233,6 +1304,8 @@ bayeswave_job._CondorJob__log_file = bayeswave_log
bayeswave_post_job._CondorJob__log_file = bayeswave_post_log
if opts.fpeak_analysis:
bayeswave_fpeak_job._CondorJob__log_file = bayeswave_fpeak_log
if opts.bayeswave_clean_frame:
bayeswave_clean_frame_job._CondorJob__log_file = bayeswave_clean_frame_log
megasky_job._CondorJob__log_file = megasky_log
megaplot_job._CondorJob__log_file = megaplot_log
......
......@@ -83,12 +83,15 @@ struct CleanFrameData
double bw_start; //bayeswave start time
double bw_trigtime; //bayeswave trigger time
char clean_suffix[MAXSTRINGSIZE]; //suffix to add to frame and channel type
char outdir[MAXSTRINGSIZE]; //Output directory
};
void print_usage();
void parse_command_line_args(int argc, char **argv, struct CleanFrameData *data);
static void output_frame(REAL8TimeSeries *timeData, REAL8TimeSeries *timeRes, REAL8TimeSeries *timeGlitch, CHAR *frameType, CHAR *ifo);
static void output_frame(REAL8TimeSeries *timeData, REAL8TimeSeries *timeRes, REAL8TimeSeries *timeGlitch, CHAR *frameType, CHAR *ifo, CHAR *outdir);
void printProgress (double percentage)
{
......@@ -146,6 +149,17 @@ int main(int argc, char *argv[])
FILE *infile;
FILE *vFile; //file pointer for verbose products
// Put in a default clean_suffix
// FIXME: We want to make this a required option
// to be supplied in the config file in the future
// The DCC number here refers to the cleaning recipe
// here: https://dcc.ligo.org/LIGO-T1700406
char *version = "T1700406_v4";
sprintf(data->clean_suffix, "%s", version);
// Put in the default directory name
char *outdir = ".";
sprintf(data->outdir, "%s", outdir);
// ------------------------------------------------- //
parse_command_line_args(argc, argv, data);
......@@ -164,7 +178,8 @@ int main(int argc, char *argv[])
timeData = readTseries(data->fr_cache,data->fr_chanl,epoch,data->fr_seglen);
timeGlitch = readTseries(data->fr_cache,data->fr_chanl,epoch,data->fr_seglen);
char * version="T1700406_v4";
version = data->clean_suffix;
outdir = data->outdir;
char outframeType[MAXSTRINGSIZE];
char outframeChannel[MAXSTRINGSIZE];
......@@ -334,7 +349,7 @@ int main(int argc, char *argv[])
// Output cleaned data!
output_frame(timeData, timeRes, timeGlitch, outframeType, data->ifo);
output_frame(timeData, timeRes, timeGlitch, outframeType, data->ifo, outdir);
XLALDestroyREAL8TimeSeries(timeData);
fprintf(stdout,"\n");
......@@ -352,7 +367,8 @@ static void output_frame(REAL8TimeSeries *timeData,
REAL8TimeSeries *timeRes,
REAL8TimeSeries *timeGlitch,
CHAR *frameType,
CHAR *ifo)
CHAR *ifo,
CHAR *outdir)
{
CHAR fname[2048];
INT4 duration;
......@@ -384,7 +400,7 @@ static void output_frame(REAL8TimeSeries *timeData,
/* get frame filename */
duration = gpsEnd - gpsStart;
snprintf( fname, FILENAME_MAX, "%c-%s-%d-%d.gwf", ifo[0], frameType, gpsStart, duration );
snprintf( fname, FILENAME_MAX, "%s/%c-%s-%d-%d.gwf", outdir, ifo[0], frameType, gpsStart, duration );
/* define frame */
frame = XLALFrameNew( &timeData->epoch, duration, "LIGO", 0, 1, detectorFlags );
......@@ -440,6 +456,8 @@ void parse_command_line_args(int argc, char **argv, struct CleanFrameData *data)
{"seglen", required_argument, 0, 0},
{"segment-start", required_argument, 0, 0},
{"trigtime", required_argument, 0, 0},
{"clean-suffix", required_argument, 0, 0},
{"outdir", required_argument, 0, 0},
{"median", no_argument, 0, 0},
{"verbose", no_argument, 0, 0},
{"help", no_argument, 0,'h'},
......@@ -526,6 +544,19 @@ void parse_command_line_args(int argc, char **argv, struct CleanFrameData *data)
argCount++;
data->bw_trigtime = (double)atof(optarg);
}
if(strcmp("clean-suffix", long_options[long_index].name) == 0)
{
argcheck[argCount]++;
argCount++;
sprintf(data->clean_suffix, "%s", optarg);
}
if(strcmp("outdir", long_options[long_index].name) == 0)
{
argcheck[argCount]++;
argCount++;
sprintf(data->outdir, "%s", optarg);
}
if(strcmp("verbose",long_options[long_index].name) == 0)
{
data->verboseFlag = 1;
......@@ -581,6 +612,8 @@ void parse_command_line_args(int argc, char **argv, struct CleanFrameData *data)
fprintf(stdout,"bayeswave segment length...%li\n",(long)data->bw_seglen);
fprintf(stdout,"bayeswave start time.......%li\n",(long)data->bw_start);
fprintf(stdout,"bayeswave trigger time.....%li\n",(long)data->bw_trigtime);
fprintf(stdout,"clean suffix...............%s\n",data->clean_suffix);
fprintf(stdout,"output directory...........%s\n",data->outdir);
fprintf(stdout,"\n");
}
......@@ -596,11 +629,14 @@ void print_usage()
fprintf(stdout,"--frame-start frame start time\n");
fprintf(stdout,"--frame-srate frame sammpling rate\n");
fprintf(stdout,"--seglen bayeswave segment length\n");
fprintf(stdout,"--srate bayeswave sampling rate\n");
fprintf(stdout,"--segment-start bayeswave segment start time\n");
fprintf(stdout,"--trigtime bayeswave trigger time\n");
fprintf(stdout,"\n");
fprintf(stdout,"Optional Arguments:\n");
fprintf(stdout,"--clean-suffix Suffix to be placed after frame and channel\n");
fprintf(stdout," names to indicate the cleaned frames\n");
fprintf(stdout," defaults to T1700406_v4 (FIXME)\n");
fprintf(stdout,"--outdir Output directory. Default is current directory ('.')\n");
fprintf(stdout,"--verbose print intermediate data products to disk\n");
fprintf(stdout,"--median use median glitch reconstruction\n");
fprintf(stdout," can only be used on a completed run\n");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment