Skip to content
Snippets Groups Projects

Plot horizon distance from ranking statistics

Merged ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
6 files
+ 121
2
Compare changes
  • Side-by-side
  • Inline
Files
6
"""Module for producing source elements
"""
import pathlib
import sys
from typing import List, Tuple
from typing import List, Tuple, Union, Iterable, Optional
from ligo import segments
from gstlal.pipeparts import pipetools, transform, filters
from gstlal.pipeparts import pipetools, transform, filters, mux
from gstlal.utilities import laltools
BYTE_ORDER = 'LE' if sys.byteorder == "little" else 'BE'
@@ -355,3 +357,35 @@ def fake(pipeline: pipetools.Pipeline, instrument: str, channel_name: str, block
volume=volume, is_live=is_live, **properties),
"audio/x-raw, format=F64%s, rate=%d".format(BYTE_ORDER, rate))
return transform.tag_inject(pipeline, caps, "instrument=%s,channel-name=%s,units=strain".format(instrument, channel_name))
def files(pipeline: pipetools.Pipeline, paths: Iterable[Union[str, pathlib.Path]], instrument: str, channel_name: str,
cache_path: Optional[Union[str, pathlib.Path]] = None) -> pipetools.Element:
"""Create a source from a list of file paths
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
paths:
Iterable[Path or str], the full paths to the frame files
cache_path:
Path or str, default None, the path to write out the cache file if specified, else write to temporary directory
Notes:
This is a convenience utility around cache source and framecppdemux that creates a cache file
from a list of file paths
Returns:
Element
"""
cache_path = laltools.create_cache(entries=paths, cache_path=cache_path)
src = cache(pipeline, location=cache_path.as_posix())
demux = mux.framecpp_channel_demux(pipeline, src, do_file_checksum=False, channel_list=["%s:%s".format(instrument, channel_name)])
mux.FrameCPPChannelDemuxSetUnitsHandler(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
# allow frame reading and decoding to occur in a different thread
src = transform.queue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * pipetools.Gst.SECOND)
SrcDeferredLink(demux, "%s:%s".format(instrument, channel_name), src.get_static_pad("sink"))
return src
Loading