Commit 7ad4e4a2 authored by chad.hanna

gstlal_ll_inspiral_pipe: support external kafka service, remove daily pages -...

gstlal_ll_inspiral_pipe: support external kafka service, remove daily pages - they will be replaced by scale-out aggregators as future-proofing
parent 1f57dd15
@@ -246,6 +246,7 @@ def parse_command_line():
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
parser.add_option("--run-output-kafka", action = "store_true", help = "Actually launch a zookeeper and kafka job as part of the dag")
parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
@@ -342,7 +343,7 @@ listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.g
# Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
if options.output_kafka_server is not None:
if options.output_kafka_server is not None and options.run_output_kafka:
zooJob = zookeeper_job("zookeeper-server-start.sh", tag_base = "zookeeper-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
kafkaJob = kafka_job("kafka-server-start.sh", tag_base = "kafka-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
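With --run-output-kafka unset, the DAG no longer launches its own zookeeper/kafka jobs and instead assumes an external kafka service is already reachable at --output-kafka-server. A minimal sketch of a producer pointed at such an external broker, using the kafka-python client as an assumption (the pipeline's own kafka publishing code is not part of this diff; the topic name and payload are hypothetical):

```python
# Sketch: publish to an externally managed kafka broker instead of one
# started by the DAG. Assumes the kafka-python package; topic name and
# payload are illustrative only.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="10.14.0.112:9092",  # value passed as --output-kafka-server
    value_serializer=lambda d: json.dumps(d).encode("utf-8"),
)
producer.send("gstlal_inspiral_metrics", {"snr_history": 8.2})
producer.flush()
```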
@@ -353,7 +354,7 @@ if options.output_kafka_server is not None:
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
# DQ job
dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
@@ -606,7 +607,14 @@ for routes in groups(agg_routes, 1):
agg_options["data-type"] = "min"
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
agg_options["data-type"] = "max"
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
# FIXME don't hard code the 1000
aggstarts = range(len(jobTags))[::1000]
aggends = aggstarts[1:] + [len(jobTags)]
for aggstart, aggend in zip(aggstarts, aggends):
if aggend > aggstart:
agg_options["job-start"] = aggstart
agg_options["num-jobs"] = aggend - aggstart
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
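The loop above fans the monitored inspiral jobs out across aggregator processes in blocks of at most 1000 jobs each (the FIXME flags the hard-coded size). A minimal sketch of the same windowing with the chunk size as a parameter; jobTags and the DAG node submission are stand-ins for the pipeline's own objects:

```python
# Sketch: split num_jobs job tags into (job-start, num-jobs) chunks so each
# aggregator instance handles at most chunk_size jobs. Mirrors the slicing
# above but keeps the chunk size configurable instead of a hard-coded 1000.
def aggregator_chunks(num_jobs, chunk_size=1000):
    starts = list(range(0, num_jobs, chunk_size))
    ends = starts[1:] + [num_jobs]
    return [(start, end - start) for start, end in zip(starts, ends) if end > start]

# e.g. 2500 jobs -> [(0, 1000), (1000, 1000), (2000, 500)]
```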
# state-based aggregation jobs
for routes in groups(state_routes, 2):
@@ -618,16 +626,16 @@ for routes in groups(state_routes, 2):
# summary page
#
if options.injection_file:
pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
for injfile, jobrange in inj_range_dict.items():
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
else:
pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
#if options.injection_file:
#
# pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
#
# for injfile, jobrange in inj_range_dict.items():
# aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
#
#else:
#
# pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
if options.state_backup_destination: