updates to streaming pipeline and supporting material
I ended up making quite a few changes, most of which are minor.
## stream

- defined `gps_range` to avoid repeated code that calculates start/end times
- implemented `idq-stream` and `idq.stream.stream`
  - supports both `workflow=fork` and `workflow=condor`
  - only supports `--persist` with `fork` (the parent process stays alive until all children finish)
- implements the "initial training/calibration" logic
  - checks whether there is a "preferred" object at the end of the `Reporter`s for both the train and calibration processes
  - if there is no preferred object for at least one classifier, launches a quick batch job, using `initial_lookback` to define how much data is included
  - this may produce crappy models/calibration_maps, but at least the rest of the pipeline won't fail because of broken plumbing
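The fallback described above can be sketched roughly as follows. This is a hedged illustration only: the names `reporters`, `run_batch`, and the `preferred is None` check are stand-ins for illustration, not the actual iDQ API.

```python
# Illustrative sketch of the "initial training/calibration" fallback.
# `reporters`, `run_batch`, and `initial_lookback` are hypothetical names.
def ensure_initial_models(reporters, gps_start, initial_lookback, run_batch):
    """If any classifier lacks a "preferred" object at the end of its
    Reporter, launch a quick batch job over the last `initial_lookback`
    seconds so the streaming pipeline has something to start from."""
    missing = [name for name, rep in reporters.items() if rep.preferred is None]
    if missing:
        # the resulting models/calibration_maps may be poor, but the rest
        # of the pipeline will not fail because of broken plumbing
        run_batch(gps_start - initial_lookback, gps_start, classifiers=missing)
    return missing
```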
- define dictionaries of topic names outside of persistent loops so they are not computed repeatedly within each epoch (see #47 (closed))
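A minimal sketch of that hoisting, under assumptions: the `nicknames` list, `tag`, and the topic-name format here are made up for illustration and do not reflect the actual naming scheme.

```python
# Build the nickname -> topic mapping once, before the persistent loop,
# instead of recomputing it within every epoch (illustrative names only).
nicknames = ["ovl", "forest"]
tag = "idq"
topics = {nickname: "%s_%s" % (tag, nickname) for nickname in nicknames}

messages = []
for epoch in range(3):  # stand-in for the persistent `while True` loop
    for nickname in nicknames:
        messages.append(topics[nickname])  # no per-epoch dict construction
```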
- within functions' main loops, update `Reporter.start` and `Reporter.end` instead of defining new reporters whenever possible. This also allows us to remove the `timestamp` kwarg from `Reporter.retrieve` because the equivalent information is now derived from `start` and `end` within `KafkaReporter`. Other reporters similarly determine where they write based on `start` and `end`.
- reporting is always done with `preferred=True`
- retrieving is always done with `preferred=True`
- generally, slight changes to logic and conditionals. For example, we no longer require non-zero numbers of both glitch and clean samples when updating calibration. The `CalibrationMap` should handle that gracefully for us, so we update the calibration whenever there are any new samples.
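The relaxed conditional amounts to something like the sketch below. This is illustrative only, not the actual iDQ code; the function name and argument names are invented.

```python
# Illustrative conditional: update the calibration whenever there are any
# new samples, instead of requiring non-zero counts of both glitch and
# clean samples (CalibrationMap is assumed to handle either case gracefully).
def should_update_calibration(num_glitch, num_clean):
    # previously the equivalent of: num_glitch > 0 and num_clean > 0
    return num_glitch + num_clean > 0
```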
## batch

Changes made here were only made to support the new functionality within `stream`.
- added `--persist` options to better control behavior. This mirrors the functionality within `idq-stream`.
- reporting models, evaluated quivers, calibration_maps, and series results is always done with `preferred=True`
- reporting is done using `names.nickname2topic` and its derivatives now so that these are compatible with the streaming pipeline
  - specifically, we delegate to batch functionality within `stream.stream` for initial training, so naming conventions must be consistent
## io

Changes made here were only made to support the new functionality within `stream` and `batch`.
- all `Reporter`s have a `preferred` property now. This can be used to grab the "preferred object" known to that reporter. Exact behavior differs based on the reporter class:
  - `DiskReporter`s manage a `cachefile` and append updates to that file
  - `KafkaReporter` implements logic based on a timestamp
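A cachefile-backed `preferred` property could look something like the following. This is a hedged sketch in the spirit of the `DiskReporter` behavior described above; the real class's structure and methods differ.

```python
import os

# Hypothetical sketch, not the actual DiskReporter: the "preferred" object
# is the most recent entry appended to the cachefile.
class DiskReporterSketch:
    def __init__(self, cachefile):
        self.cachefile = cachefile

    def report(self, path, preferred=True):
        """Record `path`; with preferred=True, append it to the cachefile."""
        if preferred:
            with open(self.cachefile, "a") as obj:
                obj.write(path + "\n")

    @property
    def preferred(self):
        """Return the last entry in the cachefile, or None if there is none."""
        if not os.path.exists(self.cachefile):
            return None
        with open(self.cachefile) as obj:
            lines = obj.read().splitlines()
        return lines[-1] if lines else None
```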
- added a `timestamp` property to `KafkaReporter` and referenced that within `KafkaReporter.report` instead of taking in an extra kwarg. `timestamp` is the average of `start` and `end`, so updating those will modify the `timestamp`.
  - added an `old` property to `KafkaReporter`, which is used to check that `KafkaReporter.timestamp` only increases as it is updated. Specifically, this is used within `KafkaReporter.report` to help guarantee time ordering within the Kafka topic.
  - Note: the logic implemented is not fool-proof, but it should suffice to catch gross misuses.
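The `timestamp`/`old` interaction can be sketched as below. The attribute names mirror the text, but this is an illustrative toy, not the real `KafkaReporter` (which actually produces to a Kafka topic).

```python
# Illustrative sketch of the timestamp/`old` monotonicity guard.
class KafkaReporterSketch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.old = -float("inf")  # most recent timestamp already reported

    @property
    def timestamp(self):
        # derived from start/end instead of being passed in as a kwarg
        return 0.5 * (self.start + self.end)

    def report(self, payload):
        # refuse out-of-order writes to help guarantee time ordering
        if self.timestamp <= self.old:
            raise ValueError("timestamp must increase between reports")
        self.old = self.timestamp
        # ... a real reporter would produce `payload` to the Kafka topic here ...
        return self.timestamp
```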
- both `Reporter.report` and `Reporter.retrieve` take a `preferred` kwarg to control where they read/write data
  - this previously only existed for `DiskReporter.retrieve`
  - `DiskReporter.report(*args, preferred=True)` now automatically updates the `cachefile` managed by `DiskReporter`s
  - `KafkaReporter.report` ignores `preferred` and always writes to the end of the topic, although it does check `self.timestamp`
  - `KafkaReporter.retrieve` will seek the offset in the topic corresponding to `self.timestamp` and read from that if `preferred=False`. Otherwise, it consumes the next message in the topic.
- some minor renaming of attributes and methods within `DiskReporter`
- removed `gps_start` and `gps_end` from the signatures for `SegmentReporter.report`, `HDF5SegmentReporter.report`, and `GPSTimesReporter.report`. That information is still recorded but is extracted from `self.start` and `self.end`. This makes the signatures more consistent with the other `Reporter`s.
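The signature change amounts to the following shape. This is a toy model under stated assumptions: the real `SegmentReporter.report` does more than build a dict, and the payload shown is invented.

```python
# Sketch of the signature change: gps_start/gps_end are no longer arguments
# to report(); they are read from self.start/self.end (illustrative only).
class SegmentReporterSketch:
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def report(self, segments):
        # previously the equivalent of: report(self, segments, gps_start, gps_end)
        return {"segments": segments, "gps_start": self.start, "gps_end": self.end}
```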
## names

Changes made here were only made to support the new functionality within `stream` and `batch`.

- added `nickname2topic` and similar functions to avoid repeated code in `stream` and to unify naming conventions between `stream` and `batch`
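A helper in that spirit might look like the sketch below; the real `nickname2topic` signature and topic format may differ, so treat the separator and arguments as assumptions.

```python
# Hypothetical naming helper: a single place that maps nicknames to topic
# names so stream and batch agree on conventions (format is illustrative).
def nickname2topic(tag, nickname):
    return "%s.%s" % (tag, nickname)
```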
## features

- fixed a bug in `features.FeatureVector.features` associated with calls to `TimeMap.__call__`; I was missing a dummy argument
## misc changes

- within executables, I changed `logger.warn` statements to `logger.error` when we log tracebacks. This felt more appropriate.
Edited by Reed Essick