Commit 6721265c authored by Patrick Godwin

gstlal_ll_inspiral_pipe: scale up the number of aggregator jobs to keep up with real-time; gstlal_ll_inspiral_aggregator: add a more specific group_id so it does not conflict with other aggregator jobs
parent af2ee30d
gstlal_ll_inspiral_pipe:
@@ -573,6 +573,7 @@ agg_options = {
 	"job-start": 0,
 	"kafka-server": options.output_kafka_server,
 	"data-backend": options.agg_data_backend,
+	"data-type": "min",
 }
 
 if options.agg_data_backend == 'influx':
@@ -596,15 +597,17 @@ for ifo in channel_dict.keys():
 
 # analysis-based aggregation jobs
 for routes in groups(agg_routes, 1):
-	all_agg_options = {"route": routes, "data-type": ["min", "max"]}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "min"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 # state-based aggregation jobs
 for routes in groups(state_routes, 2):
-	all_agg_options = {"route": routes, "data-type": "max"}
-	all_agg_options.update(agg_options)
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
+	agg_options["route"] = routes
+	agg_options["data-type"] = "max"
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 #
 # summary page
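The pipe change above splits the single aggregator node that handled both "min" and "max" into one node per data type, doubling the number of aggregator jobs so each consumes half the data and can keep up with real-time. Below is a minimal sketch of that fan-out pattern; the DAGNode class is a hypothetical stand-in for gstlal's dagparts.DAGNode, and the routes and options are assumed example values. Note that reusing the mutated agg_options dict across iterations is only safe if each node snapshots its options at construction time, as the stand-in does here.

import copy

class DAGNode:
	"""Hypothetical stand-in for dagparts.DAGNode that records job options."""
	def __init__(self, job, dag, parent_nodes, opts):
		self.job = job
		# snapshot the options: agg_options is mutated and reused across
		# iterations, so each node must copy it when it is constructed
		self.opts = copy.deepcopy(opts)
		dag.append(self)

agg_routes = ["H1_snr_history", "L1_snr_history"]   # assumed example routes
agg_options = {"kafka-server": "localhost:9092", "data-backend": "influx"}

dag = []
for route in agg_routes:
	# one job per (route, data-type) pair instead of one job doing both:
	# twice the jobs, each handling half the per-route load
	for data_type in ("min", "max"):
		agg_options["route"] = route
		agg_options["data-type"] = data_type
		DAGNode("aggJob", dag, [], opts=agg_options)

assert len(dag) == 2 * len(agg_routes)   # 2 routes x 2 data types = 4 jobs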
gstlal_ll_inspiral_aggregator:
@@ -94,7 +94,7 @@ if __name__ == '__main__':
 	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_aggregator' % routes[0], max_poll_interval_ms = 60000)
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), max_poll_interval_ms = 60000)
 	else:
 		consumer = None
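With the pipe now launching separate "min" and "max" aggregator jobs over the same routes, the old group_id meant both jobs joined the same Kafka consumer group, so the broker balanced each topic's partitions between them and each job saw only part of the stream. Folding the data type into the group_id puts every job in its own group, giving each one the full topic. A small sketch of the id construction, assuming an example route and that --data-type is an append-style option (so options.data_type is a list), consistent with the routes[0] indexing above:

routes = ["H1_snr_history"]   # assumed example route for one aggregator job
data_type = ["min"]           # stands in for options.data_type on a "min" job

old_group_id = '%s_aggregator' % routes[0]
new_group_id = '%s_%s_aggregator' % (routes[0], data_type[0])

print(old_group_id)   # H1_snr_history_aggregator: shared by min and max jobs
print(new_group_id)   # H1_snr_history_min_aggregator: unique per data type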