
make KafkaClassifierData seek offsets for timestamps associated with its start and end attributes and read all data between them

Patrick Godwin requested to merge kafka_stream_additions into master

This implements one of the points in 'additional functionality for streaming pipeline', namely:

make KafkaClassifierData seek offsets for timestamps associated with its start and end attributes and read all data between them

This was needed so that KafkaClassifierData can handle higher sample rates coming from the feature extractor, or more generally, read more than a single buffer per object.

Changes:

  • KafkaClassifierData now ingests all data between its start and end attributes, blocking until it either has all the data or some other condition prevents it from doing so (e.g. a latency timeout).
  • It makes a best-effort attempt to seek to the correct position when _retrieve_triggers() is called, with the seek wrapped in a try/except block (see the sketch after this list). I would prefer to catch only KafkaErrors rather than all exceptions, but not having access to the kafka import at that point is limiting, so it is what it is for now.
  • I've also reworked much of the logic in KafkaClassifierData so it handles a number of different cases better. In particular, if there is data to return it will return that data rather than raising an error, and it now handles latency timeouts and out-of-order timestamps. Three new keyword arguments were added to support this: latency_timeout, retry_cadence, and sample_rate. timeout was renamed to poll_timeout to avoid confusion.
  • Updated some of the logic in StreamProcessor.poll()
  • Some cosmetic changes in other modules. I didn't like how some of Quiver's methods were tucked beneath all the internal methods, so I moved them up; there are a few other minor changes along those lines.
  • stream.train: fixed train_nicknames so that it is keyed by classifier nickname rather than the class object itself, which was causing crashes.
  • Fixed utils.floor_div so that it works with both floats and integers (a small sketch follows this list).
  • Exposed KafkaConsumer's partition property so that I can make use of it when doing seeks.
  • Updated the relevant unit tests to reflect these changes.
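
To make the seek-and-read behavior above concrete, here is a rough sketch using kafka-python purely for illustration. The function name retrieve_between and its defaults are hypothetical; the keyword arguments only mirror the new poll_timeout, latency_timeout and retry_cadence options, and the real KafkaClassifierData goes through the pipeline's own consumer objects, so treat this as a sketch of the idea rather than the actual code:

```python
import time

from kafka import TopicPartition
from kafka.errors import KafkaError


def retrieve_between(consumer, topic, partition, start, end,
                     poll_timeout=1.0, latency_timeout=30.0, retry_cadence=0.1):
    """Best-effort read of all records with start <= timestamp < end (seconds)."""
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])

    # best-effort seek to the offset corresponding to `start` (Kafka uses ms)
    try:
        offsets = consumer.offsets_for_times({tp: int(start * 1000)})
        if offsets.get(tp) is not None:
            consumer.seek(tp, offsets[tp].offset)
    except KafkaError:
        pass  # fall back to wherever the consumer is currently positioned

    records = []
    latest = None
    deadline = time.time() + latency_timeout
    while time.time() < deadline:
        polled = consumer.poll(timeout_ms=int(poll_timeout * 1000))
        for msgs in polled.values():
            for msg in msgs:
                ts = msg.timestamp / 1000.
                latest = ts if latest is None else max(latest, ts)
                if start <= ts < end:
                    records.append(msg)
        if latest is not None and latest >= end:
            break  # we have read past the requested span
        time.sleep(retry_cadence)  # not all data in yet; retry after a short wait

    return records  # complete, or cut short by the latency timeout
```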

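A minimal sketch of the floor_div fix, assuming its job is to round a value down to the nearest multiple of the divisor; the real utils.floor_div may differ in detail:

```python
import math


def floor_div(x, n):
    """Round x down to the nearest multiple of n, for ints or floats."""
    if isinstance(x, int) and isinstance(n, int):
        return (x // n) * n        # exact integer arithmetic
    return math.floor(x / n) * n   # float-friendly fallback


assert floor_div(17, 5) == 15
assert floor_div(12.3, 0.5) == 12.0
```
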
One thing this doesn't change: a second call to KafkaClassifierData.triggers() still returns the same data rather than querying for new data if any is missing. That's because triggers() is called all over the place, and I don't trust that requerying won't either cause strange behavior in how consumers are shared or bring the pipeline to a screeching halt while it tries to grab old data and new data at the same time. At this point enabling it is mostly a matter of flipping a switch, since all the infrastructure is now in place, but I really don't want to do that until we're sure it's safe. That would mean, for example, flags that enforce that new data won't be requeried (as should be the case for idq-timeseries), and a way to introduce locks or assign partitions safely among different consumers (say, for multiprocessing or condor workflows).
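
To illustrate the caching behavior described above, a toy stand-in (with hypothetical attribute names, not the real class) behaves roughly like this:

```python
class KafkaClassifierDataSketch:
    """Toy stand-in for the memoization; not the real KafkaClassifierData."""

    def __init__(self, start, end):
        self.start = start
        self.end = end
        self._cached = None

    def _retrieve_triggers(self):
        # stand-in for the real Kafka read between self.start and self.end
        return [{'time': self.start}, {'time': self.end}]

    def triggers(self):
        # a second call returns the cached data instead of requerying Kafka
        # for anything that was still missing the first time around
        if self._cached is None:
            self._cached = self._retrieve_triggers()
        return self._cached


data = KafkaClassifierDataSketch(0, 10)
assert data.triggers() is data.triggers()   # same object back, no requery
```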

Edited by Patrick Godwin
