From 6fac00dca60318b307747897dc51d5ac1a5ddd22 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Fri, 6 Apr 2018 19:08:38 -0700
Subject: [PATCH] gstlal_feature_extractor: removed a check in DataSourceInfo
 that prevented the pipeline from starting up with live data, added an option
 for live data that allows features to be persisted to disk at a regular
 cadence, minor refactoring in MultiChannelHandler

---
 gstlal-ugly/bin/gstlal_feature_extractor      | 68 +++++++++++++------
 gstlal-ugly/python/multichannel_datasource.py |  2 -
 2 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_feature_extractor b/gstlal-ugly/bin/gstlal_feature_extractor
index d4e6a7739c..544a4e3465 100755
--- a/gstlal-ugly/bin/gstlal_feature_extractor
+++ b/gstlal-ugly/bin/gstlal_feature_extractor
@@ -124,33 +124,30 @@ class MultiChannelHandler(simplehandler.Handler):
 
 		### iDQ saving properties
 		self.last_save_time = None
+		self.last_persist_time = None
 		self.cadence = options.cadence
+		self.persist_cadence = options.persist_cadence
+
+		# set whether data source is live
+		self.is_live = data_source_info.data_source in data_source_info.live_sources
 
 		# get base temp directory
 		if '_CONDOR_SCRATCH_DIR' in os.environ:
-			tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
+			self.tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
 		else:
-			tmp_dir = os.environ['TMPDIR']
+			self.tmp_dir = os.environ['TMPDIR']
 
 		# feature saving properties
 		if options.save_format == 'hdf5':
 			columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
 			self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
 
-			duration = int(options.gps_end_time) - int(options.gps_start_time)
-			self.fname = os.path.splitext(idq_utils.to_trigger_filename(self.basename, options.gps_start_time, duration, 'h5'))[0]
-
-			self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, options.gps_start_time, self.job_id, self.subset_id)
-			self.tmp_path = idq_utils.to_trigger_path(tmp_dir, self.basename, options.gps_start_time, self.job_id, self.subset_id)
-
-			# create temp and output directories if they don't exist
-			aggregator.makedir(self.fpath)
-			aggregator.makedir(self.tmp_path)
+			if self.is_live:
+				duration = idq_utils.floor_div(int(options.gps_start_time) + self.persist_cadence, self.persist_cadence) - int(options.gps_start_time)
+			else:
+				duration = int(options.gps_end_time) - int(options.gps_start_time)
 
-			# delete leftover temporary files
-			tmp_file = os.path.join(self.tmp_path, self.fname)+'.h5.tmp'
-			if os.path.isfile(tmp_file):
-				os.remove(tmp_file)
+			self.set_hdf_file_properties(int(options.gps_start_time), duration)
 
 		elif options.save_format == 'ascii':
 			# create header for trigger file
@@ -231,6 +228,7 @@ class MultiChannelHandler(simplehandler.Handler):
 			# set save time appropriately
 			if self.last_save_time is None:
 				self.last_save_time = buftime
+				self.last_persist_time = buftime
 
 			# Save triggers once per cadence
 			if idq_utils.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
@@ -243,6 +241,13 @@ class MultiChannelHandler(simplehandler.Handler):
 					self.fdata.append(self.header)
 				self.last_save_time = buftime
 
+			# persist triggers once per persist cadence if using hdf5 format and running with live data
+			if self.is_live and idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
+				logger.info("persisting features to disk at timestamp = %d" % buftime)
+				self.finish_hdf_file()
+				self.last_persist_time = buftime
+				self.set_hdf_file_properties(buftime, self.persist_cadence)
+
 			# read buffer contents
 			for i in range(buf.n_memory()):
 				memory = buf.peek_memory(i)
@@ -338,6 +343,26 @@ class MultiChannelHandler(simplehandler.Handler):
 		tmp_path = os.path.join(self.tmp_path, self.fname)+".h5.tmp"
 		shutil.move(tmp_path, final_path)
 
+	def set_hdf_file_properties(self, start_time, duration):
+		"""
+		Sets the file name and the temporary and permanent directory paths where
+		triggers will live, given the current gps time and a gps duration.
+		Also creates new directories as needed and removes any leftover temporary files.
+		"""
+		# set/update file names and directories with new gps time and duration
+		self.fname = os.path.splitext(idq_utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
+		self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, start_time, self.job_id, self.subset_id)
+		self.tmp_path = idq_utils.to_trigger_path(self.tmp_dir, self.basename, start_time, self.job_id, self.subset_id)
+
+		# create temp and output directories if they don't exist
+		aggregator.makedir(self.fpath)
+		aggregator.makedir(self.tmp_path)
+
+		# delete leftover temporary files
+		tmp_file = os.path.join(self.tmp_path, self.fname)+'.h5.tmp'
+		if os.path.isfile(tmp_file):
+			os.remove(tmp_file)
+
 	def gen_psd_xmldoc(self):
 		xmldoc = lal.series.make_psd_xmldoc(self.psds)
 		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_idq", {})
@@ -482,6 +507,7 @@ def parse_command_line():
 	parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
 	parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
 	parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
+	parser.add_option("--persist-cadence", type = "int", default = 320, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Only used for live data, and needs to be a multiple of save cadence. Default = 320 seconds.")
 	parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
 	parser.add_option("--save-format", action = "store_true", default = "hdf5", help = "Specifies the save format (ascii or hdf5) of features written to disk. Default = hdf5")
@@ -509,10 +535,14 @@ def parse_command_line():
 			options.trigger_start_time = options.gps_start_time
 		if options.trigger_end_time is None:
 			options.trigger_end_time = options.gps_end_time
-
-	elif options.data_source in ("framexmit", "lvshm"):
+	else:
 		options.gps_start_time = int(aggregator.now())
-		options.gps_end_time = 9999999999
+		# NOTE: set the gps end time to be "infinite"
+		options.gps_end_time = 2000000000
+
+	# check if persist and save cadence times are sensible
+	assert options.persist_cadence >= options.cadence
+	assert (options.persist_cadence % options.cadence) == 0
 
 	return options, filenames
 
@@ -609,7 +639,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
 			logger.info("checkpoint: {0} of {1} files completed and continuing with channel subset {2}".format((subset_id - 1), len(data_source_info.channel_subsets), subset_id))
 			pass
 
-	logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
+		logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
 
 	#
 	# if web services serving up bottle routes are enabled,
diff --git a/gstlal-ugly/python/multichannel_datasource.py b/gstlal-ugly/python/multichannel_datasource.py
index 162cef65b5..e750c64bfc 100644
--- a/gstlal-ugly/python/multichannel_datasource.py
+++ b/gstlal-ugly/python/multichannel_datasource.py
@@ -348,8 +348,6 @@ class DataSourceInfo(object):
 		if options.gps_start_time is not None:
 			if options.gps_end_time is None:
 				raise ValueError("must provide both --gps-start-time and --gps-end-time")
-			if options.data_source in self.live_sources:
-				raise ValueError("cannot set --gps-start-time or --gps-end-time with %s" % " or ".join("--data-source=%s" % src for src in sorted(self.live_sources)))
 			try:
 				start = LIGOTimeGPS(options.gps_start_time)
 			except ValueError:
-- 
GitLab