Skip to content
Snippets Groups Projects
Commit 9acec2cf authored by Branson Craig Stephens's avatar Branson Craig Stephens
Browse files

Added JSON logging to iDQ vetosrc helper class. Also fixed un-necessary...

Added JSON logging to iDQ vetosrc helper class. Also fixed un-necessary latency when there are iDQ files before and after a gap.
parent 412bb23c
No related branches found
No related tags found
No related merge requests found
......@@ -36,6 +36,9 @@ import glob
import gzip
import os
import time
import logging
import datetime
import json
from collections import deque
import numpy as np
from glue import gpstime
......@@ -82,13 +85,23 @@ def probeBufferHandler(pad,gst_buffer):
# A class for handling the veto timeseries files.
class vetoSource:
def __init__(self, inputPath, inputPre, inputExt, waitTime, initTime=None, dirDigits=5):
def __init__(self, inputPath, inputPre, inputExt, waitTime, initTime=None, dirDigits=5, log_file=None):
self.inputPath = inputPath
self.inputPre = inputPre
self.inputExt = inputExt
self.waitTime = waitTime
self.dirDigits = dirDigits
self.fullCurrentPrefix = ""
self.logger = None
if log_file:
self.logger = logging.getLogger('idq_sender')
self.logger.setLevel(logging.INFO)
fh = logging.FileHandler(log_file)
self.logger.addHandler(fh)
fh.setFormatter(logging.Formatter('%(message)s'))
# Initialize the list of files.
if not initTime:
# FIXME: Badness. Sutracting max iDQ latency by hand.
......@@ -191,6 +204,15 @@ class vetoSource:
src.emit("push-buffer", buf)
src.info("No files! Pushed gap with start=%d, duration=%d latency=%d" %
(buf.timestamp/gst.SECOND,gap_duration/gst.SECOND,(gpstime.GpsSecondsFromPyUTC(time.time()) - gap_end)))
if self.logger:
outDict = {}
outDict['type'] = 'buffer'
outDict['time'] = datetime.datetime.now().isoformat()
outDict['is_gap'] = True
outDict['gpsstart'] = buf.timestamp/gst.SECOND
outDict['duration'] = gap_duration/gst.SECOND
outDict['latency'] = gpstime.GpsSecondsFromPyUTC(time.time()) - gap_end
self.logger.info(json.dumps(outDict))
self.next_output_timestamp += buf.duration
self.current_offset = buf.offset_end
return True
......@@ -203,40 +225,41 @@ class vetoSource:
rest = rest.split('-')[2]
duration = int(rest[:rest.find(self.inputExt)])
# Is this the file we were expecting? If not, we'll wait
# to see if the file we want shows up.
# Is this file later than the one we were expecting?
# If so, we can be sure that the one we wanted will never show up.
# We might as well push a gap and then continue to process
# the file that we have available.
if gpsstart * gst.SECOND > self.next_output_timestamp:
time.sleep(self.waitTime)
self.check_for_new_files(self.next_output_timestamp)
filePath = self.fileQueue.popleft()
rest = filePath[len(self.fullCurrentPrefix):]
gpsstart = int(rest.split('-')[1])
rest = rest.split('-')[2]
duration = int(rest[:rest.find(self.inputExt)])
if gpsstart * gst.SECOND > self.next_output_timestamp:
# Push a gap before continuing to process the file we have.
# FIXME The gap should be a real gap, not a buffer full of zeros.
gap_duration = gpsstart * gst.SECOND - self.next_output_timestamp
gap_samples = int ((gap_duration / gst.SECOND) * self.rate)
#gap_samples = 0
gap_vals = np.zeros(gap_samples)
gap_vals = gap_vals.astype(np.float32)
gap_end = gap_duration/gst.SECOND + gpsstart
buffer_len = gap_vals.nbytes
buf = gst.buffer_new_and_alloc(buffer_len)
buf[:buffer_len-1] = np.getbuffer(gap_vals)
#buf.flag_set(gst.BUFFER_FLAG_GAP)
buf.timestamp = self.next_output_timestamp
buf.duration = gap_duration
buf.offset = self.current_offset
buf.offset_end = self.current_offset + gap_samples
src.emit("push-buffer", buf)
src.info("gst clock = %d" % int(src.get_clock().get_time()))
src.info("pushed gap with start=%d, duration=%d, latency=%d" %
(buf.timestamp/gst.SECOND,gap_duration/gst.SECOND, (gpstime.GpsSecondsFromPyUTC(time.time()) - gap_end)))
self.next_output_timestamp += buf.duration
self.current_offset = buf.offset_end
# FIXME The gap should be a real gap, not a buffer full of zeros.
gap_duration = gpsstart * gst.SECOND - self.next_output_timestamp
gap_samples = int ((gap_duration / gst.SECOND) * self.rate)
#gap_samples = 0
gap_vals = np.zeros(gap_samples)
gap_vals = gap_vals.astype(np.float32)
gap_end = gap_duration/gst.SECOND + gpsstart
buffer_len = gap_vals.nbytes
buf = gst.buffer_new_and_alloc(buffer_len)
buf[:buffer_len-1] = np.getbuffer(gap_vals)
#buf.flag_set(gst.BUFFER_FLAG_GAP)
buf.timestamp = self.next_output_timestamp
buf.duration = gap_duration
buf.offset = self.current_offset
buf.offset_end = self.current_offset + gap_samples
src.emit("push-buffer", buf)
src.info("gst clock = %d" % int(src.get_clock().get_time()))
src.info("pushed gap with start=%d, duration=%d, latency=%d" %
(buf.timestamp/gst.SECOND,gap_duration/gst.SECOND, (gpstime.GpsSecondsFromPyUTC(time.time()) - gap_end)))
if self.logger:
outDict = {}
outDict['type'] = 'buffer'
outDict['time'] = datetime.datetime.now().isoformat()
outDict['is_gap'] = True
outDict['gpsstart'] = buf.timestamp/gst.SECOND
outDict['duration'] = gap_duration/gst.SECOND
outDict['latency'] = gpstime.GpsSecondsFromPyUTC(time.time()) - gap_end
self.logger.info(json.dumps(outDict))
self.next_output_timestamp += buf.duration
self.current_offset = buf.offset_end
# Load the numpy array.
src.info("processing %s" % filePath)
......@@ -266,6 +289,16 @@ class vetoSource:
# (gpsstart,duration, (int(src.get_clock().get_time())-buf.timestamp)/gst.SECOND))
src.info("pushed buffer with start=%d, duration=%d, latency=%d" %
(gpsstart,duration,(gpstime.GpsSecondsFromPyUTC(time.time()) - buf_end)))
if self.logger:
outDict = {}
outDict['type'] = 'buffer'
outDict['time'] = datetime.datetime.now().isoformat()
outDict['is_gap'] = False
outDict['gpsstart'] = gpsstart
outDict['duration'] = duration
outDict['latency'] = gpstime.GpsSecondsFromPyUTC(time.time()) - buf_end
self.logger.info(json.dumps(outDict))
self.next_output_timestamp += buf.duration
self.current_offset = buf.offset_end
return True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment