Commit cf4ac782 authored by Patrick Godwin

gstlal_kafka_dag: add tag information so that multiple analyses do not have file clashes

parent cd11fb1b
Pipeline #52354 passed with warnings
@@ -36,6 +36,7 @@ from optparse import OptionParser
 #
 # import the modules we need to build the pipeline
 #
+from gstlal import aggregator
 from gstlal import dagparts

 #
@@ -93,34 +94,36 @@ class ZookeeperJob(dagparts.DAGJob):
     """
     A zookeeper job
     """
-    def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
+    def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server", rootdir = dagparts.log_path(), tag = "", port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
         """
         """
         dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
-        try:
-            os.mkdir(datadir)
-        except OSError:
-            pass
+        if tag:
+            zoodir = os.path.join(rootdir, tag, "zookeeper")
+        else:
+            zoodir = os.path.join(rootdir, "zookeeper")
+        aggregator.makedir(zoodir)
         with open("zookeeper.properties", "w") as f:
-            f.write(ZOOKEEPER_TEMPLATE%(datadir, port, maxclients))
+            f.write(ZOOKEEPER_TEMPLATE%(zoodir, port, maxclients))

 class KafkaJob(dagparts.DAGJob):
     """
     A kafka job
     """
-    def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(dagparts.log_path(), "kafka"), hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
+    def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server", rootdir = dagparts.log_path(), tag = "", hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
         """
         """
         dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
-        try:
-            os.mkdir(logdir)
-        except OSError:
-            pass
+        if tag:
+            kafkadir = os.path.join(rootdir, tag, "kafka")
+        else:
+            kafkadir = os.path.join(rootdir, "kafka")
+        aggregator.makedir(kafkadir)
         with open("kafka.properties", "w") as f:
-            f.write(KAFKA_TEMPLATE%(hostname, port, logdir, zookeeperaddr))
+            f.write(KAFKA_TEMPLATE%(hostname, port, kafkadir, zookeeperaddr))

 #
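This hunk replaces the one-level try/os.mkdir/except pattern, which could not create the new nested <rootdir>/<tag>/zookeeper layout, with aggregator.makedir (imported in the first hunk). The helper's implementation is not shown in this diff; the following is a minimal sketch of the behavior the calling code appears to rely on, assuming it wraps os.makedirs and tolerates a pre-existing directory:

    # Hypothetical sketch of gstlal.aggregator.makedir -- not the actual
    # implementation, just the behavior assumed by the code above:
    # create the full directory tree and do not fail if it already exists.
    import os

    def makedir(path):
        try:
            os.makedirs(path)
        except OSError:
            pass

With a hypothetical tag of online_test, the two jobs now keep their state under <log_path>/online_test/zookeeper and <log_path>/online_test/kafka instead of sharing <log_path>/zookeeper and <log_path>/kafka across every running analysis.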
@@ -131,6 +134,7 @@ class KafkaJob(dagparts.DAGJob):
 def parse_command_line():
     parser = OptionParser(description = __doc__)
+    parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously and to avoid filename clashes.")
     parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
     parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
     parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname in which kafka/zookeeper will be running at.")
@@ -151,12 +155,12 @@ def parse_command_line():
 options, filenames = parse_command_line()

 print('making logs directory...')
-try:
-    os.mkdir("logs")
-except:
-    pass
-
-dag = dagparts.DAG("kafka_broker")
+aggregator.makedir("logs")
+
+if options.analysis_tag:
+    dag = dagparts.DAG("kafka_broker_%s" % options.analysis_tag)
+else:
+    dag = dagparts.DAG("kafka_broker")

 #
 # setup kafka/zookeeper jobs and nodes
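The tag also disambiguates the DAG itself: with --analysis-tag online_test (a hypothetical value) the workflow is written as kafka_broker_online_test.dag rather than the hard-coded kafka_broker.dag that the removed print statement used to advertise, so two tagged DAGs generated in the same directory no longer clobber each other's submit files.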
@@ -180,12 +184,14 @@ zookeeper_job = ZookeeperJob(
     "zookeeper-server-start.sh",
     tag_base = "zookeeper-server-start",
     condor_commands = condor_commands,
+    tag = options.analysis_tag,
     port = options.zookeeper_port
 )
 kafka_job = KafkaJob(
     "kafka-server-start.sh",
     tag_base = "kafka-server-start",
     condor_commands = condor_commands,
+    tag = options.analysis_tag,
     hostname = options.kafka_hostname,
     port = options.kafka_port,
     zookeeperaddr = "localhost:%d" % options.zookeeper_port
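Note that both call sites still pass the old explicit tag_base values (zookeeper-server-start, kafka-server-start), which override the shortened defaults introduced in the class definitions above; in this script, only the new tag keyword changes the generated directory layout.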
@@ -198,21 +204,16 @@ kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})
 # Write out the dag and other files
 #

-print('making sub files...')
 dag.write_sub_files()

 # we probably want these jobs to retry indefinitely on dedicated nodes. A user
 # can intervene and fix a problem without having to bring the dag down and up.
 [node.set_retry(10000) for node in dag.get_nodes()]

-print('making DAG...')
 dag.write_dag()
 dag.write_script()

-print('making env script...')
-with open('kafka.env', 'w') as f:
+with open('kafka_env.sh', 'w') as f:
     f.write(KAFKA_ENV_TEMPLATE)

-print('source kafka env with: source kafka.env')
-print('launch DAG with: condor_submit_dag kafka_broker.dag')
+print('source kafka_env.sh before submitting dag')
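Taken together, the post-generation workflow presumably becomes: source the renamed kafka_env.sh, then submit with condor_submit_dag kafka_broker.dag (or kafka_broker_<tag>.dag when --analysis-tag was given), as the removed print statements previously spelled out for the untagged case.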