Commit 93f0f942 authored by Chad Hanna

bin/gstlal_inspiral_pipe: new gstlal inspiral offline dag generator

parent a88bebde

#!/usr/bin/env python
#
# Copyright (C) 2011 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a dag to run gstlal_inspiral offline
"""
__author__ = 'Chad Hanna <channa@caltech.edu>'
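#
# Example invocation (a sketch: file names and channel values are
# placeholders, and the data-source options such as --gps-start-time,
# --gps-end-time, --frame-cache, --frame-segments-file, --frame-segments-name
# and --channel-name are added by datasource.append_options, so consult
# --help for the exact set available in your installation):
#
#   gstlal_inspiral_pipe \
#     --gps-start-time 966384015 --gps-end-time 971568015 \
#     --frame-cache frame.cache \
#     --frame-segments-file segments.xml.gz --frame-segments-name datasegments \
#     --channel-name=H1=LDAS-STRAIN --channel-name=L1=LDAS-STRAIN \
#     --bank-cache H1=H1_bank.cache,L1=L1_bank.cache --num-banks 1 \
#     --time-slide-file tisi.xml --web-dir ~/public_html \
#     --injections bns_injections.xml.gz
#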
##############################################################################
# import standard modules
import sys, os, copy, math
import subprocess, socket, tempfile
##############################################################################
# import the modules we need to build the pipeline
from glue import iterutils
from glue import pipeline
from glue import lal
from glue.ligolw import lsctables
from glue import segments
from glue.ligolw import array
from glue.ligolw import param
import glue.ligolw.utils as utils
import glue.ligolw.utils.segments as ligolw_segments
from optparse import OptionParser
from gstlal import inspiral, inspiral_pipe
from gstlal import dagparts as gstlaldagparts
import numpy
from pylal.datatypes import LIGOTimeGPS
from gstlal import datasource
#
# Utility functions
#
def T050017_filename(instruments, description, start, end, extension, path = None):
if not isinstance(instruments, str):
instruments = "".join(sorted(instruments))
duration = end - start
extension = extension.strip('.')
if path is not None:
return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
else:
return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
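# Filenames follow the LIGO T050017 convention IFOS-DESCRIPTION-GPSSTART-DURATION.EXTENSION, e.g.
# T050017_filename("H1L1", "REFERENCE_PSD", 966384015, 966386063, ".xml.gz")
# returns "H1L1-REFERENCE_PSD-966384015-2048.xml.gz"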
class generic_job(inspiral_pipe.InspiralJob):
def __init__(self, program, tag_base = None, condor_commands = {}):
executable = inspiral_pipe.which(program)
inspiral_pipe.InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1])
for cmd,val in condor_commands.items():
self.add_condor_cmd(cmd, val)
class generic_node(inspiral_pipe.InspiralNode):
def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}):
inspiral_pipe.InspiralNode.__init__(self, job, dag, parent_nodes)
self.input_files = input_files
self.output_files = output_files
for opt, val in opts.items() + output_files.items() + input_files.items():
if val is None:
continue # not the same as val = '' which is allowed
if not hasattr(val, "__iter__"): # catches list like things but not strings
if opt == "":
self.add_var_arg(val)
else:
self.add_var_opt(opt, val)
# Must be an iterable
else:
if opt == "":
[self.add_var_arg(a) for a in val]
else:
self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))
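# In generic_node, each key of opts, input_files and output_files becomes a
# command line option of the node via add_var_opt (e.g. "gps-start-time"
# becomes --gps-start-time); an empty key "" passes its value as a bare
# argument via add_var_arg, and iterable values are expanded to one option per
# element.  Illustrative example (values are placeholders):
#
#   generic_node(refPSDJob, dag, parent_nodes = [],
#       opts = {"gps-start-time":1000000000, "gps-end-time":1000002048},
#       input_files = {"frame-cache":"frame.cache"},
#       output_files = {"write-psd":"psd.xml.gz"})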
#
# Utility functions
#
# FIXME surely this is in glue
def parse_cache_str(instr):
dictcache = {}
if instr is None: return dictcache
for c in instr.split(','):
ifo = c.split("=")[0]
cache = c.replace(ifo+"=","")
dictcache[ifo] = cache
return dictcache
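# e.g. parse_cache_str("H1=H1.cache,L1=L1.cache") returns
# {"H1": "H1.cache", "L1": "L1.cache"}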
def pipeline_dot_py_append_opts_hack(opt, vals):
out = str(vals[0])
for v in vals[1:]:
out += " --%s %s" % (opt, str(v))
return out
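# e.g. pipeline_dot_py_append_opts_hack("bank-id", ["0_0", "0_1", "0_2"])
# returns "0_0 --bank-id 0_1 --bank-id 0_2"; when passed to add_var_opt the
# leading --bank-id is supplied by pipeline.py, giving the repeated option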
def sim_tag_from_inj_file(injections):
if injections is None:
return None
return injections.replace('.xml', '').replace('.gz', '')
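# e.g. sim_tag_from_inj_file("bns_injections.xml.gz") returns "bns_injections"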
#
# get a dictionary of all the disjoint 2+ detector combination segments
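# e.g. segsdict[frozenset(("H1", "L1"))] holds the times when exactly H1 and
# L1 (and no other analyzable instrument) have data, intersected with
# boundary_seg, padded by 2048 s and broken into pieces of at most
# --max-segment-length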
#
def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg):
segsdict = segments.segmentlistdict()
for n in range(2, 1 + len(analyzable_instruments_set)):
for ifo_combos in iterutils.choices(list(analyzable_instruments_set), n):
# never analyze H1H2 or H2L1 times
if set(ifo_combos) == set(('H1', 'H2')) or set(ifo_combos) == set(('L1', 'H2')):
print >> sys.stderr, "not analyzing: ", ifo_combos, " only time"
continue
segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(2048) #FIXME don't hard code
segsdict[frozenset(ifo_combos)] = gstlaldagparts.breakupsegs(segsdict[frozenset(ifo_combos)], options.max_segment_length, 2048) #FIXME don't hardcode
if not segsdict[frozenset(ifo_combos)]:
del segsdict[frozenset(ifo_combos)]
return segsdict
def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options):
psd_nodes = {}
for ifos in segsdict:
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
for seg in segsdict[ifos]:
psd_nodes[(ifos, seg)] = \
generic_node(refPSDJob, dag, parent_nodes = parent_nodes,
opts = {"gps-start-time":seg[0].seconds,
"gps-end-time":seg[1].seconds,
"data-source":"frames",
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
"psd-fft-length":options.psd_fft_length,
"frame-segments-name": options.frame_segments_name},
input_files = { "frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file},
output_files = {"write-psd":T050017_filename(ifos, "REFERENCE_PSD", seg[0].seconds, seg[1].seconds, '.xml.gz', path = refPSDJob.output_path)}
)
return psd_nodes
def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_groups, options, seg):
svd_nodes = {}
for i, bank_group in enumerate(bank_groups):
for ifo, files in bank_group.items():
# First sort out the clipleft, clipright options
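# only the first template bank of the first group keeps its left edge
# (clipleft of 0) and only the last bank of the last group keeps its right
# edge; every other edge is clipped by options.overlap / 2, presumably so
# that the overlapping templates of neighbouring sub banks are not analyzed
# twice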
clipleft = []
clipright = []
ids = []
for n, f in enumerate(files):
# handle template bank clipping
if (n == 0) and (i == 0):
clipleft.append(0)
else:
clipleft.append(options.overlap / 2)
if (i == len(bank_groups) - 1) and (n == len(files) -1):
clipright.append(0)
else:
clipright.append(options.overlap / 2)
ids.append("%d_%d" % (i, n))
svd_bank_name = T050017_filename(ifo, '%d_SVD' % (i,), seg[0].seconds, seg[1].seconds, '.xml.gz', path = svdJob.output_path)
svd_nodes.setdefault(ifo, []).append(
generic_node(svdJob, dag,
parent_nodes = parent_nodes,
opts = {"svd-tolerance":options.tolerance,
"flow":options.flow,
"clipleft":clipleft,
"clipright":clipright,
"samples-min":options.samples_min,
"samples-max-256":options.samples_max_256,
"samples-max-64":options.samples_max_64,
"samples-max":options.samples_max,
"autocorrelation-length":options.autocorrelation_length,
"bank-id":ids,
"identity-transform":options.identity_transform,
"snr-threshold":4.0, "ortho-gate-fap":0.5},
input_files = { "template-bank":files,
"reference-psd":psd},
output_files = {"write-svd":svd_bank_name}
)
)
return svd_nodes
def inspiral_node_gen(gstlalInspiralJob, dag, svd_nodes, segsdict, options, channel_dict):
inspiral_nodes = {}
for ifos in segsdict:
# setup dictionaries to hold the inspiral nodes
inspiral_nodes[(ifos, None)] = {}
for injections in options.injections:
inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
for seg in segsdict[ifos]:
# only use a channel dict with the relevant channels
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
# setup svd bank input string
# FIXME, if we have a lot of sub banks we might have to arrange to have multiple inspiral jobs
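# the svd-bank argument is a comma separated list of IFO:FILE pairs, e.g.
# "H1:H1-0_SVD-966384015-2048.xml.gz,H1:H1-1_SVD-966384015-2048.xml.gz,L1:L1-0_SVD-966384015-2048.xml.gz"
# (output paths omitted for brevity)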
svd_bank_string = ""
numbanks = None
for ifo in ifos:
svd_bank_string += ",".join(["%s:%s" % (ifo, node.output_files["write-svd"]) for node in svd_nodes[ifo]])
svd_bank_string += ","
if numbanks is None:
numbanks = len(svd_nodes[ifo])
else:
# check that we have the same number of banks from each detector
assert numbanks == len(svd_nodes[ifo])
svd_bank_string = svd_bank_string.strip(",")
# setup output names
output_names = [T050017_filename(ifos, '%d_LLOID' % (i,), seg[0].seconds, seg[1].seconds, '.sqlite', path = gstlalInspiralJob.output_path) for i in range(numbanks)] # assume all ifos have same number of banks, assertion would have failed by now
dist_stat_names = [T050017_filename(ifos, '%d_DIST_STATS' % (i,), seg[0].seconds, seg[1].seconds, '.xml.gz', path = gstlalInspiralJob.output_path) for i in range(numbanks)] # assume all ifos have same number of banks, assertion would have failed by now
# non injection node
noninjnode = generic_node(gstlalInspiralJob, dag, parent_nodes = sum(svd_nodes.values(),[]),
opts = {"psd-fft-length":options.psd_fft_length,
"ht-gate-threshold":options.ht_gate_threshold,
"frame-segments-name":options.frame_segments_name,
"gps-start-time":options.gps_start_time,
"gps-end-time":options.gps_end_time,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"svd-bank":svd_bank_string, #FIXME the parsing of this should be fixed in gstlal inspiral so that these can be input files
"tmp-space":inspiral_pipe.log_path(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":"frames"
},
input_files = { "time-slide-file":options.time_slide_file,
"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file,
"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
"blind-injections":options.blind_injections,
"veto-segments-file":options.vetoes,
},
output_files = {
"output":output_names,
"likelihood-file":dist_stat_names
}
)
inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
# process injections
for injections in options.injections:
# setup output names
sim_name = sim_tag_from_inj_file(injections)
output_names = [T050017_filename(ifos, '%d_LLOID_%s' % (i, sim_name), seg[0].seconds, seg[1].seconds, '.sqlite', path = gstlalInspiralJob.output_path) for i in range(numbanks)] # assume all ifos have same number of banks, they better!
dist_stat_names = [T050017_filename(ifos, '%d_DIST_STATS_%s' % (i, sim_name), seg[0].seconds, seg[1].seconds, '.xml.gz', path = gstlalInspiralJob.output_path) for i in range(numbanks)] # assume all ifos have same number of banks, they better!
# setup injection node
injnode = generic_node(gstlalInspiralInjJob, dag, parent_nodes = sum(svd_nodes.values(),[]),
opts = {"psd-fft-length":options.psd_fft_length,
"ht-gate-threshold":options.ht_gate_threshold,
"frame-segments-name":options.frame_segments_name,
"gps-start-time":options.gps_start_time,
"gps-end-time":options.gps_end_time,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"svd-bank":svd_bank_string, #FIXME the parsing of this should be fixed in gstlal inspiral so that these can be input files
"tmp-space":inspiral_pipe.log_path(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":"frames"
},
input_files = { "time-slide-file":options.time_slide_file,
"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file,
"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
"veto-segments-file":options.vetoes,
"injections": injections
},
output_files = {
"output":output_names,
"likelihood-file":dist_stat_names
}
)
inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)
return inspiral_nodes
def adapt_gstlal_inspiral_output(inspiral_nodes, options):
# first get the previous output in a usable form
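# lloid_output[sim_tag][i] collects, across all segments and ifo combinations,
# the trigger databases produced by sub bank i (sim_tag is None for the
# non-injection run); lloid_diststats[i] collects the corresponding DIST_STATS
# files from the non-injection jobs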
lloid_output = {}
for inj in options.injections + [None]:
lloid_output[sim_tag_from_inj_file(inj)] = {}
lloid_diststats = {}
for ifos in segsdict:
for seg in segsdict[ifos]:
for node in inspiral_nodes[(ifos, None)][seg]:
for i,f in enumerate(node.output_files["output"]):
lloid_output[None].setdefault(i,[]).append(f)
for i,f in enumerate(node.output_files["likelihood-file"]):
lloid_diststats.setdefault(i,[]).append(f)
for inj in options.injections:
for node in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
for i,f in enumerate(node.output_files["output"]):
lloid_output[sim_tag_from_inj_file(inj)].setdefault(i,[]).append(f)
return lloid_output, lloid_diststats
def rank_and_merge(dag, calcLikelihoodJob, lalappsRunSqliteJob, toSqliteJob, inspiral_nodes, lloid_output, lloid_diststats, segsdict, options, boundary_seg):
likelihood_nodes = {}
outnodes = {}
for ifos in segsdict:
# first non-injections
for n, (inputs, diststats) in enumerate(zip(lloid_output[None].values(), lloid_diststats.values())):
likelihood_nodes.setdefault(None,[]).append(
generic_node(calcLikelihoodJob, dag,
parent_nodes=sum(inspiral_nodes[(ifos, None)].values(),[]),
opts = {"tmp-space":inspiral_pipe.log_path(), "background-prior":1.0, "synthesize-injections":1000000},
input_files = {"likelihood-file":diststats, "":inputs},
output_files = {"write-likelihood":T050017_filename(ifos, '%d_CALC_LIKELIHOOD' % (n,), boundary_seg[0].seconds, boundary_seg[1].seconds, '.xml.gz', path = calcLikelihoodJob.output_path)}
)
)
# then injections
for inj in options.injections:
for n, (inputs, diststats) in enumerate(zip(lloid_output[sim_tag_from_inj_file(inj)].values(), lloid_diststats.values())):
likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
generic_node(calcLikelihoodJob, dag,
parent_nodes = sum(inspiral_nodes[(ifos, None)].values(),[]) + sum(inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))].values(),[]),
opts = {"tmp-space":inspiral_pipe.log_path(), "background-prior":1.0, "synthesize-injections":1000000},
input_files = {"likelihood-file":diststats, "":inputs},
output_files = {"write-likelihood":T050017_filename(ifos, '%d_CALC_LIKELIHOOD_%s' % (n,sim_tag_from_inj_file(inj)), boundary_seg[0].seconds, boundary_seg[1].seconds, '.xml.gz', path = calcLikelihoodJob.output_path)}
)
)
# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
files_to_group = 10
for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
merge_nodes = []
inputs = sum([node.input_files[""] for node in nodes], [])
if inj is None:
# 10 at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
# Merging all the dbs from the same sub bank
for subbank, inputs in enumerate([node.input_files[""] for node in nodes]):
db = T050017_filename(ifos, '%04d_LLOID' % (subbank,), int(boundary_seg[0]), int(boundary_seg[1]), '.sqlite')
sqlitenode = generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.log_path()},
input_files = {"":inputs},
output_files = {"database":db}
)
sqlitenode = generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":db}
)
outnodes.setdefault(None, []).append(sqlitenode)
else:
# 10 at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
# Merging all the dbs from the same sub bank and injection run
for subbank, inputs in enumerate([node.input_files[""] for node in nodes]):
injdb = T050017_filename(ifos, '%04d_LLOID_%s' % (subbank, sim_tag_from_inj_file(inj)), int(boundary_seg[0]), int(boundary_seg[1]), '.sqlite')
sqlitenode = generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.log_path()},
input_files = {"":inputs},
output_files = {"database":injdb}
)
sqlitenode = generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":injdb}
)
outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)
return likelihood_nodes, outnodes
def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, innodes, options):
if options.vetoes is None:
vetoes = []
else:
vetoes = [options.vetoes]
# Merge the final non injection database
noninjdb = T050017_filename(instruments, 'ALL_LLOID', int(boundary_seg[0]), int(boundary_seg[1]), '.sqlite')
sqlitenode = generic_node(toSqliteJob, dag, parent_nodes = innodes[None],
opts = {"replace":"", "tmp-space":inspiral_pipe.log_path()},
input_files = {"": ([node.input_files[""] for node in innodes[None]] + vetoes + [options.frame_segments_file])},
output_files = {"database":noninjdb}
)
# cluster the final non injection database
noninjsqlitenode = generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":noninjdb}
)
injdbs = []
outnodes = [noninjsqlitenode]
for injections in options.injections:
# Setup the final output names, etc.
injdb = T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), int(boundary_seg[0]), int(boundary_seg[1]), '.sqlite')
injdbs.append(injdb)
injxml = injdb+".xml.gz"
# extract only the nodes that were used for injections
thisinjnodes = innodes[sim_tag_from_inj_file(injections)]
# merge
sqlitenode = generic_node(toSqliteJob, dag, parent_nodes = thisinjnodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.log_path()},
input_files = {"": ([node.input_files[""] for node in thisinjnodes] + vetoes + [options.frame_segments_file, injections])},
output_files = {"database":injdb}
)
# cluster
clusternode = generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.log_path()},
input_files = {"":injdb}
)
clusternode = generic_node(toXMLJob, dag, parent_nodes = [clusternode],
opts = {"replace":False, "tmp-space":inspiral_pipe.log_path()},
output_files = {"extract":injxml},
input_files = {"":injdb}
)
inspinjnode = generic_node(ligolwInspinjFindJob, dag, parent_nodes = [clusternode],
opts = {"time-window":0.9},
input_files = {"":injxml}
)
sqlitenode = generic_node(toSqliteJob, dag, parent_nodes = [inspinjnode],
opts = {"replace":True, "tmp-space":inspiral_pipe.log_path()},
output_files = {"database":injdb},
input_files = {"":injxml}
)
outnodes.append(sqlitenode)
return injdbs, noninjdb, outnodes
def compute_FAP(marginalizeJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, likelihood_nodes, injdbs, noninjdb, final_sqlite_nodes):
# compute FAPs and FARs
# split up the marginalization into groups of margnum files, then combine the per-group outputs into a single marginalized likelihood file
margin = [node.output_files["write-likelihood"] for inj, nodes in likelihood_nodes.items() for node in nodes if inj is None]
margout = []
margnodes = []
margnum = 16
for i,n in enumerate(range(0, len(margin), margnum)):
margout.append("%d_marginalized_likelihood.xml.gz" % (i,))
margnodes.append(generic_node(marginalizeJob, dag, parent_nodes = final_sqlite_nodes,
output_files = {"output":margout[-1]},
input_files = {"":margin[n:n+margnum]}
))
margnode = generic_node(marginalizeJob, dag, parent_nodes = margnodes,
output_files = {"output":"marginalized_likelihood.xml.gz"},
input_files = {"":margout}
)
farnode = generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = [margnode],
opts = {"tmp-space":inspiral_pipe.log_path()},
input_files = {"background-bins-file":"marginalized_likelihood.xml.gz", "injection-dbs":injdbs, "non-injection-db":noninjdb}
)
return farnode
def parse_command_line():
parser = OptionParser(description = __doc__)
# generic data source options
datasource.append_options(parser)
parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
# SVD bank construction options
parser.add_option("--overlap", metavar = "num", type = "int", default = 0, help = "set the factor that describes the overlap of the sub banks, must be even!")
parser.add_option("--autocorrelation-length", type = "int", default = 201, help = "The minimum number of samples to use for auto-chisquared, default 201 should be odd")
parser.add_option("--samples-min", type = "int", default = 1024, help = "The minimum number of samples to use for time slices default 1024")
parser.add_option("--samples-max-256", type = "int", default = 1024, help = "The maximum number of samples to use for time slices with frequencies above 256Hz, default 1024")
parser.add_option("--samples-max-64", type = "int", default = 2048, help = "The maximum number of samples to use for time slices with frequencies above 64Hz, default 2048")
parser.add_option("--samples-max", type = "int", default = 4096, help = "The maximum number of samples to use for time slices with frequencies below 64Hz, default 4096")
parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
parser.add_option("--tolerance", metavar = "float", type = "float", default = 0.9999, help = "set the SVD tolerance, default 0.9999")
parser.add_option("--flow", metavar = "num", type = "float", default = 40, help = "set the low frequency cutoff, default 40 (Hz)")
parser.add_option("--identity-transform", action = "store_true", help = "Use identity transform, i.e. no SVD")
# trigger generation options
parser.add_option("--vetoes", metavar = "filename", help = "Set the veto xml file.")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--web-dir", metavar = "directory", help = "Set the web directory like /home/USER/public_html")
parser.add_option("--fir-stride", type="int", metavar = "secs", default = 8, help = "Set the duration of the fft output blocks, default 8")
parser.add_option("--control-peak-time", type="int", default = 8, metavar = "secs", help = "Set the peak finding time for the control signal, default 8")
parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
parser.add_option("--max-segment-length", type="int", metavar = "dur", default = 30000, help = "Break up segments longer than dur seconds into shorter (contiguous, non-overlapping) segments. Default 30000 seconds.")
parser.add_option("--num-banks", metavar = "str", help = "the number of banks per job. can be given as a list like 1,2,3,4 then it will split up the bank cache into N groups with M banks each.")
parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
parser.add_option("--ht-gate-threshold", type="float", help="set a threshold on whitened h(t) to veto glitches")
parser.add_option("--inspiral-executable", default = "gstlal_inspiral", help = "Options gstlal_inspiral | gstlal_iir_inspiral, default gstlal_inspiral")
parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
parser.add_option("--verbose", action = "store_true", help = "Be verbose")
# Override the datasource injection option
parser.remove_option("--injections")
parser.add_option("--injections", action = "append", help = "append injection files to analyze")
options, filenames = parser.parse_args()
options.num_banks = [int(v) for v in options.num_banks.split(",")]
if options.overlap % 2:
raise ValueError("overlap must be even")
fail = ""
for option in ("bank_cache",):
if getattr(options, option) is None:
fail += "must provide option %s\n" % (option)
if fail: raise ValueError(fail)
#FIXME a hack to find the sql paths
share_path = os.path.split(inspiral_pipe.which('gstlal_reference_psd'))[0].replace('bin', 'share/gstlal')
options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
return options, filenames
#
# Useful variables
#
options, filenames = parse_command_line()
bank_cache = parse_cache_str(options.bank_cache)
detectors = datasource.GWDataSourceInfo(options)
channel_dict = detectors.channel_dict
instruments = "".join(sorted(bank_cache.keys()))
boundary_seg = detectors.seg
name_tag = "plots/gstlal-%d-%d_" % (int(boundary_seg[0]), int(boundary_seg[1]))
#
# Setup the dag
#
try:
os.mkdir("logs")
except OSError:
pass
dag = inspiral_pipe.DAG("trigger_pipe")
if options.max_inspiral_jobs is not None:
dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs)
#
# setup the job classes
#
refPSDJob = generic_job('gstlal_reference_psd')
medianPSDJob = generic_job('gstlal_median_of_psds')
svdJob = generic_job('gstlal_svd_bank')
horizonJob = generic_job("gstlal_plot_psd_horizon")
gstlalInspiralJob = generic_job(options.inspiral_executable, condor_commands = {"requirements":"( CAN_RUN_MULTICORE )", "request_cpus":"8", "+RequiresMultipleCores":"True"})
gstlalInspiralInjJob = generic_job(options.inspiral_executable, tag_base="gstlal_inspiral_inj", condor_commands = {"requirements":"( CAN_RUN_MULTICORE )", "request_cpus":"8", "+RequiresMultipleCores":"True"})
calcLikelihoodJob = generic_job("gstlal_inspiral_calc_likelihood")
calcLikelihoodJobInj = generic_job("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj')
gstlalInspiralComputeFarFromSnrChisqHistogramsJob = generic_job("gstlal_compute_far_from_snr_chisq_histograms")
ligolwInspinjFindJob = generic_job("ligolw_inspinjfind")
toSqliteJob = generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml")
toXMLJob = generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml")
lalappsRunSqliteJob = generic_job("lalapps_run_sqlite")
plotSummaryJob = generic_job("gstlal_inspiral_plotsummary")
plotSensitivityJob = generic_job("gstlal_inspiral_plot_sensitivity")
openpageJob = generic_job("gstlal_s5_pbh_summary_page", tag_base = 'gstlal_s5_pbh_summary_page_open')
pageJob = generic_job("gstlal_s5_pbh_summary_page")
marginalizeJob = generic_job("gstlal_inspiral_marginalize_likelihood")
plotbackgroundJob = generic_job("gstlal_inspiral_plot_background")
#
# Get the analysis segments
#
segsdict = analysis_segments(set(bank_cache.keys()), detectors.frame_segments, boundary_seg)
#
# Compute the PSDs for each segment
#
psd_nodes = psd_node_gen(refPSDJob, dag, [], segsdict, channel_dict, options)
#
# plot the horizon distance
#
generic_node(horizonJob, dag,
parent_nodes = psd_nodes.values(),
input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
output_files = {"":name_tag + "horizon.png"}
)
#
# compute the median PSD
#
median_psd_node = \
generic_node(medianPSDJob, dag,
parent_nodes = psd_nodes.values(),
input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
output_files = {"output-name": T050017_filename(instruments, "REFERENCE_PSD", boundary_seg[0].seconds, boundary_seg[1].seconds, '.xml.gz', path = medianPSDJob.output_path)}
)
#
# Compute SVD banks
#
svd_nodes = svd_node_gen(svdJob, dag, [median_psd_node], median_psd_node.output_files["output-name"], inspiral_pipe.build_bank_groups(bank_cache, options.num_banks), options, boundary_seg)
#
# Inspiral jobs by segment
#
inspiral_nodes = inspiral_node_gen(gstlalInspiralJob, dag, svd_nodes, segsdict, options, channel_dict)
#
# Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
#
lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options)
#
# Setup likelihood jobs, clustering and merging
#
likelihood_nodes, outnodes = rank_and_merge(dag, calcLikelihoodJob, lalappsRunSqliteJob, toSqliteJob, inspiral_nodes, lloid_output, lloid_diststats, segsdict, options, boundary_seg)
#
# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
#
injdbs, noninjdb, final_sqlite_nodes = finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, outnodes, options)
#
# Compute FAP
#
farnode = compute_FAP(marginalizeJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, likelihood_nodes, injdbs, noninjdb, final_sqlite_nodes)
# make summary plots
plotnodes = []
plotnodes.append(generic_node(plotSummaryJob, dag, parent_nodes=[farnode],
opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.log_path(), "base":name_tag},
input_files = {"":[noninjdb] + injdbs}
))
# make sensitivity plots
plotnodes.append(generic_node(plotSensitivityJob, dag, parent_nodes=[farnode],
opts = {"user-tag":name_tag.split("/")[-1], "output-dir":name_tag.rstrip(name_tag.split("/")[-1]), "tmp-space":inspiral_pipe.log_path(), "veto-segments-name":"vetoes", "bin-by-total-mass":"", "bin-by-mass1-mass2":"", "bin-by-mass1-mass2":"", "include-play":""},
input_files = {"--zero-lag-database":noninjdb, "":injdbs}
))
# make background plots
plotnodes.append(generic_node(plotbackgroundJob, dag, parent_nodes = [farnode], opts = {"base":name_tag}, input_files = {"marginalized-file":"post_marginalized_likelihood.xml.gz"}))
# make a web page
generic_node(openpageJob, dag, parent_nodes = plotnodes,
opts = {"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), "webserver-dir":options.web_dir, "output-name-tag":name_tag, "open-box":""}
)
generic_node(pageJob, dag, parent_nodes = plotnodes,
opts = {"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), "webserver-dir":options.web_dir, "output-name-tag":name_tag}
)
#
# all done
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()