diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg index a8638236ca4c97772e10c3f29f857bce25dd9b7c..2698db0f7acbd99b45d01f785ac21de25a855059 100755 --- a/gstlal-ugly/bin/gstlal_etg +++ b/gstlal-ugly/bin/gstlal_etg @@ -27,12 +27,15 @@ from optparse import OptionParser from collections import deque +import copy import json +import math import os import sys import socket import resource import StringIO +import tempfile import threading import shutil import traceback @@ -50,6 +53,7 @@ from lal import LIGOTimeGPS from glue import iterutils from glue import segments +from glue.lal import CacheEntry from glue.ligolw import ligolw from glue.ligolw import utils as ligolw_utils from glue.ligolw.utils import process as ligolw_process @@ -474,6 +478,7 @@ def parse_command_line(): # multichannel_datasource.append_options(parser) + parser.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.") parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .") parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.") parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.") @@ -543,14 +548,60 @@ logger = idq_utils.get_logger('gstlal-etg_%d-%d' % (options.gps_start_time, dura logger.info("writing log to %s" % logdir) # -# process channel subsets in serial +# set up local frame caching, if specified # -for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): +if options.local_frame_caching: + + # get base temp directory + if '_CONDOR_SCRATCH_DIR' in os.environ: + tmp_dir = os.environ['_CONDOR_SCRATCH_DIR'] + else: + tmp_dir = os.environ['TMPDIR'] + + # create local frame directory + local_path = os.path.join(tmp_dir, 'local_frames/') + aggregator.makedir(local_path) + + # read in frame cache + logger.info("reading in frame cache...") + cache_list = [] + with open(data_source_info.frame_cache, 'r') as frame_cache: + cache_list=[CacheEntry(cache_file) for cache_file in frame_cache.readlines()] + + # filter out cache entries based on analysis times + # FIXME: this doesn't cover frames where analysis segments partially overlap + cache_list = [c for c in cache_list if c.segment in data_source_info.seg] + + # point local cache entries to local path + data_source_info.local_cache_list = [copy.copy(cacheentry) for cacheentry in cache_list] + for cacheentry in data_source_info.local_cache_list: + _, filename = os.path.split(cacheentry.path) + cacheentry.path = os.path.join(local_path, filename) + + # copy frame files over locally + logger.info("caching frame data locally to %s" % local_path) + for cacheentry, localcacheentry in zip(cache_list, data_source_info.local_cache_list): + logger.info("caching %s..." % os.path.basename(localcacheentry.path)) + shutil.copy(cacheentry.path, localcacheentry.path) + + # save local frame cache + f, fname = tempfile.mkstemp(".cache") + f = open(fname, "w") + + for cacheentry in data_source_info.local_cache_list: + # guarantee a lal cache compliant file with only integer starts and durations + cacheentry.segment = segments.segment( int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])) ) + print >>f, str(cacheentry) + f.close() + + data_source_info.frame_cache = fname - # format subset_id for aesthetics - #subset_id = str(subset_id).zfill(4) +# +# process channel subsets in serial +# +for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets))) # @@ -734,6 +785,22 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1): del handler.pipeline del handler +# +# Cleanup local frame file cache and related frames +# + +if options.local_frame_caching: + logger.info("deleting temporary cache file and frames...") + + # remove frame cache + os.remove(data_source_info.frame_cache) + + # remove local frames + for cacheentry in data_source_info.local_cache_list: + os.remove(cacheentry.path) + + del data_source_info.local_cache_list + # # close program manually if data source is live #