From 1eb717d3903de38ca2b1127cd3e268927242e0a9 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 1 Aug 2019 08:25:41 -0700
Subject: [PATCH] add gstlal_calibration_aggregator for storing metrics from
 calibration

---
 gstlal-calibration/bin/Makefile.am            |   1 +
 .../bin/gstlal_calibration_aggregator         | 166 ++++++++++++++++++
 2 files changed, 167 insertions(+)
 create mode 100755 gstlal-calibration/bin/gstlal_calibration_aggregator

diff --git a/gstlal-calibration/bin/Makefile.am b/gstlal-calibration/bin/Makefile.am
index 93657f56ca..d044a64e0d 100644
--- a/gstlal-calibration/bin/Makefile.am
+++ b/gstlal-calibration/bin/Makefile.am
@@ -1,2 +1,3 @@
 dist_bin_SCRIPTS = \
+	gstlal_calibration_aggregator \
 	gstlal_compute_strain
diff --git a/gstlal-calibration/bin/gstlal_calibration_aggregator b/gstlal-calibration/bin/gstlal_calibration_aggregator
new file mode 100755
index 0000000000..1b478b9fd6
--- /dev/null
+++ b/gstlal-calibration/bin/gstlal_calibration_aggregator
@@ -0,0 +1,166 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2019  Patrick Godwin
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+
+from collections import defaultdict
+import argparse
+import json
+import logging
+import sys, os
+import time
+import timeit
+
+import numpy
+
+from kafka import KafkaConsumer
+
+from ligo.scald import io
+
+
+#
+# =============================================================================
+#
+#                                 Command Line
+#
+# =============================================================================
+#
+
+
+# Read command line options
+def parse_command_line():
+
+	parser = argparse.ArgumentParser(description="Online calibration aggregator")
+
+	# directory to put everything in
+	parser.add_argument("--data-type", help="Specify datatypes to aggregate from 'min', 'max', 'median'.")
+	parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the  URLs (default = 1., set to 0 to disable)")
+	parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
+	parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
+	parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
+	parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
+
+	args = parser.parse_args()
+
+	return args
+
+
+#
+# =============================================================================
+#
+#                                     Main
+#
+# =============================================================================
+#
+
+if __name__ == '__main__':
+	options = parse_command_line()
+	topics = ['H1_latency', 'H1_statevector_bit_check']
+	channel = 'H1_HOFT_TEST'
+	statevector_tags = ['TDCFs_valid', 'monitor_on']
+
+	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+
+	consumer = KafkaConsumer(
+		*topics,
+		bootstrap_servers=[options.kafka_server],
+		value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+		group_id='%s_aggregator' % topics[0],
+		auto_offset_reset='latest',
+		max_poll_interval_ms = 60000,
+		session_timeout_ms=30000,
+		heartbeat_interval_ms=10000,
+		reconnect_backoff_ms=5000,
+		reconnect_backoff_max_ms=30000
+	)
+
+	# set up aggregator sink
+	agg_sink = io.influx.Aggregator(
+		hostname=options.influx_hostname,
+		port=options.influx_port,
+		db=options.influx_database_name,
+		auth=options.enable_auth,
+		https=options.enable_https,
+		reduce_across_tags=options.across_jobs
+	)
+
+	# register measurement schemas for aggregators
+	for topic in topics:
+		if 'latency' in topic:
+			agg_sink.register_schema(topic, columns='data', column_key='data', tags='stage', tag_key='stage')
+		elif 'statevector' in topic:
+			agg_sink.register_schema(topic, columns='data', column_key='data', tags='check', tag_key='check')
+
+	# start an infinite loop to keep updating and aggregating data
+	while True:
+		logging.info("sleeping for %.1f s" % options.dump_period)
+		time.sleep(options.dump_period)
+
+		logging.info("retrieving data from kafka")
+		start = timeit.default_timer()
+		data = {topic: defaultdict(lambda: {'time': [], 'fields': {'data': []}}) for topic in topics}
+
+		### poll consumer for messages
+		msg_pack = consumer.poll(timeout_ms = 1000, max_records = 1000)
+		for tp, messages in msg_pack.items():
+			for message in messages:
+				try:
+					topic = message.topic
+					if 'latency' in topic:
+						ifo = topic.split('_')[0]
+						tag = [name for name in message.value.keys() if channel in name][0]
+						formatted_tag = tag.strip(channel+'_')
+						data[topic][formatted_tag]['time'].append(message.value['time'])
+						data[topic][formatted_tag]['fields']['data'].append(message.value[tag])
+					elif 'statevector' in topic:
+						tag = [name for name in message.value.keys() if name in statevector_tags][0]
+						data[topic][tag]['time'].append(message.value['time'])
+						data[topic][tag]['fields']['data'].append(message.value[tag])
+
+				except KeyError: ### no metrics
+					pass
+		
+		### convert series to numpy arrays
+		for topic in topics:
+			for tag in data[topic].keys():
+				data[topic][tag]['time'] = numpy.array(data[topic][tag]['time'])
+				data[topic][tag]['fields']['data'] = numpy.array(data[topic][tag]['fields']['data'])
+
+		elapsed = timeit.default_timer() - start
+		logging.info("time to retrieve data: %.1f s" % elapsed)
+
+		# store and reduce data for each job
+		start = timeit.default_timer()
+		for topic in topics:
+			logging.info("storing and reducing timeseries for measurement: %s" % topic)
+			agg_sink.store_columns(topic, data[topic], aggregate=options.data_type)
+		elapsed = timeit.default_timer() - start
+		logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
+
+	# close connection to consumer if using kafka
+	if consumer:
+		consumer.close()
+
+	#
+	# always end on an error so that condor won't think we're done and will
+	# restart us
+	#
+
+	sys.exit(1)
-- 
GitLab