diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral index 95bb5e0c08765db029836b4c7e571062923d0e0f..47496cd35074a18b58666735f1be7d4a23f1e835 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral +++ b/gstlal-inspiral/bin/gstlal_inspiral @@ -35,6 +35,8 @@ import signal import subprocess import socket import time +import tempfile +import itertools # The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F import pygtk @@ -68,6 +70,8 @@ from gstlal import far from gstlal import inspiral from gstlal import httpinterface from gstlal import datasource +from gstlal import simulation +from gstlal import hoftcache def excepthook(*args): # system exception hook that forces hard exit. without this, @@ -122,14 +126,17 @@ def parse_command_line(): # append all the datasource specific options datasource.append_options(parser) + # local caching to help with I/O for offline running + parser.add_option("--local-frame-caching", action = "store_true", help = "blah") + parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s") parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).") parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes") parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.") - parser.add_option("--output", metavar = "filename", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).") + parser.add_option("--output", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).") parser.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).") parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given") - parser.add_option("--svd-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.") + parser.add_option("--svd-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file, These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze bank serially. At least one svd bank for at least 2 detectors is required.") parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets") parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal") parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8") @@ -146,7 +153,7 @@ def parse_command_line(): # Online options parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. required") - parser.add_option("--likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.") + parser.add_option("--likelihood-file", metavar = "filename", action = "append", help = "Set the name of the likelihood ratio data file to use (required).") parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).") parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent") parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.") @@ -162,28 +169,39 @@ def parse_command_line(): if options.blind_injections and options.injections: raise ValueError("must use only one of --blind-injections or --injections") - required_options = [] + if options.local_frame_caching and not options.data_source == "frames": + raise ValueError('--local-frame-caching can only be used if --data-source = "frames"') + + required_options = ["svd_bank", "likelihood_file", "output"] + if options.data_source in ("lvshm", "framexmit"): + required_options += ["job_tag", "marginalized_likelihood_file"] - missing_options = [] - if options.svd_bank is None: - missing_options += ["--svd-bank"] - missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None] + missing_options = ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None] if missing_options: raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options))) + # make sure we have the correct numbers of svd banks output files and likelihood files. + if options.data_source in ("lvshm", "framexmit") and len(options.svd_bank) > 1: + raise ValueError("Can only have one set of svd banks in online mode (i.e. when datasource is lvshm or framexmit)") + if not (len(options.svd_bank) == len(options.output) == len(options.likelihood_file)): + raise ValueError("must have equal numbers of svd banks, output files and likelihood files") + # parse the datasource specific information and do option checking detectors = datasource.GWDataSourceInfo(options) if len(detectors.channel_dict) < 2: raise ValueError("only coincident searches are supported: must process data from at least two antennae") - # Get the banks and make the detectors - # FIXME add error checking on length of banks per detector, etc - svd_banks = inspiral.parse_banks(options.svd_bank) + # FIXME Put all svd banks for different detectors in one file. + try: + svd_banks = [inspiral.parse_svdbank_string(svdbank) for svdbank in options.svd_bank] + except ValueError as e: + raise e("--svd-banks: %s" % str(e)) # FIXME: should also check for read permissions required_files = [] - for instrument in svd_banks: - required_files.extend(svd_banks[instrument]) + for svd_bank_set in svd_banks: + for instrument in svd_bank_set: + required_files.append(svd_bank_set[instrument]) if options.veto_segments_file: required_files += [options.veto_segments_file] missing_files = [filename for filename in required_files if not os.path.exists(filename)] @@ -201,12 +219,6 @@ def parse_command_line(): # Online specific initialization # FIXME someday support other online sources if options.data_source in ("lvshm", "framexmit"): - # check required options in this case - required_options = ["likelihood_file", "job_tag", "marginalized_likelihood_file"] - - missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None] - if missing_options: - raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options)) # make an "infinite" extent segment detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)) @@ -228,7 +240,7 @@ def parse_command_line(): @bottle.route("/registry.txt") def register(fname = None): - yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(svd_banks.keys()))) + yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(svd_banks[0].keys()))) # First do urls that do not depend on instruments for request in ("registry.txt", "gracedb_far_threshold.txt", "latency_histogram.txt", "latency_history.txt", "snr_history.txt", "ram_history.txt", "likelihood.xml", "bank.txt", "segments.xml"): @@ -236,7 +248,7 @@ def parse_command_line(): yield "http://%s:16953/%s\n" % (host, request) # Then do instrument dependent urls - for ifo in set(svd_banks.keys()): + for ifo in set(svd_banks[0].keys()): for request in ("strain_add_drop.txt", "state_vector_on_off_gap.txt", "psd.txt"): # FIXME don't hardcode port number yield "http://%s:16953/%s/%s\n" % (host, ifo, request) @@ -245,13 +257,44 @@ def parse_command_line(): f.close() else: bad_options = [] - for option in ["likelihood_file", "job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]: + for option in ["job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]: if getattr(options, option) is not None: - bad_options.append(option) + bad_options.append("--%s" % option.replace("_","-")) if bad_options: - raise ValueError("%s options should only be given for online running" % ",".join(bad_options)) + raise ValueError("%s options can only be given for --data-source is lvshm or framexmit " % ", ".join(bad_options)) + + # Override the injection_filename for datasource to use blind injections. + # FIXME You cant have both regular and blind injections + if options.blind_injections is not None: + detectors.injection_filename = options.blind_injections + + # Setup local caching + if options.local_frame_caching: + f, fname = tempfile.mkstemp(".cache") + if options.verbose: + print >> sys.stderr, "caching frame data locally to ", fname + f = open(fname, "w") + # FIXME shouldn't hard code sample rate. But we never analyze + # data at a higher rate. If the user provides svd bank files + # that request a higher rate an assertion would fail later + # anyway. Replace with a function to get max sample rate from + # provided svd banks. + detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), sample_rate = 4096, verbose = options.verbose) + for cacheentry in detectors.local_cache_list: + print >>f, str(cacheentry) + detectors.frame_cache = fname + + # the injections are now present in the data so we don't want to do them twice + detectors.injection_filename = None + + # Choose to optionally reconstruct segments around injections (not blind injections!) + if options.injections: + reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections) + else: + reconstruction_segment_list = None + # we're done - return options, filenames, process_params, svd_banks, detectors + return options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list # @@ -268,7 +311,7 @@ def parse_command_line(): # -options, filenames, process_params, svd_banks, detectors = parse_command_line() +options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list = parse_command_line() if not options.check_time_stamps: pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src @@ -302,189 +345,221 @@ else: # -# Parse template banks +# Process svd banks in serial # +class OneTimeSignalHandler(object): + def __init__(self, pipeline): + self.pipeline = pipeline + self.count = 0 -banks = inspiral.parse_bank_files(svd_banks, verbose = options.verbose) -@bottle.route("/bank.txt") -def get_filter_length_and_chirpmass(banks = banks): - bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones - yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp) + def __call__(self, signum, frame): + self.count += 1 + if self.count == 1: + print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum + try: + #FIXME how do I choose a timestamp? + self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns())) + if not self.pipeline.send_event(gst.event_new_eos()): + raise Exception("pipeline.send_event(EOS) returned failure") + except Exception, e: + print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e) + os._exit(1) + else: + print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count) -# -# Build pipeline -# +for svd_bank, output_filename, likelihood_file in zip(svd_banks, options.output, options.likelihood_file): + banks = inspiral.parse_bank_files(svd_bank, verbose = options.verbose) -if options.verbose: - print >>sys.stderr, "assembling pipeline ...", + # FIXME If we set up this route in the loop there is a memory leak, + # don't make this function have a reference to bank + if options.data_source in ("lvshm", "framexmit"): + @bottle.route("/bank.txt") + def get_filter_length_and_chirpmass(banks = banks): + bank = banks.values()[0][0] #FIXME maybe shouldn't just take the first ones + yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp) -pipeline = gst.Pipeline("gstlal_inspiral") -mainloop = gobject.MainLoop() + # + # Build pipeline + # -triggersrc = lloidparts.mkLLOIDmulti( - pipeline, - detectors = detectors, - banks = banks, - psd = psd, - psd_fft_length = options.psd_fft_length, - ht_gate_threshold = options.ht_gate_threshold, - veto_segments = veto_segments, - verbose = options.verbose, - nxydump_segment = options.nxydump_segment, - chisq_type = options.chisq_type, - track_psd = options.track_psd, - control_peak_time = options.control_peak_time, - fir_stride = options.fir_stride, - blind_injections = options.blind_injections -) + if options.verbose: + print >>sys.stderr, "assembling pipeline ...", + + pipeline = gst.Pipeline("gstlal_inspiral") + mainloop = gobject.MainLoop() + + triggersrc = lloidparts.mkLLOIDmulti( + pipeline, + detectors = detectors, + banks = banks, + psd = psd, + psd_fft_length = options.psd_fft_length, + ht_gate_threshold = options.ht_gate_threshold, + veto_segments = veto_segments, + verbose = options.verbose, + nxydump_segment = options.nxydump_segment, + chisq_type = options.chisq_type, + track_psd = options.track_psd, + control_peak_time = options.control_peak_time, + fir_stride = options.fir_stride, + reconstruction_segment_list = reconstruction_segment_list + ) + -if options.verbose: - print >>sys.stderr, "done" + if options.verbose: + print >>sys.stderr, "done" -# -# Load likelihood ratio data, assume injections are present! -# + # + # Load likelihood ratio data, assume injections are present! + # -if options.likelihood_file is not None: - FAR, proc_id = far.LocalRankingData.from_xml(utils.load_filename(options.likelihood_file, verbose = options.verbose, contenthandler = ligolw.LIGOLWContentHandler)) -else: - FAR = far.LocalRankingData(segments.segment(None, None), far.DistributionsStats()) - # initialize an empty trials table with the combinations of detectors - # that this job will analyze - FAR.trials_table.initialize_from_sngl_ifos(detectors.channel_dict.keys()) + if options.data_source in ("lvshm", "framexmit"): + FAR, proc_id = far.LocalRankingData.from_xml(utils.load_filename(likelihood_file, verbose = options.verbose, contenthandler = ligolw.LIGOLWContentHandler)) + else: + FAR = far.LocalRankingData(segments.segment(None, None), far.DistributionsStats()) + # initialize an empty trials table with the combinations of detectors + # that this job will analyze + FAR.trials_table.initialize_from_sngl_ifos(detectors.channel_dict.keys()) + # + # build output document + # -# -# build output document -# + if options.verbose: + print >>sys.stderr, "initializing output document ..." + output = inspiral.Data( + filename = output_filename or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))), + process_params = process_params, + pipeline = pipeline, + instruments = set(detectors.channel_dict), + seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)), # online data doesn't have a segment so make it all possible time + injection_filename = options.injections, + time_slide_file = options.time_slide_file, + coincidence_threshold = options.coincidence_threshold, + FAR = FAR, + likelihood_file = likelihood_file, + marginalized_likelihood_file = options.marginalized_likelihood_file, + likelihood_snapshot_interval = options.likelihood_snapshot_interval, # seconds + assign_likelihoods = options.data_source in ("lvshm" or "framexmit"), + comment = options.comment, + tmp_path = options.tmp_space, + thinca_interval = options.thinca_interval, + gracedb_far_threshold = options.gracedb_far_threshold, + gracedb_type = options.gracedb_type, + gracedb_group = options.gracedb_group, + verbose = options.verbose + ) + if options.verbose: + print >>sys.stderr, "... output document initialized" -if options.verbose: - print >>sys.stderr, "initializing output document ..." -output = inspiral.Data( - filename = options.output or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))), - process_params = process_params, - pipeline = pipeline, - instruments = set(detectors.channel_dict), - seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)), # online data doesn't have a segment so make it all possible time - injection_filename = options.injections, - time_slide_file = options.time_slide_file, - coincidence_threshold = options.coincidence_threshold, - FAR = FAR, - likelihood_file = options.likelihood_file, - marginalized_likelihood_file = options.marginalized_likelihood_file, - likelihood_snapshot_interval = options.likelihood_snapshot_interval, # seconds - assign_likelihoods = options.likelihood_file is not None, - comment = options.comment, - tmp_path = options.tmp_space, - thinca_interval = options.thinca_interval, - gracedb_far_threshold = options.gracedb_far_threshold, - gracedb_type = options.gracedb_type, - gracedb_group = options.gracedb_group, - verbose = options.verbose -) -if options.verbose: - print >>sys.stderr, "... output document initialized" - -# setup the pipeline hander with dq gates if in online mode -if options.data_source in ("lvshm", "framexmit"): - gates = dict([("%s_state_vector_gate" % ifo, "state vector gate") for ifo in detectors.channel_dict]) -else: - gates = {} -handler = lloidparts.Handler(mainloop, pipeline, gates = gates, tag = options.job_tag, dataclass = output, verbose = options.verbose) -if options.verbose: - print >>sys.stderr, "attaching appsinks to pipeline ...", -appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer) -appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs)) -if options.verbose: - print >>sys.stderr, "attached %d, done" % len(appsinks) + # setup the pipeline hander with dq gates if in online mode + if options.data_source in ("lvshm", "framexmit"): + gates = dict([("%s_state_vector_gate" % ifo, "state vector gate") for ifo in detectors.channel_dict]) + else: + gates = {} + handler = lloidparts.Handler(mainloop, pipeline, gates = gates, tag = options.job_tag, dataclass = output, verbose = options.verbose) -# -# if we request a dot graph of the pipeline, set it up -# + if options.verbose: + print >>sys.stderr, "attaching appsinks to pipeline ...", + appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer) + appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs)) + if options.verbose: + print >>sys.stderr, "attached %d, done" % len(appsinks) -if options.write_pipeline is not None: - pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose) - pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose) + # + # if we request a dot graph of the pipeline, set it up + # -# -# Run pipeline -# + if options.write_pipeline is not None: + pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose) + pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose) -if options.data_source not in ("lvshm", "framexmit"): - if options.verbose: - print >>sys.stderr, "setting pipeline state to paused ..." - if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS: - raise RuntimeError, "pipeline did not enter paused state" -else: # - # start http interface + # Run pipeline # - # FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine - httpinterface.start_servers(16953, verbose = options.verbose) + + if options.data_source not in ("lvshm", "framexmit"): + if options.verbose: + print >>sys.stderr, "setting pipeline state to paused ..." + if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS: + raise RuntimeError, "pipeline did not enter paused state" + else: + # + # start http interface + # + + # FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine + httpinterface.start_servers(16953, verbose = options.verbose) + + # + # setup sigint handler to shutdown pipeline. this is how the program stops + # gracefully, it is the only way to stop it. Otherwise it runs forever + # man. + # + + signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline)) + signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline)) + + + if options.verbose: + print >>sys.stderr, "setting pipeline state to playing ..." + if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS: + raise RuntimeError, "pipeline did not enter playing state" + + if options.write_pipeline is not None: + pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose) + + if options.verbose: + print >>sys.stderr, "running pipeline ..." + mainloop.run() + + # - # setup sigint handler to shutdown pipeline. this is how the program stops - # gracefully, it is the only way to stop it. Otherwise it runs forever - # man. + # write output file # - class OneTimeSignalHandler(object): - def __init__(self, pipeline): - self.pipeline = pipeline - self.count = 0 + output.write_output_file(filename = output_filename or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = likelihood_file, verbose = options.verbose) - def __call__(self, signum, frame): - self.count += 1 - if self.count == 1: - print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum - try: - #FIXME how do I choose a timestamp? - self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns())) - if not self.pipeline.send_event(gst.event_new_eos()): - raise Exception("pipeline.send_event(EOS) returned failure") - except Exception, e: - print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e) - os._exit(1) - else: - print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count) - signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline)) - signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline)) - -if options.verbose: - print >>sys.stderr, "setting pipeline state to playing ..." -if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS: - raise RuntimeError, "pipeline did not enter playing state" + # + # Cleanup template bank temp files + # -if options.write_pipeline is not None: - pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose) -if options.verbose: - print >>sys.stderr, "running pipeline ..." -mainloop.run() + for ifo in banks: + for bank in banks[ifo]: + if options.verbose: + print >> sys.stderr, "removing file: ", bank.template_bank_filename + os.remove(bank.template_bank_filename) + del handler + if pipeline.set_state(gst.STATE_NULL) != gst.STATE_CHANGE_SUCCESS: + raise RuntimeError, "pipeline could not be set to NULL" # -# write output file +# Cleanup local caches # - -output.write_output_file(filename = options.output or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = options.likelihood_file, verbose = options.verbose) - +if options.local_frame_caching: + if options.verbose: + print >> sys.stderr, "deleting temporary cache file ", detectors.frame_cache + os.remove(detectors.frame_cache) + del detectors.local_cache_list # # done