Skip to content
Snippets Groups Projects
Commit 7000fc01 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_ll_feature_extractor_pipe: update monitor settings, rename file output...

gstlal_ll_feature_extractor_pipe: update monitor settings, rename file output directory for auxiliary jobs
parent 5ad9e7fb
No related branches found
No related tags found
No related merge requests found
......@@ -318,7 +318,7 @@ if options.save_format == 'kafka':
auxiliary_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
synchronizer_job = dagparts.DAGJob("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
hdf5_sink_job = dagparts.DAGJob("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
listener_job = dagparts.DAGJob("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
monitor_job = dagparts.DAGJob("gstlal_feature_monitor", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
aggregator_job = dagparts.DAGJob("gstlal_feature_aggregator", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
if not options.disable_kafka_jobs:
......@@ -348,12 +348,14 @@ if options.save_format == 'kafka':
if options.no_drop:
synchronizer_options.update({"no-drop": options.no_drop})
listener_options = {
monitor_options = {
"instrument": ifo,
"target-channel": options.target_channel,
"sample-rate": options.sample_rate,
"input-topic-basename": '_'.join(['synchronizer', options.tag]),
"num-channels": num_channels,
"data-backend": options.agg_data_backend,
"data-type": "max",
}
hdf5_sink_options = {
......@@ -381,14 +383,16 @@ if options.save_format == 'kafka':
"data-type": "max",
}
if options.agg_data_backend == 'influx':
aggregator_options.update({
backend_options = {
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
})
}
else:
aggregator_options.update({"num-processes": options.num_agg_processes_per_job})
backend_options = {"num-processes": options.num_agg_processes_per_job}
aggregator_options.update(backend_options)
monitor_options.update(backend_options)
### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
for option_name, option in extra_hdf5_channel_options.items():
......@@ -398,7 +402,7 @@ if options.save_format == 'kafka':
synchronizer_options.update(common_options)
hdf5_sink_options.update(common_options)
aggregator_options.update(common_options)
listener_options.update(common_options)
monitor_options.update(common_options)
#
# set up jobs
......@@ -410,9 +414,9 @@ def groups(l, n):
if options.save_format == 'kafka':
synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
listener_node = dagparts.DAGNode(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})
synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "synchronizer")})
hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "features")})
monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
### aggregator jobs
all_fx_jobs = [(str(ii).zfill(4), channel_subset) for ii, channel_subset in enumerate(data_source_info.channel_subsets)]
......@@ -420,7 +424,7 @@ if options.save_format == 'kafka':
jobs, channels = zip(*job_subset)
job_channel_options = {"jobs": jobs, "num-channels": len(channels)}
job_channel_options.update(aggregator_options)
listener_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_aggregator")})
agg_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")})
if not options.disable_kafka_jobs:
zoo_node = dagparts.DAGNode(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.