diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator
index 8078bc44b5c90016a828c2f1cd4f2670ffd97120..e6bc5670ed827b52cbd5d5d1247e48771c6ffbc9 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_trigger_aggregator
@@ -51,12 +51,9 @@ def retrieve_triggers(consumer, jobs, route_name = 'coinc', timeout = 1000, max_
 	### retrieve timeseries for all routes and topics
 	msg_pack = consumer.poll(timeout_ms = timeout, max_records = max_records)
 	for tp, messages in msg_pack.items():
-		job = tp.topic
-		if job not in jobs:
-			continue
 		for message in messages:
 			try:
-				triggers.extend(message.value[route_name])
+				triggers.extend(message.value)
 			except KeyError: ### no route in message
 				pass

@@ -70,6 +67,7 @@ def parse_command_line():
 	# directory to put everything in
 	parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
 	parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
+	parser.add_argument("--route", action="store", default="coinc", help="Specify the route where triggers are stored.")
 	parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
@@ -78,6 +76,8 @@ def parse_command_line():
 	parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
 	parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
 	parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
+	parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
 	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")

 	args = parser.parse_args()
@@ -108,13 +108,31 @@ if __name__ == '__main__':
 	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_trigger_aggregator' % jobs[0], auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
+		consumer = KafkaConsumer(
+			options.route,
+			bootstrap_servers=[options.kafka_server],
+			key_deserializer=lambda m: json.loads(m.decode('utf-8')),
+			value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+			group_id='%s_trigger_aggregator' % jobs[0],
+			auto_offset_reset='latest',
+			max_poll_interval_ms = 60000,
+			session_timeout_ms=30000,
+			heartbeat_interval_ms=10000,
+			reconnect_backoff_ms=5000,
+			reconnect_backoff_max_ms=30000
+		)
 	else:
 		consumer = None

 	# set up aggregator sink
 	if options.data_backend == 'influx':
-		agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
+		agg_sink = io.influx.Aggregator(
+			hostname=options.influx_hostname,
+			port=options.influx_port,
+			db=options.influx_database_name,
+			auth=options.enable_auth,
+			https=options.enable_https
+		)
 	else: ### hdf5 data backend
 		agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)

@@ -127,13 +145,13 @@ if __name__ == '__main__':
 			# this is not threadsafe!
 			logging.info("retrieving data from kafka")
 			start = timeit.default_timer()
-			#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
-			triggers = retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
+			#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
+			triggers = retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
 			elapsed = timeit.default_timer() - start
 			logging.info("time to retrieve data: %.1f s" % elapsed)
 		else:
 			logging.info("retrieving data from bottle routes")
-			triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = 'coinc', num_threads=options.num_threads)
+			triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = options.route, num_threads=options.num_threads)

 		# filter out triggers that don't have a far assigned yet
 		triggers = [trg for trg in triggers if 'combined_far' in trg]
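Note on the new consumption pattern: after this patch the aggregator subscribes to the single topic named by --route, and each Kafka message value is already the list of triggers, so no per-route lookup is needed. The sketch below illustrates that pattern with kafka-python; the broker address, topic name, and poll settings are illustrative placeholders, not values taken from the patch.

```python
# Minimal sketch of the single-route consumption pattern.
# Assumptions (not part of the patch): kafka-python is installed, a broker
# runs at localhost:9092, and a "coinc" topic carries JSON lists of triggers.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "coinc",                                  # single route topic, as in the patched script
    bootstrap_servers=["localhost:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="latest",
)

triggers = []
msg_pack = consumer.poll(timeout_ms=1000, max_records=100)
for tp, messages in msg_pack.items():
    for message in messages:
        # each message value is already the list of triggers, so it is
        # extended directly instead of being indexed by route name
        triggers.extend(message.value)
```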