diff --git a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
index 0f40c675b04ae40035c452f25b9b13f2aec65a12..64baada7cb011ce4a87d36152170a88ea8ac905e 100755
--- a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
@@ -246,16 +246,25 @@ def parse_command_line():
 	parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
 	parser.add_option_group(group)
 
-	# Synchronizer/File Sink/Listener commands
+	# Synchronizer/File Sink commands
 	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
 	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
 	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
 	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
-	parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
 	parser.add_option_group(group)
 
+	# Aggregation/Monitoring commands
+	group = optparse.OptionGroup(parser, "Aggregator Options", "Adjust parameters used for aggregation and monitoring of features.")
+	parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
+	parser.add_option("--num-agg-jobs", type = "int", default = 4, help = "Number of aggregator jobs to aggregate incoming features. Default = 4.")
+	parser.add_option("--num-agg-processes-per-job", type = "int", default = 2, help = "Number of processes per aggregator job to aggregate incoming features. Used if --agg-data-backend = hdf5. Default = 2.")
+	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend where aggregated data is stored, options: [hdf5|influx]. Default = hdf5.")
+	parser.add_option("--influx-hostname", help = "Specify the hostname for the InfluxDB database. Required if --agg-data-backend = influx.")
+	parser.add_option("--influx-port", help = "Specify the port for the InfluxDB database. Required if --agg-data-backend = influx.")
+	parser.add_option("--influx-database-name", help = "Specify the name of the InfluxDB database. Required if --agg-data-backend = influx.")
+	parser.add_option_group(group)
 
 	options, filenames = parser.parse_args()
 
@@ -310,6 +319,7 @@ if options.save_format == 'kafka':
 	synchronizer_job = dagparts.DAGJob("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 	hdf5_sink_job = dagparts.DAGJob("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 	listener_job = dagparts.DAGJob("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	aggregator_job = dagparts.DAGJob("gstlal_feature_aggregator", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 
 	if not options.disable_kafka_jobs:
 		# kafka/zookeeper jobs
@@ -364,6 +374,22 @@ if options.save_format == 'kafka':
 		"unsafe-channel-include": options.unsafe_channel_include,
 	}
 
+	aggregator_options = {
+		"sample-rate": options.sample_rate,
+		"input-topic-basename": options.kafka_topic,
+		"data-backend": options.agg_data_backend,
+		"data-type": "max",
+	}
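+	### backend-dependent options: the influx backend needs connection settings,
+	### while the hdf5 backend only takes the number of processes per aggregator job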
+	if options.agg_data_backend == 'influx':
+		aggregator_options.update({
+			"influx-database-name": options.influx_database_name,
+			"influx-hostname": options.influx_hostname,
+			"influx-port": options.influx_port,
+		})
+	else:
+		aggregator_options.update({"num-processes": options.num_agg_processes_per_job})
+
 	### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
 	for option_name, option in extra_hdf5_channel_options.items():
 		if option:
@@ -371,18 +397,31 @@ if options.save_format == 'kafka':
 
 	synchronizer_options.update(common_options)
 	hdf5_sink_options.update(common_options)
+	aggregator_options.update(common_options)
 	listener_options.update(common_options)
 
 #
 # set up jobs
 #
 
+def groups(l, n):
+	### yield successive length-n chunks from the list l
+	for i in xrange(0, len(l), n):
+		yield l[i:i+n]
+
 if options.save_format == 'kafka':
 	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
 	synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
 	hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
 	listener_node = dagparts.DAGNode(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})
 
+	### aggregator jobs
+	all_fx_jobs = [(str(ii).zfill(4), channel_subset) for ii, channel_subset in enumerate(data_source_info.channel_subsets)]
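+	### group feature extractor jobs together, with one aggregator job processing each group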
+	for job_subset in groups(all_fx_jobs, options.num_agg_jobs):
+		jobs, channels = zip(*job_subset)
+		job_channel_options = {"jobs": jobs, "num-channels": len(channels)}
+		job_channel_options.update(aggregator_options)
+		aggregator_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_aggregator")})
+
 	if not options.disable_kafka_jobs:
 		zoo_node = dagparts.DAGNode(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
 		kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})