gstlal_fake_frames_pipe 15.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#!/usr/bin/env python
#
# Copyright (C) 2011 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

Chad Hanna's avatar
Chad Hanna committed
19 20 21
## @file
# An HTCondor DAG generator to recolor frame data

22 23 24 25 26 27 28 29
"""
This program makes a dag to recolor frames
"""

__author__ = 'Chad Hanna <chad.hanna@ligo.org>'


##############################################################################
30
# import standard modules
31
import sys, os, copy, math
32
from optparse import OptionParser
33 34 35 36 37
import subprocess, socket, tempfile

##############################################################################
# import the modules we need to build the pipeline
from glue import pipeline
38
from ligo import segments
39
import glue.ligolw.utils as ligolw_utils
40 41
import glue.ligolw.utils.segments as ligolw_segments
from gstlal import datasource
42
from gstlal import dagparts
43 44
from lal import series as lalseries
from lal.utils import CacheEntry
45 46 47 48 49 50 51 52 53

#
# Classes for generating reference psds
#

class gstlal_reference_psd_job(pipeline.CondorDAGJob):
	"""
	A gstlal_reference_psd job

	Condor job description for gstlal_reference_psd: vanilla universe,
	getenv = True, requirements "Memory > 1999" and
	KMP_LIBRARY=serial;MKL_SERIAL=yes in the job environment.  The submit
	file is <tag_base>.sub and stdout/stderr go to
	logs/<tag_base>-$(macroid)-$(process).{out,err}.
	"""
	def __init__(self, group, user, executable=dagparts.which('gstlal_reference_psd'), tag_base='gstlal_reference_psd'):
		"""
		group -- accounting group; omitted from the submit file when None
		user -- accounting group user; omitted when None
		executable -- program to run
		tag_base -- prefix of the submit file and log file names
		"""
		self.__prog__ = 'gstlal_reference_psd'
		self.__executable = executable
		self.__universe = 'vanilla'
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv', 'True')
		self.add_condor_cmd('requirements', 'Memory > 1999') #FIXME is this enough?
		self.tag_base = tag_base
		self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
		self.set_sub_file('%s.sub' % (tag_base,))
		accounting = (('accounting_group', group), ('accounting_group_user', user))
		for key, value in accounting:
			if value is not None:
				self.add_condor_cmd(key, value)
		log_prefix = 'logs/%s-$(macroid)-$(process)' % (tag_base,)
		self.set_stdout_file(log_prefix + '.out')
		self.set_stderr_file(log_prefix + '.err')


class gstlal_median_psd_job(pipeline.CondorDAGJob):
	"""
	A gstlal_median_psd job

	Condor job description for gstlal_median_of_psds: vanilla universe,
	getenv = True and KMP_LIBRARY=serial;MKL_SERIAL=yes in the job
	environment.  The submit file is <tag_base>.sub and stdout/stderr go to
	logs/<tag_base>-$(macroid)-$(process).{out,err}.
	"""
	def __init__(self, group, user, executable=dagparts.which('gstlal_median_of_psds'), tag_base='gstlal_median_of_psds'):
		"""
		group -- accounting group; omitted from the submit file when None
		user -- accounting group user; omitted when None
		executable -- program to run
		tag_base -- prefix of the submit file and log file names
		"""
		self.__prog__ = 'gstlal_median_of_psds'
		self.__executable = executable
		self.__universe = 'vanilla'
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv', 'True')
		self.tag_base = tag_base
		self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
		self.set_sub_file('%s.sub' % (tag_base,))
		accounting = (('accounting_group', group), ('accounting_group_user', user))
		for key, value in accounting:
			if value is not None:
				self.add_condor_cmd(key, value)
		log_prefix = 'logs/%s-$(macroid)-$(process)' % (tag_base,)
		self.set_stdout_file(log_prefix + '.out')
		self.set_stderr_file(log_prefix + '.err')


