#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a DAG to run a series of gstlal_feature_extractor jobs online.
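
A typical invocation looks roughly like the sketch below. This is illustrative only:
the data source and channel selection options are appended by multichannel_datasource
and feature_extractor, so their exact spellings should be checked with --help, and all
values shown here are placeholders.

	gstlal_ll_feature_extractor_pipe \
		--data-source lvshm \
		--channel-list channel_list.ini \
		--out-path /path/to/output \
		--save-format kafka \
		--kafka-server localhost:9182 \
		--tag online_test \
		--request-cpu 2 \
		--request-memory 8GB \
		--verbose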
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
# =============================
#
# preamble
#
# =============================
import optparse
import os
from gstlal import aggregator
from gstlal import dagparts
from gstlal import inspiral_pipe
from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils
# =============================
#
# functions
#
# =============================
def generate_options(options):
"""
	Generates a dictionary of command line options to pass into DAG nodes.
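
	For example, with --data-source framexmit and --save-format hdf5 the returned
	dictionary would look roughly like this (all values are placeholders):

		{
			"data-source": "framexmit",
			"waveform": "sine_gaussian",
			"mismatch": 0.2,
			"qhigh": 100,
			"save-format": "hdf5",
			"sample-rate": 4096,
			"cadence": 20,
			"persist-cadence": 200,
		}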
"""
# data source options
if options.data_source == 'lvshm':
data_source_options = {
"data-source": options.data_source,
"shared-memory-partition": options.shared_memory_partition,
"shared-memory-assumed-duration": options.shared_memory_assumed_duration
}
elif options.data_source == 'framexmit':
data_source_options = {"data-source": options.data_source}
# waveform options
waveform_options = {
"waveform": options.waveform,
"mismatch": options.mismatch,
"qhigh": options.qhigh
}
# data transfer options
if options.save_format == 'kafka':
save_options = {
"save-format": options.save_format,
"data-transfer": options.data_transfer,
"sample-rate": options.sample_rate,
"kafka-partition": options.kafka_partition,
"kafka-topic": options.kafka_topic,
"kafka-server": options.kafka_server
}
elif options.save_format == 'hdf5':
save_options = {
"save-format": options.save_format,
"sample-rate": options.sample_rate,
"cadence": options.cadence,
"persist-cadence": options.persist_cadence
}
else:
		raise NotImplementedError('not an available option for online jobs at this time')
# program behavior options
program_options = {}
if options.disable_web_service:
program_options.update({"disable-web-service": options.disable_web_service})
if options.verbose:
program_options.update({"verbose": options.verbose})
	# gather options together
out_options = {}
out_options.update(data_source_options)
out_options.update(waveform_options)
out_options.update(save_options)
out_options.update(program_options)
return out_options
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
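	"""
	Generates one gstlal_feature_extractor DAG node per channel subset, writes out
	the full list of channels processed, and returns a dictionary mapping the
	subset index to its node.
	"""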
feature_extractor_nodes = {}
channel_list = []
# generate common command line options
command_line_options = generate_options(options)
# parallelize jobs by channel subsets
for ii, channel_subset in enumerate(data_source_info.channel_subsets):
if options.verbose:
print("Creating node for channel subset %d"%ii)
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channel_list.extend(channel_subset)
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
		channels[0] = channels[0].split('=')[1] # peel the --channel-name= prefix off the first channel; the option name itself is supplied by the "channel-name" key below
# create specific options for each channel subset
subset_options = {
"max-streams": options.max_streams * 2, # FIXME: done to force all channels to be processed in parallel, but should be handled upstream more gracefully
"job-id": str(ii + 1).zfill(4),
"channel-name":' '.join(channels)
}
subset_options.update(command_line_options)
feature_extractor_nodes[ii] = \
inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = parent_nodes,
opts = subset_options,
output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
)
print("Writing channel list of all channels processed")
listpath = os.path.join(options.out_path, "full_channel_list.txt")
with open(listpath, 'w') as f:
for channel in channel_list:
f.write(channel+'\n')
return feature_extractor_nodes
# =============================
#
# classes
#
# =============================
class zookeeper_job(inspiral_pipe.generic_job):
"""
A zookeeper job
"""
def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
"""
"""
inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(datadir)
except OSError:
pass
f = open("zookeeper.properties", "w")
f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
""" % (datadir, port, maxclients))
f.close()
class kafka_job(inspiral_pipe.generic_job):
"""
A kafka job
"""
def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
"""
"""
inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(logdir)
except OSError:
pass
f = open("kafka.properties", "w")
f.write("""
broker.id=0
listeners = PLAINTEXT://%s
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
""" % (host, logdir, zookeeperaddr))
f.close()
# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# generic data source and feature extraction options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# Condor commands
group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
parser.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
parser.add_option("--disable-kafka-jobs", action = "store_true", help = "If set, do not launch instances of kafka and zookeeper alongside feature extraction jobs.")
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
parser.add_option("--auxiliary-request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for auxiliary processes, default = 2")
parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
parser.add_option_group(group)
# Synchronizer/File Sink commands
group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
parser.add_option_group(group)
options, filenames = parser.parse_args()
return options, filenames
# =============================
#
# main
#
# =============================
#
# parsing and setting up core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
#
# create directories if needed
#
listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")
#
# set up dag and job classes
#
dag = inspiral_pipe.DAG("feature_extractor_pipe")
# feature extractor job
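# want_graceful_removal/kill_sig=15 ask HTCondor to deliver SIGTERM on removal so streaming jobs can shut down cleanly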
if options.condor_universe == 'local':
condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
# auxiliary jobs
if options.save_format == 'kafka':
if options.condor_universe == 'local':
auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
if not options.disable_kafka_jobs:
# kafka/zookeeper jobs
local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
local_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, local_condor_options)
zoo_job = zookeeper_job(condor_commands = local_condor_commands)
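		# note: this rebinds the class name kafka_job to the job instance created here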
kafka_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)
#
# set up options for auxiliary jobs
#
common_options = {
"verbose": options.verbose,
"tag": options.tag,
"processing-cadence": options.processing_cadence,
"request-timeout": options.request_timeout,
"kafka-server": options.kafka_server
}
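# the synchronizer consumes the per-job feature topics (input-topic-basename) and
# republishes a single merged stream to 'synchronizer_<tag>', which the hdf5 sink
# reads through its own input-topic-basename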
synchronizer_options = {
"latency-timeout": options.latency_timeout,
"input-topic-basename": options.kafka_topic,
"output-topic-basename": '_'.join(['synchronizer', options.tag])
}
if options.no_drop:
synchronizer_options.update({"no-drop": options.no_drop})
hdf5_sink_options = {
"instrument": ifo,
"channel-list": options.channel_list,
"waveform": options.waveform,
"sample-rate": options.sample_rate,
"write-cadence": options.cadence,
"persist-cadence": options.persist_cadence,
"input-topic-basename": '_'.join(['synchronizer', options.tag])
}
extra_hdf5_channel_options = {
"section-include": options.section_include,
"safety-include": list(options.safety_include),
"fidelity-exclude": list(options.fidelity_exclude),
"safe-channel-include": options.safe_channel_include,
"unsafe-channel-include": options.unsafe_channel_include,
}
### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
for option_name, option in extra_hdf5_channel_options.items():
if option:
hdf5_sink_options[option_name] = option
synchronizer_options.update(common_options)
hdf5_sink_options.update(common_options)
#
# set up jobs
#
feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
if options.save_format == 'kafka':
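	# the synchronizer collects one topic per parallel feature extractor job, hence num-topics = number of channel subsets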
synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
if not options.disable_kafka_jobs:
zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
kafka_node = inspiral_pipe.generic_node(kafka_job, dag, [], opts = {"":"kafka.properties"})
#
# write out dag and sub files
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()