Forked from
lscsoft / GstLAL
2635 commits behind the upstream repository.
-
Kipp Cannon authored
- currently nonsense, redundant since addition of sum-of-SNR^2 cut. the latter will be fixed, cleaned up, and made to incorporate this feature
Kipp Cannon authored- currently nonsense, redundant since addition of sum-of-SNR^2 cut. the latter will be fixed, cleaned up, and made to incorporate this feature
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_inspiral_pipe 80.88 KiB
#!/usr/bin/env python
#
# Copyright (C) 2011-2014 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### The offline gstlal inspiral workflow generator; Use to make HTCondor DAGs to run CBC workflows
###
### Usage:
### ------
###
### It is rare that you would invoke this program in a standalone mode. Usually
### the inputs are complicated and best automated via a Makefile, e.g.,
### Makefile.triggers_example
###
### Diagram of the HTCondor workfow produced
### ----------------------------------------
###
### .. graphviz:: ../images/trigger_pipe.dot
###
"""
This program makes a dag to run gstlal_inspiral offline
"""
__author__ = 'Chad Hanna <chad.hanna@ligo.org>'
##############################################################################
# import standard modules and append the lalapps prefix to the python path
import sys, os, stat
import itertools
from optparse import OptionParser
##############################################################################
# import the modules we need to build the pipeline
import lal
import lal.series
from lal.utils import CacheEntry
from glue import pipeline
from ligo import segments
from glue.ligolw import ligolw
from glue.ligolw import lsctables
import glue.ligolw.utils as ligolw_utils
import glue.ligolw.utils.segments as ligolw_segments
from gstlal import inspiral, inspiral_pipe
from gstlal import dagparts as gstlaldagparts
from gstlal import datasource
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
lsctables.use_in(LIGOLWContentHandler)
#
# Utility functions
#
def sim_tag_from_inj_file(injections):
if injections is None:
return None
return injections.replace('.xml', '').replace('.gz', '').replace('-','_')
def get_bank_params(bank_cache, options, verbose = False):
max_time = 0
template_mchirp_dict = {}
for n, cache in enumerate(bank_cache.values()[0]):
for ce in map(CacheEntry, open(cache)):
for ce in map(CacheEntry, open(ce.path)):
xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2]), max(snglinspiraltable.getColumnByName('mchirp')[options.overlap[n]/2:-options.overlap[n]/2])]
xmldoc.unlink()
return max_time, template_mchirp_dict
def chunks(l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
def flatten(lst):
"Flatten one level of nesting"
return list(itertools.chain.from_iterable(lst))
def subdir_path(dirlist):
output_path = '/'.join(dirlist)
try:
os.mkdir(output_path)
except:
pass
return output_path
#
# get a dictionary of all the disjoint 2+ detector combination segments
#
def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
segsdict = segments.segmentlistdict()
# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
start_pad = 512 + max_template_length
# Chosen so that the overlap is only a ~5% hit in run time for long segments...
segment_length = int(5 * start_pad)
for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
# never analyze H1H2 or H2L1 times
#if set(ifo_combos) == set(('H1', 'H2')) or set(ifo_combos) == set(('L1', 'H2')):
# print >> sys.stderr, "not analyzing: ", ifo_combos, " only time"
# continue
segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
segsdict[frozenset(ifo_combos)] = gstlaldagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
if not segsdict[frozenset(ifo_combos)]:
del segsdict[frozenset(ifo_combos)]
return segsdict
def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options):
psd_nodes = {}
for ifos in segsdict:
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
for seg in segsdict[ifos]:
psd_nodes[(ifos, seg)] = \
inspiral_pipe.generic_node(refPSDJob, dag, parent_nodes = parent_nodes,
opts = {"gps-start-time":int(seg[0]),
"gps-end-time":int(seg[1]),
"data-source":"frames",
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
"psd-fft-length":options.psd_fft_length,
"frame-segments-name": options.frame_segments_name},
input_files = { "frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file},
output_files = {"write-psd":inspiral_pipe.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = subdir_path([refPSDJob.output_path, str(int(seg[0]))[:5]]))}
)
return psd_nodes
def inj_psd_node_gen(segsdict, options):
psd_nodes = {}
psd_cache_files = {}
for ce in map(CacheEntry, open(options.psd_cache)):
psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path))
for ifos in segsdict:
reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s)
ref_psd_file_num = 0
for seg in segsdict[ifos]:
while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
ref_psd_file_num += 1
psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]
ref_psd_parent_nodes = []
return psd_nodes, ref_psd_parent_nodes
def model_node_gen(modelJob, dag, instruments, options, seg, template_bank):
if options.mass_model_file is None:
# choose, arbitrarily, the lowest instrument in alphabetical order
model_file_name = inspiral_pipe.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = modelJob.output_path)
model_node = inspiral_pipe.generic_node(modelJob, dag,
input_files = {"template-bank": template_bank},
opts = {"model":options.mass_model},
output_files = {"output": model_file_name},
parent_nodes = []
)
return [model_node], model_file_name
else:
return [], options.mass_model_file
def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict):
svd_nodes = {}
new_template_mchirp_dict = {}
for ifo, list_of_svd_caches in bank_cache.items():
bin_offset = 0
for j, svd_caches in enumerate(list_of_svd_caches):
svd_caches = map(CacheEntry, open(svd_caches))
for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
# First sort out the clipleft, clipright options
clipleft = []
clipright = []
ids = []
mchirp_interval = (float("inf"), 0)
individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
for n, f in enumerate(ce.path for ce in individual_svd_cache):
# handle template bank clipping
clipleft.append(options.overlap[j] / 2)
clipright.append(options.overlap[j] / 2)
ids.append("%d_%d" % (i+bin_offset, n))
if f in template_mchirp_dict:
mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
svd_bank_name = inspiral_pipe.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = svdJob.output_path)
if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval
svdnode = inspiral_pipe.generic_node(svdJob, dag,
parent_nodes = parent_nodes,
opts = {"svd-tolerance":options.tolerance,
"flow":options.flow,
"clipleft":clipleft,
"clipright":clipright,
"samples-min":options.samples_min[j],
"samples-max-256":options.samples_max_256,
"samples-max-64":options.samples_max_64,
"samples-max":options.samples_max,
"autocorrelation-length":options.autocorrelation_length,
"bank-id":ids,
"identity-transform":options.identity_transform,
"ortho-gate-fap":0.5},
input_files = {"reference-psd":psd},
input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
output_files = {"write-svd":svd_bank_name}
)
# impose a priority to help with depth first submission
svdnode.set_priority(99)
svd_nodes.setdefault(ifo, []).append(svdnode)
bin_offset += i+1
#
# Plot template/svd bank jobs
#
primary_ifo = bank_cache.keys()[0]
inspiral_pipe.generic_node(plotBanksJob, dag,
parent_nodes = sum(svd_nodes.values(),[]),
opts = {"plot-template-bank":"",
"output-dir": output_dir},
input_files = {"template-bank-file":options.template_bank}
)
return svd_nodes, new_template_mchirp_dict
def create_svd_bank_strings(svd_nodes, instruments = None):
# FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
outstrings = []
for i in range(len(svd_nodes.values()[0])):
svd_bank_string = ""
for ifo in svd_nodes:
if instruments is not None and ifo not in instruments:
continue
try:
svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
except AttributeError:
svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
svd_bank_string = svd_bank_string.strip(",")
outstrings.append(svd_bank_string)
return outstrings
def svd_bank_cache_maker(svd_bank_strings, injection = False):
if injection:
dir_name = "gstlal_inspiral_inj"
else:
dir_name = "gstlal_inspiral"
svd_cache_entries = []
parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
for svd_bank_parsed_dict in parsed_svd_bank_strings:
for filename in svd_bank_parsed_dict.itervalues():
svd_cache_entries.append(CacheEntry.from_T050017(filename))
return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries]
def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
inspiral_nodes = {}
for ifos in segsdict:
# setup dictionaries to hold the inspiral nodes
inspiral_nodes[(ifos, None)] = {}
ignore = {}
injection_files = []
for injections in options.injections:
min_chirp_mass, max_chirp_mass, injections = injections.split(':')
injection_files.append(injections)
min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
ignore[injections] = []
for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
if max_chirp_mass <= bounds[0]:
ignore[injections].append(int(bgbin_index))
# NOTE putting a break here assumes that the min chirp mass
# in a subbank increases with bin number, i.e. XXXX+1 has a
# greater minimum chirpmass than XXXX, for all XXXX. Note
# that the reverse is not true, bin XXXX+1 may have a lower
# max chirpmass than bin XXXX.
elif min_chirp_mass > bounds[1]:
ignore[injections].append(int(bgbin_index))
# FIXME choose better splitting?
numchunks = 10
# only use a channel dict with the relevant channels
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
# get the svd bank strings
svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())
# get a mapping between chunk counter and bgbin for setting priorities
bgbin_chunk_map = {}
for seg in segsdict[ifos]:
if injection_files:
output_seg_inj_path = subdir_path([gstlalInspiralInjJob.output_path, str(int(seg[0]))[:5]])
if gstlalInspiralJob is None:
# injection-only run
inspiral_nodes[(ifos, None)].setdefault(seg, [None])
else:
output_seg_path = subdir_path([gstlalInspiralJob.output_path, str(int(seg[0]))[:5]])
for chunk_counter, svd_bank_strings in enumerate(chunks(svd_bank_strings_full, numchunks)):
bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)]
# setup output names
output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i, s in enumerate(svd_bank_strings)]
dist_stat_names = [inspiral_pipe.T050017_filename(ifos, '%s_DIST_STATS' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i,s in enumerate(svd_bank_strings)]
for bgbin in bgbin_indices:
bgbin_chunk_map.setdefault(bgbin, chunk_counter)
# Calculate the appropriate ht-gate-threshold values according to the scale given
threshold_values = None
if options.ht_gate_threshold_linear is not None:
# A scale is given
mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
# use max mchirp in a given svd bank to decide gate threshold
bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
else:
if options.ht_gate_threshold is not None:
threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
# non injection node
noninjnode = inspiral_pipe.generic_node(gstlalInspiralJob, dag,
parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
opts = {"psd-fft-length":options.psd_fft_length,
"ht-gate-threshold":threshold_values,
"frame-segments-name":options.frame_segments_name,
"gps-start-time":int(seg[0]),
"gps-end-time":int(seg[1]),
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"singles-threshold":options.singles_threshold,
"fir-stride":options.fir_stride,
"data-source":"frames",
"local-frame-caching":"",
"min-instruments":options.min_instruments,
"reference-likelihood-file":options.reference_likelihood_file
},
input_files = { "time-slide-file":options.time_slide_file,
"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file,
"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
"blind-injections":options.blind_injections,
"veto-segments-file":options.vetoes,
},
input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
output_cache_files = {
"output-cache":output_names,
"ranking-stat-output-cache":dist_stat_names
}
)
# Set a post script to check for file integrity
if options.gzip_test:
noninjnode.set_post_script("gzip_test.sh")
noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))
# impose a priority to help with depth first submission
noninjnode.set_priority(chunk_counter+15)
inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
# process injections
for injections in injection_files:
# setup output names
sim_name = sim_tag_from_inj_file(injections)
bgbin_svd_bank_strings = [index_bank_string_tuple for i, index_bank_string_tuple in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]
for chunk_counter, bgbin_list in enumerate(chunks(bgbin_svd_bank_strings, numchunks)):
bgbin_indices, svd_bank_strings = zip(*bgbin_list)
output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID_%s' % (bgbin_index, sim_name), seg, '.xml.gz', path = output_paths[i]) for i, bgbin_index in enumerate(bgbin_indices)]
svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]
try:
reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
except AttributeError:
# injection-only run
reference_psd = psd_nodes[(ifos, seg)]
parents = []
# Calculate the appropriate ht-gate-threshold values according to the scale given
threshold_values = None
if options.ht_gate_threshold_linear is not None:
# A scale is given
mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
# use max mchirp in a given svd bank to decide gate threshold
bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
else:
if options.ht_gate_threshold is not None:
threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
# setup injection node
injnode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, parent_nodes = parents,
opts = {"psd-fft-length":options.psd_fft_length,
"ht-gate-threshold":threshold_values,
"frame-segments-name":options.frame_segments_name,
"gps-start-time":int(seg[0]),
"gps-end-time":int(seg[1]),
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"singles-threshold":options.singles_threshold,
"fir-stride":options.fir_stride,
"data-source":"frames",
"local-frame-caching":"",
"min-instruments":options.min_instruments,
"reference-likelihood-file":options.reference_likelihood_file
},
input_files = { "time-slide-file":options.inj_time_slide_file,
"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file,
"reference-psd":reference_psd,
"veto-segments-file":options.vetoes,
"injections": injections
},
input_cache_files = {"svd-bank-cache":svd_names},
input_cache_file_name = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017(filename) for filename in svd_names], '.cache').replace('SVD', 'SVD_%s' % sim_name),
output_cache_files = {
"output-cache":output_names
}
)
# Set a post script to check for file integrity
if options.gzip_test:
injnode.set_post_script("gzip_test.sh")
injnode.add_post_script_arg(" ".join(output_names))
# impose a priority to help with depth first submission
if bgbin_chunk_map:
injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
else:
injnode.set_priority(chunk_counter+1)
inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)
# Replace mchirplo:mchirphi:inj.xml with inj.xml
options.injections = [inj.split(':')[-1] for inj in options.injections]
return inspiral_nodes
def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
# first get the previous output in a usable form
lloid_output = {}
for inj in options.injections + [None]:
lloid_output[sim_tag_from_inj_file(inj)] = {}
lloid_diststats = {}
if options.dist_stats_cache:
for ce in map(CacheEntry, open(options.dist_stats_cache)):
lloid_diststats[ce.description.split("_")[0]] = [ce.path]
for ifos in segsdict:
for seg in segsdict[ifos]:
# iterate over the mass space chunks for each segment
for node in inspiral_nodes[(ifos, None)][seg]:
if node is None:
break
len_out_files = len(node.output_files["output-cache"])
for f in node.output_files["output-cache"]:
# Store the output files and the node for use as a parent dependency
lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
for f in node.output_files["ranking-stat-output-cache"]:
lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
for inj in options.injections:
for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
if injnode is None:
continue
for f in injnode.output_files["output-cache"]:
# Store the output files and the node and injnode for use as a parent dependencies
bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
try:
lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
except KeyError:
lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))
return lloid_output, lloid_diststats
def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWithZerolagJob, calcLikelihoodJob, calcLikelihoodJobInj, lalappsRunSqliteJob, toSqliteJob, marginalizeJob, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, mass_model_add_node, mass_model_file):
likelihood_nodes = {}
rankpdf_nodes = []
rankpdf_zerolag_nodes = []
outnodes = {}
instruments = "".join(sorted(instrument_set))
margnodes = {}
# NOTE! we rely on there being identical templates in each instrument, so we just take one of the values of the svd_nodes which are a dictionary
one_ifo_svd_nodes = svd_nodes.values()[0]
# Here n counts the bins
# first non-injections, which will get skipped if this is an injections-only run
for n, (outputs, diststats) in enumerate((lloid_output[None][key], lloid_diststats[key]) for key in sorted(lloid_output[None].keys())):
inputs = [o[0] for o in outputs]
parents = []
[parents.extend(o[1]) for o in outputs]
# FIXME we keep this here in case we someday want to have a
# mass bin dependent prior, but it really doesn't matter for
# the time being.
priornode = inspiral_pipe.generic_node(createPriorDistStatsJob, dag,
parent_nodes = [one_ifo_svd_nodes[n]] + mass_model_add_node,
opts = {"instrument":instrument_set, "background-prior":1, "min-instruments":options.min_instruments},
input_files = {"svd-file":one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file":mass_model_file},
output_files = {"write-likelihood":inspiral_pipe.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = createPriorDistStatsJob.output_path)}
)
# Create a file that has the priors *and* all of the diststats
# for a given bin marginalized over time. This is all that will
# be needed to compute the likelihood
diststats_per_bin_node = inspiral_pipe.generic_node(marginalizeJob, dag,
parent_nodes = [priornode] + parents,
opts = {"marginalize":"ranking-stat"},
input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"]]},
output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = marginalizeJob.output_path)},
input_cache_file_name = inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache')
)
calcranknode = inspiral_pipe.generic_node(calcRankPDFsJob, dag,
parent_nodes = [diststats_per_bin_node],
opts = {"ranking-stat-samples":options.ranking_stat_samples},
input_files = {"":diststats_per_bin_node.output_files["output"]},
output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsJob.output_path)}
)
calcrankzerolagnode = inspiral_pipe.generic_node(calcRankPDFsWithZerolagJob, dag,
parent_nodes = [diststats_per_bin_node],
opts = {"add-zerolag-to-background":"","ranking-stat-samples":options.ranking_stat_samples},
input_files = {"":diststats_per_bin_node.output_files["output"]},
output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsWithZerolagJob.output_path)}
)
margnodes['%04d' %(n,)] = diststats_per_bin_node
rankpdf_nodes.append(calcranknode)
rankpdf_zerolag_nodes.append(calcrankzerolagnode)
# Break up the likelihood jobs into chunks to process fewer files, e.g, 16
likelihood_nodes.setdefault(None,[]).append(
[inspiral_pipe.generic_node(calcLikelihoodJob, dag,
parent_nodes = [diststats_per_bin_node],
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"likelihood-url":diststats_per_bin_node.output_files["output"]},
input_cache_files = {"input-cache":chunked_inputs}
) for chunked_inputs in chunks(inputs, 16)]
)
# then injections
for inj in options.injections:
for n, (outputs, diststats, bgbin_index) in enumerate((lloid_output[sim_tag_from_inj_file(inj)][key], lloid_diststats[key], key) for key in sorted(lloid_output[sim_tag_from_inj_file(inj)].keys())):
if outputs is None:
continue
inputs = [o[0] for o in outputs]
parents = []
[parents.extend(o[1]) for o in outputs]
if margnodes:
parents.append(margnodes[bgbin_index])
likelihood_url = margnodes[bgbin_index].output_files["output"]
else:
likelihood_url = diststats[0]
# Break up the likelihood jobs into chunks to process fewer files, e.g., 16
likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append(
[inspiral_pipe.generic_node(calcLikelihoodJobInj, dag,
parent_nodes = parents,
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"likelihood-url":likelihood_url},
input_cache_files = {"input-cache":chunked_inputs}
) for chunked_inputs in chunks(inputs, 16)]
)
# after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
files_to_group = 40
for subbank, (inj, nodes) in enumerate(likelihood_nodes.items()):
# Flatten the nodes for this sub bank
nodes = flatten(nodes)
merge_nodes = []
# Flatten the input/output files from calc_likelihood
inputs = flatten([node.input_files["input-cache"] for node in nodes])
if inj is None:
# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
if options.copy_raw_results:
merge_nodes[-1].set_pre_script("store_raw.sh")
merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
# Merging all the dbs from the same sub bank
for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
db = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite')
db = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(db).description[:4]]), db)
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":inputs},
output_files = {"database":db},
input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":db}
)
outnodes.setdefault(None, []).append(sqlitenode)
else:
# files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run
for n in range(0, len(inputs), files_to_group):
merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes,
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":inputs[n:n+files_to_group]}
)
)
if options.copy_raw_results:
merge_nodes[-1].set_pre_script("store_raw.sh")
merge_nodes[-1].add_pre_script_arg(" ".join(inputs[n:n+files_to_group]))
# Merging all the dbs from the same sub bank and injection run
for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]):
injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite')
injdb = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb)
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":inputs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":injdb}
)
outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode)
# make sure outnodes has a None key, even if its value is an empty list
outnodes.setdefault(None, [])
return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options):
rankpdf_nodes = sorted([CacheEntry(line).path for line in open(options.rank_pdf_cache)], key = lambda s: int(os.path.basename(s).split('-')[1].split('_')[0]))
rankpdf_zerolag_nodes = []
outnodes = {}
if options.num_files_per_background_bin == 1:
bgbin_lloid_map = {}
# Get list of all files for each background bin (will be same length for each bin)
for ce in map(CacheEntry, open(options.lloid_cache)):
bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
if len(bgbin_lloid_map.values()[0]) == 1:
# Starting with 1:1 mapping between files and bins,
# thus no merging is needed yet
outnodes[None] = [dbs[0] for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k))]
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
outnodes[sim_tag_from_inj_file(options.injections_for_merger[i])] = [CacheEntry(line).path for line in open(inj_lloid_cache)]
else:
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
noninjdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
# merge all of the dbs from the same subbank
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":dbs},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":noninjdb}
)
outnodes.setdefault(None, []).append(sqlitenode)
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
bgbin_lloid_map = {}
for ce in map(CacheEntry, open(inj_lloid_cache)):
bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path)
for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)):
injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
# merge all of the dbs from the same subbank
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":dbs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":injdb}
)
outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
else:
# Starting with output of analysis before naming convention
# update (commit 5efd78fee6b371c999f510d07be33ec64f385695),
# so all lloid files contain gps times for entire run, and are
# numbered by iterating through segments in a given bin first
# (e.g. files 0000 to 0009 may all belong to bin 0000, then
# files 0010 to 0019 would all belong to bin 0001, etc)
for ce_list in chunks(map(CacheEntry, open(options.lloid_cache)), options.num_files_per_background_bin):
hi_index = ce_list[-1].description.split('_')[0]
noninjdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
# merge all of the dbs from the same subbank
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":noninjdb}
)
outnodes.setdefault(None, []).append(sqlitenode)
for i, inj_lloid_cache in enumerate(options.inj_lloid_cache):
for ce_list in chunks(map(CacheEntry, open(inj_lloid_cache)), options.num_files_per_background_bin):
hi_index = ce_list[-1].description.split('_')[0]
injdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
# merge all of the dbs from the same subbank
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [],
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":[ce.path for ce in ce_list]},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":injdb}
)
outnodes.setdefault(sim_tag_from_inj_file(options.injections_for_merger[i]), []).append(sqlitenode)
return rankpdf_nodes, rankpdf_zerolag_nodes, outnodes
def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, toSqliteNoCacheJob, cpJob, innodes, ligolw_add_nodes, options, instruments):
num_chunks = 50
if options.vetoes is None:
vetoes = []
else:
vetoes = [options.vetoes]
chunk_nodes = []
dbs_to_delete = []
# Process the chirp mass bins in chunks to paralellize the merging process
for chunk, nodes in enumerate(chunks(innodes[None], num_chunks)):
try:
dbs = [node.input_files[""] for node in nodes]
parents = nodes
except AttributeError:
# analysis started at merger step but seeded by lloid files which
# have already been merged into one file per background
# bin, thus the analysis will begin at this point
dbs = nodes
parents = []
# Merge the final non injection database into chunks
noninjdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache": dbs},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
# cluster the final non injection database
noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":noninjdb}
)
chunk_nodes.append(noninjsqlitenode)
dbs_to_delete.append(noninjdb)
# Merge the final non injection database
outnodes = []
injdbs = []
if options.non_injection_db:
# injection-only run
noninjdb = options.non_injection_db
else:
noninjdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_files = {"": (vetoes + [options.frame_segments_file])},
input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
output_files = {"database":noninjdb},
input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache')
)
# cluster the final non injection database
noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":noninjdb}
)
cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [noninjsqlitenode],
input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
)
outnodes.append(cpnode)
# FIXME far-injections currently doesnt work, either fix it or delete it
#for injections, far_injections in zip(options.injections, options.far_injections):
if options.injections:
iterable_injections = options.injections
else:
iterable_injections = options.injections_for_merger
for injections in iterable_injections:
# extract only the nodes that were used for injections
chunk_nodes = []
for chunk, injnodes in enumerate(chunks(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
try:
dbs = [injnode.input_files[""] for injnode in injnodes]
parents = injnodes
except AttributeError:
dbs = injnodes
parents = []
# Setup the final output names, etc.
injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path)
# merge
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_cache_files = {"input-cache":dbs},
output_files = {"database":injdb},
input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache')
)
# cluster
clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":injdb}
)
chunk_nodes.append(clusternode)
dbs_to_delete.append(injdb)
# Setup the final output names, etc.
injdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
injdbs.append(injdb)
injxml = injdb.replace('.sqlite','.xml.gz')
# FIXME far-injections currently doesnt work, either fix it or delete it
'''
# If there are injections that are too far away to be seen in a separate file, add them now.
if far_injections is not None:
xml_input = [injxml] + [far_injections]
else:
xml_input = injxml
'''
xml_input = injxml
# merge
sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes,
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
input_files = {"": (vetoes + [options.frame_segments_file, injections])},
input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]},
output_files = {"database":injdb},
input_cache_file_name = injdb.replace('.sqlite','.cache')
)
# cluster
clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode],
opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"":injdb}
)
clusternode = inspiral_pipe.generic_node(toXMLJob, dag, parent_nodes = [clusternode],
opts = {"tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
output_files = {"extract":injxml},
input_files = {"database":injdb}
)
inspinjnode = inspiral_pipe.generic_node(ligolwInspinjFindJob, dag, parent_nodes = [clusternode],
opts = {"time-window":0.9},
input_files = {"":injxml}
)
sqlitenode = inspiral_pipe.generic_node(toSqliteNoCacheJob, dag, parent_nodes = [inspinjnode],
opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""},
output_files = {"database":injdb},
input_files = {"":xml_input}
)
cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [sqlitenode],
input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
)
outnodes.append(cpnode)
return injdbs, noninjdb, outnodes, dbs_to_delete
def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes):
# compute FAPs and FARs
# split up the marginilization into groups of 10
try:
margin = [node.output_files["output"] for node in rankpdf_nodes]
parents = rankpdf_nodes
margin_zerolag = [node.output_files["output"] for node in rankpdf_zerolag_nodes]
parents_zerolag = rankpdf_zerolag_nodes
except AttributeError:
# analysis started at merger step
margin = rankpdf_nodes
parents = []
margin_zerolag = rankpdf_zerolag_nodes
parents_zerolag = []
margout = []
margzerolagout = []
margnodes = []
margzerolagnodes = []
margnum = 16
for i,n in enumerate(range(0, len(margin), margnum)):
margout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]], '.xml.gz', path = marginalizeJob.output_path))
margnodes.append(inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = parents,
opts = {"marginalize":"ranking-stat-pdf"},
output_files = {"output":margout[-1]},
input_cache_files = {"likelihood-cache":margin[n:n+margnum]},
input_cache_file_name = os.path.basename(margout[-1]).replace('.xml.gz','.cache')
))
if rankpdf_zerolag_nodes:
margzerolagout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin_zerolag[n:n+margnum]], '.xml.gz', path = marginalizeWithZerolagJob.output_path))
margzerolagnodes.append(inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = parents_zerolag,
opts = {"marginalize":"ranking-stat-pdf"},
output_files = {"output":margzerolagout[-1]},
input_cache_files = {"likelihood-cache":margin_zerolag[n:n+margnum]},
input_cache_file_name = os.path.basename(margzerolagout[-1]).replace('.xml.gz','.cache')
))
if options.marginalized_likelihood_file:
# injection-only run
parents = final_sqlite_nodes
parents_zerolag = final_sqlite_nodes
marginalized_likelihood_file = options.marginalized_likelihood_file
marginalized_likelihood_with_zerolag_file = options.marginalized_likelihood_with_zerolag_file
else:
margnode = inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = margnodes,
opts = {"marginalize":"ranking-stat-pdf"},
output_files = {"output":"marginalized_likelihood.xml.gz"},
input_cache_files = {"likelihood-cache":margout},
input_cache_file_name = "marginalized_likelihood.cache"
)
parents = [margnode] + final_sqlite_nodes
marginalized_likelihood_file = margnode.output_files["output"]
margnode = inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = margzerolagnodes,
opts = {"marginalize":"ranking-stat-pdf"},
output_files = {"output":"marginalized_likelihood_with_zerolag.xml.gz"},
input_cache_files = {"likelihood-cache":margzerolagout},
input_cache_file_name = "marginalized_likelihood_with_zerolag.cache"
)
parents_zerolag = [margnode] + final_sqlite_nodes
marginalized_likelihood_with_zerolag_file = margnode.output_files["output"]
farnode = inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents,
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"background-bins-file":marginalized_likelihood_file, "injection-db":injdbs, "non-injection-db":noninjdb}
)
inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents_zerolag,
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"background-bins-file":marginalized_likelihood_with_zerolag_file, "injection-db":[injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs], "non-injection-db":noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL')}
)
return farnode, margout + margzerolagout
def parse_command_line():
parser = OptionParser(description = __doc__)
# generic data source options
datasource.append_options(parser)
parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s. Note that 50% will be used for zero-padding.")
# reference_psd
parser.add_option("--reference-psd", help = "Don't measure PSDs, use this one instead")
# Template bank
parser.add_option("--template-bank", metavar = "filename", help = "Set the template bank xml file.")
parser.add_option("--mass-model", metavar = "filename", help = "Set the name of the mass model. Options are 'file', 'salpeter'")
parser.add_option("--mass-model-file", metavar = "filename", help = "Set the name of the mass model file, e.g., mass_model.h5. Required if --mass-model=file")
# SVD bank construction options
parser.add_option("--overlap", metavar = "num", type = "int", action = "append", help = "set the factor that describes the overlap of the sub banks, must be even!")
parser.add_option("--autocorrelation-length", type = "int", default = 201, help = "The minimum number of samples to use for auto-chisquared, default 201 should be odd")
parser.add_option("--samples-min", type = "int", action = "append", help = "The minimum number of samples to use for time slices default 1024 (can be given multiple times, one for each time --bank-cache is invoked)")
parser.add_option("--samples-max-256", type = "int", default = 1024, help = "The maximum number of samples to use for time slices with frequencies above 256Hz, default 1024")
parser.add_option("--samples-max-64", type = "int", default = 2048, help = "The maximum number of samples to use for time slices with frequencies above 64Hz, default 2048")
parser.add_option("--samples-max", type = "int", default = 4096, help = "The maximum number of samples to use for time slices with frequencies below 64Hz, default 4096")
parser.add_option("--bank-cache", metavar = "filenames", action = "append", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc.. (can be given multiple times)")
parser.add_option("--tolerance", metavar = "float", type = "float", default = 0.9999, help = "set the SVD tolerance, default 0.9999")
parser.add_option("--flow", metavar = "num", type = "float", default = 40, help = "set the low frequency cutoff, default 40 (Hz)")
parser.add_option("--identity-transform", action = "store_true", help = "Use identity transform, i.e. no SVD")
# trigger generation options
parser.add_option("--vetoes", metavar = "filename", help = "Set the veto xml file.")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--inj-time-slide-file", metavar = "filename", help = "Set the time slide table xml file for injections")
parser.add_option("--web-dir", metavar = "directory", help = "Set the web directory like /home/USER/public_html")
parser.add_option("--fir-stride", type="int", metavar = "secs", default = 8, help = "Set the duration of the fft output blocks, default 8")
parser.add_option("--control-peak-time", type="int", default = 8, metavar = "secs", help = "Set the peak finding time for the control signal, default 8")
parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
parser.add_option("--reference-likelihood-file", metavar = "file", help = "Set a reference likelihood file to compute initial likelihood ratios. Required")
parser.add_option("--num-banks", metavar = "str", help = "The number of parallel subbanks per gstlal_inspiral job. can be given as a list like 1,2,3,4 then it will split up the bank cache into N groups with M banks each.")
parser.add_option("--max-inspiral-jobs", type="int", metavar = "jobs", help = "Set the maximum number of gstlal_inspiral jobs to run simultaneously, default no constraint.")
parser.add_option("--ht-gate-threshold", type="float", help="set a threshold on whitened h(t) to veto glitches")
parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
parser.add_option("--inspiral-executable", default = "gstlal_inspiral", help = "Options gstlal_inspiral | gstlal_iir_inspiral, default gstlal_inspiral")
parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. Separate injection runs using the --injections option will still occur.")
# FIXME far-injections currently doesnt work, either fix it or delete it
#parser.add_option("--far-injections", action = "append", help = "Injection files with injections too far away to be seen and are not filtered. Required. See https://www.lsc-group.phys.uwm.edu/ligovirgo/cbcnote/NSBH/MdcInjections/MDC1 for example.")
parser.add_option("--singles-threshold", default=float("inf"), action = "store", metavar="THRESH", help = "Set the SNR threshold at which to record single-instrument events in the output (default = +inf, i.e. don't retain singles).")
parser.add_option("--copy-raw-results", default=False, action = "store_true", help = "Copy raw gstlal_inspiral results before applying clustering and other lossy operations.")
parser.add_option("--gzip-test", default=False, action = "store_true", help = "Perform gzip --test on all output files.")
parser.add_option("--verbose", action = "store_true", help = "Be verbose")
parser.add_option("--disable-calc-inj-snr", default=False, action = "store_true", help = "Disable injection SNR calculation")
parser.add_option("--ranking-stat-samples", metavar = "N", default = 2**24, type = "int", help = "Construct ranking statistic histograms by drawing this many samples from the ranking statistic generator (default = 2^24).")
# Override the datasource injection option
parser.remove_option("--injections")
parser.add_option("--injections", action = "append", help = "append injection files to analyze. Must prepend filename with X:Y:, where X and Y are floats, e.g. 1.2:3.1:filename, so that the injections are only searched for in regions of the template bank with X <= chirp mass < Y.")
# Data from a zero lag run in the case of an injection-only run.
parser.add_option("--dist-stats-cache", metavar = "filename", help = "Set the cache file for dist stats (required iff running injection-only analysis)")
parser.add_option("--svd-bank-cache", metavar = "filename", help = "Set the cache file for svd banks (required iff running injection-only analysis)")
parser.add_option("--psd-cache", metavar = "filename", help = "Set the cache file for psd (required iff running injection-only analysis)")
parser.add_option("--non-injection-db", metavar = "filename", help = "Set the non injection data base file (required iff running injection-only analysis)")
parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the marginalized likelihood file (required iff running injection-only analysis)")
parser.add_option("--marginalized-likelihood-with-zerolag-file", metavar = "filename", help = "Set the marginalized likelihood with zerolag file (required iff running injection-only analysis)")
# Data from a previous run in the case of a run that starts at the merger step
parser.add_option("--lloid-cache", metavar = "filename", help = "Set the cache file for lloid (required iff starting an analysis at the merger step)")
parser.add_option("--inj-lloid-cache", metavar = "filename", action = "append", default = [], help = "Set the cache file for injection lloid files (required iff starting an analysis at the merger step) (can be given multiple times, should be given once per injection file)")
parser.add_option("--rank-pdf-cache", metavar = "filename", help = "Set the cache file for rank pdfs (required iff starting an analysis at the merger step)")
parser.add_option("--num-files-per-background-bin", metavar = "int", type = "int", default = 1, help = "Set the number of files per background bin for analyses which start at the merger step but are seeded by runs not following the current naming conventions")
parser.add_option("--injections-for-merger", metavar = "filename", action = "append", help = "append injection files used in previous run, must be provided in same order as corresponding inj-lloid-cache (required iff starting an analysis at the merger step)")
# Condor commands
parser.add_option("--request-cpu", default = "4", metavar = "integer", help = "set the inspiral CPU count, default = 4")
parser.add_option("--request-memory", default = "7GB", metavar = "integer", help = "set the inspiral memory, default = 7GB")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
options, filenames = parser.parse_args()
if options.mass_model not in ("salpeter", "file"):
raise ValueError("--mass-model must be 'salpeter' or 'file'")
if options.mass_model == "file" and not options.mass_model_file:
raise ValueError("--mass-model-file must be provided if --mass-model=file")
if options.num_banks:
options.num_banks = [int(v) for v in options.num_banks.split(",")]
if not options.samples_min and not options.svd_bank_cache:
options.samples_min = [1024]*len(options.bank_cache)
if not options.overlap and not options.svd_bank_cache:
options.overlap = [0]*len(options.bank_cache)
if not options.svd_bank_cache and any(overlap % 2 for overlap in options.overlap):
raise ValueError("overlap must be even")
if not options.svd_bank_cache and not (len(options.samples_min) == len(options.bank_cache) == len(options.overlap)):
raise ValueError("must provide same number of inputs for --samples-min, --bank-cache, --overlap")
missing_injection_options = []
for option in ("dist_stats_cache", "svd_bank_cache", "psd_cache", "non_injection_db", "marginalized_likelihood_file", "marginalized_likelihood_with_zerolag_file"):
if getattr(options, option) is None:
missing_injection_options.append(option)
if len(missing_injection_options) > 0 and len(missing_injection_options) < 6:
raise ValueError("missing injection-only options %s." % ", ".join([option for option in missing_injection_options]))
if len(missing_injection_options) == 0 and options.num_banks:
raise ValueError("cant specify --num-banks in injection-only run")
missing_merger_options = []
for option in ("lloid_cache", "inj_lloid_cache", "rank_pdf_cache"):
if getattr(options, option) is None:
missing_merger_options.append(option)
if len(missing_injection_options) > 0 and len(missing_injection_options) < 3:
raise ValueError("missing merger-run options %s." % ", ".join([option for option in missing_injection_options]))
if len(missing_injection_options) == 0 and options.num_banks:
raise ValueError("cant specify --num-banks in a run starting at the merger step")
fail = ""
required_options = []
if len(missing_merger_options) == 3:
required_options.append("reference_likelihood_file")
if len(missing_injection_options) == 6:
required_options.append("bank_cache")
for option in required_options:
if getattr(options, option) is None:
fail += "must provide option %s\n" % (option)
if fail: raise ValueError, fail
# FIXME far-injections currently doesnt work, either fix it or delete it
'''
if options.far_injections is not None and len(options.injections) != len(options.far_injections):
raise ValueError("number of injection files and far injection files must be equal")
if options.far_injections is None:
options.far_injections = [None for inj in options.injections]
'''
#FIXME a hack to find the sql paths
share_path = os.path.split(inspiral_pipe.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal')
options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql')
options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql')
return options, filenames
#
# Useful variables
#
options, filenames = parse_command_line()
if options.bank_cache:
# FIXME Add feature for multiple bank caches to online pipeline so that
# this functionality can return to inspiral_pipe
bank_cache = {}
for bank_cache_str in options.bank_cache:
for c in bank_cache_str.split(','):
ifo = c.split("=")[0]
cache = c.replace(ifo+"=","")
bank_cache.setdefault(ifo, []).append(cache)
detectors = datasource.GWDataSourceInfo(options)
channel_dict = detectors.channel_dict
if bank_cache:
instruments = "".join(sorted(bank_cache.keys()))
instrument_set = bank_cache.keys()
boundary_seg = detectors.seg
elif options.svd_bank_cache:
detectors = datasource.GWDataSourceInfo(options)
channel_dict = detectors.channel_dict
boundary_seg = detectors.seg
else:
with open(options.rank_pdf_cache) as f:
ce = CacheEntry(f.readline())
boundary_seg = ce.segment
instruments = ce.observatory
output_dir = "plots"
#
# Setup the dag
#
try:
os.mkdir("logs")
except:
pass
dag = inspiral_pipe.DAG("trigger_pipe")
if options.max_inspiral_jobs is not None:
dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs)
#
# Make an xml integrity checker
#
if options.gzip_test:
f = open("gzip_test.sh", "w")
f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
f.close()
os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
#
# A pre script to backup data before feeding to lossy programs
# (e.g. clustering routines)
#
f = open("store_raw.sh", "w")
f.write("""#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
f.close()
os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
#
# setup the job classes
#
if options.dist_stats_cache:
# injection-only run
gstlalInspiralJob = None
createPriorDistStatsJob = None
calcRankPDFsJob = None
calcRankPDFsWithZerolagJob = None
calcLikelihoodJob = None
marginalizeJob = None
marginalizeWithZerolagJob = None
elif options.lloid_cache:
# analysis starting at merger step
marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
else:
# set up jobs only needed for zerolag run
refPSDJob = inspiral_pipe.generic_job("gstlal_reference_psd", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"}))
medianPSDJob = inspiral_pipe.generic_job("gstlal_median_of_psds", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotBanksJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_banks", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
svdJob = inspiral_pipe.generic_job("gstlal_svd_bank", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"}))
modelJob = inspiral_pipe.generic_job("gstlal_inspiral_mass_model", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
modelAddJob = inspiral_pipe.generic_job("gstlal_inspiral_add_mass_models", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
horizonJob = inspiral_pipe.generic_job("gstlal_plot_psd_horizon", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInspiralJob = inspiral_pipe.generic_job(options.inspiral_executable, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
createPriorDistStatsJob = inspiral_pipe.generic_job("gstlal_inspiral_create_prior_diststats", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
calcRankPDFsJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"}))
calcRankPDFsWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", tag_base = "gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"}))
calcLikelihoodJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInspiralInjJob = inspiral_pipe.generic_job(options.inspiral_executable, tag_base="gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
injSplitterJob = inspiral_pipe.generic_job("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInjSnrJob = inspiral_pipe.generic_job("gstlal_inspiral_injection_snr", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"}))
ligolwAddJob = inspiral_pipe.generic_job("ligolw_add", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
calcLikelihoodJobInj = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInspiralComputeFarFromSnrChisqHistogramsJob = inspiral_pipe.generic_job("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
ligolwInspinjFindJob = inspiral_pipe.generic_job("lalapps_inspinjfind", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toSqliteJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toSqliteNoCacheJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_inj_final", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
toXMLJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
lalappsRunSqliteJob = inspiral_pipe.generic_job("lalapps_run_sqlite", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSummaryJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotIndividualInjectionsSummaryJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotIndividualInjectionsSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotSensitivityJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_sensitivity", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
pageJob = inspiral_pipe.generic_job("gstlal_inspiral_summary_page", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
plotbackgroundJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_background", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"}))
cpJob = inspiral_pipe.generic_job("cp", tag_base = "cp", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
rmJob = inspiral_pipe.generic_job("rm", tag_base = "rm_intermediate_merger_products", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
#
# Get mchirp boundaries of banks, maximum duration of templates, and analysis segments
#
if options.bank_cache:
max_time, template_mchirp_dict = get_bank_params(bank_cache, options)
segsdict = analysis_segments(set(bank_cache.keys()), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)
if options.psd_cache:
template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache)
segsdict = analysis_segments(set(svd_nodes.keys()), detectors.frame_segments, boundary_seg, max_time, options.min_instruments)
psd_nodes, ref_psd_parent_nodes = inj_psd_node_gen(segsdict, options)
instruments = "".join(sorted(svd_nodes.keys()))
instrument_set = svd_nodes.keys()
elif options.lloid_cache:
# starting analysis at merger step, nothing to do here
pass
elif options.reference_psd is None:
#
# Compute the PSDs for each segment
#
psd_nodes = psd_node_gen(refPSDJob, dag, [], segsdict, channel_dict, options)
#
# Make the reference PSD cache
#
# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
output_cache_file = open('reference_psd.cache', "w")
for node in psd_nodes.values():
output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"])))
output_cache_file.close()
#
# plot the horizon distance
#
inspiral_pipe.generic_node(horizonJob, dag,
parent_nodes = psd_nodes.values(),
input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]},
output_files = {"":inspiral_pipe.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)}
)
#
# compute the median PSD
#
# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
median_psd_node = \
inspiral_pipe.generic_node(medianPSDJob, dag,
parent_nodes = psd_nodes.values(),
input_files = {"input-cache": "reference_psd.cache"},
output_files = {"output-name": inspiral_pipe.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))}
)
ref_psd = median_psd_node.output_files["output-name"]
ref_psd_parent_nodes = [median_psd_node]
else:
ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))
# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
output_cache_file = open('reference_psd.cache', "w")
output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd)))
output_cache_file.close()
ref_psd_parent_nodes = []
#
# Calculate Expected SNR jobs
#
num_split_inj_snr_jobs = 100
ligolw_add_nodes = []
if not options.lloid_cache and not options.disable_calc_inj_snr:
for inj in options.injections:
inj_snr_nodes = []
inj_splitter_node = inspiral_pipe.generic_node(injSplitterJob, dag, parent_nodes=[],
opts = {"output-path":injSplitterJob.output_path, "usertag": sim_tag_from_inj_file(inj.split(":")[-1]), "nsplit": num_split_inj_snr_jobs},
input_files = {"": inj.split(":")[-1]}
)
inj_splitter_node.set_priority(98)
# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
for i in xrange(num_split_inj_snr_jobs):
injSNRnode = inspiral_pipe.generic_node(gstlalInjSnrJob, dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
opts = {"flow":options.flow},
input_files = {"injection-file": "%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i), "reference-psd-cache": "reference_psd.cache"
}
)
injSNRnode.set_priority(98)
inj_snr_nodes.append(injSNRnode)
ligolw_add_nodes.append(inspiral_pipe.generic_node(ligolwAddJob, dag, parent_nodes=inj_snr_nodes,
input_files = {"": ' '.join(["%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in xrange(num_split_inj_snr_jobs)])},
opts = {"ilwdchar-compat":""},
output_files = {"output": inj.split(":")[-1]}
))
if options.bank_cache:
#
# Compute SVD banks
#
#svd_nodes, template_mchirp_dict = svd_node_gen(svdJob, dag, ref_psd_parent_nodes, ref_psd, inspiral_pipe.build_bank_groups(bank_cache, options.num_banks), options, boundary_seg, template_mchirp_dict)
svd_nodes, template_mchirp_dict = svd_node_gen(svdJob, dag, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict)
model_add_node, model_file_name = model_node_gen(modelJob, dag, instruments, options, boundary_seg, options.template_bank)
if not options.lloid_cache:
#
# Inspiral jobs by segment
#
inspiral_nodes = inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict)
#
# Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
#
lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)
#
# Setup likelihood jobs, clustering and/or merging
#
rankpdf_nodes, rankpdf_zerolag_nodes, outnodes = rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWithZerolagJob, calcLikelihoodJob, calcLikelihoodJobInj, lalappsRunSqliteJob, toSqliteJob, marginalizeJob, svd_nodes, inspiral_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_add_node, model_file_name)
else:
#
# Merge lloid files into 1 file per bin if not already 1 file per bin
#
rankpdf_nodes, rankpdf_zerolag_nodes, outnodes = merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options)
#
# after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof)
#
injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSqliteJob, toSqliteNoCacheJob, cpJob, outnodes, ligolw_add_nodes, options, instruments)
#
# Compute FAP
#
farnode, margfiles_to_delete = compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, rankpdf_nodes, rankpdf_zerolag_nodes, injdbs, noninjdb, final_sqlite_nodes)
# make summary plots
plotnodes = []
plotnodes.append(inspiral_pipe.generic_node(plotSummaryJob, dag, parent_nodes=[farnode],
opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "ALL_LLOID_COMBINED", "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""},
input_files = {"":[noninjdb] + injdbs}
))
plotnodes.append(inspiral_pipe.generic_node(plotSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode],
opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "PRECESSION_LLOID_COMBINED", "plot-group":1, "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""},
input_files = {"":[noninjdb] + injdbs}
))
for injdb in injdbs:
plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryJob, dag, parent_nodes=[farnode],
opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag":injdb.replace(".sqlite","").split("-")[1], "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""},
input_files = {"":[noninjdb] + [injdb]}
))
plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode],
opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""},
input_files = {"":[noninjdb] + [injdb]}
))
# make sensitivity plots
plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode],
opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"},
input_files = {"zero-lag-database":noninjdb, "":injdbs}
))
for injdb in injdbs:
plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode],
opts = {"user-tag":injdb.replace(".sqlite","").split("-")[1], "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"},
input_files = {"zero-lag-database":noninjdb, "":injdb}
))
# make background plots
plotnodes.append(inspiral_pipe.generic_node(plotbackgroundJob, dag, parent_nodes = [farnode], opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb}))
# make a web page
inspiral_pipe.generic_node(pageJob, dag, parent_nodes = plotnodes,
opts = {"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), "webserver-dir":options.web_dir, "glob-path":output_dir, "output-user-tag":["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] + [injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs] + [injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs]}
)
#
# rm intermediate merger products
#
for db in dbs_to_delete:
inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes,
input_files = {"": db}
)
for margfile in margfiles_to_delete:
inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes,
input_files = {"": margfile}
)
#
# all done
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()