Commit a0d40e52 authored by Patrick Godwin, committed by Chad Hanna

gstlal_kafka_dag: add tag information so that multiple analyses do not have file clashes

parent 71debb64
@@ -36,6 +36,7 @@ from optparse import OptionParser
 # import the modules we need to build the pipeline
 #
+from gstlal import aggregator
 from gstlal import dagparts
 #
@@ -93,34 +94,36 @@ class ZookeeperJob(dagparts.DAGJob):
 	"""
 	A zookeeper job
 	"""
-	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
+	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server", rootdir = dagparts.log_path(), tag = "", port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
 		"""
 		"""
 		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
-		try:
-			os.mkdir(datadir)
-		except OSError:
-			pass
+		if tag:
+			zoodir = os.path.join(rootdir, tag, "zookeeper")
+		else:
+			zoodir = os.path.join(rootdir, "zookeeper")
+		aggregator.makedir(zoodir)
 		with open("zookeeper.properties", "w") as f:
-			f.write(ZOOKEEPER_TEMPLATE%(datadir, port, maxclients))
+			f.write(ZOOKEEPER_TEMPLATE%(zoodir, port, maxclients))

 class KafkaJob(dagparts.DAGJob):
 	"""
 	A kafka job
 	"""
-	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(dagparts.log_path(), "kafka"), hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
+	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server", rootdir = dagparts.log_path(), tag = "", hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
 		"""
 		"""
 		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
-		try:
-			os.mkdir(logdir)
-		except OSError:
-			pass
+		if tag:
+			kafkadir = os.path.join(rootdir, tag, "kafka")
+		else:
+			kafkadir = os.path.join(rootdir, "kafka")
+		aggregator.makedir(kafkadir)
 		with open("kafka.properties", "w") as f:
-			f.write(KAFKA_TEMPLATE%(hostname, port, logdir, zookeeperaddr))
+			f.write(KAFKA_TEMPLATE%(hostname, port, kafkadir, zookeeperaddr))
 #
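For illustration only (not part of the commit), a minimal sketch of the directory selection the new tag argument introduces; the root path here is hypothetical, and the branching mirrors the two constructors above:

import os

def job_dir(rootdir, tag, name):
	# mirrors the branching added to ZookeeperJob/KafkaJob.__init__: a
	# tagged analysis gets its own subtree, so two DAGs that share a log
	# path no longer write into the same zookeeper/kafka directories
	if tag:
		return os.path.join(rootdir, tag, name)
	return os.path.join(rootdir, name)

print(job_dir("/path/to/logs", "analysis_a", "zookeeper"))  # /path/to/logs/analysis_a/zookeeper
print(job_dir("/path/to/logs", "", "kafka"))                # /path/to/logs/kafka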
@@ -131,6 +134,7 @@ class KafkaJob(dagparts.DAGJob):
 def parse_command_line():
 	parser = OptionParser(description = __doc__)
+	parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously and to avoid filename clashes.")
 	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
 	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
 	parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname in which kafka/zookeeper will be running at.")
@@ -151,12 +155,12 @@ def parse_command_line():
 options, filenames = parse_command_line()
 print('making logs directory...')
-try:
-	os.mkdir("logs")
-except:
-	pass
-dag = dagparts.DAG("kafka_broker")
+aggregator.makedir("logs")
+if options.analysis_tag:
+	dag = dagparts.DAG("kafka_broker_%s" % options.analysis_tag)
+else:
+	dag = dagparts.DAG("kafka_broker")
 #
 # setup kafka/zookeeper jobs and nodes
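The try/os.mkdir/except pattern removed here (and in the two constructors above) is replaced by aggregator.makedir. A minimal sketch of what that helper is assumed to do, namely behave like os.makedirs while tolerating directories that already exist:

import errno
import os

def makedir(path):
	# assumed behaviour of gstlal's aggregator.makedir: create path and
	# any missing parents (unlike os.mkdir, which cannot create the
	# intermediate rootdir/tag level), ignoring only "already exists"
	try:
		os.makedirs(path)
	except OSError as e:
		if e.errno != errno.EEXIST:
			raise

Swapping this in also retires the bare except: in the old code above, which would have silently swallowed unrelated errors.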
@@ -180,12 +184,14 @@ zookeeper_job = ZookeeperJob(
 	"zookeeper-server-start.sh",
 	tag_base = "zookeeper-server-start",
 	condor_commands = condor_commands,
+	tag = options.analysis_tag,
 	port = options.zookeeper_port
 )
 kafka_job = KafkaJob(
 	"kafka-server-start.sh",
 	tag_base = "kafka-server-start",
 	condor_commands = condor_commands,
+	tag = options.analysis_tag,
 	hostname = options.kafka_hostname,
 	port = options.kafka_port,
 	zookeeperaddr = "localhost:%d" % options.zookeeper_port
@@ -198,21 +204,16 @@ kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"}
 # Write out the dag and other files
 #
 print('making sub files...')
 dag.write_sub_files()
 # we probably want these jobs to retry indefinitely on dedicated nodes. A user
 # can intervene and fix a problem without having to bring the dag down and up.
 [node.set_retry(10000) for node in dag.get_nodes()]
 print('making DAG...')
 dag.write_dag()
 dag.write_script()
 print('making env script...')
-with open('kafka.env', 'w') as f:
+with open('kafka_env.sh', 'w') as f:
 	f.write(KAFKA_ENV_TEMPLATE)
-print('source kafka env with: source kafka.env')
-print('launch DAG with: condor_submit_dag kafka_broker.dag')
+print('source kafka_env.sh before submitting dag')
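Per the script's own messages, the post-generation workflow is to source kafka_env.sh and then submit the DAG with condor_submit_dag; with an analysis tag supplied, the DAG is named kafka_broker_<tag> (presumably written out as kafka_broker_<tag>.dag) rather than kafka_broker, which is what keeps simultaneous analyses from clobbering each other's files.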