Commit 236d079f authored by Patrick Godwin
gstlal_etg + multichannel_datasource.py: added the ability to process channel list subsets serially, with the subsets determined by the --max-streams option; changed the mkbasicmultisrc function argument from an instrument to a list of channels; moved max_sample_rate and min_sample_rate to command line options rather than hardcoding them; and fixed an issue where AppSync incorrectly retained its sink_dict property when instantiated multiple times.
parent 32d3d5a8
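The AppSync fix is the classic Python mutable-default-argument pitfall: a default such as sink_dict = {} is evaluated once, at function definition time, so every instantiation that relies on the default shares the same dict object. A minimal sketch of the pitfall and of the fix applied below (illustrative only, not the actual AppSync code):

    # Buggy pattern: all instances share one dict created at definition time.
    class Sync(object):
        def __init__(self, sink_dict = {}):
            self.sink_dict = sink_dict

    a = Sync()
    a.sink_dict['appsink_0'] = 'H1:AUX-A'
    b = Sync()
    print(b.sink_dict)   # {'appsink_0': 'H1:AUX-A'} -- b inherits a's sinks

    # The fix, as in this commit: default to None, build a fresh dict per instance.
    class FixedSync(object):
        def __init__(self, sink_dict = None):
            self.sink_dict = sink_dict if sink_dict else {}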
@@ -112,6 +112,9 @@ class MultiChannelHandler(simplehandler.Handler):
self.keys = kwargs.pop("keys")
self.frame_segments = kwargs.pop("frame_segments")
# format id for aesthetics
self.job_id = str(kwargs.pop("job_id")).zfill(4)
### iDQ saving properties
self.last_save_time = None
self.cadence = options.cadence
@@ -131,7 +134,7 @@ class MultiChannelHandler(simplehandler.Handler):
duration = int(options.gps_end_time) - int(options.gps_start_time)
self.fname = '%s-%d-%d' % (self.tag, options.gps_start_time, duration)
trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.fname.split("-")[2])[:5], self.tag+"-"+options.job_id)
trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.fname.split("-")[2])[:5], self.tag+"-"+self.job_id)
self.fpath = os.path.join(os.path.abspath(self.out_path), trigger_path)
self.tmp_path = os.path.join(tmp_dir, trigger_path)
@@ -359,10 +362,13 @@ class MultiChannelHandler(simplehandler.Handler):
return bottle.HTTPResponse(status = status, headers = header, body = body)
class LinkedAppSync(pipeparts.AppSync):
def __init__(self, appsink_new_buffer, sink_dict = {}):
super(LinkedAppSync, self).__init__(appsink_new_buffer, sink_dict.keys())
self.sink_dict = sink_dict
def __init__(self, appsink_new_buffer, sink_dict = None):
self.time_ordering = 'full'
if sink_dict:
self.sink_dict = sink_dict
else:
self.sink_dict = {}
super(LinkedAppSync, self).__init__(appsink_new_buffer, self.sink_dict.keys())
def attach(self, appsink):
"""
@@ -520,9 +526,15 @@ options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
# set up logger
# only load kafka library if triggers are transferred via kafka topic
if options.use_kafka:
from confluent_kafka import Producer
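# A minimal sketch of how a confluent_kafka Producer is typically driven
# (hypothetical broker address and topic name; this hunk does not show the
# actual produce calls):
#
#   producer = Producer({'bootstrap.servers': 'localhost:9092'})
#   producer.produce('gstlal_etg_triggers', value = trigger_message)
#   producer.flush()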
#
# set up logging
#
duration = int(options.gps_end_time) - int(options.gps_start_time)
logdir = os.path.join(options.out_path, 'logs', options.job_id)
aggregator.makedir(logdir)
@@ -530,184 +542,197 @@ aggregator.makedir(logdir)
logger = idq_utils.get_logger('gstlal-etg_%d-%d' % (options.gps_start_time, duration), rootdir=logdir, verbose=options.verbose)
logger.info("writing log to %s" % logdir)
# dictionary of basis parameters keyed by ifo, rate
basis_params = {}
# only load kafka library if triggers are transferred via kafka topic
if options.use_kafka:
from confluent_kafka import Producer
#
# if web services serving up bottle routes are enabled,
# create a new, empty, Bottle application and make it the
# current default, then start http server to serve it up
# process channel subsets in serial
#
if not options.disable_web_service:
bottle.default_app.push()
# uncomment the next line to show tracebacks when something fails
# in the web server
#bottle.app().catchall = False
import base64, uuid # FIXME: don't import when the uniquification scheme is fixed
httpservers = httpinterface.HTTPServers(
# FIXME: either switch to using avahi's native name
# uniquification system or adopt a naturally unique naming
# scheme (e.g., include a search identifier and job
# number).
service_name = "gstlal_idq (%s)" % base64.urlsafe_b64encode(uuid.uuid4().bytes),
service_properties = {},
verbose = options.verbose
)
# Set up a registry of the resources that this job provides
@bottle.route("/")
@bottle.route("/index.html")
def index(channel_list = channels):
# get the host and port to report in the links from the
# request we've received so that the URLs contain the IP
# address by which client has contacted us
netloc = bottle.request.urlparts[1]
server_address = "http://%s" % netloc
yield "<html><body>\n<h3>%s %s</h3>\n<p>\n" % (netloc, " ".join(sorted(channel_list)))
for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
# don't create links back to this page
if route.rule in ("/", "/index.html"):
continue
# only create links for GET methods
if route.method != "GET":
continue
yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
yield "</p>\n</body></html>"
# FIXME: get service-discovery working, then don't do this
open("registry.txt", "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
#
# building the event loop and pipeline
#
logger.info("assembling pipeline...")
# format subset_id for aesthetics
#subset_id = str(subset_id).zfill(4)
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
# generate multiple channel sources, and link up pipeline
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = options.verbose)
src = {}
#
# if web services serving up bottle routes are enabled,
# create a new, empty, Bottle application and make it the
# current default, then start http server to serve it up
#
for channel in channels:
# define sampling rates used
samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
max_samp_rate = min(2048, samp_rate)
min_samp_rate = min(32, max_samp_rate)
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
rates = [min_samp_rate*2**i for i in range(n_rates)]
if not options.disable_web_service:
bottle.default_app.push()
# uncomment the next line to show tracebacks when something fails
# in the web server
#bottle.app().catchall = False
import base64, uuid # FIXME: don't import when the uniquification scheme is fixed
httpservers = httpinterface.HTTPServers(
# FIXME: either switch to using avahi's native name
# uniquification system or adopt a naturally unique naming
# scheme (e.g., include a search identifier and job
# number).
service_name = "gstlal_idq (%s)" % base64.urlsafe_b64encode(uuid.uuid4().bytes),
service_properties = {},
verbose = options.verbose
)
# Set up a registry of the resources that this job provides
@bottle.route("/")
@bottle.route("/index.html")
def index(channel_list = channel_subset):
# get the host and port to report in the links from the
# request we've received so that the URLs contain the IP
# address by which client has contacted us
netloc = bottle.request.urlparts[1]
server_address = "http://%s" % netloc
yield "<html><body>\n<h3>%s %s</h3>\n<p>\n" % (netloc, " ".join(sorted(channel_list)))
for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
# don't create links back to this page
if route.rule in ("/", "/index.html"):
continue
# only create links for GET methods
if route.method != "GET":
continue
yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
yield "</p>\n</body></html>"
# FIXME: get service-discovery working, then don't do this
open("registry.txt", "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
if options.latency_output:
head[channel] = pipeparts.mklatency(pipeline, head[channel], name=idq_utils.latency_name('beforewhitening', 2, channel))
#
# building the event loop and pipeline
#
# whiten auxiliary channel data
for rate, thishead in idq_multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32).items():
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterwhitening', 3, channel, rate))
# choose range of basis parameters for sine-gaussians
# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
downsample_factor = 0.8
flow = rate/4. * downsample_factor
fhigh = rate/2. * downsample_factor
qlow = 4
if data_source_info.extension == 'ini':
qhigh = data_source_info.channel_dict[channel]['qhigh']
else:
qhigh = options.qhigh
logger.info("assembling pipeline...")
# generate half sine-gaussian templates
hsg_waveforms = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch)
basis_params[(channel, rate)] = hsg_waveforms.parameter_grid
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(sys.argv[0])
# determine whether to do time-domain or frequency-domain convolution
time_domain = (hsg_waveforms.max_duration*rate**2) < (5*hsg_waveforms.max_duration*rate*numpy.log2(rate))
# dictionary of basis parameters keyed by ifo, rate
basis_params = {}
# create FIR bank of half sine-gaussian templates
fir_matrix = numpy.array([waveform for waveform in hsg_waveforms.generate_templates()])
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = 0)
# generate multiple channel sources, and link up pipeline
head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, channel_subset, verbose = options.verbose)
src = {}
for channel in channel_subset:
# define sampling rates used
samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
max_rate = min(data_source_info.max_sample_rate, samp_rate)
min_rate = min(data_source_info.min_sample_rate, max_rate)
n_rates = int(numpy.log2(max_rate/min_rate) + 1)
rates = [min_rate*2**i for i in range(n_rates)]
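# worked example, assuming the defaults (--max-sample-rate=2048, --min-sample-rate=32):
# a channel with fsamp = 16384 gives max_rate = 2048, min_rate = 32,
# n_rates = log2(2048/32) + 1 = 7, and rates = [32, 64, 128, 256, 512, 1024, 2048]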
# add queues, change stream format, add tags
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterFIRbank', 4, channel, rate))
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
thishead = pipeparts.mktogglecomplex(pipeline, thishead)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % rate)
thishead = pipeparts.mktaginject(pipeline, thishead, "instrument=%s,channel-name=%s" %( instrument, channel))
# extract features from time series
thishead = pipeparts.mktrigger(pipeline, thishead, rate, max_snr = True)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('aftertrigger', 5, channel, rate))
# link to src for processing by appsync
src[(channel, rate)] = thishead
# define structures to synchronize output streams and extract triggers from buffer
logger.info("attaching appsinks to pipeline...")
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, description = options.description, out_path = options.out_path, instrument = instrument, keys = src.keys(), frame_segments = data_source_info.frame_segments)
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
appsinks = set(appsync.add_sink(pipeline, src[(channel, rate)], name = "sink_%s_%s" % (rate, channel)) for channel, rate in src.keys())
# Allow Ctrl+C or sig term to gracefully shut down the program for online
# sources, otherwise it will just kill it
if data_source_info.data_source in ("lvshm", "framexmit"):# what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
# Seek
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if data_source_info.data_source not in ("lvshm", "framexmit"):# what about nds online?
datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)
#
# Run pipeline
#
head[channel] = pipeparts.mklatency(pipeline, head[channel], name=idq_utils.latency_name('beforewhitening', 2, channel))
# whiten auxiliary channel data
for rate, thishead in idq_multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32).items():
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterwhitening', 3, channel, rate))
# choose range of basis parameters for sine-gaussians
# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
downsample_factor = 0.8
flow = rate/4. * downsample_factor
fhigh = rate/2. * downsample_factor
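# e.g. at rate = 2048 with downsample_factor = 0.8: flow = 409.6 Hz, fhigh = 819.2 Hz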
qlow = 4
if data_source_info.extension == 'ini':
qhigh = data_source_info.channel_dict[channel]['qhigh']
else:
qhigh = options.qhigh
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
# generate half sine-gaussian templates
hsg_waveforms = idq_utils.HalfSineGaussianGenerator((flow, fhigh), (qlow, qhigh), rate, mismatch = options.mismatch)
basis_params[(channel, rate)] = hsg_waveforms.parameter_grid
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
logger.info("running pipeline...")
# determine whether to do time-domain or frequency-domain convolution
time_domain = (hsg_waveforms.max_duration*rate**2) < (5*hsg_waveforms.max_duration*rate*numpy.log2(rate))
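# a rough cost comparison: direct filtering costs ~ filter_length * rate
# operations per second of data, while the FFT-based estimate used here is
# ~ 5 * filter_length * log2(rate); dividing out the common factors, the
# condition reduces to rate < 5 * log2(rate)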
mainloop.run()
# create FIR bank of half sine-gaussian templates
fir_matrix = numpy.array([waveform for waveform in hsg_waveforms.generate_templates()])
thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = 0)
# save remaining triggers
if options.save_hdf:
handler.to_hdf_file()
handler.finish_hdf_file()
else:
handler.to_trigger_file()
# add queues, change stream format, add tags
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('afterFIRbank', 4, channel, rate))
thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
thishead = pipeparts.mktogglecomplex(pipeline, thishead)
thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % rate)
thishead = pipeparts.mktaginject(pipeline, thishead, "instrument=%s,channel-name=%s" %( instrument, channel))
# extract features from time series
thishead = pipeparts.mktrigger(pipeline, thishead, rate, max_snr = True)
if options.latency_output:
thishead = pipeparts.mklatency(pipeline, thishead, name=idq_utils.latency_name('aftertrigger', 5, channel, rate))
#
# Shut down pipeline
#
# link to src for processing by appsync
src[(channel, rate)] = thishead
logger.info("shutting down pipeline...")
# define structures to synchronize output streams and extract triggers from buffer
#
# Shutdown the web interface servers and garbage collect the Bottle
# app. This should release the references the Bottle app's routes
# hold to the pipeline's data (like template banks and so on).
#
logger.info("setting up pipeline handler...")
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, description = options.description, out_path = options.out_path, instrument = instrument, keys = src.keys(), frame_segments = data_source_info.frame_segments, job_id = subset_id)
if not options.disable_web_service:
del httpservers
bottle.default_app.pop()
logger.info("attaching appsinks to pipeline...")
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
appsinks = set(appsync.add_sink(pipeline, src[(channel, rate)], name = "sink_%s_%s" % (rate, channel)) for channel, rate in src.keys())
logger.info("attached %d appsinks to pipeline." % len(appsinks))
#
# Set pipeline state to NULL and garbage collect the handler
#
# Allow Ctrl+C or sig term to gracefully shut down the program for online
# sources, otherwise it will just kill it
if data_source_info.data_source in ("lvshm", "framexmit"):# what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
# Seek
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if data_source_info.data_source not in ("lvshm", "framexmit"):# what about nds online?
datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)
#
# Run pipeline
#
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
logger.info("running pipeline...")
mainloop.run()
# save remaining triggers
if options.save_hdf:
handler.to_hdf_file()
handler.finish_hdf_file()
else:
handler.to_trigger_file()
#
# Shut down pipeline
#
logger.info("shutting down pipeline...")
#
# Shutdown the web interface servers and garbage collect the Bottle
# app. This should release the references the Bottle app's routes
# hold to the pipeline's data (like template banks and so on).
#
if not options.disable_web_service:
del httpservers
bottle.default_app.pop()
#
# Set pipeline state to NULL and garbage collect the handler
#
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
del handler.pipeline
del handler
#
# close program manually if data source is live
......
@@ -44,6 +44,7 @@ from glue import segments
import lal
from lal import LIGOTimeGPS
import numpy
## framexmit ports in use on the LDG
# Look-up table to map instrument name to framexmit multicast address and
@@ -188,6 +189,75 @@ def channel_dict_from_channel_ini(options):
return channel_dict
def partition_channels_to_subsets(channel_dict, max_streams, min_sample_rate, max_sample_rate):
"""!
Given a channel dictionary, produces roughly equal partitions of channel subsets based on max_streams,
as well as the max and min sample rates enforced to determine the number of streams that a particular
channel will generate.
Returns a list of disjoint channel lists.
"""
# determine how many streams a single channel will produce when split up into multiple frequency bands
channel_streams = []
for channel in channel_dict.keys():
sample_rate = int(channel_dict[channel]['fsamp'])
max_rate = min(max_sample_rate, sample_rate)
min_rate = min(min_sample_rate, max_rate)
n_rates = int(numpy.log2(max_rate/min_rate) + 1)
channel_streams.append((n_rates, channel))
return [subset for subset in partition_list(channel_streams, max_streams)]
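# For example (hypothetical channel names; subset ordering follows dict iteration order):
#
#   >>> channel_dict = {
#   ...     'H1:AUX-A': {'fsamp': 2048},
#   ...     'H1:AUX-B': {'fsamp': 2048},
#   ...     'H1:AUX-C': {'fsamp': 2048},
#   ... }
#   >>> # each channel spans 7 rates (32 to 2048), i.e. 21 streams in total
#   >>> partition_channels_to_subsets(channel_dict, 14, 32, 2048)
#   [['H1:AUX-A', 'H1:AUX-B'], ['H1:AUX-C']]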
def partition_list(lst, target_sum):
"""!
Partitions a list into roughly equal chunks based on a target sum,
given a list of items of the form (int, value), where the ints are used to determine partitions.
Yields lists containing the values.
"""
total_sum = sum(item[0] for item in lst)
chunks = numpy.ceil(total_sum/float(target_sum))
avg_sum = total_sum/float(chunks)
chunks_yielded = 0
chunk = []
chunksum = 0
sum_of_seen = 0
for i, item in enumerate(lst):
# if only one chunk left to process, yield rest of list
if chunks - chunks_yielded == 1:
yield chunk + [x[1] for x in lst[i:]]
return
to_yield = chunks - chunks_yielded
chunks_left = len(lst) - i
# yield remaining list in single item chunks
if to_yield > chunks_left:
if chunk:
yield chunk
for x in lst[i:]:
yield [x[1]]
return
sum_of_seen += item[0]
# if the current chunk sum is below the average, add another item to the chunk
if chunksum < avg_sum:
chunk.append(item[1])
chunksum += item[0]
# else, yield the chunk and recompute the average for the remaining chunks, since this chunk isn't perfectly partitioned
else:
yield chunk
avg_sum = (total_sum - sum_of_seen)/(to_yield - 1)
chunks_yielded += 1
chunksum = item[0]
chunk = [item[1]]
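# For example, with target_sum = 5 the generator balances chunk sums around
# the running average:
#
#   >>> list(partition_list([(3, 'a'), (3, 'b'), (2, 'c'), (1, 'd')], 5))
#   [['a', 'b'], ['c', 'd']]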
class DataSourceInfo(object):
"""!
@@ -233,7 +303,7 @@ class DataSourceInfo(object):
for fidelity in options.fidelity_exclude:
assert fidelity in self.known_fidelity, '--fidelity-exclude=%s is not understood. Must be one of %s'%(fidelity, ", ".join(self.known_fidelity))
# dictionary of the requested channels, e.g., {"H1": {"LDAS-STRAIN": 16384}, "L1": {"LDAS-STRAIN": 16384}}
# dictionary of the requested channels, e.g., {"H1:LDAS-STRAIN": 16384, "H1:ODC-LARM": 2048}
if options.channel_list:
name, self.extension = options.channel_list.rsplit('.', 1)
if self.extension == 'ini':
@@ -244,9 +314,20 @@
self.extension = 'none'
self.channel_dict = channel_dict_from_channel_list(options.channel_name)
# set instrument; it is assumed all channels from a given channel list are from the same instrument
self.instrument = self.channel_dict[next(iter(self.channel_dict))]['ifo']
# set the maximum number of streams to be run by a single pipeline.
self.max_streams = options.max_streams
# set the frequency range considered when splitting channels into multiple frequency bands.
# If a channel's sampling rate doesn't fall within this range, it will not be split into multiple bands.
self.max_sample_rate = options.max_sample_rate
self.min_sample_rate = options.min_sample_rate
# split up the requested channels into roughly equal partitions for serial processing
self.channel_subsets = partition_channels_to_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate)
## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
if options.shared_memory_partition is not None:
@@ -336,6 +417,18 @@ def append_options(parser):
Set the name of the channels to process.
Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE
- --max-streams [int]
Set the maximum number of streams to process at a given time (num_channels * num_rates = num_streams).
Used to split up the given channels into roughly equal subsets to be processed in sequence.
- --max-sample-rate [int]
Maximum sampling rate for a given channel.
If a given channel has a higher native sampling rate, it will be downsampled to this target rate.
- --min-sample-rate [int]
Minimum sampling rate for a given channel when splitting it into multiple frequency bands.
If a channel has a lower native sampling rate than this minimum, it will not be upsampled to this rate.
(See the example invocation below.)
- --framexmit-addr [string]
Set the address of the framexmit service. Can be given
multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port
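A hypothetical invocation combining the new options (the option names are from this diff; the program name and channel list file are illustrative):

    gstlal_etg --channel-list=channels.ini --max-streams=50 --max-sample-rate=2048 --min-sample-rate=32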
@@ -393,6 +486,9 @@ def append_options(parser):
group.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional). This is required iff --data-source=frames")
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE")
group.add_option("--max-streams", type = "int", default = 50, help = "Maximum number of streams to process for a given pipeline at once. Used to split up channel lists into subsets that can then be processed in serial. Default = 50.")
group.add_option("--max-sample-rate", type = "int", default = 2048, help = "Maximum sampling rate for a given channel. If a given channel has a higher native sampling rate, it will be downsampled to this target rate. Default = 2048.")
group.add_option("--min-sample-rate", type = "int", default = 32, help = "Minimum sampling rate for a given channel when splitting a given channel into multiple frequency bands. If a channel has a lower sampling rate than this minimum, however, it will not be upsampled to this sampling rate. Default = 32.")
group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
group.add_option("--framexmit-iface", metavar = "name", help = "Set the multicast interface address of the framexmit service.")
group.add_option("--shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
@@ -452,7 +548,7 @@ def append_options(parser):
# @enddot
#
#
def mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = False):
def mkbasicmultisrc(pipeline, data_source_info, channels, verbose = False):
"""!
All the things for reading real or simulated channel data in one place.
@@ -460,29 +556,29 @@ def mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = False):
This src in general supports only one instrument although
DataSourceInfo contains dictionaries of multi-instrument things. By
specifying the instrument when calling this function you will get only a single
instrument source. A code wishing to have multiple basicsrcs will need to call
this function for each instrument.
specifying the channels when calling this function you will only process
the channels specified. A code wishing to have multiple basicmultisrcs
will need to call this multiple times with different sets of channels specified.
"""
if data_source_info.data_source == "white":
head = {channel : pipeparts.mkfakesrc(pipeline, instrument = instrument, channel_name = channel, volume = 1.0, rate = data_source_info.channel_dict[channel]['fsamp']) for channel in data_source_info.channel_dict.keys()}
head = {channel : pipeparts.mkfakesrc(pipeline, instrument = data_source_info.instrument, channel_name = channel, volume = 1.0, rate = data_source_info.channel_dict[channel]['fsamp']) for channel in channels}
elif data_source_info.data_source == "silence":
head = {channel : pipeparts.mkfakesrc(pipeline, instrument = instrument, channel_name = channel, wave = 4) for channel in data_source_info.channel_dict.keys()}
head = {channel : pipeparts.mkfakesrc(pipeline, instrument = data_source_info.instrument, channel_name = channel, wave = 4) for channel in channels}
elif data_source_info.data_source == "frames":
src = pipeparts.mklalcachesrc(pipeline, location = data_source_info.frame_cache, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True, channel_list = data_source_info.channel_dict.keys())
src = pipeparts.mklalcachesrc(pipeline, location = data_source_info.frame_cache, cache_src_regex = data_source_info.instrument[0], cache_dsc_regex = data_source_info.instrument)
demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True, channel_list = channels)
# allow frame reading and decoding to occur in a different
# thread
head = dict.fromkeys(data_source_info.channel_dict.keys(), None)
head = dict.fromkeys(channels, None)
for channel in head:
head[channel] = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * Gst.SECOND)
pipeparts.src_deferred_link(demux, channel, head[channel].get_static_pad("sink"))
if data_source_info.frame_segments[instrument] is not None:
if not data_source_info.frame_segments[data_source_info.instrument]:
# FIXME: make segmentsrc generate segment samples at the channel sample rate?
# FIXME: make gate leaky when I'm certain that will work.
head[channel] = pipeparts.mkgate(pipeline, head[channel], threshold = 1, control = pipeparts.mksegmentsrc(pipeline, data_source_info.frame_segments[instrument]), name = "%s_frame_segments_gate" % channel)
pipeparts.framecpp_channeldemux_check_segments.set_probe(head[channel].get_static_pad("src"), data_source_info.frame_segments[instrument])
head[channel] = pipeparts.mkgate(pipeline, head[channel], threshold = 1, control = pipeparts.mksegmentsrc(pipeline, data_source_info.frame_segments[data_source_info.instrument]), name = "%s_frame_segments_gate" % channel)
pipeparts.framecpp_channeldemux_check_segments.set_probe(head[channel].get_static_pad("src"), data_source_info.frame_segments[data_source_info.instrument])
# fill in holes, skip duplicate data
head[channel] = pipeparts.mkaudiorate(pipeline, head[channel], skip_to_first = True, silent = False)
@@ -490,17 +586,17 @@ def mkbasicmultisrc(pipeline, data_source_info, instrument, verbose = False):
elif data_source_info.data_source in ("framexmit", "lvshm"):
if data_source_info.data_source == "lvshm":
# FIXME make wait_time adjustable through web interface or command line or both
src = pipeparts.mklvshmsrc(pipeline, shm_name = data_source_info.shm_part_dict[instrument], num_buffers = 64, blocksize = 10000000, wait_time = 120)
src = pipeparts.mklvshmsrc(pipeline, shm_name = data_source_info.shm_part_dict[data_source_info.instrument], num_buffers = 64, blocksize = 10000000, wait_time = 120)
elif data_source_info.data_source == "framexmit":
src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = data_source_info.framexmit_iface, multicast_group = data_source_info.framexmit_addr[instrument][0], port = data_source_info.framexmit_addr[instrument][1], wait_time = 120)
src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = data_source_info.framexmit_iface, multicast_group = data_source_info.framexmit_addr[data_source_info.instrument][0], port = data_source_info.framexmit_addr[data_source_info.instrument][1], wait_time = 120)
else:
# impossible code path
raise ValueError(data_source_info.data_source)
demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True, channel_list = data_source_info.channel_dict.keys())
demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = False, skip_bad_files = True, channel_list = channels)
# channels
head = dict.fromkeys(data_source_info.channel_dict.keys(), None)
head = dict.fromkeys(channels, None)
for channel in head:
head[channel] = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND* 60 * 1) # 1 minute of buffering
pipeparts.src_deferred_link(demux, channel, head[channel].get_static_pad("sink"))
......
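Taken together, the new per-subset call pattern in gstlal_etg builds one pipeline per channel subset; a condensed sketch using names from this diff (the per-channel whitening, FIR banks, appsinks, and teardown are elided):

    for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
        pipeline = Gst.Pipeline(sys.argv[0])
        head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, channel_subset, verbose = options.verbose)
        # ... assemble the rest of the pipeline for this subset, run the main
        # loop, then set the pipeline to NULL before the next subset ...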