Commit f837de48 authored by Patrick Godwin

gstlal_etg: added local frame caching option to save on input I/O

parent 236d079f
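In short, the change lets gstlal_etg copy its input frame files onto job-local disk once and read them from there for the rest of the run, instead of pulling them repeatedly over the shared filesystem. A minimal sketch of that pattern, simplified from the diff below (the cache filename and the /tmp fallback are placeholders for illustration, not the program's actual behavior):

import os
import shutil
import tempfile

from glue.lal import CacheEntry

# read the original LAL frame cache (placeholder path)
with open("frames.cache") as f:
	cache = [CacheEntry(line) for line in f]

# pick a job-local scratch directory (/tmp fallback is for illustration only)
scratch = os.environ.get("_CONDOR_SCRATCH_DIR", os.environ.get("TMPDIR", "/tmp"))
local_dir = os.path.join(scratch, "local_frames")
if not os.path.isdir(local_dir):
	os.makedirs(local_dir)

# copy each frame to local scratch and write a new cache pointing at the copies
fd, local_cache = tempfile.mkstemp(".cache")
out = os.fdopen(fd, "w")
for entry in cache:
	local_path = os.path.join(local_dir, os.path.basename(entry.path))
	shutil.copy(entry.path, local_path)
	entry.path = local_path
	out.write("%s\n" % str(entry))
out.close()

# downstream, the pipeline is handed local_cache instead of the original cache file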
@@ -27,12 +27,15 @@
from optparse import OptionParser
from collections import deque
import copy
import json
import math
import os
import sys
import socket
import resource
import StringIO
import tempfile
import threading
import shutil
import traceback
@@ -50,6 +53,7 @@ from lal import LIGOTimeGPS
from glue import iterutils
from glue import segments
from glue.lal import CacheEntry
from glue.ligolw import ligolw
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import process as ligolw_process
@@ -474,6 +478,7 @@ def parse_command_line():
#
multichannel_datasource.append_options(parser)
parser.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.")
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
@@ -543,14 +548,60 @@ logger = idq_utils.get_logger('gstlal-etg_%d-%d' % (options.gps_start_time, dura
logger.info("writing log to %s" % logdir)
#
# set up local frame caching, if specified
#

if options.local_frame_caching:
	# get base temp directory
	if '_CONDOR_SCRATCH_DIR' in os.environ:
		tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
	else:
		tmp_dir = os.environ['TMPDIR']
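	# NOTE: _CONDOR_SCRATCH_DIR is the job's private scratch space on the execute node,
	# so it is preferred when available; TMPDIR is the fallback outside of Condor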
	# create local frame directory
	local_path = os.path.join(tmp_dir, 'local_frames/')
	aggregator.makedir(local_path)

	# read in frame cache
	logger.info("reading in frame cache...")
	cache_list = []
	with open(data_source_info.frame_cache, 'r') as frame_cache:
		cache_list = [CacheEntry(cache_file) for cache_file in frame_cache.readlines()]

	# filter out cache entries based on analysis times
	# FIXME: this doesn't cover frames where analysis segments partially overlap
	cache_list = [c for c in cache_list if c.segment in data_source_info.seg]
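	# NOTE: the containment test above keeps only frames that lie entirely within the
	# analysis segment; an overlap test such as c.segment.intersects(data_source_info.seg)
	# would also keep partially overlapping frames (see FIXME above)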
	# point local cache entries to local path
	data_source_info.local_cache_list = [copy.copy(cacheentry) for cacheentry in cache_list]
	for cacheentry in data_source_info.local_cache_list:
		_, filename = os.path.split(cacheentry.path)
		cacheentry.path = os.path.join(local_path, filename)

	# copy frame files over locally
	logger.info("caching frame data locally to %s" % local_path)
	for cacheentry, localcacheentry in zip(cache_list, data_source_info.local_cache_list):
		logger.info("caching %s..." % os.path.basename(localcacheentry.path))
		shutil.copy(cacheentry.path, localcacheentry.path)
	# save local frame cache
	fd, fname = tempfile.mkstemp(".cache")
	f = os.fdopen(fd, "w")
	for cacheentry in data_source_info.local_cache_list:
		# guarantee a LAL cache compliant file with only integer starts and durations
		cacheentry.segment = segments.segment(int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])))
		print >>f, str(cacheentry)
	f.close()

	data_source_info.frame_cache = fname
# format subset_id for aesthetics
#subset_id = str(subset_id).zfill(4)
#
# process channel subsets in serial
#
for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
#
@@ -734,6 +785,22 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
	del handler.pipeline
	del handler
#
# Cleanup local frame file cache and related frames
#
if options.local_frame_caching:
	logger.info("deleting temporary cache file and frames...")

	# remove frame cache
	os.remove(data_source_info.frame_cache)

	# remove local frames
	for cacheentry in data_source_info.local_cache_list:
		os.remove(cacheentry.path)

	del data_source_info.local_cache_list
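	# NOTE: under Condor these files live in _CONDOR_SCRATCH_DIR and would also be removed
	# automatically at job exit; the explicit cleanup covers the TMPDIR case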
#
# close program manually if data source is live
#
......