Commit 3565f713 authored by Patrick Godwin

gstlal_ll_feature_extractor_pipe: add synchronizer + hdf5 sink jobs to online jobs

parent a981c8f2
@@ -33,7 +33,6 @@ import os
 
 from gstlal import aggregator
 from gstlal import inspiral_pipe
 from gstlal import dagparts as gstlaldagparts
 from gstlal.fxtools import feature_extractor
 from gstlal.fxtools import multichannel_datasource
@@ -101,7 +100,7 @@ def generate_options(options):
 	return out_options
 
-def feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, parent_nodes, ifo, options, data_source_info):
+def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
 	feature_extractor_nodes = {}
 
 	# generate common command line options
@@ -126,7 +125,7 @@ def feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, parent_nodes, ifo
 		subset_options.update(command_line_options)
 
 		feature_extractor_nodes[ii] = \
-			inspiral_pipe.generic_node(gstlalFeatureExtractorJob, dag, parent_nodes = parent_nodes,
+			inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = parent_nodes,
 				opts = subset_options,
 				output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
 			)
@@ -153,6 +152,14 @@ def parse_command_line():
 	parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
 	parser.add_option_group(group)
 
+	# Synchronizer/File Sink commands
+	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
+	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and process data. Default = 0.1 seconds.")
+	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
+	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
+	parser.add_option_group(group)
+
 	options, filenames = parser.parse_args()
 
 	return options, filenames
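For quick reference, the added option group as a self-contained snippet: a bare OptionParser stands in for the script's full parse_command_line(), and the sample argv is made up.

	import optparse

	parser = optparse.OptionParser()

	# Option names, types, and defaults are taken from the diff above.
	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and process data. Default = 0.1 seconds.")
	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
	parser.add_option_group(group)

	options, filenames = parser.parse_args(["--tag", "online", "--latency-timeout", "10"])
	print(options.tag, options.processing_cadence, options.latency_timeout)
	# -> online 0.1 10.0

Note that, as in the diff, the four options are attached to the parser rather than to the group itself, so the group contributes only its heading and description to --help; the values still parse normally.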
@@ -187,15 +194,55 @@ aggregator.makedir("logs")
 dag = inspiral_pipe.DAG("feature_extractor_pipe")
 
 # feature extractor job
 condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
 condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
-gstlalFeatureExtractorJob = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
+feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
+
+# auxiliary jobs
+auxiliary_condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
+auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
+synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands)
+hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands)
+
+#
+# set up options for auxiliary jobs
+#
+
+common_options = {
+	"verbose": options.verbose,
+	"tag": options.tag,
+	"processing-cadence": options.processing_cadence,
+	"request-timeout": options.request_timeout,
+	"kafka-server": options.kafka_server
+}
+
+synchronizer_options = {
+	"latency-timeout": options.latency_timeout,
+	"input-topic-basename": options.kafka_topic,
+	"output-topic-basename": '_'.join(['synchronizer', options.tag])
+}
+
+hdf5_sink_options = {
+	"basename": options.basename,
+	"instrument": ifo,
+	"channel-list": options.channel_list,
+	"write-cadence": options.cadence,
+	"persist-cadence": options.persist_cadence,
+	"input-topic-basename": '_'.join(['synchronizer', options.tag])
+}
+
+synchronizer_options.update(common_options)
+hdf5_sink_options.update(common_options)
 
 #
-# set up feature extractor jobs
+# set up jobs
 #
 
-feature_extractor_nodes = feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, [], ifo, options, data_source_info)
+feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
+
+synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
+synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
+hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
 
 #
 # write out dag and sub files
......
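For context, a minimal sketch of how the shared and job-specific option dicts above are merged. The values here are hypothetical stand-ins for the parsed options, and the flag rendering at the end is only an illustration; the actual conversion of opts dicts into job arguments is handled inside inspiral_pipe (not shown in this diff).

	# Hypothetical stand-in values for the parsed command-line options.
	common_options = {
		"verbose": True,
		"tag": "test",
		"processing-cadence": 0.1,
		"request-timeout": 0.2,
		"kafka-server": "localhost:9092",
	}

	synchronizer_options = {
		"latency-timeout": 5.0,
		"input-topic-basename": "features",
		"output-topic-basename": "_".join(["synchronizer", "test"]),
	}

	# update() copies the shared keys into the job-specific dict, so each
	# auxiliary job carries its own options plus the common ones.
	synchronizer_options.update(common_options)

	# One plausible rendering of such a dict as command-line flags
	# (illustrative only; not inspiral_pipe's actual logic).
	print(" ".join("--%s %s" % (k, v) for k, v in sorted(synchronizer_options.items())))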