diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator index 8467590d2832a99dd2ad19453565ddea60eaada8..c4bf5969e1e34c256f0681a723b8123e9eb5a357 100755 --- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator +++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator @@ -94,7 +94,7 @@ if __name__ == '__main__': # We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs if options.kafka_server: from kafka import KafkaConsumer - consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), max_poll_interval_ms = 60000) + consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000) else: consumer = None @@ -128,6 +128,10 @@ if __name__ == '__main__': elapsed = timeit.default_timer() - start logging.info("time to store/reduce timeseries: %.1f s" % elapsed) + # close connection to consumer if using kafka + if consumer: + consumer.close() + # # always end on an error so that condor won't think we're done and will # restart us