From ce7c5dd43d774f6c814fb89f8fb2c45844c6ddc1 Mon Sep 17 00:00:00 2001 From: Patrick Godwin <patrick.godwin@ligo.org> Date: Sat, 12 Jan 2019 09:58:21 -0800 Subject: [PATCH] gstlal_feature_listener: switch to datamon database utils to add rows --- gstlal-burst/bin/gstlal_feature_listener | 50 +++++++----------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/gstlal-burst/bin/gstlal_feature_listener b/gstlal-burst/bin/gstlal_feature_listener index 8f7d2a4447..05f68cf228 100755 --- a/gstlal-burst/bin/gstlal_feature_listener +++ b/gstlal-burst/bin/gstlal_feature_listener @@ -35,6 +35,8 @@ import time from confluent_kafka import Consumer, KafkaError +from datamon import io + from gstlal.fxtools import utils #------------------------------------------------- @@ -91,6 +93,11 @@ class StreamListener(object): ### initialize queues self.feature_queue = deque(maxlen = 60 * self.sample_rate) + ### database settings + self.database_name = 'fx' + self.table_name = 'fxmonitor' + self.columns = ['target_snr', 'latency', 'percent_missed'] + ### other settings if options.target_channel: self.target_channel = options.target_channel @@ -98,7 +105,8 @@ class StreamListener(object): self.target_channel = '%s:CAL-DELTAL_EXTERNAL_DQ'%options.instrument self.num_channels = options.num_channels - ### initialize DB + + ### setup/connect to database logger.info("initializing sqlite database...") self.sqlite_path = os.path.join(options.rootdir, 'fx.sqlite') @@ -106,9 +114,8 @@ class StreamListener(object): if not os.path.isfile(self.sqlite_path): logger.info("db does not exist, creating one...") - # connect to DB - self.conn = sqlite3.connect(self.sqlite_path) - self.initialize_db(self.sqlite_path) + self.conn = io.sqlite.create_client(self.database_name, options.rootdir) + io.sqlite.create_table(self.conn, self.columns, self.table_name) def fetch_data(self): """ @@ -138,23 +145,17 @@ class StreamListener(object): latency = utils.gps2latency(self.timestamp) logger.info('processing features for timestamp %f, latency = %.3f s' % (self.timestamp, latency)) - ### format row + ### generate metrics percent_missed = 100 * (float(self.num_channels - len(features.keys())) / self.num_channels) if features.has_key(self.target_channel): target_snr = features[self.target_channel][0]['snr'] else: target_snr = 0. - data = (self.timestamp, latency, percent_missed, target_snr) + data = {'target_snr': [target_snr], 'latency': [latency], 'percent_missed': [percent_missed]} - ### insert row into DB - c = self.conn.cursor() - sql = ''' INSERT INTO fxmonitor(timestamp,latency,percent_missed,target_snr) VALUES(?,?,?,?) ''' - c.execute(sql,data) - try: - self.conn.commit() - except sqlite3.OperationalError: - pass + ### add to database + io.sqlite.store_timeseries(self.conn, self.columns, [self.timestamp], data, self.table_name) def start(self): """ @@ -178,27 +179,6 @@ class StreamListener(object): logger.info('shutting down feature listener...') self.conn.close() - def initialize_db(self, sqlite_path): - table_name = 'fxmonitor' - - columns = ['timestamp','latency', 'percent_missed', 'target_snr'] - column_type = ['REAL','REAL','REAL','REAL'] - column_def = [0.,0., 0., 0.] - - c = self.conn.cursor() - - fx_table = """ CREATE TABLE IF NOT EXISTS fxmonitor ( - timestamp real PRIMARY KEY, - latency real, - percent_missed real, - target_snr real - ); """ - - c.execute(fx_table) - - ### Committing changes and closing the connection to the database file - self.conn.commit() - class SignalHandler(object): """ helper class to shut down the stream listener gracefully before exiting -- GitLab