diff --git a/gstlal-ugly/bin/gstlal_ll_inj_stream b/gstlal-ugly/bin/gstlal_ll_inj_stream index 0cfd540181fe4728e0eb1b4508b9a378a7aed886..3486105a406705339a05802d378cdcfa334d651e 100755 --- a/gstlal-ugly/bin/gstlal_ll_inj_stream +++ b/gstlal-ugly/bin/gstlal_ll_inj_stream @@ -21,6 +21,7 @@ import logging from optparse import OptionParser import os import queue +import tempfile import sys import numpy @@ -178,8 +179,11 @@ for smd in options.shared_memory_partition: # Output Frame Options output_dir_dict = {} -for out_dir in options.output_dir: - output_dir_dict[out_dir.split('=')[0]] = out_dir.split('=')[1] +for out_dir_by_ifo in options.output_dir: + outdir = out_dir_by_ifo.split('=')[1] + if not os.path.exists(outdir): + os.mkdir(outdir) + output_dir_dict[out_dir_by_ifo.split('=')[0]] = outdir frame_type_dict = {} for ftype in options.frame_type: @@ -215,11 +219,6 @@ if end < now: #create new cache now_to_end_seg = segments.segment((now, end)) new_cache = [c for c in cache_entries if now_to_end_seg.intersects(c.segment)] -print(len(new_cache)) -this_inj_frame_cache = 'temp_caches/%s_inj.cache'%now -with open(this_inj_frame_cache,"w") as f: - f.write("\n".join(["%s" % c for c in new_cache])) - # # ============================================================================= @@ -254,102 +253,108 @@ def check_current_frames(pad, info, output_start, verbose=False): # Mainloop # -for instrument in hoft_inj_channel_dict: +#set up temp file for the new cache +with tempfile.NamedTemporaryFile(mode='w',suffix='.cache') as f: + # write out the new cache to the temp filename + f.write("\n".join(["%s" % c for c in new_cache])) - # - # fetch inj channel from disk - # - - inj_src = pipeparts.mklalcachesrc(pipeline, location=this_inj_frame_cache, cache_src_regex='HKLV') - inj_src = pipeparts.mkprogressreport(pipeline, inj_src, '%s_inj_strain_pre_demux'%instrument) - inj_demux = pipeparts.mkframecppchanneldemux(pipeline, inj_src, do_file_checksum=False, channel_list=list(map("%s:%s".__mod__, hoft_inj_channel_dict.items()))) - - # allow frame reading and decoding to occur in a diffrent - # thread - inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=10 * Gst.SECOND) - pipeparts.src_deferred_link(inj_demux, "%s:%s" % (instrument, hoft_inj_channel_dict[instrument]), inj_strain.get_static_pad("sink")) - inj_strain.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_current_frames, now, LIGOTimeGPS(1,0)) - inj_strain = pipeparts.mkprogressreport(pipeline, inj_strain, '%s_inj_strain_after_demux'%instrument) - - - # - # fetch hoft frames from devshm - # - - hoft_src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=shared_memory_dict[instrument], wait_time=60) - hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10) - hoft_src = pipeparts.mkprogressreport(pipeline, hoft_src, '%s_hoft_src'%instrument) - hoft_demux = pipeparts.mkframecppchanneldemux(pipeline, hoft_src, do_file_checksum=False, skip_bad_files=True) - - # extract strain with 10 buffers of buffering - # Extract all the channels we want to mux in the end and src defer link each of them. - channel_dict = {} - strain_dict = {} - for channel_name in hoft_channel_dict[instrument]: - channel_src = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10) - pipeparts.src_deferred_link(hoft_demux, "%s:%s" % (instrument, channel_name), channel_src.get_static_pad("sink")) - channel_src = pipeparts.mkprogressreport(pipeline, channel_src, "%s:%s" % (instrument, channel_name)) - - # Only add items to strain_dict which we want to become a new hoft + inj stream - if "CALIB_STRAIN" in channel_name and channel_name not in hoft_passthrough_channel_dict[instrument]: - # tee here, so we can use a tee later for the adder - strain_dict[instrument+':'+channel_name] = pipeparts.mktee(pipeline, channel_src) - else: - channel_dict[instrument+':'+channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8) + for instrument in hoft_inj_channel_dict: - # - # Combine strains and re-mux - # + # + # fetch inj channel from disk + # - # tee the the inj strain here, so it can be used multiple times - inj_strain = pipeparts.mktee(pipeline, inj_strain) - inj_strain_dict = {} - for channel, hoft_strain in strain_dict.items(): - # Add a tee to this dict, so we can mux o.g. hoft later - strain_dict[channel] = pipeparts.mkqueue(pipeline, hoft_strain, max_size_time = 10 * Gst.SECOND) - - # large queues are necessary for timestamp synchronization - this_inj_strain = pipeparts.mkqueue(pipeline, inj_strain, max_size_time = 1024 * Gst.SECOND) - - # add injections to hoft - inj_strain_dict[channel+'_INJ'] = pipeparts.mkadder(pipeline, [hoft_strain, this_inj_strain], sync=True, mix_mode="sum") - inj_strain_dict[channel+'_INJ'] = pipeparts.mkprogressreport(pipeline, inj_strain_dict[channel+'_INJ'], "%s_%s_INJ_after_adder"%(instrument,channel)) - # Be sure to sink the queued inj stream - this_inj_strain = pipeparts.mkfakesink(pipeline, this_inj_strain) - - # sink the o.g. inj stream b/c we don't want it later - inj_strain = pipeparts.mkfakesink(pipeline, inj_strain) - - # want to mux hoft, inj + hoft, and passthrough channels - channel_dict.update(strain_dict) - channel_dict.update(inj_strain_dict) - print(channel_dict.items()) - output_stream = pipeparts.mkframecppchannelmux(pipeline, channel_src_map=channel_dict, frame_duration=1, frames_per_file=1) - output_stream = pipeparts.mkprogressreport(pipeline, output_stream, "%s_output_after_mux"%instrument) - - - # - # Write frames to disk - # + inj_src = pipeparts.mklalcachesrc(pipeline, location=f.name, cache_src_regex='HKLV') + inj_src = pipeparts.mkprogressreport(pipeline, inj_src, '%s_inj_strain_pre_demux'%instrument) + inj_demux = pipeparts.mkframecppchanneldemux(pipeline, inj_src, do_file_checksum=False, channel_list=list(map("%s:%s".__mod__, hoft_inj_channel_dict.items()))) - # make a queue for filename clean-up - filename_queue = queue.Queue(options.history_len) - framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument], path=output_dir_dict[instrument]) - # note that how the output is organized is controlled - # by the function frameccp_filesink_path_handler_simple - framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, filename_queue) + # allow frame reading and decoding to occur in a diffrent + # thread + inj_strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=10 * Gst.SECOND) + pipeparts.src_deferred_link(inj_demux, "%s:%s" % (instrument, hoft_inj_channel_dict[instrument]), inj_strain.get_static_pad("sink")) + inj_strain.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, check_current_frames, now, LIGOTimeGPS(1,0)) + inj_strain = pipeparts.mkprogressreport(pipeline, inj_strain, '%s_inj_strain_after_demux'%instrument) -# -# Start pipeline -# + # + # fetch hoft frames from devshm + # + + hoft_src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=shared_memory_dict[instrument], wait_time=60) + hoft_src = pipeparts.mkqueue(pipeline, hoft_src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10) + hoft_src = pipeparts.mkprogressreport(pipeline, hoft_src, '%s_hoft_src'%instrument) + hoft_demux = pipeparts.mkframecppchanneldemux(pipeline, hoft_src, do_file_checksum=False, skip_bad_files=True) + + # extract strain with 10 buffers of buffering + # Extract all the channels we want to mux in the end and src defer link each of them. + channel_dict = {} + strain_dict = {} + for channel_name in hoft_channel_dict[instrument]: + channel_src = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 10) + pipeparts.src_deferred_link(hoft_demux, "%s:%s" % (instrument, channel_name), channel_src.get_static_pad("sink")) + channel_src = pipeparts.mkprogressreport(pipeline, channel_src, "%s:%s" % (instrument, channel_name)) + + # Only add items to strain_dict which we want to become a new hoft + inj stream + if "CALIB_STRAIN" in channel_name and channel_name not in hoft_passthrough_channel_dict[instrument]: + # tee here, so we can use a tee later for the adder + strain_dict[instrument+':'+channel_name] = pipeparts.mktee(pipeline, channel_src) + else: + channel_dict[instrument+':'+channel_name] = pipeparts.mkqueue(pipeline, channel_src, max_size_time = Gst.SECOND * 8) + + + # + # Combine strains and re-mux + # + + # tee the the inj strain here, so it can be used multiple times + inj_strain = pipeparts.mktee(pipeline, inj_strain) + inj_strain_dict = {} + for channel, hoft_strain in strain_dict.items(): + # Add a tee to this dict, so we can mux o.g. hoft later + strain_dict[channel] = pipeparts.mkqueue(pipeline, hoft_strain, max_size_time = 10 * Gst.SECOND) + + # large queues are necessary for timestamp synchronization + this_inj_strain = pipeparts.mkqueue(pipeline, inj_strain, max_size_time = 1024 * Gst.SECOND) + + # add injections to hoft + inj_strain_dict[channel+'_INJ'] = pipeparts.mkadder(pipeline, [hoft_strain, this_inj_strain], sync=True, mix_mode="sum") + inj_strain_dict[channel+'_INJ'] = pipeparts.mkprogressreport(pipeline, inj_strain_dict[channel+'_INJ'], "%s_%s_INJ_after_adder"%(instrument,channel)) + # Be sure to sink the queued inj stream + this_inj_strain = pipeparts.mkfakesink(pipeline, this_inj_strain) + + # sink the o.g. inj stream b/c we don't want it later + inj_strain = pipeparts.mkfakesink(pipeline, inj_strain) + + # want to mux hoft, inj + hoft, and passthrough channels + channel_dict.update(strain_dict) + channel_dict.update(inj_strain_dict) + print(channel_dict.items()) + output_stream = pipeparts.mkframecppchannelmux(pipeline, channel_src_map=channel_dict, frame_duration=1, frames_per_file=1) + output_stream = pipeparts.mkprogressreport(pipeline, output_stream, "%s_output_after_mux"%instrument) + + + # + # Write frames to disk + # + + # make a queue for filename clean-up + filename_queue = queue.Queue(options.history_len) + framesink = pipeparts.mkframecppfilesink(pipeline, output_stream, frame_type=frame_type_dict[instrument], path=output_dir_dict[instrument]) + # note that how the output is organized is controlled + # by the function frameccp_filesink_path_handler_simple + framesink.connect("notify::timestamp", framecpp_filesink_path_handler_simple, filename_queue) + + + # + # Start pipeline + # -logging.info("running pipeline ...") + logging.info("running pipeline ...") -if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE: - raise RuntimeError("pipeline failed to enter PLAYING state") -mainloop.run() + if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE: + raise RuntimeError("pipeline failed to enter PLAYING state") + mainloop.run() -logging.info("shutting down...") + logging.info("shutting down...")