diff --git a/gwcelery/kafka/bootsteps.py b/gwcelery/kafka/bootsteps.py index 8f75c9445906be30b37c6bf9cbdad8560ffd1563..108f9959785ea30d4a91aa14907f4e8cb4b15d7c 100644 --- a/gwcelery/kafka/bootsteps.py +++ b/gwcelery/kafka/bootsteps.py @@ -1,10 +1,9 @@ -import json +from hop import stream from celery import bootsteps +from celery.concurrency import solo from celery.utils.log import get_logger -from hop.models import Blob -from hop import stream -from kombu import Consumer, Exchange, Queue + __all__ = ('Producer',) @@ -19,14 +18,22 @@ class KafkaBootStep(bootsteps.ConsumerStep): ``--kafka`` command line option. """ - def __init__(self, parent, kafka=False, **kwargs): - self.enabled = bool(kafka) + def include_if(self, consumer): + """Only include this bootstep in workers that are configured to listen + to the ``kafka`` queue. + """ + return 'kafka' in consumer.app.amqp.queues - def start(self, parent): - super().start(parent) + def create(self, consumer): + if not isinstance(consumer.pool, solo.TaskPool): + raise RuntimeError( + 'The Kafka broker only works with the "solo" task pool. ' + 'Start the worker with "--queues=kafka --pool=solo".') + + def start(self, consumer): log.info(f'Starting {self.name}, topic: {self.topic_url}') - def stop(self, parent): + def stop(self, consumer): log.info(f'Closing connection to topic: {self.topic_url}') @@ -35,39 +42,15 @@ class Producer(KafkaBootStep): name = 'Kafka Avro producer' - def __init__(self, parent, kafka=False, **kwargs): - super().__init__(parent, **kwargs) - self.topic_url = parent.app.conf['scimma_broker_url'] + \ - parent.app.conf['scimma_topic_name'] - - def start(self, parent): - super().start(parent) - self._s = stream.open(self.topic_url, 'w') - - def stop(self, parent): - super().stop(parent) - self._s.close() - - def get_consumers(self, parent, channel): - queue = Queue( - parent.app.conf['kafka_queue_name'], - Exchange(parent.app.conf['kafka_queue_exchange_name']), - parent.app.conf['kafka_queue_routing_key']) - return [Consumer( - channel, - queues=[queue], - on_message=self.on_message, - accept=['pickle', 'json'])] - - def on_message(self, message): - payload = message.decode() - log.info( - 'Received message: {0!r} {props!r} rawlen={s}'.format( - payload, props=message.properties, s=len(message.body), - ) - ) - log.info(f'Sending message to {self.topic_url}') - # FIXME: use the hop client's Avro implementation when it's ready - msg_blob = Blob(json.dumps(message.payload)) - self._s.write(msg_blob) - message.ack() + def __init__(self, consumer, **kwargs): + super().__init__(consumer, **kwargs) + self.topic_url = consumer.app.conf['scimma_broker_url'] + \ + consumer.app.conf['scimma_topic_name'] + + def start(self, consumer): + super().start(consumer) + consumer.app.conf['hop_stream'] = self._s = stream.open(self.topic_url, 'w') + + def stop(self, consumer): + super().stop(consumer) + consumer.app.conf['hop_stream'].close()