diff --git a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe index 64baada7cb011ce4a87d36152170a88ea8ac905e..f625c287f73e4d86c6bd31bc58b81f10b5174a73 100755 --- a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe +++ b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe @@ -318,7 +318,7 @@ if options.save_format == 'kafka': auxiliary_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options) synchronizer_job = dagparts.DAGJob("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe) hdf5_sink_job = dagparts.DAGJob("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe) - listener_job = dagparts.DAGJob("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe) + monitor_job = dagparts.DAGJob("gstlal_feature_monitor", condor_commands = auxiliary_condor_commands, universe = options.condor_universe) aggregator_job = dagparts.DAGJob("gstlal_feature_aggregator", condor_commands = auxiliary_condor_commands, universe = options.condor_universe) if not options.disable_kafka_jobs: @@ -348,12 +348,14 @@ if options.save_format == 'kafka': if options.no_drop: synchronizer_options.update({"no-drop": options.no_drop}) - listener_options = { + monitor_options = { "instrument": ifo, "target-channel": options.target_channel, "sample-rate": options.sample_rate, "input-topic-basename": '_'.join(['synchronizer', options.tag]), "num-channels": num_channels, + "data-backend": options.agg_data_backend, + "data-type": "max", } hdf5_sink_options = { @@ -381,14 +383,16 @@ if options.save_format == 'kafka': "data-type": "max", } if options.agg_data_backend == 'influx': - aggregator_options.update({ + backend_options = { "influx-database-name": options.influx_database_name, "influx-hostname": options.influx_hostname, "influx-port": options.influx_port, - }) + } else: - aggregator_options.update({"num-processes": options.num_agg_processes_per_job}) + backend_options = {"num-processes": options.num_agg_processes_per_job} + aggregator_options.update(backend_options) + monitor_options.update(backend_options) ### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings for option_name, option in extra_hdf5_channel_options.items(): @@ -398,7 +402,7 @@ if options.save_format == 'kafka': synchronizer_options.update(common_options) hdf5_sink_options.update(common_options) aggregator_options.update(common_options) - listener_options.update(common_options) + monitor_options.update(common_options) # # set up jobs @@ -410,9 +414,9 @@ def groups(l, n): if options.save_format == 'kafka': synchronizer_options.update({"num-topics": len(feature_extractor_nodes)}) - synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")}) - hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")}) - listener_node = dagparts.DAGNode(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")}) + synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "synchronizer")}) + hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "features")}) + monitor_node = dagparts.DAGNode(monitor_job, dag, [], opts = monitor_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")}) ### aggregator jobs all_fx_jobs = [(str(ii).zfill(4), channel_subset) for ii, channel_subset in enumerate(data_source_info.channel_subsets)] @@ -420,7 +424,7 @@ if options.save_format == 'kafka': jobs, channels = zip(*job_subset) job_channel_options = {"jobs": jobs, "num-channels": len(channels)} job_channel_options.update(aggregator_options) - listener_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_aggregator")}) + agg_node = dagparts.DAGNode(aggregator_job, dag, [], opts = job_channel_options, output_files = {"rootdir": os.path.join(options.out_path, "aggregator")}) if not options.disable_kafka_jobs: zoo_node = dagparts.DAGNode(zoo_job, dag, [], opts = {"":"zookeeper.properties"})