Commit d7a22892 authored by Patrick Godwin

gstlal_feature_hdf5_sink: fix issues caused by restructuring of hdf5 saving causing pipeline to crash
parent 8dc84c8a
@@ -52,7 +52,7 @@ def parse_command_line():
     parser = OptionParser(usage=__usage__, description=__description__)
     parser.add_option("-v","--verbose", default=False, action="store_true", help = "Print to stdout in addition to writing to automatically generated log.")
     parser.add_option("--log-level", type = "int", default = 10, help = "Sets the verbosity of logging. Default = 10.")
-    parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where logs and metadata are stored.")
+    parser.add_option("--rootdir", metavar = "path", default = ".", help = "Sets the root directory where features, logs, and metadata are stored.")
     parser.add_option("--basename", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Sets the basename for files written to disk. Default = GSTLAL_IDQ_FEATURES")
     parser.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
     parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
@@ -100,22 +100,22 @@ class HDF5StreamSink(object):
         self.keys = multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys()

         ### iDQ saving properties
-        self.rootdir = options.rootdir
         self.timestamp = None
         self.last_save_time = None
         self.last_persist_time = None
+        self.rootdir = options.rootdir
         self.sample_rate = options.sample_rate
         self.write_cadence = options.write_cadence
         self.persist_cadence = options.persist_cadence
-        self.tag = '%s-%s' % (options.instrument[:1], options.basename)
+        self.basename = '%s-%s' % (options.instrument[:1], options.basename)
         self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
         self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate)

         ### get base temp directory
         if '_CONDOR_SCRATCH_DIR' in os.environ:
-            tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
+            self.tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
         else:
-            tmp_dir = os.environ['TMPDIR']
+            self.tmp_dir = os.environ['TMPDIR']

     def set_hdf_file_properties(self, start_time, duration):
         """
@@ -127,15 +127,15 @@ class HDF5StreamSink(object):
         job_id = '0001'
         subset_id = '0001'
         self.feature_name = os.path.splitext(utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
-        self.feature_path = utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, start_time, job_id, subset_id)
+        self.feature_path = utils.to_trigger_path(os.path.abspath(self.rootdir), self.basename, start_time, job_id, subset_id)
         self.tmp_path = utils.to_trigger_path(self.tmp_dir, self.basename, start_time, job_id, subset_id)

         # create temp and output directories if they don't exist
-        aggregator.makedir(self.fpath)
+        aggregator.makedir(self.feature_path)
         aggregator.makedir(self.tmp_path)

         # delete leftover temporary files
-        tmp_file = os.path.join(self.tmp_path, self.fname)+'.h5.tmp'
+        tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
         if os.path.isfile(tmp_file):
             os.remove(tmp_file)
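
utils.to_trigger_filename builds names following the LIGO BASENAME-GPSSTART-DURATION.SUFFIX convention. A sketch of that naming, where the helper body is a hypothetical stand-in rather than the gstlal implementation:

    import os

    def to_trigger_filename(basename, start_time, duration, suffix):
        # hypothetical stand-in: BASENAME-GPSSTART-DURATION.SUFFIX
        return '%s-%d-%d.%s' % (basename, int(start_time), int(duration), suffix)

    # splitext() strips the extension to recover the feature_name used above,
    # e.g. 'H-GSTLAL_IDQ_FEATURES-1187000000-20'
    feature_name = os.path.splitext(to_trigger_filename('H-GSTLAL_IDQ_FEATURES', 1187000000, 20, 'h5'))[0]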
@@ -167,7 +167,7 @@ class HDF5StreamSink(object):
         while self.feature_queue:
             ### remove data with oldest timestamp and process
             self.timestamp, features = self.feature_queue.pop()
-            logger.info('processing features for timestamp %d' % timestamp)
+            logger.info('processing features for timestamp %d' % self.timestamp)

             # set save times and initialize specific saving properties if not already set
             if self.last_save_time is None:
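
self.feature_queue.pop() hands back the oldest buffered (timestamp, features) pair. A minimal sketch of such a FIFO buffer, assuming a deque-backed container (this class is illustrative, not the gstlal one):

    from collections import deque

    class FeatureQueue(object):
        """Illustrative FIFO buffer: newest feature buffers in, oldest out first."""
        def __init__(self):
            self._queue = deque()

        def append(self, timestamp, features):
            self._queue.append((timestamp, features))

        def pop(self):
            # oldest entry first, matching 'remove data with oldest timestamp' above
            return self._queue.popleft()

        def __len__(self):
            # makes 'while self.feature_queue:' terminate once the buffer drains
            return len(self._queue)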
@@ -178,21 +178,19 @@ class HDF5StreamSink(object):
             # Save triggers once per cadence if saving to disk
             if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
-                self.logger.info("saving features to disk at timestamp = %d" % self.timestamp)
+                logger.info("saving features to disk at timestamp = %d" % self.timestamp)
                 self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
-                self.to_hdf_file()
                 self.last_save_time = self.timestamp

             # persist triggers once per persist cadence if using hdf5 format
             if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
-                self.logger.info("persisting features to disk for gps range %d - %d" % (self.timestamp, self.timestamp-self.persist_cadence))
+                logger.info("persisting features to disk for gps range %d - %d" % (self.timestamp-self.persist_cadence, self.timestamp))
                 self.persist_to_disk()
-                self.finish_hdf_file()
                 self.last_persist_time = self.timestamp
                 self.set_hdf_file_properties(self.timestamp, self.persist_cadence)

             ### add new feature vector to dataset
-            self.feature_data.append(timestamp, features)
+            self.feature_data.append(self.timestamp, features)

     def persist_to_disk(self):
         """
...
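
Both save blocks key off utils.in_new_epoch, which fires once per cadence boundary, and utils.floor_div, which snaps a GPS time down to that boundary. A minimal sketch of the epoch arithmetic, assuming the usual floor-division semantics (helper bodies are illustrative, not copied from gstlal):

    def floor_div(x, n):
        # round x down to the nearest multiple of n
        assert n > 0
        return (x // n) * n

    def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
        # True once new_gps_time crosses into a later epoch of length gps_epoch
        return floor_div(new_gps_time, gps_epoch) != floor_div(prev_gps_time, gps_epoch)

    # e.g. with write_cadence = 20: timestamps 1187000018 -> 1187000021 cross
    # the 1187000020 boundary, so features for the finished epoch get saved.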