Commit b042187d authored by Patrick Godwin

gstlal_ll_inspiral_pipe: allow aggregators to be load-balanced and scale based on the number of jobs they process
parent e7517778
@@ -303,8 +303,9 @@ if options.output_kafka_server is not None and options.run_output_kafka:
 
 # aggregator job
 aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
+aggLeaderJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", tag_base = "gstlal_ll_inspiral_aggregator_leader", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
 # Summary page job
 #pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
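Note on the new job definition: aggLeaderJob runs the same gstlal_ll_inspiral_aggregator executable but under its own tag_base, which (as I read dagparts) gives the leader aggregators a separate condor submit description from the load-balanced workers. The second hunk below elects the first aggregator of each route bundle as the leader and hands it the across-jobs option, so per-route aggregation across all inspiral jobs happens exactly once, while the remaining aggregators each cover a slice of the job range.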
@@ -549,31 +550,37 @@ if options.agg_data_backend == 'influx':
 
 # define routes used for aggregation jobs
 snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
-network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
+network_routes = ["likelihood_history", "snr_history", "latency_history"]
 usage_routes = ["ram_history"]
-agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
 state_routes = []
 for ifo in channel_dict.keys():
 	state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
 	state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
 	state_routes.append("%s_strain_dropped" % ifo)
+agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))
 
 # analysis-based aggregation jobs
 # FIXME don't hard code the 1000
 max_agg_jobs = 1000
 agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
-for route in agg_routes:
-	agg_options["route"] = route
-	if route == "far_history":
-		agg_options["data-type"] = "min"
+agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(jobTags)), 1))) + ["far_history"]
+for routes in agg_routes:
+	these_options = dict(agg_options)
+	these_options["route"] = routes
+	if routes == "far_history":
+		these_options["data-type"] = "min"
 	else:
-		agg_options["data-type"] = "max"
-	for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
-		agg_options["job-start"] = aggstart
-		agg_options["num-jobs"] = aggend - aggstart
-		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+		these_options["data-type"] = "max"
+	for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
+		these_options["job-start"] = aggstart
+		these_options["num-jobs"] = aggend - aggstart
+		if ii == 0: ### elect first aggregator per route as leader
+			these_options["across-jobs"] = ""
+			aggNode = dagparts.DAGNode(aggLeaderJob, dag, [], opts = these_options)
+		else:
+			aggNode = dagparts.DAGNode(aggJob, dag, [], opts = these_options)
 
 # Trigger aggregation
 trigagg_options = {
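To make the load-balancing arithmetic above concrete: each aggregator now owns a bundle of roughly max_agg_jobs / (4 * len(jobTags)) routes, so bundles shrink as the number of inspiral jobs grows, and far_history is appended as its own entry so it can keep its min data-type. A minimal standalone sketch follows; the local groups() helper is an assumption, written to match how dagparts.groups is used elsewhere in this file (fixed-size chunking, cf. the removed groups(state_routes, 2) call in the next hunk):

# Standalone sketch (hypothetical groups() helper, assumed to behave
# like dagparts.groups): how routes get bundled per aggregator.

def groups(seq, n):
	# yield successive n-sized chunks of seq
	for i in range(0, len(seq), n):
		yield tuple(seq[i:i + n])

max_agg_jobs = 1000   # hard-coded cap, per the FIXME above
num_jobs = 100        # stand-in for len(jobTags)
routes = ["H1_snr_history", "L1_snr_history", "likelihood_history",
          "snr_history", "latency_history", "ram_history"]

# more jobs -> smaller bundles; max(..., 1) keeps at least one route per bundle
bundle_size = max(max_agg_jobs // (4 * num_jobs), 1)        # -> 2
agg_routes = list(groups(routes, bundle_size)) + ["far_history"]
# -> three 2-route tuples plus the bare string "far_history",
#    which the loop special-cases via `routes == "far_history"`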
@@ -594,12 +601,6 @@ if options.agg_data_backend == 'influx':
 })
 aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)
 
-# state-based aggregation jobs
-for routes in dagparts.groups(state_routes, 2):
-	agg_options["route"] = routes
-	agg_options["data-type"] = "max"
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
-
 #
 # summary page
 #
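With state_routes now chained into agg_routes in the earlier hunk, this dedicated state-based aggregation loop is redundant: the DQ-vector, state-vector, and strain-dropped routes ride through the same load-balanced loop as every other route, which already applies data-type = max to everything except far_history.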