diff --git a/gstlal-ugly/bin/gstlal_ll_dq b/gstlal-ugly/bin/gstlal_ll_dq index 4fd62acd2608cc76b082c0081566c0459c2e2381..e6291b69c64b5a57598db665d7443f44442a889f 100755 --- a/gstlal-ugly/bin/gstlal_ll_dq +++ b/gstlal-ugly/bin/gstlal_ll_dq @@ -126,7 +126,7 @@ class PSDHandler(simplehandler.Handler): # Only reduce every 100s if (buftime - self.last_reduce_time) >= 100: self.last_reduce_time = int(round(buftime,-2)) - logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now()))) + logging.debug("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now()))) data = {route: {self.instrument: {'time': list(self.timedeq), 'fields': {'data': list(self.datadeq[route])}}} for route in self.routes} @@ -175,7 +175,8 @@ class PSDHandler(simplehandler.Handler): if __name__ == '__main__': options, filenames = parse_command_line() - logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") + log_level = logging.DEBUG if options.verbose else logging.INFO + logging.basicConfig(level = log_level, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") # set up aggregator sink if options.data_backend == 'influx': diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator index 818e4ddd08b09d4d3136bea6bc2c9b00569d8adb..d74aaeee47e1e475128dfce1e19815c9e2ff4e18 100755 --- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator +++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator @@ -62,6 +62,7 @@ def parse_command_line(): parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.") parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.") parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.") + parser.add_argument("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.") args = parser.parse_args() @@ -89,7 +90,9 @@ if __name__ == '__main__': jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)] routes = options.route - logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") + log_level = logging.DEBUG if options.verbose else logging.INFO + logging.basicConfig(format = '%(asctime)s | ll_inspiral_aggregator : %(levelname)s : %(message)s') + logging.getLogger().setLevel(log_level) # We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs if options.kafka_server: @@ -111,6 +114,7 @@ if __name__ == '__main__': consumer = None # set up aggregator sink + logging.info("setting up aggregator...") if options.data_backend == 'influx': agg_sink = io.influx.Aggregator( hostname=options.influx_hostname, @@ -128,32 +132,34 @@ if __name__ == '__main__': agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job') # start an infinite loop to keep updating and aggregating data + logging.info("starting up...") while True: - logging.info("sleeping for %.1f s" % options.dump_period) + logging.debug("sleeping for %.1f s" % options.dump_period) time.sleep(options.dump_period) if consumer: # this is not threadsafe! - logging.info("retrieving data from kafka") + logging.debug("retrieving data from kafka") start = timeit.default_timer() datadata = io.kafka.retrieve_timeseries(consumer, routes, max_records = 2 * len(jobs) * len(routes)) elapsed = timeit.default_timer() - start - logging.info("time to retrieve data: %.1f s" % elapsed) + logging.debug("time to retrieve data: %.1f s" % elapsed) else: - logging.info("retrieving data from bottle routes") + logging.debug("retrieving data from bottle routes") datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads) # store and reduce data for each job start = timeit.default_timer() for route in routes: - logging.info("storing and reducing timeseries for measurement: %s" % route) + logging.debug("storing and reducing timeseries for measurement: %s" % route) for aggregate in options.data_type: agg_sink.store_columns(route, datadata[route], aggregate=aggregate) elapsed = timeit.default_timer() - start - logging.info("time to store/reduce timeseries: %.1f s" % elapsed) + logging.debug("time to store/reduce timeseries: %.1f s" % elapsed) # close connection to consumer if using kafka if consumer: + logging.info("shutting down consumer...") consumer.close() # diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator index e6bc5670ed827b52cbd5d5d1247e48771c6ffbc9..4ca857f96ae7108bb9c5e0a560891b961ebbc09d 100755 --- a/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator +++ b/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator @@ -79,6 +79,7 @@ def parse_command_line(): parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.") parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.") parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.") + parser.add_argument("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.") args = parser.parse_args() @@ -101,7 +102,9 @@ if __name__ == '__main__': # FIXME don't hardcode some of these? jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)] - logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") + log_level = logging.DEBUG if options.verbose else logging.INFO + logging.basicConfig(format = '%(asctime)s | ll_inspiral_trigger_aggregator : %(levelname)s : %(message)s') + logging.getLogger().setLevel(log_level) pool = Pool(options.num_threads) @@ -137,20 +140,21 @@ if __name__ == '__main__': agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads) # start an infinite loop to keep updating and aggregating data + logging.info("starting up...") while True: - logging.info("sleeping for %.1f s" % options.dump_period) + logging.debug("sleeping for %.1f s" % options.dump_period) time.sleep(options.dump_period) if consumer: # this is not threadsafe! - logging.info("retrieving data from kafka") + logging.debug("retrieving data from kafka") start = timeit.default_timer() #triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs)) triggers = retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs)) elapsed = timeit.default_timer() - start - logging.info("time to retrieve data: %.1f s" % elapsed) + logging.debug("time to retrieve data: %.1f s" % elapsed) else: - logging.info("retrieving data from bottle routes") + logging.debug("retrieving data from bottle routes") triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = options.route, num_threads=options.num_threads) # filter out triggers that don't have a far assigned yet @@ -159,15 +163,16 @@ if __name__ == '__main__': # store and reduce data for each job if triggers: start = timeit.default_timer() - logging.info("storing and reducing triggers") + logging.debug("storing and reducing triggers") agg_sink.store_triggers('triggers', triggers, far_key = 'combined_far', time_key = 'end') elapsed = timeit.default_timer() - start - logging.info("time to store/reduce triggers: %.1f s" % elapsed) + logging.debug("time to store/reduce triggers: %.1f s" % elapsed) else: - logging.info("no triggers to process") + logging.debug("no triggers to process") # close connection to consumer if using kafka if consumer: + logging.info("shutting down consumer...") consumer.close() #