diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe index b9ba3f84aae07be6287ef523b25f0752fd858fa5..05e16b61d1398803b09badbcf44f6c8433fd78eb 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe @@ -579,7 +579,6 @@ agg_options = { "job-start": 0, "kafka-server": options.output_kafka_server, "data-backend": options.agg_data_backend, - "data-type": "min", } if options.agg_data_backend == 'influx': @@ -602,19 +601,20 @@ for ifo in channel_dict.keys(): state_routes.append("%s_strain_dropped" % ifo) # analysis-based aggregation jobs -for routes in groups(agg_routes, 1): - agg_options["route"] = routes - agg_options["data-type"] = "min" - aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options) - agg_options["data-type"] = "max" - # FIXME don't hard code the 1000 - aggstarts = range(len(jobTags))[::1000] - aggends = aggstarts[1:] + [len(jobTags)] - for aggstart, aggend in zip(aggstarts, aggends): - if aggend > aggstart: - agg_options["job-start"] = aggstart - agg_options["num-jobs"] = aggend - aggstart - aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options) +# FIXME don't hard code the 1000 +max_agg_jobs = 1000 +agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs] +for route in agg_routes: + agg_options["route"] = route + if route == "far_history": + agg_options["data-type"] = "min" + else: + agg_options["data-type"] = "max" + + for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]): + agg_options["job-start"] = aggstart + agg_options["num-jobs"] = aggend - aggstart + aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options) # state-based aggregation jobs for routes in groups(state_routes, 2):