Skip to content
Snippets Groups Projects
Commit 75048a06 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_ll_inspiral_pipe: reduce number of aggregator jobs, rewrite portion...

gstlal_ll_inspiral_pipe: reduce number of aggregator jobs, rewrite portion that divvies up which jobs aggregators process
parent 55bd44b8
No related branches found
No related tags found
No related merge requests found
...@@ -579,7 +579,6 @@ agg_options = { ...@@ -579,7 +579,6 @@ agg_options = {
"job-start": 0, "job-start": 0,
"kafka-server": options.output_kafka_server, "kafka-server": options.output_kafka_server,
"data-backend": options.agg_data_backend, "data-backend": options.agg_data_backend,
"data-type": "min",
} }
if options.agg_data_backend == 'influx': if options.agg_data_backend == 'influx':
...@@ -602,19 +601,20 @@ for ifo in channel_dict.keys(): ...@@ -602,19 +601,20 @@ for ifo in channel_dict.keys():
state_routes.append("%s_strain_dropped" % ifo) state_routes.append("%s_strain_dropped" % ifo)
# analysis-based aggregation jobs # analysis-based aggregation jobs
for routes in groups(agg_routes, 1): # FIXME don't hard code the 1000
agg_options["route"] = routes max_agg_jobs = 1000
agg_options["data-type"] = "min" agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options) for route in agg_routes:
agg_options["data-type"] = "max" agg_options["route"] = route
# FIXME don't hard code the 1000 if route == "far_history":
aggstarts = range(len(jobTags))[::1000] agg_options["data-type"] = "min"
aggends = aggstarts[1:] + [len(jobTags)] else:
for aggstart, aggend in zip(aggstarts, aggends): agg_options["data-type"] = "max"
if aggend > aggstart:
agg_options["job-start"] = aggstart for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
agg_options["num-jobs"] = aggend - aggstart agg_options["job-start"] = aggstart
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options) agg_options["num-jobs"] = aggend - aggstart
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
# state-based aggregation jobs # state-based aggregation jobs
for routes in groups(state_routes, 2): for routes in groups(state_routes, 2):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment