diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe index 7fa3f61ecda9d470f81d8e004b0718c1ee0af245..288f3f25ef233a65523a91c96a5246bb1475a119 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe @@ -560,12 +560,13 @@ for ifo in channel_dict.keys(): state_routes.append("%s/strain_dropped" % ifo) # analysis-based aggregation jobs -for route in agg_routes: - aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type": ["max", "min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server}) +for route in groups(agg_routes, 2): + aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server}) + aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server}) # state-based aggregation jobs -for route in state_routes: - analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type": ["max", "min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server}) +for routes in groups(state_routes, 2): + analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server}) # summary page if options.injection_file: