Skip to content
Snippets Groups Projects

Log kafka timeout messages without raising error

Merged Cody Messick requested to merge cody.messick/gwcelery:silence_kafka_timeout into main
1 file
+ 18
2
Compare changes
  • Side-by-side
  • Inline
from confluent_kafka.error import KafkaException
from confluent_kafka.error import KafkaError, KafkaException
from hop import stream
from hop.io import list_topics
from hop.models import AvroBlob
@@ -18,7 +18,8 @@ class KafkaWriter:
def __init__(self, config):
self._config = config
self._open_hop_stream = stream.open(config['url'], 'w')
self._open_hop_stream = stream.open(config['url'], 'w',
error_callback=self._error_cb)
# Set up flag for failed delivery of messages
self.kafka_delivery_failures = False
@@ -40,6 +41,21 @@ class KafkaWriter:
log.error(f'{broker} appears to be down')
return False
def _error_cb(self, kafka_error):
# FIXME Remove once timeout issues (see #457 and #441) are fixed
# Modification of function from https://github.com/astronomy-commons/adc-streaming/blob/master/adc/errors.py # noqa: E501
if kafka_error.code() == KafkaError._ALL_BROKERS_DOWN:
# See original function for reasoning
log.warn('client is currently disconnected from all brokers')
elif kafka_error.code() == KafkaError._TIMED_OUT:
# Errors described in #457, only log them for now
log.warn('Timeout from kafka url {}: {}'.format(
self._config['url'],
kafka_error
))
else:
log.error(f'internal kafka error: {kafka_error}')
def _delivery_cb(self, kafka_error, message):
# FIXME Get rid of if-else logic once
# https://github.com/scimma/hop-client/pull/190 is merged
Loading