diff --git a/gstlal-ugly/bin/gstlal_ll_inj_stream b/gstlal-ugly/bin/gstlal_ll_inj_stream index c60f1257810bb9abae9e17b3643095a5a4b610dc..3c146884aae4de60445e6cea1a05040dd96f1911 100755 --- a/gstlal-ugly/bin/gstlal_ll_inj_stream +++ b/gstlal-ugly/bin/gstlal_ll_inj_stream @@ -126,13 +126,13 @@ def framecpp_filesink_path_handler_simple(elem: pipetools.Element, pspec, filena filename = os.path.join(path, "%s-%s-%d-%d.gwf"%(instrument, frame_type, timestamp, 1)) if filename_q.full(): old_filename = filename_q.get() - logging.debug('removing old file from disk: %s'%old_filename) + logging.debug('removing old file from disk: %s', old_filename) os.unlink(old_filename) filename_q.put(filename) - logging.debug('adding new file to queue: %s'%filename) + logging.debug('adding new file to queue: %s', filename) else: filename_q.put(filename) - logging.debug('adding new file to queue: %s'%filename) + logging.debug('adding new file to queue: %s', filename) # @@ -244,7 +244,7 @@ def check_current_frames(pad, info, output_start): startts = LIGOTimeGPS(0, buf.pts) duration = LIGOTimeGPS(0, buf.duration) if startts < output_start: - logging.debug("Dropping frame because start time %f is less than desired output start time %f" % (startts, output_start)) + logging.debug("Dropping frame because start time %f is less than desired output start time %f", startts, output_start) return Gst.PadProbeReturn.DROP return Gst.PadProbeReturn.OK @@ -258,6 +258,8 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f: # write out the new cache to the temp filename f.write("\n".join(["%s" % c for c in new_cache])) + # fetch the duration of the frame files to use in queue lengths + frame_duration = int(abs(new_cache[0].segment)) for instrument in hoft_inj_channel_dict: @@ -273,7 +275,7 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f: # allow frame reading and decoding to occur in a diffrent # thread - inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=10 * Gst.SECOND) + inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=frame_duration * Gst.SECOND) pipeparts.src_deferred_link(inj_demux, "%s:%s" % (instrument, hoft_inj_channel_dict[instrument]), inj_strain.get_static_pad("sink")) inj_strain.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_current_frames, now) if verbose: @@ -285,7 +287,7 @@ with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f: # hoft_src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=shared_memory_dict[instrument], wait_time=60) - hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10) + hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * frame_duration) if verbose: hoft_src = pipeparts.mkprogressreport(pipeline, hoft_src, '%s_hoft_src'%instrument) hoft_demux = pipeparts.mkframecppchanneldemux(pipeline, hoft_src, do_file_checksum=False, skip_bad_files=True)