Commit 5c292447 authored by Patrick Godwin

move functions from gstlal_inspiral_pipe to inspiral_pipe.py for reusability

parent 1e476a3b
Pipeline #77288 passed with stages in 21 minutes and 58 seconds
@@ -43,23 +43,17 @@ __author__ = 'Chad Hanna <chad.hanna@ligo.org>, Patrick Godwin <patrick.godwin@l
### imports
from collections import OrderedDict
import functools
import itertools
import os
from optparse import OptionParser
import stat
import os
import numpy
import lal.series
from lal.utils import CacheEntry
from ligo import segments
from ligo.lw import ligolw
from ligo.lw import lsctables
import ligo.lw.utils as ligolw_utils
from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
@@ -74,150 +68,6 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
lsctables.use_in(LIGOLWContentHandler)
#----------------------------------------------------------
### utility functions
def sim_tag_from_inj_file(injections):
	if injections is None:
		return None
	return injections.replace('.xml', '').replace('.gz', '').replace('-','_')
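# Illustrative example (not part of the pipeline): sim_tag_from_inj_file() turns an
# injection file name into a tag usable in output file names; e.g. a hypothetical
# "bns-injections.xml.gz" would map to "bns_injections", and None stays None.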
def get_bank_params(options, verbose = False):
	bank_cache = {}
	for bank_cache_str in options.bank_cache:
		for c in bank_cache_str.split(','):
			ifo = c.split("=")[0]
			cache = c.replace(ifo+"=","")
			bank_cache.setdefault(ifo, []).append(cache)

	max_time = 0
	template_mchirp_dict = {}
	for n, cache in enumerate(bank_cache.values()[0]):
		for ce in map(CacheEntry, open(cache)):
			for ce in map(CacheEntry, open(ce.path)):
				xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
				max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
				idx = options.overlap[n]/2
				template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[idx:-idx]), max(snglinspiraltable.getColumnByName('mchirp')[idx:-idx])]
				xmldoc.unlink()

	return template_mchirp_dict, bank_cache, max_time
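# Illustrative example (option format inferred from the parsing above): a bank cache
# option such as "H1=H1_split_bank.cache,L1=L1_split_bank.cache" yields
#   bank_cache = {'H1': ['H1_split_bank.cache'], 'L1': ['L1_split_bank.cache']}
# template_mchirp_dict then maps each split-bank file path (read via the caches of the
# first instrument) to [min mchirp, max mchirp] over its templates, excluding the
# overlap[n]/2 templates clipped from each edge, and max_time is the longest
# template_duration found in any bank.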
def subdir_path(dirlist):
	output_path = '/'.join(dirlist)
	try:
		os.mkdir(output_path)
	except OSError:
		# ignore failure (e.g. the directory already exists)
		pass
	return output_path
def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
	"""get a dictionary of all the disjoint 2+ detector combination segments
	"""
	segsdict = segments.segmentlistdict()
	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
	start_pad = 512 + max_template_length
	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
	segment_length = int(5 * start_pad)
	for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
		for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
			segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos))
			segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg])
			segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad)
			segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad)
			if not segsdict[frozenset(ifo_combos)]:
				del segsdict[frozenset(ifo_combos)]
	return segsdict
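# Illustrative example: for analyzable_instruments_set = set(['H1', 'L1', 'V1']) and
# min_instruments = 2, segsdict is keyed by frozenset(['H1', 'L1']),
# frozenset(['H1', 'V1']), frozenset(['L1', 'V1']) and frozenset(['H1', 'L1', 'V1']);
# each value holds the times when exactly that detector combination (and no other
# analyzable detector) was on, restricted to boundary_seg, padded by start_pad and
# broken into roughly segment_length chunks, with empty combinations dropped.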
def create_svd_bank_strings(svd_nodes, instruments = None):
	# FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
	outstrings = []
	for i in range(len(svd_nodes.values()[0])):
		svd_bank_string = ""
		for ifo in svd_nodes:
			if instruments is not None and ifo not in instruments:
				continue
			try:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"])
			except AttributeError:
				svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i])
		svd_bank_string = svd_bank_string.strip(",")
		outstrings.append(svd_bank_string)
	return outstrings
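# Illustrative example: with one SVD bank per instrument per bin, this returns one
# comma-separated string per bin, roughly of the form
#   "H1:<path>/H1-0000_SVD-<start>-<dur>.xml.gz,L1:<path>/L1-0000_SVD-<start>-<dur>.xml.gz"
# where the paths come either from each node's write-svd output or from the values
# passed in directly (the file names shown here are hypothetical).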
def svd_bank_cache_maker(svd_bank_strings, injection = False):
	if injection:
		dir_name = "gstlal_inspiral_inj"
	else:
		dir_name = "gstlal_inspiral"
	svd_cache_entries = []
	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
	for svd_bank_parsed_dict in parsed_svd_bank_strings:
		for filename in svd_bank_parsed_dict.itervalues():
			svd_cache_entries.append(CacheEntry.from_T050017(filename))

	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries]
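# Illustrative example: given SVD bank strings like the ones produced above,
# svd_bank_cache_maker() parses each "IFO:path,..." string with
# inspiral.parse_svdbank_string() and returns the cache URLs of the referenced SVD
# bank files, one entry per instrument per bin, suitable for writing into a cache file.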
def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
	# first get the previous output in a usable form
	lloid_output = {}
	for inj in options.injections + [None]:
		lloid_output[sim_tag_from_inj_file(inj)] = {}
	lloid_diststats = {}
	if options.dist_stats_cache:
		for ce in map(CacheEntry, open(options.dist_stats_cache)):
			lloid_diststats[ce.description.split("_")[0]] = [ce.path]
	for ifos in segsdict:
		for seg in segsdict[ifos]:
			# iterate over the mass space chunks for each segment
			for node in inspiral_nodes[(ifos, None)][seg]:
				if node is None:
					break
				len_out_files = len(node.output_files["output-cache"])
				for f in node.output_files["output-cache"]:
					# Store the output files and the node for use as a parent dependency
					lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node]))
				for f in node.output_files["ranking-stat-output-cache"]:
					lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f)
			for inj in options.injections:
				for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]:
					if injnode is None:
						continue
					for f in injnode.output_files["output-cache"]:
						# Store the output files and the node and injnode for use as parent dependencies
						bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
						try:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode]))
						except KeyError:
							lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode]))
	return lloid_output, lloid_diststats
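# Illustrative sketch of the returned structures (keys and values inferred from the
# code above): lloid_output[None]["0000"] is a list of (trigger file, [parent nodes])
# pairs for background bin 0000 of the non-injection run,
# lloid_output["<sim tag>"]["0000"] holds the same for each injection set, and
# lloid_diststats["0000"] is the list of DIST_STATS (ranking statistic) files for
# that bin.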
def set_up_scripts(options):
	# Make an xml integrity checker
	if options.gzip_test:
		with open("gzip_test.sh", "w") as f:
			f.write("#!/bin/bash\nsleep 60\ngzip --test $@")
		os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)

	# A pre script to backup data before feeding to lossy programs
	# (e.g. clustering routines)
	with open("store_raw.sh", "w") as f:
		f.write("""#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
	os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR)
def load_reference_psd(options):
	ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler))

	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	with open('reference_psd.cache', "w") as output_cache_file:
		output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd)))

	return ref_psd
#----------------------------------------------------------
### command line options
@@ -376,38 +226,6 @@ def parse_command_line():
#----------------------------------------------------------
### DAG utilities
def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
	"""Calculate the appropriate ht-gate-threshold values according to the scale given
	"""
	if options.ht_gate_threshold_linear is not None:
		# A scale is given
		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
		# use max mchirp in a given svd bank to decide gate threshold
		bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices]
		gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)
		return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
	elif options.ht_gate_threshold is not None:
		return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given
	else:
		return None
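# Illustrative example (option format inferred from the parsing above): with a
# hypothetical ht-gate-threshold-linear value of "0.8:15-45:100", a bank whose
# maximum mchirp is 22.9 gets a threshold of
#   15 + (100 - 15)/(45 - 0.8) * (22.9 - 0.8) ~ 57.5
# i.e. the gate threshold is interpolated linearly in the bank's maximum chirp mass.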
def inputs_to_db(jobs, inputs, job_type = 'toSqlite'):
	dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs]
	db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite')
	return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db)
def cache_to_db(cache, jobs):
	hi_index = cache[-1].description.split('_')[0]
	db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path))
	db = db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,))
	return db
def get_rank_file(instruments, boundary_seg, n, basename, job=None):
	if job:
		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path)
	else:
		return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.cache')
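# Illustrative example (hypothetical names): get_rank_file('H1L1', boundary_seg,
# '0000', 'LIKELIHOOD', job) returns a T050017-style name such as
# "H1L1-0000_LIKELIHOOD-<start>-<dur>.xml.gz" under job.output_path, while omitting
# job returns the same stem with a ".cache" extension in the current directory; the
# description is simply whatever n and basename are given.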
def set_up_jobs(options):
	jobs = {}
@@ -519,910 +337,6 @@ def set_up_jobs(options):
	return jobs
#----------------------------------------------------------
### DAG layers
def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
	psd_nodes = {}
	for ifos in segsdict:
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)
		for seg in segsdict[ifos]:
			psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]])
			psd_nodes[(ifos, seg)] = dagparts.DAGNode(
				jobs['refPSD'],
				dag,
				parent_nodes = parent_nodes,
				opts = {
					"gps-start-time":int(seg[0]),
					"gps-end-time":int(seg[1]),
					"data-source":"frames",
					"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
					"psd-fft-length":options.psd_fft_length,
					"frame-segments-name": options.frame_segments_name
				},
				input_files = {
					"frame-cache":options.frame_cache,
					"frame-segments-file":options.frame_segments_file
				},
				output_files = {
					"write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path)
				},
			)

	# Make the reference PSD cache
	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	with open('reference_psd.cache', "w") as output_cache_file:
		for node in psd_nodes.values():
			output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"])))

	return psd_nodes
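# Illustrative example (hypothetical GPS time): for a segment starting at 1239000000,
# the PSD for an H1,L1 combination would be written to something like
#   <refPSD output_path>/12390/H1L1-REFERENCE_PSD-<start>-<dur>.xml.gz
# following the T050017 naming convention, and every such file is also listed in
# reference_psd.cache.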
def median_psd_layer(dag, jobs, parent_nodes, boundary_seg, instruments):
	gpsmod5 = str(int(boundary_seg[0]))[:5]
	median_psd_path = subdir_path([jobs['medianPSD'].output_path, gpsmod5])

	# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
	median_psd_nodes = []
	for chunk, nodes in enumerate(dagparts.groups(parent_nodes.values(), 50)):
		median_psd_node = \
			dagparts.DAGNode(jobs['medianPSD'], dag,
				parent_nodes = parent_nodes.values(),
				input_files = {"": [node.output_files["write-psd"] for node in nodes]},
				output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = median_psd_path)}
			)
		median_psd_nodes.append(median_psd_node)

	median_psd_node = \
		dagparts.DAGNode(jobs['medianPSD'], dag,
			parent_nodes = median_psd_nodes,
			input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]},
			output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]))}
		)
	return median_psd_node
def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, output_dir, template_mchirp_dict):
	svd_nodes = {}
	new_template_mchirp_dict = {}
	svd_dtdphi_map = {}
	for ifo, list_of_svd_caches in bank_cache.items():
		bin_offset = 0
		for j, svd_caches in enumerate(list_of_svd_caches):
			svd_caches = map(CacheEntry, open(svd_caches))
			for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches):
				# First sort out the clipleft, clipright options
				clipleft = []
				clipright = []
				ids = []
				mchirp_interval = (float("inf"), 0)
				individual_svd_cache = map(CacheEntry, open(individual_svd_cache))
				for n, f in enumerate(ce.path for ce in individual_svd_cache):
					# handle template bank clipping
					clipleft.append(options.overlap[j] / 2)
					clipright.append(options.overlap[j] / 2)
					ids.append("%d_%d" % (i+bin_offset, n))
					if f in template_mchirp_dict:
						mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1]))
					svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]

				svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
				if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
					new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval

				svdnode = dagparts.DAGNode(
					jobs['svd'],
					dag,
					parent_nodes = parent_nodes,
					opts = {
						"svd-tolerance":options.tolerance,
						"flow":options.flow[j],
						"sample-rate":options.sample_rate,
						"clipleft":clipleft,
						"clipright":clipright,
						"samples-min":options.samples_min[j],
						"samples-max-256":options.samples_max_256,
						"samples-max-64":options.samples_max_64,
						"samples-max":options.samples_max,
						"autocorrelation-length":options.autocorrelation_length,
						"bank-id":ids,
						"identity-transform":options.identity_transform,
						"ortho-gate-fap":0.5
					},
					input_files = {"reference-psd":psd},
					input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]},
					input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
					output_files = {"write-svd":svd_bank_name},
				)

				# impose a priority to help with depth first submission
				svdnode.set_priority(99)
				svd_nodes.setdefault(ifo, []).append(svdnode)

			bin_offset += i+1

	# Plot template/svd bank jobs
	primary_ifo = bank_cache.keys()[0]
	dagparts.DAGNode(
		jobs['plotBanks'],
		dag,
		parent_nodes = sum(svd_nodes.values(),[]),
		opts = {"plot-template-bank":"", "output-dir": output_dir},
		input_files = {"template-bank-file":options.template_bank},
	)

	return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map
def inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
	inspiral_nodes = {}
	for ifos in segsdict:
		# FIXME: handles more than 3 ifos with same cpu/memory requests
		inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3)
		inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3)

		# setup dictionaries to hold the inspiral nodes
		inspiral_nodes[(ifos, None)] = {}
		ignore = {}
		injection_files = []
		for injections in options.injections:
			min_chirp_mass, max_chirp_mass, injections = injections.split(':')
			injection_files.append(injections)
			min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass)
			inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
			ignore[injections] = []
			for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)):
				if max_chirp_mass <= bounds[0]:
					ignore[injections].append(int(bgbin_index))
					# NOTE putting a break here assumes that the min chirp mass
					# in a subbank increases with bin number, i.e. XXXX+1 has a
					# greater minimum chirpmass than XXXX, for all XXXX. Note
					# that the reverse is not true, bin XXXX+1 may have a lower
					# max chirpmass than bin XXXX.
				elif min_chirp_mass > bounds[1]:
					ignore[injections].append(int(bgbin_index))

		# FIXME choose better splitting?
		numchunks = 50

		# only use a channel dict with the relevant channels
		this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict)

		# get the svd bank strings
		svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys())

		# get a mapping between chunk counter and bgbin for setting priorities
		bgbin_chunk_map = {}

		for seg in segsdict[ifos]:
			if injection_files:
				output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]])

			if jobs[inspiral_name] is None:
				# injection-only run
				inspiral_nodes[(ifos, None)].setdefault(seg, [None])
			else:
				output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]])
				for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
					bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i, s in enumerate(svd_bank_strings)]

					# setup output names
					output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)]
					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
					dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]

					for bgbin in bgbin_indices:
						bgbin_chunk_map.setdefault(bgbin, chunk_counter)

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)

					# non injection node
					noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag,
						parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]),
						opts = {
							"psd-fft-length":options.psd_fft_length,
							"ht-gate-threshold":threshold_values,
							"frame-segments-name":options.frame_segments_name,
							"gps-start-time":int(seg[0]),
							"gps-end-time":int(seg[1]),
							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
							"tmp-space":dagparts.condor_scratch_space(),
							"track-psd":"",
							"control-peak-time":options.control_peak_time,
							"coincidence-threshold":options.coincidence_threshold,
							"singles-threshold":options.singles_threshold,
							"fir-stride":options.fir_stride,
							"data-source":"frames",
							"local-frame-caching":"",
							"min-instruments":options.min_instruments,
							"reference-likelihood-file":options.reference_likelihood_file
						},
						input_files = {
							"time-slide-file":options.time_slide_file,
							"frame-cache":options.frame_cache,
							"frame-segments-file":options.frame_segments_file,
							"reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"],
							"blind-injections":options.blind_injections,
							"veto-segments-file":options.vetoes,
						},
						input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)},
						output_cache_files = {
							"output-cache":output_names,
							"ranking-stat-output-cache":dist_stat_names
						}
					)

					# Set a post script to check for file integrity
					if options.gzip_test:
						noninjnode.set_post_script("gzip_test.sh")
						noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))

					# impose a priority to help with depth first submission
					noninjnode.set_priority(chunk_counter+15)

					inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)

			# process injections
			for injections in injection_files:
				# setup output names
				sim_name = sim_tag_from_inj_file(injections)
				bgbin_svd_bank_strings = [bgbin_svdbank for i, bgbin_svdbank in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]]

				for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)):
					bgbin_indices, svd_bank_strings = zip(*bgbin_list)
					output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
					output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)]
					svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))]

					try:
						reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
						parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
					except AttributeError: ### injection-only run
						reference_psd = psd_nodes[(ifos, seg)]
						parents = []

					svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names]
					input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name)

					# Calculate the appropriate ht-gate-threshold values according to the scale given
					threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)

					# setup injection node
					# FIXME: handles more than 3 ifos with same cpu/memory requests
					injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag,
						parent_nodes = parents,
						opts = {
							"psd-fft-length":options.psd_fft_length,
							"ht-gate-threshold":threshold_values,
							"frame-segments-name":options.frame_segments_name,
							"gps-start-time":int(seg[0]),
							"gps-end-time":int(seg[1]),
							"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
							"tmp-space":dagparts.condor_scratch_space(),
							"track-psd":"",
							"control-peak-time":options.control_peak_time,
							"coincidence-threshold":options.coincidence_threshold,
							"singles-threshold":options.singles_threshold,
							"fir-stride":options.fir_stride,
							"data-source":"frames",
							"local-frame-caching":"",
							"min-instruments":options.min_instruments,
							"reference-likelihood-file":options.reference_likelihood_file
						},
						input_files = {
							"time-slide-file":options.inj_time_slide_file,
							"frame-cache":options.frame_cache,
							"frame-segments-file":options.frame_segments_file,
							"reference-psd":reference_psd,
							"veto-segments-file":options.vetoes,
							"injections": injections
						},
						input_cache_files = {"svd-bank-cache":svd_names},
						input_cache_file_name = input_cache_name,
						output_cache_files = {"output-cache":output_names}
					)

					# Set a post script to check for file integrity
					if options.gzip_test:
						injnode.set_post_script("gzip_test.sh")
						injnode.add_post_script_arg(" ".join(output_names))

					# impose a priority to help with depth first submission
					if bgbin_chunk_map:
						injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1)
					else:
						injnode.set_priority(chunk_counter+1)

					inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)

	# Replace mchirplo:mchirphi:inj.xml with inj.xml
	options.injections = [inj.split(':')[-1] for inj in options.injections]

	# NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
	lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)

	return inspiral_nodes, lloid_output, lloid_diststats
def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
	ligolw_add_nodes = []
	for inj in options.injections:
		inj_snr_nodes = []

		inj_splitter_node = dagparts.DAGNode(jobs['injSplitter'], dag, parent_nodes=[],
			opts = {
				"output-path":jobs['injSplitter'].output_path,
				"usertag": sim_tag_from_inj_file(inj.split(":")[-1]),
				"nsplit": num_split_inj_snr_jobs
			},
			input_files = {"": inj.split(":")[-1]}
		)
		inj_splitter_node.set_priority(98)

		# FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
		injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)]
		for injection_file in injection_files:
			injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node],
				# FIXME somehow choose the actual flow based on mass?
				# max(flow) is chosen for performance, not correctness;
				# hopefully it will be good enough
				opts = {"flow":max(options.flow),"fmax":options.fmax},
				input_files = {
					"injection-file": injection_file,
					"reference-psd-cache": "reference_psd.cache"
				}
			)
			injSNRnode.set_priority(98)
			inj_snr_nodes.append(injSNRnode)

		addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes,
			input_files = {"": ' '.join(injection_files)},
			output_files = {"output": inj.split(":")[-1]}
		)

		ligolw_add_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [addnode],