diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 70a486b2a9ebb73beb6cf84f92767076770dc54c..8b6f2cd3da6b5b8e66cf574ba9bc513727891047 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -573,6 +573,7 @@ agg_options = {
 	"job-start": 0,
 	"kafka-server": options.output_kafka_server,
 	"data-backend": options.agg_data_backend,
+	"data-type": "min",
 }
 
 if options.agg_data_backend == 'influx':
@@ -596,15 +597,17 @@ for ifo in channel_dict.keys():
 
 # analysis-based aggregation jobs
 for routes in groups(agg_routes, 1):
-	all_agg_options = {"route": routes, "data-type": ["min", "max"]}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "min"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 # state-based aggregation jobs
 for routes in groups(state_routes, 2):
-	all_agg_options = {"route": routes, "data-type": "max"}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 #
 # summary page
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index afd54d589538b6cbde14207e317c49bcba7d5003..8467590d2832a99dd2ad19453565ddea60eaada8 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -94,7 +94,7 @@ if __name__ == '__main__':
 
 	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_aggregator' % routes[0], max_poll_interval_ms = 60000)
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), max_poll_interval_ms = 60000)
 	else:
 		consumer = None
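
Note on the second file: Kafka distributes a topic's partitions among all consumers that share a group_id, so if the separate "min" and "max" aggregator jobs created by the first file both joined the group '%s_aggregator' % routes[0], each would receive only a subset of the messages. Encoding the data type into the group id puts the two jobs in distinct consumer groups, so each sees the full stream for its route. A minimal sketch of that behavior, outside the patch itself (the broker address and topic name below are hypothetical placeholders):

	import json
	from kafka import KafkaConsumer

	def make_consumer(route, data_type, kafka_server="localhost:9092"):
		# Mirrors the patched group_id scheme: one consumer group per
		# (route, data-type) pair, so the min and max aggregators never
		# split a topic's partitions between themselves.
		return KafkaConsumer(
			route,
			bootstrap_servers=[kafka_server],
			value_deserializer=lambda m: json.loads(m.decode("utf-8")),
			auto_offset_reset="latest",
			group_id="%s_%s_aggregator" % (route, data_type),
			max_poll_interval_ms=60000,
		)

	# Distinct groups: each consumer receives every message on the topic.
	min_consumer = make_consumer("H1_snr_history", "min")
	max_consumer = make_consumer("H1_snr_history", "max")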