Commit a90e4cb7 authored by Patrick Godwin

gstlal_ll_feature_extractor_pipe: link listener job for online DAG

parent 7e02972e
@@ -135,13 +135,15 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
 			output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
 		)

+	num_channels = len(channel_list)
+
 	print("Writing channel list of all channels processed")
 	listpath = os.path.join(options.out_path, "full_channel_list.txt")
 	with open(listpath, 'w') as f:
 		for channel in channel_list:
 			f.write(channel+'\n')

-	return feature_extractor_nodes
+	return feature_extractor_nodes, num_channels

 # =============================
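The hunk above changes the contract of feature_extractor_node_gen: it now returns a (nodes, channel count) pair, and it still writes the full channel list to disk, one channel per line. A consumer can recover both from the written file; a minimal, hypothetical reader sketch (read_channel_list is not part of the codebase):

    import os

    def read_channel_list(out_path):
        # read back the channel list the DAG generator wrote to
        # full_channel_list.txt: one channel name per line
        listpath = os.path.join(out_path, "full_channel_list.txt")
        with open(listpath) as f:
            channels = [line.strip() for line in f if line.strip()]
        # len(channels) matches the num_channels value returned by
        # feature_extractor_node_gen for the same run
        return channels, len(channels)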
@@ -245,13 +247,14 @@ def parse_command_line():
 	parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
 	parser.add_option_group(group)

-	# Synchronizer/File Sink commands
+	# Synchronizer/File Sink/Listener commands
 	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
 	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
 	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
 	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
+	parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
 	parser.add_option_group(group)
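For reference, here is how the new --target-channel option behaves when parsed, as a self-contained optparse sketch; the option definition matches the diff, while the channel value is just an example:

    import optparse

    parser = optparse.OptionParser()
    group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
    parser.add_option("--target-channel", metavar = "channel", help = "Target channel for monitoring.")
    parser.add_option_group(group)

    options, args = parser.parse_args(["--target-channel", "H1:GDS-CALIB_STRAIN"])
    assert options.target_channel == "H1:GDS-CALIB_STRAIN"
    # no default is set, so the option is None when omitted
    assert parser.parse_args([])[0].target_channel is None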
@@ -296,6 +299,7 @@ else:
 condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
 condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
 feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
+feature_extractor_nodes, num_channels = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)

 # auxiliary jobs
 if options.save_format == 'kafka':
@@ -306,6 +310,7 @@ if options.save_format == 'kafka':
 	auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
 	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	listener_job = inspiral_pipe.generic_job("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)

 	if not options.disable_kafka_jobs:
 		# kafka/zookeeper jobs
@@ -334,6 +339,14 @@ if options.save_format == 'kafka':
 	if options.no_drop:
 		synchronizer_options.update({"no-drop": options.no_drop})

+	listener_options = {
+		"instrument": ifo,
+		"target-channel": options.target_channel,
+		"sample-rate": options.sample_rate,
+		"input-topic-basename": '_'.join(['synchronizer', options.tag]),
+		"num-channels": num_channels,
+	}
+
 	hdf5_sink_options = {
 		"instrument": ifo,
 		"channel-list": options.channel_list,
@@ -359,17 +372,17 @@ if options.save_format == 'kafka':
 	synchronizer_options.update(common_options)
 	hdf5_sink_options.update(common_options)
+	listener_options.update(common_options)

 #
 # set up jobs
 #

-feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
-
 if options.save_format == 'kafka':
 	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
 	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
 	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
+	listener_node = inspiral_pipe.generic_node(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})

 	if not options.disable_kafka_jobs:
 		zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
......
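Gathered from the hunks above, the complete listener wiring added by this commit reads as follows; every line is taken from the diff, shown here in one place with common_options merged the same way as for the other sinks:

    # job definition: auxiliary resource requests, same universe as the other streaming jobs
    listener_job = inspiral_pipe.generic_job("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)

    # per-job options; num_channels comes from feature_extractor_node_gen
    listener_options = {
        "instrument": ifo,
        "target-channel": options.target_channel,
        "sample-rate": options.sample_rate,
        "input-topic-basename": '_'.join(['synchronizer', options.tag]),
        "num-channels": num_channels,
    }
    listener_options.update(common_options)

    # DAG node: no parents (online jobs are free-standing), writes under its own rootdir
    listener_node = inspiral_pipe.generic_node(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})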