From ce378f5d95c8de5ad609c86008f1f26169ea4cc1 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Mon, 24 Sep 2018 16:07:07 -0700
Subject: [PATCH] gstlal_ll_feature_extractor_pipe: add kafka/zookeeper nodes
 to the DAG, update the optimized iDQ makefile to better handle different
 cluster sites, export kafka opts

---
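For reference, with the defaults below, zookeeper_job renders a
zookeeper.properties along these lines (dataDir comes from
dagparts.log_path(), so the exact path is site-dependent):

    # the directory where the snapshot is stored.
    dataDir=<log_path>/zookeeper
    # the port at which the clients will connect
    clientPort=2271
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0

Once both nodes are up, the broker can be sanity-checked against the
non-default zookeeper port with, e.g.:

    kafka-topics.sh --list --zookeeper localhost:2271
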
 .../bin/gstlal_ll_feature_extractor_pipe      | 88 +++++++++++++++++++
 .../feature_extractor/Makefile.gstlal_idq_icc | 47 ++++++----
 2 files changed, 118 insertions(+), 17 deletions(-)

diff --git a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
index 0fd7cd43ba..d401918b6d 100755
--- a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
@@ -32,6 +32,7 @@ import optparse
 import os
 
 from gstlal import aggregator
+from gstlal import dagparts
 from gstlal import inspiral_pipe
 
 from gstlal.fxtools import feature_extractor
@@ -134,6 +135,84 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
 
 	return feature_extractor_nodes
 
+
+# =============================
+#
+#           classes
+#
+# =============================
+
+
+class zookeeper_job(inspiral_pipe.generic_job):
+	"""
+	A zookeeper job
+	"""
+	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
+		"""
+		Render a zookeeper.properties config and set up a local-universe condor job that runs the zookeeper server.
+		"""
+		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+
+		try:
+			os.mkdir(datadir)
+		except OSError:
+			pass
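+		# render the zookeeper config consumed by zookeeper-server-start.sh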
+		with open("zookeeper.properties", "w") as f:
+			f.write("""
+# the directory where the snapshot is stored.
+dataDir=%s
+# the port at which the clients will connect
+clientPort=%d
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=%d
+			""" % (datadir, port, maxclients))
+
+
+class kafka_job(inspiral_pipe.generic_job):
+	"""
+	A kafka job
+	"""
+	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
+		"""
+		Render a kafka.properties config and set up a local-universe condor job that runs the kafka broker.
+		"""
+		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+
+		try:
+			os.mkdir(logdir)
+		except OSError:
+			pass
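+		# render the broker config consumed by kafka-server-start.sh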
+		with open("kafka.properties", "w") as f:
+			f.write("""
+broker.id=0
+listeners=PLAINTEXT://%s
+background.threads=100
+num.network.threads=50
+num.io.threads=80
+log.cleaner.threads=10
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+queued.max.requests=10000
+log.dirs=%s
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+auto.create.topics.enable=true
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+log.flush.interval.ms=300000
+log.retention.ms=100000
+log.roll.ms=1000000
+log.segment.bytes=1073741824
+log.retention.check.interval.ms=300000
+zookeeper.connect=%s
+zookeeper.connection.timeout.ms=6000
+group.initial.rebalance.delay.ms=0
+			""" % (host, logdir, zookeeperaddr))
+
 # =============================
 #
 #     command line parser
@@ -164,6 +243,7 @@ def parse_command_line():
 	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
 	parser.add_option_group(group)
 
+
 	options, filenames = parser.parse_args()
 
 	return options, filenames
@@ -210,6 +290,12 @@ if options.save_format == 'kafka':
 	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands)
 	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands)
 
+	# kafka/zookeeper jobs
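+	# request graceful removal (SIGTERM) so the servers can shut down cleanly when the DAG is removed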
+	local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
+	local_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, local_condor_options)
+	zoo_job = zookeeper_job(condor_commands = local_condor_commands)
+	kafka_server_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)
+
 	#
 	# set up options for auxiliary jobs
 	#
@@ -263,6 +349,8 @@ if options.save_format == 'kafka':
 	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
 	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
 	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
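+	# zookeeper/kafka run as local-universe nodes, each handed its rendered properties file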
+	zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
+	kafka_node = inspiral_pipe.generic_node(kafka_server_job, dag, [], opts = {"":"kafka.properties"})
 
 #
 # write out dag and sub files
diff --git a/gstlal-burst/share/feature_extractor/Makefile.gstlal_idq_icc b/gstlal-burst/share/feature_extractor/Makefile.gstlal_idq_icc
index b090b95454..6dc969a056 100644
--- a/gstlal-burst/share/feature_extractor/Makefile.gstlal_idq_icc
+++ b/gstlal-burst/share/feature_extractor/Makefile.gstlal_idq_icc
@@ -23,6 +23,10 @@ SHELL := /bin/bash # Use bash syntax
 # make kafka -f Makefile.gstlal_idq_icc -t
 # make idq -f Makefile.gstlal_idq_icc
 
+
+# Determine cluster
+CLUSTER:=$(shell hostname -d)
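+# e.g. "nemo.uwm.edu" at UWM; used below to select site-specific python dirs and package versions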
+
 # Set directories and paths
 SRC_DIR:=${PWD}/src
 TAR_DIR:=tarball
@@ -30,6 +34,14 @@ LOGS_DIR:=logs
 PATCH_DIR:=${PWD}/patches
 INSTALL_DIR:=${LAL_PATH}
 KAFKA_DIR:=${KAFKA_PATH}
+ifneq (,$(findstring nemo.uwm.edu,$(CLUSTER)))
+	PYTHON_LIB_DIR:=/usr/lib
+	PYTHON_DIR:=${LAL_PATH}/lib/python2.7/site-packages/
+else
+	PYTHON_LIB_DIR:=/usr/lib64
+	PYTHON_DIR:=${LAL_PATH}/lib64/python2.7/site-packages/
+endif
+
 IDQ_REPO:=${PWD}/git/iDQ
 GSTLAL_REPO:=${PWD}/git/gstlal
 GLUE_REPO:=${PWD}/git/glue
@@ -51,8 +63,9 @@ GLUE_GIT_BRANCH="master"
 LALSUITE_GIT_BRANCH="master"
 
 # Set package release versions
+# iDQ
 IDQ=idq-2.0.0
-# idq/feature extractor deps
+# iDQ/feature extractor deps
 KAFKA=0.11.0.2
 SCALA=2.11
 KAFKA_PYTHON=0.11.4
@@ -89,10 +102,11 @@ GSTPYTHON=gst-python-1.4.0
 GSTPLUGINSGOOD=gst-plugins-good-1.4.5
 GSTPLUGINSBASE=gst-plugins-base-1.4.5
 GSTREAMER=gstreamer-1.4.5
-# CIT, LLO, LHO
-GINTRO=gobject-introspection-1.50.0
-#UWM
-#GINTRO=gobject-introspection-1.42.0
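+# UWM provides an older stack, so pin the older gobject-introspection release there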
+ifneq (,$(findstring nemo.uwm.edu,$(CLUSTER)))
+	GINTRO=gobject-introspection-1.42.0
+else
+	GINTRO=gobject-introspection-1.50.0
+endif
 GSL=gsl-1.16
 ORC=orc-0.4.26
 PYFFTW=pyFFTW-0.9.0
@@ -101,9 +115,9 @@ FFTW=fftw-3.3.5
 # Specify target/dependency files
 IDQ_TARGET=$(INSTALL_DIR)/bin/idq-train
 KAFKA_TARGET=$(KAFKA_DIR)/bin/kafka-topics.sh
-KAFKA_PYTHON_TARGET=$(INSTALL_DIR)/lib64/python2.7/site-packages/confluent_kafka-$(KAFKA_PYTHON)-py2.7-linux-x86_64.egg/confluent_kafka/cimpl.so
+KAFKA_PYTHON_TARGET=$(PYTHON_DIR)/confluent_kafka-$(KAFKA_PYTHON)-py2.7-linux-x86_64.egg/confluent_kafka/cimpl.so
 LIBRDKAFKA_TARGET=$(INSTALL_DIR)/lib/librdkafka.so
-SKLEARN_TARGET=$(INSTALL_DIR)/lib64/python2.7/site-packages/sklearn/_isotonic.so
+SKLEARN_TARGET=$(PYTHON_DIR)/sklearn/_isotonic.so
 CYTHON_TARGET=$(INSTALL_DIR)/bin/cython
 GSTLALBURST_TARGET=$(INSTALL_DIR)/lib/libgstlalburst.so
 GSTINSPIRAL_TARGET=$(INSTALL_DIR)/lib/libgstlalinspiral.so
@@ -137,10 +151,7 @@ GINTRO_TARGET=$(INSTALL_DIR)/lib/libgirepository-1.0.so
 GSL_2_TARGET=$(INSTALL_DIR)/lib/pkgconfig/gsl.pc.orig
 GSL_1_TARGET=$(INSTALL_DIR)/lib/libgsl.so
 ORC_TARGET=$(INSTALL_DIR)/lib/liborc-0.4.so
-# CIT, LLO, LHO
-PYFFTW_TARGET=$(INSTALL_DIR)/lib64/python2.7/site-packages/pyfftw/pyfftw.so
-#UWM
-#PYFFTW_TARGET=$(INSTALL_DIR)/lib/python2.7/site-packages/pyfftw/pyfftw.so
+PYFFTW_TARGET=$(PYTHON_DIR)/pyfftw/pyfftw.so
 FFTW_TARGET=$(INSTALL_DIR)/lib/libfftw3.so
 
 # tarballs
@@ -202,10 +213,6 @@ YELLOW="${esc}[0;33m"
 BLUE="${esc}[0;34m"
 WHITE="${esc}[m"
 
-
-# Determine cluster
-CLUSTER:=$(shell hostname -d)
-
 ICC_CIT:=/ldcg/intel/2017u2/bin/compilervars.sh
 ICC_LHO:=/ldcg/intel/2017u2/bin/compilervars.sh
 ICC_LLO:=/ldcg/intel/2017u2/bin/compilervars.sh
@@ -255,11 +262,17 @@ env.sh :
 	@echo 'PATH=$${LAL_PATH}/bin:$${KAFKA_PATH}/bin:$${PATH}' >> $@
 	@echo 'PKG_CONFIG_PATH=$${LAL_PATH}/lib/pkgconfig:$${LAL_PATH}/lib64/pkgconfig:$${PKG_CONFIG_PATH}' >> $@
 	@echo 'PYTHONPATH=$${LAL_PATH}/lib/python2.7/site-packages:$${LAL_PATH}/lib64/python2.7/site-packages' >> $@
+	@echo '' >> $@
 	@echo 'GST_PLUGIN_PATH=$${LAL_PATH}/lib/gstreamer-1.0:$${LAL_PATH}/lib64/gstreamer-1.0' >> $@
 	@echo 'GI_TYPELIB_PATH=$${LAL_PATH}/lib/girepository-1.0:$${LAL_PATH}/lib64/girepository-1.0:$${GI_TYPELIB_PATH}' >> $@
 	@echo 'GST_REGISTRY_1_0=$${LAL_PATH}/registry.bin' >> $@
 	@echo 'LAL_DATA_PATH=/home/cbc/ROM_data' >> $@
-	@echo 'export LAL_PATH LDFLAGS_INTEL LDFLAGS KAFKA_PATH PATH PKG_CONFIG_PATH PYTHONPATH GST_PLUGIN_PATH GI_TYPELIB_PATH GST_REGISTRY_1_0 LAL_DATA_PATH' >> $@
+	@echo 'GSTLAL_FIR_WHITEN=1' >> $@
+	@echo '' >> $@
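+	@echo '# JVM settings for the kafka broker, picked up by kafka-server-start.sh' >> $@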
+	@echo 'KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"' >> $@
+	@echo 'KAFKA_HEAP_OPTS="-Xms8G -Xmx8G"' >> $@
+	@echo '' >> $@
+	@echo 'export LAL_PATH LDFLAGS_INTEL LDFLAGS KAFKA_PATH PATH PKG_CONFIG_PATH PYTHONPATH GST_PLUGIN_PATH GI_TYPELIB_PATH GST_REGISTRY_1_0 LAL_DATA_PATH GSTLAL_FIR_WHITEN KAFKA_JVM_PERFORMANCE_OPTS KAFKA_HEAP_OPTS' >> $@
 
 #####################
 # Check time stamps #
@@ -701,7 +714,7 @@ $(GSTPYTHON_TARGET) : $(GSTGOOD_TARGET)
 	@echo $(GREEN)gst-python$(WHITE) 1>&2
 	mkdir -p $(INSTALL_DIR)/lib64/python2.7/site-packages/gi/overrides
 	tar -xf $(TAR_DIR)/$(GSTPYTHON).tar.xz -C $(SRC_DIR)
-	cd $(SRC_DIR)/$(GSTPYTHON) && ./configure --prefix=$(INSTALL_DIR) --exec-prefix=$(INSTALL_DIR) --with-libpython-dir=/usr/lib --with-pygi-overrides-dir=$(INSTALL_DIR)/lib64/python2.7/site-packages/gi/overrides
+	cd $(SRC_DIR)/$(GSTPYTHON) && ./configure --prefix=$(INSTALL_DIR) --exec-prefix=$(INSTALL_DIR) --with-libpython-dir=$(PYTHON_LIB_DIR) --with-pygi-overrides-dir=$(PYTHON_DIR)/gi/overrides
 	cd $(SRC_DIR)/$(GSTPYTHON) && make -j$(CORES) && make install -j$(CORES)
 
 # gstreamer plugin good
-- 
GitLab