diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 7cefb261c8b5309ffdfa787b6a4d85fb4a13173d..085a160242d7972d46248a04543205f55f81ee8b 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -87,7 +87,6 @@ from lal.utils import CacheEntry
 # "--inj-framexmit-iface", metavar = "name", default "10.14.0.1", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set). default 10.14.0.1")
 # "--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
 # "--ht-gate-threshold-linear", metavar = "string", help = "Set the scale for h(t) gate threshold to reject glitches", type="string". set as mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max (example: --ht-gate-threshold-linear 0.8:12.0-45.0:100.0)
-# "--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
 # "--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
 # "--likelihood-cache", help = "set the cache containin likelihood files")
 # "--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
@@ -173,6 +172,74 @@ class lvalert_listen_node(pipeline.CondorDAGNode):
 		dag.add_node(self)
 
 
+class zookeeper_job(inspiral_pipe.generic_job):
+	"""
+	A zookeeper job
+	"""
+	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}):
+		"""
+		"""
+		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+
+		try:
+			os.mkdir(datadir)
+		except OSError:
+			pass
+		f = open("zookeeper.properties", "w")
+		f.write("""
+# the directory where the snapshot is stored.
+dataDir=%s
+# the port at which the clients will connect
+clientPort=%d
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=%d
+	""" % (datadir, port, maxclients))
+
+		f.close()
+
+
+class kafka_job(inspiral_pipe.generic_job):
+	"""
+	A kafka job
+	"""
+	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", condor_commands = {}):
+		"""
+		"""
+		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+
+		try:
+			os.mkdir(logdir)
+		except OSError:
+			pass
+		f = open("kafka.properties", "w")
+		f.write("""
+broker.id=0
+listeners = PLAINTEXT://%s
+num.network.threads=3
+num.io.threads=8
+socket.send.buffer.bytes=102400
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+log.dirs=%s
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+log.flush.interval.messages=10000
+log.flush.interval.ms=1000
+log.retention.ms=100000
+log.roll.ms = 1000000
+log.segment.bytes=1073741824
+log.retention.check.interval.ms=300000
+zookeeper.connect=%s
+zookeeper.connection.timeout.ms=6000
+group.initial.rebalance.delay.ms=0
+	""" % (host, logdir, zookeeperaddr))
+
+		f.close()
+
+
 #
 # Parse the command line
 #
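[Reviewer note: not part of the patch] For reference, a minimal sketch of what the two properties files written by zookeeper_job and kafka_job contain once the templates above are rendered with their default constructor arguments. Every value below is copied from the defaults in this hunk (the zookeeper/ and kafka/ directories under os.getcwd(), the hard-coded 10.14.0.112:9092 broker address, localhost:2181 for zookeeper); in a real run the host is overridden through --output-kafka-server.

    import os

    # Defaults copied from zookeeper_job / kafka_job above.
    datadir = os.path.join(os.getcwd(), "zookeeper")
    logdir = os.path.join(os.getcwd(), "kafka")
    host = "10.14.0.112:9092"        # normally replaced via --output-kafka-server
    zookeeperaddr = "localhost:2181"

    # Key lines of zookeeper.properties: snapshot directory and client port.
    print("dataDir=%s" % datadir)
    print("clientPort=%d" % 2181)
    print("maxClientCnxns=%d" % 0)

    # Key lines of kafka.properties: the broker listens on the cluster-facing
    # address and registers with the local zookeeper instance started alongside it.
    print("listeners = PLAINTEXT://%s" % host)
    print("log.dirs=%s" % logdir)
    print("zookeeper.connect=%s" % zookeeperaddr)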
@@ -197,7 +264,6 @@ def parse_command_line():
 	parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
 	parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
 	parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
-	parser.add_option("--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
 	parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
 	parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
 	parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
@@ -237,6 +303,7 @@
 	parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
 	parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
 	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
+	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to. Note: for now this must be the cluster-facing IP address of the submit node, e.g. 10.14.0.112:9092")
 
 	options, filenames = parser.parse_args()
 
@@ -296,19 +363,13 @@
 try: os.mkdir("gracedb")
 except: pass
 dag = dagparts.CondorDAG("trigger_pipe")
 
-
 #
 # setup the job classes
 #
-
-# Figure out if it is iir or not
-if options.do_iir_pipeline is not None:
-	gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_iir_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
-else:
-	gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
-	if inj_channel_dict:
-		gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
+gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
+if inj_channel_dict:
+	gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
 
 # A local universe job that will run in a loop marginalizing all of the likelihoods
 margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
@@ -316,14 +377,17 @@ margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_onl
 
 # an lvalert_listen job
 listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False)
 
-# This restores default behavior if you uncomment
-# get urls job
-# urlsJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_get_urls", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
+# Zookeeper and Kafka jobs and nodes, which only get set up if you specify the kafka server
+if options.output_kafka_server is not None:
+	zooJob = zookeeper_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
+	kafkaJob = kafka_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server)
+	zooNode = inspiral_pipe.generic_node(zooJob, dag, [], opts = {"":"zookeeper.properties"})
+	kafkaNode = inspiral_pipe.generic_node(kafkaJob, dag, [], opts = {"":"kafka.properties"})
 
-# FIXME find a bigger computer for this or run several instancnes on the cluster. aggregator and state job
+# aggregator job
 aggJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_aggregator", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
-# Run this on the cluster
+# state job
 analysisStateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_state", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
 # Summary page job
@@ -336,6 +400,10 @@ if options.state_backup_destination:
 	# State saving job
 	stateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command))
 
+#
+# Setup the Node classes
+#
+
 listenNode = lvalert_listen_node(listenJob, dag)
 
 # dq with default options
@@ -422,7 +490,8 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
 			"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
 			"min-instruments":options.min_instruments,
 			"min-log-L":options.min_log_L,
-			"time-slide-file":options.time_slide_file
+			"time-slide-file":options.time_slide_file,
+			"output-kafka-server": options.output_kafka_server
 		},
 		input_files = {
 			"ranking-stat-input":[likefile],
@@ -495,11 +564,11 @@ margNode = inspiral_pipe.generic_node(margJob, dag, [], opts = {}, input_files =
 #
 
 snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
-aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes})
-aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"]})
-aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"]})
+aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes, "kafka-server": options.output_kafka_server})
+aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"], "kafka-server": options.output_kafka_server})
+aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"], "kafka-server": options.output_kafka_server})
 
-analysisStateNode = inspiral_pipe.generic_node(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys()})
+analysisStateNode = inspiral_pipe.generic_node(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys(), "kafka-server": options.output_kafka_server})
 
 # summary page
 if options.injection_file:
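[Reviewer note: not part of the patch] A small worked example of the route wiring above: snr_routes is built from the keys of channel_dict, so each instrument gets its own <IFO>_snr_history route on the first aggregator instance, which now also receives the shared kafka-server address. H1/L1 and the channel names below are placeholder values, not anything defined in this patch.

    # Illustrative only: in the pipeline channel_dict is keyed by instrument.
    channel_dict = {"H1": "FAKE-STRAIN", "L1": "FAKE-STRAIN"}

    snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
    print(snr_routes)  # e.g. ['H1_snr_history', 'L1_snr_history']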
@@ -534,23 +603,3 @@ dag.write_sub_files()
 dag.write_dag()
 dag.write_script()
 dag.write_cache()
-
-
-#
-# set up the webpage cgi scripts
-# FIXME don't hardcode this stuff
-#
-
-
-#cgibinpath = os.path.expanduser("~/public_html/cgi-bin")
-#if not os.path.isdir(cgibinpath):
-#	os.makedirs(cgibinpath)
-#shutil.copy2(dagparts.which('gstlalcbcsummary'), cgibinpath)
-#shutil.copy2(dagparts.which('gstlalcbcnode'), cgibinpath)
-#query = "id=%s,%s&dir=%s&ifos=%s" % (jobTags[0], jobTags[-1], os.getcwd(), ",".join(sorted(bank_cache.keys())))
-## Write the analysis to a special file that the summary page can find by default
-#with open(os.path.join(cgibinpath, "gstlalcbc_analysis.txt"), "w") as webfile:
-#	webfile.write(query)
-#print >>sys.stderr, "\n\n NOTE! You can monitor the analysis at this url: %s/~%s/cgi-bin/gstlalcbcsummary?%s \n\n" % (inspiral_pipe.webserver_url(), os.environ['USER'], query)
-#if inj_jobTags:
-#	print >>sys.stderr, "\n\n NOTE! You can monitor the injection analysis at this url: %s/~%s/cgi-bin/gstlalcbcsummary?id=%s,%s&dir=%s&ifos=%s \n\n" % (inspiral_pipe.webserver_url(), os.environ['USER'], inj_jobTags[0], inj_jobTags[-1], os.getcwd(), ",".join(sorted(bank_cache.keys())))
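[Reviewer note: not part of the patch] Since the DAG now assumes a broker reachable at the address given to --output-kafka-server (the same address baked into kafka.properties), a quick standalone sanity check from a cluster node is to connect and list topics. This is only a sketch under stated assumptions: it uses the kafka-python package, which is not necessarily the client library the gstlal jobs themselves use, and 10.14.0.112:9092 is just the example address from the option's help text.

    # Standalone broker reachability check; assumes the kafka-python package.
    from kafka import KafkaConsumer

    broker = "10.14.0.112:9092"  # same value handed to --output-kafka-server
    consumer = KafkaConsumer(bootstrap_servers=broker)
    print("topics visible on %s:" % broker)
    for topic in sorted(consumer.topics()):
        print("  %s" % topic)
    consumer.close()

Listing topics is enough to confirm both that the broker came up under kafka-server-start.sh and that the zookeeper.connect wiring in kafka.properties worked, without consuming any analysis data.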