diff --git a/gstlal-ugly/python/multichannel_datasource.py b/gstlal-ugly/python/multichannel_datasource.py index f3939a0405a5dd4e25deba721410fb16c44509fd..b0ac826e132ed767dc1b27d8886b402fb1e090b6 100644 --- a/gstlal-ugly/python/multichannel_datasource.py +++ b/gstlal-ugly/python/multichannel_datasource.py @@ -26,6 +26,7 @@ import sys import time +import itertools import optparse from ConfigParser import SafeConfigParser @@ -189,6 +190,67 @@ def channel_dict_from_channel_ini(options): return channel_dict + + +def partition_channels_to_equal_subsets(channel_dict, max_streams, min_sample_rate, max_sample_rate): + """! + Given a channel dictionary, will produce partitions of channel subsets where the number of channels + in each partition is equal (except possibly the last partition). This is given max_streams, + and well as max and min sample rates enforced to determine the number of streams that a particular + channel will generate. + + Returns a list of disjoint channel lists. + """ + # determine how many streams a single channel will produce when split up into multiple frequency bands + # and separate them based on this criterion + channel_streams = {} + + for channel in channel_dict.keys(): + sample_rate = int(channel_dict[channel]['fsamp']) + max_rate = min(max_sample_rate, sample_rate) + min_rate = min(min_sample_rate, max_rate) + n_rates = int(numpy.log2(max_rate/min_rate) + 1) + + channel_streams.setdefault(n_rates, []).append((n_rates, channel)) + + # find relative probabilities in each bin + total = sum((len(channel_streams[n]) for n in channel_streams.keys())) + p_relative = {n: (len(channel_streams[n]) / float(total)) for n in channel_streams.keys()} + + # figure out total number of channels needed per subset + num_streams = {n: int(numpy.ceil(p_relative[n] * max_streams)) for n in channel_streams.keys()} + num_channels = {n: int(numpy.ceil(num_streams[n] / float(n))) for n in num_streams.keys()} + + # force less sampling from the lowest bins (16Hz and 32Hz) at the beginning + # to reduce the number of channels in each subset + rates = sorted(channel_streams.keys()) + if rates[0] == 1: + num_channels[1] = 1 + max_channels = sum((num_channels[n] for n in num_channels.keys())) + + # generate a round-robin type way to sample from + rates2sample = itertools.cycle(n for n in channel_streams.keys() for i in range(int(numpy.round(p_relative[n] * max_channels)))) + + # generate channel subsets + subsets = [] + total = sum((len(channel_streams[n]) for n in channel_streams.keys())) + while total > 0: + subset = [] + while len(subset) < max_channels and total > 0: + rate = next(rates2sample) + while not channel_streams[rate]: + # recalculate probabilities and rates2sample + p_relative = {n: (len(channel_streams[n]) / float(total)) for n in channel_streams.keys()} + rates2sample = itertools.cycle(n for n in channel_streams.keys() for i in range(int(numpy.round(p_relative[n] * max_channels)))) + rate = next(rates2sample) + + subset.append(channel_streams[rate].pop()[1]) + total -= 1 + + subsets.append(subset) + + return subsets + def partition_channels_to_subsets(channel_dict, max_streams, min_sample_rate, max_sample_rate): """! Given a channel dictionary, will produce roughly equal partitions of channel subsets, given max_streams, @@ -325,8 +387,11 @@ class DataSourceInfo(object): self.max_sample_rate = options.max_sample_rate self.min_sample_rate = options.min_sample_rate - # split up channels requested into roughly equal partitions for serial processing - self.channel_subsets = partition_channels_to_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate) + # split up channels requested into partitions for serial processing + if options.equal_subsets: + self.channel_subsets = partition_channels_to_equal_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate) + else: + self.channel_subsets = partition_channels_to_subsets(self.channel_dict, self.max_streams, self.min_sample_rate, self.max_sample_rate) ## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"} self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"} @@ -419,6 +484,9 @@ def append_options(parser): Set the maximum number of streams to process at a given time (num_channels * num_rates = num_streams). Used to split up channels given into roughly equal subsets to be processed in sequence. +- --equal-subsets + If set, forces an equal number of channels processed per channel subset. + - --max-sampling-rate [int] Maximum sampling rate for a given channel. If a given channel has a higher native sampling rate, it will be downsampled to this target rate. @@ -485,6 +553,7 @@ def append_options(parser): group.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file") group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO:AUX-CHANNEL-NAME:RATE") group.add_option("--max-streams", type = "int", default = 50, help = "Maximum number of streams to process for a given pipeline at once. Used to split up channel lists into subsets that can then be processed in serial. Default = 50.") + group.add_option("--equal-subsets", action = "store_true", help = "If set, forces an equal number of channels processed per channel subset.") group.add_option("--max-sample-rate", type = "int", default = 2048, help = "Maximum sampling rate for a given channel. If a given channel has a higher native sampling rate, it will be downsampled to this target rate. Default = 2048.") group.add_option("--min-sample-rate", type = "int", default = 32, help = "Minimum sampling rate for a given channel when splitting a given channel into multiple frequency bands. If a channel has a lower sampling rate than this minimum, however, it will not be upsampled to this sampling rate. Default = 32.") group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")