Commit c14dfe1b authored by Patrick Godwin

kafka.py: drop Reporter in favor of Client class to store/query metrics, add kafka utils

parent 3e84005d
......@@ -7,105 +7,172 @@ __description__ = "a module for kafka I/O utilities"
#-------------------------------------------------
### imports
from collections import defaultdict
from collections import defaultdict, namedtuple
import getpass
import json
import logging
import random
import string
import sys
import numpy
from kafka import KafkaProducer, KafkaConsumer
if sys.version_info >= (3, ):
from urllib.parse import urlparse
else:
from urlparse import urlparse
from confluent_kafka import Consumer, Producer
#-------------------------------------------------
### classes
# Lightweight record for a consumed Kafka message; `value` is the decoded
# JSON payload and `timestamp` is the Kafka message timestamp (ms).
Message = namedtuple("Message", "topic partition offset key value timestamp")


class Client(object):
    """A Kafka-based client to write and query time-based metrics.

    Parameters
    ----------
    uri : `str`
        the URI to connect to, of the form:
        kafka://[groupid@]hostname[:port][/topic1,topic2,...]

    """
    def __init__(self, uri):
        self.uri = uriparse(uri)

        ### kafka settings
        self._kafka_settings = {
            'bootstrap.servers': self.uri.broker,
            'group.id': self.uri.groupid,
        }

        ### set up producer
        self._producer = Producer(self._kafka_settings)

        ### set up consumer
        self._consumer = Consumer(self._kafka_settings)
        if self.uri.topics:
            self._consumer.subscribe(list(self.uri.topics))
        # track the full subscription locally; see subscribe() below
        self.topics = set(self.uri.topics)

    def subscribe(self, topic):
        """Subscribe to Kafka topics.

        Parameters
        ----------
        topic : `str` or `list`
            the topic(s) to subscribe to

        """
        if isinstance(topic, str):
            topic = [topic]
        new_topics = [t for t in topic if t not in self.topics]
        if new_topics:
            self.topics |= set(new_topics)
            # NOTE: confluent-kafka's Consumer.subscribe() *replaces* the
            # current subscription rather than extending it, so we must
            # re-subscribe with the full topic set, not just the new topics.
            self._consumer.subscribe(list(self.topics))

    def query(self, tags=None, timeout=0.2, max_messages=1000):
        """Query data from Kafka.

        Parameters
        ----------
        tags : `list`
            user-based tags associated with the data
            (currently unused — TODO: filter messages by key)
        timeout : `float`
            timeout for requesting messages from a topic, default = 0.2s
        max_messages : `int`
            max number of messages to process per iteration, default = 1000

        Yields
        ------
        `Message`
            one namedtuple per successfully consumed, error-free message,
            with its JSON payload decoded into `value`

        """
        for msg in self._consumer.consume(num_messages=max_messages, timeout=timeout):
            # skip None (timeout) and broker/partition error events
            if msg and not msg.error():
                yield Message(
                    msg.topic(),
                    msg.partition(),
                    msg.offset(),
                    msg.key(),
                    json.loads(msg.value().decode("utf-8")),
                    # timestamp() returns (type, value); keep the value only
                    msg.timestamp()[0],
                )

    def store(self, topic, data, tags=None):
        """Stores data into Kafka.

        Parameters
        ----------
        topic : `str`
            the topic name
        data : `dict`
            the data to store
        tags : `str` or `list`
            user-based tags associated with the data; a list is joined
            into a single dot-delimited key

        """
        # serialize (not parse) the data for transport
        payload = json.dumps(data).encode("utf-8")
        if tags:
            if isinstance(tags, list):
                tags = '.'.join(tags)
            self._producer.produce(topic=topic, key=tags, value=payload)
        else:
            self._producer.produce(topic=topic, value=payload)
        # serve delivery callbacks without blocking
        self._producer.poll(0)

    def close(self):
        """Close the connection to the client.
        """
        # flush any queued messages before tearing the consumer down
        self._producer.flush()
        self._consumer.unsubscribe()
        self._consumer.close()
#-------------------------------------------------
### functions
### kafka utilities
# named access to the parsed components of a kafka:// URI
KafkaURI = namedtuple('KafkaURI', 'groupid broker topics')


def uriparse(uri):
    """Parses a Kafka URI of the form:

        kafka://[groupid@]broker[,broker2[,...]]/topicspec[,topicspec[,...]]

    and returns a namedtuple to access properties by name:

        uri.groupid
        uri.broker
        uri.topics

    If no group ID is given, a random one is generated so that independent
    consumers do not share a consumer group.

    Raises
    ------
    ValueError
        if the URI scheme is not 'kafka'

    """
    parsed = urlparse(uri)
    # `assert` is stripped under `python -O`; validate explicitly instead
    if parsed.scheme != 'kafka':
        raise ValueError("expected a kafka:// URI, got scheme '{}'".format(parsed.scheme))
    if parsed.username:
        groupid, broker = parsed.netloc.split('@')
    else:
        groupid, broker = generate_groupid(), parsed.netloc
    topics = parsed.path.lstrip('/')
    topics = topics.split(',') if topics else []
    return KafkaURI(groupid, broker, set(topics))
def generate_groupid():
    """Generate a random Kafka groupid.

    Returns
    -------
    `str`
        of the form ``<username>-<10 random alphanumeric characters>``

    """
    return '-'.join((getpass.getuser(), random_alphanum(10)))


def random_alphanum(n):
    """Generate a random alpha-numeric sequence of N characters.

    Parameters
    ----------
    n : `int`
        the number of characters to generate

    Returns
    -------
    `str`
        drawn from uppercase ASCII letters and digits, using
        ``random.SystemRandom`` (OS entropy source)

    """
    alphanum = string.ascii_uppercase + string.digits
    return ''.join(random.SystemRandom().choice(alphanum) for _ in range(n))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment