From 05e83de4bc3c91d0e866560c9c4741eff7154224 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 18 Jan 2018 17:30:15 -0800
Subject: [PATCH] gstlal_etg: fixed bug where hdf dataset rewrites caused
 pipeline to crash, fixed bug where last set of hdf triggers was not being
 written

---
 gstlal-ugly/bin/gstlal_etg           | 60 ++++++++++++++++++++--------
 gstlal-ugly/python/idq_aggregator.py |  6 +--
 2 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg
index 1610897702..6739716f2c 100755
--- a/gstlal-ugly/bin/gstlal_etg
+++ b/gstlal-ugly/bin/gstlal_etg
@@ -184,14 +184,34 @@ class MultiChannelHandler(simplehandler.Handler):
 		self.instrument = kwargs.pop("instrument")
 		self.keys = kwargs.pop("keys")
 
-		# iDQ saving properties
+		# set initialization time
+		if options.data_source in ("framexmit", "lvshm"):
+			self.init_gps_time = int(aggregator.now())
+		else:
+			self.init_gps_time = int(options.gps_start_time)
+
+		### iDQ saving properties
 		self.cadence = options.cadence
 		self.tag = '%s-%s' % (self.instrument[:1], self.description)
+
 		# hdf saving properties
 		if options.save_hdf:
 			self.last_save_time = {key:None for key in self.keys}
 			columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
 			self.fdata = idq_aggregator.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
+
+			if options.gps_end_time:
+				duration = int(options.gps_end_time) - int(options.gps_start_time)
+				self.fname = '%s-%d-%d' % (self.tag, self.init_gps_time, duration)
+			else:
+				self.fname = '%s-%d-9999999999' % (self.tag, self.init_gps_time)
+			self.fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(self.fname.split("-")[2])[:5])
+
+			# delete leftover temporary files
+			temp_path = os.path.join(self.fpath, self.fname)+'.h5.tmp'
+			if os.path.isfile(temp_path):
+				os.remove(temp_path)
+
 		# ascii saving properties
 		else:
 			self.last_save_time = None
@@ -203,12 +223,6 @@ class MultiChannelHandler(simplehandler.Handler):
 			self.fdata = deque(maxlen = 25000)
 			self.fdata.append(self.header)
 
-		# set initialization time
-		if options.data_source in ("framexmit", "lvshm"):
-			self.init_gps_time = int(aggregator.now())
-		else:
-			self.init_gps_time = int(options.gps_start_time)
-
 		# set up ETG bottle related properties
 		self.etg_event = deque(maxlen = 20000)
 		self.etg_event_time = None
@@ -287,17 +301,10 @@ class MultiChannelHandler(simplehandler.Handler):
 				if self.last_save_time is None:
 					self.last_save_time = buftime
 
-			# Save triggers (in ascii format) once every cadence
+			# Save triggers (hdf or ascii) once per cadence
 			if options.save_hdf:
 				if idq_aggregator.in_new_epoch(buftime, self.last_save_time[(channel, rate)], self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
-					if options.gps_end_time:
-						# Uses the T050017 filenaming convention.
-						duration = int(options.gps_end_time) - int(options.gps_start_time)
-						fname = '%s-%s-%d' % (self.tag, options.gps_start_time, duration)
-					else:
-						fname = '%s-%d' % (self.tag, self.init_gps_time)
-					fpath = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
-					self.fdata.dump(fpath, fname, idq_aggregator.floor_div(self.last_save_time[(channel, rate)], self.cadence), key=(channel, rate))
+					self.to_hdf_file((channel, rate))
 					self.last_save_time[(channel, rate)] = buftime
 			else:
 				if idq_aggregator.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
@@ -383,6 +390,21 @@ class MultiChannelHandler(simplehandler.Handler):
 				latency = numpy.round(int(aggregator.now()) - buftime)
 				print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
 
+	def to_hdf_file(self, key, final_write = False):
+		"""
+		Dumps triggers saved in memory to disk in hdf5 format.
+		Uses the T050017 filenaming convention.
+		NOTE: This method should only be called by an instance that is locked.
+		"""
+		self.fdata.dump(self.fpath, self.fname, idq_aggregator.floor_div(self.last_save_time[key], self.cadence), key = key, tmp = True)
+
+		# when job completes, move temp file over
+		if final_write:
+			final_path = os.path.join(self.fpath, self.fname)+".h5"
+			tmp_path = final_path+".tmp"
+			shutil.move(tmp_path, final_path)
+
+
 	def gen_psd_xmldoc(self):
 		xmldoc = lal.series.make_psd_xmldoc(self.psds)
 		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_idq", {})
@@ -712,9 +734,13 @@ if options.verbose:
 mainloop.run()
 
 # save remaining triggers
-if not options.save_hdf:
+if  options.save_hdf:
+	for key in src.keys():
+		handler.to_hdf_file(key, final_write = True)
+else:
 	handler.to_trigger_file()
 
+
 #
 # Shut down pipeline
 #
diff --git a/gstlal-ugly/python/idq_aggregator.py b/gstlal-ugly/python/idq_aggregator.py
index 98dcfb44e0..c525c34f3c 100644
--- a/gstlal-ugly/python/idq_aggregator.py
+++ b/gstlal-ugly/python/idq_aggregator.py
@@ -143,19 +143,19 @@ class HDF5FeatureData(FeatureData):
 		self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
 		self.clear()
 
-	def dump(self, path, base, start_time, key = None):
+	def dump(self, path, base, start_time, key = None, tmp = False):
 		"""
 		Saves the current cadence of gps triggers to disk and clear out data
 		"""
 		name = "%d_%d" % (start_time, self.cadence)
 		if key:
 			group = os.path.join(str(key[0]), str(key[1]).zfill(4))
-			create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
+			create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
 			self.clear(key)
 		else:
 			for key in self.keys:
 				group = os.path.join(str(key[0]), str(key[1]).zfill(4))
-				create_new_dataset(path, base, self.etg_data[key], name=name, group=group)
+				create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
 			self.clear()
 
 	def load(self, path):
-- 
GitLab