From 4ffdc12e2f68a574649289bbeaebc36bbe1515bc Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Thu, 31 May 2018 11:49:58 -0700
Subject: [PATCH] gstlal_feature_extractor: fold use_kafka into save_format and
 fix hdf5 feature saving

Make kafka one of the save_format choices instead of a separate
use_kafka option, so that only one data transfer/saving backend runs at
a given time. Fix an issue with hdf5 file saving that produced repeated
timestamps, clean up the finalization that writes remaining features to
disk when the pipeline finishes, and adjust the default save/persist
cadences.
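
The repeated timestamps trace back to the old per-trigger append path,
which indexed a padded buffer (cadence + 1 rows) by floor(trigger_time);
features are now popped from the queue once per timestamp and the buffer
is indexed directly by that timestamp. A minimal sketch of the fixed
indexing scheme, simplified from HDF5FeatureData in this patch
(TimestampBuffer and the inline floor_div are illustrative stand-ins,
not names in the tree):

	import numpy

	def floor_div(x, n):
		# round x down to a multiple of n, as idq_utils.floor_div does
		return (x // n) * n

	class TimestampBuffer(object):
		def __init__(self, columns, keys, cadence):
			self.columns = columns
			self.keys = keys
			self.cadence = cadence
			self.dtype = [(column, '<f8') for column in columns]
			# exactly one row per integer timestamp and no padding row,
			# so a timestamp can never appear twice in a written dataset
			self.feature_data = {key: numpy.empty((cadence,), dtype = self.dtype) for key in keys}
			self.clear()

		def append(self, timestamp, features):
			# index rows by the queue timestamp, not per-trigger times
			idx = timestamp - floor_div(timestamp, self.cadence)
			for key in features.keys():
				if features[key][0]:
					row = tuple(features[key][0][col] for col in self.columns)
					self.feature_data[key][idx] = numpy.array(row, dtype = self.dtype)

		def clear(self):
			for key in self.keys:
				self.feature_data[key][:] = numpy.nan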

---
 gstlal-ugly/bin/gstlal_feature_extractor | 144 ++++++++++++-----------
 gstlal-ugly/python/idq_utils.py          |  85 ++++---------
 2 files changed, 101 insertions(+), 128 deletions(-)
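
For reference, the end-of-pipeline drain now works roughly as follows: a
simplified, self-contained sketch of FeatureQueue.flush() and the way
finalize() consumes it (the class is trimmed to the relevant pieces, and
the usage values at the end are made up for illustration):

	from collections import Counter, deque

	class FeatureQueue(object):
		def __init__(self):
			self.in_queue = {}        # timestamp -> {channel: [rows]}
			self.counter = Counter()  # timestamp -> finished-channel count
			self.out_queue = deque(maxlen = 5)

		def __len__(self):
			return len(self.out_queue)

		def pop(self):
			if len(self):
				return self.out_queue.popleft()

		def flush(self):
			# force out all pending timestamps, oldest first, whether or
			# not every channel has reported for them yet; out_queue keeps
			# at most 5 entries, so this assumes few timestamps are pending
			while self.in_queue:
				oldest_timestamp = min(self.counter.keys())
				del self.counter[oldest_timestamp]
				self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})

	# usage mirroring finalize(): flush, then drain for writing to disk
	queue = FeatureQueue()
	queue.in_queue[1000000000] = {'H1:TEST-CHANNEL': [None]}
	queue.counter[1000000000] = 1
	queue.flush()
	while len(queue):
		print(queue.pop())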

diff --git a/gstlal-ugly/bin/gstlal_feature_extractor b/gstlal-ugly/bin/gstlal_feature_extractor
index 845187f211..9de708a40d 100755
--- a/gstlal-ugly/bin/gstlal_feature_extractor
+++ b/gstlal-ugly/bin/gstlal_feature_extractor
@@ -235,12 +235,15 @@ class MultiChannelHandler(simplehandler.Handler):
 		self.subset_id = str(kwargs.pop("subset_id")).zfill(4)
 
 		### iDQ saving properties
+		self.timestamp = None
 		self.last_save_time = None
 		self.last_persist_time = None
 		self.cadence = options.cadence
 		self.persist_cadence = options.persist_cadence
 		self.feature_start_time = options.feature_start_time
 		self.feature_end_time = options.feature_end_time
+		self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
+		self.feature_queue = idq_utils.FeatureQueue(self.keys, self.columns, self.sample_rate)
 
 		# set whether data source is live
 		self.is_live = data_source_info.data_source in data_source_info.live_sources
@@ -251,38 +254,31 @@ class MultiChannelHandler(simplehandler.Handler):
 		else:
 			self.tmp_dir = os.environ['TMPDIR']
 
-		# row properties
-		columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
-
 		# feature saving properties
 		if options.save_format == 'hdf5':
-			self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
-			duration = idq_utils.floor_div(self.feature_start_time + self.persist_cadence, self.persist_cadence) - self.feature_start_time
-			self.set_hdf_file_properties(self.feature_start_time, duration)
+			self.fdata = idq_utils.HDF5FeatureData(self.columns, keys = self.keys, cadence = self.cadence)
 
 		elif options.save_format == 'ascii':
-			# create header for trigger file
 			self.header = "# %18s\t%20s\t%20s\t%10s\t%8s\t%8s\t%8s\t%10s\t%s\n" % ("start_time", "stop_time", "trigger_time", "frequency", "phase", "q", "chisq", "snr", "channel")
 			self.fdata = deque(maxlen = 25000)
 			self.fdata.append(self.header)
 
-		# set up stream related properties
-		self.stream_event = idq_utils.FeatureQueue(self.keys, columns, self.sample_rate, self.num_samples)
-
-		# set up bottle routes for PSDs and extracted feature data
-		self.psds = {}
-		self.feature_data = deque(maxlen = 2000)
-		if not options.disable_web_service:
-			bottle.route("/psds.xml")(self.web_get_psd_xml)
-			bottle.route("/feature_subset")(self.web_get_feature_data)
-
-		# set up kafka related properties
-		if options.use_kafka:
+		elif options.save_format == 'kafka':
 			self.kafka_partition = options.kafka_partition
 			self.kafka_topic = options.kafka_topic
 			self.kafka_conf = {'bootstrap.servers': options.kafka_server}
 			self.producer = Producer(self.kafka_conf)
 
+		elif options.save_format == 'bottle':
+			assert not options.disable_web_service, 'web service must be enabled to transfer features via bottle'
+			self.feature_data = deque(maxlen = 2000)
+			bottle.route("/feature_subset")(self.web_get_feature_data)
+
+		# set up bottle routes for PSDs
+		self.psds = {}
+		if not options.disable_web_service:
+			bottle.route("/psds.xml")(self.web_get_psd_xml)
+
 		super(MultiChannelHandler, self).__init__(mainloop, pipeline, **kwargs)
 
 	def do_on_message(self, bus, message):
@@ -324,35 +320,45 @@ class MultiChannelHandler(simplehandler.Handler):
 			channel, rate  = sink_dict[elem]
 
 			# push new stream event to queue if done processing current timestamp
-			if len(self.stream_event):
-				feature_subset = self.stream_event.pop()
-				if options.use_kafka:
-					self.producer.produce(timestamp = feature_subset['timestamp'], topic = self.kafka_topic, value = json.dumps(feature_subset))
-				else:
-					self.feature_data.append(feature_subset)
-
-			# set save time appropriately
-			if self.last_save_time is None:
-				self.last_save_time = buftime
-				self.last_persist_time = buftime
-
-			# Save triggers once per cadence
-			if idq_utils.in_new_epoch(buftime, self.last_save_time, self.cadence) or (buftime == self.feature_end_time):
-				logger.info("saving features to disk at timestamp = %d" % buftime)
+			if len(self.feature_queue):
+				feature_subset = self.feature_queue.pop()
+				self.timestamp = feature_subset['timestamp']
+
+				# set save times and initialize specific saving properties if not already set
+				if self.last_save_time is None:
+					self.last_save_time = self.timestamp
+					self.last_persist_time = self.timestamp
+					if options.save_format == 'hdf5':
+						duration = idq_utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
+						self.set_hdf_file_properties(self.timestamp, duration)
+
+				# Save triggers once per cadence if saving to disk
+				if options.save_format == 'hdf5' or options.save_format == 'ascii':
+					if self.timestamp and (idq_utils.in_new_epoch(self.timestamp, self.last_save_time, self.cadence) or self.timestamp == self.feature_end_time):
+						logger.info("saving features to disk at timestamp = %d" % self.timestamp)
+						if options.save_format == 'hdf5':
+							self.to_hdf_file()
+						elif options.save_format == 'ascii':
+							self.to_trigger_file(self.timestamp)
+							self.fdata.clear()
+							self.fdata.append(self.header)
+						self.last_save_time = self.timestamp
+
+				# persist triggers once per persist cadence if using hdf5 format
 				if options.save_format == 'hdf5':
-					self.to_hdf_file()
-				elif options.save_format == 'ascii':
-					self.to_trigger_file(buftime)
-					self.fdata.clear()
-					self.fdata.append(self.header)
-				self.last_save_time = buftime
-
-			# persist triggers once per persist cadence if using hdf5 format
-			if idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
-				logger.info("persisting features to disk at timestamp = %d" % buftime)
-				self.finish_hdf_file()
-				self.last_persist_time = buftime
-				self.set_hdf_file_properties(buftime, self.persist_cadence)
+					if self.timestamp and idq_utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
+						logger.info("persisting features to disk at timestamp = %d" % self.timestamp)
+						self.finish_hdf_file()
+						self.last_persist_time = self.timestamp
+						self.set_hdf_file_properties(self.timestamp, self.persist_cadence)
+
+				# hand features off to the configured save format
+				if options.save_format == 'kafka':
+					self.producer.produce(timestamp = self.timestamp, topic = self.kafka_topic, value = json.dumps(feature_subset))
+				elif options.save_format == 'bottle':
+					self.feature_data.append(feature_subset)
+				elif options.save_format == 'hdf5':
+					self.fdata.append(self.timestamp, feature_subset['features'])
 
 			# read buffer contents
 			for i in range(buf.n_memory()):
@@ -407,16 +413,10 @@ class MultiChannelHandler(simplehandler.Handler):
 			timestamp = int(numpy.floor(trigger_time))
 			feature_row = {'timestamp':timestamp, 'channel':channel, 'start_time':start_time, 'stop_time':stop_time, 'snr':row.snr,
 			               'trigger_time':trigger_time, 'frequency':freq, 'q':q, 'phase':row.phase, 'sigmasq':row.sigmasq, 'chisq':row.chisq}
-			self.stream_event.append(timestamp, channel, feature_row)
+			self.feature_queue.append(timestamp, channel, feature_row)
 
 			# save iDQ compatible data
-			if options.save_format == 'hdf5':
-				if self.aggregate_rate:
-					key = channel
-				else:
-					key = os.path.join(channel, str(rate).zfill(4))
-				self.fdata.append(feature_row, key = key, buftime = buftime)
-			elif options.save_format == 'ascii':
+			if options.save_format == 'ascii':
 				channel_tag = ('%s_%i_%i' %(channel, rate/4, rate/2)).replace(":","_",1)
 				self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
 
@@ -463,6 +463,23 @@ class MultiChannelHandler(simplehandler.Handler):
 		tmp_path = os.path.join(self.tmp_path, self.fname)+".h5.tmp"
 		shutil.move(tmp_path, final_path)
 
+	def finalize(self):
+		"""
+		Clears out remaining features from the queue for saving to disk.
+		"""
+		# save remaining triggers
+		if options.save_format == 'hdf5':
+			self.feature_queue.flush()
+			while len(self.feature_queue):
+				feature_subset = self.feature_queue.pop()
+				self.fdata.append(feature_subset['timestamp'], feature_subset['features'])
+
+			self.to_hdf_file()
+			self.finish_hdf_file()
+
+		elif options.save_format == 'ascii':
+			self.to_trigger_file()
+
 	def set_hdf_file_properties(self, start_time, duration):
 		"""
 		Returns the file name, as well as locations of temporary and permanent locations of
@@ -630,13 +647,12 @@ def parse_command_line():
 	group = OptionGroup(parser, "Saving Options", "Adjust parameters used for saving/persisting features to disk as well as directories specified")
 	group.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
 	group.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_FEATURES", help = "Set the filename description in which to save the output.")
-	group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii or hdf5) of features written to disk. Default = hdf5")
-	group.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
-	group.add_option("--persist-cadence", type = "int", default = 320, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Only used for live data, and needs to be a multiple of save cadence. Default = 320 seconds.")
+	group.add_option("--save-format", metavar = "string", default = "hdf5", help = "Specifies the save format (ascii/hdf5/kafka/bottle) of features written to disk. Default = hdf5")
+	group.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.")
+	group.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 200 seconds.")
 	parser.add_option_group(group)
 
 	group = OptionGroup(parser, "Kafka Options", "Adjust settings used for pushing extracted features to a Kafka topic.")
-	group.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
 	group.add_option("--kafka-partition", metavar = "string", help = "If using Kafka, sets the partition that this feature extractor is assigned to.")
 	group.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this feature extractor publishes feature vector subsets to.")
 	group.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server url that the kafka topic is hosted on.")
@@ -700,7 +716,7 @@ basename = '%s-%s' % (instrument[:1], options.description)
 waveforms = {}
 
 # only load kafka library if triggers are transferred via kafka topic
-if options.use_kafka:
+if options.save_format == 'kafka':
 	from confluent_kafka import Producer
 
 #
@@ -931,12 +947,8 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
 	mainloop.run()
 
 	# save remaining triggers
-	if options.save_format == 'hdf5':
-		handler.to_hdf_file()
-		handler.finish_hdf_file()
-		logger.info("persisting features to disk...")
-	elif options.save_format == 'ascii':
-		handler.to_trigger_file()
+	logger.info("persisting features to disk...")
+	handler.finalize()
 
 	#
 	# Shut down pipeline
diff --git a/gstlal-ugly/python/idq_utils.py b/gstlal-ugly/python/idq_utils.py
index fcd7cd8f89..5d40f95863 100644
--- a/gstlal-ugly/python/idq_utils.py
+++ b/gstlal-ugly/python/idq_utils.py
@@ -217,91 +217,46 @@ class HDF5FeatureData(FeatureData):
 	"""
 	def __init__(self, columns, keys, **kwargs):
 		super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
-		self.padding = 1
 		self.cadence = kwargs.pop('cadence')
 		self.dtype = [(column, '<f8') for column in self.columns]
-		self.feature_data = {key: numpy.empty(((self.cadence+self.padding),), dtype = self.dtype) for key in keys}
-		self.last_save_time = None
+		self.feature_data = {key: numpy.empty((self.cadence,), dtype = self.dtype) for key in keys}
+		self.last_save_time = 0
 		self.clear()
 
-	def dump(self, path, base, start_time, key = None, tmp = False):
+	def dump(self, path, base, start_time, tmp = False):
 		"""
-		Saves the current cadence of gps triggers to disk and clear out data
+		Saves the current cadence of features to disk and clears out data
 		"""
 		name = "%d_%d" % (start_time, self.cadence)
-		if key:
+		for key in self.keys:
 			create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp)
-			self.clear(key)
-		else:
-			for key in self.keys:
-				create_new_dataset(path, base, self.feature_data[key], name=name, group=key, tmp=tmp)
-			self.clear()
-
-	def append(self, value, key = None, buftime = None):
-		"""
-		Append a trigger row to data structure
-		"""
-		if buftime and key:
-			self.last_save_time = floor_div(buftime, self.cadence)
-			idx = int(numpy.floor(value['trigger_time'])) - self.last_save_time
-			if numpy.isnan(self.feature_data[key][idx][self.columns[0]]) or (value['snr'] > self.feature_data[key][idx]['snr']):
-				self.feature_data[key][idx] = numpy.array(tuple(value[col] for col in self.columns), dtype=self.dtype)
-
-	def clear(self, key = None):
-		if key:
-			self.feature_data[key][:] = numpy.nan
-		else:
-			for key in self.keys:
-				self.feature_data[key][:] = numpy.nan
-
-class HDF5SeriesFeatureData(FeatureData):
-	"""!
-	Saves feature data with varying dataset lengths to hdf5.
-	"""
-	def __init__(self, columns, keys, **kwargs):
-		super(HDF5SeriesFeatureData, self).__init__(columns, keys = keys, **kwargs)
-		self.cadence = kwargs.pop('cadence')
-		self.dtype = [(column, '<f8') for column in self.columns]
-		self.feature_data = {key: [] for key in keys}
 		self.clear()
 
-	def dump(self, path, base, start_time, key = None, tmp = False):
+	def append(self, timestamp, features):
 		"""
-		Saves the current cadence of gps triggers to disk and clear out data
+		Append a feature buffer to the data structure
 		"""
-		name = "%d_%d" % (start_time, self.cadence)
-		if key:
-			create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype=self.dtype), name=name, group=key, tmp=tmp)
-			self.clear(key)
-		else:
-			for key in self.keys:
-				create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype=self.dtype), name=name, group=key, tmp=tmp)
-			self.clear()
+		self.last_save_time = floor_div(timestamp, self.cadence)
+		idx = timestamp - self.last_save_time
 
-	def append(self, value, key = None, buftime = None):
-		"""
-		Append a trigger row to data structure
-		"""
-		if buftime and key:
-			self.feature_data[key].append(tuple(value[col] for col in self.columns))
+		### FIXME: assumes there is just one row per channel for now (denoting a sample rate of 1Hz)
+		for key in features.keys():
+			if features[key][0]:
+				self.feature_data[key][idx] = numpy.array(tuple(features[key][0][col] for col in self.columns), dtype=self.dtype)
 
-	def clear(self, key = None):
-		if key:
-			self.feature_data[key] = []
-		else:
-			for key in self.keys:
-				self.feature_data[key] = []
+	def clear(self):
+		for key in self.keys:
+			self.feature_data[key][:] = numpy.nan
 
 class FeatureQueue(object):
 	"""
 	Class for storing feature data.
 	NOTE: assumes that ingested features are time ordered.
 	"""
-	def __init__(self, channels, columns, sample_rate, num_samples):
+	def __init__(self, channels, columns, sample_rate):
 		self.channels = channels
 		self.columns = columns
 		self.sample_rate = sample_rate
-		self.num_samples = num_samples
 		self.out_queue = deque(maxlen = 5)
 		self.in_queue = {}
 		self.counter = Counter()
@@ -331,6 +286,12 @@ class FeatureQueue(object):
 		if len(self):
 			return self.out_queue.popleft()
 
+	def flush(self):
+		while self.in_queue:
+			oldest_timestamp = min(self.counter.keys())
+			del self.counter[oldest_timestamp]
+			self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
+
 	def _create_buffer(self):
 		return {channel: [None for x in range(self.sample_rate)] for channel in self.channels}
 
-- 
GitLab