Skip to content
Snippets Groups Projects
Commit f7b34729 authored by Geoffrey Mo's avatar Geoffrey Mo Committed by Cody Messick
Browse files

Use hop client for Kafka implementation

parent 75de6699
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,7 @@ from celery import Celery ...@@ -7,6 +7,7 @@ from celery import Celery
from ._version import get_versions from ._version import get_versions
from .conf import playground from .conf import playground
from . import email from . import email
from . import kafka
from . import lvalert from . import lvalert
from . import sentry from . import sentry
from . import voevent from . import voevent
...@@ -24,6 +25,7 @@ app = Celery(__name__, broker='redis://', config_source=playground) ...@@ -24,6 +25,7 @@ app = Celery(__name__, broker='redis://', config_source=playground)
email.install(app) email.install(app)
lvalert.install(app) lvalert.install(app)
voevent.install(app) voevent.install(app)
kafka.install(app)
# Register all tasks. # Register all tasks.
app.autodiscover_tasks([__name__]) app.autodiscover_tasks([__name__])
......
...@@ -80,8 +80,8 @@ then completely disable the GCN listener.""" ...@@ -80,8 +80,8 @@ then completely disable the GCN listener."""
email_host = 'imap.gmail.com' email_host = 'imap.gmail.com'
"""IMAP hostname to receive the GCN e-mail notice formats.""" """IMAP hostname to receive the GCN e-mail notice formats."""
kafka_bootstrap_servers = ['placeholder.edu:9092'] kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-playground'
"""List of host, port pairs for the Kafka bootstrap servers""" """Kafka topic URL"""
superevent_d_t_start = {'gstlal': 1.0, superevent_d_t_start = {'gstlal': 1.0,
'spiir': 1.0, 'spiir': 1.0,
......
...@@ -19,6 +19,9 @@ lvalert_host = 'lvalert.cgca.uwm.edu' ...@@ -19,6 +19,9 @@ lvalert_host = 'lvalert.cgca.uwm.edu'
gracedb_host = 'gracedb.ligo.org' gracedb_host = 'gracedb.ligo.org'
"""GraceDB host.""" """GraceDB host."""
kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert'
"""Kafka topic URL"""
voevent_broadcaster_address = ':5341' voevent_broadcaster_address = ':5341'
"""The VOEvent broker will bind to this address to send GCNs. """The VOEvent broker will bind to this address to send GCNs.
This should be a string of the form `host:port`. If `host` is empty, This should be a string of the form `host:port`. If `host` is empty,
......
...@@ -19,3 +19,6 @@ messages.""" ...@@ -19,3 +19,6 @@ messages."""
mock_events_simulate_multiple_uploads = True mock_events_simulate_multiple_uploads = True
"""If True, then upload each mock event several times in rapid succession with """If True, then upload each mock event several times in rapid succession with
random jitter in order to simulate multiple pipeline uploads.""" random jitter in order to simulate multiple pipeline uploads."""
kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-test'
"""Kafka topic URL"""
\ No newline at end of file
import json
from celery import bootsteps from celery import bootsteps
from celery.utils.log import get_logger from celery.utils.log import get_logger
from confluent_kafka.avro import AvroProducer from confluent_kafka.avro import AvroProducer
from hop.models import Blob
from hop import stream
from kombu import Consumer, Exchange, Queue
__all__ = ('Producer',) __all__ = ('Producer',)
...@@ -14,14 +20,52 @@ class KafkaBootStep(bootsteps.ConsumerStep): ...@@ -14,14 +20,52 @@ class KafkaBootStep(bootsteps.ConsumerStep):
``--kafka`` command line option. ``--kafka`` command line option.
""" """
def __init__(self, consumer, kafka=False, **kwargs): def __init__(self, parent, kafka=False, **kwargs):
self.enabled = bool(kafka) self.enabled = bool(kafka)
def start(self, consumer): def start(self, parent):
log.info('Starting %s', self.name) log.info(f'Starting {self.name}, topic: {self.topic_url}')
def stop(self, parent):
log.info(f'Closing connection to topic: {self.topic_url}')
class Producer(KafkaBootStep):
"""Run the global Kafka producer in a background thread."""
name = 'Kafka Avro producer'
def stop(self, consumer): def __init__(self, parent, kafka=False, **kwargs):
log.info('Stopping %s', self.name) super().__init__(parent, **kwargs)
self.topic_url = parent.app.conf['kafka_topic_url']
def start(self, parent):
super().start(parent)
self._s = stream.open(self.topic_url, 'w')
def stop(self, parent):
super().stop(parent)
self._s.close()
def get_consumers(self, channel):
queue = Queue('kafka', Exchange('kafka'), 'kafka.key')
return [Consumer(channel,
queues=[queue],
on_message=self.on_message,
accept=['pickle', 'json'])]
def on_message(self, message):
payload = message.decode()
log.info(
'Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body),
)
)
log.info(f'Sending message to {self.topic_url}')
# FIXME: use the hop client's Avro implementation instead of a Blob when it's ready
msg_blob = Blob(json.dumps(message.payload))
self._s.write(msg_blob)
message.ack()
class Producer(KafkaBootStep): class Producer(KafkaBootStep):
...@@ -29,9 +73,12 @@ class Producer(KafkaBootStep): ...@@ -29,9 +73,12 @@ class Producer(KafkaBootStep):
name = 'Kafka Avro producer' name = 'Kafka Avro producer'
'''
def delivery_report(err, msg): def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result. """Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """ Triggered by or flush(). """
if err is not None: if err is not None:
print('Message delivery failed: {}'.format(err)) print('Message delivery failed: {}'.format(err))
else: else:
...@@ -60,3 +107,4 @@ class Producer(KafkaBootStep): ...@@ -60,3 +107,4 @@ class Producer(KafkaBootStep):
def produce(self, topic, value, value_schema, key, key_schema): def produce(self, topic, value, value_schema, key, key_schema):
self._producer.produce(topic=topic, value=value, self._producer.produce(topic=topic, value=value,
value_schema=value_schema, key=key, key_schema=key_schema) value_schema=value_schema, key=key, key_schema=key_schema)
'''
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment