From b042187d9261b1085c5a81fa7754fc33a0f699c5 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 18 Jul 2019 07:02:33 -0700
Subject: [PATCH] gstlal_ll_inspiral_pipe: allow aggregators to be
 load-balanced and scale based on the number of jobs they process

---
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe | 39 +++++++++++----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 7a431588ca..0f43fc128f 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -303,8 +303,9 @@ if options.output_kafka_server is not None and options.run_output_kafka:
 
 
 # aggregator job
-aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator",  condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator",  condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+aggLeaderJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", tag_base = "gstlal_ll_inspiral_aggregator_leader", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
 # Summary page job
 #pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online",  universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
@@ -549,31 +550,37 @@ if options.agg_data_backend == 'influx':
 
 # define routes used for aggregation jobs
 snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
-network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
+network_routes = ["likelihood_history", "snr_history", "latency_history"]
 usage_routes = ["ram_history"]
-agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
 
 state_routes = []
 for ifo in channel_dict.keys():
     state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
     state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
     state_routes.append("%s_strain_dropped" % ifo)
+agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))
 
 # analysis-based aggregation jobs
 # FIXME don't hard code the 1000
 max_agg_jobs = 1000
 agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
-for route in agg_routes:
-	agg_options["route"] = route
-	if route == "far_history":
-		agg_options["data-type"] = "min"
+agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(jobTags)), 1))) + ["far_history"]
+for routes in agg_routes:
+	these_options = dict(agg_options)
+	these_options["route"] = routes
+	if routes == "far_history":
+		these_options["data-type"] = "min"
 	else:
-		agg_options["data-type"] = "max"
+		these_options["data-type"] = "max"
 
-	for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
-		agg_options["job-start"] = aggstart
-		agg_options["num-jobs"] = aggend - aggstart
-		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
+		these_options["job-start"] = aggstart
+		these_options["num-jobs"] = aggend - aggstart
+		if ii == 0: ### elect first aggregator per route as leader
+			these_options["across-jobs"] = ""
+			aggNode = dagparts.DAGNode(aggLeaderJob, dag, [], opts = these_options)
+		else:
+			aggNode = dagparts.DAGNode(aggJob, dag, [], opts = these_options)
 
 # Trigger aggregation
 trigagg_options = {
@@ -594,12 +601,6 @@ if options.agg_data_backend == 'influx':
 	})
 aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)
 
-# state-based aggregation jobs
-for routes in dagparts.groups(state_routes, 2):
-	agg_options["route"] = routes
-	agg_options["data-type"] = "max"
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
-
 #
 # summary page
 #
-- 
GitLab