97
class gstlal_smooth_reference_psd_job(pipeline.CondorDAGJob):
	"""
	A gstlal_smooth_reference_psd job

	Condor job description for gstlal_psd_polyfit: vanilla universe,
	getenv = True and KMP_LIBRARY=serial;MKL_SERIAL=yes in the job
	environment.  The submit file is <tag_base>.sub and stdout/stderr go to
	logs/<tag_base>-$(macroid)-$(process).{out,err}.
	"""
	def __init__(self, group, user, executable=dagparts.which('gstlal_psd_polyfit'), tag_base='gstlal_psd_polyfit'):
		"""
		group -- accounting group; omitted from the submit file when None
		user -- accounting group user; omitted when None
		executable -- program to run
		tag_base -- prefix of the submit file and log file names
		"""
		self.__prog__ = 'gstlal_psd_polyfit'
		self.__executable = executable
		self.__universe = 'vanilla'
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv', 'True')
		self.tag_base = tag_base
		self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
		self.set_sub_file('%s.sub' % (tag_base,))
		accounting = (('accounting_group', group), ('accounting_group_user', user))
		for key, value in accounting:
			if value is not None:
				self.add_condor_cmd(key, value)
		log_prefix = 'logs/%s-$(macroid)-$(process)' % (tag_base,)
		self.set_stdout_file(log_prefix + '.out')
		self.set_stderr_file(log_prefix + '.err')


class gstlal_reference_psd_node(pipeline.CondorDAGNode):
	"""
	A gstlal_reference_psd node

	Adds one gstlal_reference_psd invocation to the DAG that measures the PSD
	of instrument's channel from frame_cache over the given GPS interval.  The
	PSD is written to <cwd>/<IFO>-<start>-<end>-reference_psd.xml.gz, which is
	stored as self.output_name and registered in dag.output_cache.
	"""
	def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, instrument, channel, injections=None, p_node=None):
		"""
		job -- the gstlal_reference_psd job this node runs
		dag -- the DAG the node is added to (must provide output_cache)
		frame_cache -- LAL cache of the input frames
		gps_start_time, gps_end_time -- integer GPS bounds of the data
		instrument, channel -- passed as --channel-name=IFO=CHANNEL
		injections -- optional injection file, only passed when truthy
		p_node -- iterable of parent nodes (default: none)
		"""
		pipeline.CondorDAGNode.__init__(self, job)
		self.add_var_opt("frame-cache", frame_cache)
		self.add_var_opt("gps-start-time", gps_start_time)
		self.add_var_opt("gps-end-time", gps_end_time)
		self.add_var_opt("data-source", "frames")
		self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
		if injections:
			self.add_var_opt("injections", injections)
		output_name = self.output_name = os.path.join(os.getcwd(), '%s-%d-%d-reference_psd.xml.gz' % (instrument, gps_start_time, gps_end_time))
		self.add_var_opt("write-psd", output_name)
		# output_name is absolute (it starts with "/"), so appending it directly
		# to "file://localhost" gives a well-formed URL without a doubled slash.
		dag.output_cache.append(CacheEntry(instrument, "-", segments.segment(gps_start_time, gps_end_time), "file://localhost%s" % (output_name,)))
		for p in (p_node or ()):
			self.add_parent(p)
		dag.add_node(self)


143
class gstlal_smooth_reference_psd_node(pipeline.CondorDAGNode):
	"""
	A gstlal_smooth_reference_psd node

	Adds one gstlal_psd_polyfit invocation to the DAG that smooths input_psd
	(passed positionally, with --low-fit-freq 10).  The result is written
	next to the input, with "reference_psd" in the file name replaced by
	"smoothed_reference_psd"; that path is stored as self.output_name.
	"""
	def __init__(self, job, dag, instrument, input_psd, p_node=None):
		"""
		job -- the gstlal_psd_polyfit job this node runs
		dag -- the DAG the node is added to
		instrument -- unused here; kept for interface compatibility
		input_psd -- path of the PSD file to smooth
		p_node -- iterable of parent nodes (default: none)
		"""
		pipeline.CondorDAGNode.__init__(self, job)
		#FIXME shouldn't be hardcoding stuff like this
		# Only rewrite the file name.  Applying the replacement to the whole
		# path would also alter any directory whose name contains
		# "reference_psd" and point the output at a non-existent location.
		head, tail = os.path.split(input_psd)
		output_name = self.output_name = os.path.join(head, tail.replace('reference_psd', 'smoothed_reference_psd'))
		self.add_var_arg(input_psd)
		self.add_var_opt("output", output_name)
		self.add_var_opt("low-fit-freq", 10)
		for p in (p_node or ()):
			self.add_parent(p)
		dag.add_node(self)


class gstlal_median_psd_node(pipeline.CondorDAGNode):
	"""
	A gstlal_median_psd node

	Adds one gstlal_median_of_psds invocation to the DAG.  Every file in
	input_psds is passed as a file argument and the result is written to
	output (passed as --output-name and stored as self.output_name).
	"""
	def __init__(self, job, dag, input_psds, output, p_node=None):
		"""
		job -- the gstlal_median_of_psds job this node runs
		dag -- the DAG the node is added to
		input_psds -- iterable of PSD file names to combine
		output -- output file name
		p_node -- iterable of parent nodes (default: none)
		"""
		pipeline.CondorDAGNode.__init__(self, job)
		output_name = self.output_name = output
		self.add_var_opt("output-name", output_name)
		for psd in input_psds:
			self.add_file_arg(psd)
		for p in (p_node or ()):
			self.add_parent(p)
		dag.add_node(self)


#
# classes for generating recolored frames
#

class gstlal_fake_frames_job(pipeline.CondorDAGJob):
	"""
	A gstlal_fake_frames job

	Condor job description for gstlal_fake_frames: vanilla universe,
	getenv = True, requirements "Memory > 1999" and
	KMP_LIBRARY=serial;MKL_SERIAL=yes in the job environment.  The submit
	file is <tag_base>.sub and stdout/stderr go to
	logs/<tag_base>-$(macroid)-$(process).{out,err}.
	"""
	def __init__(self, group, user, executable=dagparts.which('gstlal_fake_frames'), tag_base='gstlal_fake_frames'):
		"""
		group -- accounting group; omitted from the submit file when None
		user -- accounting group user; omitted when None
		executable -- program to run
		tag_base -- prefix of the submit file and log file names
		"""
		self.__prog__ = 'gstlal_fake_frames'
		self.__executable = executable
		self.__universe = 'vanilla'
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		self.add_condor_cmd('getenv', 'True')
		self.add_condor_cmd('requirements', 'Memory > 1999') #FIXME is this enough?
		self.tag_base = tag_base
		self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
		self.set_sub_file('%s.sub' % (tag_base,))
		accounting = (('accounting_group', group), ('accounting_group_user', user))
		for key, value in accounting:
			if value is not None:
				self.add_condor_cmd(key, value)
		log_prefix = 'logs/%s-$(macroid)-$(process)' % (tag_base,)
		self.set_stdout_file(log_prefix + '.out')
		self.set_stderr_file(log_prefix + '.err')


class gstlal_fake_frames_node(pipeline.CondorDAGNode):
	"""
	A gstlal_fake_frames node

	Adds one gstlal_fake_frames invocation to the DAG.  It reads frame_cache
	over the given GPS interval, passes reference_psd as
	--whiten-reference-psd and color_psd as --color-psd, and writes output
	frames with the given channel name, frame type and layout options.
	"""
	def __init__(self, job, dag, frame_cache, gps_start_time, gps_end_time, channel, reference_psd, color_psd, sample_rate, injections=None, output_channel_name = None, duration = 4096, output_path = None, frame_type = None, shift = None, whiten_track_psd = False, frames_per_file = 1, p_node=None, instrument = None):
		"""
		job -- the gstlal_fake_frames job this node runs
		dag -- the DAG the node is added to
		frame_cache -- LAL cache of the input frames
		gps_start_time, gps_end_time -- integer GPS bounds of the data
		channel -- input channel name, passed as --channel-name=IFO=CHANNEL
		reference_psd -- PSD file used for whitening (may be None)
		color_psd -- PSD file used for coloring
		sample_rate -- output sample rate
		injections -- optional injection file
		output_channel_name -- channel name in the output frames (omitted when None)
		duration -- --frame-duration value
		output_path -- optional output directory
		frame_type -- output frame type (omitted when None)
		shift -- optional time shift, only passed when truthy
		whiten_track_psd -- when true, also pass --whiten-track-psd
		frames_per_file -- --frames-per-file value
		p_node -- iterable of parent nodes (default: none)
		instrument -- IFO prefix for --channel-name.  When omitted, the
		  module-level name ``instrument`` is used.  This is the legacy
		  behaviour relied on by this script's driver loop.

		Raises ValueError if no instrument can be determined.
		"""
		if instrument is None:
			# Legacy fallback: older callers never passed the instrument and
			# the value was silently taken from the script's loop variable.
			# Fail with a clear message instead of a NameError.
			instrument = globals().get("instrument")
			if instrument is None:
				raise ValueError("gstlal_fake_frames_node: no instrument given")

		pipeline.CondorDAGNode.__init__(self, job)
		self.add_var_opt("frame-cache", frame_cache)
		self.add_var_opt("gps-start-time", gps_start_time)
		self.add_var_opt("gps-end-time", gps_end_time)
		self.add_var_opt("data-source", "frames")
		self.add_var_arg("--channel-name=%s=%s" % (instrument, channel))
		self.add_var_opt("whiten-reference-psd", reference_psd)
		self.add_var_opt("color-psd", color_psd)
		self.add_var_opt("sample-rate", sample_rate)
		if injections is not None:
			self.add_var_opt("injections", injections)
		# Do not emit a literal "None" on the command line for unset values.
		if output_channel_name is not None:
			self.add_var_opt("output-channel-name", output_channel_name)
		self.add_var_opt("frame-duration", duration)
		if output_path is not None:
			self.add_var_opt("output-path", output_path)
		if frame_type is not None:
			self.add_var_opt("frame-type", frame_type)
		if whiten_track_psd:
			# NOTE(review): the PSD file is given as the value of
			# --whiten-track-psd; confirm gstlal_fake_frames expects an argument.
			self.add_var_opt("whiten-track-psd", reference_psd)
		if shift:
			self.add_var_opt("shift", shift)
		self.add_var_opt("frames-per-file", frames_per_file)
		for p in (p_node or ()):
			self.add_parent(p)
		dag.add_node(self)


