diff --git a/gwcelery/__init__.py b/gwcelery/__init__.py index 72d052e6d54fc85e1a565cfbd82bb797bcc1c1d1..71224272b78acfcede8e39dabb920094aa02529e 100644 --- a/gwcelery/__init__.py +++ b/gwcelery/__init__.py @@ -7,6 +7,7 @@ from celery import Celery from ._version import get_versions from .conf import playground from . import email +from . import kafka from . import lvalert from . import sentry from . import voevent @@ -24,6 +25,7 @@ app = Celery(__name__, broker='redis://', config_source=playground) email.install(app) lvalert.install(app) voevent.install(app) +kafka.install(app) # Register all tasks. app.autodiscover_tasks([__name__]) diff --git a/gwcelery/conf/__init__.py b/gwcelery/conf/__init__.py index aa4c56b7d714cc15479594ecad3b5a4fdd0a86cd..f4ee27f7161bc8bd26c002b512789bbaa8faba66 100644 --- a/gwcelery/conf/__init__.py +++ b/gwcelery/conf/__init__.py @@ -80,8 +80,8 @@ then completely disable the GCN listener.""" email_host = 'imap.gmail.com' """IMAP hostname to receive the GCN e-mail notice formats.""" -kafka_bootstrap_servers = ['placeholder.edu:9092'] -"""List of host, port pairs for the Kafka bootstrap servers""" +kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-playground' +"""Kafka topic URL""" superevent_d_t_start = {'gstlal': 1.0, 'spiir': 1.0, diff --git a/gwcelery/conf/production.py b/gwcelery/conf/production.py index a5cfea4f82c0d4580443a285620defd1056be2e7..46cfbce765ef2293daafba9cff6ed3e2eb3f4bd8 100644 --- a/gwcelery/conf/production.py +++ b/gwcelery/conf/production.py @@ -19,6 +19,9 @@ lvalert_host = 'lvalert.cgca.uwm.edu' gracedb_host = 'gracedb.ligo.org' """GraceDB host.""" +kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert' +"""Kafka topic URL""" + voevent_broadcaster_address = ':5341' """The VOEvent broker will bind to this address to send GCNs. This should be a string of the form `host:port`. If `host` is empty, diff --git a/gwcelery/conf/test.py b/gwcelery/conf/test.py index 8ba9c74b6075af6c37fd46e10a2db50f0179b901..4d6d4952de15fe9d597f56462f08ff211761b6fe 100644 --- a/gwcelery/conf/test.py +++ b/gwcelery/conf/test.py @@ -19,3 +19,6 @@ messages.""" mock_events_simulate_multiple_uploads = True """If True, then upload each mock event several times in rapid succession with random jitter in order to simulate multiple pipeline uploads.""" + +kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-test' +"""Kafka topic URL""" \ No newline at end of file diff --git a/gwcelery/kafka/bootsteps.py b/gwcelery/kafka/bootsteps.py index f177816a5270787052f00eaef4f53958ba51e60b..7fb964b39729955130d6cd182598a9af2453b03d 100644 --- a/gwcelery/kafka/bootsteps.py +++ b/gwcelery/kafka/bootsteps.py @@ -1,6 +1,12 @@ +import json + from celery import bootsteps from celery.utils.log import get_logger from confluent_kafka.avro import AvroProducer +from hop.models import Blob +from hop import stream +from kombu import Consumer, Exchange, Queue + __all__ = ('Producer',) @@ -14,14 +20,52 @@ class KafkaBootStep(bootsteps.ConsumerStep): ``--kafka`` command line option. """ - def __init__(self, consumer, kafka=False, **kwargs): + def __init__(self, parent, kafka=False, **kwargs): self.enabled = bool(kafka) - def start(self, consumer): - log.info('Starting %s', self.name) + def start(self, parent): + log.info(f'Starting {self.name}, topic: {self.topic_url}') + + def stop(self, parent): + log.info(f'Closing connection to topic: {self.topic_url}') + + +class Producer(KafkaBootStep): + """Run the global Kafka producer in a background thread.""" + + name = 'Kafka Avro producer' - def stop(self, consumer): - log.info('Stopping %s', self.name) + def __init__(self, parent, kafka=False, **kwargs): + super().__init__(parent, **kwargs) + self.topic_url = parent.app.conf['kafka_topic_url'] + + def start(self, parent): + super().start(parent) + self._s = stream.open(self.topic_url, 'w') + + def stop(self, parent): + super().stop(parent) + self._s.close() + + def get_consumers(self, channel): + queue = Queue('kafka', Exchange('kafka'), 'kafka.key') + return [Consumer(channel, + queues=[queue], + on_message=self.on_message, + accept=['pickle', 'json'])] + + def on_message(self, message): + payload = message.decode() + log.info( + 'Received message: {0!r} {props!r} rawlen={s}'.format( + payload, props=message.properties, s=len(message.body), + ) + ) + log.info(f'Sending message to {self.topic_url}') + # FIXME: use the hop client's Avro implementation instead of a Blob when it's ready + msg_blob = Blob(json.dumps(message.payload)) + self._s.write(msg_blob) + message.ack() class Producer(KafkaBootStep): @@ -29,9 +73,12 @@ class Producer(KafkaBootStep): name = 'Kafka Avro producer' + + +''' def delivery_report(err, msg): """Called once for each message produced to indicate delivery result. - Triggered by poll() or flush(). """ + Triggered by or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: @@ -60,3 +107,4 @@ class Producer(KafkaBootStep): def produce(self, topic, value, value_schema, key, key_schema): self._producer.produce(topic=topic, value=value, value_schema=value_schema, key=key, key_schema=key_schema) +''' \ No newline at end of file