Commit 490c03b9 authored by cal's avatar cal

Switching gstlal_calibration_aggregator over to config file

parent 2f57d31a
Pipeline #73091 failed with stages
in 1 minute and 8 seconds
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
from collections import defaultdict from collections import defaultdict
import ConfigParser
import argparse import argparse
import json import json
import logging import logging
...@@ -40,27 +41,31 @@ from ligo.scald import io ...@@ -40,27 +41,31 @@ from ligo.scald import io
# ============================================================================= # =============================================================================
# #
# Read command line options # Read command line options
def parse_command_line(): def parse_command_line():
parser = argparse.ArgumentParser(description="Online calibration aggregator") parser = argparse.ArgumentParser(description="Online calibration aggregator")
# directory to put everything in # directory to put everything in
parser.add_argument("--data-type", help="Specify datatypes to aggregate from 'min', 'max', 'median'.") parser.add_argument("--config-file", help="Specify configuration file.")
parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
args = parser.parse_args() args = parser.parse_args()
return args return args
# Parse config sections
def ConfigSectionMap(section):
dict1 = {}
options = Config.options(section)
for option in options:
try:
dict1[option] = Config.get(section, option)
if dict1[option] == -1:
DebugPrint("skip: %s" % option)
except:
print("exception on %s!" % option)
dict1[option] = None
return dict1
# #
# ============================================================================= # =============================================================================
...@@ -72,6 +77,18 @@ def parse_command_line(): ...@@ -72,6 +77,18 @@ def parse_command_line():
if __name__ == '__main__': if __name__ == '__main__':
options = parse_command_line() options = parse_command_line()
Config = ConfigParser.ConfigParser()
Config.read(options.config_file)
MonitoringConfigs = ConfigSectionMap("MonitoringConfigurations")
kafka_server = MonitoringConfigs["kafkaserver"]
influx_hostname = MonitoringConfigs["influxhostname"]
influx_port = MonitoringConfigs["influxport"]
influx_database_name = MonitoringConfigs["influxdatabasename"]
enable_auth = Config.getboolean("MonitoringConfigurations", "enableauth")
enable_https = Config.getboolean("MonitoringConfigurations", "enablehttps")
across_jobs = Config.getboolean("MonitoringConfigurations", "acrossjobs")
data_type = MonitoringConfigs["datatype"]
dump_period = float(MonitoringConfigs["dumpperiod"])
topics = ['H1_latency', 'H1_statevector_bit_check'] topics = ['H1_latency', 'H1_statevector_bit_check']
channel = 'H1_HOFT_TEST' channel = 'H1_HOFT_TEST'
statevector_tags = ['TDCFs_valid', 'monitor_on'] statevector_tags = ['TDCFs_valid', 'monitor_on']
...@@ -80,7 +97,7 @@ if __name__ == '__main__': ...@@ -80,7 +97,7 @@ if __name__ == '__main__':
consumer = KafkaConsumer( consumer = KafkaConsumer(
*topics, *topics,
bootstrap_servers=[options.kafka_server], bootstrap_servers=[kafka_server],
value_deserializer=lambda m: json.loads(m.decode('utf-8')), value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='%s_aggregator' % topics[0], group_id='%s_aggregator' % topics[0],
auto_offset_reset='latest', auto_offset_reset='latest',
...@@ -93,12 +110,12 @@ if __name__ == '__main__': ...@@ -93,12 +110,12 @@ if __name__ == '__main__':
# set up aggregator sink # set up aggregator sink
agg_sink = io.influx.Aggregator( agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname, hostname=influx_hostname,
port=options.influx_port, port=influx_port,
db=options.influx_database_name, db=influx_database_name,
auth=options.enable_auth, auth=enable_auth,
https=options.enable_https, https=enable_https,
reduce_across_tags=options.across_jobs reduce_across_tags=across_jobs
) )
# register measurement schemas for aggregators # register measurement schemas for aggregators
...@@ -110,8 +127,8 @@ if __name__ == '__main__': ...@@ -110,8 +127,8 @@ if __name__ == '__main__':
# start an infinite loop to keep updating and aggregating data # start an infinite loop to keep updating and aggregating data
while True: while True:
logging.info("sleeping for %.1f s" % options.dump_period) logging.info("sleeping for %.1f s" % dump_period)
time.sleep(options.dump_period) time.sleep(dump_period)
logging.info("retrieving data from kafka") logging.info("retrieving data from kafka")
start = timeit.default_timer() start = timeit.default_timer()
...@@ -150,7 +167,7 @@ if __name__ == '__main__': ...@@ -150,7 +167,7 @@ if __name__ == '__main__':
start = timeit.default_timer() start = timeit.default_timer()
for topic in topics: for topic in topics:
logging.info("storing and reducing timeseries for measurement: %s" % topic) logging.info("storing and reducing timeseries for measurement: %s" % topic)
agg_sink.store_columns(topic, data[topic], aggregate=options.data_type) agg_sink.store_columns(topic, data[topic], aggregate=data_type)
elapsed = timeit.default_timer() - start elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % elapsed) logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment