Skip to content
Snippets Groups Projects
Commit a948006c authored by Cody Messick's avatar Cody Messick
Browse files

Fix mistake in kafka bootstep made when rebasing against kafka MR

parent 1f94f450
No related branches found
No related tags found
No related merge requests found
import json
from hop import stream
from celery import bootsteps
from celery.concurrency import solo
from celery.utils.log import get_logger
from hop.models import Blob
from hop import stream
from kombu import Consumer, Exchange, Queue
__all__ = ('Producer',)
......@@ -19,14 +18,22 @@ class KafkaBootStep(bootsteps.ConsumerStep):
``--kafka`` command line option.
"""
def __init__(self, parent, kafka=False, **kwargs):
self.enabled = bool(kafka)
def include_if(self, consumer):
"""Only include this bootstep in workers that are configured to listen
to the ``kafka`` queue.
"""
return 'kafka' in consumer.app.amqp.queues
def start(self, parent):
super().start(parent)
def create(self, consumer):
if not isinstance(consumer.pool, solo.TaskPool):
raise RuntimeError(
'The Kafka broker only works with the "solo" task pool. '
'Start the worker with "--queues=kafka --pool=solo".')
def start(self, consumer):
log.info(f'Starting {self.name}, topic: {self.topic_url}')
def stop(self, parent):
def stop(self, consumer):
log.info(f'Closing connection to topic: {self.topic_url}')
......@@ -35,39 +42,15 @@ class Producer(KafkaBootStep):
name = 'Kafka Avro producer'
def __init__(self, parent, kafka=False, **kwargs):
super().__init__(parent, **kwargs)
self.topic_url = parent.app.conf['scimma_broker_url'] + \
parent.app.conf['scimma_topic_name']
def start(self, parent):
super().start(parent)
self._s = stream.open(self.topic_url, 'w')
def stop(self, parent):
super().stop(parent)
self._s.close()
def get_consumers(self, parent, channel):
queue = Queue(
parent.app.conf['kafka_queue_name'],
Exchange(parent.app.conf['kafka_queue_exchange_name']),
parent.app.conf['kafka_queue_routing_key'])
return [Consumer(
channel,
queues=[queue],
on_message=self.on_message,
accept=['pickle', 'json'])]
def on_message(self, message):
payload = message.decode()
log.info(
'Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body),
)
)
log.info(f'Sending message to {self.topic_url}')
# FIXME: use the hop client's Avro implementation when it's ready
msg_blob = Blob(json.dumps(message.payload))
self._s.write(msg_blob)
message.ack()
def __init__(self, consumer, **kwargs):
super().__init__(consumer, **kwargs)
self.topic_url = consumer.app.conf['scimma_broker_url'] + \
consumer.app.conf['scimma_topic_name']
def start(self, consumer):
super().start(consumer)
consumer.app.conf['hop_stream'] = self._s = stream.open(self.topic_url, 'w')
def stop(self, consumer):
super().stop(consumer)
consumer.app.conf['hop_stream'].close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment