# Command-line options for the online aggregator.
#
# Each option is registered exactly once on a single option group. Adding the
# same option string a second time makes optparse raise OptionConflictError
# under its default conflict handling, and rebinding `group` before use would
# silently discard the first group.
group = optparse.OptionGroup(parser, "Aggregator Options", "General settings for configuring the online aggregator.")

# Logging / bookkeeping.
group.add_option("-v", "--verbose", default=False, action="store_true", help="Print to stdout in addition to writing to automatically generated log.")
group.add_option("--log-level", type="int", default=10, help="Sets the verbosity of logging. Default = 10.")
group.add_option("--rootdir", metavar="path", default=".", help="Location where log messages and sqlite database lives")
group.add_option("--tag", metavar="string", default="test", help="Sets the name of the tag used. Default = 'test'")

# Processing behaviour.
group.add_option("--sample-rate", type="int", metavar="Hz", default=1, help="Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
group.add_option("--processing-cadence", type="float", default=0.1, help="Rate at which the aggregator acquires and processes data. Default = 0.1 seconds.")
group.add_option("--request-timeout", type="float", default=0.2, help="Timeout for requesting messages from a topic. Default = 0.2 seconds.")

# Kafka input.  These two options declare no default, so they parse to None
# when omitted; the help text marks them as required, but nothing here
# enforces that -- NOTE(review): confirm they are validated after parsing.
group.add_option("--kafka-server", metavar="string", help="Sets the server url that the kafka topic is hosted on. Required.")
group.add_option("--input-topic-basename", metavar="string", help="Sets the input kafka topic basename. Required.")
...
@@ -112,18 +112,18 @@ class StreamAggregator(object):
...
@@ -112,18 +112,18 @@ class StreamAggregator(object):
### set up aggregator
### set up aggregator
logger.info("setting up aggregator with backend: %s"%options.data_backend)
logger.info("setting up aggregator with backend: %s"%options.data_backend)
ifoptions.data_backend=='influx':
ifoptions.data_backend=='influx':
self.agg_sink=io.influx.InfluxDBAggregator(
self.agg_sink=io.influx.InfluxDBAggregator(
hostname=options.influx_hostname,
hostname=options.influx_hostname,
port=options.influx_port,
port=options.influx_port,
db=options.influx_database_name,
db=options.influx_database_name,
reduce_across_tags=False,
reduce_across_tags=False,
)
)
else:### hdf5 data backend
else:### hdf5 data backend
self.agg_sink=io.hdf5.HDF5Aggregator(
self.agg_sink=io.hdf5.HDF5Aggregator(
rootdir=options.rootdir,
rootdir=options.rootdir,
num_processes=options.num_processes,
num_processes=options.num_processes,
reduce_across_tags=False,
reduce_across_tags=False,
)
)
deffetch_data(self,job_consumer):
deffetch_data(self,job_consumer):
"""
"""
...
@@ -152,7 +152,7 @@ class StreamAggregator(object):
...
@@ -152,7 +152,7 @@ class StreamAggregator(object):
defprocess_queue(self):
defprocess_queue(self):
"""
"""
process and aggregate features on a regular cadence
process and aggregate features from feature extraction jobs on a regular cadence