Commit d4b7501b authored by chad.hanna

gstlal_ll_inspiral_state: enable optional kafka support

parent 00df278a
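
The change makes Kafka an optional transport for the per-job state data: when --kafka-server is given, the script subscribes a single KafkaConsumer to one topic per job number and decodes each message as JSON; otherwise it keeps the existing behaviour of scraping each job's registry URL with a multiprocessing Pool. A minimal sketch of that setup pattern, assuming the kafka-python package (the server address and job count below are illustrative, not taken from a real deployment):

import json
from multiprocessing import Pool
from kafka import KafkaConsumer

kafka_server = "10.14.0.112:9092"       # illustrative; normally taken from --kafka-server
jobs = ["%04d" % b for b in range(16)]  # one Kafka topic per job number, as in the script

if kafka_server:
	# messages on the per-job topics are JSON-encoded, so decode them on receipt
	consumer = KafkaConsumer(*jobs, bootstrap_servers = [kafka_server], value_deserializer = lambda m: json.loads(m.decode('ascii')), auto_offset_reset = 'latest')
	pool = None
else:
	# no Kafka server: fall back to polling each job's registry URL in a process pool
	consumer = None
	pool = Pool(16)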
@@ -33,6 +33,7 @@ import shutil
import collections
from multiprocessing import Pool
from gstlal import aggregator
import json
MIN_TIME_QUANTA = 10000
DIRS = 6
@@ -59,6 +60,7 @@ def parse_command_line():
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
parser.add_argument("--instrument", action = "append", help = "Number of threads to use concurrently")
parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
args = parser.parse_args()
@@ -75,42 +77,56 @@
#
def get_data_from_route((job, job_tag, routes, basedir)):
	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
		url = f.readline().strip()
	for route in routes:
		logging.info("processing job %s for route %s" % (job, route))
		data = aggregator.get_url(url, route)
		jobtime, jobdata = numpy.array([]), numpy.array([])
		if data and "ram_history" in route:
			jobtime, jobdata = data[0], data[1]
		if data and "strain_add_drop" in route:
			jobtime, jobdata = data[0], data[1]
		if data and "state_vector_on_off_gap" in route:
			jobtime, ontime, offtime = data[0], data[1], data[2]
			jobdata = ontime + offtime
	with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
		url = f.readline().strip()
	for route in routes:
		logging.info("processing job %s for route %s : %s" % (job, route, url))
		data = aggregator.get_url(url, route)
		jobtime, jobdata = data[0], data[1]
		path = "%s/by_job/%s" % (basedir, job)
		fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = jobtime, data = jobdata, tmp = True)
		shutil.move(fname, fname.replace(".tmp", ""))
if __name__ == '__main__':
	options = parse_command_line()
	jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
	routes = ["ram_history"]
	for ifo in options.instrument:
		routes.append("%s/strain_add_drop" % ifo)
		routes.append("%s/state_vector_on_off_gap" % ifo)
		routes.append("%s/statevector_on" % ifo)
		routes.append("%s/statevector_off" % ifo)
		routes.append("%s/statevector_gap" % ifo)
		routes.append("%s/dqvector_on" % ifo)
		routes.append("%s/dqvector_off" % ifo)
		routes.append("%s/dqvector_gap" % ifo)
	for job in jobs: aggregator.makedir("%s/by_job/%s" % (options.base_dir, job))
	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
	pool = Pool(options.num_threads)
	prevdataspan = set()
	if options.kafka_server:
		from kafka import KafkaConsumer
		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
	else:
		pool = Pool(options.num_threads)
		consumer = None
	while True:
		logging.info("sleeping")
		time.sleep(options.dump_period)
		mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
		pool.map(get_data_from_route, mapargs)
		if consumer:
			# this is not threadsafe!
			logging.info("getting data from kafka")
			timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer, req_all = True)
			for (job, route) in timedata:
				path = "%s/by_job/%s" % (options.base_dir, job)
				fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = timedata[(job,route)], data = datadata[(job,route)], tmp = True)
				shutil.move(fname, fname.replace(".tmp", ""))
		else:
			mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
			pool.map(get_data_from_route, mapargs)
	sys.exit(1)
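
The loop above hands the consumer to aggregator.get_data_from_kafka(), which is expected to return per-(job, route) time and value arrays; its implementation is not part of this diff. Purely as an illustration of how such a helper might drain the consumer using kafka-python's poll(), under the assumption that each message body decodes to a dictionary mapping a route name to [times, values]:

def drain_consumer(consumer, timeout_ms = 10000):
	# Sketch only: the real logic lives in gstlal's aggregator module and may differ.
	# Assumes each JSON message decodes to {route: [times, values]}.
	timedata, datadata = {}, {}
	for records in consumer.poll(timeout_ms = timeout_ms).values():
		for record in records:
			job = record.topic  # topics are the per-job numbers, e.g. "0001"
			for route, (times, values) in record.value.items():
				timedata.setdefault((job, route), []).extend(times)
				datadata.setdefault((job, route), []).extend(values)
	return timedata, datadata

The "this is not threadsafe!" comment suggests why the Kafka path reads the consumer directly in the main loop instead of sharing it across the process pool used by the URL-scraping path.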