Simplified polling in FrameKafkaConsumer
This MR adds a new method, poll_and_extract
in FrameKafkaConsumer
to simplify polling and extracting frame buffers.
What I've noticed in the various kafka2*
programs is that poll_consumer_for_topic()
and extract_frame_buffer_from_message()
are always called in sequence so I've combined the two operations here.
With this simplification, you can do this:
r_poll = framekafkaconsumer.poll_consumer_for_topic()
for topic_partition in r_poll:
for message in r_poll[topic_partition]:
(
frame_buffer,
payload_info
) = framekafkaconsumer.extract_frame_buffer_from_message(
message, tp_info, crc_checker=CRCChecker.GDS
)
instead by doing this:
for frame_buffer, payload_info in framekafkaconsumer.poll_and_extract(
tp_info,
crc_checker=CRCChecker.GDS
):
For the CRC check itself, I noticed that the -a
option that passes crc-check={true,false}
isn't being used anywhere, while the crc_checker
option is typically an option you set once. What I ended up doing is using the information used in tp_info
along with the crc_checker
option to determine whether or not to validate the frame data on a per-IFO basis. Hopefully that design choice makes sense, otherwise you'd need to change the crc_checker
keyword argument on a per-IFO basis which seemed like a bit of a pain to use.
The last thing I did was move the CRCChecker
enumeration from class-level scope to module-level scope so that the various kafka2*
programs can access this more easily.
Edit: Removed multithreaded implementation to reduce complexity.