237
def choosesegs(seglists, min_segment_length):
	"""
	Drop short segments from seglists, in place.

	For every instrument key, the entry is replaced by a new segmentlist
	holding copies of only those segments whose length is strictly greater
	than min_segment_length.  Returns None.
	"""
	for instrument in list(seglists.keys()):
		seglists[instrument] = segments.segmentlist(
			segments.segment(seg)
			for seg in seglists[instrument]
			if abs(seg) > min_segment_length
		)


def parse_command_line():
	"""
	Parse and validate the command line.

	Returns (options, inchannels, outchannels, outpaths, frametypes, filenames).
	The four dictionaries are keyed by instrument and built from the
	repeatable IFO=VALUE options.  outchannels defaults to inchannels when
	--output-channel-name is not given.  outpaths is empty when --output-path
	is not given.

	Raises ValueError if a required option is missing, or if --frame-type,
	--channel-name and --output-channel-name do not name the same instruments.
	"""
	parser = OptionParser(description = __doc__)
	
	parser.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional)")
	parser.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
	parser.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process.  Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
	parser.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments. Required")
	parser.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables. Required")
 
	parser.add_option("--min-segment-length", metavar = "SECONDS", help = "Set the minimum segment length to process (required)", type="float")
	parser.add_option("--shift", metavar = "NANOSECONDS", help = "Number of nanoseconds to delay (negative) or advance (positive) the time stream", type = "int")
	parser.add_option("--sample-rate", metavar = "HZ", default = 16384, type = "int", help = "Sample rate at which to generate the data, should be less than or equal to the sample rate of the measured psds provided, default = 16384 Hz")
	parser.add_option("--whiten-type", metavar="name", help = "Whiten whatever data is coming out of datasource either from the data or from a fixed reference psd if a file is given.  Options are psdperseg|medianofpsdperseg|FILE")
	parser.add_option("--whiten-track-psd", action = "store_true", help = "Calculate PSD from input data and track with time.")
	parser.add_option("--color-psd", metavar = "FILE", help = "Set the name of psd xml file to color the data with")
	parser.add_option("--output-path", metavar = "name", action = "append", help = "Set the instrument dependent output path for frames, defaults to current working directory. eg H1=/path/to/H1/frames. Can be given more than once.")
	parser.add_option("--output-channel-name", metavar = "name", action="append", help = "The name of the channel in the output frames, e.g., --output-channel-name=IFO=CHANNEL-NAME. The default is the same as the channel name. Can be given more than once.")
	parser.add_option("--frame-type", metavar = "name", action = "append", help = "Set the instrument dependent frame type, H1=TYPE. Can be given more than once and is required for each instrument processed.")
	parser.add_option("--frame-duration", metavar = "SECONDS", default = 16, type = "int", help = "Set the duration of the output frames.  The duration of the frame file will be multiplied by --frames-per-file.  Default: 16s")
	parser.add_option("--frames-per-file", metavar = "INT", default = 256, type = "int", help = "Set the number of frames per file.  Default: 256")
	parser.add_option("--accounting-group", metavar = "str", help = "Set the accounting group name, e.g., ligo.dev.o3.cw.directedbinary.production")
	parser.add_option("--accounting-group-user", metavar = "str", help = "Set the accounting group user, e.g., chad.hanna")
	parser.add_option("--verbose", action = "store_true", help = "Be verbose")
	
	options, filenames = parser.parse_args()

	# --channel-name is needed to build every job's input; without it the
	# instrument dictionaries below cannot be formed.
	fail = ""
	for option in ("channel_name", "min_segment_length", "frame_type", "frame_segments_file", "frame_segments_name"):
		if getattr(options, option) is None:
			fail += "must provide option %s\n" % (option)
	if fail:
		raise ValueError(fail)

	inchannels = datasource.channel_dict_from_channel_list(options.channel_name)
	# Documented default: the output channel has the same name as the input.
	if options.output_channel_name is None:
		outchannels = dict(inchannels)
	else:
		outchannels = datasource.channel_dict_from_channel_list(options.output_channel_name)
	frametypes = datasource.channel_dict_from_channel_list(options.frame_type)
	# optparse leaves an unused "append" option as None.  An empty mapping
	# means "write to the current working directory" for every instrument.
	outpaths = datasource.channel_dict_from_channel_list(options.output_path or [])

	if not (set(frametypes) == set(inchannels) == set(outchannels)):
		raise ValueError('--frame-type, --channel-name and --output-channel-name must contain same instruments')

	return options, inchannels, outchannels, outpaths, frametypes, filenames


options, inchannels, outchannels, outpaths, frametypes, filenames = parse_command_line()

# The jobs write their stdout/stderr under logs/.  A directory that already
# exists is fine; any other failure (permissions, a file named "logs", ...)
# is reported instead of being silently swallowed.
try:
	os.mkdir("logs")
except OSError:
	if not os.path.isdir("logs"):
		raise

dag = dagparts.DAG("gstlal_fake_frames_pipe")

# Segments to process, keyed by instrument; choosesegs drops the short ones.
seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, verbose = options.verbose, contenthandler = ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce()
choosesegs(seglists, options.min_segment_length)

psdJob = gstlal_reference_psd_job(options.accounting_group, options.accounting_group_user)
smoothJob = gstlal_smooth_reference_psd_job(options.accounting_group, options.accounting_group_user)
medianJob = gstlal_median_psd_job(options.accounting_group, options.accounting_group_user)
colorJob = gstlal_fake_frames_job(options.accounting_group, options.accounting_group_user)

smoothnode = {}
mediannode = {}
p_node = dict((i, []) for i in seglists)

# psd[instrument] ends up as one of:
#   * a dict mapping segment -> smoothed PSD file  (--whiten-type psdperseg)
#   * a single PSD file name                       (medianofpsdperseg or FILE)
#   * None                                         (no --whiten-type)
if options.whiten_type in ("psdperseg", "medianofpsdperseg"):
	psd = {}
	for instrument, seglist in seglists.iteritems():
		mediannode[instrument] = {}
		smoothnode[instrument] = {}
		psd[instrument] = {}
		for seg in seglist:
			#FIXME if there are segments without frame caches this will barf
			psdnode = gstlal_reference_psd_node(psdJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), instrument, inchannels[instrument], injections=None, p_node=[])
			smoothnode[instrument][seg] = gstlal_smooth_reference_psd_node(smoothJob, dag, instrument, psdnode.output_name, p_node=[psdnode])
			if options.whiten_type == "psdperseg":
				psd[instrument][seg] = smoothnode[instrument][seg].output_name

		mediannode[instrument] = gstlal_median_psd_node(medianJob, dag, [v.output_name for v in smoothnode[instrument].values()], "%s_median_psd.xml.gz" % instrument, p_node=smoothnode[instrument].values())
		p_node[instrument] = [mediannode[instrument]]
		if options.whiten_type == "medianofpsdperseg":
			psd[instrument] = mediannode[instrument].output_name

elif options.whiten_type is not None:
	# --whiten-type names a fixed reference PSD file.  The fake-frames nodes
	# take a file name for --whiten-reference-psd, so pass the file itself
	# for every instrument rather than its parsed contents.
	psd = dict((i, options.whiten_type) for i in seglists)
else:
	psd = dict((i, None) for i in seglists)

# NOTE: the instrument is not passed to gstlal_fake_frames_node below; the
# node relies on this module-level loop variable being named ``instrument``.
for instrument, seglist in seglists.iteritems():
	output_path = outpaths.get(instrument)
	for seg in seglist:
		if isinstance(psd[instrument], dict):
			reference_psd = psd[instrument][seg]
		else:
			reference_psd = psd[instrument]
		gstlal_fake_frames_node(colorJob, dag, options.frame_cache, int(seg[0]), int(seg[1]), inchannels[instrument], reference_psd, color_psd=options.color_psd, sample_rate = options.sample_rate, injections=options.injections, output_channel_name = outchannels[instrument], output_path = output_path, duration = options.frame_duration, frame_type = frametypes[instrument], shift = options.shift, whiten_track_psd = options.whiten_track_psd, frames_per_file = options.frames_per_file, p_node=p_node[instrument])

dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()