Skip to content
Snippets Groups Projects
Commit 184bd89e authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_ll_inspiral_aggregator: tweak kafka consumer settings to mitigate...

gstlal_ll_inspiral_aggregator: tweak kafka consumer settings to mitigate heartbeat session issues, close connection to consumer upon exit
parent 9b5a8472
No related branches found
No related tags found
No related merge requests found
......@@ -94,7 +94,7 @@ if __name__ == '__main__':
# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), max_poll_interval_ms = 60000)
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
else:
consumer = None
......@@ -128,6 +128,10 @@ if __name__ == '__main__':
elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
# close connection to consumer if using kafka
if consumer:
consumer.close()
#
# always end on an error so that condor won't think we're done and will
# restart us
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment