Commit 1eb717d3 authored by Patrick Godwin

add gstlal_calibration_aggregator for storing metrics from calibration

parent 53d690cb
Pipeline #73034 failed in 3 minutes and 1 second
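The new gstlal_calibration_aggregator consumes calibration metrics (latency and state-vector bit checks) from Kafka and stores them in InfluxDB via ligo.scald. A rough invocation sketch, using only the options defined in the script below; the Kafka server address is the example given in the option help, while the InfluxDB hostname, port, and database name are illustrative placeholders, not values from this commit:

gstlal_calibration_aggregator \
    --kafka-server 10.14.0.112:9092 \
    --influx-hostname influxdb.example.com \
    --influx-port 8086 \
    --influx-database-name calibration_metrics \
    --data-type median \
    --dump-period 1 \
    --enable-auth \
    --enable-https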
dist_bin_SCRIPTS = \
    gstlal_calibration_aggregator \
    gstlal_compute_strain
#!/usr/bin/env python
#
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from collections import defaultdict
import argparse
import json
import logging
import sys, os
import time
import timeit
import numpy
from kafka import KafkaConsumer
from ligo.scald import io
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
# Read command line options
def parse_command_line():
    parser = argparse.ArgumentParser(description="Online calibration aggregator")

    # aggregation and data backend options
    parser.add_argument("--data-type", help="Specify datatypes to aggregate from 'min', 'max', 'median'.")
    parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
    parser.add_argument("--kafka-server", action="store", help="Specify kafka server to read data from, example: 10.14.0.112:9092")
    parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
    parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
    parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
    parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
    parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
    parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")

    args = parser.parse_args()

    return args
#
# =============================================================================
#
# Main
#
# =============================================================================
#
if __name__ == '__main__':
    options = parse_command_line()

    # hard-coded measurement topics, channel name and state-vector checks to aggregate
    topics = ['H1_latency', 'H1_statevector_bit_check']
    channel = 'H1_HOFT_TEST'
    statevector_tags = ['TDCFs_valid', 'monitor_on']

    logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")

    # set up kafka consumer to read metrics from the relevant topics
    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=[options.kafka_server],
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        group_id='%s_aggregator' % topics[0],
        auto_offset_reset='latest',
        max_poll_interval_ms = 60000,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000,
        reconnect_backoff_ms=5000,
        reconnect_backoff_max_ms=30000
    )

    # set up aggregator sink
    agg_sink = io.influx.Aggregator(
        hostname=options.influx_hostname,
        port=options.influx_port,
        db=options.influx_database_name,
        auth=options.enable_auth,
        https=options.enable_https,
        reduce_across_tags=options.across_jobs
    )

    # register measurement schemas for aggregators
    for topic in topics:
        if 'latency' in topic:
            agg_sink.register_schema(topic, columns='data', column_key='data', tags='stage', tag_key='stage')
        elif 'statevector' in topic:
            agg_sink.register_schema(topic, columns='data', column_key='data', tags='check', tag_key='check')

    # start an infinite loop to keep updating and aggregating data
    while True:
        logging.info("sleeping for %.1f s" % options.dump_period)
        time.sleep(options.dump_period)

        logging.info("retrieving data from kafka")
        start = timeit.default_timer()

        data = {topic: defaultdict(lambda: {'time': [], 'fields': {'data': []}}) for topic in topics}

        ### poll consumer for messages
        msg_pack = consumer.poll(timeout_ms = 1000, max_records = 1000)
        for tp, messages in msg_pack.items():
            for message in messages:
                try:
                    topic = message.topic
                    if 'latency' in topic:
                        ifo = topic.split('_')[0]
                        tag = [name for name in message.value.keys() if channel in name][0]
                        formatted_tag = tag.strip(channel+'_')
                        data[topic][formatted_tag]['time'].append(message.value['time'])
                        data[topic][formatted_tag]['fields']['data'].append(message.value[tag])
                    elif 'statevector' in topic:
                        tag = [name for name in message.value.keys() if name in statevector_tags][0]
                        data[topic][tag]['time'].append(message.value['time'])
                        data[topic][tag]['fields']['data'].append(message.value[tag])
                except KeyError: ### no metrics
                    pass

        ### convert series to numpy arrays
        for topic in topics:
            for tag in data[topic].keys():
                data[topic][tag]['time'] = numpy.array(data[topic][tag]['time'])
                data[topic][tag]['fields']['data'] = numpy.array(data[topic][tag]['fields']['data'])

        elapsed = timeit.default_timer() - start
        logging.info("time to retrieve data: %.1f s" % elapsed)

        # store and reduce data for each job
        start = timeit.default_timer()
        for topic in topics:
            logging.info("storing and reducing timeseries for measurement: %s" % topic)
            agg_sink.store_columns(topic, data[topic], aggregate=options.data_type)
        elapsed = timeit.default_timer() - start
        logging.info("time to store/reduce timeseries: %.1f s" % elapsed)

    # close connection to consumer if using kafka
    if consumer:
        consumer.close()

    #
    # always end on an error so that condor won't think we're done and will
    # restart us
    #
    sys.exit(1)
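For reference, the consumer above expects JSON-encoded payloads containing a 'time' field: the latency topic carries per-stage values keyed by names that include the channel prefix, and the state-vector topic carries one of the tags checked above. A hedged sketch of the assumed message shapes; the stage suffix and numerical values are illustrative, not taken from this commit:

# hypothetical H1_latency payload: 'time' plus one key containing the channel
# name ('_whitening' is a made-up stage suffix for illustration)
{'time': 1262304018.0, 'H1_HOFT_TEST_whitening': 4.5}

# hypothetical H1_statevector_bit_check payload: 'time' plus one of the
# state-vector tags ('TDCFs_valid' or 'monitor_on')
{'time': 1262304018.0, 'monitor_on': 1.0}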