Maintenance will be performed on git.ligo.org, chat.ligo.org, containers.ligo.org, and docs.ligo.org on Tuesday 22nd September 2020 starting at approximately 9am MST.It is expected to take around 15 minutes and there will be a short period of downtime towards the end of the maintenance window. Please address any comments, questions, or concerns to computing-help@igwn.org.

Commit 41e9e785 authored by Aaron Viets's avatar Aaron Viets

gstlal_compute_strain: Use GstController to the update line frequencies at specific timestamps

parent e624f539
Pipeline #83687 failed with stages
in 40 minutes
......@@ -1868,15 +1868,15 @@ if not compute_exact_kappas and (compute_fs or compute_srcq):
smooth_fs_squared_almost_nogate = pipeparts.mkaudioamplify(pipeline, smooth_XiR_nogate, src_pcal_line_freq)
smooth_fs_squared_nogate = pipeparts.mkaudioamplify(pipeline, smooth_fs_squared_almost_nogate, src_pcal_line_freq)
if src_pcal_line_freq == act_pcal_line_freq and "pcal1_linefreq" in head_dict:
head_dict["pcal1_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_almost, "current_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared, "current_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_almost_nogate, "current_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_nogate, "current_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_almost, "timestamped_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared, "timestamped_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_almost_nogate, "timestamped_average", "amplification", 1)
head_dict["pcal1_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_nogate, "timestamped_average", "amplification", 1)
elif src_pcal_line_freq != act_pcal_line_freq and "pcal4_linefreq" in head_dict:
head_dict["pcal4_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_almost, "current_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared, "current_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_almost_nogate, "current_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::current-average", calibration_parts.update_property_simple, smooth_fs_squared_nogate, "current_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_almost, "timestamped_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared, "timestamped_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_almost_nogate, "timestamped_average", "amplification", 1)
head_dict["pcal4_linefreq"].connect("notify::timestamped-average", calibration_parts.update_timestamped_property, smooth_fs_squared_nogate, "timestamped_average", "amplification", 1)
smooth_fs_squared = pipeparts.mktee(pipeline, smooth_fs_squared)
......
......@@ -516,6 +516,12 @@ static GstFlowReturn transform(GstBaseTransform *trans, GstBuffer *inbuf, GstBuf
}
element->next_in_offset = GST_BUFFER_OFFSET_END(inbuf);
/*
* Sync timestamps for properties that we want to be controlled
*/
gst_object_sync_values(GST_OBJECT(trans), GST_BUFFER_PTS(inbuf));
/*
* process buffer
*/
......@@ -664,7 +670,7 @@ static void gstlal_demodulate_class_init(GSTLALDemodulateClass *klass)
"The frequency of the calibration line corresponding to the calibration\n\t\t\t"
"factor 'kappa' we wish to extract from incoming stream",
-G_MAXDOUBLE, G_MAXDOUBLE, 300.,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT | GST_PARAM_CONTROLLABLE
)
);
g_object_class_install_property(
......@@ -675,7 +681,7 @@ static void gstlal_demodulate_class_init(GSTLALDemodulateClass *klass)
"Real part of prefactor",
"The real part of a prefactor by which to multiply the outputs",
-G_MAXDOUBLE, G_MAXDOUBLE, 1.0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT | GST_PARAM_CONTROLLABLE
)
);
g_object_class_install_property(
......@@ -686,7 +692,7 @@ static void gstlal_demodulate_class_init(GSTLALDemodulateClass *klass)
"Imaginary part of prefactor",
"The imaginary part of a prefactor by which to multiply the outputs",
-G_MAXDOUBLE, G_MAXDOUBLE, 0.0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT | GST_PARAM_CONTROLLABLE
)
);
}
......
......@@ -86,6 +86,7 @@ enum property {
ARG_SHIFT_SAMPLES,
ARG_UPDATE_WHEN_CHANGE,
ARG_CURRENT_AVERAGE,
ARG_TIMESTAMPED_AVERAGE,
ARG_FAKE
};
......@@ -108,7 +109,7 @@ static void rebuild_workspace_and_reset(GObject *object) {
#define DEFINE_AVERAGE_INPUT_DATA(DTYPE) \
static void average_input_data_ ## DTYPE(GSTLALProperty *element, DTYPE *src, guint64 src_size, guint64 pts) { \
static void average_input_data_ ## DTYPE(GSTLALProperty *element, DTYPE *src, guint64 src_size) { \
\
gint64 i; \
if(element->update_when_change) { \
......@@ -117,6 +118,10 @@ static void average_input_data_ ## DTYPE(GSTLALProperty *element, DTYPE *src, gu
if((double) src[i] != element->current_average) { \
element->current_average = (double) src[i]; \
GST_LOG_OBJECT(element, "Just computed new property"); \
/* When exactly did this change occur? */ \
element->timestamp += gst_util_uint64_scale_int_round((guint64) i, GST_SECOND, element->rate); \
/* Let other elements know when the change occurred */ \
g_object_notify_by_pspec(G_OBJECT(element), properties[ARG_TIMESTAMPED_AVERAGE]); \
/* Let other elements know about the update */ \
g_object_notify_by_pspec(G_OBJECT(element), properties[ARG_CURRENT_AVERAGE]); \
} \
......@@ -127,7 +132,7 @@ static void average_input_data_ ## DTYPE(GSTLALProperty *element, DTYPE *src, gu
if(element->num_in_avg) \
start_sample = 0; \
else \
start_sample = (gint64) (element->update_samples - (gst_util_uint64_scale_int_round(pts, element->rate, GST_SECOND) + element->average_samples - element->shift_samples) % element->update_samples) % element->update_samples; \
start_sample = (gint64) (element->update_samples - (gst_util_uint64_scale_int_round(element->timestamp, element->rate, GST_SECOND) + element->average_samples - element->shift_samples) % element->update_samples) % element->update_samples; \
\
/* How many samples from this buffer will we need to add into this average? */ \
samples_to_add = element->average_samples - element->num_in_avg < (gint64) src_size - start_sample ? element->average_samples - element->num_in_avg : (gint64) src_size - start_sample; \
......@@ -144,9 +149,14 @@ static void average_input_data_ ## DTYPE(GSTLALProperty *element, DTYPE *src, gu
\
/* We still need to divide by n to get the average */ \
element->current_average /= element->num_in_avg; \
\
/* When exactly did this change occur? */ \
element->timestamp += gst_util_uint64_scale_int_round((guint64) samples_to_add, GST_SECOND, element->rate); \
\
GST_LOG_OBJECT(element, "Just computed new property"); \
\
/* Let other elements know when the change occurred */ \
g_object_notify_by_pspec(G_OBJECT(element), properties[ARG_TIMESTAMPED_AVERAGE]); \
/* Let other elements know about the update */ \
g_object_notify_by_pspec(G_OBJECT(element), properties[ARG_CURRENT_AVERAGE]); \
\
......@@ -258,6 +268,7 @@ static GstFlowReturn render(GstBaseSink *sink, GstBuffer *buffer) {
if(!element->update_when_change)
element->current_average = 0.0;
}
element->timestamp = GST_BUFFER_PTS(buffer);
element->next_in_offset = GST_BUFFER_OFFSET_END(buffer);
GST_DEBUG_OBJECT(element, "have buffer spanning %" GST_BUFFER_BOUNDARIES_FORMAT, GST_BUFFER_BOUNDARIES_ARGS(buffer));
......@@ -271,13 +282,13 @@ static GstFlowReturn render(GstBaseSink *sink, GstBuffer *buffer) {
case GSTLAL_PROPERTY_SIGNED:
switch(element->unit_size) {
case 1:
average_input_data_gint8(element, (gint8 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_gint8(element, (gint8 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
case 2:
average_input_data_gint16(element, (gint16 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_gint16(element, (gint16 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
case 4:
average_input_data_gint32(element, (gint32 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_gint32(element, (gint32 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
default:
g_assert_not_reached();
......@@ -287,13 +298,13 @@ static GstFlowReturn render(GstBaseSink *sink, GstBuffer *buffer) {
case GSTLAL_PROPERTY_UNSIGNED:
switch(element->unit_size) {
case 1:
average_input_data_guint8(element, (guint8 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_guint8(element, (guint8 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
case 2:
average_input_data_guint16(element, (guint16 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_guint16(element, (guint16 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
case 4:
average_input_data_guint32(element, (guint32 *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_guint32(element, (guint32 *) mapinfo.data, mapinfo.size / element->unit_size);
break;
default:
g_assert_not_reached();
......@@ -303,10 +314,10 @@ static GstFlowReturn render(GstBaseSink *sink, GstBuffer *buffer) {
case GSTLAL_PROPERTY_FLOAT:
switch(element->unit_size) {
case 4:
average_input_data_float(element, (float *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_float(element, (float *) mapinfo.data, mapinfo.size / element->unit_size);
break;
case 8:
average_input_data_double(element, (double *) mapinfo.data, mapinfo.size / element->unit_size, GST_BUFFER_PTS(buffer));
average_input_data_double(element, (double *) mapinfo.data, mapinfo.size / element->unit_size);
break;
default:
g_assert_not_reached();
......@@ -402,6 +413,23 @@ static void get_property(GObject *object, enum property id, GValue *value, GPara
g_value_set_double(value, element->current_average);
break;
case ARG_TIMESTAMPED_AVERAGE: ;
GValue varray = G_VALUE_INIT;
g_value_init(&varray, GST_TYPE_ARRAY);
GValue t = G_VALUE_INIT;
GValue avg = G_VALUE_INIT;
g_value_init(&t, G_TYPE_DOUBLE);
g_value_init(&avg, G_TYPE_DOUBLE);
g_value_set_double(&t, (double) element->timestamp / GST_SECOND);
g_value_set_double(&avg, element->current_average);
gst_value_array_append_value(&varray, &t);
gst_value_array_append_value(&varray, &avg);
g_value_copy(&varray, value);
g_value_unset(&t);
g_value_unset(&avg);
g_value_unset(&varray);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, id, pspec);
break;
......@@ -495,6 +523,20 @@ static void gstlal_property_class_init(GSTLALPropertyClass *klass) {
-G_MAXDOUBLE, G_MAXDOUBLE, -G_MAXDOUBLE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS
);
properties[ARG_TIMESTAMPED_AVERAGE] = gst_param_spec_array(
"timestamped-average",
"Timestamped Average",
"A GstArray containing the timestamp in seconds and the current average. The\n\t\t\t"
"timestamp is first, then the average. Both are double-precision floats.",
g_param_spec_double(
"sample",
"Sample",
"Either the timestamp or the average value",
-G_MAXDOUBLE, G_MAXDOUBLE, 0.0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS
),
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS
);
g_object_class_install_property(
......@@ -522,6 +564,11 @@ static void gstlal_property_class_init(GSTLALPropertyClass *klass) {
ARG_CURRENT_AVERAGE,
properties[ARG_CURRENT_AVERAGE]
);
g_object_class_install_property(
gobject_class,
ARG_TIMESTAMPED_AVERAGE,
properties[ARG_TIMESTAMPED_AVERAGE]
);
}
......
......@@ -76,6 +76,7 @@ struct _GSTLALProperty {
gint64 average_samples;
gboolean update_when_change;
double current_average;
guint64 timestamp;
};
......
......@@ -25,6 +25,8 @@ import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
gi.require_version('GstController', '1.0')
from gi.repository import GstController
GObject.threads_init()
Gst.init(None)
......@@ -258,11 +260,11 @@ def demodulate(pipeline, head, freq, td, rate, filter_time, filter_latency, pref
head = pipeparts.mkgeneric(pipeline, head, "lal_demodulate", line_frequency = freq, prefactor_real = prefactor_real, prefactor_imag = prefactor_imag)
if type(freq_update) is list:
freq_update[0].connect("notify::current-average", update_property_simple, head, "current_average", "line_frequency", 1)
freq_update[1].connect("notify::current-average", update_property_simple, head, "current_average", "prefactor_real", 1)
freq_update[2].connect("notify::current-average", update_property_simple, head, "current_average", "prefactor_imag", 1)
freq_update[0].connect("notify::timestamped-average", update_timestamped_property, head, "timestamped_average", "line_frequency", 1)
freq_update[1].connect("notify::timestamped-average", update_timestamped_property, head, "timestamped_average", "prefactor_real", 1)
freq_update[2].connect("notify::timestamped-average", update_timestamped_property, head, "timestamped_average", "prefactor_imag", 1)
elif freq_update is not None:
freq_update.connect("notify::current-average", update_property_simple, head, "current_average", "line_frequency", 1)
freq_update.connect("notify::timestamped-average", update_timestamped_property, head, "timestamped_average", "line_frequency", 1)
head = mkresample(pipeline, head, 4, filter_latency == 0.0, rate)
if filter_latency != 0:
# Remove the first several seconds of output, which depend on start time
......@@ -372,10 +374,10 @@ def remove_lines_with_witnesses(pipeline, signal, witnesses, freqs, freq_vars, f
if freq_channels[m][n] is not None:
if type(freq_channels[m][n]) is float:
# It's a harmonic of the frequency in freq_channels[m][0]
freq_channels[m][0].connect("notify::current-average", update_property_simple, line_in_signal, "current_average", "line_frequency", freq_channels[m][n])
freq_channels[m][0].connect("notify::timestamped-average", update_timestamped_property, line_in_signal, "timestamped_average", "line_frequency", freq_channels[m][n])
else:
# The channel carries the correct frequency
freq_channels[m][n].connect("notify::current-average", update_property_simple, line_in_signal, "current_average", "line_frequency", 1)
freq_channels[m][n].connect("notify::timestamped-average", update_timestamped_property, line_in_signal, "timestamped_average", "line_frequency", 1)
line_in_signal = mkresample(pipeline, line_in_signal, downsample_quality, zero_latency, compute_rate)
line_in_signal = lowpass(pipeline, line_in_signal, compute_rate, length = filter_length, fcut = 0, filter_latency = filter_latency)
line_in_signal = pipeparts.mktee(pipeline, line_in_signal)
......@@ -393,10 +395,10 @@ def remove_lines_with_witnesses(pipeline, signal, witnesses, freqs, freq_vars, f
if freq_channels[m][n] is not None:
if type(freq_channels[m][n]) is float:
# It's a harmonic of the frequency in freq_channels[m][0]
freq_channels[m][0].connect("notify::current-average", update_property_simple, line_in_witness, "current_average", "line_frequency", freq_channels[m][n])
freq_channels[m][0].connect("notify::timestamped-average", update_timestamped_property, line_in_witness, "timestamped_average", "line_frequency", freq_channels[m][n])
else:
# The channel carries the correct frequency
freq_channels[m][n].connect("notify::current-average", update_property_simple, line_in_witness, "current_average", "line_frequency", 1)
freq_channels[m][n].connect("notify::timestamped-average", update_timestamped_property, line_in_witness, "timestamped_average", "line_frequency", 1)
line_in_witness = mkresample(pipeline, line_in_witness, downsample_quality, zero_latency, compute_rate)
line_in_witness = lowpass(pipeline, line_in_witness, compute_rate, length = filter_length, fcut = 0, filter_latency = filter_latency)
line_in_witness = pipeparts.mktee(pipeline, line_in_witness)
......@@ -438,10 +440,10 @@ def remove_lines_with_witnesses(pipeline, signal, witnesses, freqs, freq_vars, f
if freq_channels[m][n] is not None:
if type(freq_channels[m][n]) is float:
# It's a harmonic of the frequency in freq_channels[m][0]
freq_channels[m][0].connect("notify::current-average", update_property_simple, reconstructed_line_in_signal, "current_average", "line_frequency", -1.0 * freq_channels[m][n])
freq_channels[m][0].connect("notify::timestamped-average", update_timestamped_property, reconstructed_line_in_signal, "timestamped_average", "line_frequency", -1.0 * freq_channels[m][n])
else:
# The channel carries the correct frequency
freq_channels[m][n].connect("notify::current-average", update_property_simple, reconstructed_line_in_signal, "current_average", "line_frequency", -1.0)
freq_channels[m][n].connect("notify::timestamped-average", update_timestamped_property, reconstructed_line_in_signal, "timestamped_average", "line_frequency", -1.0)
reconstructed_line_in_signal = pipeparts.mkgeneric(pipeline, reconstructed_line_in_signal, "creal")
signal_minus_lines.append(reconstructed_line_in_signal)
......@@ -1177,7 +1179,7 @@ def compute_fcc(pipeline, SR, SI, fpcal2, freq_update = None):
fcc = mkmultiplier(pipeline, list_srcs(pipeline, pipeparts.mkaudioamplify(pipeline, SR, -1.0), mkpow(pipeline, SI, exponent=-1.0)))
fcc = pipeparts.mkaudioamplify(pipeline, fcc, fpcal2)
if freq_update is not None:
freq_update.connect("notify::current-average", update_property_simple, fcc, "current_average", "amplification", 1)
freq_update.connect("notify::timestamped-average", update_timestamped_property, fcc, "timestamped_average", "amplification", 1)
return fcc
def compute_Xi_from_filters_file(pipeline, pcalfpcal4, darmfpcal4, fpcal4, EP11_real, EP11_imag, EP12_real, EP12_imag, EP13_real, EP13_imag, EP14_real, EP14_imag, ktst, kpu, kc, fcc):
......@@ -1274,6 +1276,14 @@ def update_property_simple(prop_maker, arg, prop_taker, maker_prop_name, taker_p
prop = prop_maker.get_property(maker_prop_name)
prop_taker.set_property(taker_prop_name, prefactor * prop)
def update_timestamped_property(prop_maker, arg, prop_taker, maker_prop_name, taker_prop_name, prefactor):
prop = prop_maker.get_property(maker_prop_name)
cs = GstController.InterpolationControlSource.new()
binding = GstController.DirectControlBinding.new_absolute(prop_taker, taker_prop_name, cs)
prop_taker.add_control_binding(binding)
cs.set_property('mode', GstController.InterpolationMode.NONE) # no interpolation
cs.set(int(prop[0] * Gst.SECOND), prefactor * prop[1])
def update_filter(filter_maker, arg, filter_taker, maker_prop_name, taker_prop_name):
firfilter = filter_maker.get_property(maker_prop_name)[::-1]
filter_taker.set_property(taker_prop_name, firfilter)
......
......@@ -8,9 +8,9 @@ IFO = H
# determines where to look for filters files (e.g., O1, O2, O3, ER10, ER13, ER14, PreER10, PreER13, PreER14)
OBSRUN = O3
START = $(shell echo 1239984018 + 10000 | bc)
#1239984018
END = $(shell echo 1240048818 | bc)
START = $(shell echo 1239984268 - 256 | bc)
#1239997514 roaming line change
END = $(shell echo 1239984268 + 64542 + 256 | bc)
#1240048818
SHMRUNTIME = 600
# How much time does the calibration need to settle at the start and end?
......@@ -91,7 +91,7 @@ $(IFO)1_hoft_DCS_frames.cache: $(IFO)1_easy_raw_frames.cache filters framesdir
ls Frames/$(OBSRUN)/$(IFO)1/DCS/$(IFO)-$(IFO)1DCS-*.gwf | lalapps_path2cache > $@
$(IFO)1_hoft_DCS_realKappas_frames.cache: $(IFO)1_easy_raw_frames.cache filters framesdir
#GST_DEBUG=3 gstlal_compute_strain --gps-start-time $(START) --gps-end-time $(END) --frame-cache $(IFO)1_easy_raw_frames.cache --output-path Frames/$(OBSRUN)/$(IFO)1/DCS/ --frame-duration=64 --frames-per-file=1 --wings=0 --config-file $(DCSREALKAPPASCONFIGS)
GST_DEBUG=3 gstlal_compute_strain --gps-start-time $(START) --gps-end-time $(END) --frame-cache $(IFO)1_easy_raw_frames.cache --output-path Frames/$(OBSRUN)/$(IFO)1/DCS/ --frame-duration=64 --frames-per-file=1 --wings=0 --config-file $(DCSREALKAPPASCONFIGS)
ls Frames/$(OBSRUN)/$(IFO)1/DCS/$(IFO)-$(IFO)1DCS_REALKAPPASTEST-12520884*.gwf | lalapps_path2cache > $@
$(IFO)1_hoft_DCS_realKappasSRC_frames.cache: $(IFO)1_easy_raw_frames.cache filters framesdir
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment