diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py index 2dec78607c09c003328aa155bccff4657e4860fe..cc1b2dc8baddd52dac9b88274a6fbb57010ee0ff 100644 --- a/gstlal-inspiral/python/lloidhandler.py +++ b/gstlal-inspiral/python/lloidhandler.py @@ -105,14 +105,6 @@ from ligo.segments import utils as segmentsUtils # -def message_new_checkpoint(src, timestamp = None): - s = Gst.Structure.new_empty("CHECKPOINT") - message = Gst.Message.new_application(src, s) - if timestamp is not None: - message.timestamp = timestamp - return message - - def subdir_from_T050017_filename(fname): path = str(CacheEntry.from_T050017(fname).segment[0])[:5] try: @@ -132,7 +124,7 @@ def subdir_from_T050017_filename(fname): class EyeCandy(object): - def __init__(self, instruments, kafka_server, analysis_tag, job_tag, pipeline, segmentstracker, latencytracker): + def __init__(self, instruments, kafka_server, analysis_tag, job_tag, stream, segmentstracker, latencytracker): self.kafka_server = kafka_server self.analysis = analysis_tag self.tag = job_tag @@ -157,7 +149,7 @@ class EyeCandy(object): self.strain = {} for instrument in instruments: name = "%s_strain_audiorate" % instrument - elem = pipeline.get_by_name(name) + elem = stream.get_element_by_name(name) if elem is not None: self.strain[instrument] = elem self.time_since_last_state = None @@ -401,7 +393,7 @@ class EyeCandy(object): class SegmentsTracker(object): - def __init__(self, pipeline, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False): + def __init__(self, stream, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False): self.lock = threading.Lock() self.verbose = verbose @@ -442,7 +434,7 @@ class SegmentsTracker(object): # gate element that should provide those segments, and # connect handlers to collect the segments if verbose: - print(sys.stderr, "connecting segment handlers to gates ...", file=sys.stderr) + print("connecting segment handlers to gates ...", file=sys.stderr) for segtype, seglistdict in self.seglistdicts.items(): for instrument in seglistdict: try: @@ -451,7 +443,7 @@ class SegmentsTracker(object): # this segtype doesn't come from # gate elements continue - elem = pipeline.get_by_name(name) + elem = stream.get_element_by_name(name) if elem is None: # ignore missing gate elements if verbose: @@ -665,7 +657,7 @@ class SegmentsTracker(object): # class LatencyTracker(object): - def __init__(self, pipeline, instruments, verbose = False): + def __init__(self, stream, instruments, verbose = False): self.lock = threading.Lock() self.verbose = verbose @@ -683,7 +675,7 @@ class LatencyTracker(object): print(sys.stderr, "connecting pipeline latency handlers ...", file=sys.stderr) for stage in self.pipeline_history.keys(): for instrument in self.pipeline_history[stage].keys(): - elem = pipeline.get_by_name("%s_%s_latency" % (instrument, stage)) + elem = stream.get_element_by_name("%s_%s_latency" % (instrument, stage)) if elem is None: if verbose: print("\tcould not find %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file=sys.stderr) @@ -777,14 +769,14 @@ class LLOIDTracker: # setup segment list collection from gates # - self.segmentstracker = SegmentsTracker(stream.pipeline, rankingstat.instruments, verbose = verbose) + self.segmentstracker = SegmentsTracker(stream, rankingstat.instruments, verbose = verbose) # # setup latency collection from pipeline if requested # if track_latency: - self.latencytracker = LatencyTracker(stream.pipeline, rankingstat.instruments, verbose = verbose) + self.latencytracker = LatencyTracker(stream, rankingstat.instruments, verbose = verbose) else: self.latencytracker = None @@ -792,7 +784,7 @@ class LLOIDTracker: # set up metric collection # - self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, stream.pipeline, self.segmentstracker, self.latencytracker) + self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, stream, self.segmentstracker, self.latencytracker) # FIXME: detangle this self.eye_candy.lock = self.lock @@ -943,7 +935,7 @@ class LLOIDTracker: self.absent_instruments = set() for instrument in rankingstat.instruments: name = "%s_ht_gate" % instrument - elem = self.stream.pipeline.get_by_name(name) + elem = self.stream.get_element_by_name(name) if elem is None: # FIXME: if there is no data for an # instrument for which we have ranking @@ -1115,7 +1107,7 @@ class LLOIDTracker: # .ranking_stat_input_url the file that has # just been loaded above will not be # overwritten. - self.stream.pipeline.get_bus().post(message_new_checkpoint(self.stream.pipeline, timestamp = buf.t0.ns())) + self.stream.post_message("CHECKPOINT", timestamp = buf.t0.ns()) # if a ranking statistic source url is set # and is not the same as the file to which diff --git a/gstlal/python/stream.py b/gstlal/python/stream.py index f83571d08f14853e23314debea1dab5858d6c709..19dbf7a49a665cdcdef45d860691f13a6f80b354 100644 --- a/gstlal/python/stream.py +++ b/gstlal/python/stream.py @@ -42,9 +42,7 @@ import pluggy import gi gi.require_version('Gst', '1.0') gi.require_version('GstAudio', '1.0') -from gi.repository import GObject -from gi.repository import Gst -from gi.repository import GstAudio +from gi.repository import GObject, Gst, GstAudio from lal import LIGOTimeGPS @@ -232,6 +230,16 @@ class Stream: if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE: raise RuntimeError(f"pipeline failed to enter {state.value_name}") + def get_element_by_name(self, name): + return self.pipeline.get_by_name(name) + + def post_message(self, msg_name, timestamp=None): + s = Gst.Structure.new_empty(msg_name) + message = Gst.Message.new_application(self.pipeline, s) + if timestamp: + message.timestamp = timestamp + self.pipeline.get_bus().post(message) + def __getitem__(self, key): return self.__class__( name=self.name,