From 1704a7edbe547af0b3dd66812e88d5aa44d813a4 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Wed, 6 Feb 2019 08:07:04 -0800
Subject: [PATCH] gstlal_ll_inspiral_pipe: modify command line options passed
 and num processes for aggregation jobs

---
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 7fa3f61ecd..288f3f25ef 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -560,12 +560,13 @@ for ifo in channel_dict.keys():
     state_routes.append("%s/strain_dropped" % ifo)
 
 # analysis-based aggregation jobs
-for route in agg_routes:
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type": ["max", "min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
+for route in groups(agg_routes, 2):
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
 
 # state-based aggregation jobs
-for route in state_routes:
-	analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type": ["max", "min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
+for routes in groups(state_routes, 2):
+	analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
 
 # summary page
 if options.injection_file:
-- 
GitLab