From 00df278a2eb1cdbef5c6b2fea0da214383fe880a Mon Sep 17 00:00:00 2001 From: "chad.hanna" <crh184@psu.edu> Date: Sat, 15 Sep 2018 06:33:37 -0700 Subject: [PATCH] gstlal_ll_inspiral_aggregator: enable optional kafka support --- gstlal-ugly/bin/gstlal_ll_inspiral_aggregator | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator index 3a64e7d050..48405fed7d 100755 --- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator +++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator @@ -33,6 +33,7 @@ import shutil import collections from multiprocessing import Pool from gstlal import aggregator +import json MIN_TIME_QUANTA = 10000 DIRS = 6 @@ -61,6 +62,7 @@ def parse_command_line(): parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs") parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).") parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.") + parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092") args = parser.parse_args() @@ -92,13 +94,27 @@ if __name__ == '__main__': pool = Pool(options.num_threads) prevdataspan = set() + # We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs + if options.kafka_server is not None: + from kafka import KafkaConsumer + consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest') + else: + consumer = None + # start an infinite loop to keep updating and aggregating data while True: logging.info("sleeping") time.sleep(options.dump_period) dataspan = set() + if consumer is not None: + # this is not threadsafe! + logging.info("getting data from kafka") + timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer) + else: + timedata, datadata = None, None + # First get the raw and reduced data for each job in parallel - mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs) for job in jobs] + mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs] for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs): #for ds in map(aggregator.get_data_from_job_and_reduce, mapargs): dataspan.update(ds) -- GitLab