Commit 0985bd1e authored by Patrick Godwin's avatar Patrick Godwin

kafka.py: add Reporter for storing metrics

parent fa1d472c
Pipeline #74821 failed with stages
in 1 minute and 11 seconds
......@@ -11,9 +11,53 @@ import json
import logging
import numpy
from kafka import KafkaConsumer
from .. import aggregator
#-------------------------------------------------
### classes
class Reporter(object):
"""Handles storing of metrics into Kafka.
Parameters
----------
hostname : `str`
the hostname to connect to, defaults to localhost
port : `int`
the port to connect to, defaults to 8086
"""
def __init__(self, hostname='localhost', port=8086, **kwargs):
self.hostname = hostname
self.port = port
self.producer = KafkaProducer(
bootstrap_servers=[':'.join([self.hostname, self.port])],
key_serializer=lambda m: json.dumps(m).encode('utf-8'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'),
)
def store(self, schema, data, tags=None):
"""Stores data into Kafka.
Parameters
----------
schema : `str`
the schema name
data : `dict`
the data to store
"""
if tags:
if isinstance(tags, list):
tags = '.'.join(tags)
self.producer.send(schema, key=tags, value=data)
else:
self.producer.send(schema, value=data)
#-------------------------------------------------
### functions
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment