Skip to content
Snippets Groups Projects
Commit ce7c5dd4 authored by Patrick Godwin's avatar Patrick Godwin Committed by Kipp Cannon
Browse files

gstlal_feature_listener: switch to datamon database utils to add rows

parent 15263800
No related branches found
No related tags found
No related merge requests found
......@@ -35,6 +35,8 @@ import time
from confluent_kafka import Consumer, KafkaError
from datamon import io
from gstlal.fxtools import utils
#-------------------------------------------------
......@@ -91,6 +93,11 @@ class StreamListener(object):
### initialize queues
self.feature_queue = deque(maxlen = 60 * self.sample_rate)
### database settings
self.database_name = 'fx'
self.table_name = 'fxmonitor'
self.columns = ['target_snr', 'latency', 'percent_missed']
### other settings
if options.target_channel:
self.target_channel = options.target_channel
......@@ -98,7 +105,8 @@ class StreamListener(object):
self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument
self.num_channels = options.num_channels
### initialize DB
### setup/connect to database
logger.info("initializing sqlite database...")
self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite')
......@@ -106,9 +114,8 @@ class StreamListener(object):
if not os.path.isfile(self.sqlite_path):
logger.info("db does not exist, creating one...")
# connect to DB
self.conn = sqlite3.connect(self.sqlite_path)
self.initialize_db(self.sqlite_path)
self.conn = io.sqlite.create_client(self.database_name, options.rootdir)
io.sqlite.create_table(self.conn, self.columns, self.table_name)
def fetch_data(self):
"""
......@@ -138,23 +145,17 @@ class StreamListener(object):
latency = utils.gps2latency(self.timestamp)
logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency))
### format row
### generate metrics
percent_missed = 100 * (float(self.num_channels - len(features.keys())) / self.num_channels)
if features.has_key(self.target_channel):
target_snr = features[self.target_channel][0]['snr']
else:
target_snr = 0.
data = (self.timestamp, latency, percent_missed, target_snr)
data = {'target_snr': [target_snr], 'latency': [latency], 'percent_missed': [percent_missed]}
### insert row into DB
c = self.conn.cursor()
sql = ''' INSERT INTO fxmonitor(timestamp,latency,percent_missed,target_snr) VALUES(?,?,?,?) '''
c.execute(sql,data)
try:
self.conn.commit()
except sqlite3.OperationalError:
pass
### add to database
io.sqlite.store_timeseries(self.conn, self.columns, [self.timestamp], data, self.table_name)
def start(self):
"""
......@@ -178,27 +179,6 @@ class StreamListener(object):
logger.info('shutting down feature listener...')
self.conn.close()
def initialize_db(self, sqlite_path):
table_name = 'fxmonitor'
columns = ['timestamp','latency', 'percent_missed', 'target_snr']
column_type = ['REAL','REAL','REAL','REAL']
column_def = [0.,0., 0., 0.]
c = self.conn.cursor()
fx_table = """ CREATE TABLE IF NOT EXISTS fxmonitor (
timestamp real PRIMARY KEY,
latency real,
percent_missed real,
target_snr real
); """
c.execute(fx_table)
### Committing changes and closing the connection to the database file
self.conn.commit()
class SignalHandler(object):
"""
helper class to shut down the stream listener gracefully before exiting
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment