Commit 05ddfd8d authored by Ryan Everett's avatar Ryan Everett

Reduce the size of the trigger_pipe.sh file by passing cache files instead of long file lists. See PR #2232

parent 2817ebc8
......@@ -191,6 +191,7 @@ import gst
import lal
from glue.lal import CacheEntry
from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
......@@ -206,6 +207,7 @@ from gstlal import far
from gstlal import httpinterface
from gstlal import hoftcache
from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import pipeparts
from gstlal import simulation
......@@ -258,10 +260,12 @@ def parse_command_line():
parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
parser.add_option("--output", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
parser.add_option("--output", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
parser.add_option("--output-cache", metavar = "filename", help = "Provide a cache file with the names of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
parser.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given")
parser.add_option("--svd-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file, These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze bank serially. At least one svd bank for at least 2 detectors is required.")
parser.add_option("--svd-bank", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file, These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze bank serially. At least one svd bank for at least 2 detectors is required.")
parser.add_option("--svd-bank-cache", metavar = "filename", help = "Provide a cache file of svd-bank files")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets")
parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal")
parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
......@@ -278,7 +282,8 @@ def parse_command_line():
# Online options
parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc..")
parser.add_option("--likelihood-file", metavar = "filename", action = "append", help = "Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided)")
parser.add_option("--likelihood-file", metavar = "filename", action = "append", default = [], help = "Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided)")
parser.add_option("--likelihood-file-cache", metavar = "filename", help = "Cache file for likelihood ratio data to use for ranking events")
parser.add_option("--reference-likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use for ranking events (--data-source must be lvshm or framexmit) (--likelihood-snapshot-interval must provided) (either --likelihood-file or --reference-likelihood-file must be provided)")
parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data. If --likelihood-file is provided, the likelihood file will be overwritten by a snapshot of the trigger files and a duplicate snapshot will be generated to keep a record of past ranking statistics.")
parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
......@@ -300,6 +305,19 @@ def parse_command_line():
#
# check for options, files that are always required
#
if options.svd_bank_cache:
svd_bank_dict = {}
for line in open(options.svd_bank_cache):
svd_bank_cache_entry = CacheEntry(line)
svd_bank_dict.setdefault(svd_bank_cache_entry.description, []).append("{0}:{1}".format(svd_bank_cache_entry.observatory, svd_bank_cache_entry.url))
options.svd_bank.extend([",".join(svd_string) for svd_string in svd_bank_dict.itervalues()])
if options.output_cache:
options.output.extend([CacheEntry(line).url for line in open(options.output_cache)])
if options.likelihood_file_cache:
options.likelihood_file.extend([CacheEntry(line).url for line in open(options.likelihood_file_cache)])
missing_options = ["--%s" % option.replace("_", "-") for option in ["svd_bank", "output"] if getattr(options, option) is None]
if options.likelihood_file is None and options.reference_likelihood_file is None:
......
......@@ -93,7 +93,7 @@ def parse_command_line():
paramdict = options.__dict__
options.likelihood_urls = []
if options.likelihood_urls is not None:
if options.likelihood_url is not None:
options.likelihood_urls += options.likelihood_url
if options.likelihood_cache is not None:
options.likelihood_urls += [CacheEntry(line).url for line in open(options.likelihood_cache)]
......
......@@ -205,8 +205,8 @@ def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_groups, options, seg):
"bank-id":ids,
"identity-transform":options.identity_transform,
"snr-threshold":4.0, "ortho-gate-fap":0.5},
input_files = { "template-bank":files,
"reference-psd":psd},
input_files = {"reference-psd":psd},
input_cache_files = {"template-bank-cache":files},
output_files = {"write-svd":svd_bank_name}
)
)
......@@ -225,6 +225,18 @@ def create_svd_bank_strings(svd_nodes, instruments = None):
outstrings.append(svd_bank_string)
return outstrings
def svd_bank_cache_maker(svd_bank_strings, counter, injection = False):
	"""!
	Flatten a list of comma separated "ifo:url" svd bank strings into a
	flat list of svd bank URLs, converting each URL through
	lal.CacheEntry.from_T050017() along the way (the .url attribute of
	the resulting entry round-trips the input, so this is effectively a
	flatten that requires T050017-style file names).

	@param svd_bank_strings list of strings as produced by
		create_svd_bank_strings(), e.g. ["H1:file1,L1:file2", ...]
	@param counter currently unused; kept so existing call sites
		(which pass chunk_counter) keep working.
	@param injection currently unused beyond selecting a directory name
		that was never applied; kept for call-site compatibility.
	@return flat list of svd bank URLs

	NOTE(review): the original body computed dir_name =
	"gstlal_inspiral_inj"/"gstlal_inspiral" from *injection* but never
	used it, and never read *counter*.  That looks like an unfinished
	plan to build per-chunk output paths -- confirm intent before
	relying on these parameters.  The dead assignment has been removed;
	behavior is unchanged.
	"""
	svd_cache_entries = []
	# parse each "ifo:url[,ifo:url...]" string into an {ifo: url} mapping
	parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
	for svd_bank_parsed_dict in parsed_svd_bank_strings:
		# Python 2 idiom (itervalues) kept to match the rest of the file
		for url in svd_bank_parsed_dict.itervalues():
			svd_cache_entries.append(lal.CacheEntry.from_T050017(url))
	return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries]
