Skip to content
Snippets Groups Projects
Commit 73e86683 authored by chad.hanna's avatar chad.hanna Committed by Kipp Cannon
Browse files

aggregator.py: change tmp file conventions

parent 36b6b404
No related branches found
No related tags found
No related merge requests found
......@@ -157,14 +157,11 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
@param data. The data will be stored in an hdf5 file at path @param path with
base name @param base. You can also make a temporary file.
"""
if tmp:
fname = os.path.join(path, "%s.hdf5.tmp" % base)
else:
# A non tmp dataset should not be overwritten
fname = os.path.join(path, "%s.hdf5" % base)
if os.path.exists(fname):
return fname
f = h5py.File(fname, "w")
tmpfname = "/dev/shm/%s_%s" % (path.replace("/","_"), "%s.hdf5.tmp" % base)
fname = os.path.join(path, "%s.hdf5" % base)
if not tmp and os.path.exists(fname):
return tmpfname, fname
f = h5py.File(tmpfname if tmp else fname, "w")
if timedata is None and data is None:
f.create_dataset("time", (0,), dtype="f8")
f.create_dataset("data", (0,), dtype="f8")
......@@ -177,7 +174,7 @@ def create_new_dataset(path, base, timedata = None, data = None, tmp = False):
f["data"][...] = data
f.close()
return fname
return tmpfname, fname
def get_dataset(path, base):
......@@ -191,7 +188,7 @@ def get_dataset(path, base):
f.close()
return fname, x,y
except IOError:
fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
tmpfname, fname = create_new_dataset(path, base, timedata = None, data = None, tmp = False)
return fname, numpy.array([]), numpy.array([])
......@@ -232,7 +229,7 @@ def setup_dir_by_job_and_level(gpstime, typ, job, route, base_dir, verbose = Tru
str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
directory = "%s/%s/by_job/%s/%s" % (base_dir, "/".join(str_time), job, typ)
makedir(directory)
fname = create_new_dataset(directory, route)
tmpfname, fname = create_new_dataset(directory, route)
def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True, level = 0):
......@@ -244,7 +241,7 @@ def setup_dir_across_job_by_level(gpstime, typ, route, base_dir, verbose = True,
str_time = str_time[:(len(str_time)-int(numpy.log10(MIN_TIME_QUANTA))-level)]
directory = "%s/%s/%s" % (base_dir, "/".join(str_time), typ)
makedir(directory)
fname = create_new_dataset(directory, route)
tmpfname, fname = create_new_dataset(directory, route)
def gps_range(jobtime):
......@@ -274,7 +271,7 @@ def update_lowest_level_data_by_job_type_and_route(job, route, start, end, typ,
return []
reduced_time, reduced_data = reduce_data(this_time, this_data, func, level = 0)
#logging.info("processing job %s for data %s in span [%d,%d] of type %s: found %d" % (job, route, start, end, typ, len(reduced_time)))
tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
tmpfname, _ = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
# copy the tmp file over the original
shutil.move(tmpfname, fname)
return [start, end]
......@@ -310,9 +307,9 @@ def reduce_data_from_lower_level_by_job_type_and_route(level, base_dir, job, typ
path = "/".join([this_level_dir, "by_job", job, typ])
makedir(path)
#logging.info("processing reduced data %s for job %s in span [%d,%d] of type %s at level %d: found %d/%d" % (d, job, s, e, typ, level, len(reduced_time), len(agg_time)))
tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
tmpfname, fname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
shutil.move(tmpfname, fname)
def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, end)):
......@@ -332,9 +329,9 @@ def reduce_across_jobs((jobs, this_level_dir, typ, route, func, level, start, en
reduced_time, reduced_data = reduce_data(agg_time, agg_data, func, level=level)
#logging.info("processing reduced data %s in span [%d,%d] of type %s at level %d: found %d/%d" % (route, start, end, typ, level, len(reduced_time), len(agg_time)))
path = "/".join([this_level_dir, typ])
tmpfname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
tmpfname, fname = create_new_dataset(path, route, reduced_time, reduced_data, tmp = True)
# FIXME don't assume we can get the non temp file name this way
shutil.move(tmpfname, tmpfname.replace(".tmp",""))
shutil.move(tmpfname, fname)
def get_data_from_job_and_reduce((job, job_tag, routes, datatypes, prevdataspan, base_dir, jobs, timedata, datadata)):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment