Skip to content
Snippets Groups Projects
Commit 6af8ba15 authored by Patrick Godwin's avatar Patrick Godwin Committed by ChiWai Chan
Browse files

Stream: add functionality to get elem by name, post bus messages. update...

Stream: add functionality to get elem by name, post bus messages. update lloidhandler.py to leverage this
parent e0a1aa68
No related branches found
No related tags found
No related merge requests found
......@@ -105,14 +105,6 @@ from ligo.segments import utils as segmentsUtils
#
def message_new_checkpoint(src, timestamp = None):
s = Gst.Structure.new_empty("CHECKPOINT")
message = Gst.Message.new_application(src, s)
if timestamp is not None:
message.timestamp = timestamp
return message
def subdir_from_T050017_filename(fname):
path = str(CacheEntry.from_T050017(fname).segment[0])[:5]
try:
......@@ -132,7 +124,7 @@ def subdir_from_T050017_filename(fname):
class EyeCandy(object):
def __init__(self, instruments, kafka_server, analysis_tag, job_tag, pipeline, segmentstracker, latencytracker):
def __init__(self, instruments, kafka_server, analysis_tag, job_tag, stream, segmentstracker, latencytracker):
self.kafka_server = kafka_server
self.analysis = analysis_tag
self.tag = job_tag
......@@ -157,7 +149,7 @@ class EyeCandy(object):
self.strain = {}
for instrument in instruments:
name = "%s_strain_audiorate" % instrument
elem = pipeline.get_by_name(name)
elem = stream.get_element_by_name(name)
if elem is not None:
self.strain[instrument] = elem
self.time_since_last_state = None
......@@ -401,7 +393,7 @@ class EyeCandy(object):
class SegmentsTracker(object):
def __init__(self, pipeline, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False):
def __init__(self, stream, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False):
self.lock = threading.Lock()
self.verbose = verbose
......@@ -442,7 +434,7 @@ class SegmentsTracker(object):
# gate element that should provide those segments, and
# connect handlers to collect the segments
if verbose:
print(sys.stderr, "connecting segment handlers to gates ...", file=sys.stderr)
print("connecting segment handlers to gates ...", file=sys.stderr)
for segtype, seglistdict in self.seglistdicts.items():
for instrument in seglistdict:
try:
......@@ -451,7 +443,7 @@ class SegmentsTracker(object):
# this segtype doesn't come from
# gate elements
continue
elem = pipeline.get_by_name(name)
elem = stream.get_element_by_name(name)
if elem is None:
# ignore missing gate elements
if verbose:
......@@ -665,7 +657,7 @@ class SegmentsTracker(object):
#
class LatencyTracker(object):
def __init__(self, pipeline, instruments, verbose = False):
def __init__(self, stream, instruments, verbose = False):
self.lock = threading.Lock()
self.verbose = verbose
......@@ -683,7 +675,7 @@ class LatencyTracker(object):
print(sys.stderr, "connecting pipeline latency handlers ...", file=sys.stderr)
for stage in self.pipeline_history.keys():
for instrument in self.pipeline_history[stage].keys():
elem = pipeline.get_by_name("%s_%s_latency" % (instrument, stage))
elem = stream.get_element_by_name("%s_%s_latency" % (instrument, stage))
if elem is None:
if verbose:
print("\tcould not find %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file=sys.stderr)
......@@ -777,14 +769,14 @@ class LLOIDTracker:
# setup segment list collection from gates
#
self.segmentstracker = SegmentsTracker(stream.pipeline, rankingstat.instruments, verbose = verbose)
self.segmentstracker = SegmentsTracker(stream, rankingstat.instruments, verbose = verbose)
#
# setup latency collection from pipeline if requested
#
if track_latency:
self.latencytracker = LatencyTracker(stream.pipeline, rankingstat.instruments, verbose = verbose)
self.latencytracker = LatencyTracker(stream, rankingstat.instruments, verbose = verbose)
else:
self.latencytracker = None
......@@ -792,7 +784,7 @@ class LLOIDTracker:
# set up metric collection
#
self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, stream.pipeline, self.segmentstracker, self.latencytracker)
self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, stream, self.segmentstracker, self.latencytracker)
# FIXME: detangle this
self.eye_candy.lock = self.lock
......@@ -943,7 +935,7 @@ class LLOIDTracker:
self.absent_instruments = set()
for instrument in rankingstat.instruments:
name = "%s_ht_gate" % instrument
elem = self.stream.pipeline.get_by_name(name)
elem = self.stream.get_element_by_name(name)
if elem is None:
# FIXME: if there is no data for an
# instrument for which we have ranking
......@@ -1115,7 +1107,7 @@ class LLOIDTracker:
# .ranking_stat_input_url the file that has
# just been loaded above will not be
# overwritten.
self.stream.pipeline.get_bus().post(message_new_checkpoint(self.stream.pipeline, timestamp = buf.t0.ns()))
self.stream.post_message("CHECKPOINT", timestamp = buf.t0.ns())
# if a ranking statistic source url is set
# and is not the same as the file to which
......
......@@ -42,9 +42,7 @@ import pluggy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstAudio', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gi.repository import GstAudio
from gi.repository import GObject, Gst, GstAudio
from lal import LIGOTimeGPS
......@@ -232,6 +230,16 @@ class Stream:
if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError(f"pipeline failed to enter {state.value_name}")
def get_element_by_name(self, name):
return self.pipeline.get_by_name(name)
def post_message(self, msg_name, timestamp=None):
s = Gst.Structure.new_empty(msg_name)
message = Gst.Message.new_application(self.pipeline, s)
if timestamp:
message.timestamp = timestamp
self.pipeline.get_bus().post(message)
def __getitem__(self, key):
return self.__class__(
name=self.name,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment