diff --git a/gstlal-calibration/bin/gstlal_calibration_aggregator b/gstlal-calibration/bin/gstlal_calibration_aggregator index 1b478b9fd69afba7a020bfe65a9eca543a7c76c9..ce5ccd7dffc83d39514c2114ac7ae40e62293446 100755 --- a/gstlal-calibration/bin/gstlal_calibration_aggregator +++ b/gstlal-calibration/bin/gstlal_calibration_aggregator @@ -18,6 +18,7 @@ from collections import defaultdict +import ConfigParser import argparse import json import logging @@ -40,27 +41,31 @@ from ligo.scald import io # ============================================================================= # - # Read command line options def parse_command_line(): parser = argparse.ArgumentParser(description="Online calibration aggregator") # directory to put everything in - parser.add_argument("--data-type", help="Specify datatypes to aggregate from 'min', 'max', 'median'.") - parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)") - parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092") - parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.") - parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.") - parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.") - parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.") - parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.") - parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.") + parser.add_argument("--config-file", help="Specify configuration file.") args = parser.parse_args() return args +# Parse config sections +def ConfigSectionMap(section): + dict1 = {} + options = Config.options(section) + for option in options: + try: + dict1[option] = Config.get(section, option) + if dict1[option] == -1: + DebugPrint("skip: %s" % option) + except: + print("exception on %s!" % option) + dict1[option] = None + return dict1 # # ============================================================================= @@ -72,6 +77,18 @@ def parse_command_line(): if __name__ == '__main__': options = parse_command_line() + Config = ConfigParser.ConfigParser() + Config.read(options.config_file) + MonitoringConfigs = ConfigSectionMap("MonitoringConfigurations") + kafka_server = MonitoringConfigs["kafkaserver"] + influx_hostname = MonitoringConfigs["influxhostname"] + influx_port = MonitoringConfigs["influxport"] + influx_database_name = MonitoringConfigs["influxdatabasename"] + enable_auth = Config.getboolean("MonitoringConfigurations", "enableauth") + enable_https = Config.getboolean("MonitoringConfigurations", "enablehttps") + across_jobs = Config.getboolean("MonitoringConfigurations", "acrossjobs") + data_type = MonitoringConfigs["datatype"] + dump_period = float(MonitoringConfigs["dumpperiod"]) topics = ['H1_latency', 'H1_statevector_bit_check'] channel = 'H1_HOFT_TEST' statevector_tags = ['TDCFs_valid', 'monitor_on'] @@ -80,7 +97,7 @@ if __name__ == '__main__': consumer = KafkaConsumer( *topics, - bootstrap_servers=[options.kafka_server], + bootstrap_servers=[kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_aggregator' % topics[0], auto_offset_reset='latest', @@ -93,12 +110,12 @@ if __name__ == '__main__': # set up aggregator sink agg_sink = io.influx.Aggregator( - hostname=options.influx_hostname, - port=options.influx_port, - db=options.influx_database_name, - auth=options.enable_auth, - https=options.enable_https, - reduce_across_tags=options.across_jobs + hostname=influx_hostname, + port=influx_port, + db=influx_database_name, + auth=enable_auth, + https=enable_https, + reduce_across_tags=across_jobs ) # register measurement schemas for aggregators @@ -110,8 +127,8 @@ if __name__ == '__main__': # start an infinite loop to keep updating and aggregating data while True: - logging.info("sleeping for %.1f s" % options.dump_period) - time.sleep(options.dump_period) + logging.info("sleeping for %.1f s" % dump_period) + time.sleep(dump_period) logging.info("retrieving data from kafka") start = timeit.default_timer() @@ -150,7 +167,7 @@ if __name__ == '__main__': start = timeit.default_timer() for topic in topics: logging.info("storing and reducing timeseries for measurement: %s" % topic) - agg_sink.store_columns(topic, data[topic], aggregate=options.data_type) + agg_sink.store_columns(topic, data[topic], aggregate=data_type) elapsed = timeit.default_timer() - start logging.info("time to store/reduce timeseries: %.1f s" % elapsed)