#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""
This program makes a dag to run a series of gstlal_feature_extractor jobs online
"""

__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'

# =============================
#
#           preamble
#
# =============================

import optparse
import os

from gstlal import aggregator
from gstlal import dagparts
from gstlal import inspiral_pipe

from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils

# =============================
#
#           functions
#
# =============================

def generate_options(options):
	"""
	Generates a dictionary of command line options to pass into DAG nodes.
	"""
	# data source options
	if options.data_source == 'lvshm':
		data_source_options = {
			"data-source": options.data_source,
			"shared-memory-partition": options.shared_memory_partition,
			"shared-memory-assumed-duration": options.shared_memory_assumed_duration
		}
	elif options.data_source == 'framexmit':
		data_source_options = {"data-source": options.data_source}

	# waveform options
	waveform_options = {
		"waveform": options.waveform,
		"mismatch": options.mismatch,
		"qhigh": options.qhigh
	}

	# data transfer options
	if options.save_format == 'kafka':
		save_options = {
			"save-format": options.save_format,
			"data-transfer": options.data_transfer,
			"sample-rate": options.sample_rate,
			"kafka-partition": options.kafka_partition,
			"kafka-topic": options.kafka_topic,
			"kafka-server": options.kafka_server
		}
	elif options.save_format == 'hdf5':
		save_options = {
			"save-format": options.save_format,
			"sample-rate": options.sample_rate,
			"cadence": options.cadence,
			"persist-cadence": options.persist_cadence
		}
	else:
		raise NotImplementedError('not an available option for online jobs at this time')

	# program behavior options
	program_options = {}
	if options.disable_web_service:
		program_options.update({"disable-web-service": options.disable_web_service})
	if options.verbose:
		program_options.update({"verbose": options.verbose})

	# gobble options together
	out_options = {}
	out_options.update(data_source_options)
	out_options.update(waveform_options)
	out_options.update(save_options)
	out_options.update(program_options)

	return out_options
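
# Note (illustrative, not part of the original source): the dict returned above is
# handed to inspiral_pipe.generic_node as its opts argument, which is expected to
# render each "key": value pair as a --key value argument of gstlal_feature_extractor.
# A sketch with hypothetical values:
#
#	generate_options(options)
#	# -> {"data-source": "lvshm", "save-format": "hdf5", "sample-rate": 4096, ...}
#	# which ends up on the job command line roughly as:
#	#    --data-source lvshm --save-format hdf5 --sample-rate 4096 ...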


def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
	feature_extractor_nodes = {}
	channel_list = []

	# generate common command line options
	command_line_options = generate_options(options)

	# parallelize jobs by channel subsets
	for ii, channel_subset in enumerate(data_source_info.channel_subsets):

		if options.verbose:
			print("Creating node for channel subset %d" % ii)

		# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
		channel_list.extend(channel_subset)
		channels = [''.join(["--channel-name=", ':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
		channels[0] = channels[0].split('=')[1] # peel the "--channel-name=" prefix off the first channel

		# create specific options for each channel subset
		subset_options = {
			"max-streams": options.max_streams * 2, # FIXME: done to force all channels to be processed in parallel, but should be handled upstream more gracefully
			"job-id": str(ii + 1).zfill(4),
			"channel-name": ' '.join(channels)
		}
		subset_options.update(command_line_options)

		feature_extractor_nodes[ii] = \
			inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = parent_nodes,
				opts = subset_options,
				output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
			)

	print("Writing channel list of all channels processed")
	listpath = os.path.join(options.out_path, "full_channel_list.txt")
	with open(listpath, 'w') as f:
		for channel in channel_list:
			f.write(channel + '\n')

	return feature_extractor_nodes
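
# Note (illustrative, not part of the original source): after the "--channel-name="
# prefix is peeled off the first entry above, the value passed for the channel-name
# option looks roughly like (channel names and rates hypothetical):
#
#	"H1:FOO-CHANNEL_NAME:256 --channel-name=H1:BAR-CHANNEL_NAME:2048 --channel-name=H1:BAZ-CHANNEL_NAME:512"
#
# generic_node supplies the leading --channel-name option itself, so each feature
# extractor job still receives one --channel-name=IFO:CHANNEL_NAME:RATE entry per channel.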


# =============================
#
#            classes
#
# =============================

class zookeeper_job(inspiral_pipe.generic_job):
	"""
	A zookeeper job
	"""
	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
		"""
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(datadir)
		except OSError:
			pass

		f = open("zookeeper.properties", "w")
		f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
""" % (datadir, port, maxclients))

		f.close()


class kafka_job(inspiral_pipe.generic_job):
	"""
	A kafka job
	"""
	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
		"""
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(logdir)
		except OSError:
			pass

		f = open("kafka.properties", "w")
		f.write("""
broker.id=0
listeners = PLAINTEXT://%s
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
""" % (host, logdir, zookeeperaddr))

		f.close()


# =============================
#
#      command line parser
#
# =============================

def parse_command_line():
	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)

	# generic data source and feature extraction options
	multichannel_datasource.append_options(parser)
	feature_extractor.append_options(parser)

	# Condor commands
	group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
	parser.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
	parser.add_option("--disable-kafka-jobs", action = "store_true", help = "If set, do not launch instances of kafka and zookeeper alongside feature extraction jobs.")
	parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
	parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
	parser.add_option("--auxiliary-request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for auxiliary processes, default = 2")
	parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
	parser.add_option_group(group)

	# Synchronizer/File Sink commands
	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--no-drop", default = False, action = "store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and process data. Default = 0.1 seconds.")
	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
	parser.add_option_group(group)

	options, filenames = parser.parse_args()

	return options, filenames
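
# Note (illustrative, not part of the original source): a run of this DAG generator
# might be invoked roughly as below; the executable name, channel list file, server
# address and accounting group are hypothetical placeholders:
#
#	./gstlal_ll_feature_extractor_pipe \
#		--data-source lvshm \
#		--channel-list channel_list.ini \
#		--out-path /path/to/features \
#		--save-format kafka \
#		--kafka-server 10.0.0.1:9182 \
#		--condor-command accounting_group=your.accounting.group \
#		--request-cpu 2 \
#		--request-memory 8GB \
#		--verbose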


# =============================
#
#             main
#
# =============================

#
# parsing and setting up core structures
#

options, filenames = parse_command_line()

data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()

#
# create directories if needed
#

listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")

#
# set up dag and job classes
#

dag = inspiral_pipe.DAG("feature_extractor_pipe")

# feature extractor job
if options.condor_universe == 'local':
	condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
	condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)

# auxiliary jobs
if options.save_format == 'kafka':
	if options.condor_universe == 'local':
		auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
	else:
		auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
	auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)

	if not options.disable_kafka_jobs:
		# kafka/zookeeper jobs
		local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
		local_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, local_condor_options)
		zoo_job = zookeeper_job(condor_commands = local_condor_commands)
		kafka_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)

#
# set up options for auxiliary jobs
#

common_options = {
	"verbose": options.verbose,
	"tag": options.tag,
	"processing-cadence": options.processing_cadence,
	"request-timeout": options.request_timeout,
	"kafka-server": options.kafka_server
}

synchronizer_options = {
	"latency-timeout": options.latency_timeout,
	"input-topic-basename": options.kafka_topic,
	"output-topic-basename": '_'.join(['synchronizer', options.tag])
}
if options.no_drop:
	synchronizer_options.update({"no-drop": options.no_drop})

hdf5_sink_options = {
	"instrument": ifo,
	"channel-list": options.channel_list,
	"waveform": options.waveform,
	"sample-rate": options.sample_rate,
	"write-cadence": options.cadence,
	"persist-cadence": options.persist_cadence,
	"input-topic-basename": '_'.join(['synchronizer', options.tag])
}

extra_hdf5_channel_options = {
	"section-include": options.section_include,
	"safety-include": list(options.safety_include),
	"fidelity-exclude": list(options.fidelity_exclude),
	"safe-channel-include": options.safe_channel_include,
	"unsafe-channel-include": options.unsafe_channel_include,
}

### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
for option_name, option in extra_hdf5_channel_options.items():
	if option:
		hdf5_sink_options[option_name] = option

synchronizer_options.update(common_options)
hdf5_sink_options.update(common_options)
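
# Note (illustrative, not part of the original source): with the default --tag of
# "test", '_'.join(['synchronizer', options.tag]) above evaluates to
# 'synchronizer_test', so the synchronizer is expected to publish its output on that
# Kafka topic basename while the hdf5 sink reads from the same basename.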


#
# set up jobs
#

feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)

if options.save_format == 'kafka':
	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})

	if not options.disable_kafka_jobs:
		zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
		kafka_node = inspiral_pipe.generic_node(kafka_job, dag, [], opts = {"":"kafka.properties"})

#
# write out dag and sub files
#

dag.write_sub_files()
dag.write_dag()
dag.write_script()
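
# Note (illustrative, not part of the original source): once the .sub and .dag files
# are written, the workflow is typically submitted to HTCondor from the run directory
# with something along the lines of:
#
#	condor_submit_dag feature_extractor_pipe.dag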