Commit eecf558f authored by Patrick Godwin

gstlal_ll_dq, gstlal_ll_inspiral_(trigger)aggregator: ease off on excessive logging

parent c2b7eca1
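At a glance, the change is the same in all three programs: per-iteration messages move from logging.info to logging.debug, one-time lifecycle messages (setup, startup, shutdown) stay at info, and the two aggregators gain a -v/--verbose option from which the log level is derived. A minimal standalone sketch of that wiring, not the gstlal code itself (the format string and loop below are illustrative):

import argparse
import logging
import time

parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", default=False, action="store_true", help="if set, also emit debug-level messages")
options = parser.parse_args()

# pick the threshold from the flag, then configure the root logger with it
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)s: %(message)s")

logging.info("starting up...")                  # one-time lifecycle message: stays at info
for i in range(3):
	logging.debug("sleeping for 1.0 s")     # per-iteration chatter: now debug-only
	time.sleep(1.0)
logging.info("shutting down...")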
gstlal_ll_dq:
@@ -126,7 +126,7 @@ class PSDHandler(simplehandler.Handler):
 		# Only reduce every 100s
 		if (buftime - self.last_reduce_time) >= 100:
 			self.last_reduce_time = int(round(buftime,-2))
-			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now())))
+			logging.debug("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now())))
 			data = {route: {self.instrument: {'time': list(self.timedeq), 'fields': {'data': list(self.datadeq[route])}}} for route in self.routes}
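Context for the hunk above: int(round(buftime, -2)) snaps the buffer's GPS time to the nearest multiple of 100, so the reduce-and-snapshot branch fires roughly once per 100 s of data. A quick illustration with made-up GPS times:

# round(x, -2) rounds to the nearest multiple of 100 (the GPS times below are made up)
print(int(round(1187008882, -2)))   # -> 1187008900
print(int(round(1187008849, -2)))   # -> 1187008800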
@@ -175,7 +175,8 @@ class PSDHandler(simplehandler.Handler):
 if __name__ == '__main__':
 	options, filenames = parse_command_line()
-	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+	log_level = logging.DEBUG if options.verbose else logging.INFO
+	logging.basicConfig(level = log_level, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 	# set up aggregator sink
 	if options.data_backend == 'influx':
gstlal_ll_inspiral_aggregator:
@@ -62,6 +62,7 @@ def parse_command_line():
 	parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
 	parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
 	parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
+	parser.add_argument("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
 	args = parser.parse_args()
@@ -89,7 +90,9 @@ if __name__ == '__main__':
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
 	routes = options.route
-	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+	log_level = logging.DEBUG if options.verbose else logging.INFO
+	logging.basicConfig(format = '%(asctime)s | ll_inspiral_aggregator : %(levelname)s : %(message)s')
+	logging.getLogger().setLevel(log_level)
 	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
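The aggregators now configure logging in two steps: basicConfig(format=...) installs the handler and format without touching the level, and getLogger().setLevel(log_level) then sets the threshold on the root logger, which here has the same effect as passing level=log_level to basicConfig directly. A small standalone sketch (level hard-coded and format string simplified for illustration):

import logging

log_level = logging.DEBUG   # in the script this comes from the --verbose flag

# two-step form: basicConfig() installs the handler/format (no level argument),
# then setLevel() adjusts the threshold on the root logger afterwards
logging.basicConfig(format='%(asctime)s | example : %(levelname)s : %(message)s')
logging.getLogger().setLevel(log_level)

logging.debug("visible only when log_level is DEBUG")
logging.info("visible at INFO and DEBUG")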
@@ -111,6 +114,7 @@ if __name__ == '__main__':
 		consumer = None
 	# set up aggregator sink
+	logging.info("setting up aggregator...")
 	if options.data_backend == 'influx':
 		agg_sink = io.influx.Aggregator(
 			hostname=options.influx_hostname,
@@ -128,32 +132,34 @@
 		agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')
 	# start an infinite loop to keep updating and aggregating data
+	logging.info("starting up...")
 	while True:
-		logging.info("sleeping for %.1f s" % options.dump_period)
+		logging.debug("sleeping for %.1f s" % options.dump_period)
 		time.sleep(options.dump_period)
 		if consumer:
 			# this is not threadsafe!
-			logging.info("retrieving data from kafka")
+			logging.debug("retrieving data from kafka")
 			start = timeit.default_timer()
 			datadata = io.kafka.retrieve_timeseries(consumer, routes, max_records = 2 * len(jobs) * len(routes))
 			elapsed = timeit.default_timer() - start
-			logging.info("time to retrieve data: %.1f s" % elapsed)
+			logging.debug("time to retrieve data: %.1f s" % elapsed)
 		else:
-			logging.info("retrieving data from bottle routes")
+			logging.debug("retrieving data from bottle routes")
 			datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
 		# store and reduce data for each job
 		start = timeit.default_timer()
 		for route in routes:
-			logging.info("storing and reducing timeseries for measurement: %s" % route)
+			logging.debug("storing and reducing timeseries for measurement: %s" % route)
 			for aggregate in options.data_type:
 				agg_sink.store_columns(route, datadata[route], aggregate=aggregate)
 		elapsed = timeit.default_timer() - start
-		logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
+		logging.debug("time to store/reduce timeseries: %.1f s" % elapsed)
 	# close connection to consumer if using kafka
 	if consumer:
+		logging.info("shutting down consumer...")
 		consumer.close()
 	#
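Both aggregators share this polling shape: sleep for --dump-period, retrieve timeseries from Kafka or from the jobs' bottle routes, store/reduce them, and time each phase with timeit.default_timer(); after this commit only the one-time messages remain at info. A stripped-down sketch of that loop, where retrieve and store are hypothetical stand-ins rather than gstlal functions:

import logging
import time
import timeit

def run_loop(dump_period, retrieve, store):
	# generic polling loop: per-iteration timing messages go to debug, so a
	# steady-state run only reports startup/shutdown at info
	logging.info("starting up...")
	try:
		while True:
			logging.debug("sleeping for %.1f s" % dump_period)
			time.sleep(dump_period)

			start = timeit.default_timer()
			data = retrieve()
			logging.debug("time to retrieve data: %.1f s" % (timeit.default_timer() - start))

			start = timeit.default_timer()
			store(data)
			logging.debug("time to store/reduce data: %.1f s" % (timeit.default_timer() - start))
	except KeyboardInterrupt:
		logging.info("shutting down...")

# e.g. run_loop(1.0, retrieve=lambda: [1, 2, 3], store=lambda data: None)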
gstlal_ll_inspiral_trigger_aggregator:
@@ -79,6 +79,7 @@ def parse_command_line():
 	parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
 	parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
 	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
 	args = parser.parse_args()
@@ -101,7 +102,9 @@ if __name__ == '__main__':
 	# FIXME don't hardcode some of these?
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
-	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
+	log_level = logging.DEBUG if options.verbose else logging.INFO
+	logging.basicConfig(format = '%(asctime)s | ll_inspiral_trigger_aggregator : %(levelname)s : %(message)s')
+	logging.getLogger().setLevel(log_level)
 	pool = Pool(options.num_threads)
@@ -137,20 +140,21 @@ if __name__ == '__main__':
 		agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
 	# start an infinite loop to keep updating and aggregating data
+	logging.info("starting up...")
 	while True:
-		logging.info("sleeping for %.1f s" % options.dump_period)
+		logging.debug("sleeping for %.1f s" % options.dump_period)
 		time.sleep(options.dump_period)
 		if consumer:
 			# this is not threadsafe!
-			logging.info("retrieving data from kafka")
+			logging.debug("retrieving data from kafka")
 			start = timeit.default_timer()
 			#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
 			triggers = retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
 			elapsed = timeit.default_timer() - start
-			logging.info("time to retrieve data: %.1f s" % elapsed)
+			logging.debug("time to retrieve data: %.1f s" % elapsed)
 		else:
-			logging.info("retrieving data from bottle routes")
+			logging.debug("retrieving data from bottle routes")
 			triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = options.route, num_threads=options.num_threads)
 		# filter out triggers that don't have a far assigned yet
@@ -159,15 +163,16 @@ if __name__ == '__main__':
 		# store and reduce data for each job
 		if triggers:
 			start = timeit.default_timer()
-			logging.info("storing and reducing triggers")
+			logging.debug("storing and reducing triggers")
 			agg_sink.store_triggers('triggers', triggers, far_key = 'combined_far', time_key = 'end')
 			elapsed = timeit.default_timer() - start
-			logging.info("time to store/reduce triggers: %.1f s" % elapsed)
+			logging.debug("time to store/reduce triggers: %.1f s" % elapsed)
 		else:
-			logging.info("no triggers to process")
+			logging.debug("no triggers to process")
 	# close connection to consumer if using kafka
 	if consumer:
+		logging.info("shutting down consumer...")
 		consumer.close()
 	#