Skip to content

increased stream functionality

Reed Essick requested to merge merge-ready_SegDBinStream into master

This merge request replaces !40 (closed) and !64 (closed) with a yet another new branch that has been rebased against master. My apologies if this description is not laid out in the most pedagogical way. Please let me know if (when) questions arise and I'll do my best to answer quickly.

This patch is meant to implement segment queries within the streaming pipeline. In addition to simply hooking up the plumbing, we introduced a few new concepts to help control polling and latency/timeout logic within the code.

Although the general structure of the streaming functions remains unchanged, there have been modifications to the technical details in pretty much every function. These largely revolve around expanded functionality within StreamProcessor (SegDb queries, etc), the introduction of a CadenceManager to handle latency/timeout logic, and explicitly calling out blocks of code for workflows besides block.

There was also a good amount of mucking with logging in an attempt to make the logs as readable as possible (ie, clear statements about where the code is in each epoch of the main iterations).

I've tested the new version with io.MockClassifierData configured with a handful of channels (both good and bad witnesses). Everything seems to work just fine, although I've only left it running for ~30 minutes.

A detailed enumeration of changes for each file is below

etc/idq.ini

The example INI file was updated to reflect a few new sections that were introduced associated with options to specifically control latency/timeout logic in streaming jobs. Previously, these options were mashed into the data discovery sections, which seemed to be causing more confusion than anything else.

Note: calibrate does not have a data discovery or a stream section. This is because all it's data discovery and cadence logic are so closely tied to evaluate jobs that the streaming process just reads what it needs from either [evaluate reporting] or [evaluate stream].

idq/batch.py

I switched all "monitoring of forked processes" to use a CadenceManager (see idq.utils.py below). This unifies all the sleep logic in both batch and stream jobs.

I also declared new class methods for ClassifierData: target_times and random_times which replace stream.extract_times. These are now used throughout the batch and stream jobs, again unifying the code-base and preventing duplicated logic.

The only other changes either involve white space/indentation errors or deleting unused import statements.

idq/classifiers/base.py

added an import statement for a new module that houses all our custom exceptions. This means that I've also moved all the exceptions declared in various modules to a single location, and then import them as needed. Seemed prudent given that several different modules have begun to reference the same classes.

SupervisedClassifier

I declared some more class methods and properties. In particular, the precise logic behind is_trained and the model getter/setters has been updated. Similar methods now exist for calibration.

timeseries() now takes a kwarg for segments. If that is not provided, the segments from the ClassifierData within QuiverFactory are used. This functionality already existed in the sklearn classifiers, but not it's been promoted up to the top-level API (and is required for all children, not just sklearn-ers). This prompted corresponding changes in Vetolist and the OVLs.

idq/classifiers/sklearn.py

imported the new exceptions module and referenced it as needed.

idq/exceptions.py

created this module as a new location for all our custom exceptions. I've also declared a new custom exception: MaxIterError which is raised within CadenceManager.poll if it decides we are "too far behind real time" as measured by a maximum number of strides (configurable within the [* stream] sections of the INI file. This error is caught within StreamProcessor and handled appropriately.

idq/features.py

imported exceptions and reference those. Also updated the setters for Quiver.start, Quiver.end so that they update Quiver.segs automatically in a reasonable way.

idq/io.py

This is a big one.

I moved custom exceptions into exceptions.py and import them from there.

I also changed statements like if X: to if X is not None: wherever I found them to help protect us against subtle bugs in which many things can evaluate to False even when they aren't None.

Added restrict_segments to the API for ClassifierData. I thought this existed before my branch, but I can't seem to find where it was moved from. Basically, this modifies the segments in a ClassifierData and throws away cached data as needed.

As discussed above, I implemented target_times and random_times as utility functions for ClassifierData. These are now used in all batch and stream jobs when we need to identify specific times.

UmbrellaClassifierData got several new properties and method classes, mostly built around what I found convenient within stream.py and what I imagined could be convenient (and was easy to tack on). This includes: all_channels, columns (now references children), all_columns (again referencing children), append_children, appendleft_children, extend_child (referenced within the rewrite of extend_children, strip_child, and a version of restrict_segments that references the Umbrella's children.

The only new method classes whose meanings may not be completely obvious are extend_child[ren] and strip_child. The idea here is that extend_child assumes everything is time-ordered and adds the new child on the right (the most recent things). It automatically bumps things like the segments and start, end times. strip_child does something similar, but it removes children from the last (oldest things).

DiskReporters had the sanity check that creates directories moved into the declaration of the directory property. This helps guarantee directories exist when workflows continuously update the start, end time.

QuiverReporter got a bit of a rewrite and now relies much less on its parent class (HDF5Reporter). This was done so that we properly record the start, end times and segs associated with that quiver. We need that when we pass quivers from the evaluate jobs to the calibrate jobs through these QuiverReporters.

HDF5SeriesReporter and GWFSeriesReporter got a bit of extra logic in their report methods to deal with different possible input types for seriesdict (either a list of dictionaries or a single dictionary).

I moved the hard-coded default timeout in StreamReporter.__init__ to a constant defined at the top of the module.

idq/stream.py

this is an even bigger one.

The default values were moved around a bit; in particular several were moved to idq.utils where CadenceManager lives.

There are some trivial changes like making sure the gps_end is at least as big as gps_start in gps_range along with white-space here and there.

I also switched the monitoring of forked processes within stream.stream to use a CadenceManager much like the batch jobs do.

I'll get to each of the big functions and StreamProcessor in turn, but it's also worth pointing out that I deleted extract_times in favor of the target_times and random_times classmethods added to ClassifierData in idq.io.

train

I added more lists that specify how things train with a bit more precision. To wit, whether they are IncrementalSupervisedClassifiers or whether they are just plain SupervisedClassifiers. This controls what new data is fed into them.

Segment queries and instantiation of StreamProcessor have been updated a bit to reflect StreamProcessor.__init__'s new signature along with the additional INI section [train stream].

I also instantiate several "accumulation data products" that serve as buffers that fill up until we kick off new training jobs, etc.

There are now several more optional options for the INI file: min_new_samples (replaces what you had as max_new_samples with appropriately flipped logic), max_samples, and max_stride. These are used to prune data from the "big umbrella" and "big quiver" that are kept around to train SupervisedClassifiers.

The call to StreamProcessor.poll looks different but is functionally very similar to what was there before. Instead of a list of ClassifierData, it directly returns a new_umbrella and a list of segments (segs). This is done because the umbrella may cover more data than the segments (padding on either side to deal with edge effects), and the segments define which bits of data we actually want when generating target/random times.

That padding logic is not actually implemented and the attempts at it from the previous version have been stripped out. My thought process was that we'd get it going as is and then circle back to patch up these (hopefully small) edge-effect corner cases later.

The conditionals on when we train have also been updated. We first condition on whether there is any livetime in the new umbrella. That umbrella includes the impact of possible SegDb queries, so it isn't guaranteed that there are non-trivial segments for any of the children in the umbrella.

We then extract target and random times using the delegation to umbrella (inherits from ClassifierData) and bump the "accumulated counters".

The rest of the main loop strips out "old segments" and "old feature vectors" from the "accumulated quiver, umbrella objects" if there are too many (based on max_samples and max_stride). There's also first a conditional on the number of accumulated new samples, so we don't even try to launch new training jobs unless we have something meaningful to do, basically a form of mini-batching.

The large blocks of changes after this introduce conditionals on the exact workflow (with NotImplementedErrors for fork and condor) and then iterate over classifiers depending on what data they take when they train.

The final change is to flush everything from StreamProcessor instead of retaining the most recent buffer. At some point, we'll want to add that back in but I'll need more time to really pin down all the logic surrounding that and thought it better to "ignore" edge effects near stride boundaries and just flush everything.

evaluate

Many of the changes in evaluate are similar to those in train. In particular, the instantiation of StreamProcessor and it's connection to SegDb are almost identical.

There is also a max_samples option read from the INI that controls the maximum number of target_times and random_times we attempt to evaluate at any time. This should prevent us from falling too far behind if there's a glitch storm and we suddenly get an ungodly rate of glitches.

Again, the logic surrounding StreamProcessor.poll is similar to train. We again condition on livetime in new_umbrella and then extract times to build the quiver.

The actual evaluation jobs are split into blocks based on workflow, with fork and condor containing NotImplementedErrors for the time being.

Again, we flush everything because we're ignoring edge effects for now.

calibrate

Calibrate works slightly differently from train and evaluate in that it doesn't read data from StreamProcessor. Instead, it directly used CadenceManager and holds several buffers to "mini-batch" new quivers and control when calibration jobs are launched. It also uses the [evaluate stream] options because it is tied so closely to the evaluation processes, although I'm not entirely convinced this isn't producing systematically missed data (related to how DiskReporters' "preferred" logic and caching works).

In the main loop, we first try to read in new data for each classifier. Each classifier has a separate quiver that's filled with it's new evaluation data. Then, once all classifiers have had a chance to read data, we iterate again and launch new calibration jobs as needed. If a job is launched, the corresponding quiver is then "emptied" to await future data. This effectively implements mini-batching.

Again, there are blocks called out for each workflow, but the fork and condor ones just contain NotImplementedErrors.

timeseries

If you can follow the changes in train, the changes in timeseries are easy. Again, instantiation of the StreamProcessor is standard at this point. Likewise the logic around StreamProcessor.poll, conditioning on livetime in the new_umbrella, etc. I also added separate blocks for each workflow and flush everything at the end.

One important point is that we do not "fix the segments" that are returned by StreamProcessor.poll. Even though that segment list may have many contiguous segments in it, we keep them separate and instead iterate over them to determine what should be written into separate frames. This guarantees that each frame we write will have the same duration, exactly 1 stride long (specified in [timeseries stream]). You can see there is an iteration over zip(ans, segs) that then delegates to timeseriesreporter to write each frame individually.

StreamProcessor

There are also quite a few changes here. In addition to defining string templates that are used for logging messages that are often repeated (something we should do elsewhere in the code as well), the order in which things are instantiated has changed a bit. At the end of the day, we still end up with a buffer, an umbrella, and the new things is the CadenceManager.

The SegDb functionality should be pretty intuitive. We can configure SegDb queries or not, and whenever we call StreamProcessor.new_buffer() it'll determine whether we care about SegDb or not, performing queries only as needed. In this way, the cadence with which we query SegDb will be controlled by the cadence with which we create new buffers, and that in turn should be controlled by the stride we pass to CadenceManager.

There are a couple new properties defined, which mostly expose access to some of the CadenceManager's attributes. But there's also stuff in there to access the SegDb attributes as well.

StreamProcessor.flush now supports a kwarg (retain) specifying how many buffers to keep.

Lastly, poll looks a bit different but shouldn't be too intimidating. Instead of managing the loop directly with the "processing" boolean, we iterate over a delegation to CadenceManager.poll (a generator) and process each segment it returns. Things like BadSpanErrors and NoDataErrors are caught and the logic for that segment is updated appropriately. At least, I believe this is the case given my understanding of how KafkaClassifierData now works (it should be the case for all other ClassifierData types).

The only "really new" conditional is the try/except that catches MaxIterError. That's raised by CadenceManager.poll when it's gone through "too many strides" and still hasn't caught up with real time. If that happens, we update the timestamp in CadenceManager to point to something much closer to real time before returning. That way, we effectively skip a bunch of data if we fall behind and don't get stuck in the distant past.

As has been noted in train, evaluate, and timeseries, StreamProcessor now returns both the umbrella and a segment list containing segments of every stride that's included there. The segments will be useful when we figure out how to correctly "pad umbrella" to avoid edge effects, and the subsequent filtering logic that would be needed in the callers is pretty much in place. However, as it stands the callers always flush everything and don't manage the padding. Thus, the segments are not strictly needed at the moment but I thought it was worth the trouble to get them in now so we'd have an easier time down the road.

idq/synthetic.py

trivial changes having to do with a conditional to better handle other types (np.ndarray instead of list, etc).

idq/utils.py

Changed an import statement to avoid naming conflicts. Also added the import statement referencing our custom exceptions.

Moved a bunch of default values to here, along with slightly re-arranging the order in which they're declared. I went ahead and deleted CLUSTER_URLS since we're planning to use relative paths for all the html jazz within idq-report.

You can see the actual declaration of CadenceManager here. It supports a few simple methods (wait and poll) that act in slightly different ways. The basic idea behind wait is to make sure that the current gps time is far enough ahead from the current timestamp that it makes sense to do things. The idea behind poll is to grab a bunch of strides, up to some maximum number, until you've caught up to real time, returning them via a generator (poll itself yields instead of returns). As it yields a series of segments, things like StreamProcessor can gobble up each segment in turn, eventually returning an umbrella and segment list representing a large query over many strides.

CadenceManager mostly works with just a timestamp, stride, and delay, but it also supports a max_timestamp (equivalent to gps_end in the command-line for streaming processes) as well as enforce_integer_strides, which forces the timestamp to be an integer number of strides.

draw_random_times now returns an np.ndarray instead of a list. This is because I was getting unexpected type errors and weird behavior, so I decided to strong-arm this into an array. Not sure if it's really necessary, but it probably can't hurt.

tests/conftest.py

changed some syntax to reflect StreamProcessor's changes and deleted old kwargs that are no longer used.

test/test_streamprocessor.py

updated a lot of the signatures and syntax to reflect changes to StreamProcessor. I believe all the same testing functionality should still be in place, though.

Edited by Reed Essick

Merge request reports

Loading