Commit f06586a5 authored by Patrick Godwin

gstlal_inspiral_dag: fold in new additions from gstlal_inspiral_pipe since refactoring

parent f67f2e08
@@ -62,6 +62,7 @@ from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
+from gstlal import paths as gstlal_config_paths
#----------------------------------------------------------
@@ -231,9 +232,12 @@ def parse_command_line():
# Template bank
parser.add_option("--template-bank", metavar = "filename", help = "Set the template bank xml file.")
parser.add_option("--mass-model", metavar = "filename", help = "Set the name of the mass model. Options are 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', or 'file'")
parser.add_option("--mass-model", metavar = "filename", help = "Set the name of the mass model. Options are 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', 'uniform-template', or 'file'")
parser.add_option("--mass-model-file", metavar = "filename", help = "Set the name of the mass model file, e.g., mass_model.h5. Required if --mass-model=file")
# dtdphi option
parser.add_option("--dtdphi-file", metavar = "filename", action = "append", help = "dtdphi snr ratio pdfs to read from (hdf5 format)")
# SVD bank construction options
parser.add_option("--overlap", metavar = "num", type = "int", action = "append", help = "set the factor that describes the overlap of the sub banks, must be even!")
parser.add_option("--autocorrelation-length", type = "int", default = 201, help = "The minimum number of samples to use for auto-chisquared, default 201 should be odd")
@@ -243,8 +247,8 @@ def parse_command_line():
parser.add_option("--samples-max", type = "int", default = 4096, help = "The maximum number of samples to use for time slices with frequencies below 64Hz, default 4096")
parser.add_option("--bank-cache", metavar = "filenames", action = "append", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc.. (can be given multiple times)")
parser.add_option("--tolerance", metavar = "float", type = "float", default = 0.9999, help = "set the SVD tolerance, default 0.9999")
parser.add_option("--flow", metavar = "num", type = "float", default = 40, help = "set the low frequency cutoff, default 40 (Hz)")
parser.add_option("--fmax", metavar = "num", type = "float", default = 1600, help = "set the max frequency cutoff, default 1600 (Hz)")
parser.add_option("--flow", metavar = "num", type = "float", action = "append", help = "set the low frequency cutoff. Can be given multiple times.")
parser.add_option("--fmax", metavar = "num", type = "float", default = 1600, help = "set the max frequency cutoff, default 1600 (Hz)")
parser.add_option("--sample-rate", metavar = "Hz", type = "int", help = "Set the sample rate. If not set, the sample rate will be based on the template frequency. The sample rate must be at least twice the highest frequency in the templates. If provided it must be a power of two")
parser.add_option("--identity-transform", action = "store_true", help = "Use identity transform, i.e. no SVD")
@@ -301,11 +305,17 @@ def parse_command_line():
sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(bank_xmldoc)
assert len(sngl_inspiral_table) == len(set([r.template_id for r in sngl_inspiral_table])), "Template bank ids are not unique"
-if options.mass_model not in ("narrow-bns", "broad-bns", "bbh", "ligo", "detected-logm", "file"):
-raise ValueError("--mass-model must be 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', or 'file'")
+if options.mass_model not in ("narrow-bns", "broad-bns", "bbh", "ligo", "detected-logm", "uniform-template", "file"):
+raise ValueError("--mass-model must be 'narrow-bns', 'broad-bns', 'bbh', 'ligo', 'detected-logm', 'uniform-template', or 'file'")
+if options.mass_model == "file" and not options.mass_model_file:
+raise ValueError("--mass-model-file must be provided if --mass-model=file")
+if not options.dtdphi_file:
+options.dtdphi_file = [os.path.join(gstlal_config_paths["pkgdatadir"], "inspiral_dtdphi_pdf.h5")] * len(options.overlap)
+if len(options.dtdphi_file) != len(options.overlap):
+raise ValueError("You must provide as many dtdphi files as banks")
if options.num_banks:
options.num_banks = [int(v) for v in options.num_banks.split(",")]
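
The defaulting above pads the packaged inspiral_dtdphi_pdf.h5 to one entry per --overlap value, then requires the two lists to stay parallel. The same logic, isolated as a sketch (the pkgdatadir path is hypothetical):

    import os

    def resolve_dtdphi(dtdphi_file, overlap, pkgdatadir = "/usr/share/gstlal"):
        # fall back to one copy of the packaged PDF per sub bank
        if not dtdphi_file:
            dtdphi_file = [os.path.join(pkgdatadir, "inspiral_dtdphi_pdf.h5")] * len(overlap)
        if len(dtdphi_file) != len(overlap):
            raise ValueError("You must provide as many dtdphi files as banks")
        return dtdphi_file

    # two banks, nothing given explicitly: both get the packaged default
    print(resolve_dtdphi(None, [512, 256]))
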
@@ -535,6 +545,7 @@ def median_psd_layer(dag, jobs, parent_nodes, *args):
def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict):
svd_nodes = {}
new_template_mchirp_dict = {}
+svd_dtdphi_map = {}
for ifo, list_of_svd_caches in bank_cache.items():
bin_offset = 0
for j, svd_caches in enumerate(list_of_svd_caches):
@@ -553,6 +564,7 @@ def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_m
ids.append("%d_%d" % (i+bin_offset, n))
if f in template_mchirp_dict:
mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
+svd_dtdphi_map[i+bin_offset] = options.dtdphi_file[j]
svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
@@ -564,7 +576,7 @@ def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_m
parent_nodes = parent_nodes,
opts = {
"svd-tolerance":options.tolerance,
"flow":options.flow,
"flow":options.flow[j],
"sample-rate":options.sample_rate,
"clipleft":clipleft,
"clipright":clipright,
@@ -598,7 +610,7 @@ def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_m
input_files = {"template-bank-file":options.template_bank},
)
-return svd_nodes, new_template_mchirp_dict
+return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map
def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
inspiral_nodes = {}
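
svd_dtdphi_map, built above and now returned by svd_layer, keys each global sub-bank index (i + bin_offset) to the dtdphi file of the bank cache j it came from. A simplified sketch of how the indices and files line up (cache contents hypothetical):

    # one dtdphi file per bank cache j; sub banks are numbered globally
    dtdphi_file = ["bns_dtdphi.h5", "bbh_dtdphi.h5"]
    svd_dtdphi_map = {}
    bin_offset = 0
    for j, svd_caches in enumerate([["H1-0.cache", "H1-1.cache"], ["H1-2.cache"]]):
        for i, cache in enumerate(svd_caches):
            svd_dtdphi_map[i + bin_offset] = dtdphi_file[j]
        bin_offset += len(svd_caches)
    assert svd_dtdphi_map == {0: "bns_dtdphi.h5", 1: "bns_dtdphi.h5", 2: "bbh_dtdphi.h5"}
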
@@ -626,7 +638,7 @@ def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, templa
ignore[injections].append(int(bgbin_index))
# FIXME choose better splitting?
-numchunks = 10
+numchunks = 50
# only use a channel dict with the relevant channels
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
@@ -804,7 +816,11 @@ def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_s
injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)]
for injection_file in injection_files:
injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
opts = {"flow": options.flow, "fmax": options.fmax},
# FIXME somehow choose the actual flow based on mass?
# max(flow) is chosen for performance not
# correctness hopefully though it will be good
# enough
opts = {"flow":max(options.flow),"fmax":options.fmax},
input_files = {
"injection-file": injection_file,
"reference-psd-cache": "reference_psd.cache"
@@ -971,7 +987,7 @@ def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_fi
input_files = {"": db}
)
-def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file):
+def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map):
instruments = "".join(sorted(instrument_set))
margnodes = {}
@@ -989,9 +1005,20 @@ def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, optio
# mass bin dependent prior, but it really doesn't matter for
# the time being.
priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag,
-parent_nodes = [one_ifo_svd_nodes[n]] + model_node,
-opts = {"instrument": instrument_set, "background-prior": 1, "min-instruments": options.min_instruments},
-input_files = {"svd-file": one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file": model_file},
+parent_nodes = [one_ifo_svd_nodes[n]] + (model_node or []),
+opts = {
+"instrument": instrument_set,
+"background-prior": 1,
+"min-instruments": options.min_instruments,
+"coincidence-threshold": options.coincidence_threshold,
+"df": "bandwidth"
+},
+input_files = {
+"svd-file": one_ifo_svd_nodes[n].output_files["write-svd"],
+"mass-model-file": model_file,
+"dtdphi-file": svd_dtdphi_map[n],
+"psd-xml": ref_psd
+},
output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])}
)
# Create a file that has the priors *and* all of the diststats
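
One subtlety in the parent_nodes line above: "+" binds tighter than "or", so the expression must be parenthesized as [one_ifo_svd_nodes[n]] + (model_node or []) to handle the case where no mass model node exists (model_node is None when a mass model file is supplied directly). A quick demonstration:

    model_node = None                     # no mass model job in the DAG
    parents = ["svd_node"] + (model_node or [])
    assert parents == ["svd_node"]
    try:
        ["svd_node"] + model_node or []   # parsed as (list + None) or []
    except TypeError as exc:
        print(exc)                        # can only concatenate list (not "NoneType") to list
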
@@ -1040,7 +1067,7 @@ def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument
def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
likelihood_nodes = {}
instruments = "".join(sorted(instrument_set))
-chunk_size = 16
+chunk_size = 100
bgbin_indices = sorted(lloid_output[None].keys())
for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[None][key], lloid_diststats[key], key) for key in bgbin_indices):
@@ -1142,7 +1169,7 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
innodes = {}
# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
-files_to_group = 40
+files_to_group = 100
for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
if inj is None:
innode_key = None
@@ -1177,7 +1204,7 @@ def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, o
# make sure innodes has a None key, even if its value is an empty list
innodes.setdefault(None, [])
-num_chunks = 50
+num_chunks = 100
if options.vetoes is None:
vetoes = []
@@ -1499,17 +1526,20 @@ if __name__ == '__main__':
if options.bank_cache:
# Compute SVD banks
-svd_nodes, template_mchirp_dict = svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict)
+svd_nodes, template_mchirp_dict, svd_dtdphi_map = svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict)
# mass model
model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd)
else:
model_node = None
model_file = options.mass_model_file
if not options.lloid_cache:
# Inspiral jobs by segment
inspiral_nodes, lloid_output, lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
# marginalize jobs
-marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file)
+marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map)
# calc rank PDF jobs
rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set)
......