
Commit c13dffba authored by Patrick Godwin, committed by Chad Hanna

add gstlal_kafka_dag to gstlal-ugly

parent 9d6a0514
@@ -32,6 +32,7 @@ dist_bin_SCRIPTS = \
 	gstlal_ll_dq \
 	gstlal_condor_top \
 	gstlal_injsplitter \
+	gstlal_kafka_dag \
 	gstlal_reduce_dag \
 	gstlal_dag_run_time \
 	gstlal_svd_bank_checkerboard

new file: gstlal_kafka_dag
#!/usr/bin/env python
#
# Copyright (C) 2018--2019 Chad Hanna, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program creates an HTCondor DAG to maintain the persistent
### kafka/zookeeper services used by low-latency, online gstlal_inspiral
### jobs; see gstlal_ll_trigger_pipe
"""
This program makes a dag for persistent kafka/zookeeper services
"""
__author__ = 'Chad Hanna <channa@caltech.edu>, Patrick Godwin <patrick.godwin@ligo.org>'
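
#
# example invocation (a sketch; the hostname and ports here are placeholders,
# and the full option list is defined in parse_command_line() below):
#
#   gstlal_kafka_dag --kafka-hostname <hostname> --zookeeper-port 2181 --kafka-port 9092
#   condor_submit_dag kafka_broker.dag
#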
#
# import standard modules
#
import os
from optparse import OptionParser
#
# import the modules we need to build the pipeline
#
from gstlal import dagparts
#
# configuration file templates
#
ZOOKEEPER_TEMPLATE = """
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
"""
KAFKA_TEMPLATE = """
broker.id=0
listeners = PLAINTEXT://%s:%d
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
"""
#
# job classes
#
class ZookeeperJob(dagparts.DAGJob):
"""
A zookeeper job
"""
def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2181, maxclients = 0, universe = "local", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
try:
os.mkdir(datadir)
except OSError:
pass
with open("zookeeper.properties", "w") as f:
f.write(ZOOKEEPER_TEMPLATE%(datadir, port, maxclients))
class KafkaJob(dagparts.DAGJob):
"""
A kafka job
"""
def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(dagparts.log_path(), "kafka"), hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)
try:
os.mkdir(logdir)
except OSError:
pass
with open("kafka.properties", "w") as f:
f.write(KAFKA_TEMPLATE%(hostname, port, logdir, zookeeperaddr))
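
# the DAG nodes constructed below hand each .properties file to its server
# start script as the sole positional argument, so the condor jobs effectively
# run (a sketch, assuming the stock kafka distribution scripts):
#
#   zookeeper-server-start.sh zookeeper.properties
#   kafka-server-start.sh kafka.properties
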
#
# Parse the command line
#
def parse_command_line():
parser = OptionParser(description = __doc__)
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname in which kafka/zookeeper will be running at.")
parser.add_option("--kafka-port", type = "int", metavar = "number", help = "Set the kafka port. default: 9092", default = 9092)
parser.add_option("--condor-universe", default = "local", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = local")
parser.add_option("--condor-accounting-user", metavar = "user", help = "set the condor accounting user.")
parser.add_option("--condor-accounting-group", metavar = "group", help = "set the condor accounting group.")
options, filenames = parser.parse_args()
return options, filenames
#
# MAIN
#
options, filenames = parse_command_line()
print('making logs directory...')
try:
os.mkdir("logs")
except OSError:
pass
dag = dagparts.DAG("kafka_broker")
#
# setup kafka/zookeeper jobs and nodes
#
condor_options = {
"accounting_group_user": options.condor_accounting_user,
"accounting_group": options.condor_accounting_group,
"want_graceful_removal": "True",
"kill_sig": "15"
}
if options.condor_universe == 'vanilla':
condor_options.update({
"request_memory": "10GB",
"request_cpus": 2,
})
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
zookeeper_job = ZookeeperJob(
"zookeeper-server-start.sh",
tag_base = "zookeeper-server-start",
condor_commands = condor_commands,
port = options.zookeeper_port
)
kafka_job = KafkaJob(
"kafka-server-start.sh",
tag_base = "kafka-server-start",
condor_commands = condor_commands,
hostname = options.kafka_hostname,
port = options.kafka_port,
zookeeperaddr = "localhost:%d" % options.zookeeper_port
)
zookeeper_node = dagparts.DAGNode(zookeeper_job, dag, [], opts = {"":"zookeeper.properties"})
kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})
#
# Write out the dag and other files
#
print('making sub files...')
dag.write_sub_files()
# We probably want these jobs to retry indefinitely on dedicated nodes; a user
# can then intervene and fix a problem without having to bring the dag down and up.
for node in dag.get_nodes():
	node.set_retry(10000)
print('making DAG...')
dag.write_dag()
dag.write_script()
print('launch DAG with: condor_submit_dag kafka_broker.dag')