Skip to content

Simplified polling in FrameKafkaConsumer

This MR adds a new method, poll_and_extract in FrameKafkaConsumer to simplify polling and extracting frame buffers.

What I've noticed in the various kafka2* programs is that poll_consumer_for_topic() and extract_frame_buffer_from_message() are always called in sequence so I've combined the two operations here.

With this simplification, you can do this:

r_poll = framekafkaconsumer.poll_consumer_for_topic()
for topic_partition in r_poll:
    for message in r_poll[topic_partition]:
        (
            frame_buffer,
            payload_info
        ) = framekafkaconsumer.extract_frame_buffer_from_message(
            message, tp_info, crc_checker=CRCChecker.GDS
        )

instead by doing this:

for frame_buffer, payload_info in framekafkaconsumer.poll_and_extract(
     tp_info,
     crc_checker=CRCChecker.GDS
):

For the CRC check itself, I noticed that the -a option that passes crc-check={true,false} isn't being used anywhere, while the crc_checker option is typically an option you set once. What I ended up doing is using the information used in tp_info along with the crc_checker option to determine whether or not to validate the frame data on a per-IFO basis. Hopefully that design choice makes sense, otherwise you'd need to change the crc_checker keyword argument on a per-IFO basis which seemed like a bit of a pain to use.

The last thing I did was move the CRCChecker enumeration from class-level scope to module-level scope so that the various kafka2* programs can access this more easily.

Edit: Removed multithreaded implementation to reduce complexity.

Edited by Patrick Godwin

Merge request reports