Commit 21c9850a authored by Madeline Wade's avatar Madeline Wade Committed by Madeline Wade

Added additional latency montioring to calibration pipeline. Exposed timestamp...

Added additional latency montioring to calibration pipeline. Exposed timestamp property in lal_latency.
parent df887d93
Pipeline #73068 failed with stages
in 1 minute and 7 seconds
......@@ -35,6 +35,7 @@ Gst.init(None)
from gstlal import simplehandler
from lal import LIGOTimeGPS
from kafka import errors
#
# =============================================================================
......@@ -73,11 +74,14 @@ class Handler(simplehandler.Handler):
self.kafka_server = kafka_server
if self.kafka_server is not None:
from kafka import KafkaProducer
self.producer = KafkaProducer(
bootstrap_servers = [kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'),
)
try:
self.producer = KafkaProducer(
bootstrap_servers = [kafka_server],
key_serializer = lambda m: json.dumps(m).encode('utf-8'),
value_serializer = lambda m: json.dumps(m).encode('utf-8'),
)
except errors.NoBrokersAvaialble:
self.producer = None
def appsink_statevector_new_buffer(self, elem, ifo, bitmaskdict):
with self.lock:
......@@ -91,9 +95,19 @@ class Handler(simplehandler.Handler):
state = int(state)
buf.unmap(mapinfo)
monitor_dict = {}
monitor_dict['time'] = float(buf_timestamp)
for key, bitmask in bitmaskdict.items():
monitor_dict[key] = state & bitmask
if self.kafka_server is not None:
if self.kafka_server is not None and self.producer is not None:
self.producer.send("%s_statevector_bit_check" % ifo, value = monitor_dict)
return Gst.FlowReturn.OK
def latency_new_buffer(self, elem, param):
with self.lock:
latency = elem.get_property("current-latency")
name = elem.get_property("name")
time = elem.get_property("timestamp")
if self.kafka_server is not None and self.producer is not None:
self.producer.send("%s_latency" % (name.split("_")[0]), value = {"time": time, name: latency})
return Gst.FlowReturn.OK
......@@ -108,6 +108,7 @@ static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE(
enum property {
ARG_SILENT = 1,
ARG_CURRENT_LATENCY,
ARG_TIMESTAMP,
ARG_FAKE
};
......@@ -153,6 +154,7 @@ static GstFlowReturn transform_ip(GstBaseTransform *trans, GstBuffer *buf)
gdouble buffer_time = (double) GST_TIME_AS_SECONDS(GST_BUFFER_PTS(buf));
element->current_latency = current_time - buffer_time;
element->timestamp = buffer_time;
/* Tell the world about the latency by updating the latency property */
GST_LOG_OBJECT(element, "Just computed new latency");
......@@ -206,6 +208,10 @@ static void set_property(GObject *object, enum property prop_id, const GValue *v
element->current_latency = g_value_get_double(value);
break;
case ARG_TIMESTAMP:
element->timestamp = g_value_get_double(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
......@@ -236,6 +242,10 @@ static void get_property(GObject *object, enum property prop_id, GValue *value,
g_value_set_double(value, element->current_latency);
break;
case ARG_TIMESTAMP:
g_value_set_double(value, element->timestamp);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
......@@ -282,6 +292,14 @@ static void gstlal_latency_class_init(GSTLALLatencyClass *klass)
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS
);
properties[ARG_TIMESTAMP] = g_param_spec_double(
"timestamp",
"Timestamp",
"The current buffer timestamp",
-G_MAXDOUBLE, G_MAXDOUBLE, 0.0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS
);
g_object_class_install_property(
gobject_class,
ARG_SILENT,
......@@ -292,6 +310,11 @@ static void gstlal_latency_class_init(GSTLALLatencyClass *klass)
ARG_CURRENT_LATENCY,
properties[ARG_CURRENT_LATENCY]
);
g_object_class_install_property(
gobject_class,
ARG_TIMESTAMP,
properties[ARG_TIMESTAMP]
);
gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&src_factory));
gst_element_class_add_pad_template(element_class, gst_static_pad_template_get(&sink_factory));
......
......@@ -79,6 +79,7 @@ struct _GSTLALLatency {
/* properties */
gboolean silent;
gdouble current_latency;
gdouble timestamp;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment