Skip to content
Snippets Groups Projects

Plot horizon distance from ranking statistics

Merged ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
1 file
+ 135
6
Compare changes
  • Side-by-side
  • Inline
+ 135
6
@@ -71,6 +71,30 @@ class Stream:
source: Optional["SourceElem"] = None,
head: Union[MappingType[str, Gst.Element], Gst.Element, None] = None,
) -> None:
"""Create a Stream that can be used to build a GStreamer-based pipeline.
Args:
name:
str, a name for the GStreamer pipeline (optional).
If not set, generates a unique name.
mainloop:
GLib.MainLoop, the GLib event loop to drive the GStreamer pipeline.
If not set, one will be created.
pipeline:
Gst.Pipeline, the GStreamer pipeline object that contains the pipeline graph.
If not set, one will be created.
handler:
StreamHandler, a handler which registers callbacks upon new bus messages and
stops the event loop upon EOS. If not set, one will be created.
source:
SourceElem, an object that stores source information as well as state/DQ vector
elements. If not set, one will be created.
head:
Union[MappingType[str, Gst.Element], Gst.Element], a pointer to the current
element in the pipeline. If not set, the Stream will not have any elements
attached to the pipeline upon instantiation.
"""
# initialize GStreamer if needed
if not self._gst_init:
Gst.init(None)
@@ -96,7 +120,8 @@ class Stream:
self.source = source if source else None
def start(self) -> None:
"""Start up the pipeline.
"""Start the main event loop for this stream.
"""
if self.source.is_live:
simplehandler.OneTimeSignalHandler(self.pipeline)
@@ -122,6 +147,15 @@ class Stream:
@classmethod
def register_element(cls, elem_name: str) -> Callable[[Gst.Element], None]:
"""Register an element to the stream, making it callable.
Args:
elem_name:
str, the method name in which to bind this element to
Returns:
Callable[[Gst.Element], None], a function which registers a function
that returns a Gst.Element.
"""
def register(func: Callable[..., Gst.Element]) -> None:
def attach_element(self, *srcs, **kwargs) -> "Stream":
@@ -146,6 +180,24 @@ class Stream:
state_vector: bool = False,
dq_vector: bool = False
) -> "Stream":
"""Construct a Stream from a datasource.DataSourceInfo object.
Args:
data_source_info:
DataSourceInfo, the object to construct this stream with.
ifos:
Union[str, Iterable[str]], the detectors read timeseries data for.
verbose:
bool, default False, whether to display logging/progress information.
state_vector:
bool, default False, whether to attach state vector information to this Stream
dq_vector:
bool, default False, whether to attach data quality vector information to this Stream
Returns:
Stream, the newly created stream.
"""
is_live = data_source_info.data_source in datasource.KNOWN_LIVE_DATASOURCES
if isinstance(ifos, str):
ifos = [ifos]
@@ -186,6 +238,9 @@ class Stream:
return stream[ifos[0]]
def connect(self, *args, **kwargs) -> None:
"""Attach a callback to one of this element's signals.
"""
self.head.connect(*args, **kwargs)
def bufsink(
@@ -193,6 +248,16 @@ class Stream:
func: Callable[[Buffer], None],
caps: Optional[Gst.Caps] = None
) -> None:
"""Terminate this stream with an appsink element and process new buffers with a callback.
Args:
func:
Callable[[Buffer], None], a callback that gets invoked when a new buffer is available
caps:
Gst.Caps, how to interpret the contents of the raw buffers.
If not set, defaults to raw audio buffers (audio/x-raw).
"""
def sample_handler(elem: Gst.Element):
buf = self._pull_buffer(elem, caps=caps)
if buf:
@@ -209,20 +274,55 @@ class Stream:
sink.connect("new-preroll", self._preroll_handler)
def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
"""
"""Attach a callback which get invoked when new bus messages are available.
Args:
msg_type:
Gst.MessageType, the type of message to invoke a callback for.
*args:
extra arguments
"""
self.handler.add_callback(msg_type, *args)
def set_state(self, state: Gst.State) -> None:
"""Set pipeline state, checking for errors.
Args:
state:
Gst.State: The state to set this stream's pipeline to.
Raises:
RuntimeError:
If the pipeline failed to transition to the state specified.
"""
if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError(f"pipeline failed to enter {state.value_name}")
def get_element_by_name(self, name: str) -> Gst.Element:
"""Retrieve an element from the stream's pipeline by name.
Args:
name:
str, the name of the element to retrieve
Returns:
Gst.Element, the element associated with the name given.
"""
return self.pipeline.get_by_name(name)
def post_message(self, msg_name: None, timestamp: Optional[int] = None) -> None:
"""Post a new application message to this stream's bus.
Args:
msg_name:
str, the name of the application message to send.
timestamp:
(int, optional), the timestamp to attach to this message.
"""
s = Gst.Structure.new_empty(msg_name)
message = Gst.Message.new_application(self.pipeline, s)
if timestamp:
@@ -230,6 +330,9 @@ class Stream:
self.pipeline.get_bus().post(message)
def __getitem__(self, key: str) -> "Stream":
"""Retrieves a new Stream with specified key.
"""
return self.__class__(
name=self.name,
mainloop=self.mainloop,
@@ -240,6 +343,9 @@ class Stream:
)
def __setitem__(self, key: str, value: "Stream") -> None:
"""Attach a new Stream with specified key/value pair.
"""
if self.pipeline:
assert self.name == value.name
assert self.mainloop is value.mainloop
@@ -267,6 +373,9 @@ class Stream:
yield key, self[key]
def remap(self) -> "Stream":
"""Return a new stream with all pointers to elements cleared out.
"""
return self.__class__(
name=self.name,
mainloop=self.mainloop,
@@ -277,7 +386,8 @@ class Stream:
)
def _seek_gps(self) -> None:
"""Seek pipeline to the given GPS start/end times.
"""Seek pipeline to the given gps start/end times.
"""
start, end = self.source.gps_range
datasource.pipeline_seek_for_gps(self.pipeline, start, end)
@@ -360,6 +470,7 @@ class Stream:
@classmethod
def _get_registered_elements(cls) -> MappingType[str, Callable[..., Gst.Element]]:
"""Get all registered GStreamer elements.
"""
# set up plugin manager
manager = pluggy.PluginManager("gstlal")
@@ -398,8 +509,15 @@ class StreamHandler(simplehandler.Handler):
Gst.MessageType.EOS: {},
}
def add_callback(self, msg_type: Gst.MessageType, *args):
"""
def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
"""Attach a callback which get invoked when new bus messages are available.
Args:
msg_type:
Gst.MessageType, the type of message to invoke a callback for.
*args:
extra arguments
"""
# FIXME: would be better to rearrange the method signature so
# this extra step to determine args doesn't need to be done
@@ -413,7 +531,18 @@ class StreamHandler(simplehandler.Handler):
self.callbacks[msg_type][msg_name] = callback
def do_on_message(self, bus: Gst.Bus, message: Gst.Message):
"""
"""Invoke registered callbacks when new bus messages are received.
Args:
bus:
Gst.Bus, the GStreamer bus.
message:
Gst.Message, the message received.
Returns:
bool, whether further message handling is performed by the parent class
with default cases for EOS, INFO, WARNING and ERROR messages.
"""
if message.type in self.callbacks:
if message.type == Gst.MessageType.EOS:
Loading