diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe index 9f5a1daf7c05204f7c850185808c836212b2950b..0a376546e5e1fe74723691ed1352b6e20756e3d6 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe @@ -242,6 +242,7 @@ def parse_command_line(): parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.") parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to") parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file") + parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181) parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092") options, filenames = parser.parse_args() @@ -331,8 +332,8 @@ listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.g # Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server if options.output_kafka_server is not None: - zooJob = zookeeper_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command)) - kafkaJob = kafka_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server) + zooJob = zookeeper_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port) + kafkaJob = kafka_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port) zooNode = inspiral_pipe.generic_node(zooJob, dag, [], opts = {"":"zookeeper.properties"}) kafkaNode = inspiral_pipe.generic_node(kafkaJob, dag, [], opts = {"":"kafka.properties"})