def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, segsdict, options, channel_dict):
......@@ -261,7 +273,6 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s
"gps-start-time":seg[0].seconds,
"gps-end-time":seg[1].seconds,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"svd-bank":svd_bank_strings,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
......@@ -277,9 +288,10 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s
"blind-injections":options.blind_injections,
"veto-segments-file":options.vetoes,
},
output_files = {
"output":output_names,
"likelihood-file":dist_stat_names
input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings, chunk_counter)},
output_cache_files = {
"output-cache":output_names,
"likelihood-file-cache":dist_stat_names
}
)
# Set a post script to check for file integrity
......@@ -307,7 +319,6 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s
"gps-start-time":seg[0].seconds,
"gps-end-time":seg[1].seconds,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
"svd-bank":svd_bank_strings,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
......@@ -322,9 +333,10 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s
"veto-segments-file":options.vetoes,
"injections": injections
},
output_files = {
"output":output_names,
"likelihood-file":dist_stat_names
input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings, chunk_counter, injection = True)},
output_cache_files = {
"output-cache":output_names,
"likelihood-file-cache":dist_stat_names
}
)
# Set a post script to check for file integrity
......@@ -347,11 +359,11 @@ def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
for seg in segsdict[ifos]:
# iterate over the mass space chunks for each segment
for j, node in enumerate(inspiral_nodes[(ifos, None)][seg]):
len_out_files = len(node.output_files["output"])
for i,f in enumerate(node.output_files["output"]):
len_out_files = len(node.output_files["output-cache"])
for i,f in enumerate(node.output_files["output-cache"]):
# Store the output files and the node for use as a parent dependency
lloid_output[None].setdefault((j,i), []).append((f, [node]))
for i,f in enumerate(node.output_files["likelihood-file"]):
for i,f in enumerate(node.output_files["likelihood-file-cache"]):
lloid_diststats.setdefault((j,i) ,[]).append(f)
for inj in options.injections:
# NOTE This assumes that injection jobs
......@@ -359,7 +371,7 @@ def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
# terms of the mass space they cover,
# e.g., that the chunks ar the same!
injnode = inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg][j]
for i,f in enumerate(injnode.output_files["output"]):
for i,f in enumerate(injnode.output_files["output-cache"]):
# Store the output files and the node and injnode for use as a parent dependencies
lloid_output[sim_tag_from_inj_file(inj)].setdefault((j,i), []).append((f, [node, injnode]))
......@@ -387,7 +399,7 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcLikelihood
)
calcranknode = inspiral_pipe.generic_node(calcRankPDFsJob, dag,
parent_nodes = [priornode, snrpdfnode],
input_files = {"":diststats + [priornode.output_files["write-likelihood"], snrpdfnode.output_files["write-likelihood"]]}, #FIXME is this right, do I just add the output of the calc prior job?
input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"], snrpdfnode.output_files["write-likelihood"]]}, #FIXME is this right, do I just add the output of the calc prior job?
output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%d_CALC_RANK_PDFS' % (n,), boundary_seg[0].seconds, boundary_seg[1].seconds, '.xml.gz', path = calcRankPDFsJob.output_path)}
)
priornodes.append(priornode)
......@@ -399,9 +411,10 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcLikelihood
[inspiral_pipe.generic_node(calcLikelihoodJob, dag,
parent_nodes = [priornode, snrpdfnode] + parents, # add parents here in case a gstlal inpsiral job's trigger file is corrupted - then we can just mark that job as not done and this job will rerun.
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"likelihood-url":diststats +
input_files = {"":chunked_inputs},
input_cache_files = {"likelihood-cache":diststats +
[priornode.output_files["write-likelihood"],
snrpdfnode.output_files["write-likelihood"]], "":chunked_inputs
snrpdfnode.output_files["write-likelihood"]]
}
) for chunked_inputs in chunks(inputs, 25)]
)
......@@ -417,9 +430,10 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcLikelihood
[inspiral_pipe.generic_node(calcLikelihoodJobInj, dag,
parent_nodes = parents + [priornodes[n], snrpdfnode],
opts = {"tmp-space":inspiral_pipe.condor_scratch_space()},
input_files = {"likelihood-url":diststats +
input_files = {"":chunked_inputs},
input_cache_files = {"likelihood-cache":diststats +
[priornodes[n].output_files["write-likelihood"],
snrpdfnode.output_files["write-likelihood"]], "":chunked_inputs
snrpdfnode.output_files["write-likelihood"]]
}
) for chunked_inputs in chunks(inputs, 25)]
)
......
......@@ -33,6 +33,7 @@ from optparse import OptionParser
import uuid
from glue.lal import CacheEntry
from glue.ligolw import ligolw
from glue.ligolw import array as ligolw_array
from glue.ligolw import param as ligolw_param
......@@ -113,7 +114,8 @@ parser.add_option("--identity-transform", action = "store_true", default = False
parser.add_option("--padding", metavar = "pad", type = "float", default = 1.5, help = "Fractional amount to pad time slices.")
parser.add_option("--svd-tolerance", metavar = "match", type = "float", default = 0.9995, help = "Set the SVD reconstruction tolerance (default = 0.9995).")
parser.add_option("--reference-psd", metavar = "filename", help = "Load the spectrum from this LIGO light-weight XML file (required).")
parser.add_option("--template-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the template bank (required).")
parser.add_option("--template-bank", metavar = "filename", action = "append", default = [], help = "Set the name of the LIGO light-weight XML file from which to load the template bank (required).")
parser.add_option("--template-bank-cache", metavar = "filename", help = "Provide a cache file with the names of the LIGO light-weight XML file from which to load the template bank.")
parser.add_option("--ortho-gate-fap", metavar = "probability", type = "float", default = 0.5, help = "Set the orthogonal SNR projection gate false-alarm probability (default = 0.5).")
parser.add_option("--snr-threshold", metavar = "SNR", type = "float", default = 4.0, help = "Set the SNR threshold (default = 4.0). Currently this cannot be changed.")
parser.add_option("--write-svd-bank", metavar = "filename", help = "Set the filename in which to save the template bank (required).")
......@@ -130,6 +132,9 @@ parser.add_option("--write-psd", action = "store_true", default = False, help =
options, filenames = parser.parse_args()
if options.template_bank_cache:
options.template_bank.extend([CacheEntry(line).url for line in open(options.template_bank_cache)])
required_options = ("reference_psd", "template_bank", "write_svd_bank", "clipleft", "clipright")
missing_options = [option for option in required_options if getattr(options, option) is None]
......
......@@ -205,11 +205,16 @@ class generic_node(InspiralNode):
an empty argument by setting it to "". However options set to None are simply
ignored.
"""
def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}):
def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}):
InspiralNode.__init__(self, job, dag, parent_nodes)
self.input_files = input_files
self.output_files = output_files
self.input_files = input_files.copy()
self.input_files.update(input_cache_files)
self.output_files = output_files.copy()
self.output_files.update(output_cache_files)
self.cache_inputs = {}
self.cache_outputs = {}
for opt, val in opts.items() + output_files.items() + input_files.items():
if val is None:
......@@ -226,6 +231,26 @@ class generic_node(InspiralNode):
else:
self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))
# Create cache files for long command line arguments and store them in the job's subdirectory. NOTE the svd-bank string
# is handled by gstlal_inspiral_pipe directly
for opt, val in input_cache_files.items():
cache_entries = [lal.CacheEntry.from_T050017(url) for url in val]
cache_file_name = "{0}/{1}_{2}.cache".format(job.tag_base, opt.replace("-cache", "").replace("-", "_"), job.number-1)
with open(cache_file_name, "w") as cache_file:
lal.Cache(cache_entries).tofile(cache_file)
self.add_var_opt(opt, cache_file_name)
# Keep track of the cache files being created
self.cache_inputs.setdefault(opt, []).append(cache_file_name)
for opt, val in output_cache_files.items():
cache_entries = [lal.CacheEntry.from_T050017(url) for url in val]
cache_file_name = "{0}/{1}_{2}.cache".format(job.tag_base, opt.replace("-cache", "").replace("-", "_"), job.number-1)
with open(cache_file_name, "w") as cache_file:
lal.Cache(cache_entries).tofile(cache_file)
self.add_var_opt(opt, cache_file_name)
# Keep track of the cache files being created
self.cache_outputs.setdefault(opt, []).append(cache_file_name)
def pipeline_dot_py_append_opts_hack(opt, vals):
"""!
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment