
Plot horizon distance from ranking statistics

Merged: ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread · 3 files changed (+116 −126); the file shown below accounts for +105 −105.
@@ -67,8 +67,7 @@ from gstlal import simplehandler
 SourceElem = namedtuple("SourceElem", "datasource is_live gps_range state_vector dq_vector")
-Buffer = namedtuple("Buffer", "name t0 duration data")
+Buffer = namedtuple("Buffer", "name t0 duration data caps")
 MessageType = Gst.MessageType
@@ -80,7 +79,15 @@ class Stream:
     _has_elements = False
+    _caps_buffer_map = None
 
-    def __init__(self, name=None, mainloop=None, pipeline=None, handler=None, source=None, head=None):
+    def __init__(
+        self,
+        name=None,
+        mainloop=None,
+        pipeline=None,
+        handler=None,
+        source=None,
+        head=None,
+    ):
         # initialize threads if not set
         if not self._thread_init:
             GObject.threads_init()
@@ -101,7 +108,7 @@ class Stream:
         self.mainloop = mainloop if mainloop else GObject.MainLoop()
         self.pipeline = pipeline if pipeline else Gst.Pipeline(self.name)
         self.handler = handler if handler else StreamHandler(self.mainloop, self.pipeline)
-        self.head = head if head else None
+        self.head = head if head is not None else {}
 
         # set up source elem properties
         self.source = source if source else None
@@ -149,54 +156,51 @@ class Stream:
         return register
 
     @classmethod
-    def from_datasource(cls, data_source_info, ifos, verbose=False, state_vector=False, dq_vector=False):
+    def from_datasource(
+        cls,
+        data_source_info,
+        ifos,
+        verbose=False,
+        state_vector=False,
+        dq_vector=False
+    ):
         is_live = data_source_info.data_source in data_source_info.live_sources
-        ref_stream = cls()
         if isinstance(ifos, str):
-            stream = ref_stream
-            stream.head, state_vector, dq_vector = datasource.mkbasicsrc(
-                stream.pipeline,
-                data_source_info,
-                ifos,
-                verbose=verbose
-            )
-            stream.source = SourceElem(
-                datasource=data_source_info.data_source,
-                is_live=is_live,
-                gps_range=data_source_info.seg,
-                state_vector=state_vector,
-                dq_vector=dq_vector
-            )
+            ifos = [ifos]
+            keyed = False
         else:
-            stream_map = {}
-            state_vectors = {}
-            dq_vectors = {}
-            for ifo in ifos:
-                stream = cls(
-                    name=ref_stream.name,
-                    mainloop=ref_stream.mainloop,
-                    pipeline=ref_stream.pipeline,
-                    handler=ref_stream.handler,
-                )
-                stream.head, state_vectors[ifo], dq_vectors[ifo] = datasource.mkbasicsrc(
-                    stream.pipeline,
-                    data_source_info,
-                    ifo,
-                    verbose=verbose
-                )
-                stream_map[ifo] = stream
-            stream = StreamMap.from_dict(stream_map)
-            stream.source = SourceElem(
-                datasource=data_source_info.data_source,
-                is_live=is_live,
-                gps_range=data_source_info.seg,
-                state_vector=state_vectors,
-                dq_vector=dq_vectors
-            )
-        return stream
+            keyed = True
+
+        stream = cls()
+        state_vectors = {}
+        dq_vectors = {}
+        for ifo in ifos:
+            src, state_vectors[ifo], dq_vectors[ifo] = datasource.mkbasicsrc(
+                stream.pipeline,
+                data_source_info,
+                ifo,
+                verbose=verbose
+            )
+            stream[ifo] = cls(
+                name=stream.name,
+                mainloop=stream.mainloop,
+                pipeline=stream.pipeline,
+                handler=stream.handler,
+                head=src,
+            )
+        stream.source = SourceElem(
+            datasource=data_source_info.data_source,
+            is_live=is_live,
+            gps_range=data_source_info.seg,
+            state_vector=state_vectors,
+            dq_vector=dq_vectors
+        )
+        if keyed:
+            return stream
+        else:
+            return stream[ifos[0]]
 
     def connect(self, *args, **kwargs):
         self.head.connect(*args, **kwargs)
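For context, a minimal usage sketch of the reworked from_datasource (not part of this MR). It assumes the class is importable as gstlal.stream.Stream and that data_source_info has already been built with the datasource module's helpers; the detector names are placeholders.

from gstlal.stream import Stream

# data_source_info is assumed to come from the datasource module's
# option-parsing helpers; it is not constructed here.
stream = Stream.from_datasource(data_source_info, ["H1", "L1"], verbose=True)

# With a list of detectors the result is a keyed Stream: each branch is a
# per-ifo view sharing the same name, mainloop, pipeline and handler.
h1_branch = stream["H1"]

# With a single string the method still returns an unkeyed Stream, now
# implemented internally as the first (and only) keyed branch.
h1_only = Stream.from_datasource(data_source_info, "H1")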
@@ -208,9 +212,14 @@ class Stream:
             func(buf)
             return Gst.FlowReturn.OK
 
-        sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
-        sink.connect("new-sample", sample_handler)
-        sink.connect("new-preroll", self._preroll_handler)
+        if isinstance(self.head, Mapping):
+            self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
+            for key in self.keys():
+                self._appsync.add_sink(self.pipeline, self.head[key], name=key)
+        else:
+            sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
+            sink.connect("new-sample", sample_handler)
+            sink.connect("new-preroll", self._preroll_handler)
 
     def add_callback(self, msg_type, *args):
         """
@@ -223,6 +232,53 @@ class Stream:
         if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
             raise RuntimeError(f"pipeline failed to enter {state.value_name}")
 
+    def __getitem__(self, key):
+        return self.__class__(
+            name=self.name,
+            mainloop=self.mainloop,
+            pipeline=self.pipeline,
+            handler=self.handler,
+            source=self.source,
+            head=self.head.setdefault(key, {}),
+        )
+
+    def __setitem__(self, key, value):
+        if self.pipeline:
+            assert self.name == value.name
+            assert self.mainloop is value.mainloop
+            assert self.pipeline is value.pipeline
+            assert self.handler is value.handler
+            assert self.source is value.source
+        else:
+            self.name = value.name
+            self.mainloop = value.mainloop
+            self.pipeline = value.pipeline
+            self.handler = value.handler
+            self.source = value.source
+        self.head[key] = value.head
+
+    def keys(self):
+        yield from self.head.keys()
+
+    def values(self):
+        for key in self.keys():
+            yield self[key]
+
+    def items(self):
+        for key in self.keys():
+            yield key, self[key]
+
+    def remap(self):
+        return self.__class__(
+            name=self.name,
+            mainloop=self.mainloop,
+            pipeline=self.pipeline,
+            handler=self.handler,
+            source=self.source,
+            head={},
+        )
+
     def _seek_gps(self):
         """Seek pipeline to the given GPS start/end times.
         """
@@ -254,18 +310,20 @@ class Stream:
             memory.unmap(mapinfo)
             return Buffer(
-                name=elem.name,
                 t0=buftime,
                 duration=buf.duration,
                 data=data,
+                name=elem.name,
+                caps=sample.get_caps(),
             )
         else:
             return Buffer(
-                name=elem.name,
                 t0=buftime,
                 duration=buf.duration,
                 data=pipeio.array_from_audio_sample(sample),
+                name=elem.name,
+                caps=sample.get_caps(),
             )
 
     @classmethod
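Since Buffer now carries the sample's caps, a bufsink callback can read stream metadata directly. A short sketch, assuming standard GStreamer audio caps (the "rate" field and Gst.Structure.get_int are ordinary PyGObject usage, not part of this MR):

def on_buffer(buf):
    struct = buf.caps.get_structure(0)
    ok, rate = struct.get_int("rate")          # sample rate of this branch
    print(buf.name, buf.t0, buf.duration, rate if ok else "n/a")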
@@ -332,64 +390,6 @@ class Stream:
         return registered
 
-class StreamMap(Stream):
-    def __getitem__(self, key):
-        return self.__class__(
-            name=self.name,
-            mainloop=self.mainloop,
-            pipeline=self.pipeline,
-            handler=self.handler,
-            source=self.source,
-            head=self.head[key],
-        )
-
-    def __setitem__(self, key, value):
-        self.head[key] = value
-
-    def keys(self):
-        yield from self.head.keys()
-
-    def values(self):
-        for key in self.keys():
-            yield self[key]
-
-    def items(self):
-        for key in self.keys():
-            yield key, self[key]
-
-    def bufsink(self, func, caps=None):
-        def sample_handler(elem):
-            buf = self._pull_buffer(elem, caps=caps)
-            if buf:
-                func(buf)
-            return Gst.FlowReturn.OK
-
-        self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
-        for key in self.keys():
-            self._appsync.add_sink(self.pipeline, self.head[key], name=key)
-
-    @classmethod
-    def from_dict(cls, stream_dict):
-        # check that stream properties are consistent
-        ref_key = next(iter(stream_dict.keys()))
-        ref_stream = stream_dict[ref_key]
-        for stream in stream_dict.values():
-            assert stream.name == ref_stream.name
-            assert stream.mainloop is ref_stream.mainloop
-            assert stream.pipeline is ref_stream.pipeline
-            assert stream.handler is ref_stream.handler
-            assert stream.source is ref_stream.source
-
-        return cls(
-            name=ref_stream.name,
-            mainloop=ref_stream.mainloop,
-            pipeline=ref_stream.pipeline,
-            handler=ref_stream.handler,
-            source=ref_stream.source,
-            head={key: stream.head for key, stream in stream_dict.items()}
-        )
-
 
 class StreamHandler(simplehandler.Handler):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)