Skip to content
Snippets Groups Projects
Commit c6c86c13 authored by Chad Hanna's avatar Chad Hanna
Browse files

gstlal-inspiral/bin/gstlal_inspiral: rework to support local frame caching and...

gstlal-inspiral/bin/gstlal_inspiral: rework to support local frame caching and processing of serial files for offline analysis
parent 4836f256
No related branches found
No related tags found
No related merge requests found
......@@ -35,6 +35,8 @@ import signal
import subprocess
import socket
import time
import tempfile
import itertools
# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
import pygtk
......@@ -68,6 +70,8 @@ from gstlal import far
from gstlal import inspiral
from gstlal import httpinterface
from gstlal import datasource
from gstlal import simulation
from gstlal import hoftcache
def excepthook(*args):
# system exception hook that forces hard exit. without this,
......@@ -122,14 +126,17 @@ def parse_command_line():
# append all the datasource specific options
datasource.append_options(parser)
# local caching to help with I/O for offline running
parser.add_option("--local-frame-caching", action = "store_true", help = "blah")
parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
parser.add_option("--output", metavar = "filename", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
parser.add_option("--output", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
parser.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given")
parser.add_option("--svd-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.")
parser.add_option("--svd-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file, These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze bank serially. At least one svd bank for at least 2 detectors is required.")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets")
parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal")
parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
......@@ -146,7 +153,7 @@ def parse_command_line():
# Online options
parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. required")
parser.add_option("--likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.")
parser.add_option("--likelihood-file", metavar = "filename", action = "append", help = "Set the name of the likelihood ratio data file to use (required).")
parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
......@@ -162,28 +169,39 @@ def parse_command_line():
if options.blind_injections and options.injections:
raise ValueError("must use only one of --blind-injections or --injections")
required_options = []
if options.local_frame_caching and not options.data_source == "frames":
raise ValueError('--local-frame-caching can only be used if --data-source = "frames"')
required_options = ["svd_bank", "likelihood_file", "output"]
if options.data_source in ("lvshm", "framexmit"):
required_options += ["job_tag", "marginalized_likelihood_file"]
missing_options = []
if options.svd_bank is None:
missing_options += ["--svd-bank"]
missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
missing_options = ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
if missing_options:
raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))
# make sure we have the correct numbers of svd banks output files and likelihood files.
if options.data_source in ("lvshm", "framexmit") and len(options.svd_bank) > 1:
raise ValueError("Can only have one set of svd banks in online mode (i.e. when datasource is lvshm or framexmit)")
if not (len(options.svd_bank) == len(options.output) == len(options.likelihood_file)):
raise ValueError("must have equal numbers of svd banks, output files and likelihood files")
# parse the datasource specific information and do option checking
detectors = datasource.GWDataSourceInfo(options)
if len(detectors.channel_dict) < 2:
raise ValueError("only coincident searches are supported: must process data from at least two antennae")
# Get the banks and make the detectors
# FIXME add error checking on length of banks per detector, etc
svd_banks = inspiral.parse_banks(options.svd_bank)
# FIXME Put all svd banks for different detectors in one file.
try:
svd_banks = [inspiral.parse_svdbank_string(svdbank) for svdbank in options.svd_bank]
except ValueError as e:
raise e("--svd-banks: %s" % str(e))
# FIXME: should also check for read permissions
required_files = []
for instrument in svd_banks:
required_files.extend(svd_banks[instrument])
for svd_bank_set in svd_banks:
for instrument in svd_bank_set:
required_files.append(svd_bank_set[instrument])
if options.veto_segments_file:
required_files += [options.veto_segments_file]
missing_files = [filename for filename in required_files if not os.path.exists(filename)]
......@@ -201,12 +219,6 @@ def parse_command_line():
# Online specific initialization
# FIXME someday support other online sources
if options.data_source in ("lvshm", "framexmit"):
# check required options in this case
required_options = ["likelihood_file", "job_tag", "marginalized_likelihood_file"]
missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
if missing_options:
raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))
# make an "infinite" extent segment
detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))
......@@ -228,7 +240,7 @@ def parse_command_line():
@bottle.route("/registry.txt")
def register(fname = None):
yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(svd_banks.keys())))
yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(svd_banks[0].keys())))
# First do urls that do not depend on instruments
for request in ("registry.txt", "gracedb_far_threshold.txt", "latency_histogram.txt", "latency_history.txt", "snr_history.txt", "ram_history.txt", "likelihood.xml", "bank.txt", "segments.xml"):
......@@ -236,7 +248,7 @@ def parse_command_line():
yield "http://%s:16953/%s\n" % (host, request)
# Then do instrument dependent urls
for ifo in set(svd_banks.keys()):
for ifo in set(svd_banks[0].keys()):
for request in ("strain_add_drop.txt", "state_vector_on_off_gap.txt", "psd.txt"):
# FIXME don't hardcode port number
yield "http://%s:16953/%s/%s\n" % (host, ifo, request)
......@@ -245,13 +257,44 @@ def parse_command_line():
f.close()
else:
bad_options = []
for option in ["likelihood_file", "job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]:
for option in ["job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]:
if getattr(options, option) is not None:
bad_options.append(option)
bad_options.append("--%s" % option.replace("_","-"))
if bad_options:
raise ValueError("%s options should only be given for online running" % ",".join(bad_options))
raise ValueError("%s options can only be given for --data-source is lvshm or framexmit " % ", ".join(bad_options))
# Override the injection_filename for datasource to use blind injections.
# FIXME You cant have both regular and blind injections
if options.blind_injections is not None:
detectors.injection_filename = options.blind_injections
# Setup local caching
if options.local_frame_caching:
f, fname = tempfile.mkstemp(".cache")
if options.verbose:
print >> sys.stderr, "caching frame data locally to ", fname
f = open(fname, "w")
# FIXME shouldn't hard code sample rate. But we never analyze
# data at a higher rate. If the user provides svd bank files
# that request a higher rate an assertion would fail later
# anyway. Replace with a function to get max sample rate from
# provided svd banks.
detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), sample_rate = 4096, verbose = options.verbose)
for cacheentry in detectors.local_cache_list:
print >>f, str(cacheentry)
detectors.frame_cache = fname
# the injections are now present in the data so we don't want to do them twice
detectors.injection_filename = None
# Choose to optionally reconstruct segments around injections (not blind injections!)
if options.injections:
reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections)
else:
reconstruction_segment_list = None
# we're done
return options, filenames, process_params, svd_banks, detectors
return options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list
#
......@@ -268,7 +311,7 @@ def parse_command_line():
#
options, filenames, process_params, svd_banks, detectors = parse_command_line()
options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list = parse_command_line()
if not options.check_time_stamps:
pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src
......@@ -302,189 +345,221 @@ else:
#
# Parse template banks
# Process svd banks in serial
#
class OneTimeSignalHandler(object):
def __init__(self, pipeline):
self.pipeline = pipeline
self.count = 0
banks = inspiral.parse_bank_files(svd_banks, verbose = options.verbose)
@bottle.route("/bank.txt")
def get_filter_length_and_chirpmass(banks = banks):
bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones
yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
def __call__(self, signum, frame):
self.count += 1
if self.count == 1:
print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
try:
#FIXME how do I choose a timestamp?
self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns()))
if not self.pipeline.send_event(gst.event_new_eos()):
raise Exception("pipeline.send_event(EOS) returned failure")
except Exception, e:
print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
os._exit(1)
else:
print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)
#
# Build pipeline
#
for svd_bank, output_filename, likelihood_file in zip(svd_banks, options.output, options.likelihood_file):
banks = inspiral.parse_bank_files(svd_bank, verbose = options.verbose)
if options.verbose:
print >>sys.stderr, "assembling pipeline ...",
# FIXME If we set up this route in the loop there is a memory leak,
# don't make this function have a reference to bank
if options.data_source in ("lvshm", "framexmit"):
@bottle.route("/bank.txt")
def get_filter_length_and_chirpmass(banks = banks):
bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones
yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
pipeline = gst.Pipeline("gstlal_inspiral")
mainloop = gobject.MainLoop()
#
# Build pipeline
#
triggersrc = lloidparts.mkLLOIDmulti(
pipeline,
detectors = detectors,
banks = banks,
psd = psd,
psd_fft_length = options.psd_fft_length,
ht_gate_threshold = options.ht_gate_threshold,
veto_segments = veto_segments,
verbose = options.verbose,
nxydump_segment = options.nxydump_segment,
chisq_type = options.chisq_type,
track_psd = options.track_psd,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
blind_injections = options.blind_injections
)
if options.verbose:
print >>sys.stderr, "assembling pipeline ...",
pipeline = gst.Pipeline("gstlal_inspiral")
mainloop = gobject.MainLoop()
triggersrc = lloidparts.mkLLOIDmulti(
pipeline,
detectors = detectors,
banks = banks,
psd = psd,
psd_fft_length = options.psd_fft_length,
ht_gate_threshold = options.ht_gate_threshold,
veto_segments = veto_segments,
verbose = options.verbose,
nxydump_segment = options.nxydump_segment,
chisq_type = options.chisq_type,
track_psd = options.track_psd,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
reconstruction_segment_list = reconstruction_segment_list
)
if options.verbose:
print >>sys.stderr, "done"
if options.verbose:
print >>sys.stderr, "done"
#
# Load likelihood ratio data, assume injections are present!
#
#
# Load likelihood ratio data, assume injections are present!
#
if options.likelihood_file is not None:
FAR, proc_id = far.LocalRankingData.from_xml(utils.load_filename(options.likelihood_file, verbose = options.verbose, contenthandler = ligolw.LIGOLWContentHandler))
else:
FAR = far.LocalRankingData(segments.segment(None, None), far.DistributionsStats())
# initialize an empty trials table with the combinations of detectors
# that this job will analyze
FAR.trials_table.initialize_from_sngl_ifos(detectors.channel_dict.keys())
if options.data_source in ("lvshm", "framexmit"):
FAR, proc_id = far.LocalRankingData.from_xml(utils.load_filename(likelihood_file, verbose = options.verbose, contenthandler = ligolw.LIGOLWContentHandler))
else:
FAR = far.LocalRankingData(segments.segment(None, None), far.DistributionsStats())
# initialize an empty trials table with the combinations of detectors
# that this job will analyze
FAR.trials_table.initialize_from_sngl_ifos(detectors.channel_dict.keys())
#
# build output document
#
#
# build output document
#
if options.verbose:
print >>sys.stderr, "initializing output document ..."
output = inspiral.Data(
filename = output_filename or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
process_params = process_params,
pipeline = pipeline,
instruments = set(detectors.channel_dict),
seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)), # online data doesn't have a segment so make it all possible time
injection_filename = options.injections,
time_slide_file = options.time_slide_file,
coincidence_threshold = options.coincidence_threshold,
FAR = FAR,
likelihood_file = likelihood_file,
marginalized_likelihood_file = options.marginalized_likelihood_file,
likelihood_snapshot_interval = options.likelihood_snapshot_interval, # seconds
assign_likelihoods = options.data_source in ("lvshm" or "framexmit"),
comment = options.comment,
tmp_path = options.tmp_space,
thinca_interval = options.thinca_interval,
gracedb_far_threshold = options.gracedb_far_threshold,
gracedb_type = options.gracedb_type,
gracedb_group = options.gracedb_group,
verbose = options.verbose
)
if options.verbose:
print >>sys.stderr, "... output document initialized"
if options.verbose:
print >>sys.stderr, "initializing output document ..."
output = inspiral.Data(
filename = options.output or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
process_params = process_params,
pipeline = pipeline,
instruments = set(detectors.channel_dict),
seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)), # online data doesn't have a segment so make it all possible time
injection_filename = options.injections,
time_slide_file = options.time_slide_file,
coincidence_threshold = options.coincidence_threshold,
FAR = FAR,
likelihood_file = options.likelihood_file,
marginalized_likelihood_file = options.marginalized_likelihood_file,
likelihood_snapshot_interval = options.likelihood_snapshot_interval, # seconds
assign_likelihoods = options.likelihood_file is not None,
comment = options.comment,
tmp_path = options.tmp_space,
thinca_interval = options.thinca_interval,
gracedb_far_threshold = options.gracedb_far_threshold,
gracedb_type = options.gracedb_type,
gracedb_group = options.gracedb_group,
verbose = options.verbose
)
if options.verbose:
print >>sys.stderr, "... output document initialized"
# setup the pipeline hander with dq gates if in online mode
if options.data_source in ("lvshm", "framexmit"):
gates = dict([("%s_state_vector_gate" % ifo, "state vector gate") for ifo in detectors.channel_dict])
else:
gates = {}
handler = lloidparts.Handler(mainloop, pipeline, gates = gates, tag = options.job_tag, dataclass = output, verbose = options.verbose)
if options.verbose:
print >>sys.stderr, "attaching appsinks to pipeline ...",
appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
if options.verbose:
print >>sys.stderr, "attached %d, done" % len(appsinks)
# setup the pipeline hander with dq gates if in online mode
if options.data_source in ("lvshm", "framexmit"):
gates = dict([("%s_state_vector_gate" % ifo, "state vector gate") for ifo in detectors.channel_dict])
else:
gates = {}
handler = lloidparts.Handler(mainloop, pipeline, gates = gates, tag = options.job_tag, dataclass = output, verbose = options.verbose)
#
# if we request a dot graph of the pipeline, set it up
#
if options.verbose:
print >>sys.stderr, "attaching appsinks to pipeline ...",
appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
if options.verbose:
print >>sys.stderr, "attached %d, done" % len(appsinks)
if options.write_pipeline is not None:
pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
#
# if we request a dot graph of the pipeline, set it up
#
#
# Run pipeline
#
if options.write_pipeline is not None:
pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
if options.data_source not in ("lvshm", "framexmit"):
if options.verbose:
print >>sys.stderr, "setting pipeline state to paused ..."
if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError, "pipeline did not enter paused state"
else:
#
# start http interface
# Run pipeline
#
# FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine
httpinterface.start_servers(16953, verbose = options.verbose)
if options.data_source not in ("lvshm", "framexmit"):
if options.verbose:
print >>sys.stderr, "setting pipeline state to paused ..."
if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError, "pipeline did not enter paused state"
else:
#
# start http interface
#
# FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine
httpinterface.start_servers(16953, verbose = options.verbose)
#
# setup sigint handler to shutdown pipeline. this is how the program stops
# gracefully, it is the only way to stop it. Otherwise it runs forever
# man.
#
signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
if options.verbose:
print >>sys.stderr, "setting pipeline state to playing ..."
if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError, "pipeline did not enter playing state"
if options.write_pipeline is not None:
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)
if options.verbose:
print >>sys.stderr, "running pipeline ..."
mainloop.run()
#
# setup sigint handler to shutdown pipeline. this is how the program stops
# gracefully, it is the only way to stop it. Otherwise it runs forever
# man.
# write output file
#
class OneTimeSignalHandler(object):
def __init__(self, pipeline):
self.pipeline = pipeline
self.count = 0
output.write_output_file(filename = output_filename or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = likelihood_file, verbose = options.verbose)
def __call__(self, signum, frame):
self.count += 1
if self.count == 1:
print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
try:
#FIXME how do I choose a timestamp?
self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns()))
if not self.pipeline.send_event(gst.event_new_eos()):
raise Exception("pipeline.send_event(EOS) returned failure")
except Exception, e:
print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
os._exit(1)
else:
print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)
signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
if options.verbose:
print >>sys.stderr, "setting pipeline state to playing ..."
if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError, "pipeline did not enter playing state"
#
# Cleanup template bank temp files
#
if options.write_pipeline is not None:
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)
if options.verbose:
print >>sys.stderr, "running pipeline ..."
mainloop.run()
for ifo in banks:
for bank in banks[ifo]:
if options.verbose:
print >> sys.stderr, "removing file: ", bank.template_bank_filename
os.remove(bank.template_bank_filename)
del handler
if pipeline.set_state(gst.STATE_NULL) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError, "pipeline could not be set to NULL"
#
# write output file
# Cleanup local caches
#
output.write_output_file(filename = options.output or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = options.likelihood_file, verbose = options.verbose)
if options.local_frame_caching:
if options.verbose:
print >> sys.stderr, "deleting temporary cache file ", detectors.frame_cache
os.remove(detectors.frame_cache)
del detectors.local_cache_list
#
# done
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment