Skip to content
Snippets Groups Projects

Revert !949

Merged Cody Messick requested to merge cody.messick/gwcelery:revert_949 into main
1 file
+ 2
17
Compare changes
  • Side-by-side
  • Inline
from unittest.mock import patch
from adc.producer import ProducerConfig
from confluent_kafka.error import KafkaError, KafkaException
from confluent_kafka.error import KafkaException
from hop import stream
from hop.io import list_topics
from hop.models import AvroBlob, JSONBlob
@@ -38,7 +38,7 @@ class KafkaWriter:
# error_callback=self._error_cb)
with patch('adc.producer.ProducerConfig', PatchedProducerConfig):
self._open_hop_stream = stream.open(
config['url'], 'w', error_callback=self._error_cb)
config['url'], 'w')
# Set up flag for failed delivery of messages
self.kafka_delivery_failures = False
@@ -73,21 +73,6 @@ class KafkaWriter:
log.error(f'{broker} appears to be down')
return False
def _error_cb(self, kafka_error):
# FIXME Remove once timeout issues (see #457 and #441) are fixed
# Modification of function from https://github.com/astronomy-commons/adc-streaming/blob/master/adc/errors.py # noqa: E501
if kafka_error.code() == KafkaError._ALL_BROKERS_DOWN:
# See original function for reasoning
log.warn('client is currently disconnected from all brokers')
elif kafka_error.code() == KafkaError._TIMED_OUT:
# Errors described in #457, only log them for now
log.warn('Timeout from kafka url {}: {}'.format(
self._config['url'],
kafka_error
))
else:
log.error(f'internal kafka error: {kafka_error}')
def _delivery_cb(self, kafka_error, message):
# FIXME Get rid of if-else logic once
# https://github.com/scimma/hop-client/pull/190 is merged
Loading