Skip to content
Snippets Groups Projects
Commit 7b3b5070 authored by Laleh Sadeghian's avatar Laleh Sadeghian
Browse files

adding gstreamer python script to add injections to the broadcasting data

parent 380c30dc
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
#
# Copyright (C) 2014 Laleh Sadeghian
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from optparse import OptionParser
import os
import numpy
import sys
import pygtk
pygtk.require("2.0")
import gobject
gobject.threads_init()
import pygst
pygst.require("0.10")
# This mess is to make gstreamer stop eating our help messages.
if "--help" in sys.argv or "-h" in sys.argv:
try:
del sys.argv[ sys.argv.index( "--help" ) ]
except ValueError:
pass
try:
del sys.argv[ sys.argv.index( "-h" ) ]
except ValueError:
pass
import gst
sys.argv.append( "--help" )
else:
import gst
from gstlal import pipeparts
from gstlal import pipeutil
from gstlal import simplehandler
usage = """
This help mssage help you to get started. Examples of given options is as the following:
For LHO:
./gstlal_inj_frames_shm_optparse --channel H1:GDS-FAKE_STRAIN --shared-memory-read LHO_Data --injections-file ER6_BNS.xml --shared-memory-write LHO_Data_Inj --channel-inj H1:GDS-FAKE_STRAIN_INJ --dqv-channel H1:GDS-CALIB_STATE_VECTOR
For LLO:
./gstlal_inj_frames_shm_optparse --channel L1:GDS-CALIB_STRAIN --shared-memory-read LLO_Data --injections-file ER6_BNS.xml --shared-memory-write LLO_Data_Inj --channel-inj L1:GDS-CALIB_STRAIN_INJ --dqv-channel L1:GDS-CALIB_STATE_VECTOR
For Virgo:
./gstlal_inj_frames_shm_optparse --channel V1:FAKE_h_16384Hz_4R --shared-memory-read VIRGO_Data --injections-file ER6_BNS.xml --shared-memory-write VIRGO_Data_Inj --channel-inj V1:FAKE_h_16384Hz_4R_INJ --dqv-channel V1:LLD-DQ_VECTOR
"""
def write_graph(demux):
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = True)
parser = OptionParser( usage = usage, description = __doc__ )
def parse_command_line():
#parser = OptionParser( usage = usage, description = __doc__ )
parser.add_option("--shared-memory-read", default = None, type = "string", help = "Give the shared memory section name to read the frames from")
parser.add_option("--shared-memory-write", default = None, type = "string", help = "Give the shared memory section name to write the frames with injections into it.")
parser.add_option("--channel", default = None, type = "string", help = "Give the name of the original channel that the injetions will be injected into it.")
parser.add_option("--channel-inj", default = None, type = "string", help = "Give a new name to the channel that has the injections.")
parser.add_option("--dqv-channel", default = None, type = "string", help = "Give the name of the data quality vector channel of the of the original frame file.")
parser.add_option("--injections-file", default = None, type = "string", help = "Give the injections xml file to be injected to the data.")
parser.add_option("--num-buffers", default = 16, type = "int", help = "Give the number of buffers (optional).")
parser.add_option("--blocksize", default = 1000000, type = "int", help = "blocksize (optional)")
parser.add_option("--compression-level", default = 6, type = "int", help = "compression_level (optional)")
parser.add_option("--frames-per-file", default = 1, type = "int", help = "frames_per_file (optional)")
parser.add_option("--frame-duration", default = 4, metavar= "frame duration in seconds" , type = "int", help = "frame_duration (optional)")
options, filenames = parser.parse_args()
required_options = ["shared_memory_read", "shared_memory_write", "channel", "channel_inj", "dqv_channel", "injections_file"]
missing_options = ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
if missing_options:
raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))
return options, filenames
#options, filenames = parse_command_line()
# debugging options
parser.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
options, filenames = parse_command_line()
# setup the pipeline
pipeline = gst.Pipeline(os.path.split(sys.argv[0])[1])
# main loop
mainloop = gobject.MainLoop()
# reading from shared memory
src = pipeparts.mklvshmsrc(pipeline, shm_name = options.shared_memory_read)
# demuxer
demux = src = pipeparts.mkframecppchanneldemux(pipeline, src)
if options.write_pipeline is not None:
demux.connect("no-more-pads", write_graph)
# original channel
#inj = pipeparts.mkqueue(pipeline, None)
inj = pipeparts.mkaudioconvert(pipeline, None)
pipeparts.src_deferred_link(src, options.channel, inj.get_pad("sink"))
inj = pipeparts.mkcapsfilter(pipeline, inj, "audio/x-raw-float,width=64")
inj = pipeparts.mkqueue(pipeline, inj, max_size_buffers = 0 , max_size_time = 0, max_size_bytes = 0)
channel_src_map = {}
# giving a new tag and fix the units
inst, channel = options.channel_inj.split(":")
inj = pipeparts.mktaginject(pipeline, inj, "instrument=%s,channel-name=%s,units=\"strain\"" % (inst, channel))
# adding the injections
inj = pipeparts.mkinjections(pipeline, inj, options.injections_file)
channel_src_map[options.channel_inj] = inj
# Data Quality Vector channel
dqv = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_time = 0, max_size_bytes = 0)
pipeparts.src_deferred_link(src, options.dqv_channel, dqv.get_pad("sink"))
channel_src_map[options.dqv_channel] = dqv
# muxer
# mux = pipeparts.mkframecppchannelmux(pipeline, channel_src_map, units = None, seglists = None)
# The compression level, frames_per_file and frame_duration are set when broadcasting using DMTGen
# To get these values, we have to look at the DMTGen configuration file.
# This file (DMTGen-LHO_Data.cfg in Patrick's home directory) currently (6 Aug 2014) looks like:
# Parameter Compression "zero-suppress-or-gzip"
# Parameter OutputDirectory /online/LHO_Data
# Parameter FrameLength 4
# To figure out the numerical compression level, do a "gst-inspect framecpp_channelmux"
mux = pipeparts.mkframecppchannelmux(pipeline, channel_src_map, units = None, seglists = None, compression_level=options.compression_level, frames_per_file=options.frames_per_file, frame_duration=options.frame_duration)
# writing to the shared memory
mux = pipeparts.mkprogressreport(pipeline, mux, name = "multiplexer")
# NOTE: to get the num_buffers and blocksize values, do a "smlist" on soapbox or peloton
# num_buffers = nBuf; blocksize = lBuf
# ALSO note: if they are not exactly correct, the system complains that it cannot write to
# the shared memory.
pipeparts.mkgeneric(pipeline, mux, "gds_lvshmsink", shm_name = options.shared_memory_write, num_buffers=options.num_buffers, blocksize=options.blocksize, buffer_mode=2)
if options.write_pipeline is not None and "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
pipeparts.write_dump_dot(pipeline, "%s.%s" %(options.write_pipeline, "NULL"), verbose = options.verbose)
# state playing
if pipeline.set_state(gst.STATE_PLAYING) == gst.STATE_CHANGE_FAILURE:
raise RuntimeError( "pipeline failed to enter PLAYING state" )
else:
print "set to playing successfully"
## Debugging output
if options.write_pipeline is not None and "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
pipeparts.write_dump_dot(pipeline, "%s.%s" %(options.write_pipeline, "PLAYING"), verbose = options.verbose)
handler = simplehandler.Handler(mainloop, pipeline)
print 'running mainloop...'
mainloop.run()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment