From 73e866839fbe41c67cb4c7958efea25aba0183c9 Mon Sep 17 00:00:00 2001 From: "chad.hanna" <crh184@psu.edu> Date: Wed, 12 Dec 2018 17:50:39 -0800 Subject: [PATCH] aggregator.py: change tmp file conventions --- gstlal-ugly/python/aggregator.py | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/gstlal-ugly/python/aggregator.py b/gstlal-ugly/python/aggregator.py index 8577217dd2..139ad7e74c 100644 --- a/gstlal-ugly/python/aggregator.py +++ b/gstlal-ugly/python/aggregator.py @@ -157,14 +157,11 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False): @param data. The data will be stored in an hdf5 file at path @param path with base name @param base. You can also make a temporary file. """ - if tmp: - fname = os.path.join(path, "%s.hdf5.tmp" % base) - else: - # A non tmp dataset should not be overwritten - fname = os.path.join(path, "%s.hdf5" % base) - if os.path.exists(fname): - return fname - f = h5py.File(fname, "w") + tmpfname = "/dev/shm/%s_%s" % (path.replace("/","_"), "%s.hdf5.tmp" % base) + fname = os.path.join(path, "%s.hdf5" % base) + if not tmp and os.path.exists(fname): + return tmpfname, fname + f = h5py.File(tmpfname if tmp else fname, "w") if timedata is None and data is None: f.create_dataset("time", (0,), dtype="f8") f.create_dataset("data", (0,), dtype="f8") @@ -177,7 +174,7 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False): f["data"][...] = data f.close() - return fname + return tmpfname, fname def get_dataset(path, base): @@ -191,7 +188,7 @@ def get_dataset(path, base): f.close() return fname, x,y except IOError: - fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False) + tmpfname, fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False) return fname, numpy.array([]), numpy.array([]) @@ -232,7 +229,7 @@ def setup_dir_by_job_and_level(gpstime, typ, job, route, base_dir, verbose = Tru str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)] directory = "%s/%s/by_job/%s/%s" % (base_dir, "/".join(str_time), job, typ) makedir(directory) - fname = create_new_dataset(directory, route) + tmpfname, fname = create_new_dataset(directory, route) def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True, level = 0): @@ -244,7 +241,7 @@ def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True, str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)] directory = "%s/%s/%s" % (base_dir, "/".join(str_time), typ) makedir(directory) - fname = create_new_dataset(directory, route) + tmpfname, fname = create_new_dataset(directory, route) def gps_range(jobtime): @@ -274,7 +271,7 @@ def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ, return [] reduced_time, reduced_data = reduce_data(this_time, this_data, func, level = 0) #logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, route, start, end, typ, len(reduced_time))) - tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) + tmpfname, _ = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) # copy the tmp file over the original shutil.move(tmpfname, fname) return [start, end] @@ -310,9 +307,9 @@ def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ path = "/".join([this_level_dir, "by_job", job, typ]) makedir(path) #logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time))) - tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) + tmpfname, fname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) # FIXME don't assume we can get the non temp file name this way - shutil.move(tmpfname, tmpfname.replace(".tmp","")) + shutil.move(tmpfname, fname) def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)): @@ -332,9 +329,9 @@ def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, en reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level) #logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (route, start, end, typ, level, len(reduced_time), len(agg_time))) path = "/".join([this_level_dir, typ]) - tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) + tmpfname, fname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True) # FIXME don't assume we can get the non temp file name this way - shutil.move(tmpfname, tmpfname.replace(".tmp","")) + shutil.move(tmpfname, fname) def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs, timedata, datadata)): -- GitLab