From 51788a53a1612ae7e5ea8fb71a7adda3489f6348 Mon Sep 17 00:00:00 2001
From: Rachael Huxford <rachael.huxford@ligo.org>
Date: Wed, 19 Jul 2023 12:02:01 -0700
Subject: [PATCH] Update to ll_inj_stream handler, and to end time options.

---
 gstlal-ugly/bin/gstlal_ll_inj_stream | 79 +++++++++++++++++++---------
 1 file changed, 54 insertions(+), 25 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_ll_inj_stream b/gstlal-ugly/bin/gstlal_ll_inj_stream
index ee83b54759..0cfd540181 100755
--- a/gstlal-ugly/bin/gstlal_ll_inj_stream
+++ b/gstlal-ugly/bin/gstlal_ll_inj_stream
@@ -17,10 +17,10 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
 
-from collections import deque
 import logging
 from optparse import OptionParser
 import os
+import queue
 import sys
 
 import numpy
@@ -71,8 +71,8 @@ def parse_command_line():
 	# Add general options
 	parser.add_option("--output-dir", metavar = "path", action = "append", help = "Path to directory where frames should be written. Must be given once per ifo for which channels are provided as {IFO}=/path/to/directory.")
 	parser.add_option("--frame-type", metavar = "name", type="string", action = "append", help = "Frame type to be included in the name of the output frame files. Must be provided once for each ifo anlyzed as {IFO}={FRAME_TYPE}.")
-	parser.add_option("--gps-start-time", metavar = "s", help = "The time at which to start looking for data")
-	parser.add_option("--gps-end-time", metavar = "s", help = "The time at which to stop looking for data")
+	parser.add_option("--gps-end-time", metavar = "s", default = 2000000000, help = "The time at which to stop looking for data in frame cache if different from legnth of cache. Script will terminate at this time. Default is to take all times in frame cache. e.g. GPS=200000000")
+	parser.add_option("--history-len", metavar = "s", type=int, default = 300, help = "Length of time (in seconds) to keep files for in output-dir. Files written more than history-len in the past, will be deleted from output-dir. Default: 300s")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
 
 	options, filenames = parser.parse_args()
@@ -88,7 +88,7 @@ def parse_command_line():
 # =============================================================================
 #
 
-def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpath: str):
+def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, filename_q: queue.Queue):
 	"""Add path for file sink to element for injection streaming
 
 	Args:
@@ -109,11 +109,29 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpat
 	# get other metadata
 	instrument = elem.get_property("instrument")
 	frame_type = elem.get_property("frame-type")
-
-	# make target directory, and set path
-	if not os.path.exists(outpath):
-		os.makedirs(outpath)
-	elem.set_property("path", outpath)
+	timestamp = elem.get_property("timestamp") // Gst.SECOND
+	path = elem.get_property("path")
+
+	# check if the desired end time has been reached
+	if timestamp >= int(options.gps_end_time):
+		print('Desired gps end time reached. Shutting down...')
+		# needs to die gracefully here with EOS as in gstlal online handler case
+		pipeline.send_event(Gst.Event.new_eos())
+
+	# do cleanup for older files
+	# this if is necessary b/c for some reason the very first path pushed to this function is always '.'
+	# so you can't track that path/file
+	#if path != '.':
+	filename = os.path.join(path, "%s-%s-%d-%d.gwf"%(instrument, frame_type, timestamp, 1))
+	if filename_q.full():
+		old_filename = filename_q.get()
+		print('removing old file from disk: %s'%old_filename)
+		os.unlink(old_filename)
+		filename_q.put(filename)
+		print('adding new file to queue: %s'%filename)
+	else:
+		filename_q.put(filename)
+		print('adding new file to queue: %s'%filename)
 
 
 #
@@ -124,8 +142,6 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpat
 # =============================================================================
 #
 
-options, filenames = parse_command_line()
-
 # Set up gstreamer pipeline
 GObject.threads_init()
 Gst.init(None)
@@ -133,9 +149,15 @@ mainloop = GObject.MainLoop()
 pipeline = Gst.Pipeline(name="gstlal_ll_inj_stream")
 handler = simplehandler.Handler(mainloop, pipeline)
 
+
+#
+# Parse Input Options
+#
+
+options, filenames = parse_command_line()
+
 # Noiseless Injection / Frame Cache Options
 hoft_inj_channel_dict = {}
-print(options.inj_frame_cache)
 for inj_channel in options.inj_channel:
 	hoft_inj_channel_dict[inj_channel.split(":")[0]] = inj_channel.split(":")[1]
 inj_frame_cache = options.inj_frame_cache
@@ -156,9 +178,7 @@ for smd in options.shared_memory_partition:
 
 # Output Frame Options
 output_dir_dict = {}
-print(options.output_dir)
 for out_dir in options.output_dir:
-	print(out_dir)
 	output_dir_dict[out_dir.split('=')[0]] = out_dir.split('=')[1]
 
 frame_type_dict = {}
@@ -184,10 +204,18 @@ logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_ll_dq : %(
 #
 
 cache_entries = [CacheEntry(x) for x in open(options.inj_frame_cache)]
-now = gpstime.gps_time_now() 
-now_to_inf_seg = segments.segment((now, LIGOTimeGPS(2000000000, 0)))
-new_cache = [c for c in cache_entries if now_to_inf_seg.intersects(c.segment)]
-
+now = gpstime.gps_time_now()
+end = LIGOTimeGPS(int(options.gps_end_time), 0)
+
+# Check that end < now.
+# Otherwise will take entire cache.
+if end < now:
+	raise ValueError("--gps-end-time must be greater than the current gps time. Now: %s, Supplied gps-end-time:%s"%(now, end))
+
+#create new cache
+now_to_end_seg = segments.segment((now, end))
+new_cache = [c for c in cache_entries if now_to_end_seg.intersects(c.segment)]
+print(len(new_cache))
 this_inj_frame_cache = 'temp_caches/%s_inj.cache'%now
 with open(this_inj_frame_cache,"w") as f:
 	f.write("\n".join(["%s" % c for c in new_cache]))
@@ -212,8 +240,6 @@ logging.info("building pipeline ...")
 # For use in this program, drop frames from cache which are in the past
 # in order to match timestamps closely with live data from devshmsrc
 def check_current_frames(pad, info, output_start, verbose=False):
-	#if verbose==True:
-	#	print("Checking if frames are complete")
 	buf = info.get_buffer()
 	startts = LIGOTimeGPS(0, buf.pts)
 	duration = LIGOTimeGPS(0, buf.duration)
@@ -268,9 +294,9 @@ for instrument in hoft_inj_channel_dict:
 		# Only add items to strain_dict which we want to become a new hoft + inj stream
 		if "CALIB_STRAIN" in channel_name and channel_name not in hoft_passthrough_channel_dict[instrument]:
 			# tee here, so we can use a tee later for the adder
-			strain_dict[channel_name] = pipeparts.mktee(pipeline, channel_src)
+			strain_dict[instrument+':'+channel_name] = pipeparts.mktee(pipeline, channel_src)
 		else:
-			channel_dict[channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8)
+			channel_dict[instrument+':'+channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8)
 
 
 	#
@@ -299,6 +325,7 @@ for instrument in hoft_inj_channel_dict:
 	# want to mux hoft, inj + hoft, and passthrough channels
 	channel_dict.update(strain_dict)
 	channel_dict.update(inj_strain_dict)
+	print(channel_dict.items())
 	output_stream = pipeparts.mkframecppchannelmux(pipeline, channel_src_map=channel_dict, frame_duration=1, frames_per_file=1)
 	output_stream = pipeparts.mkprogressreport(pipeline, output_stream, "%s_output_after_mux"%instrument)
 	
@@ -306,11 +333,13 @@ for instrument in hoft_inj_channel_dict:
 	#
 	# Write frames to disk
 	#
-	
-	framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument])
+
+	# make a queue for filename clean-up
+	filename_queue = queue.Queue(options.history_len)
+	framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument], path=output_dir_dict[instrument])
 	# note that how the output is organized is controlled
 	# by the function frameccp_filesink_path_handler_simple
-	framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, output_dir_dict[instrument])
+	framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, filename_queue)
 
 
 #
-- 
GitLab