Skip to content
Snippets Groups Projects
Commit 9d36eab6 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_extractor: propagated new waveform generation changes...

gstlal_feature_extractor: propagated new waveform generation changes throughout, fixed some bugs/typos from switch causing pipeline to crash
parent a5a1a545
No related branches found
No related tags found
No related merge requests found
......@@ -115,9 +115,9 @@ class MultiChannelHandler(simplehandler.Handler):
self.instrument = data_source_info.instrument
self.frame_segments = data_source_info.frame_segments
self.keys = kwargs.pop("keys")
self.basis_params = kwargs.pop("basis_params")
self.waveforms = kwargs.pop("waveforms")
self.basename = kwargs.pop("basename")
self.waveform = options.waveform
self.waveform_type = options.waveform
# format id for aesthetics
self.job_id = str(options.job_id).zfill(4)
......@@ -286,17 +286,17 @@ class MultiChannelHandler(simplehandler.Handler):
trigger_seg = segments.segment(LIGOTimeGPS(row.end_time, row.end_time_ns), LIGOTimeGPS(row.end_time, row.end_time_ns))
if not self.frame_segments[self.instrument] or self.frame_segments[self.instrument].intersects_segment(trigger_seg):
freq, q, duration = self.basis_params[(channel, rate)].parameter_grid[row.channel_index]
filter_duration = self.basis_params[(channel, rate)].max_duration
freq, q, duration = self.waveforms[channel].parameter_grid[rate][row.channel_index]
filter_duration = self.waveforms[channel].filter_duration[rate]
filter_stop_time = row.end_time + row.end_time_ns * 1e-9
# set trigger time based on waveform
if self.waveform == 'sine_gaussian':
if self.waveform_type == 'sine_gaussian':
trigger_time = filter_stop_time - (filter_duration / 2.)
start_time = trigger_time - duration / 2.
stop_time = trigger_time + duration / 2.
elif self.waveform == 'half_sine_gaussian':
elif self.waveform_type == 'half_sine_gaussian':
trigger_time = filter_stop_time
start_time = trigger_time - duration
stop_time = trigger_time
......@@ -581,6 +581,7 @@ options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
basename = '%s-%s' % (instrument[:1], options.description)
waveforms = {}
# only load kafka library if triggers are transferred via kafka topic
if options.use_kafka:
......@@ -713,9 +714,6 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
# dictionary of basis parameters keyed by ifo, rate
basis_params = {}
# generate multiple channel sources, and link up pipeline
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, channel_subset, verbose = options.verbose)
src = {}
......@@ -728,6 +726,25 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
n_rates = int(numpy.log2(max_rate/min_rate) + 1)
rates = [min_rate*2**i for i in range(n_rates)]
# choose range of basis parameters
# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
downsample_factor = 0.8
flow = min_rate/4.
fhigh = max_rate/2.
qlow = 3.3166
if data_source_info.extension == 'ini':
qhigh = data_source_info.channel_dict[channel]['qhigh']
else:
qhigh = options.qhigh
# generate templates
if options.waveform == 'half_sine_gaussian':
waveforms[channel] = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch=options.mismatch, downsample_factor=downsample_factor)
elif options.waveform == 'sine_gaussian':
waveforms[channel] = idq_utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rates, mismatch=options.mismatch, downsample_factor=downsample_factor)
else:
raise NotImplementedError
if options.latency_output:
head[channel] = pipeparts.mklatency(pipeline, head[channel], name=idq_utils.latency_name('beforewhitening', 2, channel))
......@@ -736,34 +753,13 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterwhitening', 3, channel, rate))
# choose range of basis parameters
# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
downsample_factor = 0.8
flow = rate/4. * downsample_factor
fhigh = rate/2. * downsample_factor
qlow = 3.3166
if data_source_info.extension == 'ini':
qhigh = data_source_info.channel_dict[channel]['qhigh']
else:
qhigh = options.qhigh
# generate templates
if options.waveform == 'half_sine_gaussian':
waveforms = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch)
elif options.waveform == 'sine_gaussian':
waveforms = idq_utils.SineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch)
else:
raise NotImplementedError
basis_params[(channel, rate)] = waveforms
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
# determine whether to do time-domain or frequency-domain convolution
time_domain = (waveforms.max_duration*rate**2) < (5*waveforms.max_duration*rate*numpy.log2(rate))
time_domain = (waveforms[channel].sample_pts[rate]*rate) < (5*waveforms[channel].sample_pts[rate]*numpy.log2(rate))
# create FIR bank of half sine-gaussian templates
fir_matrix = numpy.array([waveform for waveform in waveforms.generate_templates()])
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = int(numpy.ceil(waveforms.latency * rate)))
fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate)])
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = waveforms[channel].latency[rate])
# add queues, change stream format, add tags
if options.latency_output:
......@@ -789,7 +785,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# define structures to synchronize output streams and extract triggers from buffer
logger.info("setting up pipeline handler...")
handler = MultiChannelHandler(mainloop, pipeline, data_source_info, options, keys = src.keys(), basis_params = basis_params, basename = basename, subset_id = subset_id)
handler = MultiChannelHandler(mainloop, pipeline, data_source_info, options, keys = src.keys(), waveforms = waveforms, basename = basename, subset_id = subset_id)
logger.info("attaching appsinks to pipeline...")
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
......
......@@ -349,7 +349,7 @@ class HalfSineGaussianGenerator(object):
for f, q, _ in self.parameter_grid[rate]:
if quadrature:
for phase in self.phases:
yield self.waveform(f, q, phase)
yield self.waveform(f, q, phase, rate)
else:
yield self.waveform(f, q, self.phases[0], rate)
......@@ -416,7 +416,7 @@ class SineGaussianGenerator(HalfSineGaussianGenerator):
def __init__(self, f_range, q_range, rates, mismatch=0.2, tolerance=5e-3, downsample_factor=0.8):
super(SineGaussianGenerator, self).__init__(f_range, q_range, rates, mismatch=mismatch, tolerance=tolerance, downsample_factor=0.8)
self.times = {rate: numpy.linspace(-((self.sample_pts[rate] - 1) / 2.) / rate, ((self.sample_pts[rate] - 1) / 2.) / rate, self.sample_pts[rate], endpoint=True) for rate in self.rates}
self.latency = {rate: self.times[rate][-1] for rate in self.rates}
self.latency = {rate: int((self.sample_pts[rate] - 1) / 2) for rate in self.rates}
self.filter_duration = {rate: (self.times[rate][-1] - self.times[rate][0]) for rate in self.rates}
def duration(self, f, q):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment