Skip to content
Snippets Groups Projects
Commit 42b5a4e5 authored by James Kennington's avatar James Kennington
Browse files

Pipepart element for file source

parent cd9f8c4f
No related branches found
No related tags found
1 merge request!119Pipepart element for file source
Pipeline #312775 passed
*.gwf filter=lfs diff=lfs merge=lfs -text
"""Module for producing source elements
"""
import pathlib
import sys
from typing import List, Tuple
from typing import List, Tuple, Union, Iterable, Optional
from ligo import segments
from gstlal.pipeparts import pipetools, transform, filters
from gstlal.pipeparts import pipetools, transform, filters, mux
from gstlal.utilities import laltools
BYTE_ORDER = 'LE' if sys.byteorder == "little" else 'BE'
......@@ -355,3 +357,35 @@ def fake(pipeline: pipetools.Pipeline, instrument: str, channel_name: str, block
volume=volume, is_live=is_live, **properties),
"audio/x-raw, format=F64%s, rate=%d".format(BYTE_ORDER, rate))
return transform.tag_inject(pipeline, caps, "instrument=%s,channel-name=%s,units=strain".format(instrument, channel_name))
def files(pipeline: pipetools.Pipeline, paths: Iterable[Union[str, pathlib.Path]], instrument: str, channel_name: str,
cache_path: Optional[Union[str, pathlib.Path]] = None) -> pipetools.Element:
"""Create a source from a list of file paths
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
paths:
Iterable[Path or str], the full paths to the frame files
cache_path:
Path or str, default None, the path to write out the cache file if specified, else write to temporary directory
Notes:
This is a convenience utility around cache source and framecppdemux that creates a cache file
from a list of file paths
Returns:
Element
"""
cache_path = laltools.create_cache(entries=paths, cache_path=cache_path)
src = cache(pipeline, location=cache_path.as_posix())
demux = mux.framecpp_channel_demux(pipeline, src, do_file_checksum=False, channel_list=["%s:%s".format(instrument, channel_name)])
mux.FrameCPPChannelDemuxSetUnitsHandler(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
# allow frame reading and decoding to occur in a different thread
src = transform.queue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * pipetools.Gst.SECOND)
SrcDeferredLink(demux, "%s:%s".format(instrument, channel_name), src.get_static_pad("sink"))
return src
......@@ -5,4 +5,5 @@ utilities_PYTHON = \
__init__.py \
admin.py \
element_registry.py \
laltools.py \
testtools.py
"""This module contains miscellaneous utilities for working with LAL data structures and api
"""
import os
import pathlib
import tempfile
from typing import Union, Iterable, Optional
from lal.utils import CacheEntry
def create_cache(entries: Iterable[Union[str, pathlib.Path, CacheEntry]], cache_path: Optional[Union[str, pathlib.Path]] = None,
use_os_tmpdir: bool = True) -> pathlib.Path:
"""Create a LAL cache file from an iterable of entries
Args:
entries:
Iterable of either str, Path, or CacheEntry. If str or Path, a CacheEntry will be created using CacheEntry.from_T050017
cache_path:
use_os_tmpdir:
bool, if True use 'TMPDIR' env variable else create a tmp directory using tempfile.TemporaryDirectory. Only applies if
cache_path argument is None.
Returns:
pathlib.Path, the path to the cache file
"""
# Coerce entry types
cache_entries = []
for entry in entries:
if isinstance(entry, str):
entry = pathlib.Path(entry)
if isinstance(entry, pathlib.Path):
entry = CacheEntry.from_T050017(url=entry.as_uri())
cache_entries.append(entry)
# Set cache path
if cache_path is None:
tmpdir = os.getenv('TMPDIR') if use_os_tmpdir else tempfile.TemporaryDirectory()
cache_path = tempfile.NamedTemporaryFile(suffix='.cache', dir=tmpdir).name
if isinstance(cache_path, str):
cache_path = pathlib.Path(cache_path)
# Write cache file
with open(cache_path.as_posix(), 'w') as fid:
for entry in cache_entries:
fid.write(str(entry))
return cache_path
File added
"""Unit tests for laltools module
"""
import pathlib
import tempfile
from lal.utils import CacheEntry
from gstlal.utilities import laltools
DATA_ROOT = pathlib.Path(__file__).parent.parent.parent / 'data'
FILE_1 = DATA_ROOT / 'L-L1_GWOSC_O3a_4KHZ_R1-1238437888-4096.gwf'
entries = [
FILE_1.as_posix(),
FILE_1,
CacheEntry.from_T050017(FILE_1.as_uri()),
]
class TestCache:
"""Tests for LAL cache creation"""
def test_cache(self):
"""Test cache"""
with tempfile.TemporaryDirectory() as tmpdir:
cache_path = pathlib.Path(tmpdir) / 'tmp.cache'
res_path = laltools.create_cache(entries, cache_path)
assert res_path.exists()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment