Skip to content
Snippets Groups Projects
Commit 51788a53 authored by Rachael Huxford's avatar Rachael Huxford
Browse files

Update to ll_inj_stream handler, and to end time options.

parent 13cdf2a4
No related branches found
No related tags found
1 merge request!510Injection Streamer
......@@ -17,10 +17,10 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from collections import deque
import logging
from optparse import OptionParser
import os
import queue
import sys
import numpy
......@@ -71,8 +71,8 @@ def parse_command_line():
# Add general options
parser.add_option("--output-dir", metavar = "path", action = "append", help = "Path to directory where frames should be written. Must be given once per ifo for which channels are provided as {IFO}=/path/to/directory.")
parser.add_option("--frame-type", metavar = "name", type="string", action = "append", help = "Frame type to be included in the name of the output frame files. Must be provided once for each ifo anlyzed as {IFO}={FRAME_TYPE}.")
parser.add_option("--gps-start-time", metavar = "s", help = "The time at which to start looking for data")
parser.add_option("--gps-end-time", metavar = "s", help = "The time at which to stop looking for data")
parser.add_option("--gps-end-time", metavar = "s", default = 2000000000, help = "The time at which to stop looking for data in frame cache if different from legnth of cache. Script will terminate at this time. Default is to take all times in frame cache. e.g. GPS=200000000")
parser.add_option("--history-len", metavar = "s", type=int, default = 300, help = "Length of time (in seconds) to keep files for in output-dir. Files written more than history-len in the past, will be deleted from output-dir. Default: 300s")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
options, filenames = parser.parse_args()
......@@ -88,7 +88,7 @@ def parse_command_line():
# =============================================================================
#
def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpath: str):
def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, filename_q: queue.Queue):
"""Add path for file sink to element for injection streaming
Args:
......@@ -109,11 +109,29 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpat
# get other metadata
instrument = elem.get_property("instrument")
frame_type = elem.get_property("frame-type")
# make target directory, and set path
if not os.path.exists(outpath):
os.makedirs(outpath)
elem.set_property("path", outpath)
timestamp = elem.get_property("timestamp") // Gst.SECOND
path = elem.get_property("path")
# check if the desired end time has been reached
if timestamp >= int(options.gps_end_time):
print('Desired gps end time reached. Shutting down...')
# needs to die gracefully here with EOS as in gstlal online handler case
pipeline.send_event(Gst.Event.new_eos())
# do cleanup for older files
# this if is necessary b/c for some reason the very first path pushed to this function is always '.'
# so you can't track that path/file
#if path != '.':
filename = os.path.join(path, "%s-%s-%d-%d.gwf"%(instrument, frame_type, timestamp, 1))
if filename_q.full():
old_filename = filename_q.get()
print('removing old file from disk: %s'%old_filename)
os.unlink(old_filename)
filename_q.put(filename)
print('adding new file to queue: %s'%filename)
else:
filename_q.put(filename)
print('adding new file to queue: %s'%filename)
#
......@@ -124,8 +142,6 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, outpat
# =============================================================================
#
options, filenames = parse_command_line()
# Set up gstreamer pipeline
GObject.threads_init()
Gst.init(None)
......@@ -133,9 +149,15 @@ mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(name="gstlal_ll_inj_stream")
handler = simplehandler.Handler(mainloop, pipeline)
#
# Parse Input Options
#
options, filenames = parse_command_line()
# Noiseless Injection / Frame Cache Options
hoft_inj_channel_dict = {}
print(options.inj_frame_cache)
for inj_channel in options.inj_channel:
hoft_inj_channel_dict[inj_channel.split(":")[0]] = inj_channel.split(":")[1]
inj_frame_cache = options.inj_frame_cache
......@@ -156,9 +178,7 @@ for smd in options.shared_memory_partition:
# Output Frame Options
output_dir_dict = {}
print(options.output_dir)
for out_dir in options.output_dir:
print(out_dir)
output_dir_dict[out_dir.split('=')[0]] = out_dir.split('=')[1]
frame_type_dict = {}
......@@ -184,10 +204,18 @@ logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_ll_dq : %(
#
cache_entries = [CacheEntry(x) for x in open(options.inj_frame_cache)]
now = gpstime.gps_time_now()
now_to_inf_seg = segments.segment((now, LIGOTimeGPS(2000000000, 0)))
new_cache = [c for c in cache_entries if now_to_inf_seg.intersects(c.segment)]
now = gpstime.gps_time_now()
end = LIGOTimeGPS(int(options.gps_end_time), 0)
# Check that end < now.
# Otherwise will take entire cache.
if end < now:
raise ValueError("--gps-end-time must be greater than the current gps time. Now: %s, Supplied gps-end-time:%s"%(now, end))
#create new cache
now_to_end_seg = segments.segment((now, end))
new_cache = [c for c in cache_entries if now_to_end_seg.intersects(c.segment)]
print(len(new_cache))
this_inj_frame_cache = 'temp_caches/%s_inj.cache'%now
with open(this_inj_frame_cache,"w") as f:
f.write("\n".join(["%s" % c for c in new_cache]))
......@@ -212,8 +240,6 @@ logging.info("building pipeline ...")
# For use in this program, drop frames from cache which are in the past
# in order to match timestamps closely with live data from devshmsrc
def check_current_frames(pad, info, output_start, verbose=False):
#if verbose==True:
# print("Checking if frames are complete")
buf = info.get_buffer()
startts = LIGOTimeGPS(0, buf.pts)
duration = LIGOTimeGPS(0, buf.duration)
......@@ -268,9 +294,9 @@ for instrument in hoft_inj_channel_dict:
# Only add items to strain_dict which we want to become a new hoft + inj stream
if "CALIB_STRAIN" in channel_name and channel_name not in hoft_passthrough_channel_dict[instrument]:
# tee here, so we can use a tee later for the adder
strain_dict[channel_name] = pipeparts.mktee(pipeline, channel_src)
strain_dict[instrument+':'+channel_name] = pipeparts.mktee(pipeline, channel_src)
else:
channel_dict[channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8)
channel_dict[instrument+':'+channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8)
#
......@@ -299,6 +325,7 @@ for instrument in hoft_inj_channel_dict:
# want to mux hoft, inj + hoft, and passthrough channels
channel_dict.update(strain_dict)
channel_dict.update(inj_strain_dict)
print(channel_dict.items())
output_stream = pipeparts.mkframecppchannelmux(pipeline, channel_src_map=channel_dict, frame_duration=1, frames_per_file=1)
output_stream = pipeparts.mkprogressreport(pipeline, output_stream, "%s_output_after_mux"%instrument)
......@@ -306,11 +333,13 @@ for instrument in hoft_inj_channel_dict:
#
# Write frames to disk
#
framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument])
# make a queue for filename clean-up
filename_queue = queue.Queue(options.history_len)
framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument], path=output_dir_dict[instrument])
# note that how the output is organized is controlled
# by the function frameccp_filesink_path_handler_simple
framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, output_dir_dict[instrument])
framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, filename_queue)
#
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment