From a90e4cb7a43f72e189d2dbf269007e13cd9807aa Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 29 Nov 2018 16:16:29 -0800
Subject: [PATCH] gstlal_ll_feature_extractor_pipe: link listener job for
 online DAG

---
 .../bin/gstlal_ll_feature_extractor_pipe      | 21 +++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
index cd085faf48..5c11c28453 100755
--- a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
@@ -135,13 +135,15 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
 				output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
 			)
 
+	num_channels = len(channel_list)
+
 	print("Writing channel list of all channels processed")
 	listpath = os.path.join(options.out_path, "full_channel_list.txt")
 	with open(listpath, 'w') as f:
 		for channel in channel_list:
 			f.write(channel+'\n')
 
-	return feature_extractor_nodes
+	return feature_extractor_nodes, num_channels
 
 
 # =============================
@@ -245,13 +247,14 @@ def parse_command_line():
 	parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
 	parser.add_option_group(group)
 
-	# Synchronizer/File Sink commands
+	# Synchronizer/File Sink/Listener commands
 	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
 	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
 	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
 	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
+	parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
 	parser.add_option_group(group)
 
 
@@ -296,6 +299,7 @@ else:
 	condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
 condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
 feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
+feature_extractor_nodes, num_channels = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
 
 # auxiliary jobs
 if options.save_format == 'kafka':
@@ -306,6 +310,7 @@ if options.save_format == 'kafka':
 	auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
 	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	listener_job = inspiral_pipe.generic_job("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 
 	if not options.disable_kafka_jobs:
 		# kafka/zookeeper jobs
@@ -334,6 +339,14 @@ if options.save_format == 'kafka':
 	if options.no_drop:
 		synchronizer_options.update({"no-drop": options.no_drop})
 
+	listener_options = {
+		"instrument": ifo,
+		"target-channel": options.target_channel,
+		"sample-rate": options.sample_rate,
+		"input-topic-basename": '_'.join(['synchronizer', options.tag]),
+		"num-channels": num_channels,
+	}
+
 	hdf5_sink_options = {
 		"instrument": ifo,
 		"channel-list": options.channel_list,
@@ -359,17 +372,17 @@ if options.save_format == 'kafka':
 
 	synchronizer_options.update(common_options)
 	hdf5_sink_options.update(common_options)
+	listener_options.update(common_options)
 
 #
 # set up jobs
 #
 
-feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
-
 if options.save_format == 'kafka':
 	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
 	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
 	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
+	listener_node = inspiral_pipe.generic_node(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})
 
 	if not options.disable_kafka_jobs:
 		zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
-- 
GitLab