Commit 13f3e239 authored by Alexander Pace's avatar Alexander Pace
Browse files

change batch parameters

parent 018926da
Pipeline #305021 passed with stages
in 2 minutes and 25 seconds
......@@ -19,6 +19,7 @@
import logging
import safe_netrc as netrc
from urllib.parse import urlparse, ParseResult
from datetime import timedelta
import json
from json import JSONDecodeError
......@@ -37,7 +38,8 @@ log = logging.getLogger(__name__)
DEFAULT_SERVER = 'kafka://kafka.scimma.org/'
DEFAULT_PORT = 9092
DEFAULT_GROUP = 'lvalert'
DEFAULT_BATCH_SIZE = 1
DEFAULT_BATCH_TIMEOUT = timedelta(seconds=0.05)
class LVAlertClient(Stream):
"""A hop-scotch client configured for LVAlert
......@@ -87,7 +89,8 @@ class LVAlertClient(Stream):
"""
def __init__(self, username=None, password=None, auth=None, authfile=None,
noauth=False, group=None, server=None, port=None):
noauth=False, group=None, server=None, port=None,
batch_size=None, batch_timeout=None):
# Set up variables:
if server:
......@@ -105,6 +108,16 @@ class LVAlertClient(Stream):
else:
self.group_prefix = DEFAULT_GROUP
if batch_size:
self.batch_size = batch_size
else:
self.batch_size = DEFAULT_BATCH_SIZE
if batch_timeout:
self.batch_timeout = batch_timeout
else:
self.batch_timeout = DEFAULT_BATCH_TIMEOUT
# Construct the base url prefix for this session.
self.base_url_prefix = self._construct_base_url()
......@@ -202,7 +215,9 @@ class LVAlertClient(Stream):
listen_topics = self.get_topics()
with self.open(self._construct_topic_url(listen_topics), "r") as s:
for payload, metadata in s.read(metadata=True):
for payload, metadata in s.read(metadata=True,
batch_size=self.batch_size,
batch_timeout=self.batch_timeout):
try:
payload = json.loads(payload)
except (JSONDecodeError, TypeError) as e:
......
flake8==3.7.7
git+https://github.com/scimma/hop-client.git@master
git+https://github.com/astronomy-commons/adc-streaming.git@8286cf55cab51b6ca03e91dad352147463059174
hop-client==0.5.0
ipdb
ipython
tox
git+https://github.com/scimma/hop-client.git@master
git+https://github.com/astronomy-commons/adc-streaming.git@8286cf55cab51b6ca03e91dad352147463059174
hop-client==0.5.0
recommonmark==0.7.1
safe-netrc==1.0.0
Sphinx==4.2.0
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment