From 6721265c0b6c290b1a0c90973bb00a252ebdfb21 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Mon, 11 Feb 2019 11:03:46 -0800
Subject: [PATCH] gstlal_ll_inspiral_pipe: scale up number of aggregator jobs
 to keep up with real-time, gstlal_ll_inspiral_aggregator: add more specific
 group_id to not conflict with other aggregator jobs

---
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe   | 15 +++++++++------
 gstlal-ugly/bin/gstlal_ll_inspiral_aggregator |  2 +-
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 70a486b2a9..8b6f2cd3da 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -573,6 +573,7 @@ agg_options = {
 	"job-start": 0,
 	"kafka-server": options.output_kafka_server,
 	"data-backend": options.agg_data_backend,
+	"data-type": "min",
 }
 
 if options.agg_data_backend == 'influx':
@@ -596,15 +597,17 @@ for ifo in channel_dict.keys():
 
 # analysis-based aggregation jobs
 for routes in groups(agg_routes, 1):
-	all_agg_options = {"route": routes, "data-type": ["min", "max"]}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "min"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 # state-based aggregation jobs
 for routes in groups(state_routes, 2):
-	all_agg_options = {"route": routes, "data-type": "max"}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 #
 # summary page
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index afd54d5895..8467590d28 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -94,7 +94,7 @@ if __name__ == '__main__':
 	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_aggregator' % routes[0], max_poll_interval_ms = 60000)
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), max_poll_interval_ms = 60000)
 	else:
 		consumer = None
 
-- 
GitLab