Skip to content

add better preferred logic in KafkaReporter

Patrick Godwin requested to merge kafka_reporter_preferred_logic into master

This takes care of one of the items in the streaming checklist:

teach KafkaReporter to figure out whether there is anything in the queue for a particular nickname and use that within the preferred property instead of always returning None

This was done by adding a new method in utils.KafkaConsumer called seek_to_latest which finds the watermark of the last thing in the queue, and points the queue to that value. That gets set within KafkaReporter when a consumer is instantiated, then when preferred is set to true in retrieve(), it'll just get the last thing in the queue. In addition, the error handling is slightly better. It'll treat any KafkaErrors that pop up and raise them instead of ignoring them, except in the case when there's no data in the queue. In that case it'll return None. Also, calling KafkaReporter.report() defines the timestamp at self.timestamp, and not self.start, which I think was the intended behavior.

In stream.py, I've also changed the reporter gps initialization times to be zero rather than gps_start, which caused some strange side-effects when some processes tried to grab data from topics from their respective lookbacks (which was before gps_start).

Unrelated, in sklearn.py I've changed the SVM classifier to set probability=True in the classifier when it gets defined rather than later, because when I don't do this it doesn't report that there's a predict_proba method in the class because it hasn't been set yet.

Merge request reports

Loading