Skip to content
Snippets Groups Projects
Commit 6891a0d8 authored by Cody Messick's avatar Cody Messick
Browse files

avro.py: Switch to using kombu to send avro packet to consumer set up by kafka bootstep

parent 041d4259
No related branches found
No related tags found
No related merge requests found
from ..import app
from ..import kafka
from . import gracedb
import fastavro
......@@ -8,6 +7,8 @@ import voeventparse
from requests.exceptions import HTTPError
from kombu import Connection, Exchange, Producer
# FIXME Figure out where this will live
parsed_schema = fastavro.schema.load_schema('~/userguide/_static/igwn.alerts.v1_0.Alert.avsc')
......@@ -86,7 +87,7 @@ def create_alert_dict(message):
return alert_dict
@app.task(autoretry_for=(SendingError,), bind=True, default_retry_delay=20.0,
ignore_result=True, queue='voevent', retry_backoff=True,
ignore_result=True, queue='kafka', retry_backoff=True,
retry_kwargs=dict(max_retries=10), shared=False)
def send(self, message):
"""Send an avro alert to a kafka broker
......@@ -100,10 +101,6 @@ def send(self, message):
fastavro.writer(avro_stream, parsed_schema, alert_dict)
# Publish to Kafka producer
for producer in app.steps['consumer']:
# TODO Finalize topic(?)
producer.produce(
topic='StandinKafkaTopic',
value=avro_stream.getvalue(),
value_schema=parsed_schema
)
with Connection('redis://').channel() as channel:
with Producer(channel) as producer:
producer.publish({'message': avro_stream.getvalue()}, exchange=Exchange('kafka'), routing_key='kafka.key', declare=[Exchange('kafka')], serializer='json')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment