# Copyright (C) 2013--2014  Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

## 
# @file
#
# A file that contains the inspiral_pipe module code; used to construct condor dags
#

##
# @package inspiral_pipe
#
# A module that contains the inspiral_pipe module code; used to construct condor dags
#
# ### Review Status
#
# | Names                                          | Hash                                        | Date       | Diff to Head of Master      |
# | -------------------------------------------    | ------------------------------------------- | ---------- | --------------------------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e    | 2014-06-18 | <a href="@gstlal_inspiral_cgit_diff/python/inspiral_pipe.py?id=HEAD&id2=8a6ea41398be79c00bdc27456ddeb1b590b0f68e">inspiral_pipe.py</a> |
#
# #### Actions
#
# - In inspiral_pipe.py Fix the InspiralJob.__init__: fix the arguments
# - On line 201, fix the comment or explain what the comment is meant to be

import sys, os
import subprocess, socket, tempfile, copy, doctest
from glue import pipeline, lal
from glue.ligolw import lsctables, ligolw
from glue.ligolw import utils as ligolw_utils
from gstlal import svd_bank
from lal.utils import CacheEntry


#
# environment utilities
#


def which(prog):
	"""!
	Like the which program to find the path to an executable

	>>> which("ls")
	'/bin/ls'

	"""
	which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE)
	out = which.stdout.read().strip()
	if not out:
		print >>sys.stderr, "ERROR: could not find %s in your path, have you built the proper software and sourced the proper env. scripts?" % (prog,)
		raise ValueError
	return out


def condor_scratch_space():
	"""!
	A way to standardize the condor scratch space even if it changes
	>>> condor_scratch_space()
	'_CONDOR_SCRATCH_DIR'
	"""
	return "_CONDOR_SCRATCH_DIR"


def log_path():
	"""!
	The stupid pet tricks to find log space on the LDG.
	Defaults to checking TMPDIR first.
	"""
	host = socket.getfqdn()
	try:
		return os.environ['TMPDIR']
	except KeyError:
		print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n"
		#FIXME add more hosts as you need them
		if 'cit' in host or 'caltech.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phys.uwm.edu' in host:
			tmp = '/localscratch/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'aei.uni-hannover.de' in host:
			tmp = '/local/user/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phy.syr.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp

		raise KeyError("$TMPDIR is not set and I don't recognize this environment")


def webserver_url():
	"""!
	The stupid pet tricks to find the webserver on the LDG.
	"""
	host = socket.getfqdn()
	#FIXME add more hosts as you need them
	if "cit" in host or "ligo.caltech.edu" in host:
		return "https://ldas-jobs.ligo.caltech.edu"
	if "phys.uwm.edu" in host or "cgca.uwm.edu" in host:
		return "https://ldas-jobs.cgca.uwm.edu"

	raise NotImplementedError("I don't know where the webserver is for this environment")


#
# DAG class
#


class DAG(pipeline.CondorDAG):
	"""!
	A thin subclass of pipeline.CondorDAG.

	Extra features include an add_node() method and a cache writing method.
	Also includes some standard setup, e.g., log file paths etc.
	"""
	def __init__(self, name, logpath = log_path()):
		self.basename = name.replace(".dag","")
		tempfile.tempdir = logpath
		tempfile.template = self.basename + '.dag.log.'
		logfile = tempfile.mktemp()
		fh = open( logfile, "w" )
		fh.close()
		pipeline.CondorDAG.__init__(self,logfile)
		self.set_dag_file(self.basename)
		self.jobsDict = {}
		self.output_cache = []

	def add_node(self, node):
		node.set_retry(3)
		node.add_macro("macronodename", node.get_name())
		pipeline.CondorDAG.add_node(self, node)

	def write_cache(self):
		out = self.basename + ".cache"
		f = open(out,"w")
		for c in self.output_cache:
			f.write(str(c)+"\n")
		f.close()


class InspiralJob(pipeline.CondorDAGJob):
	"""!
	A job class that subclasses pipeline.CondorDAGJob and adds some extra
	boilerplate items for gstlal inspiral jobs
	"""
	def __init__(self, executable, tag_base, universe = "vanilla"):
		self.__prog__ = tag_base
		self.__executable = executable
		self.__universe = universe
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv','True')
		self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;")
		self.tag_base = tag_base
		self.set_sub_file(tag_base+'.sub')
		self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
		self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
		self.number = 1
		# make an output directory for files
		self.output_path = tag_base
		try:
			os.mkdir(self.output_path)
		except OSError:
			pass


class InspiralNode(pipeline.CondorDAGNode):
	"""!
	A node class that subclasses pipeline.CondorDAGNode, automates
	adding the node to the dag, makes sensible names and allows a list of parent
	nodes to be provided.
	"""
	def __init__(self, job, dag, p_node=[]):
		pipeline.CondorDAGNode.__init__(self, job)
		for p in p_node:
			self.add_parent(p)
		self.set_name("%s_%04X" % (job.tag_base, job.number))
		job.number += 1
		dag.add_node(self)


class generic_job(InspiralJob):
	"""!
	A generic job class which tends to do the "right" thing when given just
	an executable name but otherwise is a subclass of InspiralJob and thus
	pipeline.CondorDAGJob
	"""
	def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs):
		executable = which(program)
		InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1], **kwargs)
		for cmd,val in condor_commands.items():
			self.add_condor_cmd(cmd, val)


class generic_node(InspiralNode):
	"""!
	A generic node class which tends to do the "right" thing when given a
	job, a dag, parent nodes, a dictionary of options relevant to the job, a
	dictionary of options related to input files and a dictionary of options
	related to output files.  Otherwise it is a subclass of InspiralNode and thus
	pipeline.CondorDAGNode

	NOTE an important and subtle behavior - You can specify an option with
	an empty argument by setting it to "".  However, options set to None are simply
	ignored.
	"""
	def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None):
		InspiralNode.__init__(self, job, dag, parent_nodes)

		self.input_files = input_files.copy()
		self.input_files.update(input_cache_files)
		self.output_files = output_files.copy()
		self.output_files.update(output_cache_files)

		self.cache_inputs = {}
		self.cache_outputs = {}

		for opt, val in opts.items() + output_files.items() + input_files.items():
			if val is None:
				continue # not the same as val = '' which is allowed
			if not hasattr(val, "__iter__"): # catches list like things but not strings
				if opt == "":
					self.add_var_arg(val)
				else:
					self.add_var_opt(opt, val)
			# Must be an iterable
			else:
				if opt == "":
					[self.add_var_arg(a) for a in val]
				else:
					self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))

		# Create cache files for long command line arguments and store them in the job's subdirectory. NOTE the svd-bank string
		# is handled by gstlal_inspiral_pipe directly

		cache_dir = os.path.join(job.tag_base, 'cache')

		for opt, val in input_cache_files.items():
			if not os.path.isdir(cache_dir):
				os.mkdir(cache_dir)
			cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			if input_cache_file_name is None:
				cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
			else:
				cache_file_name = os.path.join(cache_dir, input_cache_file_name)
			with open(cache_file_name, "w") as cache_file:
				lal.Cache(cache_entries).tofile(cache_file)
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_inputs.setdefault(opt, []).append(cache_file_name)

		for opt, val in output_cache_files.items():
			if not os.path.isdir(cache_dir):
				os.mkdir(cache_dir)
			cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
			cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
			with open(cache_file_name, "w") as cache_file:
				lal.Cache(cache_entries).tofile(cache_file)
			self.add_var_opt(opt, cache_file_name)
			# Keep track of the cache files being created
			self.cache_outputs.setdefault(opt, []).append(cache_file_name)
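
# Illustrative use of the classes above (the executable name, condor commands
# and options below are hypothetical; a built gstlal environment is assumed):
#
#	dag = DAG("trigger_pipe")
#	job = generic_job("gstlal_inspiral", condor_commands = {"request_memory": "4GB"})
#	# an option value of "" gives an empty argument; values of None are ignored
#	node = generic_node(job, dag, parent_nodes = [],
#		opts = {"verbose": "", "gps-start-time": 1000000000},
#		output_files = {"output": "triggers.xml.gz"})
#	dag.write_sub_files()
#	dag.write_dag()
#	dag.write_cache()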

def pipeline_dot_py_append_opts_hack(opt, vals):
	"""!
	A way to work around the dictionary nature of pipeline.py which can
	only record options once.

	>>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
	'1 --my-favorite-option 2 --my-favorite-option 3'
	"""	
	out = str(vals[0])
	for v in vals[1:]:
		out += " --%s %s" % (opt, str(v))
	return out



#
# Utility functions
#


def group(inlist, parts):
	"""!
	group a list roughly according to the distribution in parts, e.g.

	>>> A = range(12)
	>>> B = [2,3]
	>>> for g in group(A,B):
	...     print g
	... 
	[0, 1]
	[2, 3]
	[4, 5]
	[6, 7, 8]
	[9, 10, 11]
	"""
	mult_factor = len(inlist) // sum(parts) + 1
	l = copy.deepcopy(inlist)
	for i, p in enumerate(parts):
		for j in range(mult_factor):
			if not l:
				break
			yield l[:p]
			del l[:p]


def parse_cache_str(instr):
	"""!
	A way to decode a command line option that specifies different bank
	caches for different detectors, e.g.,

	>>> bankcache = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache")
	>>> bankcache
	{'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
	"""

	dictcache = {}
	if instr is None: return dictcache
	for c in instr.split(','):
		ifo = c.split("=")[0]
		cache = c.replace(ifo+"=","")
		dictcache[ifo] = cache
	return dictcache


def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
	"""!
	given a dictionary of bank cache files keyed by ifo from, e.g.,
	parse_cache_str(), group the banks into suitable size chunks for a single svd
	bank file according to numbanks.  Note, numbanks should be a list; the grouping uses
	the algorithm in the group() function
	"""
	outstrs = []
	ifos = sorted(cachedict.keys())
	files = zip(*[[CacheEntry(f).path for f in open(cachedict[ifo],'r').readlines()] for ifo in ifos])
	for n, bank_group in enumerate(group(files, numbanks)):
		if maxjobs is not None and n > maxjobs:
			break
		c = dict(zip(ifos, zip(*bank_group)))
		outstrs.append(c)

	return outstrs
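
# For illustration (the file names are hypothetical): with cachedict built by
# parse_cache_str() from per-ifo split bank caches and numbanks = [2], each
# element of the returned list looks like
#	{'H1': ('H1_bank_00.xml.gz', 'H1_bank_01.xml.gz'),
#	 'L1': ('L1_bank_00.xml.gz', 'L1_bank_01.xml.gz')}
# i.e. the ifo-keyed group of split bank files destined for one svd bank job.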


def T050017_filename(instruments, description, start, end, extension, path = None):
	"""!
	A function to generate a T050017 filename.
	"""
	if not isinstance(instruments, basestring):
		instruments = "".join(sorted(instruments))
	duration = end - start
	extension = extension.strip('.')
	if path is not None:
		return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
	else:
		return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
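
# For example (values are only illustrative):
#	T050017_filename(("H1", "L1"), "INSPIRAL", 1000000000, 1000004096, ".xml.gz")
# returns 'H1L1-INSPIRAL-1000000000-4096.xml.gz'.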


if __name__ == "__main__":
	import doctest
	doctest.testmod()


def condor_command_dict_from_opts(opts, defaultdict = None):
	"""!
	A function to turn a list of options into a dictionary of condor commands, e.g.,

	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"])
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"})
	{'somecommand': 'somevalue', 'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"})
	{'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'}
	"""

	if defaultdict is None:
		defaultdict = {}
	for o in opts:
		osplit = o.split("=")
		k = osplit[0]
		v = "=".join(osplit[1:])
		defaultdict.update([(k, v)])
	return defaultdict


def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None):
	"""!
	A function to return the name of a file created from multiple files following
	the T050017 convention. In addition to the T050017 requirements, this assumes
	that numbers relevant to organization schemes will be the first entry in the
	description, e.g. 0_DIST_STATS, and that all files in a given cache file are
	from the same group of ifos and either contain data from the same segment or
	from the same background bin.  Note that each file doesn't have to be from
	the same IFO; for example, the template bank cache could contain template bank
	files from H1 and template bank files from L1.
	"""
	# Check that every file has same observatory. 
	observatories = [cache_entries[0].observatory]
	for entry in cache_entries[1:]:
		if entry.observatory == observatories[0]:
			break
		observatories.append(entry.observatory)

	split_description = cache_entries[0].description.split('_')
	min_bin = [x for x in split_description[:2] if x.isdigit()]
	max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()]
	min_seg = min([int(x.segment[0]) for x in cache_entries])
	max_seg = max([int(x.segment[1]) for x in cache_entries])
	if min_bin:
		min_bin = min_bin[0]
	if max_bin:
		max_bin = max_bin[-1]
	if min_bin and (min_bin == max_bin or not max_bin):
		# All files from same bin, thus segments may be different.
		# Note that this assumes that if the last file in the cache
		# does not start with a number that every file in the cache is
		# from the same bin, an example of this is the cache file
		# generated for gstlal_inspiral_calc_likelihood, which contains
		# all of the DIST_STATS files from a given background bin and
		# then CREATE_PRIOR_DIST_STATS files which are not generated
		# for specific bins
		return T050017_filename(''.join(observatories), cache_entries[0].description, min_seg, max_seg, extension, path = path)
	elif min_bin and max_bin and min_bin != max_bin:
		if split_description[1].isdigit():
			description_base = split_description[2:]
		else:
			description_base = split_description[1:]
		# Files from different bins, thus segments must be same
		return T050017_filename(''.join(observatories), '_'.join([min_bin, max_bin] + description_base), min_seg, max_seg, extension, path = path)
	else:
		print >>sys.stderr, "ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention. \nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path)
		raise ValueError
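
# For illustration (the file names are hypothetical): a cache whose entries run
# from bin 0000 to bin 0009 of the same segment, e.g.
#	H1L1-0000_DIST_STATS-1000000000-4096.xml.gz ... H1L1-0009_DIST_STATS-1000000000-4096.xml.gz
# is grouped under the name
#	H1L1-0000_0009_DIST_STATS-1000000000-4096.cache
# by group_T050017_filename_from_T050017_files(cache_entries, '.cache').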

def get_svd_bank_params_online(svd_bank_cache):
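	"""!
	Given an svd bank cache file, return a dictionary keyed by the
	four-digit background bin string (parsed from each cache entry's
	description) mapping to the (min, max) template chirp mass found in
	the corresponding svd bank file.
	"""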
	template_mchirp_dict = {}
	for ce in [CacheEntry(f) for f in open(svd_bank_cache)]:
		if not template_mchirp_dict.setdefault("%04d" % int(ce.description.split("_")[3]), []):
			min_mchirp, max_mchirp = float("inf"), 0
			xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
			for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
				mchirp_column = snglinspiraltable.getColumnByName("mchirp")
				min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
			template_mchirp_dict["%04d" % int(ce.description.split("_")[3])] = (min_mchirp, max_mchirp)
			xmldoc.unlink()
	return template_mchirp_dict

def get_svd_bank_params(svd_bank_cache, online = False):
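	"""!
	Given an svd bank cache file, return a dictionary of (min, max)
	template chirp mass keyed by background bin string.  When online is
	False, also return a dictionary mapping each instrument to its list of
	svd bank paths and the maximum template duration found in any bank.
	"""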
	if not online:
		bgbin_file_map = {}
		max_time = 0
	template_mchirp_dict = {}
	for ce in sorted([CacheEntry(f) for f in open(svd_bank_cache)], cmp = lambda x,y: cmp(int(x.description.split("_")[0]), int(y.description.split("_")[0]))):
		if not online:
			bgbin_file_map.setdefault(ce.observatory, []).append(ce.path)
		if not template_mchirp_dict.setdefault(ce.description.split("_")[0], []):
			min_mchirp, max_mchirp = float("inf"), 0
			xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
			for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
				snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
				mchirp_column = snglinspiraltable.getColumnByName("mchirp")
				min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
				if not online:
					max_time = max(max_time, max(snglinspiraltable.getColumnByName("template_duration")))
			template_mchirp_dict[ce.description.split("_")[0]] = (min_mchirp, max_mchirp)
			xmldoc.unlink()
	if not online:
		return template_mchirp_dict, bgbin_file_map, max_time
	else:
		return template_mchirp_dict