# Copyright (C) 2013--2014  Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

##
# @file
#
# A file that contains the inspiral_pipe module code; used to construct condor dags
#

##
# @package inspiral_pipe
#
# A module that contains the inspiral_pipe module code; used to construct condor dags
#
# ### Review Status
#
# | Names                                          | Hash                                         | Date       | Diff to Head of Master      |
# | ---------------------------------------------- | -------------------------------------------- | ---------- | --------------------------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e     | 2014-06-18 | <a href="@gstlal_inspiral_cgit_diff/python/inspiral_pipe.py?id=HEAD&id2=8a6ea41398be79c00bdc27456ddeb1b590b0f68e">inspiral_pipe.py</a> |
#
# #### Actions
#
# - In inspiral_pipe.py fix InspiralJob.__init__: fix the arguments
# - On line 201, fix the comment or explain what the comment is meant to be

import math
import sys, os
import subprocess, socket, tempfile, copy, doctest
from glue import pipeline
from ligo import segments
from glue.ligolw import lsctables, ligolw
from glue.ligolw import utils as ligolw_utils
from gstlal import svd_bank
from lal.utils import CacheEntry


#
# environment utilities
#

def which(prog):
	"""!
	Like the which program to find the path to an executable

	>>> which("ls")
	'/bin/ls'

	"""
	which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE)
	out = which.stdout.read().strip()
	if not out:
		print >>sys.stderr, "ERROR: could not find %s in your path, have you built the proper software and sourced the proper env. scripts?" % prog
		raise ValueError
	return out


def condor_scratch_space():
	"""!
	A way to standardize the condor scratch space even if it changes

	>>> condor_scratch_space()
	'_CONDOR_SCRATCH_DIR'
	"""
	return "_CONDOR_SCRATCH_DIR"


def log_path():
	"""!
	The stupid pet tricks to find log space on the LDG.
	Defaults to checking TMPDIR first.
	"""
	host = socket.getfqdn()
	try:
		return os.environ['TMPDIR']
	except KeyError:
		print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a user's temporary files should be\n"
		#FIXME add more hosts as you need them
		if 'cit' in host or 'caltech.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phys.uwm.edu' in host:
			tmp = '/localscratch/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'aei.uni-hannover.de' in host:
			tmp = '/local/user/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phy.syr.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp

		raise KeyError("$TMPDIR is not set and I don't recognize this environment")


def webserver_url():
	"""!
	The stupid pet tricks to find the webserver on the LDG.
	"""
	host = socket.getfqdn()
	#FIXME add more hosts as you need them
	if "cit" in host or "ligo.caltech.edu" in host:
		return "https://ldas-jobs.ligo.caltech.edu"
	if ".phys.uwm.edu" in host or ".cgca.uwm.edu" in host or ".nemo.uwm.edu" in host:
		return "https://ldas-jobs.cgca.uwm.edu"
	# FIXME:  this next system does not have a web server, but not
	# having a web server is treated as a fatal error so we have to
	# make something up if we want to make progress
	if ".icrr.u-tokyo.ac.jp" in host:
		return "https://ldas-jobs.icrr.u-tokyo.ac.jp"

	raise NotImplementedError("I don't know where the webserver is for this environment")


#
# DAG class
#


class DAG(pipeline.CondorDAG):
	"""!
	A thin subclass of pipeline.CondorDAG.

	Extra features include an add_node() method and a cache writing method.
	Also includes some standard setup, e.g., log file paths etc.
	"""
	def __init__(self, name, logpath = log_path()):
		self.basename = name.replace(".dag","")
		tempfile.tempdir = logpath
		tempfile.template = self.basename + '.dag.log.'
		logfile = tempfile.mktemp()
		fh = open( logfile, "w" )
		fh.close()
		pipeline.CondorDAG.__init__(self,logfile)
		self.set_dag_file(self.basename)
		self.jobsDict = {}
		self.output_cache = []

	def add_node(self, node):
		node.set_retry(3)
		node.add_macro("macronodename", node.get_name())
		pipeline.CondorDAG.add_node(self, node)

	def write_cache(self):
		out = self.basename + ".cache"
		f = open(out,"w")
		for c in self.output_cache:
			f.write(str(c)+"\n")
		f.close()
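
# For illustration only (not part of the original module): a minimal sketch of
# how a DAG is typically driven.  The name "trigger_pipe" is an arbitrary
# example; write_sub_files(), write_dag() and write_script() are inherited
# from pipeline.CondorDAG.
#
#	dag = DAG("trigger_pipe")
#	# ... create jobs and nodes here; nodes add themselves via add_node() ...
#	dag.write_sub_files()
#	dag.write_dag()
#	dag.write_script()
#	dag.write_cache()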


class InspiralJob(pipeline.CondorDAGJob):
	"""!
	A job class that subclasses pipeline.CondorDAGJob and adds some extra
	boilerplate items for gstlal inspiral jobs
	"""
	def __init__(self, executable, tag_base, universe = "vanilla"):
		self.__prog__ = tag_base
		self.__executable = executable
		self.__universe = universe
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv','True')
		self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;")
		self.tag_base = tag_base
		self.set_sub_file(tag_base+'.sub')
		self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
		self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
		self.number = 1
		# make an output directory for files
		self.output_path = tag_base
		try:
			os.mkdir(self.output_path)
		except OSError:
			# directory already exists
			pass


class InspiralNode(pipeline.CondorDAGNode):
	"""!
	A node class that subclasses pipeline.CondorDAGNode and automates
	adding the node to the dag, makes sensible names, and allows a list of parent
	nodes to be provided.
	"""
	def __init__(self, job, dag, p_node=[]):
		pipeline.CondorDAGNode.__init__(self, job)
		for p in p_node:
			self.add_parent(p)
		self.set_name("%s_%04X" % (job.tag_base, job.number))
		job.number += 1
		dag.add_node(self)


class generic_job(InspiralJob):
	"""!
	A generic job class which tends to do the "right" thing when given just
	an executable name but otherwise is a subclass of InspiralJob and thus
	pipeline.CondorDAGJob
	"""
	def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs):
		executable = which(program)
		InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1], **kwargs)
		for cmd,val in condor_commands.items():
			self.add_condor_cmd(cmd, val)


class generic_node(InspiralNode):
	"""!
	A generic node class which tends to do the "right" thing when given a
	job, a dag, parent nodes, a dictionary of options relevant to the job, a
	dictionary of options related to input files and a dictionary of options
	related to output files.  Otherwise it is a subclass of InspiralNode and thus
	pipeline.CondorDAGNode

	NOTE an important and subtle behavior: you can specify an option with
	an empty argument by setting it to "".  However, options set to None are simply
	ignored.
	"""
	def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None):
		InspiralNode.__init__(self, job, dag, parent_nodes)

		self.input_files = input_files.copy()
		self.input_files.update(input_cache_files)
		self.output_files = output_files.copy()
		self.output_files.update(output_cache_files)

		self.cache_inputs = {}
		self.cache_outputs = {}

		for opt, val in opts.items() + output_files.items() + input_files.items():
			if val is None:
				continue # not the same as val = '' which is allowed
			if not hasattr(val, "__iter__"): # catches list like things but not strings
				if opt == "":
					self.add_var_arg(val)
				else:
					self.add_var_opt(opt, val)
			# Must be an iterable
			else:
				if opt == "":
					[self.add_var_arg(a) for a in val]
				else:
					self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))

		# Create cache files for long command line arguments and store them
		# in the job's subdirectory.  NOTE the svd-bank string is handled by
		# gstlal_inspiral_pipe directly
		cache_dir = os.path.join(job.tag_base, 'cache')

		for opt, val in input_cache_files.items():
			if not os.path.isdir(cache_dir):
				os.mkdir(cache_dir)
			cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			if input_cache_file_name is None:
				cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
			else:
				cache_file_name = os.path.join(cache_dir, input_cache_file_name)
			open(cache_file_name, "w").write("\n".join(map(str, cache_entries)))
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_inputs.setdefault(opt, []).append(cache_file_name)

		for opt, val in output_cache_files.items():
			if not os.path.isdir(cache_dir):
				os.mkdir(cache_dir)
			cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
			open(cache_file_name, "w").write("\n".join(map(str, cache_entries)))
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_outputs.setdefault(opt, []).append(cache_file_name)
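
# For illustration only (not part of the original module): a hedged sketch of
# wiring a generic_job/generic_node pair into a DAG.  The executable name and
# the option/file names below are invented for the example.
#
#	dag = DAG("trigger_pipe")
#	job = generic_job("gstlal_inspiral")
#	node = generic_node(job, dag, parent_nodes = [],
#		opts = {"gps-start-time": 1000000000, "gps-end-time": 1000005000},
#		input_files = {"frame-cache": "frames.cache"},
#		output_files = {"output": "triggers.xml.gz"})
#
# This would add --gps-start-time/--gps-end-time/--frame-cache/--output to the
# node's command line; options set to None would be skipped and options set to
# "" would be passed as bare arguments, per the NOTE in the docstring above.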


def pipeline_dot_py_append_opts_hack(opt, vals):
	"""!
	A way to work around the dictionary nature of pipeline.py which can
	only record options once.

	>>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
	'1 --my-favorite-option 2 --my-favorite-option 3'
	"""
	out = str(vals[0])
	for v in vals[1:]:
		out += " --%s %s" % (opt, str(v))
	return out



#
# Utility functions
#


def group(inlist, parts):
	"""!
	group a list roughly according to the distribution in parts, e.g.

	>>> A = range(12)
	>>> B = [2,3]
	>>> for g in group(A,B):
	...     print g
	... 
	[0, 1]
	[2, 3]
	[4, 5]
	[6, 7, 8]
	[9, 10, 11]
	"""
	mult_factor = len(inlist) // sum(parts) + 1
	l = copy.deepcopy(inlist)
	for i, p in enumerate(parts):
		for j in range(mult_factor):
			if not l:
				break
			yield l[:p]
			del l[:p]


def parse_cache_str(instr):
	"""!
	A way to decode a command line option that specifies different bank
	caches for different detectors, e.g.,

	>>> bankcache = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache")
	>>> bankcache
	{'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
	"""

	dictcache = {}
	if instr is None: return dictcache
	for c in instr.split(','):
		ifo = c.split("=")[0]
		cache = c.replace(ifo+"=","")
		dictcache[ifo] = cache
	return dictcache


def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
	"""!
	Given a dictionary of bank cache files keyed by ifo, e.g., from
	parse_cache_str(), group the banks into suitably sized chunks for a single svd
	bank file according to numbanks.  Note that numbanks should be a list; the
	grouping uses the algorithm in the group() function
	"""
	outstrs = []
	ifos = sorted(cachedict.keys())
	files = zip(*[[CacheEntry(f).path for f in open(cachedict[ifo],'r').readlines()] for ifo in ifos])
	for n, bank_group in enumerate(group(files, numbanks)):
		if maxjobs is not None and n > maxjobs:
			break
		c = dict(zip(ifos, zip(*bank_group)))
		outstrs.append(c)

	return outstrs
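
# For illustration only (not part of the original module): with hypothetical
# cache files each listing four split-bank files and the default numbanks = [2],
# build_bank_groups() returns one dictionary per SVD bank job, each mapping an
# ifo to a tuple of bank paths, e.g.
#
#	groups = build_bank_groups(parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache"))
#	# groups -> [{'H1': ('H1a.xml', 'H1b.xml'), 'L1': ('L1a.xml', 'L1b.xml')},
#	#            {'H1': ('H1c.xml', 'H1d.xml'), 'L1': ('L1c.xml', 'L1d.xml')}]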


def T050017_filename(instruments, description, seg, extension, path = None):
	"""!
	A function to generate a T050017 filename.
	"""
	if not isinstance(instruments, basestring):
		instruments = "".join(sorted(instruments))
	start, end = seg
	start = int(math.floor(start))
	try:
		duration = int(math.ceil(end)) - start
	# FIXME this is not a good way of handling this...
	except OverflowError:
		duration = 2000000000
	extension = extension.strip('.')
	if path is not None:
		return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
	else:
		return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
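
# For illustration only (not part of the original module): an example of the
# convention, using made-up inputs.
#
#	T050017_filename(("H1", "L1"), "SOMETHING", (1000000000, 1000000100), ".xml.gz")
#	# -> 'H1L1-SOMETHING-1000000000-100.xml.gz'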


def condor_command_dict_from_opts(opts, defaultdict = None):
	"""!
	A function to turn a list of options into a dictionary of condor commands, e.g.,

	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"])
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"})
	{'somecommand': 'somevalue', 'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"})
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	"""

	if defaultdict is None:
		defaultdict = {}
	for o in opts:
		osplit = o.split("=")
		k = osplit[0]
		v = "=".join(osplit[1:])
		defaultdict.update([(k, v)])
	return defaultdict


def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None):
	"""!
	A function to return the name of a file created from multiple files following
	the T050017 convention. In addition to the T050017 requirements, this assumes
	that numbers relevant to organization schemes will be the first entry in the
	description, e.g. 0_DIST_STATS, and that all files in a given cache file are
	from the same group of ifos and either contain data from the same segment or
	from the same background bin.  Note that each file doesn't have to be from
	the same IFO; for example, the template bank cache could contain template bank
	files from H1 and template bank files from L1.
	"""
	# Collect the distinct observatories.  This assumes the entries cycle
	# through the observatories, so stop at the first repeat of the leading
	# one.
	observatories = [cache_entries[0].observatory]
	for entry in cache_entries[1:]:
		if entry.observatory == observatories[0]:
			break
		observatories.append(entry.observatory)

	split_description = cache_entries[0].description.split('_')
	min_bin = [x for x in split_description[:2] if x.isdigit()]
	max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()]
	seg = segments.segmentlist(cache_entry.segment for cache_entry in cache_entries).extent()
	if min_bin:
		min_bin = min_bin[0]
	if max_bin:
		max_bin = max_bin[-1]
	if min_bin and (min_bin == max_bin or not max_bin):
		# All files are from the same bin, thus the segments may be
		# different.  Note that this assumes that if the last file in
		# the cache does not start with a number, then every file in
		# the cache is from the same bin.  An example is the cache file
		# generated for gstlal_inspiral_calc_likelihood, which contains
		# all of the DIST_STATS files from a given background bin
		# followed by CREATE_PRIOR_DIST_STATS files, which are not
		# generated for specific bins
		return T050017_filename(''.join(observatories), cache_entries[0].description, seg, extension, path = path)
	elif min_bin and max_bin and min_bin != max_bin:
		if split_description[1].isdigit():
			description_base = split_description[2:]
		else:
			description_base = split_description[1:]
		# Files are from different bins, thus the segments must be the same
		return T050017_filename(''.join(observatories), '_'.join([min_bin, max_bin] + description_base), seg, extension, path = path)
	else:
		print >>sys.stderr, "ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention.\nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path)
		raise ValueError
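
# For illustration only (not part of the original module): given cache entries
# whose descriptions start with bin numbers (hypothetical file names), e.g.
# H1L1-0000_DIST_STATS-10-1.xml.gz through H1L1-0009_DIST_STATS-10-1.xml.gz,
# the group file name combines the first and last bins:
#
#	group_T050017_filename_from_T050017_files(entries, '.xml.gz')
#	# -> 'H1L1-0000_0009_DIST_STATS-10-1.xml.gz'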


def get_svd_bank_params_online(svd_bank_cache):
	"""!
	Return a dictionary mapping the background bin of each SVD bank file in
	svd_bank_cache to the (min, max) mchirp of its templates.
	"""
	template_mchirp_dict = {}
	for ce in [CacheEntry(f) for f in open(svd_bank_cache)]:
		if not template_mchirp_dict.setdefault("%04d" % int(ce.description.split("_")[3]), []):
			min_mchirp, max_mchirp = float("inf"), 0
			xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
			for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
				mchirp_column = snglinspiraltable.getColumnByName("mchirp")
				min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
			template_mchirp_dict["%04d" % int(ce.description.split("_")[3])] = (min_mchirp, max_mchirp)
			xmldoc.unlink()
	return template_mchirp_dict


def get_svd_bank_params(svd_bank_cache, online = False):
	"""!
	Return a dictionary mapping the background bin of each SVD bank file in
	svd_bank_cache to the (min, max) mchirp of its templates.  Unless online
	is True, also return a dictionary mapping each ifo to its bank files and
	the maximum template duration.
	"""
	if not online:
		bgbin_file_map = {}
		max_time = 0
	template_mchirp_dict = {}
	for ce in sorted([CacheEntry(f) for f in open(svd_bank_cache)], cmp = lambda x,y: cmp(int(x.description.split("_")[0]), int(y.description.split("_")[0]))):
		if not online:
			bgbin_file_map.setdefault(ce.observatory, []).append(ce.path)
		if not template_mchirp_dict.setdefault(ce.description.split("_")[0], []):
			min_mchirp, max_mchirp = float("inf"), 0
			xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
			for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
				mchirp_column = snglinspiraltable.getColumnByName("mchirp")
				min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
				if not online:
					max_time = max(max_time, max(snglinspiraltable.getColumnByName("template_duration")))
			template_mchirp_dict[ce.description.split("_")[0]] = (min_mchirp, max_mchirp)
			xmldoc.unlink()
	if not online:
		return template_mchirp_dict, bgbin_file_map, max_time
	else:
		return template_mchirp_dict


if __name__ == "__main__":
	import doctest
	doctest.testmod()