From efa319a13255b6e0194898a61d637b30e242499e Mon Sep 17 00:00:00 2001
From: "chad.hanna" <crh184@psu.edu>
Date: Wed, 20 Mar 2019 17:37:50 -0700
Subject: [PATCH] gstlal_ll_inspiral_pipe: support external kafka service,
 remove daily pages - they will be replaced by scale-out aggregators as
 future proofing

---
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe | 34 +++++++++++++--------
 1 file changed, 21 insertions(+), 13 deletions(-)
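
A minimal standalone sketch of the new --run-output-kafka gating below, using
hypothetical option values: zookeeper/kafka jobs are only added to the dag when
both --output-kafka-server and --run-output-kafka are given; with
--output-kafka-server alone, an externally managed kafka service is assumed.

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("--output-kafka-server", metavar = "addr")
    parser.add_option("--run-output-kafka", action = "store_true")
    # hypothetical command line: external kafka service, no local kafka/zookeeper jobs
    options, args = parser.parse_args(["--output-kafka-server", "10.14.0.112:9092"])

    if options.output_kafka_server is not None and options.run_output_kafka:
        print("adding zookeeper + kafka jobs to the dag")
    else:
        print("assuming an external kafka service at %s" % options.output_kafka_server)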

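The aggregator hunk below splits the inspiral jobs into blocks of at most 1000
per aggregator node (the 1000 is still hard coded, see the FIXME). A standalone
sketch of that chunking arithmetic with a hypothetical job count:

    # hypothetical: 2500 inspiral job tags
    jobTags = ["%04d" % i for i in range(2500)]
    aggstarts = list(range(0, len(jobTags), 1000))   # block starts: [0, 1000, 2000]
    aggends = aggstarts[1:] + [len(jobTags)]         # block ends:   [1000, 2000, 2500]
    for aggstart, aggend in zip(aggstarts, aggends):
        if aggend > aggstart:
            print("job-start = %d, num-jobs = %d" % (aggstart, aggend - aggstart))
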
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 69cbde73c3..b9ba3f84aa 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -246,6 +246,7 @@ def parse_command_line():
 	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
 	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
 	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
+	parser.add_option("--run-output-kafka", action = "store_true", help = "Actually launch a zookeeper and kafka job as part of the dag")
 	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
 	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
 	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
@@ -342,7 +343,7 @@ listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.g
 
 # Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
 
-if options.output_kafka_server is not None:
+if options.output_kafka_server is not None and options.run_output_kafka:
 	zooJob = zookeeper_job("zookeeper-server-start.sh", tag_base = "zookeeper-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
 	kafkaJob = kafka_job("kafka-server-start.sh", tag_base = "kafka-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
 	zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
@@ -353,7 +354,7 @@ if options.output_kafka_server is not None:
 aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator",  condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
 # Summary page job
-pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online",  universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
+#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online",  universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
 
 # DQ job
 dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
@@ -606,7 +607,14 @@ for routes in groups(agg_routes, 1):
 	agg_options["data-type"] = "min"
 	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 	agg_options["data-type"] = "max"
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
+	# FIXME don't hard code the 1000
+	aggstarts = range(len(jobTags))[::1000]
+	aggends = aggstarts[1:] + [len(jobTags)]
+	for aggstart, aggend in zip(aggstarts, aggends):
+		if aggend > aggstart:
+			agg_options["job-start"] = aggstart
+			agg_options["num-jobs"] = aggend - aggstart
+			aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
 
 # state-based aggregation jobs
 for routes in groups(state_routes, 2):
@@ -618,16 +626,16 @@ for routes in groups(state_routes, 2):
 # summary page
 #
 
-if options.injection_file:
-
-	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
-
-	for injfile, jobrange in inj_range_dict.items():
-		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
-
-else:
-
-	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
+#if options.injection_file:
+#
+#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
+#
+#	for injfile, jobrange in inj_range_dict.items():
+#		aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
+#
+#else:
+#
+#	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
 
 
 if options.state_backup_destination:
-- 
GitLab