Commit a799ce7a authored by chad.hanna

gstlal_ll_inspiral_pipe: add kafka and zookeeper jobs and do some general cleanup

parent caa9709d
@@ -87,7 +87,6 @@ from lal.utils import CacheEntry
# "--inj-framexmit-iface", metavar = "name", default "10.14.0.1", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set). default 10.14.0.1")
# "--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
# "--ht-gate-threshold-linear", metavar = "string", help = "Set the scale for h(t) gate threshold to reject glitches", type="string". set as mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max (example: --ht-gate-threshold-linear 0.8:12.0-45.0:100.0)
# "--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
# "--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
# "--likelihood-cache", help = "set the cache containin likelihood files")
# "--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
@@ -173,6 +172,74 @@ class lvalert_listen_node(pipeline.CondorDAGNode):
dag.add_node(self)
class zookeeper_job(inspiral_pipe.generic_job):
	"""
	A zookeeper job
	"""
	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}):
		"""
		Write a minimal zookeeper.properties in the current directory and set up a local-universe condor job to run the zookeeper server against it.
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(datadir)
		except OSError:
			pass
		f = open("zookeeper.properties", "w")
		f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
""" % (datadir, port, maxclients))
		f.close()
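For reference, with the default arguments above the zookeeper.properties written by this job would contain the following argument-dependent lines (the run directory shown is illustrative):

	dataDir=/path/to/rundir/zookeeper
	clientPort=2181
	maxClientCnxns=0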
class kafka_job(inspiral_pipe.generic_job):
	"""
	A kafka job
	"""
	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", condor_commands = {}):
		"""
		Write a minimal kafka.properties in the current directory and set up a local-universe condor job to run the kafka broker against it.
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(logdir)
		except OSError:
			pass
		f = open("kafka.properties", "w")
		f.write("""
broker.id=0
listeners = PLAINTEXT://%s
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
""" % (host, logdir, zookeeperaddr))
		f.close()
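Similarly, with the defaults above the broker is advertised on the submit node's cluster-facing address and pointed at the local zookeeper instance; the argument-dependent lines of kafka.properties would render as (run directory again illustrative):

	listeners = PLAINTEXT://10.14.0.112:9092
	log.dirs=/path/to/rundir/kafka
	zookeeper.connect=localhost:2181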
#
# Parse the command line
#
@@ -197,7 +264,6 @@ def parse_command_line():
parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
parser.add_option("--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
@@ -237,6 +303,7 @@ def parse_command_line():
parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
options, filenames = parser.parse_args()
@@ -296,19 +363,13 @@ try: os.mkdir("gracedb")
except: pass
dag = dagparts.CondorDAG("trigger_pipe")
#
# setup the job classes
#
# Figure out if it is iir or not
if options.do_iir_pipeline is not None:
	gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_iir_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
else:
	gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
if inj_channel_dict:
	gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
if inj_channel_dict:
	gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
# A local universe job that will run in a loop marginalizing all of the likelihoods
margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
@@ -316,14 +377,17 @@ margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_onl
# an lvalert_listen job
listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False)
# This restores default behavior if you uncomment
# get urls job
# urlsJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_get_urls", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
# Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
if options.output_kafka_server is not None:
	zooJob = zookeeper_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
	kafkaJob = kafka_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server)
	zooNode = inspiral_pipe.generic_node(zooJob, dag, [], opts = {"":"zookeeper.properties"})
	kafkaNode = inspiral_pipe.generic_node(kafkaJob, dag, [], opts = {"":"kafka.properties"})
# FIXME find a bigger computer for this or run several instances on the cluster. aggregator and state job
# aggregator job
aggJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_aggregator", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Run this on the cluster
# state job
analysisStateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_state", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
@@ -336,6 +400,10 @@ if options.state_backup_destination:
# State saving job
stateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
#
# Setup the Node classes
#
listenNode = lvalert_listen_node(listenJob, dag)
# dq with default options
@@ -422,7 +490,8 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"min-log-L":options.min_log_L,
"time-slide-file":options.time_slide_file
"time-slide-file":options.time_slide_file,
"output-kafka-server": options.output_kafka_server
},
input_files = {
"ranking-stat-input":[likefile],
@@ -495,11 +564,11 @@ margNode = inspiral_pipe.generic_node(margJob, dag, [], opts = {}, input_files =
#
snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes})
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"]})
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"]})
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes, "kafka-server": options.output_kafka_server})
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"], "kafka-server": options.output_kafka_server})
aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"], "kafka-server": options.output_kafka_server})
analysisStateNode = inspiral_pipe.generic_node(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys()})
analysisStateNode = inspiral_pipe.generic_node(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys(), "kafka-server": options.output_kafka_server})
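Once the broker is up, the routes published via --output-kafka-server can be read back as a sanity check. A minimal sketch using the kafka-python package; the topic name is a hypothetical placeholder and the address is whatever was passed on the command line:

from kafka import KafkaConsumer  # assumes the kafka-python package is installed

# "H1_snr_history" is an illustrative topic name, not one guaranteed by this pipeline
consumer = KafkaConsumer("H1_snr_history", bootstrap_servers = "10.14.0.112:9092")
for message in consumer:
	print(message.topic, message.value)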
# summary page
if options.injection_file:
@@ -534,23 +603,3 @@ dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()
#
# set up the webpage cgi scripts
# FIXME don't hardcode this stuff
#
#cgibinpath = os.path.expanduser("~/public_html/cgi-bin")
#if not os.path.isdir(cgibinpath):
# os.makedirs(cgibinpath)
#shutil.copy2(dagparts.which('gstlalcbcsummary'), cgibinpath)
#shutil.copy2(dagparts.which('gstlalcbcnode'), cgibinpath)
#query = "id=%s,%s&dir=%s&ifos=%s" % (jobTags[0], jobTags[-1], os.getcwd(), ",".join(sorted(bank_cache.keys())))
## Write the analysis to a special file that the summary page can find by default
#with open(os.path.join(cgibinpath, "gstlalcbc_analysis.txt"), "w") as webfile:
# webfile.write(query)
#print >>sys.stderr, "\n\n NOTE! You can monitor the analysis at this url: %s/~%s/cgi-bin/gstlalcbcsummary?%s \n\n" % (inspiral_pipe.webserver_url(), os.environ['USER'], query)
#if inj_jobTags:
# print >>sys.stderr, "\n\n NOTE! You can monitor the injection analysis at this url: %s/~%s/cgi-bin/gstlalcbcsummary?id=%s,%s&dir=%s&ifos=%s \n\n" % (inspiral_pipe.webserver_url(), os.environ['USER'], inj_jobTags[0], inj_jobTags[-1], os.getcwd(), ",".join(sorted(bank_cache.keys())))