Skip to content
Snippets Groups Projects
Commit c46f0873 authored by Rachael Huxford's avatar Rachael Huxford
Browse files

Extension of queues for more stable long-term running.

parent 96dfc3d5
No related branches found
No related tags found
2 merge requests!612Combined online-offline ranks,!535Logging Patch + Extension of Queues in Injection Streamer
......@@ -126,13 +126,13 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, filena
filename = os.path.join(path, "%s-%s-%d-%d.gwf"%(instrument, frame_type, timestamp, 1))
if filename_q.full():
old_filename = filename_q.get()
logging.debug('removing old file from disk: %s'%old_filename)
logging.debug('removing old file from disk: %s', old_filename)
os.unlink(old_filename)
filename_q.put(filename)
logging.debug('adding new file to queue: %s'%filename)
logging.debug('adding new file to queue: %s', filename)
else:
filename_q.put(filename)
logging.debug('adding new file to queue: %s'%filename)
logging.debug('adding new file to queue: %s', filename)
#
......@@ -244,7 +244,7 @@ def check_current_frames(pad, info, output_start):
startts = LIGOTimeGPS(0, buf.pts)
duration = LIGOTimeGPS(0, buf.duration)
if startts < output_start:
logging.debug("Dropping frame because start time %f is less than desired output start time %f" % (startts, output_start))
logging.debug("Dropping frame because start time %f is less than desired output start time %f", startts, output_start)
return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.OK
......@@ -258,6 +258,8 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f:
# write out the new cache to the temp filename
f.write("\n".join(["%s" % c for c in new_cache]))
# fetch the duration of the frame files to use in queue lengths
frame_duration = int(abs(new_cache[0].segment))
for instrument in hoft_inj_channel_dict:
......@@ -273,7 +275,7 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f:
# allow frame reading and decoding to occur in a diffrent
# thread
inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=10 * Gst.SECOND)
inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=frame_duration * Gst.SECOND)
pipeparts.src_deferred_link(inj_demux, "%s:%s" % (instrument, hoft_inj_channel_dict[instrument]), inj_strain.get_static_pad("sink"))
inj_strain.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_current_frames, now)
if verbose:
......@@ -285,7 +287,7 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f:
#
hoft_src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=shared_memory_dict[instrument], wait_time=60)
hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10)
hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * frame_duration)
if verbose:
hoft_src = pipeparts.mkprogressreport(pipeline, hoft_src, '%s_hoft_src'%instrument)
hoft_demux = pipeparts.mkframecppchanneldemux(pipeline, hoft_src, do_file_checksum=False, skip_bad_files=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment