From c90fe4cc24cf3c2934022848d4b4db783820197b Mon Sep 17 00:00:00 2001
From: Rebecca Ewing <rebecca.ewing@ligo.org>
Date: Wed, 3 May 2023 11:42:10 -0700
Subject: [PATCH] gstlal_ifo_stat: bug fixes; write to influx upon state change

---
 gstlal-ugly/bin/gstlal_ifo_stat | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ifo_stat b/gstlal-ugly/bin/gstlal_ifo_stat
index f56f36d487..952e917dc0 100755
--- a/gstlal-ugly/bin/gstlal_ifo_stat
+++ b/gstlal-ugly/bin/gstlal_ifo_stat
@@ -53,6 +53,7 @@ def parse_command_line():
 
 class IFOStatusTracker(object):
 	def __init__(self, options):
+		logging.info('setting up ifo status tracker...')
 		self.output_url = options.output_url
 		self.last_reduce = None
 
@@ -81,11 +82,13 @@ class IFOStatusTracker(object):
 			bit = int(bit)
 			time = float(time)
 			state = 1 if (bit & 0b1 == 0b1) else 0
+			statechange = False
 
 			## track duration
 			if state == self.previous[ifo][1]:
 				self.duration[ifo] += time - self.previous[ifo][0]
 			elif self.previous[ifo][0]:
+				statechange = True
 				self.duration[ifo] = time - self.previous[ifo][0]
 
 			logging.debug(f'{ifo} h(t) state at {time} = {state} | duration = {self.duration[ifo]}')
@@ -99,7 +102,9 @@ class IFOStatusTracker(object):
 				self.last_reduce = time
 
 			## write to influx
-			if time - self.last_reduce >= 100.:
+			if time - self.last_reduce >= 50. or statechange:
+				if statechange:
+					logging.info(f'{ifo} h(t) state changed to {state} | time since last reduced = {time-self.last_reduce}')
 				self.last_reduce = int(utils.gps_now())
 				logging.debug(f'Reducing data to influx...')
 
@@ -125,10 +130,16 @@ def main():
 	channel_dict = {}
 	dirs_dict = {}
 	for channel in options.channel_name:
-		channel_dict.update({channel.split(':')[0]: channel.split(':')[1]})
+		ifo, channel_name = channel.split('=')
+		channel_dict.update({
+			ifo: channel_name
+                })
 
 	for dir in options.shared_memory_directory:
-		dirs_dict.update({dir.split(':')[0]: dir.split(':')[1]})
+		ifo, shm_dir = dir.split('=')
+		dirs_dict.update({
+			ifo: shm_dir
+                })
 
 	if not channel_dict.keys() == dirs_dict.keys():
 		raise Exception(f'List of IFOs provided with channels does not match list of IFOs provided with SHM directories.')
@@ -157,8 +168,15 @@ def main():
 
 	if not pipeline.set_state(Gst.State.PLAYING):
 		raise RuntimeError("pipeline failed to enter PLAYING state")
+	logging.info('starting...')
 	mainloop.run()
 
+	# done.   always end with an error code so that dagman does
+	# not mark the job "done" and the job will be restarted when the dag is
+	# restarted.
+	#
+	sys.exit(1)
+
 
 if __name__ == '__main__':
 	main()
-- 
GitLab