Skip to content
Snippets Groups Projects
Commit 3a85ae69 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

events.py: allow EventProcessor to optionally act as a source

parent 247e8249
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ class EventProcessor(object):
tag='default'
):
assert kafka_server, 'kafka_server needs to be set'
assert input_topic, 'input_topic needs to be set'
self.is_source = not bool(input_topic)
if isinstance(input_topic, str):
input_topic = [input_topic]
......@@ -68,8 +68,9 @@ class EventProcessor(object):
'group.id': '-'.join([self._name, tag])
}
self.producer = Producer(self.kafka_settings)
self.consumer = Consumer(self.kafka_settings)
self.consumer.subscribe([topic for topic in input_topic])
if not self.is_source:
self.consumer = Consumer(self.kafka_settings)
self.consumer.subscribe([topic for topic in input_topic])
### signal handler
for sig in [signal.SIGINT, signal.SIGTERM]:
......@@ -97,7 +98,8 @@ class EventProcessor(object):
"""
while self.is_running:
start = timeit.default_timer()
self.fetch()
if not self.is_source:
self.fetch()
self.handle()
elapsed = timeit.default_timer() - start
time.sleep(max(self.process_cadence - elapsed, 0))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment