-
Duncan Macleod authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
io.py 4.34 KiB
# Copyright (C) 2022-2025 Cardiff University
#
# This file is part of GWDataFind.
#
# GWDataFind is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWDataFind is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWDataFind. If not, see <https://www.gnu.org/licenses/>.
"""I/O (mainly O) routines for GWDataFind."""
import os.path
from collections import namedtuple
from operator import attrgetter
from urllib.parse import urlparse
from .utils import filename_metadata
__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"
class LalCacheEntry(
    namedtuple("CacheEntry", ("obs", "tag", "segment", "url")),
):
    """Lightweight stand-in for `lal.utils.CacheEntry`.

    Defined here so that this package does not need to depend on lalsuite.
    Each entry is a named tuple with the fields ``obs``, ``tag``,
    ``segment`` and ``url``.
    """

    def __str__(self):
        """Return this entry formatted as a single space-separated line.

        The line has the form
        ``<obs> <tag> <segment[0]> <abs(segment)> <url>``.
        """
        span = self.segment
        return (
            f"{self.obs!s} {self.tag!s} {span[0]!s} {abs(span)!s} "
            f"{self.url!s}"
        )

    @classmethod
    def from_url(cls, url, **kwargs):
        """Build a `LalCacheEntry` from a URL that follows LIGO-T050017.

        Parameters
        ----------
        url
            The URL to parse; it is handed to `filename_metadata` to
            obtain the ``obs``, ``tag`` and ``segment`` fields, and is
            stored unchanged as the ``url`` field.

        **kwargs
            Accepted for call compatibility; not used by this method.

        Returns
        -------
        entry : `LalCacheEntry`
        """
        observatory, description, span = filename_metadata(url)
        return cls(obs=observatory, tag=description, segment=span, url=url)
def lal_cache(urls):
    """Build a LAL cache from an iterable of URLs.

    Every URL is converted with `LalCacheEntry.from_url`, preserving the
    input order.

    Parameters
    ----------
    urls : iterable
        The URLs to convert.

    Returns
    -------
    cache : `list` of `LalCacheEntry`
    """
    return list(map(LalCacheEntry.from_url, urls))
class OmegaCacheEntry(namedtuple(
    "OmegaCacheEntry",
    ("obs", "tag", "segment", "duration", "url")
)):
    """A single record of an omega-style cache.

    An omega-style cache file holds one line per contiguous directory,
    laid out as:

        <obs> <tag> <dir-start> <dir-end> <file-duration> <directory>
    """

    def __str__(self):
        """Return this entry formatted as one omega-style cache line."""
        columns = (
            self.obs,
            self.tag,
            self.segment[0],
            self.segment[1],
            self.duration,
            self.url,
        )
        return " ".join(str(column) for column in columns)
def omega_cache(cache):
    """Collapse `LalCacheEntry` items into a list of `OmegaCacheEntry`.

    Entries are processed in order of ``(obs, tag, segment)``. An entry is
    folded into the most recent output entry when it shares that entry's
    ``obs``, ``tag`` and directory, has the same duration, and its segment
    touches or overlaps the one already recorded; otherwise it starts a
    new output entry.

    Parameters
    ----------
    cache : iterable of `LalCacheEntry`
        The entries to convert. Each ``segment`` must support ``abs()``,
        ``connects()``, ``intersects()`` and the ``|`` operator.

    Returns
    -------
    cache : `list` of `OmegaCacheEntry`
    """
    ordered = sorted(cache, key=attrgetter("obs", "tag", "segment"))
    merged = []
    current = None  # the last entry in ``merged``, or None before the first

    for entry in ordered:
        directory = os.path.dirname(entry.url)
        span = entry.segment

        # same attributes, same directory, same file duration, and
        # contiguous with (or overlapping) the entry we are building
        extends_current = (
            current is not None
            and entry.obs == current.obs
            and entry.tag == current.tag
            and directory == current.url
            and abs(span) == current.duration
            and (
                span.connects(current.segment)
                or span.intersects(current.segment)
            )
        )

        if extends_current:
            # grow the recorded segment in place of the last entry
            current = OmegaCacheEntry(
                obs=current.obs,
                tag=current.tag,
                segment=current.segment | span,
                duration=current.duration,
                url=current.url,
            )
            merged[-1] = current
        else:
            # start a fresh entry for this directory/run of files
            current = OmegaCacheEntry(
                obs=entry.obs,
                tag=entry.tag,
                segment=span,
                duration=abs(span),
                url=directory,
            )
            merged.append(current)

    return merged
def format_cache(cache, fmt):
    """Convert a list of `LalCacheEntry` into the requested format.

    Valid formats:

    - ``lal`` - return ``cache`` itself, unchanged
    - ``urls`` - return a list of the ``url`` of each entry
    - ``names`` - return a list of the path component of each entry's URL
    - ``omega`` - return a list of Omega-format cache entries

    Raises
    ------
    ValueError
        If ``fmt`` is not one of the formats listed above.
    """
    if fmt == "omega":
        return omega_cache(cache)
    if fmt == "names":
        paths = []
        for entry in cache:
            paths.append(urlparse(entry.url).path)
        return paths
    if fmt == "urls":
        return list(map(attrgetter("url"), cache))
    if fmt == "lal":
        return cache
    message = f"invalid format '{fmt}'"
    raise ValueError(message)