# Copyright (C) 2013--2014  Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

##
# @file
#
# A file that contains the inspiral_pipe module code; used to construct condor dags
#

##
# @package inspiral_pipe
#
# A module that contains the inspiral_pipe module code; used to construct condor dags
#
# ### Review Status
#
# | Names                                          | Hash                                         | Date       | Diff to Head of Master      |
# | ---------------------------------------------- | -------------------------------------------- | ---------- | --------------------------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e     | 2014-06-18 | <a href="@gstlal_inspiral_cgit_diff/python/inspiral_pipe.py?id=HEAD&id2=8a6ea41398be79c00bdc27456ddeb1b590b0f68e">inspiral_pipe.py</a> |
#
# #### Actions
#
# - In inspiral_pipe.py fix the InspiralJob.__init__: fix the arguments
# - On line 201, fix the comment or explain what the comment is meant to be

import sys, os
import subprocess, socket, tempfile, copy, doctest
from glue import pipeline, lal
from glue.ligolw import utils, lsctables, array


#
# environment utilities
#


def which(prog):
	"""!
	Like the which program to find the path to an executable

	>>> which("ls")
	'/bin/ls'

	"""
	which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE)
	out = which.stdout.read().strip()
	if not out:
		print >>sys.stderr, "ERROR: could not find %s in your path, have you built the proper software and sourced the proper env. scripts?" % prog
		raise ValueError
	return out


def condor_scratch_space():
	"""!
	A way to standardize the condor scratch space even if it changes

	>>> condor_scratch_space()
	'_CONDOR_SCRATCH_DIR'
	"""
	return "_CONDOR_SCRATCH_DIR"


def log_path():
	"""!
	Find a suitable log space on the LDG.  Checks $TMPDIR first, then
	falls back to known host-specific locations.
	"""
	host = socket.getfqdn()
	try:
		return os.environ['TMPDIR']
	except KeyError:
		print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n"
		#FIXME add more hosts as you need them
		if 'cit' in host or 'caltech.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phys.uwm.edu' in host:
			tmp = '/localscratch/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'aei.uni-hannover.de' in host:
			tmp = '/local/user/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phy.syr.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp

		raise KeyError("$TMPDIR is not set and I don't recognize this environment")


def webserver_url():
	"""!
	Find the URL of the web server for the cluster we are running on (LDG).
	"""
	host = socket.getfqdn()
	#FIXME add more hosts as you need them
	if "cit" in host or "ligo.caltech.edu" in host:
		return "https://ldas-jobs.ligo.caltech.edu"
	if "phys.uwm.edu" in host or "cgca.uwm.edu" in host:
		return "https://ldas-jobs.cgca.uwm.edu"

	raise NotImplementedError("I don't know where the webserver is for this environment")


#
# DAG class
#

class DAG(pipeline.CondorDAG):
	"""!
	A thin subclass of pipeline.CondorDAG.

	Extra features include an add_node() method and a cache writing method.
	Also includes some standard setup, e.g., log file paths etc.
	"""
	def __init__(self, name, logpath = log_path()):
		self.basename = name.replace(".dag","")
		tempfile.tempdir = logpath
		tempfile.template = self.basename + '.dag.log.'
		logfile = tempfile.mktemp()
		fh = open( logfile, "w" )
		fh.close()
		pipeline.CondorDAG.__init__(self,logfile)
		self.set_dag_file(self.basename)
		self.jobsDict = {}
		self.output_cache = []

	def add_node(self, node):
		node.set_retry(3)
		node.add_macro("macronodename", node.get_name())
		pipeline.CondorDAG.add_node(self, node)

	def write_cache(self):
		out = self.basename + ".cache"
		f = open(out,"w")
		for c in self.output_cache:
			f.write(str(c)+"\n")
		f.close()
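
# A minimal usage sketch for the DAG class (illustrative only; "my_analysis"
# is a made-up name).  write_sub_files() and write_dag() are inherited from
# pipeline.CondorDAG, write_cache() is defined above:
#
#	dag = DAG("my_analysis.dag")
#	# ... create jobs and nodes, e.g. with generic_job/generic_node below ...
#	dag.write_sub_files()
#	dag.write_dag()
#	dag.write_cache()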


class InspiralJob(pipeline.CondorDAGJob):
	"""!
	A job class that subclasses pipeline.CondorDAGJob and adds some extra
	boilerplate items for gstlal inspiral jobs
	"""
	def __init__(self, executable, tag_base, universe = "vanilla"):
		self.__prog__ = tag_base
		self.__executable = executable
		self.__universe = universe
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv','True')
		self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;")
		self.tag_base = tag_base
		self.set_sub_file(tag_base+'.sub')
		self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
		self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
		self.number = 1
		# make an output directory for files
		self.output_path = tag_base
		try:
			os.mkdir(self.output_path)
		except OSError:
			# directory already exists
			pass


class InspiralNode(pipeline.CondorDAGNode):
	"""!
	A node class that subclasses pipeline.CondorDAGNode; it automates
	adding the node to the dag, makes sensible names and allows a list of parent
	nodes to be provided.
	"""
	def __init__(self, job, dag, p_node=[]):
		pipeline.CondorDAGNode.__init__(self, job)
		for p in p_node:
			self.add_parent(p)
		self.set_name("%s_%04X" % (job.tag_base, job.number))
		job.number += 1
		dag.add_node(self)


class generic_job(InspiralJob):
	"""!
	A generic job class which tends to do the "right" thing when given just
	an executable name but otherwise is a subclass of InspiralJob and thus
	pipeline.CondorDAGJob
	"""
	def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs):
		executable = which(program)
		InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1], **kwargs)
		for cmd,val in condor_commands.items():
			self.add_condor_cmd(cmd, val)


class generic_node(InspiralNode):
	"""!
	A generic node class which tends to do the "right" thing when given a
	job, a dag, parent nodes, a dictionary of options relevant to the job, a
	dictionary of options related to input files and a dictionary of options
	related to output files.  Otherwise it is a subclass of InspiralNode and thus
	pipeline.CondorDAGNode

	NOTE an important and subtle behavior: you can specify an option with
	an empty argument by setting it to "".  However, options set to None are simply
	ignored.
	"""
	def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None):
		InspiralNode.__init__(self, job, dag, parent_nodes)

		self.input_files = input_files.copy()
		self.input_files.update(input_cache_files)
		self.output_files = output_files.copy()
		self.output_files.update(output_cache_files)

		self.cache_inputs = {}
		self.cache_outputs = {}

		for opt, val in opts.items() + output_files.items() + input_files.items():
			if val is None:
				continue # not the same as val = '' which is allowed
			if not hasattr(val, "__iter__"): # catches list like things but not strings
				if opt == "":
					self.add_var_arg(val)
				else:
					self.add_var_opt(opt, val)
			# Must be an iterable
			else:
				if opt == "":
					for a in val:
						self.add_var_arg(a)
				else:
					self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))

		# Create cache files for long command line arguments and store them in the job's subdirectory. NOTE the svd-bank string
		# is handled by gstlal_inspiral_pipe directly

		for opt, val in input_cache_files.items():
			cache_entries = [lal.CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			if input_cache_file_name is None:
				cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = job.tag_base)
			else:
				cache_file_name = input_cache_file_name
			with open(cache_file_name, "w") as cache_file:
				lal.Cache(cache_entries).tofile(cache_file)
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_inputs.setdefault(opt, []).append(cache_file_name)

		for opt, val in output_cache_files.items():
			cache_entries = [lal.CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = job.tag_base)
			with open(cache_file_name, "w") as cache_file:
				lal.Cache(cache_entries).tofile(cache_file)
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_outputs.setdefault(opt, []).append(cache_file_name)
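
# A minimal sketch of how generic_job and generic_node are typically combined
# (the program name, option names and file names below are illustrative, not a
# prescription):
#
#	job = generic_job("gstlal_inspiral")
#	node = generic_node(job, dag, parent_nodes = [],
#		opts = {"gps-start-time": 1000000000, "verbose": ""},
#		input_files = {"frame-cache": "frames.cache"},
#		output_files = {"output": "out.xml.gz"})
#
# Here "verbose": "" yields a bare --verbose flag, while any option whose value
# is None would be ignored (see the NOTE in the generic_node docstring).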

def pipeline_dot_py_append_opts_hack(opt, vals):
	"""!
	A way to work around the dictionary nature of pipeline.py which can
	only record options once.

	>>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
	'1 --my-favorite-option 2 --my-favorite-option 3'
	"""
	out = str(vals[0])
	for v in vals[1:]:
		out += " --%s %s" % (opt, str(v))
	return out



#
# Utility functions
#


def group(inlist, parts):
	"""!
	group a list roughly according to the distribution in parts, e.g.

	>>> A = range(12)
	>>> B = [2,3]
	>>> for g in group(A,B):
	...     print g
	... 
	[0, 1]
	[2, 3]
	[4, 5]
	[6, 7, 8]
	[9, 10, 11]
	"""
	mult_factor = len(inlist) // sum(parts) + 1
	l = copy.deepcopy(inlist)
	for i, p in enumerate(parts):
		for j in range(mult_factor):
			if not l:
				break
			yield l[:p]
			del l[:p]


def parse_cache_str(instr):
	"""!
	A way to decode a command line option that specifies different bank
	caches for different detectors, e.g.,

	>>> bankcache = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache")
	>>> bankcache
	{'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
	"""

	dictcache = {}
	if instr is None: return dictcache
	for c in instr.split(','):
		ifo = c.split("=")[0]
		cache = c.replace(ifo+"=","")
		dictcache[ifo] = cache
	return dictcache


def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
	"""!
	Given a dictionary of bank cache files keyed by ifo from, e.g.,
	parse_cache_str(), group the banks into suitably sized chunks for a single svd
	bank file according to numbanks.  Note, numbanks should be a list; the grouping
	uses the algorithm in the group() function
	"""
	outstrs = []
	ifos = sorted(cachedict.keys())
	files = zip(*[[lal.CacheEntry(f).path for f in open(cachedict[ifo],'r').readlines()] for ifo in ifos])
	for n, bank_group in enumerate(group(files, numbanks)):
		if maxjobs is not None and n > maxjobs:
			break
		c = dict(zip(ifos, zip(*bank_group)))
		outstrs.append(c)

	return outstrs
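
# A minimal sketch of typical use (cache file names are illustrative):
#
#	cachedict = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache")
#	for bankdict in build_bank_groups(cachedict, numbanks = [2]):
#		# bankdict maps each ifo to a tuple of split bank file paths, e.g.
#		# {"H1": ("H1_bank_0.xml.gz", "H1_bank_1.xml.gz"), "L1": (...)}
#		pass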


def T050017_filename(instruments, description, start, end, extension, path = None):
	"""!
	A function to generate a T050017 filename.
	"""
	if not isinstance(instruments, basestring):
		instruments = "".join(sorted(instruments))
	duration = end - start
	extension = extension.strip('.')
	if path is not None:
		return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
	else:
		return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
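
# A sketch of the resulting name (not a doctest that is run):
#
#	T050017_filename("H1L1", "SVD_BANK", 1000000000, 1000000100, ".xml.gz")
#
# would return 'H1L1-SVD_BANK-1000000000-100.xml.gz'; passing a list such as
# ["L1", "H1"] for instruments would be sorted and joined to "H1L1" first.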


if __name__ == "__main__":
	import doctest
	doctest.testmod()


def condor_command_dict_from_opts(opts, defaultdict = None):
	"""!
	A function to turn a list of options into a dictionary of condor commands, e.g.,

	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"])
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"})
	{'somecommand': 'somevalue', 'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"})
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	"""

	if defaultdict is None:
		defaultdict = {}
	for o in opts:
		osplit = o.split("=")
		k = osplit[0]
		v = "=".join(osplit[1:])
		defaultdict.update([(k, v)])
	return defaultdict


def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None):
	"""!
	A function to return the name of a file created from multiple files following
	the T050017 convention. In addition to the T050017 requirements, this assumes
	that numbers relevant to organization schemes will be the first entry in the
	description, e.g. 0_DIST_STATS, and that all files in a given cache file are
	from the same group of ifos and either contain data from the same segment or
	from the same background bin.  Note that each file doesn't have to be from
	the same IFO, for example the template bank cache could contain template bank
	files from H1 and template bank files from L1.
	"""
	# Collect the distinct observatories from the leading cache entries (the
	# loop stops once the first observatory repeats).
	observatories = [cache_entries[0].observatory]
	for entry in cache_entries[1:]:
		if entry.observatory == observatories[0]:
			break
		observatories.append(entry.observatory)

	split_description = cache_entries[0].description.split('_')
	min_bin = [x for x in split_description[:2] if x.isdigit()]
	max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()]
	min_seg = min([int(x.segment[0]) for x in cache_entries])
	max_seg = max([int(x.segment[1]) for x in cache_entries])
	if min_bin:
		min_bin = min_bin[0]
	if max_bin:
		max_bin = max_bin[-1]
	if min_bin and (min_bin == max_bin or not max_bin):
		# All files from same bin, thus segments may be different.
		# Note that this assumes that if the last file in the cache
		# does not start with a number that every file in the cache is
		# from the same bin, an example of this is the cache file
		# generated for gstlal_inspiral_calc_likelihood, which contains
		# all of the DIST_STATS files from a given background bin and
		# then CREATE_PRIOR_DIST_STATS files which are not generated
		# for specific bins
		return T050017_filename(''.join(observatories), cache_entries[0].description, min_seg, max_seg, extension, path = path)
	elif min_bin and max_bin and min_bin != max_bin:
		if split_description[1].isdigit():
			description_base = split_description[2:]
		else:
			description_base = split_description[1:]
		# Files from different bins, thus segments must be same
		return T050017_filename(''.join(observatories), '_'.join([min_bin, max_bin] + description_base), min_seg, max_seg, extension, path = path)
	else:
		print >>sys.stderr, "ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention. \nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path)
		raise ValueError
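
# A sketch of the intended behavior (file descriptions are illustrative).  For
# cache entries whose descriptions begin with bin numbers, e.g. 0000_DIST_STATS
# through 0009_DIST_STATS, all with observatory "H1L1" and spanning GPS times
# 1000000000--1000000100, calling this function with extension ".xml.gz" would
# produce 'H1L1-0000_0009_DIST_STATS-1000000000-100.xml.gz'.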