Skip to content
Snippets Groups Projects
Commit 5a025688 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_synchronizer: fix issues where synchronizer was not combining...

gstlal_feature_synchronizer: fix issues where synchronizer was not combining the right subsets in some cases, add steps to ensure that buffers do not get processed more than once
parent e8e2da39
No related branches found
No related tags found
No related merge requests found
Pipeline #29829 passed
......@@ -24,7 +24,7 @@ __author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
# Preamble
#-------------------------------------------------
import itertools
import heapq
import json
import signal
import sys
......@@ -34,8 +34,7 @@ from Queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPool
from optparse import OptionParser
import lal
from lal import LIGOTimeGPS
from lal import gpstime
from confluent_kafka import Producer, Consumer, KafkaError
......@@ -102,6 +101,7 @@ class StreamSynchronizer(object):
### initialize queues
self.feature_queue = PriorityQueue()
self.feature_buffer = deque(maxlen = 300)
self.last_timestamp = 0
def fetch_data(self, consumer):
"""
......@@ -142,13 +142,19 @@ class StreamSynchronizer(object):
and if so, takes subsets from the feature queue, combines them, and push the
result to a buffer
"""
### clear out queue of any stale data
while not self.feature_queue.empty() and self.last_timestamp >= self.feature_queue.queue[0][0]:
self.feature_queue.get()
### inspect timestamps in front of queue
num_elems = min(self.num_topics, self.feature_queue.qsize())
timestamps = [block[0] for block in self.feature_queue.queue[:num_elems]]
timestamps = [block[0] for block in heapq.nsmallest(num_elems, self.feature_queue.queue)]
### check if either all timestamps are identical, or if the timestamps
### are old enough to process regardless. if so, process elements from queue
if timestamps:
if timestamps[0] <= self.max_timeout() or (len(set(timestamps)) == 1 and num_elems == self.num_topics):
### find number of elements to remove from queue
if timestamps[0] <= self.max_timeout():
num_subsets = len([timestamp for timestamp in timestamps if timestamp == timestamps[0]])
......@@ -160,6 +166,7 @@ class StreamSynchronizer(object):
logger.info('combining %d / %d feature subsets for timestamp %d' % (len(subsets),self.num_topics,timestamps[0]))
features = self.combine_subsets(subsets)
self.feature_buffer.appendleft((timestamps[0], features))
self.last_timestamp = timestamps[0]
def combine_subsets(self, subsets):
"""
......@@ -183,7 +190,7 @@ class StreamSynchronizer(object):
"""
calculates the oldest timestamp allowed for incoming data
"""
return int(LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)) - self.latency_timeout
return int(gpstime.tconvert('now')) - self.latency_timeout
def synchronize(self):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment