Commit b2abff37 authored by Patrick Godwin

gstlal_ll_inspiral_pipe: allow aggregators to be load-balanced, scale based on number of jobs they process
parent f6554291
Pipeline #70747 passed in 27 minutes and 7 seconds
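For context: the change splits the aggregation routes into fixed-size groups, sized so that each aggregator process covers a roughly constant number of route/job combinations no matter how many inspiral jobs the analysis runs. A minimal sketch of the arithmetic, assuming dagparts.groups simply yields successive fixed-size chunks (the job count and route names below are illustrative, and groups() here is a hypothetical stand-in):

def groups(lst, n):
	# stand-in for dagparts.groups: yield successive n-sized chunks
	for i in range(0, len(lst), n):
		yield lst[i:i + n]

max_agg_jobs = 1000
jobTags = ["%04d" % i for i in range(100)]  # pretend the analysis runs 100 jobs
agg_routes = ["H1_snr_history", "L1_snr_history", "likelihood_history",
	"snr_history", "latency_history", "ram_history"]

# each aggregator handles max(max_agg_jobs // (4 * len(jobTags)), 1) routes,
# i.e. on the order of max_agg_jobs / 4 route-job pairs per process
routes_per_agg = max(max_agg_jobs // (4 * len(jobTags)), 1)  # 1000 // 400 = 2
print(list(groups(agg_routes, routes_per_agg)))
# [['H1_snr_history', 'L1_snr_history'], ['likelihood_history', 'snr_history'],
#  ['latency_history', 'ram_history']]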
@@ -304,8 +304,9 @@ if options.output_kafka_server is not None and options.run_output_kafka:
 # aggregator job
-aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+aggLeaderJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", tag_base = "gstlal_ll_inspiral_aggregator_leader", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))

 # Summary page job
 #pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
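A note on the new aggLeaderJob: it reuses the gstlal_ll_inspiral_aggregator executable but, presumably via the tag_base argument, gets its own submit description and log names, so the per-route leader aggregators can carry different options (the across-jobs flag added below) than the regular shards.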
@@ -554,31 +555,37 @@ if options.agg_data_backend == 'influx':
 # define routes used for aggregation jobs
 snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
-network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
+network_routes = ["likelihood_history", "snr_history", "latency_history"]
 usage_routes = ["ram_history"]
-agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
 state_routes = []
 for ifo in channel_dict.keys():
 	state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
 	state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
 	state_routes.append("%s_strain_dropped" % ifo)
+agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))

 # analysis-based aggregation jobs
 # FIXME don't hard code the 1000
 max_agg_jobs = 1000
 agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
-for route in agg_routes:
-	agg_options["route"] = route
-	if route == "far_history":
-		agg_options["data-type"] = "min"
+agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(jobTags)), 1))) + ["far_history"]
+for routes in agg_routes:
+	these_options = dict(agg_options)
+	these_options["route"] = routes
+	if routes == "far_history":
+		these_options["data-type"] = "min"
 	else:
-		agg_options["data-type"] = "max"
+		these_options["data-type"] = "max"
-	for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
-		agg_options["job-start"] = aggstart
-		agg_options["num-jobs"] = aggend - aggstart
-		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
+		these_options["job-start"] = aggstart
+		these_options["num-jobs"] = aggend - aggstart
+		if ii == 0: ### elect first aggregator per route as leader
+			these_options["across-jobs"] = ""
+			aggNode = dagparts.DAGNode(aggLeaderJob, dag, [], opts = these_options)
+		else:
+			aggNode = dagparts.DAGNode(aggJob, dag, [], opts = these_options)

 # Trigger aggregation
 trigagg_options = {
@@ -599,12 +606,6 @@ if options.agg_data_backend == 'influx':
 })
 aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)

-# state-based aggregation jobs
-for routes in groups(state_routes, 2):
-	agg_options["route"] = routes
-	agg_options["data-type"] = "max"
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)

 #
 # summary page
 #
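Design note on the removed hunk: state routes are no longer aggregated by a dedicated loop; they are folded into agg_routes above and load-balanced like every other route. Within each route group, only the first aggregator (index 0) is handed the across-jobs option, so aggregation across jobs is presumably performed once per group rather than by every shard. A sketch of the resulting layout, with hypothetical job counts (the trailing + [max_agg_jobs] appears to cover the common case where the job count is below max_agg_jobs):

max_agg_jobs = 1000
num_jobs = 100  # illustrative; stands in for len(jobTags)
agg_job_bounds = list(range(0, num_jobs, max_agg_jobs)) + [max_agg_jobs]  # [0, 1000]
for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
	role = "leader (across-jobs)" if ii == 0 else "follower"
	print("aggregator %d: job-start %d num-jobs %d  [%s]" % (ii, aggstart, aggend - aggstart, role))
# aggregator 0: job-start 0 num-jobs 1000  [leader (across-jobs)]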