diff --git a/gstlal-inspiral/gst/lal/gstlal_itacac.c b/gstlal-inspiral/gst/lal/gstlal_itacac.c index 975fea0704c19199881a5011f22ee01bc09c0829..3ce821b9d87f23e32726e083f53e495be54e4bf8 100644 --- a/gstlal-inspiral/gst/lal/gstlal_itacac.c +++ b/gstlal-inspiral/gst/lal/gstlal_itacac.c @@ -139,8 +139,7 @@ static guint64 output_num_bytes(GSTLALItacacPad *itacacpad) { static int reset_time_and_offset(GSTLALItacac *itacac) { // NOTE This should only get called when itacac is first starting up, // there is an assert that guarantees this - //itacac->next_output_offset = 0; - itacac->next_output_offset = itacac->initial_output_offset; + itacac->next_output_offset = 0; itacac->next_output_timestamp = GST_CLOCK_TIME_NONE; return 0; } @@ -503,7 +502,8 @@ static GstFlowReturn push_gap(GSTLALItacac *itacac, guint samps) { GST_BUFFER_OFFSET_END(srcbuf) = itacac->next_output_offset + samps; GST_BUFFER_PTS(srcbuf) = itacac->next_output_timestamp + itacac->difftime; GST_BUFFER_DURATION(srcbuf) = (GstClockTime) gst_util_uint64_scale_int_round(GST_SECOND, samps, itacac->rate); - GST_BUFFER_DTS(srcbuf) = GST_BUFFER_PTS(srcbuf); + GST_DEBUG_OBJECT(stderr, "pushing gap %" GST_BUFFER_BOUNDARIES_FORMAT "\n", GST_BUFFER_BOUNDARIES_ARGS(srcbuf)); + //GST_BUFFER_DTS(srcbuf) = GST_BUFFER_PTS(srcbuf); return gst_aggregator_finish_buffer(GST_AGGREGATOR(itacac), srcbuf); @@ -519,7 +519,6 @@ static GstFlowReturn final_setup(GSTLALItacac *itacac) { for(padlist = element->sinkpads; padlist !=NULL; padlist = padlist->next) { GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); GstBuffer *sinkbuf = gst_aggregator_pad_peek_buffer(GST_AGGREGATOR_PAD(itacacpad)); - itacac->initial_output_offset = itacac->initial_output_offset < GST_BUFFER_OFFSET(sinkbuf) ? itacac->initial_output_offset : GST_BUFFER_OFFSET(sinkbuf); gst_buffer_unref(sinkbuf); // FIXME Should gst_object_sync_values be called here too? if(padlist == element->sinkpads){ @@ -536,7 +535,6 @@ static GstFlowReturn final_setup(GSTLALItacac *itacac) { } - itacac->next_output_offset = itacac->initial_output_offset; // Set up the order that we want to check the pads for coincidence // FIXME For now this is only being used to find snr time series @@ -639,10 +637,14 @@ static void copy_nongapsamps(GSTLALItacac *itacac, GSTLALItacacPad *itacacpad, g gsl_matrix_set(itacacpad->data->duration_dataoffset_trigwindowoffset_peakfindinglength_matrix, data_container_index, 3, (double) peak_finding_length); // copy the samples that we will call the peak finding library on (if no events are found the result will be a GAP) - if (itacac->peak_type == GSTLAL_PEAK_COMPLEX) - gst_audioadapter_copy_samples(itacacpad->adapter, (float complex *) itacacpad->data->data + offset_from_copied_data * itacacpad->maxdata->channels, copysamps, NULL, NULL); - else if (itacac->peak_type == GSTLAL_PEAK_DOUBLE_COMPLEX) + if (itacac->peak_type == GSTLAL_PEAK_COMPLEX) { + //gst_audioadapter_copy_samples(itacacpad->adapter, (float complex *) itacacpad->data->data + offset_from_copied_data * itacacpad->maxdata->channels, copysamps, NULL, NULL); + itacacpad->data->dataptr.as_complex = (float complex *) itacacpad->data->data; + gst_audioadapter_copy_samples(itacacpad->adapter, itacacpad->data->dataptr.as_complex + offset_from_copied_data * itacacpad->maxdata->channels, copysamps, NULL, NULL); + } else if (itacac->peak_type == GSTLAL_PEAK_DOUBLE_COMPLEX) { + fprintf(stderr, "using GSTLAL_PEAK_DOUBLE_COMPLEX\n\n\n\n\n"); gst_audioadapter_copy_samples(itacacpad->adapter, (double complex *) itacacpad->data->data + offset_from_copied_data * itacacpad->maxdata->channels, copysamps, NULL, NULL); + } } @@ -697,6 +699,12 @@ static void generate_triggers(GSTLALItacac *itacac, GSTLALItacacPad *itacacpad, } } } + /* + for(channel = 0; channel < this_maxdata->channels; channel++) { + if(cabs((double complex) (this_maxdata->interpvalues).as_float_complex[channel]) > 0 && cabs((double complex) (this_maxdata->interpvalues).as_float_complex[channel]) < 6) + fprintf(stderr, "%s snr = %lf < 6\n", itacacpad->instrument, cabs((double complex) (this_maxdata->interpvalues).as_float_complex[channel]) ); + } + */ } else if (itacac->peak_type == GSTLAL_PEAK_DOUBLE_COMPLEX) { // Find the peak, making sure to put the data pointer at the start of the interval we care about @@ -717,8 +725,9 @@ static void generate_triggers(GSTLALItacac *itacac, GSTLALItacacPad *itacacpad, else g_assert_not_reached(); - // AEP- 180417 Turning XLAL Errors back on - gsl_set_error_handler(old_gsl_error_handler); + // AEP- 180417 Turning XLAL Errors back on + gsl_set_error_handler(old_gsl_error_handler); + // Compute \chi^2 values if we can if(itacacpad->autocorrelation_matrix && !this_maxdata->no_peaks_past_threshold) { @@ -1084,6 +1093,8 @@ static GstFlowReturn process(GSTLALItacac *itacac) { for(padlist = element->sinkpads; padlist != NULL; padlist = padlist->next) { itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(itacacpad->waiting) + continue; samples_left_in_window = itacacpad->n; @@ -1322,6 +1333,8 @@ static GstFlowReturn process(GSTLALItacac *itacac) { for(padlist = element->sinkpads; padlist != NULL; padlist = padlist->next) { itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(itacacpad->waiting) + continue; data_container_index = 0; duration = (guint) gsl_matrix_get(itacacpad->data->duration_dataoffset_trigwindowoffset_peakfindinglength_matrix, 0, 0); @@ -1364,9 +1377,15 @@ static GstFlowReturn process(GSTLALItacac *itacac) { populate_snr_in_other_detectors(itacac, itacacpad); } - if(triggers_generated && itacacpad->autocorrelation_matrix) + if(triggers_generated && itacacpad->autocorrelation_matrix) { + /* + if(srcbuf == NULL) + fprintf(stderr, "%s about to create srcbuf, next_output_timestamp = %lu + 1e-9*%lu\n", itacacpad->instrument, itacac->next_output_timestamp / 1000000000, itacac->next_output_timestamp - (itacac->next_output_timestamp / 1000000000) * 1000000000); + else + fprintf(stderr, "%s about to append to srcbuf, next_output_timestamp = %lu + 1e-9*%lu\n", itacacpad->instrument, itacac->next_output_timestamp / 1000000000, itacac->next_output_timestamp - (itacac->next_output_timestamp / 1000000000) * 1000000000); + */ srcbuf = hardcoded_srcbuf_crap(itacac, itacacpad, srcbuf); - else if(triggers_generated) { + } else if(triggers_generated) { if(srcbuf == NULL) { srcbuf = gstlal_snglinspiral_new_buffer_from_peak(itacacpad->maxdata, itacacpad->bankarray, GST_PAD((itacac->aggregator).srcpad), itacac->next_output_offset, itacacpad->n, itacac->next_output_timestamp, itacac->rate, itacacpad->chi2, NULL, NULL, NULL, NULL, itacac->difftime); } else { @@ -1383,7 +1402,7 @@ static GstFlowReturn process(GSTLALItacac *itacac) { if(!itacac->EOS) { if(srcbuf != NULL) result = gst_aggregator_finish_buffer(GST_AGGREGATOR(itacac), srcbuf); - else + else // FIXME itacacpad->n only works because we still have // itacacpad defined from before result = push_gap(itacac, itacacpad->n); @@ -1398,10 +1417,9 @@ static GstFlowReturn process(GSTLALItacac *itacac) { // If there aren't any samples left to process, then we're ready to return GST_FLOW_EOS if(max_num_samps_left_in_any_pad > 0) { - if(srcbuf != NULL) { - fprintf(stderr, "pushing buffer at EOS, still have more samps to process, %" GST_BUFFER_BOUNDARIES_FORMAT "\n", GST_BUFFER_BOUNDARIES_ARGS(srcbuf)); + if(srcbuf != NULL) gst_aggregator_finish_buffer(GST_AGGREGATOR(itacac), srcbuf); - } else + else push_gap(itacac, itacacpad->n); result = process(itacac); @@ -1441,8 +1459,12 @@ static GstFlowReturn aggregate(GstAggregator *aggregator, gboolean timeout) // Get the buffer from the pad we're looking at and assert it // has a valid timestamp GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); - //GST_DEBUG_OBJECT(GST_AGGREGATOR_PAD(itacacpad), "%s aggregator", itacacpad->instrument); //sinkbuf = gst_aggregator_pad_pop_buffer(GST_AGGREGATOR_PAD(itacacpad)); + // We don't need to worry about this if this pad is waiting + //if(!itacac->waiting && itacacpad->waiting) + if(itacacpad->waiting && gst_audioadapter_available_samples(itacacpad->adapter) != 0) + continue; + GstBuffer *sinkbuf = gst_aggregator_pad_peek_buffer(GST_AGGREGATOR_PAD(itacacpad)); if(sinkbuf == NULL) { GST_DEBUG_OBJECT(itacac, "%s sinkbuf is NULL", itacacpad->instrument); @@ -1481,9 +1503,10 @@ static GstFlowReturn aggregate(GstAggregator *aggregator, gboolean timeout) // FIXME I don't think this logic works for itacac, it came from itac, need to think carefully about what to do around disconts if (GST_BUFFER_FLAG_IS_SET(sinkbuf, GST_BUFFER_FLAG_DISCONT)) { // FIXME For now, this should ensure we only see disconts at start up - g_assert(itacac->next_output_offset == itacac->initial_output_offset || itacacpad->waiting); + g_assert(gst_audioadapter_available_samples(itacacpad->adapter) == 0); if(!itacacpad->waiting) { reset_time_and_offset(itacac); + itacacpad->initial_timestamp = GST_CLOCK_TIME_NONE; gst_audioadapter_clear(itacacpad->adapter); } } @@ -1497,45 +1520,87 @@ static GstFlowReturn aggregate(GstAggregator *aggregator, gboolean timeout) continue; } - if(itacacpad->waiting) { - // Wait to use buffers from pad until next_output_offset passes this buffers offset - if(itacac->next_output_offset < GST_BUFFER_OFFSET(sinkbuf)) { - gst_buffer_unref(sinkbuf); - continue; - } - gst_buffer_ref(sinkbuf); - gst_audioadapter_push(itacacpad->adapter, sinkbuf); - if((guint) (itacac->next_output_offset - GST_BUFFER_OFFSET(sinkbuf)) >= itacacpad->maxdata->pad) { - // Leave a pad worth of samples for chisq pad - gst_audioadapter_flush_samples(itacacpad->adapter, (guint) (itacac->next_output_offset - GST_BUFFER_OFFSET(sinkbuf)) - itacacpad->maxdata->pad); - itacacpad->last_gap = FALSE; - } else if(itacac->next_output_offset != GST_BUFFER_OFFSET(sinkbuf) ) { - itacacpad->adjust_window = (guint) (itacac->next_output_offset - GST_BUFFER_OFFSET(sinkbuf)); - itacacpad->last_gap = FALSE; - } - // If we dont have a valid first timestamp yet take this one - if(itacac->next_output_timestamp == GST_CLOCK_TIME_NONE) - itacac->next_output_timestamp = GST_BUFFER_PTS(sinkbuf); + // Grab timestamp for this pad if we dont have it already + if(itacacpad->initial_timestamp == GST_CLOCK_TIME_NONE) { + g_assert(gst_audioadapter_available_samples(itacacpad->adapter) == 0); + itacacpad->initial_timestamp = GST_BUFFER_PTS(sinkbuf); + } - gst_buffer_unref(sinkbuf); - gst_aggregator_pad_drop_buffer(GST_AGGREGATOR_PAD(itacacpad)); - itacacpad->waiting = FALSE; - if(itacac->waiting) - itacac->waiting = FALSE; + gst_buffer_ref(sinkbuf); + // Push buf to gstaudioadapter + guint samples_before = gst_audioadapter_available_samples(itacacpad->adapter); + gst_audioadapter_push(itacacpad->adapter, sinkbuf); + gst_aggregator_pad_drop_buffer(GST_AGGREGATOR_PAD(itacacpad)); - } else { + /* + if(gst_audioadapter_available_samples(itacacpad->adapter) > samples_before) + fprintf(stderr, "pushed samples from %s sinkbuf to adapter, sinkbuf timestamp = %lu + 1e-9*%lu, duration = %lu ns, available samples = %u\n", itacacpad->instrument, GST_BUFFER_PTS(sinkbuf) / 1000000000, GST_BUFFER_PTS(sinkbuf) - (GST_BUFFER_PTS(sinkbuf) / 1000000000) * 1000000000, GST_BUFFER_DURATION(sinkbuf), gst_audioadapter_available_samples(itacacpad->adapter)); + */ + gst_buffer_unref(sinkbuf); - // If we dont have a valid first timestamp yet take this one - if(itacac->next_output_timestamp == GST_CLOCK_TIME_NONE) - itacac->next_output_timestamp = GST_BUFFER_PTS(sinkbuf); + } - gst_audioadapter_push(itacacpad->adapter, sinkbuf); - gst_aggregator_pad_drop_buffer(GST_AGGREGATOR_PAD(itacacpad)); + if(itacac->waiting) { + // Check if timestamps of all sinkpads are the same, if not, + // take the earliest timestamp as the next output timestamp and + // start processing samples from sinkpads that have that + // timestamp + for(padlist = GST_ELEMENT(aggregator)->sinkpads; padlist != NULL; padlist = padlist->next) { + GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(padlist == GST_ELEMENT(aggregator)->sinkpads) + itacac->next_output_timestamp = itacacpad->initial_timestamp; + else + itacac->next_output_timestamp = itacac->next_output_timestamp <= itacacpad->initial_timestamp ? itacac->next_output_timestamp : itacacpad->initial_timestamp; + } + + if(itacac->next_output_timestamp != GST_CLOCK_TIME_NONE) { + for(padlist = GST_ELEMENT(aggregator)->sinkpads; padlist != NULL; padlist = padlist->next) { + GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(itacacpad->initial_timestamp == itacac->next_output_timestamp && gst_audioadapter_available_samples(itacacpad->adapter) > 0) { + itacacpad->waiting = FALSE; + fprintf(stderr, "Setting %s itacacpad->waiting to FALSE\n", itacacpad->instrument); + if(itacac->waiting) { + itacac->waiting = FALSE; + fprintf(stderr, "Setting itacac->waiting to FALSE\n"); + } + } + } + if(!itacac->waiting) + result = process(itacac); + } + + } else { + if(itacac->next_output_timestamp == GST_CLOCK_TIME_NONE) { + for(padlist = GST_ELEMENT(aggregator)->sinkpads; padlist != NULL; padlist = padlist->next) { + GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(padlist == GST_ELEMENT(aggregator)->sinkpads) + itacac->next_output_timestamp = itacacpad->initial_timestamp; + else + itacac->next_output_timestamp = itacac->next_output_timestamp <= itacacpad->initial_timestamp ? itacac->next_output_timestamp : itacacpad->initial_timestamp; + } + } + // Figure out if we can start taking data from each pad that is still waiting + for(padlist = GST_ELEMENT(aggregator)->sinkpads; padlist != NULL; padlist = padlist->next) { + GSTLALItacacPad *itacacpad = GSTLAL_ITACAC_PAD(padlist->data); + if(!itacacpad->waiting || itacacpad->initial_timestamp > itacac->next_output_timestamp) + continue; + + // FIXME Assumes n is the same for all detectors + guint num_samples_behind = (guint) (itacac->next_output_timestamp - itacacpad->initial_timestamp) / (1000000000 / itacacpad->n); + if(num_samples_behind > itacacpad->maxdata->pad) + gst_audioadapter_flush_samples(itacacpad->adapter, num_samples_behind - itacacpad->maxdata->pad); + else if(num_samples_behind < itacacpad->maxdata->pad) + itacacpad->adjust_window = num_samples_behind; + + itacacpad->waiting = FALSE; + fprintf(stderr, "Setting %s itacacpad->waiting to FALSE in second loop\n", itacacpad->instrument); + if(itacacpad->initial_timestamp != itacac->next_output_timestamp) + itacacpad->last_gap = FALSE; } - } - if(!itacac->waiting) result = process(itacac); + } + return result; } @@ -1826,6 +1891,7 @@ static void gstlal_itacac_pad_init(GSTLALItacacPad *itacacpad) itacacpad->waiting = TRUE; itacacpad->adjust_window = 0; + itacacpad->initial_timestamp = GST_CLOCK_TIME_NONE; gst_pad_use_fixed_caps(GST_PAD(itacacpad)); @@ -1837,7 +1903,6 @@ static void gstlal_itacac_init(GSTLALItacac *itacac) itacac->channels = 0; itacac->difftime = 0; - itacac->initial_output_offset = G_MAXUINT64; reset_time_and_offset(itacac); diff --git a/gstlal-inspiral/gst/lal/gstlal_itacac.h b/gstlal-inspiral/gst/lal/gstlal_itacac.h index 83cd838ecfba825a8d285db816631a05e6915231..8dac3d266a665e28864e8e6ef5f075978324cd08 100644 --- a/gstlal-inspiral/gst/lal/gstlal_itacac.h +++ b/gstlal-inspiral/gst/lal/gstlal_itacac.h @@ -71,6 +71,12 @@ G_BEGIN_DECLS (G_TYPE_CHECK_CLASS_TYPE((klass), GSTLAL_ITACAC_TYPE)) struct data_container { + union { + float complex *as_complex; + double complex *as_double_complex; + void *as_void; + } dataptr; + gsl_matrix *duration_dataoffset_trigwindowoffset_peakfindinglength_matrix; void *data; }; @@ -111,6 +117,7 @@ typedef struct { gboolean EOS; gboolean waiting; + GstClockTime initial_timestamp; guint adjust_window; GList *next_in_coinc_order; @@ -128,7 +135,7 @@ typedef struct { gint rate; guint channels; gstlal_peak_type_specifier peak_type; - guint64 initial_output_offset; + GstClockTime initial_output_timestamp; guint64 next_output_offset; GstClockTime next_output_timestamp; GstClockTimeDiff difftime;