#!/usr/bin/env python

# Copyright (C) 2017-2018  Sydney J. Chamberlin, Patrick Godwin, Chad Hanna, Duncan Meacher
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""
A program to extract features from auxiliary channel data in real time or in offline mode
"""

# =============================
# 
#           preamble
#
# =============================

### .. graphviz::
###
###    digraph llpipe {
###    	labeljust = "r";
###    	label="gstlal_feature_extractor"
###    	rankdir=LR;
###    	graph [fontname="Roman", fontsize=24];
###    	edge [ fontname="Roman", fontsize=10 ];
###    	node [fontname="Roman", shape=box, fontsize=11];
###
###
###    	subgraph clusterNodeN {
###
###    		style=rounded;
###    		label="gstreamer pipeline";
###    		labeljust = "r";
###    		fontsize = 14;
###
###    		H1L1src [label="H1(L1) data source:\n mkbasicmultisrc()", color=red4];
###
###    		Aux1 [label="Auxiliary channel 1", color=red4];
###    		Aux2 [label="Auxiliary channel 2", color=green4];
###    		AuxN [label="Auxiliary channel N", color=magenta4];
###
###    		Multirate1 [label="Auxiliary channel 1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4];
###    		Multirate2 [label="Auxiliary channel 2 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4];
###    		MultirateN [label="Auxiliary channel N whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4];
###
###    		FilterBankAux1Rate1 [label="Auxiliary Channel 1:\nGlitch Filter Bank", color=red4];
###    		FilterBankAux1Rate2 [label="Auxiliary Channel 1:\nGlitch Filter Bank", color=red4];
###    		FilterBankAux1RateN [label="Auxiliary Channel 1:\nGlitch Filter Bank", color=red4];
###    		FilterBankAux2Rate1 [label="Auxiliary Channel 2:\nGlitch Filter Bank", color=green4];
###    		FilterBankAux2Rate2 [label="Auxiliary Channel 2:\nGlitch Filter Bank", color=green4];
###    		FilterBankAux2RateN [label="Auxiliary Channel 2:\nGlitch Filter Bank", color=green4];
###    		FilterBankAuxNRate1 [label="Auxiliary Channel N:\nGlitch Filter Bank", color=magenta4];
###    		FilterBankAuxNRate2 [label="Auxiliary Channel N:\nGlitch Filter Bank", color=magenta4];
###    		FilterBankAuxNRateN [label="Auxiliary Channel N:\nGlitch Filter Bank", color=magenta4];
###
###    		TriggerAux1Rate1 [label="Auxiliary Channel 1:\nTrigger Max (1 sec)", color=red4];
###    		TriggerAux1Rate2 [label="Auxiliary Channel 1:\nTrigger Max (1 sec)", color=red4];
###    		TriggerAux1RateN [label="Auxiliary Channel 1:\nTrigger Max (1 sec)", color=red4];
###    		TriggerAux2Rate1 [label="Auxiliary Channel 2:\nTrigger Max (1 sec)", color=green4];
###    		TriggerAux2Rate2 [label="Auxiliary Channel 2:\nTrigger Max (1 sec)", color=green4];
###    		TriggerAux2RateN [label="Auxiliary Channel 2:\nTrigger Max (1 sec)", color=green4];
###    		TriggerAuxNRate1 [label="Auxiliary Channel N:\nTrigger Max (1 sec)", color=magenta4];
###    		TriggerAuxNRate2 [label="Auxiliary Channel N:\nTrigger Max (1 sec)", color=magenta4];
###    		TriggerAuxNRateN [label="Auxiliary Channel N:\nTrigger Max (1 sec)", color=magenta4];
###
###    		H1L1src -> Aux1;
###    		H1L1src -> Aux2;
###    		H1L1src -> AuxN;
###
###    		Aux1 -> Multirate1;
###    		Aux2 -> Multirate2;
###    		AuxN -> MultirateN;
###
###    		Multirate1 -> FilterBankAux1Rate1 [label="Aux 1 4096Hz"];
###    		Multirate2 -> FilterBankAux2Rate1 [label="Aux 2 4096Hz"];
###    		MultirateN -> FilterBankAuxNRate1 [label="Aux N 4096Hz"];
###    		Multirate1 -> FilterBankAux1Rate2 [label="Aux 1 2048Hz"];
###    		Multirate2 -> FilterBankAux2Rate2 [label="Aux 2 2048Hz"];
###    		MultirateN -> FilterBankAuxNRate2 [label="Aux N 2048Hz"];
###    		Multirate1 -> FilterBankAux1RateN [label="Aux 1 Nth-pow-of-2 Hz"];
###    		Multirate2 -> FilterBankAux2RateN [label="Aux 2 Nth-pow-of-2 Hz"];
###    		MultirateN -> FilterBankAuxNRateN [label="Aux N Nth-pow-of-2 Hz"];
###
###    		FilterBankAux1Rate1 -> TriggerAux1Rate1;
###    		FilterBankAux1Rate2 -> TriggerAux1Rate2;
###    		FilterBankAux1RateN -> TriggerAux1RateN;
###    		FilterBankAux2Rate1 -> TriggerAux2Rate1;
###    		FilterBankAux2Rate2 -> TriggerAux2Rate2;
###    		FilterBankAux2RateN -> TriggerAux2RateN;
###    		FilterBankAuxNRate1 -> TriggerAuxNRate1;
###    		FilterBankAuxNRate2 -> TriggerAuxNRate2;
###    		FilterBankAuxNRateN -> TriggerAuxNRateN;
###    	}
###
###
###    	Synchronize [label="Synchronize buffers by timestamp"];
###    	Extract [label="Extract features from buffer"];
###    	Save [label="Save triggers to disk"];
###    	Kafka [label="Push features to queue"];
###
###    	TriggerAux1Rate1 -> Synchronize;
###    	TriggerAux1Rate2 -> Synchronize;
###    	TriggerAux1RateN -> Synchronize;
###    	TriggerAux2Rate1 -> Synchronize;
###    	TriggerAux2Rate2 -> Synchronize;
###    	TriggerAux2RateN -> Synchronize;
###    	TriggerAuxNRate1 -> Synchronize;
###    	TriggerAuxNRate2 -> Synchronize;
###    	TriggerAuxNRateN -> Synchronize;
###
###    	Synchronize -> Extract;
###
###    	Extract -> Save [label="Option 1"];
###    	Extract -> Kafka [label="Option 2"];
###
###    }
###

import math
import optparse
import os
import resource
import socket
import sys
import tempfile

import h5py
import numpy

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)

from lal import LIGOTimeGPS

from ligo import segments
from ligo.segments import utils as segmentsUtils

from gstlal import aggregator
from gstlal import bottle
from gstlal import datasource
from gstlal import httpinterface
from gstlal import pipeparts
from gstlal import simplehandler

from gstlal.fxtools import auxcache
from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import sngltriggertable
from gstlal.fxtools import utils
from gstlal.fxtools import waveforms as fxwaveforms

#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
#

def setrlimit(res, lim):
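	# raise the soft limit of `res` to `lim`, or to its hard limit when `lim` is None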
	hard_lim = resource.getrlimit(res)[1]
	resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))

# raise the limits on the number of processes, address space, and resident set
# size up to the hard limit, and shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)

# FIXME:  tests at CIT show that this next tweak has no effect.  it's
# possible that SL7 has lowered the default stack size from SL6 and we
# don't need to do this anymore.  remove?
setrlimit(resource.RLIMIT_STACK, 1024 * 1024) # 1 MiB per thread


# =============================
# 
#     command line parser
#
# =============================

def parse_command_line():

	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)

	# First append datasource and feature extraction common options
	multichannel_datasource.append_options(parser)
	feature_extractor.append_options(parser)

	# parse the arguments
	options, filenames = parser.parse_args()

	# Sanity check the options

	# set gps ranges for live and offline sources
	if options.data_source in ("framexmit", "lvshm", "white_live"):

		if options.data_source in ("framexmit", "lvshm"):
			options.gps_start_time = int(aggregator.now())
		else:
			options.gps_start_time = 0 # NOTE: set start time for 'fake' live sources to zero, since seeking doesn't work with 'is_live' option

		options.gps_end_time = 2000000000 # NOTE: set the gps end time to be "infinite"

	if options.feature_start_time is None:
		options.feature_start_time = int(options.gps_start_time)
	if options.feature_end_time is None:
		options.feature_end_time = int(options.gps_end_time)

	# check if input sample rate is sensible
	assert options.sample_rate == 1 or options.sample_rate % 2 == 0

	# check if persist and save cadence times are sensible
	assert options.persist_cadence >= options.cadence
	assert (options.persist_cadence % options.cadence) == 0

	# check if there are any segments to dump to disk
	if options.nxydump_segment:
		options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)

	return options, filenames


# =============================
# 
#             main
#
# =============================

# 
# parsing and setting up some core structures
#

options, filenames = parse_command_line()

data_source_info = multichannel_datasource.DataSourceInfo(options)
instrument = data_source_info.instrument
basename = '%s-%s' % (instrument[:1], options.description)
waveforms = {}

#
# set up logging
#

duration = options.feature_end_time - options.feature_start_time
logdir = os.path.join(options.out_path, 'logs', options.job_id)
aggregator.makedir(logdir)

logger = utils.get_logger('gstlal-feature-extractor_%d-%d' % (options.feature_start_time, duration), rootdir=logdir, verbose=options.verbose)
logger.info("writing log to %s" % logdir)

#
# set up local frame caching, if specified
#

if options.local_frame_caching:

	# get base temp directory
	if '_CONDOR_SCRATCH_DIR' in os.environ:
		tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
	else:
		tmp_dir = os.environ['TMPDIR']

	# create local frame directory
	local_path = os.path.join(tmp_dir, 'local_frames/')
	aggregator.makedir(local_path)

	# save local frame cache
	logger.info("caching frame data locally to %s" % local_path)
	fd, fname = tempfile.mkstemp(".cache")
	f = os.fdopen(fd, "w")

	data_source_info.local_cache_list = auxcache.cache_aux(data_source_info, logger, output_path = local_path, verbose = options.verbose)
	for cacheentry in data_source_info.local_cache_list:
		# guarantee a lal cache compliant file with only integer starts and durations
		cacheentry.segment = segments.segment( int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])) )
		print >>f, str(cacheentry)
	f.close()

	data_source_info.frame_cache = fname

#
# process channel subsets in serial
#

for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):

	#
	# checkpointing for offline analysis with hdf5 output
	#

	if options.data_source not in data_source_info.live_sources and options.save_format == 'hdf5':
		try:
			# get path where triggers are located
			duration = options.feature_end_time - options.feature_start_time
			fname = utils.to_trigger_filename(basename, options.feature_start_time, duration, 'h5')
			fpath = utils.to_trigger_path(os.path.abspath(options.out_path), basename, options.feature_start_time, options.job_id, str(subset_id).zfill(4))
			trg_file = os.path.join(fpath, fname)

			# visit groups within a given hdf5 file
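			# accessing each object forces a read, so a truncated or corrupted file raises IOError below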
			with h5py.File(trg_file, 'r') as f:
				f.visit(lambda item: f[item])
			# file is OK and there is no need to process it,
			# skip ahead in the loop
			continue

		except IOError:
			# file does not exist or is corrupted, need to
			# reprocess
			logger.info("checkpoint: {0} of {1} files completed and continuing with channel subset {2}".format((subset_id - 1), len(data_source_info.channel_subsets), subset_id))
			pass

		logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))

	#
	# if web services serving up bottle routes are enabled,
	# create a new, empty, Bottle application and make it the
	# current default, then start http server to serve it up
	#

	if not options.disable_web_service:
		bottle.default_app.push()
		# uncomment the next line to show tracebacks when something fails
		# in the web server
		#bottle.app().catchall = False
		import base64, uuid	# FIXME:  don't import when the uniquification scheme is fixed
		httpservers = httpinterface.HTTPServers(
			# FIXME:  either switch to using avahi's native name
			# uniquification system or adopt a naturally unique naming
			# scheme (e.g., include a search identifier and job
			# number).
			service_name = "gstlal_idq (%s)" % base64.urlsafe_b64encode(uuid.uuid4().bytes),
			service_properties = {},
			verbose = options.verbose
		)

		# Set up a registry of the resources that this job provides
		@bottle.route("/")
		@bottle.route("/index.html")
		def index(channel_list = channel_subset):
			# get the host and port to report in the links from the
			# request we've received so that the URLs contain the IP
			# address by which client has contacted us
			netloc = bottle.request.urlparts[1]
			server_address = "http://%s" % netloc
			yield "<html><body>\n<h3>%s %s</h3>\n<p>\n" % (netloc, " ".join(sorted(channel_list)))
			for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
				# don't create links back to this page
				if route.rule in ("/", "/index.html"):
					continue
				# only create links for GET methods
				if route.method != "GET":
					continue
				yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
			yield "</p>\n</body></html>"
		# FIXME:  get service-discovery working, then don't do this
		open("registry.txt", "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))

	#
	# building the event loop and pipeline
	#

	logger.info("assembling pipeline...")

	mainloop = GObject.MainLoop()
	pipeline = Gst.Pipeline(sys.argv[0])

	# generate multiple channel sources, and link up pipeline
	head = multichannel_datasource.mkbasicmultisrc(pipeline, data_source_info, channel_subset, verbose = options.verbose)
	src = {}

	for channel in channel_subset:
		# define sampling rates used
		samp_rate = int(data_source_info.channel_dict[channel]['fsamp'])
		max_rate = min(data_source_info.max_sample_rate, samp_rate)
		min_rate = min(data_source_info.min_sample_rate, max_rate)
		n_rates = int(numpy.log2(max_rate/min_rate) + 1)
		rates = [min_rate*2**i for i in range(n_rates)]
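		# rates form a power-of-two ladder from min_rate up to max_rate, e.g. [32, 64, ..., 4096] Hz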

		# choose range of basis parameters
		# NOTE: scale down frequency range by downsample_factor to deal with rolloff from downsampler
		downsample_factor = 0.8
		qlow = 3.3166
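		# NOTE: 3.3166 ~ sqrt(11), a common choice for the minimum Q of a sine-Gaussian basis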
		if data_source_info.extension == 'ini':
			flow = max(data_source_info.channel_dict[channel]['flow'], min_rate/4.)
			fhigh = min(data_source_info.channel_dict[channel]['fhigh'], max_rate/2.)
			qhigh = min(data_source_info.channel_dict[channel]['qhigh'], options.qhigh)
		else:
			flow = min_rate/4.
			fhigh = max_rate/2.
			qhigh = options.qhigh

		# generate templates
		if 'sine_gaussian' in options.waveform:
			parameter_range = {'frequency': (flow, fhigh), 'q': (qlow, qhigh)}
			if options.waveform == 'half_sine_gaussian':
				waveforms[channel] = fxwaveforms.HalfSineGaussianGenerator(parameter_range, rates, mismatch=options.mismatch, downsample_factor=downsample_factor)
			elif options.waveform == 'sine_gaussian':
				waveforms[channel] = fxwaveforms.SineGaussianGenerator(parameter_range, rates, mismatch=options.mismatch, downsample_factor=downsample_factor)
			elif options.waveform == 'tapered_sine_gaussian':
				waveforms[channel] = fxwaveforms.TaperedSineGaussianGenerator(parameter_range, rates, mismatch=options.mismatch, downsample_factor=downsample_factor, max_latency=options.max_latency)
		else:
			raise NotImplementedError

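		# optionally insert latency probes so that per-stage pipeline latency can be tracked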
		if options.latency_output:
			head[channel] = pipeparts.mklatency(pipeline, head[channel], name=utils.latency_name('beforewhitening', 2, channel))

		# whiten auxiliary channel data
		for rate, thishead in multirate_datasource.mkwhitened_multirate_src(pipeline, head[channel], rates, samp_rate, instrument, channel_name = channel, width=32, nxydump_segment=options.nxydump_segment).items():
			if options.latency_output:
				thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterwhitening', 3, channel, rate))

			# determine whether to do time-domain or frequency-domain convolution
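			# prefer time-domain filtering when its estimated cost (filter length x rate)
			# is below the estimated FFT-based cost (5 x filter length x log2(rate))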
			time_domain = (waveforms[channel].sample_pts(rate)*rate) < (5*waveforms[channel].sample_pts(rate)*numpy.log2(rate))

			# create FIR bank of sine-Gaussian-based templates (half, full, or tapered)
			fir_matrix = numpy.array([waveform for waveform in waveforms[channel].generate_templates(rate)])
			thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * 30)
			thishead = pipeparts.mkfirbank(pipeline, thishead, fir_matrix = fir_matrix, time_domain = time_domain, block_stride = int(rate), latency = waveforms[channel].latency(rate))

			# add queues, change stream format, add tags
			if options.latency_output:
				thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('afterFIRbank', 4, channel, rate))
			thishead = pipeparts.mkqueue(pipeline, thishead, max_size_buffers = 1, max_size_bytes = 0, max_size_time = 0)
			thishead = pipeparts.mktogglecomplex(pipeline, thishead)
			thishead = pipeparts.mkcapsfilter(pipeline, thishead, caps = "audio/x-raw, format=Z64LE, rate=%i" % rate)
			thishead = pipeparts.mktaginject(pipeline, thishead, "instrument=%s,channel-name=%s" %( instrument, channel))

			# dump SNR time series to disk over the requested segments, if specified
			tee = pipeparts.mktee(pipeline, thishead)
			if options.nxydump_segment:
				pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee), "snrtimeseries_%s_%s.txt" % (channel, repr(rate)), segment = options.nxydump_segment)

			# extract features from time series
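			# 'timeseries' keeps the max-SNR feature per output sample; 'etg' keeps all features above the SNR threshold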
			if options.feature_mode == 'timeseries':
				thishead = pipeparts.mktrigger(pipeline, tee, int(rate // options.sample_rate), max_snr = True)
			elif options.feature_mode == 'etg':
				thishead = pipeparts.mktrigger(pipeline, tee, rate, snr_thresh = options.snr_threshold)

			if options.latency_output:
				thishead = pipeparts.mklatency(pipeline, thishead, name=utils.latency_name('aftertrigger', 5, channel, rate))

			# link to src for processing by appsync
			src[(channel, rate)] = thishead

	# define structures to synchronize output streams and extract features from buffers
	logger.info("setting up pipeline handler...")
	handler = feature_extractor.MultiChannelHandler(mainloop, pipeline, logger, data_source_info, options, keys = src.keys(), waveforms = waveforms, basename = basename, subset_id = subset_id)

	logger.info("attaching appsinks to pipeline...")
	appsync = feature_extractor.LinkedAppSync(appsink_new_buffer = handler.bufhandler)
	appsinks = set(appsync.add_sink(pipeline, src[(channel, rate)], name = "sink_%s_%s" % (rate, channel)) for channel, rate in src.keys())
	logger.info("attached %d appsinks to pipeline." % len(appsinks))

	# Allow Ctrl+C or SIGTERM to shut down the program gracefully for online
	# sources; otherwise the signal kills the process outright
	if data_source_info.data_source in data_source_info.live_sources: # what about nds online?
		simplehandler.OneTimeSignalHandler(pipeline)

	# Seek
	if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter READY state")
	if data_source_info.data_source not in data_source_info.live_sources: # what about nds online?
		datasource.pipeline_seek_for_gps(pipeline, options.gps_start_time, options.gps_end_time)

	#
	# Run pipeline
	#

	if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
		raise RuntimeError("pipeline failed to enter PLAYING state")

	logger.info("running pipeline...")

	mainloop.run()

	# flush and save remaining features
	logger.info("persisting features to disk...")
	handler.flush_and_save_features()

	#
	# Shut down pipeline
	#

	logger.info("shutting down pipeline...")

	#
	# Shutdown the web interface servers and garbage collect the Bottle
	# app.  This should release the references the Bottle app's routes
	# hold to the pipeline's data (like template banks and so on).
	#

	if not options.disable_web_service:
		del httpservers
		bottle.default_app.pop()

	#
	# Set pipeline state to NULL and garbage collect the handler
	#

	if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
		raise RuntimeError("pipeline could not be set to NULL")

	del handler.pipeline
	del handler

#
# Cleanup local frame file cache and related frames
#

if options.local_frame_caching:
	logger.info("deleting temporary cache file and frames...")

	# remove frame cache
	os.remove(data_source_info.frame_cache)

	# remove local frames
	for cacheentry in data_source_info.local_cache_list:
		os.remove(cacheentry.path)

	del data_source_info.local_cache_list

#
# exit the program explicitly if the data source is live
#

if options.data_source in data_source_info.live_sources:
	sys.exit(0)