Commit 2ab9c408 authored by Patrick Godwin

add gstlal_kafka_dag to gstlal-ugly

parent 3ed6097f
@@ -29,5 +29,6 @@ dist_bin_SCRIPTS = \
	gstlal_ll_dq \
	gstlal_condor_top \
	gstlal_injsplitter \
	gstlal_kafka_dag \
	gstlal_reduce_dag \
	gstlal_dag_run_time
#!/usr/bin/env python
#
# Copyright (C) 2018--2019 Chad Hanna, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program creates an HTCondor DAG to run the persistent Kafka/Zookeeper
### services used by low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe
"""
This program makes a dag for persistent kafka/zookeeper services
"""
__author__ = 'Chad Hanna <channa@caltech.edu>, Patrick Godwin <patrick.godwin@ligo.org>'
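#
# A typical invocation might look like the following (the hostname, ports and
# accounting tags below are illustrative assumptions and should be adjusted
# for your cluster):
#
#   gstlal_kafka_dag \
#       --kafka-hostname node501.example.edu \
#       --kafka-port 9092 \
#       --zookeeper-port 2181 \
#       --condor-universe local \
#       --condor-accounting-user albert.einstein \
#       --condor-accounting-group ligo.dev.o3.cbc \
#       --condor-command 'request_disk=1GB'
#
# followed by `condor_submit_dag kafka_broker.dag` to launch the services.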
#
# import standard modules
#
import os
from optparse import OptionParser
#
# import the modules we need to build the pipeline
#
from gstlal import dagparts
#
# configuration file templates
#
ZOOKEEPER_TEMPLATE = """
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
"""
KAFKA_TEMPLATE = """
broker.id=0
listeners = PLAINTEXT://%s:%d
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
"""
#
# job classes
#
class ZookeeperJob(dagparts.DAGJob):
	"""
	A zookeeper job
	"""
	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
		"""
		"""
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)

		try:
			os.mkdir(datadir)
		except OSError:
			pass

		with open("zookeeper.properties", "w") as f:
			f.write(ZOOKEEPER_TEMPLATE % (datadir, port, maxclients))


class KafkaJob(dagparts.DAGJob):
	"""
	A kafka job
	"""
	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(dagparts.log_path(), "kafka"), hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
		"""
		"""
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)

		try:
			os.mkdir(logdir)
		except OSError:
			pass

		with open("kafka.properties", "w") as f:
			f.write(KAFKA_TEMPLATE % (hostname, port, logdir, zookeeperaddr))
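
# Note that both job classes write their .properties file into the current
# working directory at DAG-generation time; the *-server-start.sh wrappers
# then read those files when the condor jobs start running.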
#
# Parse the command line
#
def parse_command_line():
	parser = OptionParser(description = __doc__)
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "Set condor commands of the form command=value. Can be given multiple times.")
	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. Default: 2181.", default = 2181)
	parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname on which kafka/zookeeper will be running.")
	parser.add_option("--kafka-port", type = "int", metavar = "number", help = "Set the kafka port. Default: 9092.", default = 9092)
	parser.add_option("--condor-universe", default = "local", metavar = "universe", help = "Set the condor universe in which to run the DAG jobs. Options: local, vanilla. Default: local.")
	parser.add_option("--condor-accounting-user", metavar = "user", help = "Set the condor accounting user.")
	parser.add_option("--condor-accounting-group", metavar = "group", help = "Set the condor accounting group.")

	options, filenames = parser.parse_args()

	return options, filenames
#
# MAIN
#
options, filenames = parse_command_line()

print('making logs directory...')
try:
	os.mkdir("logs")
except OSError:
	pass
dag = dagparts.DAG("kafka_broker")
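# the name passed to dagparts.DAG presumably sets the basename of the output
# files, i.e. the kafka_broker.dag referenced at the end of this script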
#
# setup kafka/zookeeper jobs and nodes
#
condor_options = {
	"accounting_group_user": options.condor_accounting_user,
	"accounting_group": options.condor_accounting_group,
	"want_graceful_removal": "True",
	"kill_sig": "15"
}

if options.condor_universe == 'vanilla':
	condor_options.update({
		"request_memory": "10GB",
		"request_cpus": 2,
	})

condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
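# dagparts.condor_command_dict_from_opts is expected to fold any
# --condor-command key=value pairs supplied on the command line into the
# defaults above, so e.g. --condor-command 'request_memory=4GB' (a
# hypothetical override) would take precedence over the vanilla-universe
# request_memory set here.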
zookeeper_job = ZookeeperJob(
	"zookeeper-server-start.sh",
	tag_base = "zookeeper-server-start",
	condor_commands = condor_commands,
	port = options.zookeeper_port
)
kafka_job = KafkaJob(
	"kafka-server-start.sh",
	tag_base = "kafka-server-start",
	condor_commands = condor_commands,
	hostname = options.kafka_hostname,
	port = options.kafka_port,
	zookeeperaddr = "localhost:%d" % options.zookeeper_port
)

zookeeper_node = dagparts.DAGNode(zookeeper_job, dag, [], opts = {"": "zookeeper.properties"})
kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"": "kafka.properties"})
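# The empty option name appears to be the dagparts convention for passing a
# bare (un-flagged) argument, so each job is executed roughly as
# `zookeeper-server-start.sh zookeeper.properties` and
# `kafka-server-start.sh kafka.properties`.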
#
# Write out the dag and other files
#
print('making sub files...')
dag.write_sub_files()
# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
for node in dag.get_nodes():
	node.set_retry(10000)
print('making DAG...')
dag.write_dag()
dag.write_script()
print('launch DAG with: condor_submit_dag kafka_broker.dag')
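
# Once the DAG is running, the broker can be sanity-checked with the standard
# Kafka command line tools (assuming they are on your PATH and match the
# broker version), e.g. from the machine hosting the services:
#
#   kafka-topics.sh --list --zookeeper localhost:2181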