From 8f776391df4220dccaec15e2a4d8a8c192f132d7 Mon Sep 17 00:00:00 2001 From: Patrick Godwin <patrick.godwin@ligo.org> Date: Tue, 27 Jul 2021 12:22:22 -0700 Subject: [PATCH] simplification/additions to stream API * merge StreamMap/Stream into a single class * store caps within Buffers returned from bufsink() * reflect changes in gstlal_inspiral, lloidparts.py --- gstlal-inspiral/bin/gstlal_inspiral | 28 ++-- gstlal-inspiral/python/lloidparts.py | 4 - gstlal/python/stream.py | 210 +++++++++++++-------------- 3 files changed, 116 insertions(+), 126 deletions(-) diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral index e701c24376..f449bfb47e 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral +++ b/gstlal-inspiral/bin/gstlal_inspiral @@ -183,7 +183,7 @@ from gstlal import servicediscovery from gstlal import simulation from gstlal import svd_bank from gstlal.psd import read_psd -from gstlal.stream import MessageType, Stream, StreamMap +from gstlal.stream import MessageType, Stream from gstlal.stats.inspiral_lr import LnLRDensity GSTLAL_PROCESS_START_TIME = UTCToGPS(time.gmtime()) @@ -856,11 +856,11 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, # construct SNR slices # - snr_slices = defaultdict(dict) + triggers = stream.remap() for instrument, banklist in banks.items(): - for bank in banklist: + for i, bank in enumerate(banklist): suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or "")) - snr = stream[instrument].create_snr_slices( + head = stream[instrument].create_snr_slices( bank, control_peak_time = options.control_peak_time, fir_stride = options.fir_stride, @@ -869,11 +869,10 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, verbose = options.verbose ) - if stream.source.is_live and instrument not in checked_ifo: - snr = snr.latency(name=f"{instrument}_snrSlice_latency", silent=True) - checked_ifo.add(instrument) + if stream.source.is_live 
and i == 0: + head = head.latency(name=f"{instrument}_snrSlice_latency", silent=True) - snr_slices[bank.bank_id][instrument] = snr.checktimestamps(f"timestamps_{suffix}_snr") + triggers[bank.bank_id][instrument] = head.checktimestamps(f"timestamps_{suffix}_snr") # # construct trigger generators @@ -899,8 +898,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, # FIXME: find a way to use less memory without this hack del bank.autocorrelation_bank - for i, (bank_id, head) in enumerate(snr_slices.items()): - head = StreamMap.from_dict(head) + for i, (bank_id, head) in enumerate(triggers.items()): head = head.itacac(**itacac_props[bank_id]) if stream.source.is_live and i == 0: @@ -909,10 +907,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, head = head.queue(max_size_buffers=10, max_size_bytes=0, max_size_time=0) head = head.progressreport(f"progress_xml_bank_{bank_id}") - snr_slices[bank_id] = head - - - triggersrc = StreamMap.from_dict(snr_slices) + triggers[bank_id] = head if options.verbose: print("done", file=sys.stderr) @@ -1025,7 +1020,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, if options.verbose: print("...
pipeline handler initialized", file=sys.stderr) - triggersrc.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral")) + triggers.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral")) # @@ -1074,8 +1069,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url, # - del snr_slices - del triggersrc + del triggers del tracker del bank del banks diff --git a/gstlal-inspiral/python/lloidparts.py b/gstlal-inspiral/python/lloidparts.py index 23cfd3f7c8..9715e5a8f6 100644 --- a/gstlal-inspiral/python/lloidparts.py +++ b/gstlal-inspiral/python/lloidparts.py @@ -82,7 +82,6 @@ from gstlal import datasource from gstlal import pipeparts from gstlal import pipeio from gstlal import plugins -from gstlal.stream import StreamMap # @@ -362,9 +361,6 @@ def mkLLOIDhoftToSnrSlices(pipeline, hoftdict, bank, control_snksrc = (None, Non @param reconstruction_segment_list A segment list object that describes when the control signal should be on. This can be useful in e.g., only reconstructing physical SNRS around the time of injections, which can save an enormous amount of CPU time. 
""" - if isinstance(hoftdict, StreamMap): - hoftdict = hoftdict.head - # # parameters # diff --git a/gstlal/python/stream.py b/gstlal/python/stream.py index 2423e78d27..f83571d08f 100644 --- a/gstlal/python/stream.py +++ b/gstlal/python/stream.py @@ -67,8 +67,7 @@ from gstlal import simplehandler SourceElem = namedtuple("SourceElem", "datasource is_live gps_range state_vector dq_vector") -Buffer = namedtuple("Buffer", "name t0 duration data") - +Buffer = namedtuple("Buffer", "name t0 duration data caps") MessageType = Gst.MessageType @@ -80,7 +79,15 @@ class Stream: _has_elements = False _caps_buffer_map = None - def __init__(self, name=None, mainloop=None, pipeline=None, handler=None, source=None, head=None): + def __init__( + self, + name=None, + mainloop=None, + pipeline=None, + handler=None, + source=None, + head=None, + ): # initialize threads if not set if not self._thread_init: GObject.threads_init() @@ -101,7 +108,7 @@ class Stream: self.mainloop = mainloop if mainloop else GObject.MainLoop() self.pipeline = pipeline if pipeline else Gst.Pipeline(self.name) self.handler = handler if handler else StreamHandler(self.mainloop, self.pipeline) - self.head = head if head else None + self.head = head if head is not None else {} # set up source elem properties self.source = source if source else None @@ -149,54 +156,51 @@ class Stream: return register @classmethod - def from_datasource(cls, data_source_info, ifos, verbose=False, state_vector=False, dq_vector=False): + def from_datasource( + cls, + data_source_info, + ifos, + verbose=False, + state_vector=False, + dq_vector=False + ): is_live = data_source_info.data_source in data_source_info.live_sources - ref_stream = cls() if isinstance(ifos, str): - stream = ref_stream - stream.head, state_vector, dq_vector = datasource.mkbasicsrc( + ifos = [ifos] + keyed = False + else: + keyed = True + + stream = cls() + state_vectors = {} + dq_vectors = {} + for ifo in ifos: + src, state_vectors[ifo], dq_vectors[ifo] = 
datasource.mkbasicsrc( stream.pipeline, data_source_info, - ifos, + ifo, verbose=verbose ) - stream.source = SourceElem( - datasource=data_source_info.data_source, - is_live=is_live, - gps_range=data_source_info.seg, - state_vector=state_vector, - dq_vector=dq_vector + stream[ifo] = cls( + name=stream.name, + mainloop=stream.mainloop, + pipeline=stream.pipeline, + handler=stream.handler, + head=src, ) + stream.source = SourceElem( + datasource=data_source_info.data_source, + is_live=is_live, + gps_range=data_source_info.seg, + state_vector=state_vectors, + dq_vector=dq_vectors + ) + + if keyed: return stream else: - stream_map = {} - state_vectors = {} - dq_vectors = {} - for ifo in ifos: - stream = cls( - name=ref_stream.name, - mainloop=ref_stream.mainloop, - pipeline=ref_stream.pipeline, - handler=ref_stream.handler, - ) - stream.head, state_vectors[ifo], dq_vectors[ifo] = datasource.mkbasicsrc( - stream.pipeline, - data_source_info, - ifo, - verbose=verbose - ) - stream_map[ifo] = stream - - stream = StreamMap.from_dict(stream_map) - stream.source = SourceElem( - datasource=data_source_info.data_source, - is_live=is_live, - gps_range=data_source_info.seg, - state_vector=state_vectors, - dq_vector=dq_vectors - ) - return stream + return stream[ifos[0]] def connect(self, *args, **kwargs): self.head.connect(*args, **kwargs) @@ -208,9 +212,14 @@ class Stream: func(buf) return Gst.FlowReturn.OK - sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False) - sink.connect("new-sample", sample_handler) - sink.connect("new-preroll", self._preroll_handler) + if isinstance(self.head, Mapping): + self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler) + for key in self.keys(): + self._appsync.add_sink(self.pipeline, self.head[key], name=key) + else: + sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False) + sink.connect("new-sample", sample_handler) + sink.connect("new-preroll", self._preroll_handler) def 
add_callback(self, msg_type, *args): """ @@ -223,6 +232,53 @@ class Stream: if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE: raise RuntimeError(f"pipeline failed to enter {state.value_name}") + def __getitem__(self, key): + return self.__class__( + name=self.name, + mainloop=self.mainloop, + pipeline=self.pipeline, + handler=self.handler, + source=self.source, + head=self.head.setdefault(key, {}), + ) + + def __setitem__(self, key, value): + if self.pipeline: + assert self.name == value.name + assert self.mainloop is value.mainloop + assert self.pipeline is value.pipeline + assert self.handler is value.handler + assert self.source is value.source + else: + self.name = value.name + self.mainloop = value.mainloop + self.pipeline = value.pipeline + self.handler = value.handler + self.source = value.source + + self.head[key] = value.head + + def keys(self): + yield from self.head.keys() + + def values(self): + for key in self.keys(): + yield self[key] + + def items(self): + for key in self.keys(): + yield key, self[key] + + def remap(self): + return self.__class__( + name=self.name, + mainloop=self.mainloop, + pipeline=self.pipeline, + handler=self.handler, + source=self.source, + head={}, + ) + def _seek_gps(self): """Seek pipeline to the given GPS start/end times. 
""" @@ -254,18 +310,20 @@ class Stream: memory.unmap(mapinfo) return Buffer( + name=elem.name, t0=buftime, duration=buf.duration, data=data, - name=elem.name, + caps=sample.get_caps(), ) else: return Buffer( + name=elem.name, t0=buftime, duration=buf.duration, data=pipeio.array_from_audio_sample(sample), - name=elem.name, + caps=sample.get_caps(), ) @classmethod @@ -332,64 +390,6 @@ class Stream: return registered -class StreamMap(Stream): - def __getitem__(self, key): - return self.__class__( - name=self.name, - mainloop=self.mainloop, - pipeline=self.pipeline, - handler=self.handler, - source=self.source, - head=self.head[key], - ) - - def __setitem__(self, key, value): - self.head[key] = value - - def keys(self): - yield from self.head.keys() - - def values(self): - for key in self.keys(): - yield self[key] - - def items(self): - for key in self.keys(): - yield key, self[key] - - def bufsink(self, func, caps=None): - def sample_handler(elem): - buf = self._pull_buffer(elem, caps=caps) - if buf: - func(buf) - return Gst.FlowReturn.OK - - self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler) - for key in self.keys(): - self._appsync.add_sink(self.pipeline, self.head[key], name=key) - - @classmethod - def from_dict(cls, stream_dict): - # check that stream properties are consistent - ref_key = next(iter(stream_dict.keys())) - ref_stream = stream_dict[ref_key] - for stream in stream_dict.values(): - assert stream.name == ref_stream.name - assert stream.mainloop is ref_stream.mainloop - assert stream.pipeline is ref_stream.pipeline - assert stream.handler is ref_stream.handler - assert stream.source is ref_stream.source - - return cls( - name=ref_stream.name, - mainloop=ref_stream.mainloop, - pipeline=ref_stream.pipeline, - handler=ref_stream.handler, - source=ref_stream.source, - head={key: stream.head for key, stream in stream_dict.items()} - ) - - class StreamHandler(simplehandler.Handler): def __init__(self, *args, **kwargs): super().__init__(*args, 
**kwargs) -- GitLab