Skip to content
Snippets Groups Projects
Commit 2533a8e5 authored by Geoffrey Mo's avatar Geoffrey Mo Committed by Cody Messick
Browse files

Switch from python-kafka to confluent-python-kafka

parent 077adc14
No related branches found
No related tags found
No related merge requests found
from celery import bootsteps
from celery.utils.log import get_logger
from kafka import KafkaProducer
from confluent_kafka.avro import AvroProducer
__all__ = ('Producer',)
......@@ -27,17 +27,36 @@ class KafkaBootStep(bootsteps.ConsumerStep):
class Producer(KafkaBootStep):
"""Run the global Kafka producer in a background thread."""
name = 'Kafka producer'
name = 'Kafka Avro producer'
def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def start(self, consumer, default_key_schema, default_value_schema):
config = {
'bootstrap.servers': consumer.app.conf['kafka_bootstrap_servers'],
'on_delivery': self.delivery_report,
}
def start(self, consumer):
super().start(consumer)
self._producer = KafkaProducer(
bootstrap_servers=consumer.app.conf['kafka_bootstrap_servers']
self._producer = AvroProducer(
config,
default_key_schema=default_key_schema,
default_value_schema=default_value_schema,
)
def stop(self, consumer):
def flush(self, consumer):
super().stop(consumer)
self._producer.close()
self._producer.flush()
def send(self, topic, value):
self._producer.send(topic, value)
def produce(self, topic, value, value_schema, key, key_schema):
self._producer.produce(topic=topic, value=value,
value_schema=value_schema, key=key, key_schema=key_schema)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment