Skip to content
Snippets Groups Projects
Forked from lscsoft / GstLAL
3089 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gstlal_feature_extractor_pipe 8.68 KiB
#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
This program makes a dag to run offline gstlal_feature_extractor batch jobs
"""

__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'

# =============================
#
#           preamble
#
# =============================

import os
import optparse

import lal
from ligo import segments

from gstlal import aggregator
from gstlal import inspiral_pipe
from gstlal import dagparts as gstlaldagparts

from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils


# =============================
#
#          functions
#
# =============================

def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
	"""
	build the per-instrument dictionary of analysis segments

	The boundary segment is protracted by the start padding
	(multirate_datasource.PSD_DROP_TIME + max_template_length) and then
	handed to dagparts.breakupsegs() together with segment_length and the
	same padding.

	Returns a segmentlistdict holding the resulting segment list under
	the key ifo, or an empty segmentlistdict if no segments came back.

	NOTE: allsegs is accepted for interface compatibility but is not
	used by this function.
	"""
	# padding lets the whitener settle and covers the longest template
	padding = multirate_datasource.PSD_DROP_TIME + max_template_length

	# widen the requested span, then cut it into job-sized pieces
	# NOTE(review): presumably breakupsegs overlaps consecutive pieces by
	# the padding -- confirm against dagparts.breakupsegs
	padded_span = segments.segmentlist([boundary_seg]).protract(padding)
	pieces = gstlaldagparts.breakupsegs(padded_span, segment_length, padding)

	# only instruments with something to analyze get an entry
	result = segments.segmentlistdict()
	if pieces:
		result[ifo] = pieces

	return result

def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
	"""
	create one gstlal_feature_extractor node per (channel subset, segment)

	Jobs are parallelized over data_source_info.channel_subsets and over
	the segments in segsdict[ifo]. Segments that do not intersect
	data_source_info.frame_segments[ifo] produce no node.

	To limit how many jobs run on the same segment at once, the first
	options.concurrency channel subsets depend on parent_nodes, and every
	later subset depends on the node created for the subset
	options.concurrency places earlier on the same segment.

	Returns a dictionary of the created nodes keyed by
	(channel subset index, segment).
	"""
	feature_extractor_nodes = {}

	# these do not change from job to job
	outpath = os.path.join(options.out_path, "gstlal_feature_extractor")
	start_pad = multirate_datasource.PSD_DROP_TIME + max_template_length

	# parallelize jobs by channel subsets
	for ii, channel_subset in enumerate(data_source_info.channel_subsets):

		print("Creating feature extractor jobs for channel subset %d" % ii)

		# parallelize jobs by segments
		for seg in segsdict[ifo]:

			# only produce jobs where the analysis runtime after applying segments is nonzero
			if not data_source_info.frame_segments[ifo].intersects_segment(seg):
				if options.verbose:
					print("    Skipping segment (%d, %d) for channel subset %d since there is no analyzable data here" % (int(seg[0]), int(seg[1]), ii))
				continue

			# set maximum number of jobs reading concurrently from the same frame file to prevent I/O locks
			# NOTE: floor division is required here; with true division
			# (python 3, or "from __future__ import division") the test
			# would only hold for ii == 0
			if ii // options.concurrency == 0:
				dep_nodes = parent_nodes
			else:
				dep_nodes = [feature_extractor_nodes[(ii - options.concurrency, seg)]]

			# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
			channels = ["--channel-name=%s:%d" % (channel, int(data_source_info.channel_dict[channel]['fsamp'])) for channel in channel_subset]

			# FIXME: hacky way of getting options to get passed correctly for channels
			# the first entry loses its "--channel-name=" prefix because the
			# whole string is passed as the value of the channel-name option
			channels[0] = channels[0].split('=', 1)[1]

			# define analysis times
			# features are only produced after the start padding has elapsed
			gps_start_time = int(seg[0])
			feature_start_time = gps_start_time + start_pad
			feature_end_time = min(int(seg[1]), options.gps_end_time)

			feature_extractor_nodes[(ii, seg)] = \
				inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = dep_nodes,
					opts = {"gps-start-time":gps_start_time,
						"gps-end-time":feature_end_time,
						"feature-start-time":feature_start_time,
						"feature-end-time":feature_end_time,
						"data-source":"frames",
						"mismatch":options.mismatch,
						"waveform":options.waveform,
						"qhigh":options.qhigh,
						"channel-name":' '.join(channels),
						"job-id":str(ii + 1).zfill(4),
						"cadence":options.cadence,
						"persist-cadence":options.persist_cadence,
						"max-streams":options.max_serial_streams,
						"disable-web-service":options.disable_web_service,
						"local-frame-caching":options.local_frame_caching,
						"frame-segments-name": options.frame_segments_name,
						"save-format": options.save_format,
						"verbose":options.verbose
					},
					input_files = {"frame-cache":options.frame_cache,
						"frame-segments-file":options.frame_segments_file},
					output_files = {"out-path":outpath}
				)
			if options.verbose:
				print("    Creating node for channel subset %d, gps range %d - %d" % (ii, feature_start_time, feature_end_time))

	return feature_extractor_nodes

# =============================
#
#     command line parser
#
# =============================

def parse_command_line():
	"""
	parse the command line for the DAG generator

	Registers the options provided by multichannel_datasource and
	feature_extractor, plus the DAG architecture and condor options
	defined here.

	Returns the tuple (options, filenames) from optparse. As a side
	effect options.max_streams is set to options.max_parallel_streams.

	Exits through parser.error() if --segment-length is shorter than
	4 * multirate_datasource.PSD_DROP_TIME.
	"""
	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)

	# generic data source options
	multichannel_datasource.append_options(parser)
	feature_extractor.append_options(parser)

	# DAG architecture options
	parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
	parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
	parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks")
	parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")

	# Condor commands
	parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
	parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
	parser.add_option("--request-disk", default = "50GB", metavar = "integer", help = "set the requested node local scratch space size needed, default = 50GB")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")

	options, filenames = parser.parse_args()

	# set max parallel streams to options.max_streams for use in data_source_info for splitting up channel lists to process in parallel
	options.max_streams = options.max_parallel_streams

	# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check

	# sanity check to enforce a minimum segment length
	# Minimum segment length chosen so that the overlap is a ~33% hit in run time
	# NOTE: reported through parser.error() rather than assert so the check
	# survives "python -O" and the user gets a message instead of a traceback
	min_segment_length = int(4 * multirate_datasource.PSD_DROP_TIME)
	if options.segment_length < min_segment_length:
		parser.error("--segment-length must be at least %d seconds, got %d" % (min_segment_length, options.segment_length))

	return options, filenames

# =============================
#
#             main
#
# =============================

#
# parsing and setting up core structures
#

options, filenames = parse_command_line()

# describe the data source and pull out what the DAG needs from it
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
# NOTE(review): channels is not referenced again in this script
channels = data_source_info.channel_dict.keys()

# FIXME Work out better way to determine max template length
max_template_length = 30

#
# make sure the output and log directories exist
#

listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")

#
# build the dag, the condor job description and the analysis segments
#

dag = inspiral_pipe.DAG("feature_extractor_pipe")

# NOTE(review): --request-disk is parsed but never forwarded to condor
# here -- confirm whether that is intentional
condor_options = {
	"request_memory":options.request_memory,
	"request_cpus":options.request_cpu,
	"want_graceful_removal":"True",
	"kill_sig":"15"
}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
segsdict = analysis_segments(
	ifo,
	data_source_info.frame_segments,
	data_source_info.seg,
	options.segment_length,
	max_template_length=max_template_length
)

#
# populate the dag; the first jobs have no parents
#

feature_extractor_nodes = feature_extractor_node_gen(
	feature_extractor_job,
	dag,
	[],
	segsdict,
	ifo,
	options,
	data_source_info,
	max_template_length=max_template_length
)

#
# write the sub files, the dag and the script to disk
#

dag.write_sub_files()
dag.write_dag()
dag.write_script()