a couple things
This patch does several things.
Firstly, it attempts to improve the verbosity of a few of our custom errors. This mostly means passing them more information so they can report more transparent errors. A good example is LatencyError, which now tells you how far behind realtime you were, the time at which the LatencyError was tripped, and where you should move to if you want to catch up. Other errors now provide similar amounts of information.
The second big thing is associated with BadSpanError, which is not called IncontiguousDataError because I thought that was clearer. This error was only ever raised within KafkaClassifierData and was meant to signify that there was a gap in the data. However, the conditional that checked for this relied on the ClassifierData's segs, which could be non-trivial and contain gaps of its own. Therefore, even though the data might have been contiguous, or if it was incontiguous it may have only had gaps where we don't care about them, KafkaClassifierData would get confused and raise BadSpanErrors. The logic is a bit more explicit now:
- check whether the new buffer received is contiguous with the last buffer received
- if it is, check whether we care about the new buffer
- record the data if we do care
- pass if we don't care
- if the new buffer is not contiguous with the previous buffer, check whether the gap introduced actually affects us
- if it does, raise an IncontiguousDataError with appropriate descriptions so StreamProcessor can report what happened
- if it does not, pass
- update the timestamp to the new buffer's timestamp
It's worth noting that tools that use KafkaClassifierData outside of a StreamProcessor (i.e., a plain-old training job) could still run into IncontiguousDataErrors that are not caught by StreamProcessor because we query KafkaClassifierData directly instead of going through StreamProcessor.poll. I think we just have to live with that. Our disk-based ClassifierDatas don't raise IncontiguousDataErrors and instead would silently return empty data arrays where there is no data. That's also probably bad, but is a problem for another day.
I've also updated the log statements surrounding a few of the workflows to try to make them even more verbose. More importantly, I exposed the monitoring_cadence
through both the command line for idq-batch and idq-stream as well as through the appropriate sections in the INI files. This allows the user to specify how often they want to check on forked jobs (only used with workflow=fork) to make sure they've completed or haven't died. In particular, the top-level schedulers like idq-stream may only want to check every second or so (the default). However, worker processes like stream.timeseries probably want to check much faster than that because their stride can be as small as 1 sec.
I've tested everything and it all appears to work (after fixing a few bugs I introduced). I've had a full streaming pipeline running on LHO for about 30 minutes without issues while ingesting triggers both from KafkaClassifierData and GstlalHDF5ClassifierData, etc.
thoughts?