diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 3a64e7d050898e45622608ea617d162e61b351db..48405fed7dc8c7c04480d92e9065030cc501d4b4 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -33,6 +33,7 @@ import shutil
 import collections
 from multiprocessing import Pool
 from gstlal import aggregator
+import json
 
 MIN_TIME_QUANTA = 10000
 DIRS = 6
@@ -61,6 +62,7 @@ def parse_command_line():
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.")
+	parser.add_argument("--kafka-server", action="store", help="Specify kafka server to read data from, example: 10.14.0.112:9092")
 
 	args = parser.parse_args()
 
@@ -92,13 +94,27 @@ if __name__ == '__main__':
 	pool = Pool(options.num_threads)
 	prevdataspan = set()
 
+	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
+	if options.kafka_server is not None:
+		from kafka import KafkaConsumer
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
+	else:
+		consumer = None
+
 	# start an infinite loop to keep updating and aggregating data
 	while True:
 		logging.info("sleeping")
 		time.sleep(options.dump_period)
 		dataspan = set()
+		if consumer is not None:
+			# this is not threadsafe!
+			logging.info("getting data from kafka")
+			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)
+		else:
+			timedata, datadata = None, None
+
 		# First get the raw and reduced data for each job in parallel
-		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs) for job in jobs]
+		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs]
 		for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs):
 		#for ds in map(aggregator.get_data_from_job_and_reduce, mapargs):
 			dataspan.update(ds)
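
For context: the patched main loop polls the single Kafka consumer once per iteration (it is not thread safe, as the in-code comments note) and then fans the collected timedata/datadata out to the worker pool through get_data_from_job_and_reduce. Below is a minimal sketch of what a helper with the aggregator.get_data_from_kafka(jobs, routes, consumer) signature could look like when driven by the consumer configured in this diff. It is not the gstlal implementation; the payload layout ({route: {"time": [...], "data": [...]}}), the (job, route) keying, and the helper name get_data_from_kafka_sketch are assumptions made purely for illustration.

# Illustrative sketch only: the real get_data_from_kafka lives in the gstlal
# aggregator module and is not part of this diff.  It assumes the KafkaConsumer
# configured above (one topic per job, JSON-decoded message values) and a
# hypothetical payload of the form {route: {"time": [...], "data": [...]}}.
from collections import defaultdict

def get_data_from_kafka_sketch(jobs, routes, consumer, timeout_ms=1000):
	# poll() returns {TopicPartition: [ConsumerRecord, ...]}; record.value is
	# already a dict because of the json value_deserializer set on the consumer.
	timedata = defaultdict(list)
	datadata = defaultdict(list)
	for records in consumer.poll(timeout_ms=timeout_ms).values():
		for record in records:
			job = record.topic  # one topic was subscribed per job
			payload = record.value
			for route in routes:
				if route in payload:
					timedata[(job, route)].extend(payload[route]["time"])
					datadata[(job, route)].extend(payload[route]["data"])
	return timedata, datadata

Keeping the poll in the main thread, as the patch does, respects the consumer's thread-safety restriction while the per-job reduction work is still parallelized across the multiprocessing pool.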