Commit 00df278a authored by chad.hanna

gstlal_ll_inspiral_aggregator: enable optional kafka support

parent d7902c7a
@@ -33,6 +33,7 @@ import shutil
 import collections
 from multiprocessing import Pool
 from gstlal import aggregator
+import json

 MIN_TIME_QUANTA = 10000
 DIRS = 6
@@ -61,6 +62,7 @@ def parse_command_line():
 	parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.")
+	parser.add_argument("--kafka-server", action="store", help="Specify kafka server to read data from, example: 10.14.0.112:9092")
 	args = parser.parse_args()
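For orientation, the new option above only tells the aggregator which broker to read from; the running gstlal_inspiral jobs are expected to publish their data to that broker themselves. A rough, hypothetical sketch of that producer side follows; the topic name "0001", the route name "H1_snr_history", and the payload layout are illustrative assumptions, not anything defined by this commit.

import json
from kafka import KafkaProducer

# Hypothetical producer side (not part of this commit): publish one JSON message
# per update to a per-job topic on the broker named by --kafka-server.
producer = KafkaProducer(
	bootstrap_servers=["10.14.0.112:9092"],
	# mirror of the ascii/JSON deserializer the aggregator installs on its consumer
	value_serializer=lambda d: json.dumps(d).encode("ascii"),
)
producer.send("0001", {"H1_snr_history": {"time": [1187008882.4], "data": [5.7]}})
producer.flush()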
@@ -92,13 +94,27 @@ if __name__ == '__main__':
 	pool = Pool(options.num_threads)
 	prevdataspan = set()
+	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
+	if options.kafka_server is not None:
+		from kafka import KafkaConsumer
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
+	else:
+		consumer = None
 	# start an infinite loop to keep updating and aggregating data
 	while True:
 		logging.info("sleeping")
 		time.sleep(options.dump_period)
 		dataspan = set()
+		if consumer is not None:
+			# this is not threadsafe!
+			logging.info("getting data from kafka")
+			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)
+		else:
+			timedata, datadata = None, None
 		# First get the raw and reduced data for each job in parallel
-		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs) for job in jobs]
+		mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs]
 		for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs):
 		#for ds in map(aggregator.get_data_from_job_and_reduce, mapargs):
			dataspan.update(ds)
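The body of aggregator.get_data_from_kafka() is not shown in this diff. A minimal sketch of what such a helper could look like, assuming one Kafka topic per job and payloads that map each route to "time" and "data" arrays (the return layout of the real function may differ):

from collections import defaultdict

# Sketch only, not the real aggregator.get_data_from_kafka(): drain whatever the
# jobs have published since the last poll and bucket it by (job, route).
def get_data_from_kafka_sketch(jobs, routes, consumer, timeout_ms=1000):
	timedata = defaultdict(list)
	datadata = defaultdict(list)
	# poll() returns {TopicPartition: [ConsumerRecord, ...]}; record.value has already
	# been JSON-decoded by the value_deserializer passed to KafkaConsumer above
	for tp, records in consumer.poll(timeout_ms=timeout_ms).items():
		for record in records:
			job = record.topic
			if job not in jobs:
				# defensive: ignore topics we did not subscribe to
				continue
			for route in routes:
				if route in record.value:
					timedata[(job, route)].extend(record.value[route]["time"])
					datadata[(job, route)].extend(record.value[route]["data"])
	return timedata, datadata

Because the consumer is not thread safe, it is polled once per iteration in the main loop and only the already-decoded timedata and datadata are handed to the worker pool; that is why the mapargs tuples gain two extra entries in the hunk above.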