
updates to streaming pipeline and supporting material

Reed Essick requested to merge stream.stream into streaming_development_pt2

I ended up making quite a few changes, most of which are minor.

stream

  • defined gps_range to avoid repeating the start/end time calculation in several places (a sketch of such a helper follows this list)
  • implemented idq-stream and idq.stream.stream
    • supports both workflow=fork and workflow=condor
    • only supports --persist with fork (the parent process stays alive until all children finish; see the fork sketch after this list)
    • implements "initial training/calibration" logic (sketched after this list)
      • checks whether there is a "preferred" object at the end of Reporters for both train and calibration processes
      • if there is no preferred object for at least one classifier, launches a quick batch job using initial_lookback to define how much data is included.
      • this may produce crappy models/calibration_maps, but at least the rest of the pipeline won't fail because of broken plumbing.
  • defined dictionaries of topic names outside of persistent loops so they are not computed repeatedly within each epoch (see #47 (closed)); see the loop sketch after this list
  • within functions' main loops
    • update Reporter.start and Reporter.end instead of defining new reporters whenever possible. This also allows us to remove the timestamp kwarg from Reporter.retrieve because the equivalent information is now derived from start and end within KafkaReporter. Other reporters similarly determine where they write based on start and end
    • reporting is always done with preferred=True
    • retrieving is always done with preferred=True
    • generally, slight changes to logic and conditionals. For example, we do not require non-zero numbers of both glitch and clean samples when updating calibration. The CalibrationMap should handle that gracefully for us, so we update the calibration whenever there are any new samples.
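
A helper along these lines is what gps_range amounts to; the signature and the alignment convention here are assumptions, not the actual implementation:

```python
# hypothetical helper; the real gps_range in stream may differ
def gps_range(gps, stride):
    """return the [start, end) boundaries of the stride-long segment containing `gps`"""
    start = (int(gps) // stride) * stride  # align to an integer stride boundary
    return start, start + stride

start, end = gps_range(1187008882, 64)  # -> (1187008832, 1187008896)
```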
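
The workflow=fork / --persist behavior boils down to the parent launching one child per job and, when persisting, blocking until they all exit. A rough sketch with multiprocessing; the job layout and names are illustrative, not the real idq-stream internals:

```python
import multiprocessing as mp

def launch(jobs, persist=False):
    """jobs: list of (target, kwargs) pairs, e.g. one train/evaluate/calibrate/timeseries
    job per classifier (a stand-in for the real job bookkeeping)"""
    children = [mp.Process(target=func, kwargs=kwargs) for func, kwargs in jobs]
    for child in children:
        child.start()
    if persist:  # --persist: the parent stays alive until all children finish
        for child in children:
            child.join()
    return children
```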
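
The initial training/calibration check reduces to asking the last Reporter in each classifier's train and calibrate chains for a preferred object and falling back to a quick batch job when any of them comes up empty. Roughly, with the reporter layout assumed rather than copied from idq.stream.stream:

```python
def needs_initial_batch(classifiers, train_reporters, calibrate_reporters):
    """hypothetical check: True if any classifier lacks a preferred model or
    calibration map at the end of its Reporter chain"""
    return any(
        train_reporters[nickname][-1].preferred is None
        or calibrate_reporters[nickname][-1].preferred is None
        for nickname in classifiers
    )

# if this returns True, run a quick batch job over the last initial_lookback seconds
# so the persistent loops have models/calibration_maps to load, even if rough ones
```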
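
Hoisting the topic-name dictionaries out of the persistent loop and re-using a single Reporter per topic by updating start/end then looks roughly like this; the constructor signature and the injected gps_now and train callables are assumptions used only for illustration:

```python
def epoch_loop(nicknames, tag, stride, gps_now, train):
    # topic names and reporters are built once, outside the persistent loop (see #47)
    topics = {nick: names.nickname2topic(nick, tag) for nick in nicknames}
    reporters = {nick: KafkaReporter(topic) for nick, topic in topics.items()}

    while True:  # persistent loop over epochs
        start, end = gps_range(gps_now(), stride)
        for nick, reporter in reporters.items():
            # update the existing Reporter's bounds instead of constructing a new one
            reporter.start, reporter.end = start, end
            model = reporter.retrieve(preferred=True)  # always retrieve the preferred object
            reporter.report(train(model, start, end), preferred=True)  # always report preferred
```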

batch

the changes here were made only to support the new functionality within stream

  • added a --persist option to better control behavior. This mirrors the functionality within idq-stream.
  • models, evaluated quivers, calibration_maps, and series results are always reported with preferred=True
  • reporting is done using names.nickname2topic and its derivatives now so that these are compatible with the streaming pipeline.
    • specifically, we delegate to batch functionality within stream.stream for initial training, so naming conventions must be consistent.

io

the changes here were made only to support the new functionality within stream and batch

  • all Reporters have a preferred property now. This can be used to grab the "preferred object" known to that reporter. Exact behavior differs by reporter class (see the sketches after this list)
    • DiskReporters manage a cachefile and append updates to that file.
    • KafkaReporter implements logic based on a timestamp
  • added a timestamp property to KafkaReporter and referenced that within KafkaReporter.report instead of taking in an extra kwarg. timestamp is the average of start and end, so updating those will modify the timestamp (sketched after this list).
    • added an old property to KafkaReporter, which is used to check that KafkaReporter.timestamp only increases as it is updated. Specifically, this is used within KafkaReporter.report to help guarantee time ordering within the Kafka topic.
      • Note: the logic implemented is not fool-proof, but it should suffice to catch gross misuses.
    • both Reporter.report and Reporter.retrieve take a preferred kwarg to control where they read/write data.
      • This previously only existed for DiskReporter.retrieve.
      • DiskReporter.report(*args, preferred=True) now automatically updates the cachefile managed by DiskReporters
      • KafkaReporter.report ignores preferred and always writes to the end of the topic, although it does check self.timestamp
      • KafkaReporter.retrieve will seek the offset in the topic corresponding to self.timestamp and read from that if preferred=False. Otherwise, it consumes the next message in the topic.
    • some minor renaming of attributes and methods within DiskReporter
    • removed gps_start and gps_end from the signatures for SegmentReporter.report, HDF5SegmentReporter.report, and GPSTimesReporter.report. That information is still recorded but is extracted from self.start and self.end. This makes the signatures more consistent with the other Reporters.
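
For the DiskReporter side, the preferred property plus the automatic cachefile update amount to something like the following sketch; the cachefile layout and attribute names are assumptions, not the real io module:

```python
import os

class DiskReporterSketch:
    """hypothetical stand-in for the cachefile-backed preferred logic in DiskReporter"""

    def __init__(self, rootdir, start, end):
        self.rootdir, self.start, self.end = rootdir, start, end
        self.cachepath = os.path.join(rootdir, 'preferred.cache')

    @property
    def preferred(self):
        """the most recently cached path, or None if nothing has been reported yet"""
        if not os.path.exists(self.cachepath):
            return None
        with open(self.cachepath, 'r') as obj:
            lines = obj.read().strip().split('\n')
        return lines[-1] if lines and lines[-1] else None

    def report(self, path, preferred=True):
        """stand-in for writing a data product; with preferred=True the cachefile
        is updated automatically, mirroring DiskReporter.report"""
        if preferred:
            with open(self.cachepath, 'a') as obj:
                obj.write(path + '\n')
```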
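
The timestamp/old bookkeeping on the Kafka side is just a monotonicity guard. No Kafka calls are shown here, and the class body is an assumption that only mirrors the attributes described above:

```python
class KafkaTimestampSketch:
    """hypothetical illustration of KafkaReporter's timestamp/old bookkeeping"""

    def __init__(self, start, end):
        self.start, self.end = start, end
        self.old = -float('inf')  # the largest timestamp reported so far

    @property
    def timestamp(self):
        """the average of start and end; moves whenever start/end are updated"""
        return 0.5 * (self.start + self.end)

    def report(self, obj, preferred=True):  # preferred is ignored for Kafka topics
        # not fool-proof, but catches gross misuse: timestamps must not decrease,
        # so messages stay time-ordered within the topic
        if self.timestamp < self.old:
            raise ValueError('timestamp decreased; refusing to report out of order')
        self.old = self.timestamp
        # ... the real KafkaReporter.report would produce `obj` to the topic here ...
```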

names

the changes here were made only to support the new functionality within stream and batch

  • added nickname2topic and similar functions to avoid repeated code in stream and to unify naming conventions between stream and batch (a toy version follows)
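
A toy version of the naming helper, with the format string assumed rather than taken from names:

```python
def nickname2topic(nickname, tag):
    """hypothetical: map a classifier nickname onto its Kafka topic name"""
    return 'idq-%s-%s' % (tag, nickname)

# derivatives follow the same pattern, e.g. one topic per data product:
def nickname2topics(nickname, tag, products=('model', 'calib', 'timeseries')):
    return {product: nickname2topic('%s_%s' % (nickname, product), tag)
            for product in products}
```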

features

  • fixed a bug in features.FeatureVector.features associated with calls to TimeMap.__call__; I was missing a dummy argument

misc changes

  • within executables, I changed logger.warn statements to logger.error when we log tracebacks. This felt more appropriate (see the snippet below).
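
In other words, traceback logging now follows this pattern (risky_operation is a placeholder):

```python
import logging
import traceback

logger = logging.getLogger('idq')

try:
    risky_operation()  # placeholder for whatever the executable was doing
except Exception:
    logger.error(traceback.format_exc())  # was logger.warn(...)
```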