Skip to content
Snippets Groups Projects
Commit 8c720375 authored by Madeline Wade's avatar Madeline Wade
Browse files

Addition of element to create arbitrary numpy function based on sink timestamps and duration.

parent 7c7708d1
No related branches found
No related tags found
No related merge requests found
......@@ -450,18 +450,17 @@ ctrl = pipeparts.mkcapsfilter(pipeline, ctrl, "audio/x-raw-float, rate=%d" % sr)
# Compute the gamma factors, if applicable
if not options.no_gamma:
exctee = pipeparts.mktee(pipeline, exc)
deltat = 1.0 / float(sr)
coshead = cos = pipeparts.mkgeneric(pipeline, None, "lal_numpy_functiongenerator", expression = "%f * cos(2.0 * 3.1415926535897931 * %f * t)" % (deltat, cal_line_freq), blocksize = sr * 4)
cos = pipeparts.mkcapsfilter(pipeline, cos, "audio/x-raw-float, width=64, rate=%d" % sr)
sinhead = sin = pipeparts.mkgeneric(pipeline, None, "lal_numpy_functiongenerator", expression = "-1.0 * %f * sin(2.0 * 3.1415926535897931 * %f * t)" % (deltat, cal_line_freq), blocksize = sr * 4)
sin = pipeparts.mkcapsfilter(pipeline, sin, "audio/x-raw-float, width=64, rate=%d" % sr)
cos = pipeparts.mkgeneric(pipeline, pipeparts.mkqueue(pipeline, exctee, max_size_time = gst.SECOND * 100), "lal_numpy_fx_transform", expression = "%f * cos(2.0 * 3.1415926535897931 * %f * t)" % (deltat, cal_line_freq))
sin = pipeparts.mkgeneric(pipeline, pipeparts.mkqueue(pipeline, exctee, max_size_time = gst.SECOND * 100), "lal_numpy_fx_transform", expression = "-1.0 * %f * sin(2.0 * 3.1415926535897931 * %f * t)" % (deltat, cal_line_freq))
if options.full_calibration:
ctrl_for_gamma = pipeparts.mkqueue(pipeline, ctrltee, max_size_time = gst.SECOND * 100)
elif options.partial_calibration:
ctrl_for_gamma = pipeparts.mkqueue(pipeline, darm_ctrl, max_size_time = gst.SECOND * 100)
compute_gamma_bin = pipeparts.mkcomputegamma(pipeline, ctrl_for_gamma, pipeparts.mkqueue(pipeline, exc, max_size_time = gst.SECOND * 100), pipeparts.mkqueue(pipeline, cos, max_size_time = gst.SECOND * 100), pipeparts.mkqueue(pipeline, sin, max_size_time = gst.SECOND * 100), olgI = olgI, olgR = olgR, sr = options.compute_gamma_sr, time_domain = True, wI = wI, wR = wR, wmod = wmod, olgmod = olgmod)
compute_gamma_bin = pipeparts.mkcomputegamma(pipeline, ctrl_for_gamma, pipeparts.mkqueue(pipeline, exctee, max_size_time = gst.SECOND * 100), pipeparts.mkqueue(pipeline, cos, max_size_time = gst.SECOND * 100), pipeparts.mkqueue(pipeline, sin, max_size_time = gst.SECOND * 100), olgI = olgI, olgR = olgR, sr = options.compute_gamma_sr, time_domain = True, wI = wI, wR = wR, wmod = wmod, olgmod = olgmod)
gammaR = compute_gamma_bin.get_pad("gammaR")
gammaR = pipeparts.mkaudioconvert(pipeline, gammaR)
gammaR = pipeparts.mkcapsfilter(pipeline, gammaR, "audio/x-raw-float, width=64, rate=%d" % options.compute_gamma_sr)
......@@ -753,25 +752,9 @@ if options.write_pipeline is not None:
pipeparts.write_dump_dot(pipeline, "%s.%s" %(options.write_pipeline, "NULL"), verbose = options.verbose)
# Seek the pipeline when necessary. Note: The seekevent for frames is set above when command line is being parsed/sanity checked.
# FIXME: This scheme assumes that the shared memory partitions are keeping up with real time when the data source is lvshm. This is probably not a good assumption outside of the DMT environment.
# FIXME: This also leads to differences between online vs. offline running, since the sine/cosine waves initiate at slightly different GPS times. Maybe this is a non-issue if \gamma(t) will only be calculated in offline use.
if options.data_source == "lvshm" and not options.no_gamma:
seekevent = gst.event_new_seek(1., gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_KEY_UNIT, gst.SEEK_TYPE_SET, NOW, gst.SEEK_TYPE_SET, -1)
if options.data_source == "frames":
datasource.do_seek(pipeline, seekevent)
print >>sys.stderr, "seeking GPS start and stop times ..."
if not options.no_gamma:
if coshead.set_state(gst.STATE_READY) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError("Element %s did not want to enter ready state" % coshead.get_name())
if not coshead.send_event(seekevent):
raise RuntimeError("Element %s did not handle seek event" % coshead.get_name())
if sinhead.set_state(gst.STATE_READY) != gst.STATE_CHANGE_SUCCESS:
raise RuntimeError("Element %s did not want to enter ready state" % sinhead.get_name())
if not sinhead.send_event(seekevent):
raise RuntimeError("Element %s did not handle seek event" % sinhead.get_name())
if options.verbose:
print >>sys.stderr, "setting pipeline state to playing ..."
......
......@@ -7,4 +7,5 @@ pkgpython_PYTHON = \
lal_fixodc.py \
lal_logical_undersampler.py \
lal_compute_gamma.py \
lal_check_calib_factors.py
lal_check_calib_factors.py \
lal_numpy_fx_transform.py
# Copyright (C) 2015 Madeline Wade
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
Arbitrary function generator based on sink pad timestamps and duration.
Accepts any Python expression. The "numpy" module is available as if you
typed "from numpy import *". The local variable "t" provides the stream
time in seconds.
"""
__author__ = "Madeline Wade <madeline.wade@ligo.org>"
import numpy
import gst
import sys
import gobject
from gstlal import pipeio
from gstlal.pipeutil import *
#
# =============================================================================
#
# Functions
#
# =============================================================================
#
def create_expression(inbuf, outbuf, caps, expression):
rate = caps[0]["rate"]
dt = 1.0/float(rate)
t_start = float(inbuf.timestamp) / float(gst.SECOND)
print t_start
dur = float(inbuf.duration) / float(gst.SECOND)
print dur
t_end = t_start + dur
t = numpy.arange(t_start, t_end, dt)
y = eval(expression, numpy.__dict__, {'t': t})
print len(y)
print len(t)
unitsize = pipeio.get_unit_size(caps)
bufsize = unitsize * len(t)
outbuf[0:bufsize] = y.flatten().astype(pipeio.numpy_dtype_from_caps(caps)).data
#
# =============================================================================
#
# Element
#
# =============================================================================
#
class lal_numpy_fx_transform(gst.BaseTransform):
__gstdetails__ = (
"Arbitrary function generator from sink timestamps and duration",
"Filter/Audio",
__doc__,
__author__
)
__gproperties__ = {
'expression': (
gobject.TYPE_STRING,
'Expression',
'any Python expression, to be evaluated under "from numpy import *"',
'0 * t',
gobject.PARAM_READWRITE | gobject.PARAM_CONSTRUCT
)
}
__gsttemplates__ = (
gst.PadTemplate("sink",
gst.PAD_SINK,
gst.PAD_ALWAYS,
gst.caps_from_string(
"audio/x-raw-float, " +
"rate = (int) [1, MAX], " +
"channels = (int) 1, " +
"endianness = (int) BYTE_ORDER, " +
"width = (int) 64"
)
),
gst.PadTemplate("src",
gst.PAD_SRC,
gst.PAD_ALWAYS,
gst.caps_from_string(
"audio/x-raw-float, " +
"rate = (int) [1, MAX], " +
"channels = (int) 1, " +
"endianness = (int) BYTE_ORDER, " +
"width = (int) 64"
)
)
)
def __init__(self):
super(lal_numpy_fx_transform, self).__init__()
self.set_gap_aware(True)
def do_set_property(self, prop, val):
if prop.name == "expression":
self.__compiled_expression = compile(val, "<compiled Python expression>", "eval")
self.__expression = val
def do_get_property(self, prop):
if prop.name == "expression":
return self.__expression
def do_transform(self, inbuf, outbuf):
pad = self.src_pads().next()
caps = pad.get_caps()
# FIXME: I'm not sure this is the right fix for hearbeat buffers, so I need to check this!
if len(inbuf) == 0:
gst.Buffer.flag_set(inbuf, gst.BUFFER_FLAG_GAP)
# Process buffer
if not gst.Buffer.flag_is_set(inbuf, gst.BUFFER_FLAG_GAP):
# Input is not 0s
create_expression(inbuf, outbuf, caps, self.__expression)
else:
# Input is 0s
gst.Buffer.flag_set(outbuf, gst.BUFFER_FLAG_GAP)
return gst.FLOW_OK
gobject.type_register(lal_numpy_fx_transform)
__gstelementfactory__ = (
lal_numpy_fx_transform.__name__,
gst.RANK_NONE,
lal_numpy_fx_transform
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment