Skip to content
Snippets Groups Projects
Commit bf30d423 authored by Kipp Cannon's avatar Kipp Cannon
Browse files

add event handling

- copy event handling from adder
- override template bank's instrument and channel name with info from tags
parent 4a6b893a
No related branches found
No related tags found
No related merge requests found
......@@ -178,10 +178,28 @@ static int setup_bankfile_input(GSTLALTriggerGen *element, char *bank_filename)
static SnglInspiralTable *record_inspiral_event(SnglInspiralTable *dest, LIGOTimeGPS end_time, double complex z, double chisq, int channel, GSTLALTriggerGen *element)
{
double xi;
/*
* copy the template whole
*/
*dest = element->bank[channel];
/*
* replace the instrument and channel name. note: in the
* traditional inspiral pipeline the instrument and channel name
* are encoded in the templates, but we want to accomodate a
* scenario in which the same template bank is used across multiple
* instruments, so we need to replace the information stored in the
* templates with what we know about the data stream
*/
snprintf(dest->ifo, LIGOMETA_IFO_MAX * sizeof(*dest->ifo), element->instrument);
snprintf(dest->channel, LIGOMETA_CHANNEL_MAX * sizeof(*dest->channel), element->channel_name);
/*
* fill in the rest of the information
*/
dest->snr = cabs(z);
dest->coa_phase = carg(z);
dest->chisq = chisq;
......@@ -190,7 +208,9 @@ static SnglInspiralTable *record_inspiral_event(SnglInspiralTable *dest, LIGOTim
dest->end_time_gmst = XLALGreenwichMeanSiderealTime(&end_time);
dest->eff_distance = effective_distance(dest->snr, dest->sigmasq);
xi = dest->chisq / (1 + 0.1 * pow(dest->snr, 2));
/*
* done
*/
return dest;
}
......@@ -236,7 +256,167 @@ static gboolean gen_setcaps(GstPad *pad, GstCaps *caps)
/*
* ============================================================================
*
* Event Generation
* Gstreamer Event Handler
*
* ============================================================================
*/
/*
* src event. inspired by the adder element's src pad event handler
*/
static gboolean src_event(GstPad *pad, GstEvent *event)
{
GSTLALTriggerGen *element = GSTLAL_TRIGGERGEN(GST_PAD_PARENT(pad));
gboolean success;
switch(GST_EVENT_TYPE(event)) {
case GST_EVENT_QOS:
case GST_EVENT_NAVIGATION:
/*
* not handled
*/
success = FALSE;
break;
case GST_EVENT_SEEK: {
GstSeekFlags flags;
GstSeekType curtype;
gint64 cur;
gboolean flush;
/*
* parse the seek parameters
*/
gst_event_parse_seek(event, &element->segment.rate, NULL, &flags, &curtype, &cur, NULL, NULL);
flush = !!(flags & GST_SEEK_FLAG_FLUSH);
/*
* is it a flushing seek?
*/
if(flush) {
/*
* make sure we accept nothing more and return
* WRONG_STATE
*/
gst_collect_pads_set_flushing(element->collect, TRUE);
/*
* start flush downstream.
*/
gst_pad_push_event(element->srcpad, gst_event_new_flush_start());
}
/*
* wait for the collected to be finished and mark a new segment
*/
GST_OBJECT_LOCK(element->collect);
element->segment_pending = TRUE;
if(flush) {
/*
* we need to do this again when the streaming
* threads have stoppped so that the cookie gets
* properly updated
*/
gst_collect_pads_set_flushing(element->collect, TRUE);
}
element->flush_stop_pending = flush;
GST_OBJECT_UNLOCK(element->collect);
gst_event_ref(event);
success = gst_pad_push_event(element->snrpad, event);
gst_event_ref(event);
success &= gst_pad_push_event(element->chisqpad, event);
}
default:
/*
* forward all other events
*/
gst_event_ref(event);
success = gst_pad_push_event(element->snrpad, event);
gst_event_ref(event);
success &= gst_pad_push_event(element->chisqpad, event);
break;
}
return success;
}
/*
* snr event()
*/
static gboolean taglist_extract_string(GSTLALTriggerGen *element, GstTagList *taglist, const char *tagname, gchar **dest)
{
if(!gst_tag_list_get_string(taglist, tagname, dest)) {
GST_WARNING_OBJECT(element, "unable to parse \"%s\" from %" GST_PTR_FORMAT, tagname, taglist);
return FALSE;
}
return TRUE;
}
static gboolean snr_event(GstPad *pad, GstEvent *event)
{
GSTLALTriggerGen *element = GSTLAL_TRIGGERGEN(GST_PAD_PARENT(pad));
gboolean success;
switch(GST_EVENT_TYPE(event)) {
case GST_EVENT_FLUSH_STOP:
/*
* mark a pending new segment. this event is synchronized
* with the streaming thread so we can safely update the
* variable without races. it's somewhat weird because we
* assume the collectpads forwarded the FLUSH_STOP past us
* and downstream (using our source pad, the bastard!).
*/
GST_OBJECT_LOCK(element->collect);
element->segment_pending = TRUE;
element->flush_stop_pending = FALSE;
GST_OBJECT_UNLOCK(element->collect);
break;
case GST_EVENT_TAG: {
GstTagList *taglist;
gchar *instrument, *channel_name;
gst_event_parse_tag(event, &taglist);
success = taglist_extract_string(element, taglist, GSTLAL_TAG_INSTRUMENT, &instrument);
success &= taglist_extract_string(element, taglist, GSTLAL_TAG_CHANNEL_NAME, &channel_name);
if(success) {
GST_DEBUG_OBJECT(element, "found tags \"%s\"=\"%s\" \"%s\"=\"%s\"", GSTLAL_TAG_INSTRUMENT, instrument, GSTLAL_TAG_CHANNEL_NAME, channel_name);
g_free(element->instrument);
element->instrument = instrument;
g_free(element->channel_name);
element->channel_name = channel_name;
}
break;
}
default:
break;
}
return element->collect_event(pad, event);
}
/*
* ============================================================================
*
* Inspiral Event Generation
*
* ============================================================================
*/
......@@ -251,6 +431,26 @@ static GstFlowReturn gen_collected(GstCollectPads *pads, gpointer user_data)
GstBuffer *srcbuf = NULL;
GstFlowReturn result;
/*
* check for uninitialized stream
*/
if(!element->instrument || !element->channel_name) {
GST_ERROR_OBJECT(element, "instrument and/or channel name not known (stream's tags must provide this information)");
result = GST_FLOW_ERROR;
goto error;
}
/*
* forward flush-stop event
*/
if(element->flush_stop_pending) {
gst_pad_push_event(element->srcpad, gst_event_new_flush_stop());
element->flush_stop_pending = FALSE;
}
/*
* check for new segment
*/
......@@ -640,6 +840,10 @@ static void gen_finalize(GObject *object)
g_mutex_free(element->bank_lock);
element->bank_lock = NULL;
free_bankfile(element);
g_free(element->instrument);
element->instrument = NULL;
g_free(element->channel_name);
element->channel_name = NULL;
G_OBJECT_CLASS(gen_parent_class)->finalize(object);
}
......@@ -654,6 +858,7 @@ static GstStateChangeReturn gen_change_state(GstElement *element, GstStateChange
case GST_STATE_CHANGE_READY_TO_PAUSED:
triggergen->segment_pending = TRUE;
triggergen->flush_stop_pending = FALSE;
gst_segment_init(&triggergen->segment, GST_FORMAT_UNDEFINED);
triggergen->next_output_offset = GST_BUFFER_OFFSET_NONE;
triggergen->next_output_timestamp = GST_CLOCK_TIME_NONE;
......@@ -822,8 +1027,14 @@ static void gen_instance_init(GTypeInstance *object, gpointer klass)
element->chisqcollectdata = gstlal_collect_pads_add_pad(element->collect, pad, sizeof(*element->chisqcollectdata));
element->chisqpad = pad;
/* FIXME: hacked way to override/extend the event function of
* GstCollectpads */
element->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC(element->snrpad);
gst_pad_set_event_function(element->snrpad, GST_DEBUG_FUNCPTR(snr_event));
/* retrieve (and ref) src pad */
pad = gst_element_get_static_pad(GST_ELEMENT(element), "src");
gst_pad_set_event_function(pad, GST_DEBUG_FUNCPTR(src_event));
gst_pad_use_fixed_caps(pad);
{
GstCaps *caps = gst_caps_copy(gst_pad_get_pad_template_caps(pad));
......@@ -836,6 +1047,8 @@ static void gen_instance_init(GTypeInstance *object, gpointer klass)
element->bank_lock = g_mutex_new();
element->rate = 0;
element->bank_filename = NULL;
element->instrument = NULL;
element->channel_name = NULL;
element->bank = NULL;
element->num_templates = 0;
element->snr_thresh = DEFAULT_SNR_THRESH;
......
......@@ -82,6 +82,7 @@ typedef struct {
GstElement element;
GstCollectPads *collect;
GstPadEventFunction collect_event;
GstPad *snrpad;
GstLALCollectData *snrcollectdata;
......@@ -90,6 +91,7 @@ typedef struct {
GstPad *srcpad;
gboolean segment_pending;
gboolean flush_stop_pending;
GstSegment segment;
guint64 next_output_offset;
guint64 next_output_timestamp;
......@@ -98,6 +100,8 @@ typedef struct {
GMutex *bank_lock;
char *bank_filename;
gchar *instrument;
gchar *channel_name;
SnglInspiralTable *bank;
gint num_templates;
double snr_thresh;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment