Skip to content
Snippets Groups Projects
Forked from lscsoft / GstLAL
2670 commits behind the upstream repository.
gstlal_ll_feature_extractor_pipe 14.02 KiB
#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
This program makes a dag to run a series of gstlal_feature_extractor jobs online
"""

__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'

# =============================
#
#           preamble
#
# =============================

import optparse
import os

from gstlal import aggregator
from gstlal import dagparts
from gstlal import inspiral_pipe

from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils

# =============================
#
#          functions
#
# =============================

def generate_options(options):
	"""
	Generates a list of command line options to pass into DAG nodes.
	"""
	# data source options
	if options.data_source == 'lvshm':
		data_source_options = {
			"data-source": options.data_source,
			"shared-memory-partition": options.shared_memory_partition,
			"shared-memory-assumed-duration": options.shared_memory_assumed_duration
		}
	elif options.data_source == 'framexmit':
		data_source_options = {"data-source": options.data_source}

	# waveform options
	waveform_options = {
		"waveform": options.waveform,
		"mismatch": options.mismatch,
		"qhigh": options.qhigh
	}

	# data transfer options
	if options.save_format == 'kafka':
		save_options = {
			"save-format": options.save_format,
			"data-transfer": options.data_transfer,
			"sample-rate": options.sample_rate,
			"kafka-partition": options.kafka_partition,
			"kafka-topic": options.kafka_topic,
			"kafka-server": options.kafka_server
		}
	elif options.save_format == 'hdf5':
		save_options = {
			"save-format": options.save_format,
			"sample-rate": options.sample_rate,
			"cadence": options.cadence,
			"persist-cadence": options.persist_cadence
		}
	else:
		raise NotImplementedError, 'not an available option for online jobs at this time'

	# program behavior options
	program_options = {}
	if options.disable_web_service:
		program_options.update({"disable-web-service": options.disable_web_service})
	if options.verbose:
		program_options.update({"verbose": options.verbose})

	# gobble options together
	out_options = {}
	out_options.update(data_source_options)
	out_options.update(waveform_options)
	out_options.update(save_options)
	out_options.update(program_options)

	return out_options

def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, options, data_source_info):
	feature_extractor_nodes = {}
	channel_list = []

	# generate common command line options
	command_line_options = generate_options(options)

	# parallelize jobs by channel subsets
	for ii, channel_subset in enumerate(data_source_info.channel_subsets):

		if options.verbose:
			print("Creating node for channel subset %d"%ii)

		# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
		channel_list.extend(channel_subset)
		channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
		channels[0] = channels[0].split('=')[1] # this is done to peel off --channel-name option off first channel

		# create specific options for each channel subset
		subset_options = {
			"max-streams": options.max_streams * 2, # FIXME: done to force all channels to be processed in parallel, but should be handled upstream more gracefully
			"job-id": str(ii + 1).zfill(4),
			"channel-name":' '.join(channels)
		}
		subset_options.update(command_line_options)

		feature_extractor_nodes[ii] = \
			inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = parent_nodes,
				opts = subset_options,
				output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
			)

	print("Writing channel list of all channels processed")
	listpath = os.path.join(options.out_path, "full_channel_list.txt")
	with open(listpath, 'w') as f:
		for channel in channel_list:
			f.write(channel+'\n')

	return feature_extractor_nodes


# =============================
#
#           classes
#
# =============================


class zookeeper_job(inspiral_pipe.generic_job):
	"""
	A zookeeper job
	"""
	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
		"""
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(datadir)
		except OSError:
			pass
		f = open("zookeeper.properties", "w")
		f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
		""" % (datadir, port, maxclients))

		f.close()


class kafka_job(inspiral_pipe.generic_job):
	"""
	A kafka job
	"""
	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
		"""
		"""
		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)

		try:
			os.mkdir(logdir)
		except OSError:
			pass
		f = open("kafka.properties", "w")
		f.write("""
broker.id=0
listeners = PLAINTEXT://%s
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
		""" % (host, logdir, zookeeperaddr))

		f.close()

# =============================
#
#     command line parser
#
# =============================

def parse_command_line():
	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)

	# generic data source and feature extraction options
	multichannel_datasource.append_options(parser)
	feature_extractor.append_options(parser)

	# Condor commands
	group = optparse.OptionGroup(parser, "Condor Options", "Adjust parameters used for HTCondor")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
	parser.add_option("--condor-universe", default = "vanilla", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = vanilla")
	parser.add_option("--disable-kafka-jobs", action = "store_true", help = "If set, do not launch instances of kafka and zookeeper alongside feature extraction jobs.")
	parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for feature extraction jobs, default = 2")
	parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory for feature extraction jobs, default = 8GB")
	parser.add_option("--auxiliary-request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count for auxiliary processes, default = 2")
	parser.add_option("--auxiliary-request-memory", default = "2GB", metavar = "integer", help = "set the requested node memory for auxiliary processes, default = 2GB")
	parser.add_option_group(group)

	# Synchronizer/File Sink commands
	group = optparse.OptionGroup(parser, "Synchronizer/File Sink Options", "Adjust parameters used for synchronization and dumping of features to disk.")
	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
	parser.add_option("--no-drop", default=False, action="store_true", help = "If set, do not drop incoming features based on the latency timeout. Default = False.")
	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the streaming jobs acquire and processes data. Default = 0.1 seconds.")
	parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
	parser.add_option("--latency-timeout", type = "float", default = 5, help = "Maximum time before incoming data is dropped for a given timestamp. Default = 5 seconds.")
	parser.add_option_group(group)


	options, filenames = parser.parse_args()

	return options, filenames

# =============================
#
#             main
#
# =============================

#
# parsing and setting up core structures
#

options, filenames = parse_command_line()

data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()

#
# create directories if needed
#

listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")

#
# set up dag and job classes
#

dag = inspiral_pipe.DAG("feature_extractor_pipe")

# feature extractor job
if options.condor_universe == 'local':
	condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
else:
	condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)

# auxiliary jobs
if options.save_format == 'kafka':
	if options.condor_universe == 'local':
		auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
	else:
		auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
	auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)

	if not options.disable_kafka_jobs:
		# kafka/zookeeper jobs
		local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
		local_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, local_condor_options)
		zoo_job = zookeeper_job(condor_commands = local_condor_commands)
		kafka_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)

	#
	# set up options for auxiliary jobs
	#
	common_options = {
		"verbose": options.verbose,
		"tag": options.tag,
		"processing-cadence": options.processing_cadence,
		"request-timeout": options.request_timeout,
		"kafka-server": options.kafka_server
	}

	synchronizer_options = {
		"latency-timeout": options.latency_timeout,
		"input-topic-basename": options.kafka_topic,
		"output-topic-basename": '_'.join(['synchronizer', options.tag])
	}
	if options.no_drop:
		synchronizer_options.update({"no-drop": options.no_drop})

	hdf5_sink_options = {
		"instrument": ifo,
		"channel-list": options.channel_list,
		"waveform": options.waveform,
		"sample-rate": options.sample_rate,
		"write-cadence": options.cadence,
		"persist-cadence": options.persist_cadence,
		"input-topic-basename": '_'.join(['synchronizer', options.tag])
	}

	extra_hdf5_channel_options = {
		"section-include": options.section_include,
		"safety-include": list(options.safety_include),
		"fidelity-exclude": list(options.fidelity_exclude),
		"safe-channel-include": options.safe_channel_include,
		"unsafe-channel-include": options.unsafe_channel_include,
	}

	### FIXME: hack to deal with condor DAG utilities not playing nice with empty settings
	for option_name, option in extra_hdf5_channel_options.items():
		if option:
			hdf5_sink_options[option_name] = option

	synchronizer_options.update(common_options)
	hdf5_sink_options.update(common_options)

#
# set up jobs
#

feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)

if options.save_format == 'kafka':
	synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})

	if not options.disable_kafka_jobs:
		zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
		kafka_node = inspiral_pipe.generic_node(kafka_job, dag, [], opts = {"":"kafka.properties"})

#
# write out dag and sub files
#

dag.write_sub_files()
dag.write_dag()
dag.write_script()