make KafkaClassifierData seek offsets for timestamps associated with its start and end attributes and read all data between them
This implements one of the points in 'additional functionality for streaming pipeline', namely: make `KafkaClassifierData` seek offsets for timestamps associated with its start and end attributes and read all data between them.
This was needed so that `KafkaClassifierData` can handle higher sampling rates coming from the feature extractor and, more generally, read more than a single buffer per object.
Changes:
- `KafkaClassifierData` now actually ingests all data between its `start` and `end` attributes, blocking until it either has all the data or some other condition prevents it from doing so (latency timeout, etc.).
- It makes a best-faith effort to seek to the correct position when `_retrieve_triggers()` is called, wrapped in a try/except. I would like to catch `KafkaError`s specifically rather than all exceptions, but not having access to the kafka import is severely limiting here, so it is what it is.
- I've also heavily changed the logic in `KafkaClassifierData` so that it handles a number of different cases better. In particular, if there is data to return it will actually return the data instead of raising an error, and it handles latency timeouts, out-of-order timestamps, and so on. Three new keyword arguments were added to support this: `latency_timeout`, `retry_cadence`, and `sample_rate`. `timeout` was renamed to `poll_timeout` to avoid confusion.
- Updated some of the logic in `StreamProcessor.poll()`.
- Some aesthetics in some of the other modules. I didn't like how some of the methods in `Quiver` were tucked underneath all those internal methods, so I moved those up. There are other minor things like that I don't remember.
- `stream.train`: fixed `train_nicknames` so that it's keyed by classifier nickname rather than the actual class object, which causes crashes.
- Fixed `utils.floor_div` so that it works with both floats and integers.
- Exposed `KafkaConsumer`'s partition property so that it can be used when doing seeks.
- Updated some of the relevant unit tests to reflect these changes.
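The `floor_div` fix mentioned above suggests behavior like the following; a minimal sketch, assuming the goal is a consistent integer result for both float and int operands (the actual signature in `utils` may differ):

```python
def floor_div(x, y):
    """Floor-divide x by y, returning an int for float and int inputs alike.

    Plain ``x // y`` already floors toward negative infinity, but it yields
    a float whenever either operand is a float; casting normalizes the
    result type so callers can use it as an index or count either way.
    """
    return int(x // y)
```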
One thing this doesn't change: a second call to `KafkaClassifierData.triggers()` will return the same data rather than trying to query for new data if any is missing. That's because these calls are used all over the place, and I don't particularly trust that re-querying won't either cause strange behavior in the sharing of consumers or bring the pipeline to a screeching halt when it tries to grab old data and new data at the same time. At this point enabling it is as easy as flipping a switch (for the most part, since all the infrastructure is now in place), but I really don't want to do this until we're sure it's safe. That means, for example: flags that enforce that new data won't be re-queried (as should be the case for `idq-timeseries`), a way to introduce locks or assign partitions safely among different consumers (say, for multiprocessing or condor workflows), stuff like that.
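For context, the blocking ingest with a latency timeout and retry cadence described in the changes above might look roughly like this; all names (`poll_once`, the tuple shapes, the defaults) are illustrative assumptions, not the actual `KafkaClassifierData` code:

```python
import time

def ingest_span(poll_once, start, end, latency_timeout=10.0, retry_cadence=0.1):
    """Collect (timestamp, payload) records covering [start, end).

    poll_once() is assumed to return a possibly-empty list of (timestamp,
    payload) tuples. The loop blocks until data at/after `end` has been
    seen (meaning the span is fully covered) or `latency_timeout` seconds
    elapse, sleeping `retry_cadence` between empty polls. Timestamps
    outside [start, end) are dropped rather than raising an error.
    """
    records = []
    deadline = time.monotonic() + latency_timeout
    covered = False
    while not covered and time.monotonic() < deadline:
        batch = poll_once()
        if not batch:
            time.sleep(retry_cadence)
            continue
        for ts, payload in batch:
            if ts >= end:
                covered = True  # data past the span: everything before it arrived
            elif ts >= start:
                records.append((ts, payload))
    return records
```

The design choice mirrored here is the one described above: return whatever data was gathered when a condition (timeout, end of span) stops the read, rather than raising.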