Commit e2a35d6f authored by Patrick Godwin's avatar Patrick Godwin
Browse files

kafka.py: docstring tweaks, filter by tag for Client.query()

parent c9bb8475
Pipeline #134052 passed with stages
in 5 minutes and 3 seconds
......@@ -32,7 +32,6 @@ class Client(object):
Parameters
----------
uri : `str`
the URI to connect to, of the form:
kafka://[groupid@]hostname[:port][/topic1,topic2,...]
......@@ -80,7 +79,7 @@ class Client(object):
data : `dict`
the data to store
tags : `list`
user-based tags associated with the data
user-based tags to filter data by
timeout : `float`
timeout for requesting messages from a topic, default = 0.2s
max_messages : `int`
......@@ -89,17 +88,18 @@ class Client(object):
"""
for msg in self._consumer.consume(num_messages=max_messages, timeout=timeout):
if msg and not msg.error():
yield Message(
msg.topic(),
msg.partition(),
msg.offset(),
msg.key(),
json.loads(msg.value().decode("utf-8")),
msg.timestamp()[0],
)
if not tags or msg.key() in tags:
yield Message(
msg.topic(),
msg.partition(),
msg.offset(),
msg.key(),
json.loads(msg.value().decode("utf-8")),
msg.timestamp()[0],
)
def write(self, topic, data, tags=None):
"""Stores data into Kafka.
"""Write data into Kafka.
Parameters
----------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment