diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_state b/gstlal-ugly/bin/gstlal_ll_inspiral_state
index 7cbc1f2429f559cd995b3eb26d70d96a694d533a..b7daa3fd48faa8604fbb34a3c2fa167e78d97ca7 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_state
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_state
@@ -33,6 +33,7 @@ import shutil
 import collections
 from multiprocessing import Pool
 from gstlal import aggregator
+import json
 
 MIN_TIME_QUANTA = 10000
 DIRS = 6
@@ -59,6 +60,7 @@ def parse_command_line():
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
 	parser.add_argument("--instrument", action = "append", help = "Number of threads to use concurrently")
+	parser.add_argument("--kafka-server", action="store", help="Specify kafka server to read data from, example: 10.14.0.112:9092")
 
 	args = parser.parse_args()
 
@@ -75,42 +77,56 @@ def parse_command_line():
 
 #
 def get_data_from_route((job, job_tag, routes, basedir)):
-	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
-		url = f.readline().strip()
-	for route in routes:
-		logging.info("processing job %s for route %s" % (job, route))
-		data = aggregator.get_url(url, route)
-		jobtime, jobdata = numpy.array([]), numpy.array([])
-		if data and "ram_history" in route:
-			jobtime, jobdata = data[0], data[1]
-		if data and "strain_add_drop" in route:
-			jobtime, jobdata = data[0], data[1]
-		if data and "state_vector_on_off_gap" in route:
-			jobtime, ontime, offtime = data[0], data[1], data[2]
-			jobdata = ontime + offtime
+	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
+		url = f.readline().strip()
+	for route in routes:
+		logging.info("processing job %s for route %s : %s" % (job, route, url))
+		data = aggregator.get_url(url, route)
+		jobtime, jobdata = data[0], data[1]
 		path = "%s/by_job/%s" % (basedir, job)
 		fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = jobtime, data = jobdata, tmp = True)
 		shutil.move(fname, fname.replace(".tmp", ""))
+
 
 if __name__ == '__main__':
 	options = parse_command_line()
 
 	jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
 	routes = ["ram_history"]
 	for ifo in options.instrument:
-		routes.append("%s/strain_add_drop" % ifo)
-		routes.append("%s/state_vector_on_off_gap" % ifo)
+		routes.append("%s/statevector_on" % ifo)
+		routes.append("%s/statevector_off" % ifo)
+		routes.append("%s/statevector_gap" % ifo)
+		routes.append("%s/dqvector_on" % ifo)
+		routes.append("%s/dqvector_off" % ifo)
+		routes.append("%s/dqvector_gap" % ifo)
 	for job in jobs:
 		aggregator.makedir("%s/by_job/%s" % (options.base_dir, job))
 
 	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
-	pool = Pool(options.num_threads)
 	prevdataspan = set()
+	if options.kafka_server:
+		from kafka import KafkaConsumer
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
+	else:
+		pool = Pool(options.num_threads)
+		consumer = None
+
 	while True:
 		logging.info("sleeping")
 		time.sleep(options.dump_period)
+
+		if consumer:
+			# this is not threadsafe!
+			logging.info("getting data from kafka")
+			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer, req_all = True)
+			for (job,route) in timedata:
+				path = "%s/by_job/%s" % (options.base_dir, job)
+				fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = timedata[(job,route)], data = datadata[(job,route)], tmp = True)
+				shutil.move(fname, fname.replace(".tmp", ""))
+		else:
+			mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
+			pool.map(get_data_from_route, mapargs)
 
 	sys.exit(1)
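For reference, the consumer added above subscribes to one Kafka topic per job ("0000", "0001", ...) and deserializes each message as JSON; the per-(job, route) aggregation itself is delegated to gstlal.aggregator.get_data_from_kafka. The following is a minimal standalone sketch of that polling pattern using the same kafka-python configuration as the patch. The payload layout ({"time": [...], "data": {route: [...]}}) and the broker address are illustrative assumptions, not the actual gstlal message schema.

# Sketch only: mirrors the KafkaConsumer configuration introduced in this patch.
# The message payload layout below is an assumed example, not the gstlal schema.
import json
from collections import defaultdict
from kafka import KafkaConsumer

jobs = ["%04d" % b for b in range(2)]      # one topic per job, as in the patch
kafka_server = "10.14.0.112:9092"          # example value from the --kafka-server help

consumer = KafkaConsumer(
	*jobs,
	bootstrap_servers = [kafka_server],
	value_deserializer = lambda m: json.loads(m.decode('ascii')),
	auto_offset_reset = 'latest',
)

timedata, datadata = defaultdict(list), defaultdict(list)
# poll() returns {TopicPartition: [ConsumerRecord, ...]}; record.topic identifies
# the job and record.value is the JSON-deserialized payload
for tp, records in consumer.poll(timeout_ms = 1000).items():
	for record in records:
		job, payload = record.topic, record.value
		for route, values in payload.get("data", {}).items():
			timedata[(job, route)].extend(payload.get("time", []))
			datadata[(job, route)].extend(values)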