Initial commit

# filetypes to ignore
*.pyc
.DS_Store
*.swp
# build files/directories to ignore
/dist/
/build/
/.cache/
/*.egg-info/
/.eggs/
__pycache__/
/.coverage
.pytest_cache/
/docs/_build/
/docs/api/
stages:
- lint
- dist
- build
- test
variables:
FLASK_ENV: "development"
LDR_LOCATION: "${CI_PROJECT_DIR}/tests"
PIP_CACHE_DIR: "${CI_PROJECT_DIR}/.cache/pip"
.dist-artifacts: &dist-artifacts
after_script:
# list files
- find dist
artifacts:
expire_in: 18h
paths:
- dist
# -- lint -------------------
lint:
stage: lint
image: python:3.7
before_script:
- python -m pip install "flake8>=3.7.0"
script:
- python -m flake8 --output-file "flake8-report.txt"
after_script:
- python -m pip install "flake8-junit-report"
- python -m junit_conversor "flake8-report.txt" "junit.xml"
artifacts:
reports:
junit: junit.xml
# -- dist -------------------
dist:tarball:
<<: *dist-artifacts
stage: dist
image: python:3.6
script:
    - python -m pip install setuptools wheel
- python setup.py sdist bdist_wheel
# -- build ------------------
.build: &build
<<: *dist-artifacts
stage: build
dependencies:
- dist:tarball
.build:debian: &build-debian
<<: *build
before_script:
- apt-get update -yqq
    # equivs is required by mk-build-deps
    - apt-get install -yq
          dpkg-dev
          devscripts
          equivs
script:
- DIST_NAME=${CI_JOB_NAME#build:debian:}
- mkdir -pv dist/${DIST_NAME}
# unpack tarball
- mkdir -pv build
- tar -xvf dist/gwdatafind-server-*.tar.gz -C ./build --strip-components=1
- pushd build
# install build dependencies
- mk-build-deps --tool "apt-get -y" --install --remove
# build debian packages
- dpkg-buildpackage -us -uc -b
# move things into dist/
- popd
- mv -v {python,python3}-gwdatafind-server_*.deb gwdatafind-server_*.{buildinfo,changes} dist/${DIST_NAME}/
build:debian:stretch:
<<: *build-debian
image: ligo/base:stretch
# -- test -------------------
.test: &test
stage: test
image: python
dependencies:
- dist:tarball
before_script:
- python -m pip install dist/gwdatafind-server*.tar.gz
- python -m pip install
"pytest>=2.8.0"
"pytest-cov"
script:
- python -m pytest tests --cov=gwdatafind_server --junitxml=junit.xml
cache:
key: "${CI_JOB_NAME}"
paths:
- .cache/pip
artifacts:
reports:
junit: junit.xml
test:python3.5:
<<: *test
image: python:3.5
test:python3.6:
<<: *test
image: python:3.6
test:python3.7:
<<: *test
image: python:3.7
include LICENSE
recursive-include debian *
include gwdatafindserver.spec
# GWDataFind Server
This package defines a Flask application that serves gravitational-wave data file URLs based on the contents of an LDR diskcache file.
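
For example, the application can be served with Flask's built-in development
server (a minimal sketch; it assumes a valid `gwdatafind-server.ini`
configuration file, with an `[LDRDataFindServer]` section pointing at a
diskcache file, can be found in one of the standard locations):

```python
from gwdatafind_server import create_app

app = create_app()  # reads gwdatafind-server.ini and starts the cache thread
app.run(host="0.0.0.0", port=8080)
```

Requests are then served as JSON under the `/LDR/services/data/v1` prefix.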
gwdatafind-server (0.1.0-1) unstable; urgency=low
* first release of new server
 -- Duncan Macleod <duncan.macleod@ligo.org>  Fri, 10 Aug 2018 00:00:00 +0100
# -- gwdatafind-server source package -----------------------------------------------
Source: gwdatafind-server
Maintainer: Duncan Macleod <duncan.macleod@ligo.org>
Section: python
Priority: optional
Standards-Version: 3.9.1
X-Python3-Version: >= 3.4
Build-Depends: debhelper (>= 9),
dh-python,
python3-all,
python3-setuptools,
python3-configobj,
python3-ligo-segments,
# -- python3-gwdatafind-server ------------------
Package: python3-gwdatafind-server
Architecture: all
Depends: ${misc:Depends},
${python3:Depends},
python3-configobj,
python3-ligo-segments
Description: server library for the GWDataFind service
.
The DataFind service allows users to query for the location of
files containing data from the current
gravitational-wave detectors.
.
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Contact: Duncan Macleod <duncan.macleod@ligo.org>
Source: https://git.ligo.org/duncanmmacleod/gwdatafind-server
Files: *
Copyright: 2018 Duncan Macleod <duncan.macleod@ligo.org>
License: GPL-3+
License: GPL-3+
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
.
On Debian systems, the complete text of the GNU General
Public License version 3 can be found in `/usr/share/common-licenses/GPL-3'.
#!/usr/bin/make -f
export PYBUILD_NAME = gwdatafind-server
export PYBUILD_DISABLE = test
%:
dh $@ --with python3 --buildsystem=pybuild
# -*- coding: utf-8 -*-
# Copyright (2019) Duncan Macleod
# Licensed under GPLv3+ - see LICENSE
"""The GWDataFind server app
"""
import time
from flask import Flask
from .cache import CacheManager
from .config import (ConfigObj, get_config_path)
from . import views # pylint: disable=unused-import
__version__ = '0.1.0'
class DataFlask(Flask):
def __init__(self, import_name, configpath, *args, **kwargs):
super().__init__(import_name, *args, **kwargs)
self.config.update(ConfigObj(configpath))
# create thread to read cache file and start
man = self._init_manager(self.config)
        man.daemon = True
man.start()
def _init_manager(self, conf):
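        """Create, store, and return the `CacheManager` for this app.

        For illustration only (the option names are those read below and by
        the request handlers; the values are assumed), the
        ``[LDRDataFindServer]`` section of the config file looks something
        like::

            [LDRDataFindServer]
            framecachefile = /var/cache/gwdatafind/frame_cache_dump
            framecachetimeout = 60
            site_include_pattern = [HL]
            frametype_exclude_pattern = .*_TEST
            filehost = datafind.example.com
        """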
section = conf['LDRDataFindServer']
cachefile = section['framecachefile']
patterns = {
key.rsplit('_', 1)[0]: section[key] for
key in section.keys() if key.endswith('_pattern')
}
sleeptime = float(section.get('framecachetimeout', 60))
self.manager = CacheManager(self, cachefile, sleeptime=sleeptime,
**patterns)
return self.manager
def get_cache_data(self, *keys):
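        """Return a (nested) slice of the diskcache, waiting until it is ready.

        For illustration only (the nesting is the one built by
        ``CacheManager.parse``; the key values here are assumed)::

            app.get_cache_data('gwf')
                -> {site: {tag: {(path, dur): segmentlist}}}
            app.get_cache_data('gwf', 'H', 'H1_TEST')
                -> {(path, dur): segmentlist} for that site and tag
        """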
while not self.manager.ready:
self.logger.debug("Waiting for cache...")
time.sleep(.5)
self.manager.lock.acquire()
try:
return self._get_cache_data(keys)
finally:
self.manager.lock.release()
def _get_cache_data(self, keys):
keys = list(keys)
last = keys.pop(-1)
if not keys:
return self.manager.cache.get(last, {})
ent = self.manager.cache.get(keys.pop(0), {})
for key in keys:
try:
ent = ent[key]
except KeyError:
return {}
return ent.get(last, {})
def shutdown(self):
self.manager.lock.acquire()
self.manager.shutdown = True
self.manager.lock.release()
self.manager.join()
def create_app():
"""Create an instance of the application
"""
app = DataFlask(__name__, get_config_path())
app.register_blueprint(views.blueprint)
return app
# -*- coding: utf-8 -*-
# Copyright (2019) Duncan Macleod
# Licensed under GPLv3+ - see LICENSE
"""Utilities for the GWDataFind Server
"""
import re
import threading
import time
from collections import defaultdict
from os.path import getmtime
from ligo.segments import (segment, segmentlist)
__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
class CacheManager(threading.Thread):
"""Thread to continuously update the diskcache in memory
"""
def __init__(self, parent, path, sleeptime=60,
site_exclude=None, site_include=None,
frametype_exclude=None, frametype_include=None):
super().__init__(name=type(self).__name__)
self.path = path
# create logger
self.logger = parent.logger
# create lock and flags
self.lock = threading.Lock()
self.shutdown = False
self.ready = False
# create cache
self.cache = defaultdict(dict)
# time between iterations
self.sleeptime = sleeptime
# record exclusion filters
self.patterns = {key: self._parse_pattern(value) for key, value in [
('site_exclude', site_exclude),
('site_include', site_include),
('frametype_exclude', frametype_exclude),
('frametype_include', frametype_include),
]}
@staticmethod
def _parse_pattern(pattern):
if pattern is None:
pattern = []
if not isinstance(pattern, list):
pattern = [pattern]
return [re.compile(reg) for reg in pattern]
def _update(self, cache):
self.logger.debug('updating cache with lock...')
self.lock.acquire()
self.cache = cache
self.lock.release()
self.logger.debug('updated cache with {} entries'.format(len(cache)))
self.logger.debug(str(cache))
self.logger.debug('lock released')
def sleep(self):
"""Wait until next iteration
"""
self.logger.debug("sleeping for {0} seconds".format(self.sleeptime))
start = time.time()
while time.time() - start < self.sleeptime:
time.sleep(.5)
if self.shutdown:
self.state = 'SHUTDOWN'
return
def exclude(self, site, tag):
"""Return `True` if this site and tag combination should be excluded
"""
for var, key in ((site, 'site'), (tag, 'frametype')):
pat = '{0}_exclude'.format(key)
for regex in self.patterns[pat]:
if regex.search(var): # positive match
return pat
pat = '{0}_include'.format(key)
for regex in self.patterns[pat]:
if not regex.search(var): # negative match
return pat
def run(self):
"""Continuously read and update the cache
"""
last = 0
while True:
if self.shutdown:
return
try:
mod = getmtime(self.path)
except OSError as exc:
self.logger.error("unable to determine modification time of "
"{0}: {1}".format(self.path, str(exc)))
mod = 0
if last < mod: # file changed since last iteration
try:
self.parse()
except (TypeError, ValueError) as exc:
self.logger.error("exception in parse(): {1}".format(
self.path, str(exc)))
else:
last = time.time()
else:
self.logger.info('cache file unchanged since last iteration')
self.sleep()
def parse(self):
"""Read the cache from the path
"""
self.logger.info('Parsing cache from {0}'.format(self.path))
exclusions = {key: 0 for key in self.patterns}
nlines = 0
cache = {}
with open(self.path, 'rb') as fobj:
for line in fobj:
site, tag, path, dur, ext, segments = self._parse_line(line)
exclude = self.exclude(site, tag)
if exclude: # record why excluded
exclusions[exclude] += 1
continue
cache.setdefault(ext, {})
cache[ext].setdefault(site, {})
cache[ext][site].setdefault(tag, {})
cache[ext][site][tag][(path, int(dur))] = segments
nlines += 1
self.logger.info('Parsed {0} lines from cache file'.format(nlines))
for key, count in exclusions.items():
self.logger.debug('excluded {0} lines with {1}'.format(count, key))
# store new cache
self._update(cache)
self.ready = True # can now be used
@staticmethod
def _parse_line(line):
"""Parse one line from the cache file
"""
if isinstance(line, bytes):
line = line.decode('utf-8')
# parse line
header, modt, count, times = line.strip().split(' ', 3)
path, site, tag, _, dur, ext = header.split(',')
# format times
times = list(map(int, times[1:-1].strip().split(' ')))
segments = segmentlist(map(
segment, (times[i:i+2] for i in range(0, len(times), 2))))
return site, tag, path, dur, ext, segments
# -*- coding: utf-8 -*-
# Copyright (2019) Duncan Macleod
# Licensed under GPLv3+ - see LICENSE
"""Configuration manipulation for a GWDataFind Server
"""
import os.path
from configobj import ConfigObj # pylint: disable=unused-import
def get_config_path(basename='gwdatafind-server.ini'):
"""Try and locate the given basename file in a set of standard directories
"""
ldrloc = os.getenv('LDR_LOCATION', '')
dirs = [
os.path.join(ldrloc, 'ldr', 'etc'),
os.path.join(ldrloc, 'etc'),
os.path.join(ldrloc),
os.path.expanduser('~'),
        os.path.join(os.sep, 'etc'),
]
for dir_ in dirs:
path = os.path.join(dir_, basename)
if os.path.isfile(path):
return path
raise ValueError("Cannot locate {0} in any of the standard "
"locations".format(basename))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from gwdatafind_server import create_app
application = create_app()
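# For illustration only (deployment details are not part of this module): the
# `application` callable above can be served by any WSGI server, for example
#   gunicorn --bind 0.0.0.0:8080 <module>:application
# where <module> is the import path under which this file is installed.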
# -*- coding: utf-8 -*-
# Copyright (2019) Duncan Macleod
# Licensed under GPLv3+ - see LICENSE
"""Utilities for the GWDataFind Server
"""
#!/usr/bin/env python
# Copyright (2019) Duncan Macleod
# Licensed under GPLv3+ - see LICENSE
"""Data views for GWDataFindServer
"""
import json
import operator
import re
import socket
from collections import defaultdict
from functools import (reduce, wraps)
from math import inf as INF
from urllib.parse import urlparse
from flask import (Blueprint, current_app, jsonify)
from ligo.segments import (segmentlist, segment)
_PREFIX = '/LDR/services/data/v1'
blueprint = Blueprint(
"data",
__name__,
url_prefix=_PREFIX,
)
_DEFAULT_GSIFTP_HOST = socket.gethostbyaddr(socket.gethostname())[0]
_DEFAULT_GSIFTP_PORT = 15000
# -- utilities ----------------------------------------------------------------
def as_json(func):
"""Dump a function's return to JSON
"""
@wraps(func)
def decorated(*args, **kwargs):
return jsonify(func(*args, **kwargs))
return decorated
def _file_url(path):
config = current_app.config['LDRDataFindServer']
host = config.get('filehost', 'localhost')
return 'file://{0}{1}'.format(host, path)
def _gsiftp_url(path):
config = current_app.config['LDRDataFindServer']
host = config.get('gsiftphost', _DEFAULT_GSIFTP_HOST)
port = config.get('gsiftpport', _DEFAULT_GSIFTP_PORT)
return 'gsiftp://{0}:{1}{2}'.format(host, port, path)
# -- routes -------------------------------------------------------------------
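# For illustration only (the 'gwf', 'H' and 'H1_TEST' values are assumed;
# every rule below is joined onto the /LDR/services/data/v1 prefix), clients
# request JSON documents such as:
#
#   /LDR/services/data/v1/gwf.json                     -> observatories
#   /LDR/services/data/v1/gwf/H.json                   -> frametypes for 'H'
#   /LDR/services/data/v1/gwf/H/H1_TEST/segments.json  -> known GPS segments
#   /LDR/services/data/v1/gwf/H/H1_TEST/1126250000,1126260000.json -> file URLs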
@blueprint.route('/<ext>.json')
@as_json
def find_observatories(ext):
"""List all observatories
"""
return list(current_app.get_cache_data(ext).keys())
@blueprint.route('<ext>/<site>.json')
@as_json
def find_types(ext, site):
    """List all data tags ('frametypes') for a given site
    """
    if site == 'all':
        sites = list(current_app.get_cache_data(ext).keys())
    else:
        sites = [site]
    return [tag for site in sites
            for tag in current_app.get_cache_data(ext, site)]
@blueprint.route('<ext>/<site>/<tag>/segments.json')
@as_json
def find_all_times(ext, site, tag):
"""List segments known for a given tag
"""
span = segmentlist([segment(0., INF)])
return reduce(
operator.or_,
(segs & span for segs in current_app.get_cache_data(ext, site, tag).values()),
segmentlist(),
)
@blueprint.route('<ext>/<site>/<tag>/segments/<int:start>,<int:end>.json')
@as_json
def find_times(ext, site, tag, start, end):
"""List segments known for a given tag
"""
span = segmentlist([segment(float(start), float(end))])
return reduce(
operator.or_,
(segs & span for segs in current_app.get_cache_data(ext, site, tag).values()),
segmentlist(),
)
@blueprint.route('<ext>/<site>/<tag>/<filename>.json')
@as_json
def find_url(ext, site, tag, filename):
"""Return URL(s) for a given filename
"""
    # parse GPS information from filename
    _, _, start, dur = filename.split('-')
    start = int(start)
    dur = int(dur[:-len(ext)].rstrip('.'))
    return list(_find_urls(ext, site, tag, start, start + dur))
@blueprint.route('<ext>/<site>/<tag>/<int:start>,<int:end>.json')
@blueprint.route('<ext>/<site>/<tag>/<int:start>,<int:end>/<urltype>.json')
@as_json
def find_urls(ext, site, tag, start, end, urltype=None):
"""Find all URLs in a given GPS time interval
"""
return list(_find_urls(ext, site, tag, start, end, urltype=urltype))
@blueprint.route('<ext>/<site>/<tag>/latest.json')
@blueprint.route('<ext>/<site>/<tag>/latest/<urltype>.json')
@as_json
def find_latest_url(ext, site, tag, urltype=None):
"""Find the latest URL(s) for a given tag
"""
return list(_find_urls(ext, site, tag, 0, INF,
urltype=urltype, latest=True))
# -- URL matcher --------------------------------------------------------------
def _get_latest_segment(seglist, duration):
"""Get segment for latest file of the given duration in a segment list
"""
end = seglist[-1][1]
return segment(end-duration, end)
def _find_urls(ext, site, tag, start, end, urltype=None, match=None,
latest=False):
"""Find all URLs for the given GPS interval
"""
# parse file paths
search = segment(start, end)
lfns = defaultdict(list)
maxgps = -1e9 # something absurdly old
print("TEST")
for (path, cdur), seglist in current_app.get_cache_data(ext, site, tag).items():
# if running a 'latest' URL search, restrict the search to
# the most recent available segment for this frametype
print(path, cdur, seglist, search)
if latest and seglist: # 'if seglist' to prevent IndexError
latest = _get_latest_segment(seglist, cdur)
if latest[1] <= maxgps: # if this is not an improvement, move on
continue
maxgps = latest[1]
seglist = [latest]
# loop over segments and construct file URLs
for seg in seglist:
if not seg.intersects(search):
continue
gps = seg[0]
while gps < seg[1]:
if segment(gps, gps+cdur).intersects(search):
lfn = '{site}-{tag}-{start}-{dur}.{ext}'.format(
site=site, tag=tag, start=gps, dur=cdur, ext=ext)
lfns[lfn].append('{0}/{1}'.format(path, lfn))
gps += cdur
# convert paths to URLs for various schemes
allurls = {}
for lfn in lfns:
allurls[lfn] = []
for path in lfns[lfn]:
# build file:// and gsiftp:// URL for each LFN
allurls[lfn].extend((
_file_url(path),
_gsiftp_url(path),
))
# filter URLs for each LFN and return
urls = []
for lfn in allurls:
urls.extend(_filter_urls(allurls[lfn], urltype=urltype, regex=match))
return urls
# -- URL filtering ------------------------------------------------------------
def _filter_urls(urls, urltype=None, regex=None):
"""Filter a list of URLs that all represent the same LFN.
"""
if regex:
regex = re.compile(regex)