diff --git a/gstlal-ugly/bin/gstlal_feature_extractor b/gstlal-ugly/bin/gstlal_feature_extractor index 9de708a40d8340e5a905837ec5238762c40c180f..b506d89b47dc385c2fa4fd95a3ff7a7bb967428f 100755 --- a/gstlal-ugly/bin/gstlal_feature_extractor +++ b/gstlal-ugly/bin/gstlal_feature_extractor @@ -128,6 +128,7 @@ ### from collections import deque, namedtuple +import itertools import json import math from optparse import OptionParser, OptionGroup @@ -354,7 +355,13 @@ class MultiChannelHandler(simplehandler.Handler): # add features to respective format specified if options.save_format == 'kafka': - self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset)) + if options.data_transfer == 'table': + self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset)) + elif options.data_transfer == 'row': + for row in itertools.chain(*feature_subset['features'].values()): + if row: + self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(row)) + self.producer.poll(0) ### flush out queue of sent packets elif options.save_format == 'bottle': self.feature_data.append(feature_subset) elif options.save_format == 'hdf5': @@ -648,6 +655,7 @@ def parse_command_line(): group.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .") group.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Set the filename description in which to save the output.") group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii/hdf5/kafka/bottle) of features written to disk. Default = hdf5") + group.add_option("--data-transfer", metavar = "string", default = "table", help = "Specifies the format of features transferred over-the-wire (table/row). Default = table") group.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.") group.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 200 seconds.") parser.add_option_group(group)