diff --git a/gstlal-ugly/bin/gstlal_feature_extractor b/gstlal-ugly/bin/gstlal_feature_extractor index e88d97150b11631a9ee4901918e54a385dcb4382..0e3e5fa146b8ce99633caba503a2383d2c29328a 100755 --- a/gstlal-ugly/bin/gstlal_feature_extractor +++ b/gstlal-ugly/bin/gstlal_feature_extractor @@ -115,9 +115,9 @@ class MultiChannelHandler(simplehandler.Handler): self.instrument = data_source_info.instrument self.frame_segments = data_source_info.frame_segments self.keys = kwargs.pop("keys") - self.basis_params = kwargs.pop("basis_params") + self.waveforms = kwargs.pop("waveforms") self.basename = kwargs.pop("basename") - self.waveform = options.waveform + self.waveform_type = options.waveform # format id for aesthetics self.job_id = str(options.job_id).zfill(4) @@ -286,17 +286,17 @@ class MultiChannelHandler(simplehandler.Handler): trigger_seg = segments.segment(LIGOTimeGPS(row.end_time, row.end_time_ns), LIGOTimeGPS(row.end_time, row.end_time_ns)) if not self.frame_segments[self.instrument] or self.frame_segments[self.instrument].intersects_segment(trigger_seg): - freq, q, duration = self.basis_params[(channel, rate)].parameter_grid[row.channel_index] - filter_duration = self.basis_params[(channel, rate)].max_duration + freq, q, duration = self.waveforms[channel].parameter_grid[rate][row.channel_index] + filter_duration = self.waveforms[channel].filter_duration[rate] filter_stop_time = row.end_time + row.end_time_ns * 1e-9 # set trigger time based on waveform - if self.waveform == 'sine_gaussian': + if self.waveform_type == 'sine_gaussian': trigger_time = filter_stop_time - (filter_duration / 2.) start_time = trigger_time - duration / 2. stop_time = trigger_time + duration / 2. - elif self.waveform == 'half_sine_gaussian': + elif self.waveform_type == 'half_sine_gaussian': trigger_time = filter_stop_time start_time = trigger_time - duration stop_time = trigger_time @@ -581,6 +581,7 @@ options, filenames = parse_command_line() data_source_info = multichannel_datasource.DataSourceInfo(options) instrument = data_source_info.instrument basename = '%s-%s' % (instrument[:1], options.description) +waveforms = {} # only load kafka library if triggers are transferred via kafka topic if options.use_kafka: @@ -713,9 +714,6 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): mainloop = GObject.MainLoop() pipeline = Gst.Pipeline(sys.argv[0]) - # dictionary of basis parameters keyed by ifo, rate - basis_params = {} - # generate multiple channel sources, and link up pipeline head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, channel_subset, verbose = options.verbose) src = {} @@ -728,6 +726,25 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): n_rates = int(numpy.log2(max_rate/min_rate) + 1) rates = [min_rate*2**i for i in range(n_rates)] + # choose range of basis parameters + # NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler + downsample_factor = 0.8 + flow = min_rate/4. + fhigh = max_rate/2. + qlow = 3.3166 + if data_source_info.extension == 'ini': + qhigh = data_source_info.channel_dict[channel]['qhigh'] + else: + qhigh = options.qhigh + + # generate templates + if options.waveform == 'half_sine_gaussian': + waveforms[channel] = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch=options.mismatch, downsample_factor=downsample_factor) + elif options.waveform == 'sine_gaussian': + waveforms[channel] = idq_utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch=options.mismatch, downsample_factor=downsample_factor) + else: + raise NotImplementedError + if options.latency_output: head[channel] = pipeparts.mklatency(pipeline, head[channel], name=idq_utils.latency_name('beforewhitening', 2, channel)) @@ -736,34 +753,13 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): if options.latency_output: thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterwhitening', 3, channel, rate)) - # choose range of basis parameters - # NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler - downsample_factor = 0.8 - flow = rate/4. * downsample_factor - fhigh = rate/2. * downsample_factor - qlow = 3.3166 - if data_source_info.extension == 'ini': - qhigh = data_source_info.channel_dict[channel]['qhigh'] - else: - qhigh = options.qhigh - - # generate templates - if options.waveform == 'half_sine_gaussian': - waveforms = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch) - elif options.waveform == 'sine_gaussian': - waveforms = idq_utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch) - else: - raise NotImplementedError - - basis_params[(channel, rate)] = waveforms - thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30) - # determine whether to do time-domain or frequency-domain convolution - time_domain = (waveforms.max_duration*rate**2) < (5*waveforms.max_duration*rate*numpy.log2(rate)) + time_domain = (waveforms[channel].sample_pts[rate]*rate) < (5*waveforms[channel].sample_pts[rate]*numpy.log2(rate)) # create FIR bank of half sine-gaussian templates - fir_matrix = numpy.array([waveform for waveform in waveforms.generate_templates()]) - thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = int(numpy.ceil(waveforms.latency * rate))) + fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate)]) + thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30) + thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = waveforms[channel].latency[rate]) # add queues, change stream format, add tags if options.latency_output: @@ -789,7 +785,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): # define structures to synchronize output streams and extract triggers from buffer logger.info("setting up pipeline handler...") - handler = MultiChannelHandler(mainloop, pipeline, data_source_info, options, keys = src.keys(), basis_params = basis_params, basename = basename, subset_id = subset_id) + handler = MultiChannelHandler(mainloop, pipeline, data_source_info, options, keys = src.keys(), waveforms = waveforms, basename = basename, subset_id = subset_id) logger.info("attaching appsinks to pipeline...") appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler) diff --git a/gstlal-ugly/python/idq_utils.py b/gstlal-ugly/python/idq_utils.py index e43af5a60a11ddd5d1dd5df92952873bfd37619d..99fbff65fdd497122d9c0b779da5318ec8c749fb 100644 --- a/gstlal-ugly/python/idq_utils.py +++ b/gstlal-ugly/python/idq_utils.py @@ -349,7 +349,7 @@ class HalfSineGaussianGenerator(object): for f, q, _ in self.parameter_grid[rate]: if quadrature: for phase in self.phases: - yield self.waveform(f, q, phase) + yield self.waveform(f, q, phase, rate) else: yield self.waveform(f, q, self.phases[0], rate) @@ -416,7 +416,7 @@ class SineGaussianGenerator(HalfSineGaussianGenerator): def __init__(self, f_range, q_range, rates, mismatch=0.2, tolerance=5e-3, downsample_factor=0.8): super(SineGaussianGenerator, self).__init__(f_range, q_range, rates, mismatch=mismatch, tolerance=tolerance, downsample_factor=0.8) self.times = {rate: numpy.linspace(-((self.sample_pts[rate] - 1) / 2.) / rate, ((self.sample_pts[rate] - 1) / 2.) / rate, self.sample_pts[rate], endpoint=True) for rate in self.rates} - self.latency = {rate: self.times[rate][-1] for rate in self.rates} + self.latency = {rate: int((self.sample_pts[rate] - 1) / 2) for rate in self.rates} self.filter_duration = {rate: (self.times[rate][-1] - self.times[rate][0]) for rate in self.rates} def duration(self, f, q):