diff --git a/gwcelery/tasks/avro.py b/gwcelery/tasks/avro.py index b290ba28b99acd7460214da098d283c8c7cec85b..9628b05940ca14e1f4e34eb3aaad8acb24970622 100644 --- a/gwcelery/tasks/avro.py +++ b/gwcelery/tasks/avro.py @@ -92,9 +92,18 @@ def send(self, message): """Send an avro alert to a kafka broker """ + # Create dictionary following avro schema alert_dict = create_alert_dict(message) - stream = io.BytesIO() - fastavro.writer(stream, parsed_schema, alert_dict) - - # TODO Actually send to a kafka broker + # Write avro packet to memory + avro_stream = io.BytesIO() + fastavro.writer(avro_stream, parsed_schema, alert_dict) + + # Publish to Kafka producer + for producer in app.steps['consumer']: + # TODO Finalize topic(?) + producer.produce( + topic='StandinKafkaTopic', + value=avro_stream.getvalue(), + value_schema=parsed_schema + )