From 60a60f8f6c33a2919fe9620004bf747ddb3fe8fa Mon Sep 17 00:00:00 2001 From: Kipp Cannon <kipp.cannon@ligo.org> Date: Thu, 14 Jun 2018 18:45:12 +0900 Subject: [PATCH] gstlal-inspiral: merge Data and Handler - move Handler to its own module - rename .gen_psd_xmldoc() to .__get_psd_xmldoc() following conventions used in inspiral.Data - inspiral.py: excise gracedb code from Data object, encapsulate in GracedBWrapper class - remove now unused write_psd_fileobj() from reference_psd.py - move Data class to lloidhandler.py - rename ._gatehandler() .__gatehandler() for consistency - remove .flush(), unused locking wrapper of .__flush() - remove .do_gracedb_alerts(), unused locking wrapper of .__do_gracedb_alerts() - excise eye candy from Data class, move to stand-alone EyeCandy class - merge what remains of Data and Handler into a single class - excise segment tracking from result, move to stand-alone SegmentsTracker class - use Handler's PSDs for GracedB uploads instead of getting new ones (addresses a FIXME) --- gstlal-inspiral/bin/gstlal_inspiral | 49 +- gstlal-inspiral/python/Makefile.am | 1 + gstlal-inspiral/python/inspiral.py | 975 +++-------------- gstlal-inspiral/python/lloidhandler.py | 1354 ++++++++++++++++++++++++ gstlal-inspiral/python/lloidparts.py | 416 -------- gstlal/python/reference_psd.py | 8 - 6 files changed, 1501 insertions(+), 1302 deletions(-) create mode 100644 gstlal-inspiral/python/lloidhandler.py diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral index 39c0f6d8fe..bd9439c592 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral +++ b/gstlal-inspiral/bin/gstlal_inspiral @@ -168,12 +168,13 @@ from glue.ligolw import utils as ligolw_utils from glue.ligolw.utils import segments as ligolw_segments from gstlal import bottle from gstlal import datasource -from gstlal import lloidparts from gstlal import far from gstlal import httpinterface from gstlal import hoftcache from gstlal import inspiral from gstlal import inspiral_pipe +from gstlal import lloidhandler +from gstlal import lloidparts from gstlal import pipeparts from gstlal import simulation @@ -506,7 +507,7 @@ class OneTimeSignalHandler(object): print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum try: #FIXME how do I choose a timestamp? - self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp = inspiral.now().ns())) + self.pipeline.get_bus().post(lloidhandler.message_new_checkpoint(self.pipeline, timestamp = inspiral.now().ns())) if not self.pipeline.send_event(Gst.Event.new_eos()): raise Exception("pipeline.send_event(EOS) returned failure") except Exception as e: @@ -765,14 +766,16 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, # - # build output document + # construct and configure pipeline handler # if options.verbose: - print >>sys.stderr, "initializing output document ..." - output = inspiral.Data( - inspiral.CoincsDocument( + print >>sys.stderr, "initializing pipeline handler ..." 
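+	# the Handler constructed below subsumes the former inspiral.Data
+	# object:  it owns the coincs document, the ranking statistic
+	# book-keeping and the trigger processing, and delegates GraceDB
+	# interaction to the GracedBWrapper instance passed as an argument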
+ handler = lloidhandler.Handler( + mainloop = mainloop, + pipeline = pipeline, + coincs_document = inspiral.CoincsDocument( url = output_url or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.instrumentsproperty.set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))), process_params = process_params, comment = options.comment, @@ -784,8 +787,18 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, replace_file = True, verbose = options.verbose ), - pipeline = pipeline, rankingstat = rankingstat, + gracedbwrapper = inspiral.GracedBWrapper( + instruments = rankingstat.instruments, + far_threshold = options.gracedb_far_threshold, + min_instruments = options.min_instruments, + group = options.gracedb_group, + search = options.gracedb_search, + pipeline = options.gracedb_pipeline, + service_url = options.gracedb_service_url, + upload_auxiliary_data = (options.gracedb_service_url == gracedb_default_service_url), + verbose = options.verbose + ), ranking_stat_input_url = options.ranking_stat_input, ranking_stat_output_url = ranking_stat_output_url, rankingstatpdf_url = options.ranking_stat_pdf, @@ -794,28 +807,15 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, thinca_interval = options.thinca_interval, min_log_L = options.min_log_L, sngls_snr_threshold = options.singles_threshold, - gracedb_far_threshold = options.gracedb_far_threshold, - gracedb_min_instruments = options.min_instruments, - gracedb_group = options.gracedb_group, - gracedb_search = options.gracedb_search, - gracedb_pipeline = options.gracedb_pipeline, - gracedb_service_url = options.gracedb_service_url, - upload_auxiliary_data_to_gracedb = (options.gracedb_service_url == gracedb_default_service_url), - verbose = options.verbose - ) - if options.verbose: - print >>sys.stderr, "... output document initialized" - - handler = lloidparts.Handler(mainloop, pipeline, - output, - instruments = rankingstat.instruments, tag = options.job_tag, verbose = options.verbose ) + if options.verbose: + print >>sys.stderr, "... 
pipeline handler initialized" if options.verbose: print >>sys.stderr, "attaching appsinks to pipeline ...", - appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer) + appsync = pipeparts.AppSync(appsink_new_buffer = handler.appsink_new_buffer) appsinks = set(appsync.add_sink(pipeline, src, caps = Gst.Caps.from_string("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs)) if options.verbose: print >>sys.stderr, "attached %d, done" % len(appsinks) @@ -873,7 +873,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, # - output.write_output_url(url = output_url or output.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose) + handler.write_output_url(url = output_url or handler.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose) # @@ -907,7 +907,6 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS: raise RuntimeError("pipeline could not be set to NULL") del handler.pipeline - del output.pipeline del handler del bank del banks diff --git a/gstlal-inspiral/python/Makefile.am b/gstlal-inspiral/python/Makefile.am index b1721881b3..9f76e9446e 100644 --- a/gstlal-inspiral/python/Makefile.am +++ b/gstlal-inspiral/python/Makefile.am @@ -17,6 +17,7 @@ pkgpython_PYTHON = \ imr_utils.py \ inspiral_pipe.py \ inspiral.py \ + lloidhandler.py \ lloidparts.py \ lloidplots.py \ llweb.py \ diff --git a/gstlal-inspiral/python/inspiral.py b/gstlal-inspiral/python/inspiral.py index d1ebe9a481..d9ca892d92 100644 --- a/gstlal-inspiral/python/inspiral.py +++ b/gstlal-inspiral/python/inspiral.py @@ -50,39 +50,17 @@ # -import bisect -from collections import deque -try: - from fpconst import NaN -except ImportError: - # fpconst is not part of the standard library and might not be - # available - NaN = float("nan") -import itertools import math import numpy -import os -import resource from scipy import random import sqlite3 import StringIO -import subprocess import sys -import threading import time import httplib import tempfile -import shutil - -import gi -gi.require_version('Gst', '1.0') -from gi.repository import GObject, Gst -GObject.threads_init() -Gst.init(None) from glue import iterutils -from ligo import segments -from glue import segmentsUtils from glue.ligolw import ligolw from glue.ligolw import dbtables from glue.ligolw import ilwd @@ -97,15 +75,11 @@ from glue.ligolw.utils import segments as ligolw_segments from glue.ligolw.utils import time_slide as ligolw_time_slide import lal from lal import LIGOTimeGPS -from lal import rate from lal import series as lalseries -from lal.utils import CacheEntry from ligo import gracedb from gstlal import bottle from gstlal import cbc_template_iir -from gstlal import far -from gstlal import reference_psd from gstlal import streamthinca from gstlal import svd_bank @@ -139,14 +113,6 @@ def now(): return LIGOTimeGPS(lal.UTCToGPS(time.gmtime())) -def message_new_checkpoint(src, timestamp = None): - s = Gst.Structure.new_empty("CHECKPOINT") - message = Gst.Message.new_application(src, s) - if timestamp is not None: - message.timestamp = timestamp - return message - - def parse_svdbank_string(bank_string): """ parses strings of form @@ -235,27 +201,6 @@ def 
parse_iirbank_files(iir_banks, verbose, snr_threshold = 4.0): return banks -def subdir_from_T050017_filename(fname): - path = str(CacheEntry.from_T050017(fname).segment[0])[:5] - try: - os.mkdir(path) - except OSError: - pass - return path - - -def lower_bound_in_seglist(seglist, x): - """ - Return the index of the segment immediately at or before x in the - coalesced segment list seglist. Operation is O(log n). - """ - # NOTE: this is an operation that is performed in a number of - # locations in various codes, and is something that I've screwed up - # more than once. maybe this should be put into segments.py itself - i = bisect.bisect_right(seglist, x) - return i - 1 if i else 0 - - # # ============================================================================= # @@ -437,606 +382,149 @@ class CoincsDocument(object): del self.xmldoc -class Data(object): - def __init__(self, coincs_document, pipeline, rankingstat, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, min_log_L = None, sngls_snr_threshold = None, gracedb_far_threshold = None, gracedb_min_instruments = None, gracedb_group = "Test", gracedb_search = "LowMass", gracedb_pipeline = "gstlal", gracedb_service_url = "https://gracedb.ligo.org/api/", upload_auxiliary_data_to_gracedb = True, verbose = False): - # - # initialize - # +# +# ============================================================================= +# +# GracedB Wrapper +# +# ============================================================================= +# - self.lock = threading.Lock() - self.coincs_document = coincs_document - self.pipeline = pipeline - self.verbose = verbose - self.upload_auxiliary_data_to_gracedb = upload_auxiliary_data_to_gracedb - # None to disable periodic snapshots, otherwise seconds - self.likelihood_snapshot_interval = likelihood_snapshot_interval - self.likelihood_snapshot_timestamp = None - # gracedb far threshold - self.gracedb_far_threshold = gracedb_far_threshold - self.gracedb_min_instruments = gracedb_min_instruments - self.gracedb_group = gracedb_group - self.gracedb_search = gracedb_search - self.gracedb_pipeline = gracedb_pipeline - self.gracedb_service_url = gracedb_service_url - # - # seglistdicts is populated the pipeline handler. 
- # +class GracedBWrapper(object): + retries = 5 + retry_delay = 5.0 - self.seglistdicts = None + DEFAULT_SERVICE_URL = gracedb.rest.DEFAULT_SERVICE_URL - # - # setup bottle routes - # + def __init__(self, instruments, far_threshold = None, min_instruments = None, group = "Test", search = "LowMass", pipeline = "gstlal", service_url = None, upload_auxiliary_data = True, verbose = False): + self.instruments = frozenset(instruments) + self.min_instruments = min_instruments + self.group = group + self.search = search + self.pipeline = pipeline + self.service_url = service_url if service_url is not None else self.DEFAULT_SERVICE_URL + self.upload_auxiliary_data = upload_auxiliary_data + self.verbose = verbose + # must initialize after .service_url because this might + # cause the client to be created, which requires + # .service_url to have already been set + self.far_threshold = far_threshold - bottle.route("/latency_histogram.txt")(self.web_get_latency_histogram) - bottle.route("/latency_history.txt")(self.web_get_latency_history) - bottle.route("/snr_history.txt")(self.web_get_snr_history) - for instrument in rankingstat.instruments: - bottle.route("/%s_snr_history.txt" % instrument)(lambda : self.web_get_sngl_snr_history(ifo = instrument)) - bottle.route("/likelihood_history.txt")(self.web_get_likelihood_history) - bottle.route("/far_history.txt")(self.web_get_far_history) - bottle.route("/ram_history.txt")(self.web_get_ram_history) - bottle.route("/rankingstat.xml")(self.web_get_rankingstat) - bottle.route("/zerolag_rankingstatpdf.xml")(self.web_get_zerolag_rankingstatpdf) bottle.route("/gracedb_far_threshold.txt", method = "GET")(self.web_get_gracedb_far_threshold) bottle.route("/gracedb_far_threshold.txt", method = "POST")(self.web_set_gracedb_far_threshold) bottle.route("/gracedb_min_instruments.txt", method = "GET")(self.web_get_gracedb_min_instruments) bottle.route("/gracedb_min_instruments.txt", method = "POST")(self.web_set_gracedb_min_instruments) - bottle.route("/sngls_snr_threshold.txt", method = "GET")(self.web_get_sngls_snr_threshold) - bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold) - - # - # all triggers up to this index have had their SNR time - # series deleted to save memory - # - - self.snr_time_series_cleanup_index = 0 - - # - # attach a StreamThinca instance to ourselves - # - - self.stream_thinca = streamthinca.StreamThinca( - coincidence_threshold = rankingstat.delta_t, - thinca_interval = thinca_interval, # seconds - min_instruments = rankingstat.min_instruments, - min_log_L = min_log_L, - sngls_snr_threshold = sngls_snr_threshold - ) - - # - # setup likelihood ratio book-keeping. - # - # in online mode, if ranking_stat_input_url is set then on - # each snapshot interval, and before providing stream - # thinca with its ranking statistic information, the - # current rankingstat object is replaced with the contents - # of that file. this is intended to be used by trigger - # generator jobs on the injection branch of an online - # analysis to import ranking statistic information from - # their non-injection cousins instead of using whatever - # statistics they've collected internally. - # ranking_stat_input_url is not used when running offline. - # - # ranking_stat_output_url provides the name of the file to - # which the internally-collected ranking statistic - # information is to be written whenever output is written - # to disk. 
if set to None, then only the trigger file will - # be written, no ranking statistic information will be - # written. normally it is set to a non-null value, but - # injection jobs might be configured to disable ranking - # statistic output since they produce nonsense. - # - - self.ranking_stat_input_url = ranking_stat_input_url - self.ranking_stat_output_url = ranking_stat_output_url - self.rankingstat = rankingstat - - # - # if we have been supplied with external ranking statistic - # information then use it to enable ranking statistic - # assignment in streamthinca. otherwise, if we have not - # been and yet we have been asked to apply the min log L - # cut anyway then enable ranking statistic assignment using - # the dataless ranking statistic variant - # - - if self.ranking_stat_input_url is not None: - if self.rankingstat.is_healthy(): - self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() - else: - del self.stream_thinca.rankingstat - elif min_log_L is not None: - self.stream_thinca.rankingstat = far.DatalessRankingStat( - template_ids = rankingstat.template_ids, - instruments = rankingstat.instruments, - min_instruments = rankingstat.min_instruments, - delta_t = rankingstat.delta_t - ).finish() - # - # zero_lag_ranking_stats is a RankingStatPDF object that is - # used to accumulate a histogram of the likelihood ratio - # values assigned to zero-lag candidates. this is required - # to implement the extinction model for low-significance - # events during online running but otherwise is optional. - # - # FIXME: if the file does not exist or is not readable, - # the code silently initializes a new, empty, histogram. - # it would be better to determine whether or not the file - # is required and fail when it is missing - # - - if zerolag_rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(zerolag_rankingstatpdf_url), os.R_OK): - _, self.zerolag_rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(zerolag_rankingstatpdf_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) - if self.zerolag_rankingstatpdf is None: - raise ValueError("\"%s\" does not contain ranking statistic PDF data" % zerolag_rankingstatpdf_url) - elif zerolag_rankingstatpdf_url is not None: - # initialize an all-zeros set of PDFs - self.zerolag_rankingstatpdf = far.RankingStatPDF(rankingstat, nsamples = 0) + @property + def far_threshold(self): + return self._far_threshold + + @far_threshold.setter + def far_threshold(self, far_threshold): + self._far_threshold = far_threshold + if far_threshold is None or far_threshold < 0.: + self.gracedb_client = None else: - self.zerolag_rankingstatpdf = None - self.zerolag_rankingstatpdf_url = zerolag_rankingstatpdf_url + self.gracedb_client = gracedb.rest.GraceDb(self.service_url) - # - # rankingstatpdf contains the RankingStatPDF object (loaded - # from rankingstatpdf_url) used to initialize the FAPFAR - # object for on-the-fly FAP and FAR assignment. except to - # initialize the FAPFAR object it is not used for anything, - # but is retained so that it can be exposed through the web - # interface for diagnostic purposes and uploaded to gracedb - # with candidates. the extinction model is applied to - # initialize the FAPFAR object but the original is retained - # for upload to gracedb, etc. 
- # + def web_get_gracedb_far_threshold(self): + with self.lock: + if self.far_threshold is not None: + yield "rate=%.17g\n" % self.far_threshold + else: + yield "rate=\n" - self.rankingstatpdf_url = rankingstatpdf_url - self.load_rankingstat_pdf() + def web_set_gracedb_far_threshold(self): + try: + with self.lock: + rate = bottle.request.forms["rate"] + if rate: + self.far_threshold = float(rate) + yield "OK: rate=%.17g\n" % self.far_threshold + else: + self.far_threshold = None + yield "OK: rate=\n" + except: + yield "error\n" - # - # Fun output stuff - # - - self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),))) - self.latency_history = deque(maxlen = 1000) - self.snr_history = deque(maxlen = 1000) - self.likelihood_history = deque(maxlen = 1000) - self.far_history = deque(maxlen = 1000) - self.ram_history = deque(maxlen = 2) - self.ifo_snr_history = dict((instrument, deque(maxlen = 10000)) for instrument in rankingstat.instruments) - - def load_rankingstat_pdf(self): - # FIXME: if the file can't be accessed the code silently - # disables FAP/FAR assignment. need to figure out when - # failure is OK and when it's not OK and put a better check - # here. - if self.rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(self.rankingstatpdf_url), os.R_OK): - _, self.rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.rankingstatpdf_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) - if self.rankingstatpdf is None: - raise ValueError("\"%s\" does not contain ranking statistic PDFs" % url) - if not self.rankingstat.template_ids <= self.rankingstatpdf.template_ids: - raise ValueError("\"%s\" is for the wrong templates") - if self.rankingstatpdf.is_healthy(): - self.fapfar = far.FAPFAR(self.rankingstatpdf.new_with_extinction()) + def web_get_gracedb_min_instruments(self): + with self.lock: + if self.min_instruments is not None: + yield "gracedb_min_instruments=%d\n" % self.min_instruments else: - self.fapfar = None - else: - self.rankingstatpdf = None - self.fapfar = None + yield "gracedb_min_instruments=\n" - def appsink_new_buffer(self, elem): - with self.lock: - # retrieve triggers from appsink element - buf = elem.emit("pull-sample").get_buffer() - events = [] - for i in range(buf.n_memory()): - memory = buf.peek_memory(i) - result, mapinfo = memory.map(Gst.MapFlags.READ) - assert result - # NOTE NOTE NOTE NOTE - # It is critical that the correct class' - # .from_buffer() method be used here. This - # code is interpreting the buffer's - # contents as an array of C structures and - # building instances of python wrappers of - # those structures but if the python - # wrappers are for the wrong structure - # declaration then terrible terrible things - # will happen - # NOTE NOTE NOTE NOTE - # FIXME why does mapinfo.data come out as - # an empty list on some occasions??? - if mapinfo.data: - events.extend(streamthinca.SnglInspiral.from_buffer(mapinfo.data)) - memory.unmap(mapinfo) - - # FIXME: ugly way to get the instrument - instrument = elem.get_name().split("_", 1)[0] - - # extract segment. move the segment's upper - # boundary to include all triggers. ARGH the 1 ns - # offset is needed for the respective trigger to be - # "in" the segment (segments are open from above) - # FIXME: is there another way? 
- buf_timestamp = LIGOTimeGPS(0, buf.pts) - buf_seg = segments.segment(buf_timestamp, max((buf_timestamp + LIGOTimeGPS(0, buf.duration),) + tuple(event.end + 0.000000001 for event in events))) - buf_is_gap = bool(buf.mini_object.flags & Gst.BufferFlags.GAP) - # sanity check that gap buffers are empty - assert not (buf_is_gap and events) - - # safety check end times. OK for end times to be - # past end of buffer, but we cannot allow triggr - # times to go backwards. they cannot precede the - # buffer's start because, below, - # streamthinca.add_events() will be told the - # trigger list is complete upto this buffer's time - # stamp. - assert all(event.end >= buf_timestamp for event in events) - # we have extended the buf_seg above to enclose the - # triggers, make sure that worked - assert all(event.end in buf_seg for event in events) - - # set all effective distances to NaN. - # gstlal_inspiral's effective distances are - # incorrect, and the PE codes require us to either - # provide correct effective distances or - # communicate to them that they are incorrect. - # they have explained that setting them to NaN is - # sufficient for the latter. - # FIXME: fix the effective distances - for event in events: - event.eff_distance = NaN - - # Find max SNR sngles - if events: - max_snr_event = max(events, key = lambda t: t.snr) - self.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr)) - - # set metadata on triggers. because this uses the - # ID generator attached to the database-backed - # sngl_inspiral table, and that generator has been - # synced to the database' contents, the IDs - # assigned here will not collide with any already - # in the database - for event in events: - event.process_id = self.coincs_document.process_id - event.event_id = self.coincs_document.get_next_sngl_id() - - # update likelihood snapshot if needed - if self.likelihood_snapshot_interval is not None and (self.likelihood_snapshot_timestamp is None or buf_timestamp - self.likelihood_snapshot_timestamp >= self.likelihood_snapshot_interval): - self.likelihood_snapshot_timestamp = buf_timestamp - - # post a checkpoint message. - # FIXME: make sure this triggers - # self.snapshot_output_url() to be invoked. - # lloidparts takes care of that for now, - # but spreading the program logic around - # like that isn't a good idea, this code - # should be responsible for it somehow, no? - # NOTE: self.snapshot_output_url() writes - # the current rankingstat object to the - # location identified by .ranking_stat_output_url, - # so if that is either not set or at least - # set to a different name than - # .ranking_stat_input_url the file that has - # just been loaded above will not be - # overwritten. - self.pipeline.get_bus().post(message_new_checkpoint(self.pipeline, timestamp = buf_timestamp.ns())) - - # if a ranking statistic source url is set - # and is not the same as the file to which - # we are writing our ranking statistic data - # then overwrite rankingstat with its - # contents. 
the use case is online - # injection jobs that need to periodically - # grab new ranking statistic data from - # their corresponding non-injection partner - if self.ranking_stat_input_url is not None and self.ranking_stat_input_url != self.ranking_stat_output_url: - params_before = self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t - self.rankingstat, _ = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.ranking_stat_input_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) - if params_before != (self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t): - raise ValueError("'%s' contains incompatible ranking statistic configuration" % self.ranking_stat_input_url) - - # update streamthinca's ranking statistic - # data - if self.rankingstat.is_healthy(): - self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() + def web_set_gracedb_min_instruments(self): + try: + with self.lock: + min_instruments = bottle.request.forms["gracedb_min_instruments"] + if min_instruments is not None: + self.min_instruments = int(min_instruments) + yield "OK: gracedb_min_instruments=%d\n" % self.min_instruments else: - del self.stream_thinca.rankingstat - - # optionally get updated ranking statistic - # PDF data and enable FAP/FAR assignment - self.load_rankingstat_pdf() - - # add triggers to trigger rate record. this needs - # to be done without any cuts on coincidence, etc., - # so that the total trigger count agrees with the - # total livetime from the SNR buffers. we assume - # the density of real signals is so small that this - # count is not sensitive to their presence. NOTE: - # this is not true locally. a genuine signal, if - # loud, can significantly increase the local - # density of triggers, therefore the trigger rate - # measurement machinery must take care to average - # the rate over a sufficiently long period of time - # that the rate estimates are insensitive to the - # presence of signals. the current implementation - # averages over whole science segments. NOTE: this - # must be done before running stream thinca (below) - # so that the "how many instruments were on test" - # is aware of this buffer. - if not buf_is_gap: - self.rankingstat.denominator.triggerrates[instrument].add_ratebin(map(float, buf_seg), len(events)) - - # extract times when instruments were producing - # SNR. used to define "on instruments" for coinc - # tables among other things. will only need - # segment information for the times for which we - # have triggers, so use stream_thinca's discard - # boundary and a bisection search to clip the lists - # to reduce subsequent operation count. - discard_boundary = float(self.stream_thinca.discard_boundary) - snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) - - # times when SNR was available. used only for code - # correctness checks - one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) - # FIXME: this is needed to work around rounding - # problems in safety checks below, trying to - # compare GPS trigger times to float segment - # boundaries (the boundaries don't have enough - # precision to know if triggers near the edge are - # in or out). 
it would be better not to have to - # screw around like this. - one_or_more_instruments.protract(1e-3) # 1 ms - - # times when at least 2 instruments were generating - # SNR. used to sieve triggers for inclusion in the - # denominator. - two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) - # FIXME: see comment above. - two_or_more_instruments.protract(1e-3) # 1 ms - - # run stream thinca. update the parameter - # distribution data from sngls that weren't used in - # zero-lag multi-instrument coincs. NOTE: we rely - # on the arguments to .chain() being evaluated in - # left-to-right order so that .add_events() is - # evaluated before .last_coincs because the former - # initializes the latter. we skip singles - # collected during times when only one instrument - # was on. NOTE: the definition of "on" is blurry - # since we can recover triggers with end times - # outside of the available strain data, but the - # purpose of this test is simply to prevent signals - # occuring during single-detector times from - # contaminating our noise model, so it's not - # necessary for this test to be super precisely - # defined. - for event in itertools.chain(self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, snr_segments, fapfar = self.fapfar), self.stream_thinca.last_coincs.single_sngl_inspirals() if self.stream_thinca.last_coincs else ()): - if self.ranking_stat_output_url is None: - continue - assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) - if event.end in two_or_more_instruments: - self.rankingstat.denominator.increment(event) - self.coincs_document.commit() - - # update zero-lag bin counts in rankingstat. - if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): - self.rankingstat.zerolag.increment(event) - - # Cluster last coincs before recording number of zero - # lag events or sending alerts to gracedb - # FIXME Do proper clustering that saves states between - # thinca intervals and uses an independent clustering - # window. This can also go wrong if there are multiple - # events with an identical likelihood. 
It will just - # choose the event with the highest event id - if self.stream_thinca.last_coincs: - self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) - - # Add events to the observed likelihood histogram - # post "clustering" - # FIXME proper clustering is really needed (see - # above) - if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 - - # do GraceDB alerts - if self.gracedb_far_threshold is not None: - if self.rankingstatpdf is not None and self.rankingstatpdf.is_healthy(): - self.__do_gracedb_alerts() - self.__update_eye_candy() - - # after doing alerts, no longer need per-trigger - # SNR data for the triggers that are too old to be - # used to form candidates. to avoid searching the - # entire list of triggers each time, we stop when - # we encounter the first trigger whose SNR series - # might still be needed, save its index, and start - # the search from there next time - discard_boundary = self.stream_thinca.discard_boundary - for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): - if event.end >= discard_boundary: - break - del event.snr_time_series - - def T050017_filename(self, description, extension): - # input check - if "-" in description: - raise ValueError("invalid characters in description '%s'" % description) - - # determine current extent - segs = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values() if any(seglistdict.values())) - if segs: - start, end = segs.extent() - else: - # silence errors at start-up. - # FIXME: this is probably dumb. who cares. - start = end = now() - - # construct and return filename - start, end = int(math.floor(start)), int(math.ceil(end)) - return "%s-%s-%d-%d.%s" % ("".join(sorted(self.rankingstat.instruments)), description, start, end - start, extension) - - def __get_rankingstat_xmldoc(self): - # generate a ranking statistic output document. NOTE: if - # we are in possession of ranking statistic PDFs then those - # are included in the output. this allows a single - # document to be uploaded to gracedb. in an online - # analysis, those PDFs come from the marginlization process - # and represent the full distribution of ranking statistics - # across the search, and include with them the analysis' - # total observed zero-lag ranking statistic histogram --- - # everything required to re-evaluate the FAP and FAR for an - # uploaded candidate. 
- xmldoc = ligolw.Document() - xmldoc.appendChild(ligolw.LIGO_LW()) - process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, ifos = self.rankingstat.instruments) - far.gen_likelihood_control_doc(xmldoc, self.rankingstat, self.rankingstatpdf) - ligolw_process.set_process_end_time(process) - return xmldoc - - def web_get_rankingstat(self): - with self.lock: - output = StringIO.StringIO() - ligolw_utils.write_fileobj(self.__get_rankingstat_xmldoc(), output) - outstr = output.getvalue() - output.close() - return outstr - - def __get_zerolag_rankingstatpdf_xmldoc(self): - xmldoc = ligolw.Document() - xmldoc.appendChild(ligolw.LIGO_LW()) - process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, ifos = self.rankingstat.instruments) - far.gen_likelihood_control_doc(xmldoc, None, self.zerolag_rankingstatpdf) - ligolw_process.set_process_end_time(process) - return xmldoc - - def web_get_zerolag_rankingstatpdf(self): - with self.lock: - output = StringIO.StringIO() - ligolw_utils.write_fileobj(self.__get_zerolag_rankingstatpdf_xmldoc(), output) - outstr = output.getvalue() - output.close() - return outstr - - def __flush(self): - # run StreamThinca's .flush(). returns the last remaining - # non-coincident sngls. add them to the distribution. as - # above in appsink_new_buffer() we skip singles collected - # during times when only one instrument was one. - - # times when at least 2 instruments were generating SNR. - # used to sieve triggers for inclusion in the denominator. - discard_boundary = float(self.stream_thinca.discard_boundary) - snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) - one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) - # FIXME: see comment in appsink_new_buffer() - one_or_more_instruments.protract(1e-3) # 1 ms - two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) - # FIXME: see comment in appsink_new_buffer() - two_or_more_instruments.protract(1e-3) # 1 ms - - for event in self.stream_thinca.flush(self.coincs_document.xmldoc, self.coincs_document.process_id, snr_segments, fapfar = self.fapfar): - if self.ranking_stat_output_url is None: - continue - assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) - if event.end in two_or_more_instruments: - self.rankingstat.denominator.increment(event) - self.coincs_document.commit() - - # update zero-lag bin counts in rankingstat. - if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): - self.rankingstat.zerolag.increment(event) - - # Cluster last coincs before recording number of zero - # lag events or sending alerts to gracedb - # FIXME Do proper clustering that saves states between - # thinca intervals and uses an independent clustering - # window. This can also go wrong if there are multiple - # events with an identical likelihood. 
It will just - # choose the event with the highest event id - if self.stream_thinca.last_coincs: - self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) - - # Add events to the observed likelihood histogram post - # "clustering" - # FIXME proper clustering is really needed (see above) - if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: - for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): - if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: - self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 - - # do GraceDB alerts - if self.gracedb_far_threshold is not None and self.rankingstatpdf is not None and self.rankingstatpdf.is_healthy(): - self.__do_gracedb_alerts() - - # after doing alerts, no longer need per-trigger SNR data - # for the triggers that are too old to be used to form - # candidates. to avoid searching the entire list of - # triggers each time, we stop when we encounter the first - # trigger whose SNR series might still be needed, save its - # index, and start the search from there next time - discard_boundary = self.stream_thinca.discard_boundary - for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): - if event.end >= discard_boundary: - break - del event.snr_time_series - - # garbage collect last_coincs - # FIXME: this should not be needed. something is wrong. - # if this is needed, then why don't we have to garbage - # collect everything ourselves? - self.stream_thinca.last_coincs = {} - - def flush(self): - with self.lock: - self.__flush() + self.min_instruments = None + yield "OK: gracedb_min_instruments=\n" + except: + yield "error\n" - def __do_gracedb_alerts(self, retries = 5, retry_delay = 5.): - # sanity check - if self.fapfar is None: - raise ValueError("gracedb alerts cannot be enabled without fap/far data") + def __upload_aux_data(self, message, filename, tag, contents, gracedb_ids): + assert self.gracedb_client is not None, ".gracedb_client is None; did you forget to set .far_threshold?" + for gracedb_id in gracedb_ids: + if self.verbose: + print >>sys.stderr, "posting '%s' to gracedb ID %d ..." 
% (filename, gracedb_id) + for attempt in range(1, self.retries + 1): + try: + resp = self.gracedb_client.writeLog(gracedb_id, message, filename = filename, filecontents = contents, tagname = tag) + except gracedb.rest.HTTPError as resp: + pass + else: + if resp.status == httplib.CREATED: + break + print >>sys.stderr, "gracedb upload of %s for ID %s failed on attempt %d/%d: %d: %s" % (filename, gracedb_id, attempt, self.retries, resp.status, httplib.responses.get(resp.status, "Unknown")) + time.sleep(random.lognormal(math.log(self.retry_delay), .5)) + else: + print >>sys.stderr, "gracedb upload of %s for ID %s failed" % (filename, gracedb_id) - # no-op short circuit - if not self.stream_thinca.last_coincs: + def __upload_aux_xmldoc(self, message, filename, tag, xmldoc, gracedb_ids): + # check for no-op + if not gracedb_ids: return + fobj = StringIO.StringIO() + ligolw_utils.write_fileobj(xmldoc, fobj, gz = filename.endswith(".gz")) + self.__upload_aux_data(message, filename, tag, fobj.getvalue(), gracedb_ids) + del fobj - gracedb_client = gracedb.rest.GraceDb(self.gracedb_service_url) + def __do_alerts(self, last_coincs, psddict, rankingstat_xmldoc_func): gracedb_ids = [] - coinc_inspiral_index = self.stream_thinca.last_coincs.coinc_inspiral_index + + # no-op short circuit + if self.far_threshold is None or not self.stream_thinca.last_coincs: + return gracedb_ids + + coinc_inspiral_index = last_coincs.coinc_inspiral_index # This appears to be a silly for loop since # coinc_event_index will only have one value, but we're # future proofing this at the point where it could have # multiple clustered events - for coinc_event in self.stream_thinca.last_coincs.coinc_event_index.values(): + for coinc_event in last_coincs.coinc_event_index.values(): # # continue if the false alarm rate is not low # enough, or is nan, or there aren't enough # instruments participating in this coinc # - if coinc_inspiral_index[coinc_event.coinc_event_id].combined_far is None or coinc_inspiral_index[coinc_event.coinc_event_id].combined_far > self.gracedb_far_threshold or numpy.isnan(coinc_inspiral_index[coinc_event.coinc_event_id].combined_far) or len(self.stream_thinca.last_coincs.sngl_inspirals(coinc_event.coinc_event_id)) < self.gracedb_min_instruments: + if coinc_inspiral_index[coinc_event.coinc_event_id].combined_far is None or coinc_inspiral_index[coinc_event.coinc_event_id].combined_far > self.far_threshold or numpy.isnan(coinc_inspiral_index[coinc_event.coinc_event_id].combined_far) or len(last_coincs.sngl_inspirals(coinc_event.coinc_event_id)) < self.min_instruments: continue # # fake a filename for end-user convenience # - observatories = "".join(sorted(set(instrument[0] for instrument in self.rankingstat.instruments))) - instruments = "".join(sorted(self.rankingstat.instruments)) - description = "%s_%s_%s_%s" % (instruments, ("%.4g" % coinc_inspiral_index[coinc_event.coinc_event_id].mass).replace(".", "_").replace("-", "_"), self.gracedb_group, self.gracedb_search) + observatories = "".join(sorted(set(instrument[0] for instrument in self.instruments))) + instruments = "".join(sorted(self.instruments)) + description = "%s_%s_%s_%s" % (instruments, ("%.4g" % coinc_inspiral_index[coinc_event.coinc_event_id].mass).replace(".", "_").replace("-", "_"), self.group, self.search) end_time = int(coinc_inspiral_index[coinc_event.coinc_event_id].end) filename = "%s-%s-%d-%d.xml" % (observatories, description, end_time, 0) @@ -1054,7 +542,7 @@ class Data(object): if self.verbose: print >>sys.stderr, "sending %s to 
gracedb ..." % filename message = StringIO.StringIO() - xmldoc = self.stream_thinca.last_coincs[coinc_event.coinc_event_id] + xmldoc = last_coincs[coinc_event.coinc_event_id] # give the alert all the standard inspiral # columns (attributes should all be # populated). FIXME: ugly. @@ -1066,7 +554,7 @@ class Data(object): # already has it pass # add SNR time series if available - for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event.coinc_event_id): + for event in last_coincs.sngl_inspirals(coinc_event.coinc_event_id): snr_time_series = event.snr_time_series if snr_time_series is not None: xmldoc.childNodes[-1].appendChild(lalseries.build_COMPLEX8TimeSeries(snr_time_series)).appendChild(ligolw_param.Param.from_pyvalue(u"event_id", event.event_id)) @@ -1075,9 +563,9 @@ class Data(object): xmldoc.unlink() # FIXME: make this optional from command line? if True: - for attempt in range(1, retries + 1): + for attempt in range(1, self.retries + 1): try: - resp = gracedb_client.createEvent(self.gracedb_group, self.gracedb_pipeline, filename, filecontents = message.getvalue(), search = self.gracedb_search) + resp = self.gracedb_client.createEvent(self.group, self.pipeline, filename, filecontents = message.getvalue(), search = self.search) except gracedb.rest.HTTPError as resp: pass else: @@ -1087,244 +575,25 @@ class Data(object): print >>sys.stderr, "event assigned grace ID %s" % resp_json["graceid"] gracedb_ids.append(resp_json["graceid"]) break - print >>sys.stderr, "gracedb upload of %s failed on attempt %d/%d: %d: %s" % (filename, attempt, retries, resp.status, httplib.responses.get(resp.status, "Unknown")) - time.sleep(random.lognormal(math.log(retry_delay), .5)) + print >>sys.stderr, "gracedb upload of %s failed on attempt %d/%d: %d: %s" % (filename, attempt, self.retries, resp.status, httplib.responses.get(resp.status, "Unknown")) + time.sleep(random.lognormal(math.log(self.retry_delay), .5)) else: print >>sys.stderr, "gracedb upload of %s failed" % filename else: - proc = subprocess.Popen(("/bin/cp", "/dev/stdin", filename), stdin = subprocess.PIPE) - proc.stdin.write(message.getvalue()) - proc.stdin.flush() - proc.stdin.close() + with open(filename, "w") as f: + f.write(message.getvalue()) message.close() - if gracedb_ids and self.upload_auxiliary_data_to_gracedb: - # - # retrieve and upload PSDs - # - - if self.verbose: - print >>sys.stderr, "retrieving PSDs from whiteners and generating psd.xml.gz ..." - psddict = {} - for instrument in self.rankingstat.instruments: - elem = self.pipeline.get_by_name("lal_whiten_%s" % instrument) - data = numpy.array(elem.get_property("mean-psd")) - psddict[instrument] = lal.CreateREAL8FrequencySeries( - name = "PSD", - epoch = LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0), - f0 = 0.0, - deltaF = elem.get_property("delta-f"), - sampleUnits = lal.Unit("s strain^2"), # FIXME: don't hard-code this - length = len(data) - ) - psddict[instrument].data.data = data - fobj = StringIO.StringIO() - reference_psd.write_psd_fileobj(fobj, psddict, gz = True) - message, filename, tag, contents = ("strain spectral densities", "psd.xml.gz", "psd", fobj.getvalue()) - self.__upload_gracedb_aux_data(message, filename, tag, contents, gracedb_ids, retries, gracedb_client) - - # - # retrieve and upload Ranking Data - # - - if self.verbose: - print >>sys.stderr, "generating ranking_data.xml.gz ..." 
- fobj = StringIO.StringIO() - ligolw_utils.write_fileobj(self.__get_rankingstat_xmldoc(), fobj, gz = True) - message, filename, tag, contents = ("ranking statistic PDFs", "ranking_data.xml.gz", "ranking statistic", fobj.getvalue()) - del fobj - self.__upload_gracedb_aux_data(message, filename, tag, contents, gracedb_ids, retries, gracedb_client) - - def __upload_gracedb_aux_data(self, message, filename, tag, contents, gracedb_ids, retries, gracedb_client, retry_delay = 5): - for gracedb_id in gracedb_ids: - for attempt in range(1, retries + 1): - try: - resp = gracedb_client.writeLog(gracedb_id, message, filename = filename, filecontents = contents, tagname = tag) - except gracedb.rest.HTTPError as resp: - pass - else: - if resp.status == httplib.CREATED: - break - print >>sys.stderr, "gracedb upload of %s for ID %s failed on attempt %d/%d: %d: %s" % (filename, gracedb_id, attempt, retries, resp.status, httplib.responses.get(resp.status, "Unknown")) - time.sleep(random.lognormal(math.log(retry_delay), .5)) - else: - print >>sys.stderr, "gracedb upload of %s for ID %s failed" % (filename, gracedb_id) - - def do_gracedb_alerts(self): - with self.lock: - self.__do_gracedb_alerts() - - def __update_eye_candy(self): - self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB - if self.stream_thinca.last_coincs: - latency_val = None - snr_val = (0,0) - like_val = (0,0) - far_val = (0,0) - coinc_inspiral_index = self.stream_thinca.last_coincs.coinc_inspiral_index - coinc_event_index = self.stream_thinca.last_coincs.coinc_event_index - for coinc_event_id, coinc_inspiral in coinc_inspiral_index.items(): - # FIXME: update when a proper column is available - latency = coinc_inspiral.minimum_duration - self.latency_histogram[latency,] += 1 - if latency_val is None: - t = float(coinc_inspiral_index[coinc_event_id].end) - latency_val = (t, latency) - snr = coinc_inspiral_index[coinc_event_id].snr - if snr >= snr_val[1]: - t = float(coinc_inspiral_index[coinc_event_id].end) - snr_val = (t, snr) - for coinc_event_id, coinc_inspiral in coinc_event_index.items(): - like = coinc_event_index[coinc_event_id].likelihood - if like >= like_val[1]: - t = float(coinc_inspiral_index[coinc_event_id].end) - like_val = (t, like) - far_val = (t, coinc_inspiral_index[coinc_event_id].combined_far) - if latency_val is not None: - self.latency_history.append(latency_val) - if snr_val != (0,0): - self.snr_history.append(snr_val) - if like_val != (0,0): - self.likelihood_history.append(like_val) - if far_val != (0,0): - self.far_history.append(far_val) - - def update_eye_candy(self): - with self.lock: - self.__update_eye_candy() - - def web_get_latency_histogram(self): - with self.lock: - for latency, number in zip(self.latency_histogram.centres()[0][1:-1], self.latency_histogram.array[1:-1]): - yield "%e %e\n" % (latency, number) - - def web_get_latency_history(self): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, latency in self.latency_history: - yield "%f %e\n" % (time, latency) - - def web_get_snr_history(self): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, snr in self.snr_history: - yield "%f %e\n" % (time, snr) - - def web_get_sngl_snr_history(self, ifo): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, snr in self.ifo_snr_history[ifo]: - yield "%f %e\n" % (time, snr) - - 
def web_get_likelihood_history(self): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, like in self.likelihood_history: - yield "%f %e\n" % (time, like) - - def web_get_far_history(self): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, far in self.far_history: - yield "%f %e\n" % (time, far) - - def web_get_ram_history(self): - with self.lock: - # first one in the list is sacrificed for a time stamp - for time, ram in self.ram_history: - yield "%f %e\n" % (time, ram) - - def web_get_gracedb_far_threshold(self): - with self.lock: - if self.gracedb_far_threshold is not None: - yield "rate=%.17g\n" % self.gracedb_far_threshold - else: - yield "rate=\n" - - def web_set_gracedb_far_threshold(self): - try: - with self.lock: - rate = bottle.request.forms["rate"] - if rate: - self.gracedb_far_threshold = float(rate) - yield "OK: rate=%.17g\n" % self.gracedb_far_threshold - else: - self.gracedb_far_threshold = None - yield "OK: rate=\n" - except: - yield "error\n" - - def web_get_gracedb_min_instruments(self): - with self.lock: - if self.gracedb_min_instruments is not None: - yield "gracedb_min_instruments=%d\n" % self.gracedb_min_instruments - else: - yield "gracedb_min_instruments=\n" - - def web_set_gracedb_min_instruments(self): - try: - with self.lock: - gracedb_min_instruments = bottle.request.forms["gracedb_min_instruments"] - if gracedb_min_instruments is not None: - self.gracedb_min_instruments = int(gracedb_min_instruments) - yield "OK: gracedb_min_instruments=%d\n" % self.gracedb_min_instruments - else: - self.gracedb_min_instruments = None - yield "OK: gracedb_min_instruments=\n" - except: - yield "error\n" - - def web_get_sngls_snr_threshold(self): - with self.lock: - if self.stream_thinca.sngls_snr_threshold is not None: - yield "snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold - else: - yield "snr=\n" + # + # upload PSDs and ranking statistic data + # - def web_set_sngls_snr_threshold(self): - try: - with self.lock: - snr_threshold = bottle.request.forms["snr"] - if snr_threshold: - self.stream_thinca.sngls_snr_threshold = float(snr_threshold) - yield "OK: snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold - else: - self.stream_thinca.sngls_snr_threshold = None - yield "OK: snr=\n" - except: - yield "error\n" + if self.upload_auxiliary_data: + self.__upload_aux_xmldoc("strain spectral densities", "psd.xml.gz", "psd", lalseries.make_psd_xmldoc(psddict), gracedb_ids) + self.__upload_aux_xmldoc("ranking statistic PDFs", "ranking_data.xml.gz", "ranking statistic", rankingstat_xmldoc_func(), gracedb_ids) - def __write_output_url(self, url = None, verbose = False): - self.__flush() - if url is not None: - self.coincs_document.url = url - self.coincs_document.write_output_url(seglistdicts = self.seglistdicts, verbose = verbose) - # can't be used anymore - del self.coincs_document - - def __write_ranking_stat_url(self, url, description, snapshot = False, verbose = False): - # write the ranking statistic file. 
- ligolw_utils.write_url(self.__get_rankingstat_xmldoc(), url, gz = (url or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None) - # Snapshots get their own custom file and path - if snapshot: - fname = self.T050017_filename(description + '_DISTSTATS', 'xml.gz') - shutil.copy(ligolw_utils.local_path_from_url(url), os.path.join(subdir_from_T050017_filename(fname), fname)) - - def write_output_url(self, url = None, description = "", verbose = False): - with self.lock: - if self.ranking_stat_output_url is not None: - self.__write_ranking_stat_url(self.ranking_stat_output_url, description, verbose = verbose) - self.__write_output_url(url = url, verbose = verbose) + # + # done + # - def snapshot_output_url(self, description, extension, verbose = False): - with self.lock: - fname = self.T050017_filename(description, extension) - fname = os.path.join(subdir_from_T050017_filename(fname), fname) - if self.ranking_stat_output_url is not None: - self.__write_ranking_stat_url(self.ranking_stat_output_url, description, snapshot = True, verbose = verbose) - if self.zerolag_rankingstatpdf_url is not None: - ligolw_utils.write_url(self.__get_zerolag_rankingstatpdf_xmldoc(), self.zerolag_rankingstatpdf_url, gz = (self.zerolag_rankingstatpdf_url or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None) - coincs_document = self.coincs_document.get_another() - self.__write_output_url(url = fname, verbose = verbose) - self.coincs_document = coincs_document - self.snr_time_series_cleanup_index = 0 + return gracedb_ids diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py new file mode 100644 index 0000000000..fa1f78c34f --- /dev/null +++ b/gstlal-inspiral/python/lloidhandler.py @@ -0,0 +1,1354 @@ +# Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +## +# @file +# +# gstlal_inspiral's GStreamer pipeline handler. +# +# +# Review Status +# +# migrated from lloidparts.py 2018-06-15 +# +# | Names | Hash | Date | Diff to Head of Master | +# | ------------------------------------- | ---------------------------------------- | ---------- | --------------------------- | +# | Sathya, Duncan Me, Jolien, Kipp, Chad | 2f5f73f15a1903dc7cc4383ef30a4187091797d1 | 2014-05-02 | <a href="@gstlal_inspiral_cgit_diff/python/lloidparts.py?id=HEAD&id2=2f5f73f15a1903dc7cc4383ef30a4187091797d1">lloidparts.py</a> | +# +# + +## +# @package lloidparts +# +# gstlal_inspiral's GStreamer pipeline handler. 
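+# The module provides the Handler class (the merger of the former
+# inspiral.Data class with the Handler that previously lived in
+# lloidparts.py) together with the EyeCandy status-reporting helper and
+# related utilities.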
+# + + +# +# ============================================================================= +# +# Preamble +# +# ============================================================================= +# + + +import bisect +from collections import deque +try: + from fpconst import NaN + from fpconst import PosInf +except ImportError: + # fpconst is not part of the standard library and might not be + # available + NaN = float("nan") + PosInf = float("+inf") +import itertools +import math +import numpy +import os +import resource +import StringIO +import sys +import threading +import time +import shutil + + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import GObject, Gst +GObject.threads_init() +Gst.init(None) + + +from glue import segments +from glue import segmentsUtils +from glue.ligolw import ligolw +from glue.ligolw import utils as ligolw_utils +from glue.ligolw.utils import process as ligolw_process +from glue.ligolw.utils import segments as ligolw_segments +from gstlal import bottle +from gstlal import far +from gstlal import inspiral +from gstlal import pipeio +from gstlal import reference_psd +from gstlal import simplehandler +from gstlal import streamthinca +import lal +from lal import LIGOTimeGPS +from lal import rate +from lal.utils import CacheEntry + + +# +# ============================================================================= +# +# Misc +# +# ============================================================================= +# + + +def message_new_checkpoint(src, timestamp = None): + s = Gst.Structure.new_empty("CHECKPOINT") + message = Gst.Message.new_application(src, s) + if timestamp is not None: + message.timestamp = timestamp + return message + + +def subdir_from_T050017_filename(fname): + path = str(CacheEntry.from_T050017(fname).segment[0])[:5] + try: + os.mkdir(path) + except OSError: + pass + return path + + +def lower_bound_in_seglist(seglist, x): + """ + Return the index of the segment immediately at or before x in the + coalesced segment list seglist. Operation is O(log n). + """ + # NOTE: this is an operation that is performed in a number of + # locations in various codes, and is something that I've screwed up + # more than once. 
maybe this should be put into segments.py itself + i = bisect.bisect_right(seglist, x) + return i - 1 if i else 0 + + +# +# ============================================================================= +# +# Web Eye Candy +# +# ============================================================================= +# + + +class EyeCandy(object): + def __init__(self, instruments): + self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),))) + self.latency_history = deque(maxlen = 1000) + self.snr_history = deque(maxlen = 1000) + self.likelihood_history = deque(maxlen = 1000) + self.far_history = deque(maxlen = 1000) + self.ram_history = deque(maxlen = 2) + self.ifo_snr_history = dict((instrument, deque(maxlen = 10000)) for instrument in instruments) + + # + # setup bottle routes + # + + bottle.route("/latency_histogram.txt")(self.web_get_latency_histogram) + bottle.route("/latency_history.txt")(self.web_get_latency_history) + bottle.route("/snr_history.txt")(self.web_get_snr_history) + for instrument in instruments: + bottle.route("/%s_snr_history.txt" % instrument)(lambda : self.web_get_sngl_snr_history(ifo = instrument)) + bottle.route("/likelihood_history.txt")(self.web_get_likelihood_history) + bottle.route("/far_history.txt")(self.web_get_far_history) + bottle.route("/ram_history.txt")(self.web_get_ram_history) + + def update(self, last_coincs): + self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB + if last_coincs: + latency_val = None + snr_val = (0,0) + like_val = (0,0) + far_val = (0,0) + coinc_inspiral_index = last_coincs.coinc_inspiral_index + coinc_event_index = last_coincs.coinc_event_index + for coinc_event_id, coinc_inspiral in coinc_inspiral_index.items(): + # FIXME: update when a proper column is available + latency = coinc_inspiral.minimum_duration + self.latency_histogram[latency,] += 1 + if latency_val is None: + t = float(coinc_inspiral_index[coinc_event_id].end) + latency_val = (t, latency) + snr = coinc_inspiral_index[coinc_event_id].snr + if snr >= snr_val[1]: + t = float(coinc_inspiral_index[coinc_event_id].end) + snr_val = (t, snr) + for coinc_event_id, coinc_inspiral in coinc_event_index.items(): + like = coinc_event_index[coinc_event_id].likelihood + if like >= like_val[1]: + t = float(coinc_inspiral_index[coinc_event_id].end) + like_val = (t, like) + far_val = (t, coinc_inspiral_index[coinc_event_id].combined_far) + if latency_val is not None: + self.latency_history.append(latency_val) + if snr_val != (0,0): + self.snr_history.append(snr_val) + if like_val != (0,0): + self.likelihood_history.append(like_val) + if far_val != (0,0): + self.far_history.append(far_val) + + def web_get_latency_histogram(self): + with self.lock: + for latency, number in zip(self.latency_histogram.centres()[0][1:-1], self.latency_histogram.array[1:-1]): + yield "%e %e\n" % (latency, number) + + def web_get_latency_history(self): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, latency in self.latency_history: + yield "%f %e\n" % (time, latency) + + def web_get_snr_history(self): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, snr in self.snr_history: + yield "%f %e\n" % (time, snr) + + def web_get_sngl_snr_history(self, ifo): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, snr in self.ifo_snr_history[ifo]: + 
yield "%f %e\n" % (time, snr) + + def web_get_likelihood_history(self): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, like in self.likelihood_history: + yield "%f %e\n" % (time, like) + + def web_get_far_history(self): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, far in self.far_history: + yield "%f %e\n" % (time, far) + + def web_get_ram_history(self): + with self.lock: + # first one in the list is sacrificed for a time stamp + for time, ram in self.ram_history: + yield "%f %e\n" % (time, ram) + + +# +# ============================================================================= +# +# Segmentlist Tracker +# +# ============================================================================= +# + + +class SegmentsTracker(object): + def __init__(self, pipeline, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False): + self.lock = threading.Lock() + self.verbose = verbose + + # setup segment list collection from gates + # + # FIXME: knowledge of what gates are present in what + # configurations, what they are called, etc., somehow needs to live + # with the code that constructs the gates + # + # FIXME: in the offline pipeline, state vector segments + # don't get recorded. however, except for the h(t) gate + # segments these are all inputs to the pipeline so it + # probably doesn't matter. nevertheless, they maybe should + # go into the event database for completeness of that + # record, or maybe not because it could result in a lot of + # duplication of on-disk data. who knows. think about it. + gate_suffix = { + # FIXME uncomment the framesegments line once the + # online analysis has a frame segments gate + #"framesegments": "frame_segments_gate", + "statevectorsegments": "state_vector_gate", + "whitehtsegments": "ht_gate" + } + + # dictionary mapping segtype to segmentlist dictionary + # mapping instrument to segment list + self.seglistdicts = dict((segtype, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)) for segtype in gate_suffix) + + # create a copy to keep track of recent segment history + self.recent_segment_histories = self.seglistdicts.copy() + self.segment_history_duration = segment_history_duration + + # iterate over segment types and instruments, look for the + # gate element that should provide those segments, and + # connect handlers to collect the segments + if verbose: + print >>sys.stderr, "connecting segment handlers to gates ..." + for segtype, seglistdict in self.seglistdicts.items(): + for instrument in seglistdict: + try: + name = "%s_%s" % (instrument, gate_suffix[segtype]) + except KeyError: + # this segtype doesn't come from + # gate elements + continue + elem = pipeline.get_by_name(name) + if elem is None: + # ignore missing gate elements + if verbose: + print >>sys.stderr, "\tcould not find %s for %s '%s'" % (name, instrument, segtype) + continue + if verbose: + print >>sys.stderr, "\tfound %s for %s '%s'" % (name, instrument, segtype) + elem.connect("start", self.gatehandler, (segtype, instrument, "on")) + elem.connect("stop", self.gatehandler, (segtype, instrument, "off")) + elem.set_property("emit-signals", True) + if verbose: + print >>sys.stderr, "... done connecting segment handlers to gates" + + + def __gatehandler(self, elem, timestamp, (segtype, instrument, new_state)): + """! + A handler that intercepts gate state transitions. 
+ + @param elem A reference to the lal_gate element or None (only used for verbosity) + @param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds) + @param segtype the class of segments this gate is defining, e.g., "datasegments", etc.. + @param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc.. + @param new_state the state transition, must be either "on" or "off" + + Must be called with the lock held. + """ + timestamp = LIGOTimeGPS(0, timestamp) # timestamp is in nanoseconds + + if self.verbose and elem is not None: + print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % (elem.get_name(), instrument, segtype, new_state, str(timestamp)) + + if new_state == "off": + # record end of segment + self.seglistdicts[segtype][instrument] -= segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),)) + elif new_state == "on": + # record start of new segment + self.seglistdicts[segtype][instrument] += segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),)) + else: + assert False, "impossible new_state '%s'" % new_state + + + def gatehandler(self, elem, timestamp, (segtype, instrument, new_state)): + with self.lock: + self.__gatehandler(elem, timestamp, (segtype, instrument, new_state)) + + + def terminate(self, timestamp): + # terminate all segments + with self.lock: + for segtype, seglistdict in self.seglistdicts.items(): + for instrument in seglistdict: + self.__gatehandler(None, timestamp, (segtype, instrument, "off")) + + + def gen_segments_xmldoc(self): + """! + A method to output the segment list in a valid ligolw xml + format. + + Must be called with the lock held. + """ + xmldoc = ligolw.Document() + xmldoc.appendChild(ligolw.LIGO_LW()) + process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}) + with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments: + for segtype, seglistdict in self.seglistdicts.items(): + ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot") + ligolw_process.set_process_end_time(process) + return xmldoc + + + def __T050017_filename(self, description, extension): + """! + Must be called with the lock held. + """ + # input check + if "-" in description: + raise ValueError("invalid characters in description '%s'" % description) + + # determine current extent + instruments = set(instrument for seglistdict in self.seglistdicts.values() for instrument in seglistdict) + segs = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values() if any(seglistdict.values())) + if segs: + start, end = segs.extent() + if math.isinf(end): + end = inspiral.now() + else: + # silence errors at start-up. + # FIXME: this is probably dumb. who cares. + start = end = inspiral.now() + + # construct and return filename + start, end = int(math.floor(start)), int(math.ceil(end)) + return "%s-%s-%d-%d.%s" % ("".join(sorted(instruments)), description, start, end - start, extension) + + + def T050017_filename(self, description, extension): + with self.lock: + return self.__T050017_filename(description, extension) + + + def flush_segments_to_disk(self, tag, timestamp): + """! + Flush segments to disk, e.g., when checkpointing or shutting + down an online pipeline. 
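+
+		The clipping at the given timestamp can be pictured with plain
+		glue.segments arithmetic (a sketch for a single segment list;
+		the real code applies the same cut to every list in
+		self.seglistdicts, and the times are made up):
+
+			from glue import segments
+			live = segments.segmentlist([segments.segment(0, 100)])
+			timestamp = 60
+			# written to disk: everything before the timestamp -> [0, 60)
+			closed = live - segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)])
+			# carried forward in memory: everything after -> [60, 100)
+			carried = live - segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)])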
+ + @param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk + """ + with self.lock: + # make a copy of the current segmentlistdicts + seglistdicts = dict((key, value.copy()) for key, value in self.seglistdicts.items()) + + # keep everything before timestamp in the current + # segmentlistdicts. keep everything after + # timestamp in the copy. we need to apply the cut + # this way around so that the T050017 filename + # constructed below has the desired start and + # duration + for seglistdict in self.seglistdicts.values(): + seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)])) + for seglistdict in seglistdicts.values(): + seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)])) + + # write the current (clipped) segmentlistdicts to disk + fname = self.__T050017_filename("%s_SEGMENTS" % tag, "xml.gz") + fname = os.path.join(subdir_from_T050017_filename(fname), fname) + ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None) + + # continue with the (clipped) copy + self.seglistdicts = seglistdicts + + + def web_get_segments_xml(self): + """! + provide a bottle route to get segment information via a url + """ + with self.lock: + output = StringIO.StringIO() + ligolw_utils.write_fileobj(self.gen_segments_xmldoc(), output) + outstr = output.getvalue() + output.close() + return outstr + + + def update_recent_segment_history(self): + """! + A method to update the recent segment histories + + Must be called with the lock held. + """ + current_gps_time = lal.GPSTimeNow() + interval_to_discard = segments.segmentlist((segments.segment(segments.NegInfinity, current_gps_time - self.segment_history_duration),)) + for segtype, seglistdict in self.recent_segment_histories.items(): + seglistdict.extend(self.seglistdicts[segtype]) + seglistdict.coalesce() + for seglist in seglistdict.values(): + seglist -= interval_to_discard + + + def gen_recent_segment_history_xmldoc(self): + """! + Construct and return a LIGOLW XML tree containing the + recent segment histories. + + Must be called with the lock held. + """ + self.update_recent_segment_history() + xmldoc = ligolw.Document() + xmldoc.appendChild(ligolw.LIGO_LW()) + process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}) + with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments: + for segtype, seglistdict in self.recent_segment_histories.items(): + ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot") + ligolw_process.set_process_end_time(process) + return xmldoc + + + def web_get_recent_segment_history_xml(self): + """! + provide a bottle route to get recent segment history + information via a url + """ + with self.lock: + output = StringIO.StringIO() + ligolw_utils.write_fileobj(self.gen_recent_segment_history_xmldoc(), output) + outstr = output.getvalue() + output.close() + return outstr + + +# +# ============================================================================= +# +# Pipeline Handler +# +# ============================================================================= +# + + +class Handler(simplehandler.Handler): + """! 
+	A subclass of simplehandler.Handler to be used with, e.g.,
+	gstlal_inspiral
+
+	Implements additional message handling for dealing with spectrum
+	messages and checkpoints for the online analysis including periodic
+	dumps of segment information, trigger files and background
+	distribution statistics.
+	"""
+	def __init__(self, mainloop, pipeline, coincs_document, rankingstat, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, min_log_L = None, sngls_snr_threshold = None, tag = "", verbose = False):
+		"""!
+		@param mainloop The main application's event loop
+		@param pipeline The gstreamer pipeline that is being controlled by this handler
+		@param coincs_document The document object into which triggers and coincidences are collected
+		@param rankingstat A far.RankingStat instance providing the ranking statistic data
+		@param gracedbwrapper The object through which GraceDB alerts are submitted
+		@param tag The tag to use for naming file snapshots, e.g. the description will be "%s_LLOID" % tag
+		@param verbose Be verbose
+		"""
+		super(Handler, self).__init__(mainloop, pipeline)
+
+		#
+		# initialize
+		#
+
+		self.lock = threading.Lock()
+		self.coincs_document = coincs_document
+		self.pipeline = pipeline
+		self.tag = tag
+		self.verbose = verbose
+		# None to disable periodic snapshots, otherwise seconds
+		self.likelihood_snapshot_interval = likelihood_snapshot_interval
+		self.likelihood_snapshot_timestamp = None
+
+		self.gracedbwrapper = gracedbwrapper
+		# FIXME: detangle this
+		self.gracedbwrapper.lock = self.lock
+
+		self.eye_candy = EyeCandy(rankingstat.instruments)
+		# FIXME: detangle this
+		self.eye_candy.lock = self.lock
+
+		#
+		# setup segment list collection from gates
+		#
+
+		self.segmentstracker = SegmentsTracker(pipeline, rankingstat.instruments, verbose = verbose)
+
+		#
+		# setup bottle routes (and rahwts)
+		#
+
+		bottle.route("/cumulative_segments.xml")(self.segmentstracker.web_get_recent_segment_history_xml)
+		bottle.route("/psds.xml")(self.web_get_psd_xml)
+		bottle.route("/rankingstat.xml")(self.web_get_rankingstat)
+		bottle.route("/segments.xml")(self.segmentstracker.web_get_segments_xml)
+		bottle.route("/sngls_snr_threshold.txt", method = "GET")(self.web_get_sngls_snr_threshold)
+		bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold)
+		bottle.route("/zerolag_rankingstatpdf.xml")(self.web_get_zerolag_rankingstatpdf)
+
+		#
+		# all triggers up to this index have had their SNR time
+		# series deleted to save memory
+		#
+
+		self.snr_time_series_cleanup_index = 0
+
+		#
+		# attach a StreamThinca instance to ourselves
+		#
+
+		self.stream_thinca = streamthinca.StreamThinca(
+			coincidence_threshold = rankingstat.delta_t,
+			thinca_interval = thinca_interval,	# seconds
+			min_instruments = rankingstat.min_instruments,
+			min_log_L = min_log_L,
+			sngls_snr_threshold = sngls_snr_threshold
+		)
+
+		#
+		# setup likelihood ratio book-keeping.
+		#
+		# in online mode, if ranking_stat_input_url is set then on
+		# each snapshot interval, and before providing stream
+		# thinca with its ranking statistic information, the
+		# current rankingstat object is replaced with the contents
+		# of that file. this is intended to be used by trigger
+		# generator jobs on the injection branch of an online
+		# analysis to import ranking statistic information from
+		# their non-injection cousins instead of using whatever
+		# statistics they've collected internally.
+		# ranking_stat_input_url is not used when running offline.
+ # + # ranking_stat_output_url provides the name of the file to + # which the internally-collected ranking statistic + # information is to be written whenever output is written + # to disk. if set to None, then only the trigger file will + # be written, no ranking statistic information will be + # written. normally it is set to a non-null value, but + # injection jobs might be configured to disable ranking + # statistic output since they produce nonsense. + # + + self.ranking_stat_input_url = ranking_stat_input_url + self.ranking_stat_output_url = ranking_stat_output_url + self.rankingstat = rankingstat + + # + # if we have been supplied with external ranking statistic + # information then use it to enable ranking statistic + # assignment in streamthinca. otherwise, if we have not + # been and yet we have been asked to apply the min log L + # cut anyway then enable ranking statistic assignment using + # the dataless ranking statistic variant + # + + if self.ranking_stat_input_url is not None: + if self.rankingstat.is_healthy(): + self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() + else: + del self.stream_thinca.rankingstat + elif min_log_L is not None: + self.stream_thinca.rankingstat = far.DatalessRankingStat( + template_ids = rankingstat.template_ids, + instruments = rankingstat.instruments, + min_instruments = rankingstat.min_instruments, + delta_t = rankingstat.delta_t + ).finish() + + # + # zero_lag_ranking_stats is a RankingStatPDF object that is + # used to accumulate a histogram of the likelihood ratio + # values assigned to zero-lag candidates. this is required + # to implement the extinction model for low-significance + # events during online running but otherwise is optional. + # + # FIXME: if the file does not exist or is not readable, + # the code silently initializes a new, empty, histogram. + # it would be better to determine whether or not the file + # is required and fail when it is missing + # + + if zerolag_rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(zerolag_rankingstatpdf_url), os.R_OK): + _, self.zerolag_rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(zerolag_rankingstatpdf_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) + if self.zerolag_rankingstatpdf is None: + raise ValueError("\"%s\" does not contain ranking statistic PDF data" % zerolag_rankingstatpdf_url) + elif zerolag_rankingstatpdf_url is not None: + # initialize an all-zeros set of PDFs + self.zerolag_rankingstatpdf = far.RankingStatPDF(rankingstat, nsamples = 0) + else: + self.zerolag_rankingstatpdf = None + self.zerolag_rankingstatpdf_url = zerolag_rankingstatpdf_url + + # + # rankingstatpdf contains the RankingStatPDF object (loaded + # from rankingstatpdf_url) used to initialize the FAPFAR + # object for on-the-fly FAP and FAR assignment. except to + # initialize the FAPFAR object it is not used for anything, + # but is retained so that it can be exposed through the web + # interface for diagnostic purposes and uploaded to gracedb + # with candidates. the extinction model is applied to + # initialize the FAPFAR object but the original is retained + # for upload to gracedb, etc. 
+ # + + self.rankingstatpdf_url = rankingstatpdf_url + self.load_rankingstat_pdf() + + # + # most recent PSDs + # + + self.psds = {} + + # + # use state vector elements to 0 horizon distances when + # instruments are off + # + + if verbose: + print >>sys.stderr, "connecting horizon distance handlers to gates ..." + for instrument in rankingstat.instruments: + name = "%s_ht_gate" % instrument + elem = pipeline.get_by_name(name) + if elem is None: + raise ValueError("cannot find \"%s\" element for %s" % (name, instrument)) + if verbose: + print >>sys.stderr, "\tfound %s for %s" % (name, instrument) + elem.connect("start", self.horizgatehandler, (instrument, True)) + elem.connect("stop", self.horizgatehandler, (instrument, False)) + elem.set_property("emit-signals", True) + if verbose: + print >>sys.stderr, "... done connecting horizon distance handlers to gates" + + + def do_on_message(self, bus, message): + """! + Handle application-specific message types, e.g., spectrum + and checkpointing messages. + + @param bus A reference to the pipeline's bus + @param message A reference to the incoming message + """ + # + # return value of True tells parent class that we have done + # all that is needed in response to the message, and that + # it should ignore it. a return value of False means the + # parent class should do what it thinks should be done + # + if message.type == Gst.MessageType.ELEMENT: + if message.get_structure().get_name() == "spectrum": + # get the instrument, psd, and timestamp. + # the "stability" is a measure of the + # fraction of the configured averaging + # timescale used to obtain this + # measurement. + # NOTE: epoch is used for the timestamp, + # this is the middle of the most recent FFT + # interval used to obtain this PSD + instrument = message.src.get_name().split("_")[-1] + psd = pipeio.parse_spectrum_message(message) + timestamp = psd.epoch + stability = float(message.src.get_property("n-samples")) / message.src.get_property("average-samples") + + # save + self.psds[instrument] = psd + + # update horizon distance history. if the + # whitener's average is not satisfactorily + # converged, claim the horizon distance is + # 0 (equivalent to claiming the instrument + # to be off at this time). this has the + # effect of vetoing candidates from these + # times. + if stability > 0.3: + # FIXME: get canonical masses from + # the template bank bin that we're + # analyzing + horizon_distance = reference_psd.HorizonDistance(10.0, 0.85 * (psd.f0 + (len(psd.data.data) - 1) * psd.deltaF), psd.deltaF, 1.4, 1.4)(psd, 8.0)[0] + assert not (math.isnan(horizon_distance) or math.isinf(horizon_distance)) + else: + horizon_distance = 0. + self.record_horizon_distance(instrument, float(timestamp), horizon_distance) + return True + elif message.type == Gst.MessageType.APPLICATION: + if message.get_structure().get_name() == "CHECKPOINT": + self.checkpoint(LIGOTimeGPS(0, message.timestamp)) + return True + elif message.type == Gst.MessageType.EOS: + with self.lock: + # FIXME: how to choose correct timestamp? + # note that EOS messages' timestamps are + # set to CLOCK_TIME_NONE so they can't be + # used for this. 
+				try:
+					# seconds
+					timestamp = self.rankingstat.segmentlists.extent_all()[1]
+				except ValueError:
+					# no segments
+					return False
+				# convert to integer nanoseconds
+				timestamp = LIGOTimeGPS(timestamp).ns()
+				# terminate all segments
+				self.segmentstracker.terminate(timestamp)
+				# NOTE: we do not zero the horizon
+				# distances in the ranking statistic at EOS
+				# NOTE: never return True from the EOS code-path,
+				# so as to not stop the parent class from doing
+				# EOS-related stuff
+				return False
+		return False
+
+
+	def load_rankingstat_pdf(self):
+		# FIXME: if the file can't be accessed the code silently
+		# disables FAP/FAR assignment. need to figure out when
+		# failure is OK and when it's not OK and put a better check
+		# here.
+		if self.rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(self.rankingstatpdf_url), os.R_OK):
+			_, self.rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.rankingstatpdf_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
+			if self.rankingstatpdf is None:
+				raise ValueError("\"%s\" does not contain ranking statistic PDFs" % self.rankingstatpdf_url)
+			if not self.rankingstat.template_ids <= self.rankingstatpdf.template_ids:
+				raise ValueError("\"%s\" is for the wrong templates" % self.rankingstatpdf_url)
+			if self.rankingstatpdf.is_healthy():
+				self.fapfar = far.FAPFAR(self.rankingstatpdf.new_with_extinction())
+			else:
+				self.fapfar = None
+		else:
+			self.rankingstatpdf = None
+			self.fapfar = None
+
+
+	def appsink_new_buffer(self, elem):
+		with self.lock:
+			# retrieve triggers from appsink element
+			buf = elem.emit("pull-sample").get_buffer()
+			events = []
+			for i in range(buf.n_memory()):
+				memory = buf.peek_memory(i)
+				result, mapinfo = memory.map(Gst.MapFlags.READ)
+				assert result
+				# NOTE NOTE NOTE NOTE
+				# It is critical that the correct class'
+				# .from_buffer() method be used here. This
+				# code is interpreting the buffer's
+				# contents as an array of C structures and
+				# building instances of python wrappers of
+				# those structures but if the python
+				# wrappers are for the wrong structure
+				# declaration then terrible terrible things
+				# will happen
+				# NOTE NOTE NOTE NOTE
+				# FIXME why does mapinfo.data come out as
+				# an empty list on some occasions???
+				if mapinfo.data:
+					events.extend(streamthinca.SnglInspiral.from_buffer(mapinfo.data))
+				memory.unmap(mapinfo)
+
+			# FIXME: ugly way to get the instrument
+			instrument = elem.get_name().split("_", 1)[0]
+
+			# extract segment. move the segment's upper
+			# boundary to include all triggers. ARGH the 1 ns
+			# offset is needed for the respective trigger to be
+			# "in" the segment (segments are open from above)
+			# FIXME: is there another way?
+			buf_timestamp = LIGOTimeGPS(0, buf.pts)
+			buf_seg = segments.segment(buf_timestamp, max((buf_timestamp + LIGOTimeGPS(0, buf.duration),) + tuple(event.end + 0.000000001 for event in events)))
+			buf_is_gap = bool(buf.mini_object.flags & Gst.BufferFlags.GAP)
+			# sanity check that gap buffers are empty
+			assert not (buf_is_gap and events)
+
+			# safety check end times. OK for end times to be
+			# past end of buffer, but we cannot allow trigger
+			# times to go backwards. they cannot precede the
+			# buffer's start because, below,
+			# streamthinca.add_events() will be told the
+			# trigger list is complete up to this buffer's time
+			# stamp.
+			assert all(event.end >= buf_timestamp for event in events)
+			# we have extended the buf_seg above to enclose the
+			# triggers, make sure that worked
+			assert all(event.end in buf_seg for event in events)
+
+			# set all effective distances to NaN.
+			# gstlal_inspiral's effective distances are
+			# incorrect, and the PE codes require us to either
+			# provide correct effective distances or
+			# communicate to them that they are incorrect.
+			# they have explained that setting them to NaN is
+			# sufficient for the latter.
+			# FIXME: fix the effective distances
+			for event in events:
+				event.eff_distance = NaN
+
+			# Find max SNR singles
+			if events:
+				max_snr_event = max(events, key = lambda t: t.snr)
+				self.eye_candy.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr))
+
+			# set metadata on triggers. because this uses the
+			# ID generator attached to the database-backed
+			# sngl_inspiral table, and that generator has been
+			# synced to the database's contents, the IDs
+			# assigned here will not collide with any already
+			# in the database
+			for event in events:
+				event.process_id = self.coincs_document.process_id
+				event.event_id = self.coincs_document.get_next_sngl_id()
+
+			# update likelihood snapshot if needed
+			if self.likelihood_snapshot_interval is not None and (self.likelihood_snapshot_timestamp is None or buf_timestamp - self.likelihood_snapshot_timestamp >= self.likelihood_snapshot_interval):
+				self.likelihood_snapshot_timestamp = buf_timestamp
+
+				# post a checkpoint message.
+				# FIXME: make sure this triggers
+				# self.snapshot_output_url() to be invoked.
+				# lloidparts takes care of that for now,
+				# but spreading the program logic around
+				# like that isn't a good idea, this code
+				# should be responsible for it somehow, no?
+				# NOTE: self.snapshot_output_url() writes
+				# the current rankingstat object to the
+				# location identified by .ranking_stat_output_url,
+				# so if that is either not set or at least
+				# set to a different name than
+				# .ranking_stat_input_url the file that has
+				# just been loaded above will not be
+				# overwritten.
+				self.pipeline.get_bus().post(message_new_checkpoint(self.pipeline, timestamp = buf_timestamp.ns()))
+
+				# if a ranking statistic source url is set
+				# and is not the same as the file to which
+				# we are writing our ranking statistic data
+				# then overwrite rankingstat with its
+				# contents.
the use case is online + # injection jobs that need to periodically + # grab new ranking statistic data from + # their corresponding non-injection partner + if self.ranking_stat_input_url is not None and self.ranking_stat_input_url != self.ranking_stat_output_url: + params_before = self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t + self.rankingstat, _ = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.ranking_stat_input_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler)) + if params_before != (self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t): + raise ValueError("'%s' contains incompatible ranking statistic configuration" % self.ranking_stat_input_url) + + # update streamthinca's ranking statistic + # data + if self.rankingstat.is_healthy(): + self.stream_thinca.rankingstat = far.OnlineFrakensteinRankingStat(self.rankingstat, self.rankingstat).finish() + else: + del self.stream_thinca.rankingstat + + # optionally get updated ranking statistic + # PDF data and enable FAP/FAR assignment + self.load_rankingstat_pdf() + + # add triggers to trigger rate record. this needs + # to be done without any cuts on coincidence, etc., + # so that the total trigger count agrees with the + # total livetime from the SNR buffers. we assume + # the density of real signals is so small that this + # count is not sensitive to their presence. NOTE: + # this is not true locally. a genuine signal, if + # loud, can significantly increase the local + # density of triggers, therefore the trigger rate + # measurement machinery must take care to average + # the rate over a sufficiently long period of time + # that the rate estimates are insensitive to the + # presence of signals. the current implementation + # averages over whole science segments. NOTE: this + # must be done before running stream thinca (below) + # so that the "how many instruments were on test" + # is aware of this buffer. + if not buf_is_gap: + self.rankingstat.denominator.triggerrates[instrument].add_ratebin(map(float, buf_seg), len(events)) + + # extract times when instruments were producing + # SNR. used to define "on instruments" for coinc + # tables among other things. will only need + # segment information for the times for which we + # have triggers, so use stream_thinca's discard + # boundary and a bisection search to clip the lists + # to reduce subsequent operation count. + discard_boundary = float(self.stream_thinca.discard_boundary) + snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) + + # times when SNR was available. used only for code + # correctness checks + one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) + # FIXME: this is needed to work around rounding + # problems in safety checks below, trying to + # compare GPS trigger times to float segment + # boundaries (the boundaries don't have enough + # precision to know if triggers near the edge are + # in or out). it would be better not to have to + # screw around like this. + one_or_more_instruments.protract(1e-3) # 1 ms + + # times when at least 2 instruments were generating + # SNR. used to sieve triggers for inclusion in the + # denominator. 
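+			# for reference, segmentsUtils.vote() picks out the
+			# times when at least n of the input segment lists
+			# are on. a sketch with made-up times:
+			#
+			#	h1 = segments.segmentlist([segments.segment(0, 10)])
+			#	l1 = segments.segmentlist([segments.segment(5, 15)])
+			#	segmentsUtils.vote([h1, l1], 1) -> [0, 15)
+			#	segmentsUtils.vote([h1, l1], 2) -> [5, 10)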
+ two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) + # FIXME: see comment above. + two_or_more_instruments.protract(1e-3) # 1 ms + + # run stream thinca. update the parameter + # distribution data from sngls that weren't used in + # zero-lag multi-instrument coincs. NOTE: we rely + # on the arguments to .chain() being evaluated in + # left-to-right order so that .add_events() is + # evaluated before .last_coincs because the former + # initializes the latter. we skip singles + # collected during times when only one instrument + # was on. NOTE: the definition of "on" is blurry + # since we can recover triggers with end times + # outside of the available strain data, but the + # purpose of this test is simply to prevent signals + # occuring during single-detector times from + # contaminating our noise model, so it's not + # necessary for this test to be super precisely + # defined. + for event in itertools.chain(self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, snr_segments, fapfar = self.fapfar), self.stream_thinca.last_coincs.single_sngl_inspirals() if self.stream_thinca.last_coincs else ()): + if self.ranking_stat_output_url is None: + continue + assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) + if event.end in two_or_more_instruments: + self.rankingstat.denominator.increment(event) + self.coincs_document.commit() + + # update zero-lag bin counts in rankingstat. + if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: + for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): + if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: + for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): + self.rankingstat.zerolag.increment(event) + + # Cluster last coincs before recording number of zero + # lag events or sending alerts to gracedb + # FIXME Do proper clustering that saves states between + # thinca intervals and uses an independent clustering + # window. This can also go wrong if there are multiple + # events with an identical likelihood. It will just + # choose the event with the highest event id + if self.stream_thinca.last_coincs: + self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) + + # Add events to the observed likelihood histogram + # post "clustering" + # FIXME proper clustering is really needed (see + # above) + if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: + for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): + if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: + self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 + + # do GraceDB alerts and update eye candy + self.__do_gracedb_alerts() + self.eye_candy.update(self.stream_thinca.last_coincs) + + # after doing alerts, no longer need per-trigger + # SNR data for the triggers that are too old to be + # used to form candidates. 
to avoid searching the + # entire list of triggers each time, we stop when + # we encounter the first trigger whose SNR series + # might still be needed, save its index, and start + # the search from there next time + discard_boundary = self.stream_thinca.discard_boundary + for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): + if event.end >= discard_boundary: + break + del event.snr_time_series + + + def _record_horizon_distance(self, instrument, timestamp, horizon_distance): + """ + timestamp can be a float or a slice with float boundaries. + """ + # retrieve the horizon history for this instrument + horizon_history = self.rankingstat.numerator.horizon_history[instrument] + # NOTE: timestamps are floats here (or float slices), not + # LIGOTimeGPS. whitener should be reporting PSDs with + # integer timestamps so the timestamps are not naturally + # high precision, and, anyway, we don't need nanosecond + # precision for the horizon distance history. + horizon_history[timestamp] = horizon_distance + + + def record_horizon_distance(self, instrument, timestamp, horizon_distance): + """ + timestamp can be a float or a slice with float boundaries. + """ + with self.lock: + self._record_horizon_distance(instrument, timestamp, horizon_distance) + + + def horizgatehandler(self, elem, timestamp, (instrument, new_state)): + """! + A handler that intercepts h(t) gate state transitions to 0 horizon distances. + + @param elem A reference to the lal_gate element or None (only used for verbosity) + @param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds) + @param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc.. + @param new_state the state transition, must be either True or False + """ + # essentially we want to set the horizon distance record to + # 0 at both on-to-off and off-to-on transitions so that the + # horizon distance is reported to be 0 at all times within + # the "off" period. the problem is gaps due to glitch + # excision, which is done using a gate that comes after the + # whitener. the whitener sees data flowing during these + # periods and can report PSDs (and thus trigger horizon + # distances to get recorded) but because of the glitch + # excision the real sensitivity of the instrument is 0. + # there's not much we can do to prevent the PSD from being + # reported, but we try to retroactively correct the problem + # when it occurs by using a slice to re-zero the entire + # "off" interval preceding each off-to-on transition. this + # requires access to the segment data to get the timestamp + # for the start of the off interval, but we could also + # record that number in this class here for our own use if + # the entanglement with the segments tracker becomes a + # problem. + # + # FIXME: there is a potential race condition in that we're + # relying on all spurious PSD messages that we are to + # override with 0 to already have been reported when this + # method gets invoked by the whitehtsegments, i.e., we're + # relying on the whitehtsegments gate to report state + # transitions *after* any potential whitener PSD messages + # have been generated. this is normally guaranteed because + # the whitener does not generate PSD messages during gap + # intervals. 
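+		#
+		# for illustration, the horizon distance recording methods
+		# used below accept either a single time or a slice of times
+		# (a usage sketch; the GPS times and the 150 Mpc figure are
+		# made up):
+		#
+		#	# a PSD message reports a ~150 Mpc horizon for H1
+		#	self.record_horizon_distance("H1", 1187008882.0, 150.0)
+		#	# retroactively zero the preceding "off" interval
+		#	self.record_horizon_distance("H1", slice(1187008000.0, 1187008882.0), 0.0)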
+ + timestamp = float(LIGOTimeGPS(0, timestamp)) + + if self.verbose: + print >>sys.stderr, "%s: %s horizon distance state transition: %s @ %s" % (elem.get_name(), instrument, ("on" if new_state else "off"), str(timestamp)) + + with self.lock: + # retrieve the horizon history for this instrument + horizon_history = self.rankingstat.numerator.horizon_history[instrument] + + if new_state: + # this if statement is checking if + # ~self.seglistdicts[segtype][instrument] + # is empty, and if not then it zeros the + # interval spanned by the last segment in + # that list, but what's done below avoids + # the explicit segment arithmetic + segtype = "whitehtsegments" + if len(self.segmentstracker.seglistdicts[segtype][instrument]) > 1: + self._record_horizon_distance(instrument, slice(float(self.segmentstracker.seglistdicts[segtype][instrument][-2][-1]), timestamp), 0.) + else: + self._record_horizon_distance(instrument, timestamp, 0.) + elif self.rankingstat.numerator.horizon_history[instrument]: + self._record_horizon_distance(instrument, slice(timestamp, None), 0.) + else: + self._record_horizon_distance(instrument, timestamp, 0.) + + + def checkpoint(self, timestamp): + """! + Checkpoint, e.g., flush segments and triggers to disk. + + @param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk + """ + # FIXME: the timestamp is used to close off open segments + # and so should *not* be the timestamp of the current + # buffer, necessarily, but rather a GPS time guaranteed to + # precede any future state transitions of any segment list. + # especially if the pipeline is ever run in an "advanced + # warning" configuration using the GPS time of a trigger + # buffer would be an especially bad choice. + self.segmentstracker.flush_segments_to_disk(self.tag, timestamp) + try: + self.snapshot_output_url("%s_LLOID" % self.tag, "xml.gz", verbose = self.verbose) + except TypeError as te: + print >>sys.stderr, "Warning: couldn't build output file on checkpoint, probably there aren't any triggers: %s" % te + + + def __get_psd_xmldoc(self): + xmldoc = lal.series.make_psd_xmldoc(self.psds) + process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}, ifos = self.psds) + ligolw_process.set_process_end_time(process) + return xmldoc + + + def web_get_psd_xml(self): + with self.lock: + output = StringIO.StringIO() + ligolw_utils.write_fileobj(self.__get_psd_xmldoc(), output) + outstr = output.getvalue() + output.close() + return outstr + + + def __get_rankingstat_xmldoc(self): + # generate a ranking statistic output document. NOTE: if + # we are in possession of ranking statistic PDFs then those + # are included in the output. this allows a single + # document to be uploaded to gracedb. in an online + # analysis, those PDFs come from the marginlization process + # and represent the full distribution of ranking statistics + # across the search, and include with them the analysis' + # total observed zero-lag ranking statistic histogram --- + # everything required to re-evaluate the FAP and FAR for an + # uploaded candidate. 
+ xmldoc = ligolw.Document() + xmldoc.appendChild(ligolw.LIGO_LW()) + process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, ifos = self.rankingstat.instruments) + far.gen_likelihood_control_doc(xmldoc, self.rankingstat, self.rankingstatpdf) + ligolw_process.set_process_end_time(process) + return xmldoc + + + def web_get_rankingstat(self): + with self.lock: + output = StringIO.StringIO() + ligolw_utils.write_fileobj(self.__get_rankingstat_xmldoc(), output) + outstr = output.getvalue() + output.close() + return outstr + + + def __get_zerolag_rankingstatpdf_xmldoc(self): + xmldoc = ligolw.Document() + xmldoc.appendChild(ligolw.LIGO_LW()) + process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, ifos = self.rankingstat.instruments) + far.gen_likelihood_control_doc(xmldoc, None, self.zerolag_rankingstatpdf) + ligolw_process.set_process_end_time(process) + return xmldoc + + + def web_get_zerolag_rankingstatpdf(self): + with self.lock: + output = StringIO.StringIO() + ligolw_utils.write_fileobj(self.__get_zerolag_rankingstatpdf_xmldoc(), output) + outstr = output.getvalue() + output.close() + return outstr + + + def __flush(self): + # run StreamThinca's .flush(). returns the last remaining + # non-coincident sngls. add them to the distribution. as + # above in appsink_new_buffer() we skip singles collected + # during times when only one instrument was one. + + # times when at least 2 instruments were generating SNR. + # used to sieve triggers for inclusion in the denominator. + discard_boundary = float(self.stream_thinca.discard_boundary) + snr_segments = segments.segmentlistdict((instrument, ratebinlist[lower_bound_in_seglist(ratebinlist, discard_boundary):].segmentlist()) for instrument, ratebinlist in self.rankingstat.denominator.triggerrates.items()) + one_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 1) + # FIXME: see comment in appsink_new_buffer() + one_or_more_instruments.protract(1e-3) # 1 ms + two_or_more_instruments = segmentsUtils.vote(snr_segments.values(), 2) + # FIXME: see comment in appsink_new_buffer() + two_or_more_instruments.protract(1e-3) # 1 ms + + for event in self.stream_thinca.flush(self.coincs_document.xmldoc, self.coincs_document.process_id, snr_segments, fapfar = self.fapfar): + if self.ranking_stat_output_url is None: + continue + assert event.end in one_or_more_instruments, "trigger at time (%s) with no SNR (%s)" % (str(event.end), str(one_or_more_instruments)) + if event.end in two_or_more_instruments: + self.rankingstat.denominator.increment(event) + self.coincs_document.commit() + + # update zero-lag bin counts in rankingstat. + if self.stream_thinca.last_coincs and self.ranking_stat_output_url is not None: + for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): + if coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: + for event in self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id): + self.rankingstat.zerolag.increment(event) + + # Cluster last coincs before recording number of zero + # lag events or sending alerts to gracedb + # FIXME Do proper clustering that saves states between + # thinca intervals and uses an independent clustering + # window. This can also go wrong if there are multiple + # events with an identical likelihood. 
It will just + # choose the event with the highest event id + if self.stream_thinca.last_coincs: + self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.items(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)]) + + # Add events to the observed likelihood histogram post + # "clustering" + # FIXME proper clustering is really needed (see above) + if self.stream_thinca.last_coincs and self.zerolag_rankingstatpdf is not None: + for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items(): + if coinc_event.likelihood is not None and coinc_event.time_slide_id in self.stream_thinca.last_coincs.zero_lag_time_slide_ids: + self.zerolag_rankingstatpdf.zero_lag_lr_lnpdf.count[coinc_event.likelihood,] += 1 + + # do GraceDB alerts + self.__do_gracedb_alerts() + + # after doing alerts, no longer need per-trigger SNR data + # for the triggers that are too old to be used to form + # candidates. to avoid searching the entire list of + # triggers each time, we stop when we encounter the first + # trigger whose SNR series might still be needed, save its + # index, and start the search from there next time + discard_boundary = self.stream_thinca.discard_boundary + for self.snr_time_series_cleanup_index, event in enumerate(self.coincs_document.sngl_inspiral_table[self.snr_time_series_cleanup_index:], self.snr_time_series_cleanup_index): + if event.end >= discard_boundary: + break + del event.snr_time_series + + # garbage collect last_coincs + # FIXME: this should not be needed. something is wrong. + # if this is needed, then why don't we have to garbage + # collect everything ourselves? + self.stream_thinca.last_coincs = {} + + + def __do_gracedb_alerts(self): + # check for no-op + if self.rankingstatpdf is None or not self.rankingstatpdf.is_healthy(): + return + # FIXME: this is redundant with checks inside + # gracedbwrapper, which is where it logically belongs, but + # we need it here to avoid triggering the ValuError that + # follows when alerts are disabled. think of some + # different way to organize the safety checks to avoid the + # need for this redundancy + if self.gracedbwrapper.far_threshold is None: + return + + # sanity check + if self.fapfar is None: + raise ValueError("gracedb alerts cannot be enabled without fap/far data") + + # do alerts + self.gracedbwrapper.__do_alerts(self.stream_thinca.last_coincs, self.psds, self.__get_rankingstat_xmldoc) + + + def web_get_sngls_snr_threshold(self): + with self.lock: + if self.stream_thinca.sngls_snr_threshold is not None: + yield "snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold + else: + yield "snr=\n" + + + def web_set_sngls_snr_threshold(self): + try: + with self.lock: + snr_threshold = bottle.request.forms["snr"] + if snr_threshold: + self.stream_thinca.sngls_snr_threshold = float(snr_threshold) + yield "OK: snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold + else: + self.stream_thinca.sngls_snr_threshold = None + yield "OK: snr=\n" + except: + yield "error\n" + + + def __write_output_url(self, url = None, verbose = False): + self.__flush() + if url is not None: + self.coincs_document.url = url + with self.segmentstracker.lock: + self.coincs_document.write_output_url(seglistdicts = self.segmentstracker.seglistdicts, verbose = verbose) + # can't be used anymore + del self.coincs_document + + + def __write_ranking_stat_url(self, url, description, snapshot = False, verbose = False): + # write the ranking statistic file. 
+ ligolw_utils.write_url(self.__get_rankingstat_xmldoc(), url, gz = (url or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None) + # Snapshots get their own custom file and path + if snapshot: + fname = self.segmentstracker.T050017_filename(description + '_DISTSTATS', 'xml.gz') + shutil.copy(ligolw_utils.local_path_from_url(url), os.path.join(subdir_from_T050017_filename(fname), fname)) + + + def write_output_url(self, url = None, description = "", verbose = False): + with self.lock: + if self.ranking_stat_output_url is not None: + self.__write_ranking_stat_url(self.ranking_stat_output_url, description, verbose = verbose) + self.__write_output_url(url = url, verbose = verbose) + + + def snapshot_output_url(self, description, extension, verbose = False): + with self.lock: + fname = self.segmentstracker.T050017_filename(description, extension) + fname = os.path.join(subdir_from_T050017_filename(fname), fname) + if self.ranking_stat_output_url is not None: + self.__write_ranking_stat_url(self.ranking_stat_output_url, description, snapshot = True, verbose = verbose) + if self.zerolag_rankingstatpdf_url is not None: + ligolw_utils.write_url(self.__get_zerolag_rankingstatpdf_xmldoc(), self.zerolag_rankingstatpdf_url, gz = (self.zerolag_rankingstatpdf_url or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None) + coincs_document = self.coincs_document.get_another() + self.__write_output_url(url = fname, verbose = verbose) + self.coincs_document = coincs_document + self.snr_time_series_cleanup_index = 0 diff --git a/gstlal-inspiral/python/lloidparts.py b/gstlal-inspiral/python/lloidparts.py index ba3f91d0e3..e286656177 100644 --- a/gstlal-inspiral/python/lloidparts.py +++ b/gstlal-inspiral/python/lloidparts.py @@ -68,10 +68,6 @@ import math import numpy -import os -import StringIO -import sys -import warnings import gi @@ -82,21 +78,9 @@ Gst.init(None) from glue import iterutils -from ligo import segments -from glue.ligolw import ligolw -from glue.ligolw import utils as ligolw_utils -from glue.ligolw.utils import segments as ligolw_segments -from glue.ligolw.utils import process as ligolw_process -from gstlal import bottle from gstlal import datasource -from gstlal import inspiral from gstlal import multirate_datasource -from gstlal import pipeio from gstlal import pipeparts -from gstlal import reference_psd -from gstlal import simplehandler -import lal -from lal import LIGOTimeGPS # @@ -181,406 +165,6 @@ def mkcontrolsnksrc(pipeline, rate, verbose = False, suffix = None, control_peak return snk, src -class Handler(simplehandler.Handler): - """! - A subclass of simplehandler.Handler to be used with e.g., - gstlal_inspiral - - Implements additional message handling for dealing with spectrum - messages and checkpoints for the online analysis including periodic - dumps of segment information, trigger files and background - distribution statistics. - """ - def __init__(self, mainloop, pipeline, dataclass, instruments, tag = "", segment_history_duration = LIGOTimeGPS(2592000), verbose = False): - """! - @param mainloop The main application's event loop - @param pipeline The gstreamer pipeline that is being controlled by this handler - @param dataclass An inspiral.Data class instance - @param tag The tag to use for naming file snapshots, e.g. 
the description will be "%s_LLOID" % tag - @param verbose Be verbose - """ - super(Handler, self).__init__(mainloop, pipeline) - - self.dataclass = dataclass - - self.tag = tag - self.verbose = verbose - - # setup segment list collection from gates - # - # FIXME: knowledge of what gates are present in what - # configurations, what they are called, etc., somehow needs to live - # with the code that constructs the gates - # - # FIXME: in the offline pipeline, state vector segments - # don't get recorded. however, except for the h(t) gate - # segments these are all inputs to the pipeline so it - # probably doesn't matter. nevertheless, they maybe should - # go into the event database for completeness of that - # record, or maybe not because it could result in a lot of - # duplication of on-disk data. who knows. think about it. - gate_suffix = { - # FIXME uncomment the framesegments line once the - # online analysis has a frame segments gate - #"framesegments": "frame_segments_gate", - "statevectorsegments": "state_vector_gate", - "whitehtsegments": "ht_gate" - } - - # dictionary mapping segtype to segmentlist dictionary - # mapping instrument to segment list - self.seglistdicts = dict((segtype, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)) for segtype in gate_suffix) - # hook the Data class's livetime record keeping into ours - # so that all segments come here and the Data class has all - # of ours - # FIXME: don't do this, get rid of the Data class - dataclass.seglistdicts = self.seglistdicts - - # create a copy to keep track of recent segment history - self.recent_segment_histories = self.seglistdicts.copy() - self.segment_history_duration = segment_history_duration - - # iterate over segment types and instruments, look for the - # gate element that should provide those segments, and - # connect handlers to collect the segments - if verbose: - print >>sys.stderr, "connecting segment handlers to gates ..." - for segtype, seglistdict in self.seglistdicts.items(): - for instrument in seglistdict: - try: - name = "%s_%s" % (instrument, gate_suffix[segtype]) - except KeyError: - # this segtype doesn't come from - # gate elements - continue - elem = self.pipeline.get_by_name(name) - if elem is None: - # ignore missing gate elements - if verbose: - print >>sys.stderr, "\tcould not find %s for %s '%s'" % (name, instrument, segtype) - continue - if verbose: - print >>sys.stderr, "\tfound %s for %s '%s'" % (name, instrument, segtype) - elem.connect("start", self.gatehandler, (segtype, instrument, "on")) - elem.connect("stop", self.gatehandler, (segtype, instrument, "off")) - elem.set_property("emit-signals", True) - if verbose: - print >>sys.stderr, "... done connecting segment handlers to gates" - - # most recent spectrum reported by each whitener - self.psds = {} - bottle.route("/psds.xml")(self.web_get_psd_xml) - - # segment lists - bottle.route("/segments.xml")(self.web_get_segments_xml) - bottle.route("/cumulative_segments.xml")(self.web_get_recent_segment_history_xml) - - def do_on_message(self, bus, message): - """! - Handle application-specific message types, e.g., spectrum - and checkpointing messages. - - @param bus A reference to the pipeline's bus - @param message A reference to the incoming message - """ - # - # return value of True tells parent class that we have done - # all that is needed in response to the message, and that - # it should ignore it. 
a return value of False means the - # parent class should do what it thinks should be done - # - if message.type == Gst.MessageType.ELEMENT: - if message.get_structure().get_name() == "spectrum": - # get the instrument, psd, and timestamp. - # the "stability" is a measure of the - # fraction of the configured averaging - # timescale used to obtain this - # measurement. - # NOTE: epoch is used for the timestamp, - # this is the middle of the most recent FFT - # interval used to obtain this PSD - instrument = message.src.get_name().split("_")[-1] - psd = pipeio.parse_spectrum_message(message) - timestamp = psd.epoch - stability = float(message.src.get_property("n-samples")) / message.src.get_property("average-samples") - - # save - self.psds[instrument] = psd - - # update horizon distance history. if the - # whitener's average is not satisfactorily - # converged, claim the horizon distance is - # 0 (equivalent to claiming the instrument - # to be off at this time). this has the - # effect of vetoing candidates from these - # times. - if stability > 0.3: - # FIXME: get canonical masses from - # the template bank bin that we're - # analyzing - horizon_distance = reference_psd.HorizonDistance(10.0, 0.85 * (psd.f0 + (len(psd.data.data) - 1) * psd.deltaF), psd.deltaF, 1.4, 1.4)(psd, 8.0)[0] - assert not (math.isnan(horizon_distance) or math.isinf(horizon_distance)) - else: - horizon_distance = 0. - self.record_horizon_distance(instrument, float(timestamp), horizon_distance) - return True - elif message.type == Gst.MessageType.APPLICATION: - if message.get_structure().get_name() == "CHECKPOINT": - self.checkpoint(LIGOTimeGPS(0, message.timestamp)) - return True - elif message.type == Gst.MessageType.EOS: - with self.dataclass.lock: - # FIXME: how to choose correct timestamp? - # note that EOS messages' timestamps are - # set to CLOCK_TIME_NONE so they can't be - # used for this. - try: - # seconds - timestamp = self.dataclass.rankingstat.segmentlists.extent_all()[1] - except ValueError: - # no segments - return False - # convert to integer nanoseconds - timestamp = LIGOTimeGPS(timestamp).ns() - # terminate all segments - for segtype, seglistdict in self.seglistdicts.items(): - for instrument in seglistdict: - self._gatehandler(None, timestamp, (segtype, instrument, "off")) - # NOTE: never return True from the EOS code-path, - # so as to not stop the parent class to do - # EOS-related stuff - return False - return False - - def _record_horizon_distance(self, instrument, timestamp, horizon_distance): - """ - timestamp can be a float or a slice with float boundaries. - """ - # retrieve the horizon history for this instrument - horizon_history = self.dataclass.rankingstat.numerator.horizon_history[instrument] - # NOTE: timestamps are floats here (or float slices), not - # LIGOTimeGPS. whitener should be reporting PSDs with - # integer timestamps so the timestamps are not naturally - # high precision, and, anyway, we don't need nanosecond - # precision for the horizon distance history. - horizon_history[timestamp] = horizon_distance - - def record_horizon_distance(self, instrument, timestamp, horizon_distance): - """ - timestamp can be a float or a slice with float boundaries. - """ - with self.dataclass.lock: - self._record_horizon_distance(instrument, timestamp, horizon_distance) - - def checkpoint(self, timestamp): - """! - Checkpoint, e.g., flush segments and triggers to disk. 
- - @param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk - """ - # FIXME: the timestamp is used to close off open segments - # and so should *not* be the timestamp of the current - # buffer, necessarily, but rather a GPS time guaranteed to - # precede any future state transitions of any segment list. - # especially if the pipeline is ever run in an "advanced - # warning" configuration using the GPS time of a trigger - # buffer would be an especially bad choice. - self.flush_segments_to_disk(timestamp) - try: - self.dataclass.snapshot_output_url("%s_LLOID" % self.tag, "xml.gz", verbose = self.verbose) - except TypeError as te: - print >>sys.stderr, "Warning: couldn't build output file on checkpoint, probably there aren't any triggers: %s" % te - # Resync seglistdicts in dataclass and here - # FIXME: don't do this, get rid of the Data class - self.dataclass.seglistdicts = self.seglistdicts - - def flush_segments_to_disk(self, timestamp): - """! - Flush segments to disk, e.g., when checkpointing or shutting - down an online pipeline. - - @param timestamp the LIGOTimeGPS timestamp of the current buffer in order to close off open segment intervals before writing to disk - """ - with self.dataclass.lock: - # make a copy of the current segmentlistdicts - seglistdicts = dict((key, value.copy()) for key, value in self.seglistdicts.items()) - - # keep everything before timestamp in the current - # segmentlistdicts. keep everything after - # timestamp in the copy. we need to apply the cut - # this way around so that the T050017 filename - # constructed below has the desired start and - # duration - for seglistdict in self.seglistdicts.values(): - seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)])) - for seglistdict in seglistdicts.values(): - seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)])) - - # write the current (clipped) segmentlistdicts to - # disk - fname = self.dataclass.T050017_filename("%s_SEGMENTS" % self.tag, "xml.gz") - fname = os.path.join(inspiral.subdir_from_T050017_filename(fname), fname) - ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None) - - # continue with the (clipped) copy - self.seglistdicts = seglistdicts - - def _gatehandler(self, elem, timestamp, (segtype, instrument, new_state)): - # FIXME: this method could conceivably be patched to know - # what to do with state transitions that appear to have - # gone backwards, i.e., it could correct segments that are - # already in the segmentlist. this might be one way to - # sort out the problem of segment state snap-shotting code - # artificially claiming segments to be on beyond the time - # when they should stop. 
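(Aside on flush_segments_to_disk() above, not part of the patch: the two subtractions that split the segment lists at the checkpoint time are easy to get backwards.  A self-contained sketch under made-up GPS times; newer installs spell the import "from ligo import segments":

    from glue import segments

    checkpoint_gps = 1000000100
    live = segments.segmentlist([segments.segment(1000000000, segments.PosInfinity)])

    # the copy written to disk keeps everything *before* the checkpoint ...
    to_disk = segments.segmentlist(live)
    to_disk -= segments.segmentlist([segments.segment(checkpoint_gps, segments.PosInfinity)])
    # ... while the live copy keeps everything after it
    live -= segments.segmentlist([segments.segment(segments.NegInfinity, checkpoint_gps)])

    # to_disk --> [segment(1000000000, 1000000100)]
    # live    --> [segment(1000000100, +infinity)]
)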
- timestamp = LIGOTimeGPS(0, timestamp) # timestamp is in nanoseconds - - if self.verbose and elem is not None: - print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % (elem.get_name(), instrument, segtype, new_state, str(timestamp)) - - if new_state == "off": - # record end of segment - self.seglistdicts[segtype][instrument] -= segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),)) - # set the horizon distance history to 0 at - # on-to-off transitions of whitened h(t) - if segtype == "whitehtsegments": - if self.dataclass.rankingstat.numerator.horizon_history[instrument]: - self._record_horizon_distance(instrument, slice(float(timestamp), None), 0.) - else: - self._record_horizon_distance(instrument, float(timestamp), 0.) - elif new_state == "on": - # record start of new segment - self.seglistdicts[segtype][instrument] += segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),)) - # place a 0 in the horizon distance history at the - # time of an off-to-on transition so that the - # horizon distance queries in the interval of off - # state are certain to return 0. - # - # FIXME: we're relying on the whitehtsegments gate - # to report state transitions *after* any potential - # whitener PSD messages have been generated. this - # is normally guaranteed because the whitener does - # not generate PSD messages during gap intervals. - # the problem is gaps due to glitch excission, - # which is done using a gate that comes after the - # whitener. the whitener might generate a PSD - # message that inserts a horizon distance into the - # history precisely when the glitch excission has - # turned off the instrument and the sensitivity - # should be 0. we're hoping that us zeroing the - # preceding "off" period will erase those entries, - # but that won't work if they get inserted after we - # do this stuff here. it's not the end of the - # world if we get this wrong, it's just one way in - # which the horizon distance history could be - # somewhat inaccurate. - if segtype == "whitehtsegments": - # this if statement is checking if - # ~self.seglistdicts[segtype][instrument] - # is empty, and if not then it zeros the - # interval spanned by the last segment in - # that list, but what's done below avoids - # the explicit segment arithmetic - if len(self.seglistdicts[segtype][instrument]) > 1: - self._record_horizon_distance(instrument, slice(float(self.seglistdicts[segtype][instrument][-2][-1]), float(timestamp)), 0.) - else: - self._record_horizon_distance(instrument, float(timestamp), 0.) - else: - assert False, "impossible new_state '%s'" % new_state - - def gatehandler(self, elem, timestamp, (segtype, instrument, new_state)): - """! - A handler that intercepts gate state transitions. This can set - the "on" segments for each detector - - @param elem A reference to the lal_gate element or None (only used for verbosity) - @param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds) - @param segtype the class of segments this gate is defining, e.g., "datasegments", etc.. - @param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc.. - @param new_state the state transition, must be either "on" or "off" - """ - with self.dataclass.lock: - self._gatehandler(elem, timestamp, (segtype, instrument, new_state)) - - def gen_segments_xmldoc(self): - """! - A method to output the segment list in a valid ligolw xml - format. 
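(Aside on _gatehandler() above, not part of the patch: stripped of the horizon-distance side effects, the segment bookkeeping reduces to two operations on an open-ended segment.  A toy version, using a coalescing union where the handler itself uses "+=", with illustrative timestamps:

    from glue import segments

    def apply_transition(seglist, timestamp, new_state):
        open_ended = segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)])
        if new_state == "on":
            # open a segment extending to +infinity until the next "off"
            seglist |= open_ended
        elif new_state == "off":
            # truncate any open segment at the transition time
            seglist -= open_ended
        else:
            raise ValueError("impossible new_state '%s'" % new_state)
        return seglist

    seglist = segments.segmentlist()
    seglist = apply_transition(seglist, 1000000000, "on")
    seglist = apply_transition(seglist, 1000000016, "off")
    # seglist --> [segment(1000000000, 1000000016)]
)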
- """ - xmldoc = ligolw.Document() - xmldoc.appendChild(ligolw.LIGO_LW()) - process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}) - with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments: - for segtype, seglistdict in self.seglistdicts.items(): - ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot") - ligolw_process.set_process_end_time(process) - return xmldoc - - def web_get_segments_xml(self): - """! - provide a bottle route to get segment information via a url - """ - with self.dataclass.lock: - output = StringIO.StringIO() - ligolw_utils.write_fileobj(self.gen_segments_xmldoc(), output) - outstr = output.getvalue() - output.close() - return outstr - - def update_recent_segment_history(self): - """! - A method to update the recent segment histories - """ - current_gps_time = lal.GPSTimeNow() - interval_to_discard = segments.segmentlist((segments.segment(segments.NegInfinity, current_gps_time - self.segment_history_duration),)) - for segtype, seglistdict in self.recent_segment_histories.items(): - seglistdict.extend(self.seglistdicts[segtype]) - seglistdict.coalesce() - for seglist in seglistdict.values(): - seglist -= interval_to_discard - - def gen_recent_segment_history_xmldoc(self): - """! - Construct and return a LIGOLW XML tree containing the - recent segment histories. - """ - self.update_recent_segment_history() - xmldoc = ligolw.Document() - xmldoc.appendChild(ligolw.LIGO_LW()) - process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}) - with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments: - for segtype, seglistdict in self.recent_segment_histories.items(): - ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot") - ligolw_process.set_process_end_time(process) - return xmldoc - - def web_get_recent_segment_history_xml(self): - """! - provide a bottle route to get recent segment history - information via a url - """ - with self.dataclass.lock: - output = StringIO.StringIO() - ligolw_utils.write_fileobj(self.gen_recent_segment_history_xmldoc(), output) - outstr = output.getvalue() - output.close() - return outstr - - def gen_psd_xmldoc(self): - xmldoc = lal.series.make_psd_xmldoc(self.psds) - process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}, ifos = self.psds) - ligolw_process.set_process_end_time(process) - return xmldoc - - def web_get_psd_xml(self): - with self.dataclass.lock: - output = StringIO.StringIO() - ligolw_utils.write_fileobj(self.gen_psd_xmldoc(), output) - outstr = output.getvalue() - output.close() - return outstr - - ## # _Gstreamer graph describing this function:_ # diff --git a/gstlal/python/reference_psd.py b/gstlal/python/reference_psd.py index bac931cd18..b6afca5f9c 100644 --- a/gstlal/python/reference_psd.py +++ b/gstlal/python/reference_psd.py @@ -199,14 +199,6 @@ def measure_psd(gw_data_source_info, instrument, rate, psd_fft_length = 8, verbo return handler.psd -def write_psd_fileobj(fileobj, psddict, gz = False): - """ - Wrapper around make_psd_xmldoc() to write the XML document directly - to a Python file object. - """ - ligolw_utils.write_fileobj(lal.series.make_psd_xmldoc(psddict), fileobj, gz = gz) - - def write_psd(filename, psddict, verbose = False, trap_signals = None): """ Wrapper around make_psd_xmldoc() to write the XML document directly -- GitLab
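Note on the reference_psd.py hunk: write_psd_fileobj() is dropped because nothing calls it any more.  Its entire behaviour was the one-liner below, which a caller can simply inline; this is a sketch of that replacement, assuming psddict maps instrument names to lal frequency-series PSDs as elsewhere in the module:

    import lal.series
    from glue.ligolw import utils as ligolw_utils

    def write_psd_fileobj(fileobj, psddict, gz = False):
        # same as the removed wrapper: serialize the PSD dictionary to
        # LIGO_LW XML and write it to an already-open file object
        ligolw_utils.write_fileobj(lal.series.make_psd_xmldoc(psddict), fileobj, gz = gz)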