Skip to content
Snippets Groups Projects
Commit e11f5c45 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

multichannel_datasource.py: added option to force splitting of channels into...

multichannel_datasource.py: added option to force splitting of channels into subsets with an equal number of channels for serial processing
parent a3c2a9c3
No related branches found
No related tags found
No related merge requests found
......@@ -26,6 +26,7 @@
import sys
import time
import itertools
import optparse
from ConfigParser import SafeConfigParser
......@@ -189,6 +190,67 @@ def channel_dict_from_channel_ini(options):
return channel_dict
def partition_channels_to_equal_subsets(channel_dict, max_streams, min_sample_rate, max_sample_rate):
"""!
Given a channel dictionary, will produce partitions of channel subsets where the number of channels
in each partition is equal (except possibly the last partition). This is given max_streams,
and well as max and min sample rates enforced to determine the number of streams that a particular
channel will generate.
Returns a list of disjoint channel lists.
"""
# determine how many streams a single channel will produce when split up into multiple frequency bands
# and separate them based on this criterion
channel_streams = {}
for channel in channel_dict.keys():
sample_rate = int(channel_dict[channel]['fsamp'])
max_rate = min(max_sample_rate, sample_rate)
min_rate = min(min_sample_rate, max_rate)
n_rates = int(numpy.log2(max_rate/min_rate) + 1)
channel_streams.setdefault(n_rates, []).append((n_rates, channel))
# find relative probabilities in each bin
total = sum((len(channel_streams[n]) for n in channel_streams.keys()))
p_relative = {n: (len(channel_streams[n]) / float(total)) for n in channel_streams.keys()}
# figure out total number of channels needed per subset
num_streams = {n: int(numpy.ceil(p_relative[n] * max_streams)) for n in channel_streams.keys()}
num_channels = {n: int(numpy.ceil(num_streams[n] / float(n))) for n in num_streams.keys()}
# force less sampling from the lowest bins (16Hz and 32Hz) at the beginning
# to reduce the number of channels in each subset
rates = sorted(channel_streams.keys())
if rates[0] == 1:
num_channels[1] = 1
max_channels = sum((num_channels[n] for n in num_channels.keys()))
# generate a round-robin type way to sample from
rates2sample = itertools.cycle(n for n in channel_streams.keys() for i in range(int(numpy.round(p_relative[n] * max_channels))))
# generate channel subsets
subsets = []
total = sum((len(channel_streams[n]) for n in channel_streams.keys()))
while total > 0:
subset = []
while len(subset) < max_channels and total > 0:
rate = next(rates2sample)
while not channel_streams[rate]:
# recalculate probabilities and rates2sample
p_relative = {n: (len(channel_streams[n]) / float(total)) for n in channel_streams.keys()}
rates2sample = itertools.cycle(n for n in channel_streams.keys() for i in range(int(numpy.round(p_relative[n] * max_channels))))
rate = next(rates2sample)
subset.append(channel_streams[rate].pop()[1])
total -= 1
subsets.append(subset)
return subsets
def partition_channels_to_subsets(channel_dict, max_streams, min_sample_rate, max_sample_rate):
"""!
Given a channel dictionary, will produce roughly equal partitions of channel subsets, given max_streams,
......@@ -325,8 +387,11 @@ class DataSourceInfo(object):
self.max_sample_rate = options.max_sample_rate
self.min_sample_rate = options.min_sample_rate
# split up channels requested into roughly equal partitions for serial processing
self.channel_subsets = partition_channels_to_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate)
# split up channels requested into partitions for serial processing
if options.equal_subsets:
self.channel_subsets = partition_channels_to_equal_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate)
else:
self.channel_subsets = partition_channels_to_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate)
## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
......@@ -419,6 +484,9 @@ def append_options(parser):
Set the maximum number of streams to process at a given time (num_channels * num_rates = num_streams).
Used to split up channels given into roughly equal subsets to be processed in sequence.
- --equal-subsets
If set, forces an equal number of channels processed per channel subset.
- --max-sampling-rate [int]
Maximum sampling rate for a given channel.
If a given channel has a higher native sampling rate, it will be downsampled to this target rate.
......@@ -485,6 +553,7 @@ def append_options(parser):
group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE")
group.add_option("--max-streams", type = "int", default = 50, help = "Maximum number of streams to process for a given pipeline at once. Used to split up channel lists into subsets that can then be processed in serial. Default = 50.")
group.add_option("--equal-subsets", action = "store_true", help = "If set, forces an equal number of channels processed per channel subset.")
group.add_option("--max-sample-rate", type = "int", default = 2048, help = "Maximum sampling rate for a given channel. If a given channel has a higher native sampling rate, it will be downsampled to this target rate. Default = 2048.")
group.add_option("--min-sample-rate", type = "int", default = 32, help = "Minimum sampling rate for a given channel when splitting a given channel into multiple frequency bands. If a channel has a lower sampling rate than this minimum, however, it will not be upsampled to this sampling rate. Default = 32.")
group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment