Additional functionality for the streaming pipeline
This is a list of everything we want to add to the streaming pipeline that isn't already included in the planned merge of `streaming_development_pt2`.
- the ability to query SegDB, and to skip that step if `ignore_segdb` is specified in the config file
- support workflows besides `block` within the main workflow
- do all I/O (and cache results) before forking processes. This will greatly simplify the headaches associated with parallel I/O reads, particularly from things like `KafkaClassifierData`, where there is a single position that might accidentally be shared between different processes (race conditions). See the first sketch after this list.
- clean up variables (e.g. topic names) that are repeatedly defined within loops but do not change from epoch to epoch (define them once outside the loop and just reference them)
- teach `KafkaReporter` to figure out whether there is anything in the queue for a particular nickname and use that within the `preferred` property instead of always returning `None`
- teach `StreamProcessor` how to run with a `cadence` and a `delay` to control how often it attempts to poll and find new data. The defaults should both be 0, but the idea is to limit the rate at which new jobs might be launched (i.e., mini-batching). See the sketch after this list.
- make the `sklearn` dependence "optional" in that we can still run the pipeline without installing `sklearn`, as long as we don't use any of those classifiers (see the sketch after this list)
  - @patrick.godwin suggests "baking this into `SupervisedSklearnClassifier` where only classes whose libraries are available will show up as a possible choice when creating a new supervised classifier"
  - an alternative (simpler?) way would be to mimic what we've done for `confluent_kafka`: classes are still discoverable but will raise `ImportError`s if we attempt to use them without `sklearn` installed
  - we'll want to do something similar for `keras`
  - we may want to check the versions of these libraries to make sure they're compatible
- make `KafkaClassifierData` seek offsets for the timestamps associated with its `start` and `end` attributes and read all data between them. This would make calls to `KafkaClassifierData` reproducible if somehow the cached data is lost (see the sketch after the reproduced discussion below). @patrick.godwin enumerated some good points in !31 (merged), which I've reproduced below.
- for training jobs, we need to find a way to put classifiers using `ClassifierData` or `Quiver` on the same footing.
  - by this I mean that both the `Umbrella` and `Quiver` take in a maximum number of samples, but since the size of the umbrella isn't representative of the number of samples, we could end up in a situation where, if we have multiple classifiers training at the same time, they may not be trained on the same data as a result.
  - one way to account for this is to keep just a quiver and build a big umbrella for use in OVL at the time of training. I'm not sure if this is a good idea, however.
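
For the I/O-before-forking item, a minimal sketch of the intended pattern (the `fetch_triggers` and `train_classifier` names are placeholders, not actual pipeline functions): do every read in the parent process, cache the results as plain in-memory objects, and only then fork, so no Kafka consumer position or file handle is shared across processes.

```python
import multiprocessing as mp

def fetch_triggers(channel):
    """Placeholder for a serial read from Kafka/disk, done only in the parent."""
    return [(channel, t) for t in range(10)]  # stand-in for real trigger data

def train_classifier(args):
    """Worker that only sees plain, already-cached data (no live consumers)."""
    name, cached = args
    return name, len(cached)

if __name__ == "__main__":
    channels = ["H1:CHAN_A", "H1:CHAN_B"]
    # do all I/O (and cache results) *before* forking so no consumer position
    # or file handle is shared between child processes
    cache = {chan: fetch_triggers(chan) for chan in channels}
    with mp.Pool(2) as pool:
        results = pool.map(train_classifier, list(cache.items()))
    print(results)
```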
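For the optional `sklearn`/`keras` items, one way to mimic the `confluent_kafka` approach is sketched below (the class and attribute names here are illustrative, not the actual `SupervisedSklearnClassifier` implementation): the module always imports, the class stays discoverable, but using it without `sklearn` installed raises an `ImportError`.

```python
# defer the sklearn import so the module (and class discovery) works without it
try:
    from sklearn.ensemble import RandomForestClassifier
except ImportError:  # sklearn not installed; remember that fact
    RandomForestClassifier = None

class SklearnRandomForest(object):
    """Illustrative classifier wrapper; raises only when actually used."""

    def __init__(self, **kwargs):
        if RandomForestClassifier is None:
            raise ImportError(
                "sklearn is required to use %s" % type(self).__name__
            )
        self.model = RandomForestClassifier(**kwargs)

    def train(self, features, labels):
        self.model.fit(features, labels)
        return self
```

A guarded import of `keras` could follow the same pattern, and checking something like `sklearn.__version__` in the guarded block would cover the version-compatibility point.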
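For the `cadence`/`delay` item, one plausible reading is sketched below (not the actual `StreamProcessor` API; `poll` and `launch` are placeholder callables): `cadence` sets the minimum spacing between polling attempts and `delay` backs each attempt off from the leading edge of the data, with both defaulting to 0 so the loop polls as fast as data arrives.

```python
import time

def stream(poll, launch, cadence=0.0, delay=0.0):
    """Poll for new data at most once per `cadence` seconds, waiting `delay`
    seconds before each attempt (defaults of 0 mean poll as fast as possible)."""
    while True:
        cycle_start = time.time()
        if delay > 0:
            time.sleep(delay)      # back off from the leading edge of the data
        new = poll()               # placeholder: grab whatever new data exists
        if new:
            launch(new)            # placeholder: start a new (mini-batched) job
        remainder = cadence - (time.time() - cycle_start)
        if remainder > 0:
            time.sleep(remainder)  # enforce the minimum spacing between polls
```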
> should `KafkaClassifierData` do something like seek timestamps/offsets corresponding to `start` and `end` and then read everything between those offsets, instead of what it currently does, which is just to read whatever's next in the buffer? This would make repeated calls to the same `KafkaClassifierData` produce the same output, which is nice and would match the behavior of the rest of our `ClassifierData` objects. Note, this type of behavior is now the default in `KafkaReporter`.
> I've tried to set up something like this at the beginning of testing training/evaluation jobs but could not get it to work. In the process, I have simplified things quite a bit, so it now does what you've seen in `KafkaClassifierData`. There are two things that make this different from a regular `ClassifierData` object:
>
> 1. It assumes that data will probably be in the buffer it's about to receive without seeking first. If that assumption is wrong, it'll seek to the right offset and try again on the next iteration.
> 2. If it has already received data, a repeated call to `ClassifierData.triggers()` will just return what's in `ClassifierData._data` rather than query for more data.
>
> The question I have is: what should the right behavior be for `ClassifierData` in the streaming implementation?
>
> For 1, I would argue either way is technically correct, since if `KafkaClassifierData` didn't find the right offset, it will move the offset to the right location with a `seek()` and then it'll get thrown away in favor of a new object that covers the right span. My guess is that similar situations would arise in other `ClassifierData` objects in which you'd need to throw out an existing `ClassifierData` in favor of a new one with a different span; they'll just manifest themselves in different ways. For example, I expect a disk-based object to sometimes just not return data after x number of retries. Then you'd need to destroy this object and make a new one with a new span.
>
> I think what we need to answer is how we expect the data flow and ingestion procedure to look. What I have set up takes a greedy approach and assumes that data will almost always be ordered and continuous, and that a particular `KafkaClassifierData` is the only one controlling the offset for that particular topic/group/partition. The alternative is to seek to the correct position at the beginning every time.
>
> If we say for point 2 that repeated calls shouldn't query for more data once data has already been received, then it would be pretty easy to guarantee that the offset isn't being moved around all the time by shared references to the same Kafka consumer from multiple `KafkaClassifierData` objects querying at some later time. Then it just comes down to how 'well-behaved' the incoming data stream is, and whether it's more costly to waste a few more `ClassifierData` objects vs. repeated seeks.
>
> I think this would be a good place to do some performance testing and see. If there is a marginal difference, then seeking beforehand should be in place just to keep everything nice. If we find that seeking before every read performs poorly, then that's a different story. I don't think it'll be difficult to get seeking beforehand to work; I think it was just a matter of me not calling `assign()` before the initial seek to activate that partition in the consumer.
>
> For 2, I think this is the more serious issue. Should we let `ClassifierData` objects try to query for older data after the fact or not? We could take the approach of letting data ingestion be a 'best faith' effort, where what we ingest at time t will stay the same at t+t0, even if there's missing data, etc. Do we want to be able to backfill missing data? I can see that being useful for training jobs but not much else. Evaluation, calibration, and timeseries jobs all care about ingesting single data points, so backfilling is pointless in those cases. Backfilling could be useful for training jobs, but it has the potential to make streamed training jobs laggy. I don't know what the right approach is here.
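
To make the seek-by-timestamp bullet and points 1 and 2 from the reproduced discussion concrete, here is a rough sketch using `confluent_kafka` (this is not the actual `KafkaClassifierData` implementation; the class name, constructor arguments, and single-partition assumption are all illustrative). It positions the consumer at the offset corresponding to `start`, reads everything with timestamps before `end`, and returns cached data on repeated calls.

```python
from confluent_kafka import Consumer, TopicPartition

class TimestampedKafkaData(object):
    """Illustrative only: read all messages between start and end (seconds)
    from a single topic/partition, caching the result for repeated calls."""

    def __init__(self, topic, start, end, kafka_config, partition=0):
        self.topic = topic
        self.partition = partition
        self.start = start
        self.end = end
        self._data = None
        # kafka_config is the usual dict, e.g. {'bootstrap.servers': ..., 'group.id': ...}
        self._consumer = Consumer(kafka_config)

    def triggers(self):
        # point 2: repeated calls return the cached data instead of re-querying
        if self._data is not None:
            return self._data

        # look up the offset whose timestamp is >= start (milliseconds)
        tp = TopicPartition(self.topic, self.partition, int(self.start * 1000))
        offsets = self._consumer.offsets_for_times([tp], timeout=5.0)
        # assign() with explicit offsets both activates the partition and positions
        # the consumer there; an explicit seek() would likewise require the
        # partition to be assigned first (the assign()-before-seek() point above)
        self._consumer.assign(offsets)

        data = []
        while True:
            msg = self._consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                break  # nothing (new) in the buffer; a real implementation might retry
            _, timestamp_ms = msg.timestamp()
            if timestamp_ms >= int(self.end * 1000):
                break  # past the end of the requested span
            data.append(msg.value())

        self._data = data
        return self._data
```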