Skip to content
Snippets Groups Projects
Commit 07769a73 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

simplification/additions to stream API

* merge StreamMap/Stream into a single class
* store caps within Buffers returned from bufsink()
* reflect changes in gstlal_inspiral, lloidparts.py
parent aa0828d4
No related branches found
No related tags found
1 merge request!55Add high-level Stream API to build GStreamer pipelines
......@@ -183,7 +183,7 @@ from gstlal import servicediscovery
from gstlal import simulation
from gstlal import svd_bank
from gstlal.psd import read_psd
from gstlal.stream import MessageType, Stream, StreamMap
from gstlal.stream import MessageType, Stream
from gstlal.stats.inspiral_lr import LnLRDensity
GSTLAL_PROCESS_START_TIME = UTCToGPS(time.gmtime())
......@@ -856,11 +856,11 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
# construct SNR slices
#
snr_slices = defaultdict(dict)
triggers = stream.remap()
for instrument, banklist in banks.items():
for bank in banklist:
for i, bank in enumerate(banklist):
suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
snr = stream[instrument].create_snr_slices(
head = stream[instrument].create_snr_slices(
bank,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
......@@ -869,11 +869,10 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
verbose = options.verbose
)
if stream.source.is_live and instrument not in checked_ifo:
snr = snr.latency(name=f"{instrument}_snrSlice_latency", silent=True)
checked_ifo.add(instrument)
if stream.source.is_live and i == 0:
head = snr.latency(name=f"{instrument}_snrSlice_latency", silent=True)
snr_slices[bank.bank_id][instrument] = snr.checktimestamps(f"timestamps_{suffix}_snr")
triggers[bank.bank_id][instrument] = head.checktimestamps(f"timestamps_{suffix}_snr")
#
# construct trigger generators
......@@ -899,8 +898,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
# FIXME: find a way to use less memory without this hack
del bank.autocorrelation_bank
for i, (bank_id, head) in enumerate(snr_slices.items()):
head = StreamMap.from_dict(head)
for i, (bank_id, head) in enumerate(triggers.items()):
head = head.itacac(**itacac_props[bank_id])
if stream.source.is_live and i == 0:
......@@ -909,10 +907,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
head = head.queue(max_size_buffers=10, max_size_bytes=0, max_size_time=0)
head = head.progressreport(f"progress_xml_bank_{bank_id}")
snr_slices[bank_id] = head
triggersrc = StreamMap.from_dict(snr_slices)
triggers[bank_id] = head
if options.verbose:
print("done", file=sys.stderr)
......@@ -1025,7 +1020,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
if options.verbose:
print("... pipeline handler initialized", file=sys.stderr)
triggersrc.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral"))
triggers.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral"))
#
......@@ -1074,8 +1069,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
#
del snr_slices
del triggersrc
del triggers
del tracker
del bank
del banks
......
......@@ -82,7 +82,6 @@ from gstlal import datasource
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import plugins
from gstlal.stream import StreamMap
#
......@@ -362,9 +361,6 @@ def mkLLOIDhoftToSnrSlices(pipeline, hoftdict, bank, control_snksrc = (None, Non
@param reconstruction_segment_list A segment list object that describes when the control signal should be on. This can be useful in e.g., only reconstructing physical SNRS around the time of injections, which can save an enormous amount of CPU time.
"""
if isinstance(hoftdict, StreamMap):
hoftdict = hoftdict.head
#
# parameters
#
......
......@@ -67,8 +67,7 @@ from gstlal import simplehandler
SourceElem = namedtuple("SourceElem", "datasource is_live gps_range state_vector dq_vector")
Buffer = namedtuple("Buffer", "name t0 duration data")
Buffer = namedtuple("Buffer", "name t0 duration data caps")
MessageType = Gst.MessageType
......@@ -80,7 +79,15 @@ class Stream:
_has_elements = False
_caps_buffer_map = None
def __init__(self, name=None, mainloop=None, pipeline=None, handler=None, source=None, head=None):
def __init__(
self,
name=None,
mainloop=None,
pipeline=None,
handler=None,
source=None,
head=None,
):
# initialize threads if not set
if not self._thread_init:
GObject.threads_init()
......@@ -101,7 +108,7 @@ class Stream:
self.mainloop = mainloop if mainloop else GObject.MainLoop()
self.pipeline = pipeline if pipeline else Gst.Pipeline(self.name)
self.handler = handler if handler else StreamHandler(self.mainloop, self.pipeline)
self.head = head if head else None
self.head = head if head is not None else {}
# set up source elem properties
self.source = source if source else None
......@@ -149,54 +156,51 @@ class Stream:
return register
@classmethod
def from_datasource(cls, data_source_info, ifos, verbose=False, state_vector=False, dq_vector=False):
def from_datasource(
cls,
data_source_info,
ifos,
verbose=False,
state_vector=False,
dq_vector=False
):
is_live = data_source_info.data_source in data_source_info.live_sources
ref_stream = cls()
if isinstance(ifos, str):
stream = ref_stream
stream.head, state_vector, dq_vector = datasource.mkbasicsrc(
ifos = [ifos]
keyed = False
else:
keyed = True
stream = cls()
state_vectors = {}
dq_vectors = {}
for ifo in ifos:
src, state_vectors[ifo], dq_vectors[ifo] = datasource.mkbasicsrc(
stream.pipeline,
data_source_info,
ifos,
ifo,
verbose=verbose
)
stream.source = SourceElem(
datasource=data_source_info.data_source,
is_live=is_live,
gps_range=data_source_info.seg,
state_vector=state_vector,
dq_vector=dq_vector
stream[ifo] = cls(
name=stream.name,
mainloop=stream.mainloop,
pipeline=stream.pipeline,
handler=stream.handler,
head=src,
)
stream.source = SourceElem(
datasource=data_source_info.data_source,
is_live=is_live,
gps_range=data_source_info.seg,
state_vector=state_vectors,
dq_vector=dq_vectors
)
if keyed:
return stream
else:
stream_map = {}
state_vectors = {}
dq_vectors = {}
for ifo in ifos:
stream = cls(
name=ref_stream.name,
mainloop=ref_stream.mainloop,
pipeline=ref_stream.pipeline,
handler=ref_stream.handler,
)
stream.head, state_vectors[ifo], dq_vectors[ifo] = datasource.mkbasicsrc(
stream.pipeline,
data_source_info,
ifo,
verbose=verbose
)
stream_map[ifo] = stream
stream = StreamMap.from_dict(stream_map)
stream.source = SourceElem(
datasource=data_source_info.data_source,
is_live=is_live,
gps_range=data_source_info.seg,
state_vector=state_vectors,
dq_vector=dq_vectors
)
return stream
return stream[ifos[0]]
def connect(self, *args, **kwargs):
self.head.connect(*args, **kwargs)
......@@ -208,9 +212,14 @@ class Stream:
func(buf)
return Gst.FlowReturn.OK
sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
sink.connect("new-sample", sample_handler)
sink.connect("new-preroll", self._preroll_handler)
if isinstance(self.head, Mapping):
self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
for key in self.keys():
self._appsync.add_sink(self.pipeline, self.head[key], name=key)
else:
sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
sink.connect("new-sample", sample_handler)
sink.connect("new-preroll", self._preroll_handler)
def add_callback(self, msg_type, *args):
"""
......@@ -223,6 +232,53 @@ class Stream:
if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError(f"pipeline failed to enter {state.value_name}")
def __getitem__(self, key):
    """Return a keyed sub-stream view that shares this stream's pipeline.

    The returned Stream shares ``name``/``mainloop``/``pipeline``/
    ``handler``/``source`` with this one; only its ``head`` differs,
    being the entry stored under ``key`` in this stream's head mapping.

    NOTE(review): ``setdefault`` inserts an empty dict when ``key`` is
    absent, so merely *indexing* a missing key mutates ``self.head``.
    This is what lets ``stream[ifo] = ...`` style construction work, but
    it means lookups are not read-only — confirm callers expect this.
    """
    return self.__class__(
        name=self.name,
        mainloop=self.mainloop,
        pipeline=self.pipeline,
        handler=self.handler,
        source=self.source,
        head=self.head.setdefault(key, {}),
    )
def __setitem__(self, key, value):
    """Attach another Stream's head element under ``key``.

    If this stream already has a pipeline, ``value`` must belong to the
    same pipeline graph (same name/mainloop/pipeline/handler/source);
    this is enforced with asserts. Otherwise this (empty) stream adopts
    the pipeline context from the first stream assigned into it.

    NOTE(review): ``assert`` is stripped under ``python -O``, so these
    consistency checks are debug-only.
    """
    if self.pipeline:
        # sanity-check both streams share one pipeline graph
        assert self.name == value.name
        assert self.mainloop is value.mainloop
        assert self.pipeline is value.pipeline
        assert self.handler is value.handler
        assert self.source is value.source
    else:
        # first assignment: adopt the incoming stream's pipeline context
        self.name = value.name
        self.mainloop = value.mainloop
        self.pipeline = value.pipeline
        self.handler = value.handler
        self.source = value.source
    # store the raw head element, not the wrapping Stream
    self.head[key] = value.head
def keys(self):
    """Iterate over the keys of the underlying head mapping."""
    for key in self.head:
        yield key
def values(self):
    """Yield a keyed sub-stream view (not the raw head) for every key."""
    yield from (self[key] for key in self.keys())
def items(self):
    """Yield ``(key, sub-stream)`` pairs, mirroring dict.items()."""
    yield from ((key, self[key]) for key in self.keys())
def remap(self):
    """Return a new keyed Stream on this pipeline with an empty head map.

    Useful for collecting re-keyed branches (e.g. by bank id instead of
    instrument) while still sharing the same underlying pipeline.
    """
    shared = dict(
        name=self.name,
        mainloop=self.mainloop,
        pipeline=self.pipeline,
        handler=self.handler,
        source=self.source,
    )
    return self.__class__(head={}, **shared)
def _seek_gps(self):
"""Seek pipeline to the given GPS start/end times.
"""
......@@ -254,18 +310,20 @@ class Stream:
memory.unmap(mapinfo)
return Buffer(
name=elem.name,
t0=buftime,
duration=buf.duration,
data=data,
name=elem.name,
caps=sample.get_caps(),
)
else:
return Buffer(
name=elem.name,
t0=buftime,
duration=buf.duration,
data=pipeio.array_from_audio_sample(sample),
name=elem.name,
caps=sample.get_caps(),
)
@classmethod
......@@ -332,64 +390,6 @@ class Stream:
return registered
class StreamMap(Stream):
    """A keyed collection of Streams sharing one GStreamer pipeline.

    This is the pre-merge implementation removed by this commit: its
    behavior (keyed access, per-key appsinks via AppSync, construction
    from a dict of streams) was folded into ``Stream`` itself. Here
    ``self.head`` maps each key to that branch's pipeline head element.
    """

    def __getitem__(self, key):
        # Wrap the per-key head element in a Stream sharing this pipeline.
        return self.__class__(
            name=self.name,
            mainloop=self.mainloop,
            pipeline=self.pipeline,
            handler=self.handler,
            source=self.source,
            head=self.head[key],
        )

    def __setitem__(self, key, value):
        # NOTE(review): asymmetric with __getitem__ — this stores ``value``
        # directly (a raw head element), not ``value.head`` as the merged
        # Stream.__setitem__ does; callers assign head elements here.
        self.head[key] = value

    def keys(self):
        yield from self.head.keys()

    def values(self):
        # Yield Stream views (not raw head elements) for each key.
        for key in self.keys():
            yield self[key]

    def items(self):
        for key in self.keys():
            yield key, self[key]

    def bufsink(self, func, caps=None):
        """Attach one appsink per key; invoke ``func(buf)`` per pulled buffer.

        ``caps``: optional Gst.Caps forwarded to ``_pull_buffer`` when
        mapping raw buffer data.
        """
        def sample_handler(elem):
            buf = self._pull_buffer(elem, caps=caps)
            if buf:
                func(buf)
            return Gst.FlowReturn.OK

        # AppSync funnels buffers from all per-key appsinks through one callback.
        self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
        for key in self.keys():
            self._appsync.add_sink(self.pipeline, self.head[key], name=key)

    @classmethod
    def from_dict(cls, stream_dict):
        """Build a StreamMap from ``{key: Stream}`` by merging their heads.

        All input streams must share the same name/mainloop/pipeline/
        handler/source (debug-only assert checks).
        """
        # check that stream properties are consistent
        ref_key = next(iter(stream_dict.keys()))
        ref_stream = stream_dict[ref_key]
        for stream in stream_dict.values():
            assert stream.name == ref_stream.name
            assert stream.mainloop is ref_stream.mainloop
            assert stream.pipeline is ref_stream.pipeline
            assert stream.handler is ref_stream.handler
            assert stream.source is ref_stream.source

        return cls(
            name=ref_stream.name,
            mainloop=ref_stream.mainloop,
            pipeline=ref_stream.pipeline,
            handler=ref_stream.handler,
            source=ref_stream.source,
            head={key: stream.head for key, stream in stream_dict.items()}
        )
class StreamHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment