Commit 2596cffa authored by Patrick Godwin's avatar Patrick Godwin
Browse files

kafka.py: fix tag handling within Client

parent d0941fdf
Pipeline #156702 passed with stages
in 2 minutes and 39 seconds
#!/usr/bin/env python
from __future__ import absolute_import
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
......@@ -76,8 +75,6 @@ class Client(object):
Parameters
----------
data : `dict`
the data to store
tags : `list`
user-based tags to filter data by
timeout : `float`
......@@ -86,14 +83,23 @@ class Client(object):
max number of messages to process per iteration, default = 1000
"""
if not tags:
tags = []
tags = set(tags)
for msg in self._consumer.consume(num_messages=max_messages, timeout=timeout):
if msg and not msg.error():
if not tags or msg.key() in tags:
if msg.key():
key = msg.key().decode("utf-8").split(".")
msg_tags = set(key)
else:
key = None
msg_tags = set()
if not tags or tags.issubset(msg_tags):
yield Message(
msg.topic(),
msg.partition(),
msg.offset(),
msg.key(),
key,
json.loads(msg.value().decode("utf-8")),
msg.timestamp()[0],
)
......@@ -114,10 +120,10 @@ class Client(object):
payload = json.dumps(data).encode("utf-8")
if tags:
if isinstance(tags, list):
tags = '.'.join(tags)
self._producer.produce(topic=schema, key=tags, value=payload)
tags = ".".join(tags).encode("utf-8")
self._producer.produce(topic=topic, key=tags, value=payload)
else:
self._producer.produce(topic=schema, value=payload)
self._producer.produce(topic=topic, value=payload)
self._producer.poll(0)
def close(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment