
Compare revisions


Commits on Source (283)
Showing with 1356 additions and 539 deletions
*.pyc
*~
locklost/version.py
# https://computing.docs.ligo.org/gitlab-ci-templates
include:
- project: computing/gitlab-ci-templates
file:
- conda.yml
- python.yml
stages:
- lint
# https://computing.docs.ligo.org/gitlab-ci-templates/python/#.python:flake8
lint:
extends:
- .python:flake8
stage: lint
needs: []
......@@ -9,9 +9,13 @@ Since the analysis runs best on the site LDAS clusters, you should
contact your local sysadmin about getting access to your local
cluster.
`locklost` is a
[python3](https://docs.python.org/3/tutorial/index.html) project, so
make sure all code is python3 compatible.
The `locklost` project uses [git](https://git-scm.com/) and is hosted
at [git.ligo.org](https://git.ligo.org). Issues are tracked via the
[GitLab issue
at [git.ligo.org](https://git.ligo.org/jameson.rollins/locklost).
Issues are tracked via the [GitLab issue
tracker](https://git.ligo.org/jameson.rollins/locklost/issues).
......@@ -61,13 +65,13 @@ from a checkout of the source. To do this, change directory into
the `locklost` directory that you git cloned, and run the package
as so:
```shell
$ IFO=L1 LOCKLOST_EVENT_ROOT=~/my/event/dir python -m locklost --help
$ IFO=L1 LOCKLOST_EVENT_ROOT=~/my/event/dir python3 -m locklost --help
```
## developing for the web
If you're working on the web pages, create a new directory in
your personal web space
your personal web space for testing the locklost web cgi:
```shell
$ mkdir -p ~/public_html/lockloss
```
......@@ -79,9 +83,9 @@ $ cd ~/public_html/lockloss
$ ln -s ~/my/event/dir events
```
Finally, create the web cgi script in the web directory. Create a
file called `index.cgi` with the following contents (with variables
modified for your particular use case):
Finally, create the web cgi script in the web directory. The script
should be called `index.cgi` with the following contents (with
variables modified for your particular environment):
```shell
#!/bin/bash
......@@ -89,13 +93,16 @@ export IFO=L1
export LOCKLOST_EVENT_ROOT=~/public_html/events
export LOCKLOST_WEB_ROOT=https://ldas-jobs.ligo-la.caltech.edu/~albert.einstein/lockloss
export PYTHONPATH=~/path/to/source/locklost
exec python -m locklost.web
exec python3 -m locklost.web
```
Note the `LOCKLOST_WEB_ROOT` variable, which tells the cgi script the
base URL of the web pages.
Finally, make the script executable:
Finally, make sure the web script is executable and the web directory
tree is only owner writable:
```shell
$ chmod 755 ~/public_html
$ chmod 755 ~/public_html/lockloss
$ chmod 755 ~/public_html/lockloss/index.cgi
```
......@@ -103,6 +110,20 @@ You should now be able to see the web page at the appropriate URL,
e.g. https://ldas-jobs.ligo-la.caltech.edu/~albert.einstein/lockloss
# test suite
The `locklost` source comes with a [test script](test/run) that aims
to cover most of the functionality of the package. It searches past
data for known lock loss times for your given IFO (determined by the
`IFO` environment variable), analyzes the events found, and tests that
the web interface works. You should make sure the full test script
runs to completion before making a merge request. As for running the
code, you can set the IFO variable explicitly if it's not already set:
```shell
$ IFO=L1 ./test/run
```
# making merge requests
All contributions to `locklost` are handled via [GitLab merge
......@@ -136,25 +157,34 @@ $ git commit ....
when the patch is merged. See [automatic issue
closing](https://docs.gitlab.com/ee/user/project/issues/automatic_issue_closing.html).
3. When you've got all changes ready to go, push your development
3. Make sure all tests pass:
```shell
$ IFO=L1 ./test/run
```
4. When you've got all changes ready to go, push your development
branch to your fork of the project (assuming you cloned from your
fork of the project as specified above):
```shell
$ git push origin my-dev-branch
```
4. Navigate to your gitlab page for the project and click the "New
5. Navigate to your gitlab page for the project and click the "New
merge request" button.
5. Specify that you would like to submit a merge request for your new
6. Specify that you would like to submit a merge request for your new
development branch
(e.g. source="albert.einstein/locklost:my-dev-branch") to be merged
with the upstream master branch
(e.g. target="jameson.rollins/locklost:master").
6. Watch the merge request page for review of your request.
7. Watch the merge request page for review of your request, and fix
any issues. You can commit any additional changes to the same dev
branch (`my-dev-branch`) and push to your remote, and the merge
request will automatically be updated with the new code.
# quick links:
# useful links:
* [git quick reference](//gitref.org/basic/)
* [gitlab merge request](https://docs.gitlab.com/ee/user/project/merge_requests/)
......@@ -12,77 +12,227 @@ losses". It consists of four main components:
follow-up analyses.
* `web` interface to view lock loss event pages.
A command line interface is provided to launch the `online` analysis
and condor jobs to `search` for and `analyze` events.
The `locklost` command line interface provides access to all of the
above functionality.
The LIGO sites have `locklost` deployments that automatically find and
analyze lock loss events. The site lock loss web pages are available
at the following URLs:
# Deployments
* [H1: https://ldas-jobs.ligo-wa.caltech.edu/~lockloss/](https://ldas-jobs.ligo-wa.caltech.edu/~lockloss/)
* [L1: https://ldas-jobs.ligo-la.caltech.edu/~lockloss/](https://ldas-jobs.ligo-la.caltech.edu/~lockloss/)
If you notice any issues please [file a bug
report](https://git.ligo.org/jameson.rollins/locklost/-/issues), or if
you have any questions please contact the ["lock-loss" group on
chat.ligo.org](https://chat.ligo.org/ligo/channels/lock-loss).
# Analysis plugins
Lock loss event analysis is handled by a set of [analysis
"plugins"](/locklost/plugins/). Each plugin is registered in
[`locklost/plugins/__init__.py`](/locklost/plugins/__init__.py). Some
of the currently enabled plugins are:
* `discover.discover_data` wait for data to be available
* `refine.refine_event` refine event time
* `saturations.find_saturations` find saturating channels before event
* `lpy.find_lpy` find length/pitch/yaw oscillations in suspensions
* `glitch.analyze_glitches` look for glitches around event
* `overflows.find_overflows` look for ADC overflows
* `state_start.find_lock_start` find the start of the lock leading to
  the current lock loss
Each plugin does its own analysis, although some depend on the output
of other plugins. The output from any plugin (e.g. plots or data)
should be written into the event directory.
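As a rough sketch (not an actual plugin in the package), a plugin is
just a function that takes a `LocklossEvent`, does some analysis, and
writes its results into the event directory via `event.path()`; the
channel name, window, and threshold below are purely illustrative:

```python
# hypothetical plugin sketch -- channel, window, and threshold are made up
import numpy as np

from .. import logger
from .. import config
from .. import data


def check_example(event):
    """Check an example channel around the lock loss time."""
    window = (int(event.gps) - 5, int(event.gps) + 5)
    buf = data.fetch([f'{config.IFO}:EXAMPLE-CHANNEL'], window)[0]
    # all plugin output (plots, data files, tags) lands in the event directory
    np.savetxt(event.path('example_max.txt'), [buf.data.max()])
    if buf.data.max() > 1000:
        event.add_tag('EXAMPLE')
    logger.info("example check done for {}".format(event.id))
```

A plugin like this would then be hooked in by adding the corresponding
import and a `register_plugin(check_example)` call to
[`locklost/plugins/__init__.py`](/locklost/plugins/__init__.py).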
# Site deployments
Each site (LHO and LLO) has a dedicated "lockloss" account on their
local LDAS cluster where `locklost` is running:
local LDAS cluster where `locklost` is deployed. The `locklost`
command line interface is available when logging into these accounts.
NOTE: these accounts are for the managed `locklost` deployments
**ONLY**. They are specially configured; please don't change the
configuration or run other jobs in these accounts unless you know what
you're doing.
The online searches and other periodic maintenance tasks are handled
via `systemd --user` on the dedicated "detchar.ligo-?a.caltech.edu"
nodes. `systemd --user` is a user-specific instance of the
[systemd](https://systemd.io/) process supervision daemon. The [arch
linux wiki](https://wiki.archlinux.org/title/Systemd) has a nice intro
to systemd.
## online systemd service
The online search service is called `locklost-online.service`. To
control and view the status of the process use the `systemctl`
command, e.g.:
```shell
$ systemctl --user status locklost-online.service
$ systemctl --user start locklost-online.service
$ systemctl --user restart locklost-online.service
$ systemctl --user stop locklost-online.service
```
To view the logs from the online search, use `journalctl`:
```shell
$ journalctl --user-unit locklost-online.service -f
```
The `-f` option tells `journalctl` to "follow" the logs and print new
log lines as they come in.
The "service unit" files for the online search service (and the timer
services described below) that describe how the units should be
managed by systemd are maintained in the [support](/support/systemd)
directory in the git source.
* https://ldas-jobs.ligo-wa.caltech.edu/~lockloss/
* https://ldas-jobs.ligo-la.caltech.edu/~lockloss/
These accounts have deployments of the `locklost` package and command
line interface, run the production online and followup condor jobs,
and host the web pages.
## systemd timers
Instead of cron, the site deployments use `systemd timers` to handle
periodic background jobs. The enabled timers and their schedules can
be viewed with:
```shell
$ systemctl --user list-timers
```
Timers behave much like other services: they can be started/stopped,
enabled/disabled, etc. Unless there is some issue they should usually
just be left alone.
## deploying new versions
When a new version is ready for release, create an annotated tag for
the release and push it to the main repo (https://git.ligo.org/jameson.rollins/locklost):
the release and push it to the [main
repo](https://git.ligo.org/jameson.rollins/locklost):
```shell
$ git tag -m "release" 0.16
$ git tag -m release 0.22.1
$ git push --tags
```
In the LDAS lockloss account, pull the new tag and run
the test/deploy script:
In the "lockloss" account on the LDS "detchar" machines, pull the new
release and run the `deploy` script, which should automatically run the
tests before installing the new version:
```shell
$ ssh lockloss@detchar.ligo-la.caltech.edu
$ cd ~/src/locklost
$ cd src/locklost
$ git pull
$ locklost-deploy
```
If there are changes to the online search, restart the online condor
process, otherwise analysis changes should get picked up automatically
in new condor executions:
```shell
$ locklost online restart
$ ./deploy
```
The deployment should automatically install any updates to the online
search, and restart the search service.
# Usage
## common issues
To start/stop/restart the online analysis use the `online` command:
```shell
$ locklost online start
There are a couple of issues that crop up periodically, usually due to
problems with the site LDAS clusters where the jobs run.
### online analysis restarting due to NDS problems
One of the most common problems is that the online analysis falls
over because it can't connect to the site NDS server, for example:
```
2021-05-12_17:18:31 2021-05-12 17:18:31,317 NDS connect: 10.21.2.4:31200
2021-05-12_17:18:31 Error in write(): Connection refused
2021-05-12_17:18:31 Error in write(): Connection refused
2021-05-12_17:18:31 Traceback (most recent call last):
2021-05-12_17:18:31 File "/usr/lib64/python3.6/runpy.py", line 193, in _run_module_as_main
2021-05-12_17:18:31 "__main__", mod_spec)
2021-05-12_17:18:31 File "/usr/lib64/python3.6/runpy.py", line 85, in _run_code
2021-05-12_17:18:31 exec(code, run_globals)
2021-05-12_17:18:31 File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.3-py3.6.egg/locklost/online.py", line 109, in <module>
2021-05-12_17:18:31 stat_file=stat_file,
2021-05-12_17:18:31 File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.3-py3.6.egg/locklost/search.py", line 72, in search_iterate
2021-05-12_17:18:31 for bufs in data.nds_iterate([channel], start_end=segment):
2021-05-12_17:18:31 File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.3-py3.6.egg/locklost/data.py", line 48, in nds_iterate
2021-05-12_17:18:31 with closing(nds_connection()) as conn:
2021-05-12_17:18:31 File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.3-py3.6.egg/locklost/data.py", line 28, in nds_connection
2021-05-12_17:18:31 conn = nds2.connection(HOST, PORT)
2021-05-12_17:18:31 File "/usr/lib64/python3.6/site-packages/nds2.py", line 3172, in __init__
2021-05-12_17:18:31 _nds2.connection_swiginit(self, _nds2.new_connection(*args))
2021-05-12_17:18:31 RuntimeError: Failed to establish a connection[INFO: Error occurred trying to write to socket]
```
This launches a condor job that runs the online analysis.
To launch a condor job to search for lock losses within some time
window:
```shell
$ locklost search --condor START END
This is usually because the NDS server itself has died and needs to be
restarted/reset (frequently due to Tuesday maintenance).
Unfortunately the site admins aren't necessarily aware of this issue
and need to be poked about it. Once the NDS server is back the job
should just pick up on its own.
### analyze jobs failing because of cluster data problems
Another common failure mode is failing follow-up analysis jobs due to
data access problems in the cluster. These often occur during Tuesday
maintenance, but sometimes mysteriously at other times as well. These
kinds of failures are indicated by the following exceptions in the
event analyze log:
```
This will find lock losses with the specified time range, but will not
run the follow-up analyses. This is primarily needed to backfill
times when the online analysis was not running (see below).
2021-05-07 16:02:51,722 [analyze.analyze_event] exception in discover_data:
Traceback (most recent call last):
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/analyze.py", line 56, in analyze_event
func(event)
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/plugins/discover.py", line 67, in discover_data
raise RuntimeError("data discovery timeout reached, data not found")
RuntimeError: data discovery timeout reached, data not found
```
or:
```
2021-05-07 16:02:51,750 [analyze.analyze_event] exception in find_previous_state:
Traceback (most recent call last):
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/analyze.py", line 56, in analyze_event
func(event)
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/plugins/history.py", line 26, in find_previous_state
gbuf = data.fetch(channels, segment)[0]
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/data.py", line 172, in fetch
bufs = func(channels, start, stop)
File "/home/lockloss/.local/lib/python3.6/site-packages/locklost-0.21.0-py3.6.egg/locklost/data.py", line 150, in frame_fetch_gwpy
data = gwpy.timeseries.TimeSeriesDict.find(channels, start, stop, frametype=config.IFO+'_R')
File "/usr/lib/python3.6/site-packages/gwpy/timeseries/core.py", line 1291, in find
on_gaps="error" if pad is None else "warn",
File "/usr/lib/python3.6/site-packages/gwpy/io/datafind.py", line 335, in wrapped
return func(*args, **kwargs)
File "/usr/lib/python3.6/site-packages/gwpy/io/datafind.py", line 642, in find_urls
on_gaps=on_gaps)
File "/usr/lib/python3.6/site-packages/gwdatafind/http.py", line 433, in find_urls
raise RuntimeError(msg)
RuntimeError: Missing segments:
[1304463181 ... 1304463182)
```
This problem sometimes just corrects itself, but often needs admin
poking as well.
Any time argument ('START', 'END', 'TIME', etc.) can be either a GPS
time or a full (even relative) date/time string, e.g. '1 week ago'.
## back-filling events
To run a full analysis on a specific lock loss time found from the
search above:
After things have recovered from any of the issues mentioned above,
you'll probably want to back-fill any missed events. The best way to
do that is to run a condor `search` for missed events (e.g. from "4
weeks ago" until "now"):
```shell
$ locklost analyze TIME
$ locklost search --condor '4 weeks ago' now
```
To launch a condor job to analyze all un-analyzed events within a time
range:
The search will find lock loss events, but it will not run the
follow-up analyses. To run the full analysis on any un-analyzed
events, run the condor `analyze` command over the same time range,
e.g.:
```shell
$ locklost analyze --condor START END
$ locklost analyze --condor '4 weeks ago' now
```
To re-analyze events add the `--rerun` flag e.g.:
The `analyze` command will analyze both missed/new events and
previously "failed" events.
You can also re-analyze a specific event with the `--rerun` flag:
```shell
$ locklost analyze TIME --rerun
```
......@@ -91,38 +241,19 @@ or
$ locklost analyze --condor START END --rerun
```
It has happened that analysis jobs are improperly killed by condor,
not giving them a chance to clean up their run locks. The site
locklost deployments include a command line utility to find and remove
Occasionally analysis jobs are improperly killed by condor, not
giving them a chance to clean up their run locks. To find and remove
any old, stale analysis locks:
```shell
$ find-locks -r
$ locklost find-locks -r
```
# Analysis plugins
Lock loss event analysis consists of a set of follow-up "plugin"
analyses, located in the `locklost.plugins` sub-package:
* [`locklost/plugins/`](/locklost/plugins/)
Each follow-up module is registered in
[`locklost/plugins/__init__.py`](/locklost/plugins/__init__.py). Some
of the currently enabled follow-ups are:
* `discover.discover_data` wait for data to be available
* `refine.refine_event` refine event time
* `saturations.find_saturations` find saturating channels before event
* `lpy.find_lpy` find length/pitch/yaw oscillations in suspensions
* `glitch.analyze_glitches` look for glitches around event
* `overflows.find_overflows` look for ADC overflows
* `state_start.find_lock_start` find the start of the lock leading to
  the current lock loss
Each plugin does its own analysis, although some depend on the output
of other analyses. The output from any analysis (e.g. plots or data)
should be written into the event directory.
We also have timer services available to handle the backfilling after
Tuesday maintenance if need be:
```shell
$ systemctl --user enable --now locklost-search-backfill.timer
$ systemctl --user enable --now locklost-analyze-backfill.timer
```
# Developing and contributing
......
#!/bin/bash -ex
if [[ $(hostname -f) =~ detchar.ligo-[lw]+a.caltech.edu ]] ; then
hostname -f
else
echo "This deploy script is intended to be run only on the detchar.ligo-{l,w}.caltech.edu hosts."
exit 1
fi
CONDA_ROOT=/cvmfs/software.igwn.org/conda
CONDA_ENV=igwn-py39
VENV=~/opt/locklost-${CONDA_ENV}
if [[ "$1" == '--setup' ]] ; then
shift
source ${CONDA_ROOT}/etc/profile.d/conda.sh
conda activate ${CONDA_ENV}
python3 -m venv --system-site-packages ${VENV}
ln -sfn ${VENV} ~/opt/locklost
fi
source ${VENV}/bin/activate
git pull --tags
if [[ "$1" == '--skip-test' ]] ; then
echo "WARNING: SKIPPING TESTS!"
else
./test/run
fi
pip install .
# install -t ~/bin/ support/bin/*
install -m 644 -t ~/.config/systemd/user support/systemd/*
systemctl --user daemon-reload
systemctl --user restart locklost-online.service
systemctl --user status locklost-online.service
from __future__ import print_function, division
import os
import signal
import logging
from . import config
import matplotlib
matplotlib.use('Agg')
......@@ -10,7 +12,9 @@ try:
except ImportError:
__version__ = '?.?.?'
from . import config
logger = logging.getLogger('LOCKLOST')
# signal handler kill/stop signals
def signal_handler(signum, frame):
......@@ -18,10 +22,25 @@ def signal_handler(signum, frame):
signame = signal.Signal(signum).name
except AttributeError:
signame = signum
logging.error("Signal received: {} ({})".format(signame, signum))
logger.error("Signal received: {} ({})".format(signame, signum))
raise SystemExit(1)
def set_signal_handlers():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
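# CONDOR_KILL_SIGNAL (from config) names a signal in the signal module; resolve the name to the actual signal object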
signal.signal(eval('signal.'+config.CONDOR_KILL_SIGNAL), signal_handler)
def config_logger(level=None):
if not level:
level = os.getenv('LOG_LEVEL', 'INFO')
handler = logging.StreamHandler()
if os.getenv('LOG_FMT_NOTIME'):
fmt = config.LOG_FMT_NOTIME
else:
fmt = config.LOG_FMT
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)
import os
import subprocess
import argparse
import logging
import signal
from . import __version__, set_signal_handlers
from . import __version__
from . import set_signal_handlers
from . import config_logger
from . import config
from . import search
from . import analyze
from . import online
from . import event
from . import segments
from . import summary
from . import plots
##########
parser = argparse.ArgumentParser(
prog='locklost',
)
......@@ -22,10 +24,10 @@ subparser = parser.add_subparsers(
title='Commands',
metavar='<command>',
dest='cmd',
#help=argparse.SUPPRESS,
)
subparser.required = True
def gen_subparser(cmd, func):
assert func.__doc__, "empty docstring: {}".format(func)
help = func.__doc__.split('\n')[0].lower().strip('.')
......@@ -41,8 +43,11 @@ def gen_subparser(cmd, func):
##########
parser.add_argument('--version', action='version', version='%(prog)s {}'.format(__version__),
help="print version and exit")
parser.add_argument(
'--version', action='version', version='%(prog)s {}'.format(__version__),
help="print version and exit")
p = gen_subparser('search', search.main)
search._parser_add_arguments(p)
......@@ -56,6 +61,10 @@ p = gen_subparser('online', online.main)
online._parser_add_arguments(p)
p = gen_subparser('summary', summary.main)
summary._parser_add_arguments(p)
def list_events(args):
"""List all events"""
for e in event.find_events():
......@@ -68,9 +77,10 @@ def list_events(args):
e.transition_index[0],
e.transition_index[1],
aflag,
e.path,
e.path(),
))
p = gen_subparser('list', list_events)
......@@ -80,18 +90,19 @@ def show_event(args):
"""
e = event.LocklossEvent(args.event)
if args.log:
subprocess.call(['less', '+G', os.path.join(e.path, 'log')])
subprocess.call(['less', '+G', e.path('log')])
exit()
print("id: {}".format(e.id))
print("gps: {}".format(e.gps))
print("path: {}".format(e.path))
print("url: {}".format(e.url))
print("path: {}".format(e.path()))
print("url: {}".format(e.url()))
print("transition: {}".format(e.transition_index))
print("analysis: {}".format(e.analysis_version))
print("status: {}".format(e.analysis_status))
print("tags: {}".format(' ' .join(e.list_tags())))
# print("files:")
# subprocess.call(['find', e.path])
# subprocess.call(['find', e.path()])
p = gen_subparser('show', show_event)
p.add_argument('--log', '-l', action='store_true',
......@@ -120,6 +131,7 @@ def tag_events(args):
else:
e.add_tag(tag)
p = gen_subparser('tag', tag_events)
p.add_argument('event',
help="event ID")
......@@ -133,6 +145,7 @@ def compress_segments(args):
"""
segments.compress_segdir(config.SEG_DIR)
p = gen_subparser('compress', compress_segments)
......@@ -150,6 +163,7 @@ def mask_segment(args):
[Segment(args.start, args.end)],
)
p = gen_subparser('mask', mask_segment)
p.add_argument('start', type=int,
help="segment start")
......@@ -163,24 +177,55 @@ def plot_history(args):
"""
plots.plot_history(args.path, lookback=args.lookback)
p = gen_subparser('plot-history', plot_history)
p.add_argument('path',
help="output file")
p.add_argument('-l', '--lookback', default='7 days ago',
help="history look back ['7 days ago']")
def find_locks(args):
"""Find/clear event locks
These are sometimes left by failed condor jobs. This should not
normally need to be run, and caution should be taken to not remove
active locks and events currently being analyzed.
"""
root = config.EVENT_ROOT
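# event directories are laid out as EVENT_ROOT/<epoch>/<suffix>/, so walk two levels down looking for leftover 'lock' files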
for d in os.listdir(root):
try:
int(d)
except ValueError:
continue
for dd in os.listdir(os.path.join(root, d)):
try:
int(d+dd)
except ValueError:
continue
lock_path = os.path.join(root, d, dd, 'lock')
if os.path.exists(lock_path):
print(lock_path)
if args.remove:
os.remove(lock_path)
p = gen_subparser('find-locks', find_locks)
p.add_argument('-r', '--remove', action='store_true',
help="remove any found event locks")
##########
def main():
set_signal_handlers()
logging.basicConfig(
level=os.getenv('LOG_LEVEL', 'INFO'),
format=config.LOG_FMT,
)
config_logger()
args = parser.parse_args()
if not config.EVENT_ROOT:
raise SystemExit("Must specify LOCKLOST_EVENT_ROOT env var.")
args.func(args)
if __name__ == '__main__':
main()
import os
import sys
import argparse
import logging
import argparse
from matplotlib import pyplot as plt
from . import __version__, set_signal_handlers
from . import __version__
from . import set_signal_handlers
from . import config_logger
from . import logger
from . import config
from .plugins import FOLLOWUPS
from .plugins import PLUGINS
from .event import LocklossEvent, find_events
from . import condor
from . import search
##################################################
def analyze_event(event, plugins=None):
"""Execute all follow-up plugins for event
......@@ -20,19 +26,20 @@ def analyze_event(event, plugins=None):
# log analysis to event log
# always append log
log_mode = 'a'
logger = logging.getLogger()
handler = logging.FileHandler(event.gen_path('log'), mode=log_mode)
handler = logging.FileHandler(event.path('log'), mode=log_mode)
handler.setFormatter(logging.Formatter(config.LOG_FMT))
logger.addHandler(handler)
# log exceptions
def exception_logger(exc_type, exc_value, traceback):
logging.error("Uncaught exception:", exc_info=(exc_type, exc_value, traceback))
logger.error("Uncaught exception:", exc_info=(exc_type, exc_value, traceback))
sys.excepthook = exception_logger
try:
event.lock()
except OSError as e:
logging.error(e)
logger.error(e)
raise
# clean up previous run
......@@ -40,37 +47,37 @@ def analyze_event(event, plugins=None):
event._scrub(archive=False)
event._set_version(__version__)
logging.info("analysis version: {}".format(event.analysis_version))
logging.info("event id: {}".format(event.id))
logging.info("event path: {}".format(event.path))
logging.info("event url: {}".format(event.url))
logger.info("analysis version: {}".format(event.analysis_version))
logger.info("event id: {}".format(event.id))
logger.info("event path: {}".format(event.path()))
logger.info("event url: {}".format(event.url()))
complete = True
for name, func in FOLLOWUPS.items():
for name, func in PLUGINS.items():
if plugins and name not in plugins:
continue
logging.info("executing followup: {}({})".format(
logger.info("executing plugin: {}({})".format(
name, event.id))
try:
func(event)
except SystemExit:
complete = False
logging.error("EXIT signal, cleaning up...")
logger.error("EXIT signal, cleaning up...")
break
except AssertionError:
complete = False
logging.exception("FATAL exception in {}:".format(name))
logger.exception("FATAL exception in {}:".format(name))
break
except:
except Exception:
complete = False
logging.exception("exception in {}:".format(name))
logger.exception("exception in {}:".format(name))
plt.close('all')
if complete:
logging.info("analysis complete")
logger.info("analysis complete")
event._set_status(0)
else:
logging.warning("INCOMPLETE ANALYSIS")
logger.warning("INCOMPLETE ANALYSIS")
event._set_status(1)
logger.removeHandler(handler)
......@@ -81,42 +88,43 @@ def analyze_event(event, plugins=None):
def analyze_condor(event):
if event.analyzing:
logging.warning("event analysis already in progress, aborting")
logger.warning("event analysis already in progress, aborting")
return
condor_dir = event.gen_path('condor')
condor_dir = event.path('condor')
try:
os.makedirs(condor_dir)
except:
except Exception:
pass
sub = condor.CondorSubmit(
condor_dir,
'analyze',
[str(event.id)],
local=False,
notify_user=os.getenv('CONDOR_NOTIFY_USER'),
)
sub.write()
sub.submit()
##################################################
def _parser_add_arguments(parser):
from .util import GPSTimeParseAction
egroup = parser.add_mutually_exclusive_group(required=True)
egroup.add_argument('event', action=GPSTimeParseAction, nargs='?',
egroup.add_argument('event', nargs='?', type=int,
help="event ID / GPS second")
egroup.add_argument('--condor', action=GPSTimeParseAction, nargs=2, metavar='GPS',
help="condor analyze all events within GPS range")
help="condor analyze all events within GPS range. If the second time is '0' (zero), the first time will be interpreted as an exact event time and just that specific event will be analyzed via condor submit.")
parser.add_argument('--rerun', action='store_true',
help="condor re-analyze events")
parser.add_argument('--plugin', '-p', action='append',
help="execute only specified plugin (multiple ok, not available for condor runs)")
def main(args=None):
"""Analyze event(s)
By default all analysis follow-ups for the specified event will
By default all analysis plugins for the specified event will
be executed. If the --condor option is given with GPS start and end
times all events within the specified time span will be analyzed.
......@@ -127,20 +135,23 @@ def main(args=None):
args = parser.parse_args()
if args.condor:
logging.info("finding events...")
after, before = tuple(int(t) for t in args.condor)
events = [e for e in find_events(after=after, before=before)]
t0, t1 = tuple(int(t) for t in args.condor)
if t1 == 0:
events = [LocklossEvent(t0)]
else:
logger.info("finding events...")
events = [e for e in find_events(after=t0, before=t1)]
to_analyze = []
for e in events:
if e.analyzing:
logging.debug(" {} analyzing".format(e))
logger.debug(" {} analyzing".format(e))
continue
if e.analysis_succeeded and not args.rerun:
logging.debug(" {} suceeded".format(e))
logger.debug(" {} suceeded".format(e))
continue
logging.debug(" adding {}".format(e))
logger.debug(" adding {}".format(e))
to_analyze.append(e)
logging.info("found {} events, {} to analyze.".format(len(events), len(to_analyze)))
logger.info("found {} events, {} to analyze.".format(len(events), len(to_analyze)))
if not to_analyze:
return
......@@ -158,7 +169,19 @@ def main(args=None):
dag.submit()
else:
event = LocklossEvent(args.event)
event = None
try:
event = LocklossEvent(args.event)
except OSError:
pass
if not event:
logger.info("no event matching GPS {}. searching...".format(int(args.event)))
search.search((args.event-1, args.event+1))
try:
event = LocklossEvent(args.event)
except OSError as e:
sys.exit(e)
logger.info("analyzing event {}...".format(event))
if not analyze_event(event, args.plugin):
sys.exit(1)
......@@ -166,10 +189,7 @@ def main(args=None):
# direct execution of this module intended for condor jobs
if __name__ == '__main__':
set_signal_handlers()
logging.basicConfig(
level='DEBUG',
format=config.LOG_FMT,
)
config_logger(level='DEBUG')
parser = argparse.ArgumentParser()
parser.add_argument('event', type=int,
help="event ID / GPS second")
......
......@@ -5,14 +5,16 @@ import shutil
import collections
import subprocess
# import uuid
import logging
import datetime
from . import logger
from . import config
##################################################
def _write_executable(
module,
path,
......@@ -23,29 +25,19 @@ def _write_executable(
The first argument is the name of the locklost module to be executed.
"""
PYTHONPATH = os.getenv('PYTHONPATH', '')
NDSSERVER = os.getenv('NDSSERVER', '')
LIGO_DATAFIND_SERVER = os.getenv('LIGO_DATAFIND_SERVER')
with open(path, 'w') as f:
f.write("""#!/bin/sh
export IFO={}
export LOCKLOST_EVENT_ROOT={}
export PYTHONPATH={}
export DATA_ACCESS={}
export NDSSERVER={}
export LIGO_DATAFIND_SERVER={}
export CONDOR_ACCOUNTING_GROUP={}
export CONDOR_ACCOUNTING_GROUP_USER={}
exec {} -m locklost.{} "$@" 2>&1
""".format(
config.IFO,
config.EVENT_ROOT,
os.getenv('PYTHONPATH', ''),
data_access,
os.getenv('NDSSERVER', ''),
os.getenv('LIGO_DATAFIND_SERVER'),
config.CONDOR_ACCOUNTING_GROUP,
config.CONDOR_ACCOUNTING_GROUP_USER,
sys.executable,
module,
))
f.write(f"""#!/bin/sh
export IFO={config.IFO}
export LOCKLOST_EVENT_ROOT={config.EVENT_ROOT}
export PYTHONPATH={PYTHONPATH}
export DATA_ACCESS={data_access}
export NDSSERVER={NDSSERVER}
export LIGO_DATAFIND_SERVER={LIGO_DATAFIND_SERVER}
exec {sys.executable} -m locklost.{module} "$@" 2>&1
""")
os.chmod(path, 0o755)
......@@ -75,6 +67,7 @@ class CondorSubmit(object):
('executable', '{}'.format(self.exec_path)),
('arguments', ' '.join(args)),
('universe', universe),
('request_disk', '10 MB'),
('accounting_group', config.CONDOR_ACCOUNTING_GROUP),
('accounting_group_user', config.CONDOR_ACCOUNTING_GROUP_USER),
('getenv', 'True'),
......@@ -98,11 +91,12 @@ class CondorSubmit(object):
def submit(self):
assert os.path.exists(self.sub_path), "Must write() before submitting"
logging.info("condor submit: {}".format(self.sub_path))
logger.info("condor submit: {}".format(self.sub_path))
subprocess.call(['condor_submit', self.sub_path])
##################################################
class CondorDAG(object):
def __init__(self, condor_dir, module, submit_args_gen, log=None, use_test_job=False):
......@@ -132,7 +126,7 @@ class CondorDAG(object):
break
if not log:
log='logs/$(jobid)_$(cluster)_$(process).'
log = 'logs/$(jobid)_$(cluster)_$(process).'
self.sub = CondorSubmit(
self.condor_dir,
......@@ -153,28 +147,29 @@ class CondorDAG(object):
s = ''
for jid, sargs in enumerate(self.submit_args_gen()):
VARS = self.__gen_VARS(('jobid', jid), *sargs)
s += '''
JOB {jid} {sub}
VARS = ' '.join(VARS)
s += f'''
JOB {jid} {self.sub.sub_path}
VARS {jid} {VARS}
RETRY {jid} 1
'''.format(jid=jid,
sub=self.sub.sub_path,
VARS=' '.join(VARS),
)
'''
if self.use_test_job and jid == 0:
s += '''RETRY {jid} 1
'''.format(jid=jid)
logging.info("condor DAG {} jobs".format(jid+1))
logger.info("condor DAG {} jobs".format(jid+1))
return s
@property
def lock(self):
return os.path.join(self.condor_dir, 'dag.lock')
@property
def has_lock(self):
lock = os.path.join(self.condor_dir, 'dag.lock')
return os.path.exists(lock)
return os.path.exists(self.lock)
def write(self):
if self.has_lock:
raise CondorError("DAG already running: {}".format(lock))
raise RuntimeError("DAG already running: {}".format(self.lock))
shutil.rmtree(self.condor_dir)
try:
os.makedirs(os.path.join(self.condor_dir, 'logs'))
......@@ -187,8 +182,8 @@ RETRY {jid} 1
def submit(self):
assert os.path.exists(self.dag_path), "Must write() before submitting"
if self.has_lock:
raise CondorError("DAG already running: {}".format(lock))
logging.info("condor submit dag: {}".format(self.dag_path))
raise RuntimeError("DAG already running: {}".format(self.lock))
logger.info("condor submit dag: {}".format(self.dag_path))
subprocess.call(['condor_submit_dag', self.dag_path])
print("""
Run the following to monitor condor job:
......@@ -198,6 +193,7 @@ tail -F {0}.*
##################################################
def find_jobs():
"""find all running condor jobs
......@@ -239,7 +235,7 @@ def stop_jobs(job_list):
import htcondor
schedd = htcondor.Schedd()
for job in job_list:
logging.info("stopping job: {}".format(job_str(job)))
logger.info("stopping job: {}".format(job_str(job)))
schedd.act(
htcondor.JobAction.Remove,
'ClusterId=={} && ProcId=={}'.format(job['ClusterId'], job['ProcId']),
......
import os
import glob
import time
import numpy as np
from contextlib import closing
import logging
import numpy as np
import nds2
import gpstime
import gwdatafind
import gwpy.timeseries
from . import logger
from . import config
##################################################
def nds_connection():
try:
HOSTPORT = os.getenv('NDSSERVER').split(',')[0].split(':')
......@@ -24,7 +26,7 @@ def nds_connection():
PORT = int(HOSTPORT[1])
except IndexError:
PORT = 31200
logging.debug("NDS connect: {}:{}".format(HOST, PORT))
logger.debug("NDS connect: {}:{}".format(HOST, PORT))
conn = nds2.connection(HOST, PORT)
conn.set_parameter('GAP_HANDLER', 'STATIC_HANDLER_NAN')
# conn.set_parameter('ITERATE_USE_GAP_HANDLERS', 'false')
......@@ -33,7 +35,11 @@ def nds_connection():
def nds_fetch(channels, start, stop):
with closing(nds_connection()) as conn:
bufs = conn.fetch(start, stop, channels)
bufs = conn.fetch(
int(np.round(start)),
int(np.round(stop)),
channels,
)
return [ChannelBuf.from_nds(buf) for buf in bufs]
......@@ -57,6 +63,7 @@ def nds_iterate(channels, start_end=None):
##################################################
class ChannelBuf(object):
def __init__(self, channel, data, gps_start, gps_nanoseconds, sample_rate):
self.channel = channel
......@@ -71,7 +78,7 @@ class ChannelBuf(object):
gps=self.gps_start,
sec=self.duration,
nsamples=len(self),
)
)
def __len__(self):
return len(self.data)
......@@ -132,26 +139,31 @@ class ChannelBuf(object):
##################################################
def frame_fetch(channels, start, stop):
conn = glue.datafind.GWDataFindHTTPConnection()
cache = conn.find_frame_urls(config.IFO[0], IFO+'_R', start, stop, urltype='file')
fc = frutils.FrameCache(cache, verbose=True)
return [ChannelBuf.from_frdata(fc.fetch(channel, start, stop)) for channel in channels]
# def frame_fetch(channels, start, stop):
# conn = glue.datafind.GWDataFindHTTPConnection()
# cache = conn.find_frame_urls(config.IFO[0], IFO+'_R', start, stop, urltype='file')
# fc = frutils.FrameCache(cache, verbose=True)
# return [ChannelBuf.from_frdata(fc.fetch(channel, start, stop)) for channel in channels]
def frame_fetch_gwpy(channels, start, stop):
gps_now = int(gpstime.tconvert('now'))
### grab from shared memory if data is still available, otherwise use datafind
if gps_now - start < config.MAX_QUERY_LATENCY:
frames = glob.glob("/dev/shm/lldetchar/{}/*".format(config.IFO))
data = gwpy.timeseries.TimeSeriesDict.read(frames, channels, start=start, end=stop)
# grab from shared memory if data is still available, otherwise use datafind
if gps_now - start < config.DATA_DEVSHM_TIMEOUT:
frames = glob.glob(config.DATA_DEVSHM_ROOT + "/*")
try:
data = gwpy.timeseries.TimeSeriesDict.read(frames, channels, start=start, end=stop)
except IndexError:
raise RuntimeError(f"data not found in {config.DATA_DEVSHM_ROOT}")
else:
data = gwpy.timeseries.TimeSeriesDict.find(channels, start, stop, frametype=config.IFO+'_R')
frametype = f'{config.IFO}_R'
data = gwpy.timeseries.TimeSeriesDict.find(channels, start, stop, frametype=frametype)
return [ChannelBuf.from_gwTS(data[channel]) for channel in channels]
def fetch(channels, segment):
def fetch(channels, segment, as_dict=False):
"""Fetch data using preferred method.
Method set by DATA_ACCESS config variable.
......@@ -162,53 +174,42 @@ def fetch(channels, segment):
method = config.DATA_ACCESS.lower()
if method == 'nds':
func = nds_fetch
elif method == 'fr':
func = frame_fetch
# elif method == 'fr':
# func = frame_fetch
elif method == 'gwpy':
func = frame_fetch_gwpy
else:
raise ValueError("unknown data access method: {}".format(DATA_ACCESS))
logging.debug("{}({}, {}, {})".format(func.__name__, channels, start, stop))
return func(channels, start, stop)
def frame_available(segment):
"""Return True if the specified data segment is available in frames.
"""
ifo = config.IFO[0]
ftype = '{}_R'.format(config.IFO)
start, end = [int(s) for s in segment]
logging.debug('gwdatafind.find_times({}, {}, {}, {})'.format(
ifo, ftype, start, end,
))
segs = gwdatafind.find_times(
ifo, ftype, start, end,
)
return segment in segs.extent()
def frame_wait(segment):
"""Wait until data segment is available.
raise ValueError(f"unknown data access method: {method}")
logger.debug("{}({}, {}, {})".format(func.__name__, channels, start, stop))
# keep attempting to pull data in a loop, until we get the data or
# the timeout is reached
bufs = None
tstart = time.monotonic()
while time.monotonic() <= tstart + config.DATA_DISCOVERY_TIMEOUT:
try:
bufs = func(channels, start, stop)
break
except RuntimeError as e:
logger.info(
"data not available, sleeping for {} seconds ({})".format(
config.DATA_DISCOVERY_SLEEP,
e,
)
)
time.sleep(config.DATA_DISCOVERY_SLEEP)
else:
raise RuntimeError(f"data discovery timeout reached ({config.DATA_DISCOVERY_TIMEOUT}s)")
Returns True if data is available, or False if the
DATA_DISCOVERY_TIMEOUT was reached.
if as_dict:
return {buf.channel: buf for buf in bufs}
else:
return bufs
"""
start = time.monotonic()
while time.monotonic() <= start + config.DATA_DISCOVERY_TIMEOUT:
if frame_available(segment):
return True
logging.info(
"no data available in LDR, sleeping for {} seconds".format(
config.DATA_DISCOVERY_SLEEP,
)
)
time.sleep(config.DATA_DISCOVERY_SLEEP)
return False
##################################################
def gen_transitions(buf, previous=None):
"""Generator of transitions in ChannelBuf
......
import os
import shutil
import logging
import numpy as np
from gpstime import tconvert
from . import logger
from . import config
##################################################
def _trans_int(trans):
return tuple(int(i) for i in trans)
......@@ -16,7 +19,7 @@ def _trans_int(trans):
class LocklossEvent(object):
__slots__ = [
'__id', '__epath',
'__transition_index', '__transition_gps',
'_transition_index', '__transition_gps',
'__refined_gps', '__previous_state',
]
......@@ -32,9 +35,9 @@ class LocklossEvent(object):
except ValueError:
self.__id = int(gps.replace('/', ''))
self.__epath = self._gen_epath(self.__id)
if not os.path.exists(self.path):
raise OSError("Unknown event: {}".format(self.path))
self.__transition_index = None
if not os.path.exists(self.path()):
raise OSError("Unknown event: {}".format(self.path()))
self._transition_index = None
self.__transition_gps = None
self.__refined_gps = None
self.__previous_state = None
......@@ -51,6 +54,11 @@ class LocklossEvent(object):
"""event ID"""
return self.__id
@property
def epath(self):
"""event 'epoch/esecs' string"""
return self.__epath
@property
def gps(self):
"""best estimate of event GPS time
......@@ -69,23 +77,13 @@ class LocklossEvent(object):
"""event UTC time"""
return tconvert(self.gps)
@property
def path(self):
"""event path"""
return os.path.join(config.EVENT_ROOT, self.__epath)
def gen_path(self, *args):
"""generate event path"""
return os.path.join(self.path, *args)
@property
def url(self):
"""event URL"""
return os.path.join(config.WEB_ROOT, 'events', self.__epath)
def path(self, *args):
"""event path, or path to event file"""
return os.path.join(config.EVENT_ROOT, self.epath, *args)
def gen_url(self, *args):
"""generate event URL path"""
return os.path.join(self.url, *args)
def url(self, *args):
"""event URL, or URL to event file"""
return os.path.join(config.WEB_ROOT, 'events', self.epath, *args)
@property
def view_url(self):
......@@ -95,20 +93,20 @@ class LocklossEvent(object):
@property
def transition_index(self):
"""tuple of indices of Guardian state transition"""
if not self.__transition_index:
with open(self.gen_path('transition_index')) as f:
if not self._transition_index:
with open(self.path('transition_index')) as f:
tis = f.read().split()
self.__transition_index = _trans_int(tis)
return self.__transition_index
self._transition_index = _trans_int(tis)
return self._transition_index
@property
def transition_gps(self):
"""GPS time of Guardian state transition"""
if not self.__transition_gps:
try:
with open(self.gen_path('guard_state_end_gps')) as f:
with open(self.path('guard_state_end_gps')) as f:
self.__transition_gps = float(f.read().strip())
except:
except Exception:
# if this is an old event and we don't have a
# guard_state_end_gps file, just return the id (int of
# transition time) as the transition GPS
......@@ -142,12 +140,12 @@ class LocklossEvent(object):
except OSError:
raise OSError("Event {} already exists.".format(path))
event = cls(gps)
with open(event.gen_path('guard_state_end_gps'), 'w') as f:
with open(event.path('guard_state_end_gps'), 'w') as f:
f.write('{:f}\n'.format(gps))
if transition_index:
with open(event.gen_path('transition_index'), 'w') as f:
with open(event.path('transition_index'), 'w') as f:
f.write('{:.0f} {:.0f}\n'.format(*transition_index))
logging.info("event created: {}".format(event.path))
logger.info("event created: {}".format(event.path()))
return event
def _scrub(self, archive=True):
......@@ -167,39 +165,39 @@ class LocklossEvent(object):
]
if archive:
try:
last_time = int(os.stat(self.gen_path('status')).st_mtime)
last_time = int(os.stat(self.path('status')).st_mtime)
except OSError:
return
bak_dir = self.gen_path('bak', str(last_time))
logging.info("archiving old artifacts: {}".format(bak_dir))
bak_dir = self.path('bak', str(last_time))
logger.info("archiving old artifacts: {}".format(bak_dir))
try:
os.makedirs(bak_dir)
except OSError:
pass
for f in os.listdir(self.path):
for f in os.listdir(self.path()):
if f in preserve_files:
continue
shutil.move(os.path.join(self.path, f), bak_dir)
shutil.move(self.path(f), bak_dir)
else:
logging.info("purging old artifacts...")
for f in os.listdir(self.path):
logger.info("purging old artifacts...")
for f in os.listdir(self.path()):
if f in preserve_files:
continue
try:
shutil.rmtree(os.path.join(self.path, f))
shutil.rmtree(self.path(f))
except OSError:
os.remove(os.path.join(self.path, f))
os.remove(self.path(f))
#####
def _set_version(self, version):
path = self.gen_path('version')
path = self.path('version')
with open(path, 'w') as f:
f.write(version+'\n')
def _set_status(self, status):
assert status in [None, 0, 1]
path = self.gen_path('status')
path = self.path('status')
if status is None:
try:
os.remove(path)
......@@ -215,7 +213,7 @@ class LocklossEvent(object):
Throws an OSError if the event is already locked.
"""
path = self.gen_path('lock')
path = self.path('lock')
if os.path.exists(path):
raise OSError("Event already being analyzed: {}".format(path))
open(path, 'w').close()
......@@ -228,7 +226,7 @@ class LocklossEvent(object):
def release(self, *args, **kwargs):
"""release event analysis lock"""
try:
os.remove(self.gen_path('lock'))
os.remove(self.path('lock'))
except OSError:
pass
......@@ -239,12 +237,12 @@ class LocklossEvent(object):
@property
def analyzing(self):
"""True if event is being analyzed"""
return os.path.exists(self.gen_path('lock'))
return os.path.exists(self.path('lock'))
@property
def analyzed(self):
"""True if event has been analyzed"""
return os.path.exists(self.gen_path('status')) or os.path.exists(self.gen_path('analyzed'))
return os.path.exists(self.path('status')) or os.path.exists(self.path('analyzed'))
@property
def analysis_status(self):
......@@ -253,11 +251,16 @@ class LocklossEvent(object):
0=success, 1=fail
"""
path = self.gen_path('status')
path = self.path('status')
if not os.path.exists(path):
return None
with open(path, 'r') as f:
return int(f.readline().strip())
# FIXME: why do we get FileNotFoundError on the web server
# even after the above test passed?
try:
with open(path, 'r') as f:
return int(f.readline().strip())
except FileNotFoundError:
return None
@property
def analysis_succeeded(self):
......@@ -271,9 +274,9 @@ class LocklossEvent(object):
or None if not analyzed
"""
path = self.gen_path('version')
path = self.path('version')
if not os.path.exists(path):
path = self.gen_path('analyzed')
path = self.path('analyzed')
if not os.path.exists(path):
return
with open(path, 'r') as f:
......@@ -283,7 +286,7 @@ class LocklossEvent(object):
@property
def _tag_dir(self):
return self.gen_path('tags')
return self.path('tags')
def _tag_path(self, tag):
return os.path.join(self._tag_dir, tag.upper())
......@@ -312,7 +315,7 @@ class LocklossEvent(object):
for tag in tags:
assert ' ' not in tag, "Spaces are not permitted in tags."
open(self._tag_path(tag), 'a').close()
logging.info("added tag: {}".format(tag))
logger.info("added tag: {}".format(tag))
def rm_tag(self, *tags):
"""remove tags"""
......@@ -337,7 +340,7 @@ class LocklossEvent(object):
"""
if not self.__refined_gps:
path = self.gen_path('refined_gps')
path = self.path('refined_gps')
if not os.path.exists(path):
return None
with open(path) as f:
......@@ -358,7 +361,7 @@ class LocklossEvent(object):
"""
if not self.__previous_state:
path = self.gen_path('previous_state')
path = self.path('previous_state')
if not os.path.exists(path):
return None
with open(path) as f:
......@@ -370,21 +373,12 @@ class LocklossEvent(object):
}
return self.__previous_state
def base_plots_list(self):
l = []
for e in os.listdir(self.path):
if e[-4:] == '.png':
base, ext = e.split('__')
if base not in l:
l.append(base)
return l
def to_dict(self):
return {
'id': self.id,
'gps': self.gps,
'utc': self.utc,
'url': self.url,
'url': self.url(),
'view_url': self.view_url,
'transition_index': self.transition_index,
'analyzed': self.analyzed,
......@@ -397,6 +391,7 @@ class LocklossEvent(object):
##################################################
def _generate_all_events():
"""generate all events in reverse chronological order"""
event_root = config.EVENT_ROOT
......@@ -426,7 +421,17 @@ def find_events(**kwargs):
Events are generated in reverse chronological order.
"""
states = set()
if 'state' in kwargs:
comma_sep = [s.strip() for s in kwargs['state'].split(',')]
for item in comma_sep:
ss = [int(s) for s in item.split('-')]
if len(ss) > 1:
ss = range(ss[0], ss[-1]+1)
states.update(ss)
for event in _generate_all_events():
trans_idx = event.transition_index[0]
if 'after' in kwargs and event.gps < kwargs['after']:
# events generated in reverse chronological order, so we
......@@ -437,10 +442,11 @@ def find_events(**kwargs):
if 'before' in kwargs and event.gps > kwargs['before']:
continue
if 'state' in kwargs and event.transition_index[0] != kwargs['state']:
if states and trans_idx not in states:
continue
if 'tag' in kwargs and kwargs['tag'] not in event.list_tags():
continue
if 'tag' in kwargs:
if not set(kwargs['tag']) <= set(event.list_tags()):
continue
yield event
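# Illustrative usage of the find_events() filters above (the GPS time, state
# values, and tag name are made up): 'state' accepts comma-separated values
# and ranges, and 'tag' a collection of tags that must all be present, e.g.:
#
#   for e in find_events(after=1300000000, state='500-600,2', tag=['REFINED']):
#       print(e.id, e.transition_index)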
import os
import sys
import argparse
import logging
from . import set_signal_handlers
from . import config_logger
from . import config
from . import search
from . import analyze
from . import condor
##################################################
def start_job():
try:
os.makedirs(config.CONDOR_ONLINE_DIR)
......@@ -36,28 +39,31 @@ def _parser_add_arguments(parser):
title='Commands',
metavar='<command>',
dest='cmd',
#help=argparse.SUPPRESS,
)
parser.required = True
ep = parser.add_parser(
sp = parser.add_parser(
'exec',
help="direct execute online analysis",
help="directly execute online search",
)
sp.add_argument(
'--analyze', action='store_true',
help="execute the condor event analysis callback when events are found",
)
parser.add_parser(
'start',
help="start condor online analysis",
help="start condor online search",
)
parser.add_parser(
'stop',
help="stop condor online analysis",
help="stop condor online search",
)
parser.add_parser(
'restart',
help="restart condor online analysis",
help="restart condor online search",
)
parser.add_parser(
'status',
help="print condor job status",
help="print online condor job status",
)
return parser
......@@ -72,20 +78,32 @@ def main(args=None):
args = parser.parse_args()
if args.cmd == 'exec':
if args.analyze:
event_callback = analyze.analyze_condor
stat_file = config.ONLINE_STAT_FILE
if not os.path.exists(stat_file):
open(stat_file, 'w').close()
else:
event_callback = None
stat_file = None
search.search_iterate(
event_callback=analyze.analyze_condor,
event_callback=event_callback,
stat_file=stat_file,
)
elif args.cmd == 'start':
elif args.cmd in ['start', 'stop', 'restart']:
print("The LIGO site deployments of the online search are now handled via systemd --user on the 'detechar.ligo-?a.caltech.edu' hosts.", file=sys.stderr)
print("See \"systemctl --user status\" for more info.", file=sys.stderr)
if input(f"Type 'yes' if you're really sure you want to execute the online condor '{args.cmd}' command: ") != 'yes':
print("aborting.", file=sys.stderr)
exit(1)
ojobs, ajobs = condor.find_jobs()
assert not ojobs, "There are already currently acitve jobs:\n{}".format(
'\n'.join([condor.job_str(job) for job in ojobs]))
start_job()
elif args.cmd in ['stop', 'restart']:
ojobs, ajobs = condor.find_jobs()
condor.stop_jobs(ojobs)
if args.cmd == 'restart':
if args.cmd == 'start':
assert not ojobs, "There are already currently active jobs:\n{}".format(
'\n'.join([condor.job_str(job) for job in ojobs]))
else:
condor.stop_jobs(ojobs)
if args.cmd in ['start', 'restart']:
start_job()
elif args.cmd == 'status':
......@@ -97,12 +115,10 @@ def main(args=None):
# direct execution of this module intended for condor jobs
if __name__ == '__main__':
set_signal_handlers()
logging.basicConfig(
level='DEBUG',
format='%(asctime)s %(message)s'
)
stat_file = os.path.join(config.CONDOR_ONLINE_DIR, 'stat')
open(stat_file, 'w').close()
config_logger(level='DEBUG')
stat_file = config.ONLINE_STAT_FILE
if not os.path.exists(stat_file):
open(stat_file, 'w').close()
search.search_iterate(
event_callback=analyze.analyze_condor,
stat_file=stat_file,
......
import io
import sys
import logging
import xml.etree.ElementTree as ET
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as dates
import xml.etree.ElementTree as ET
import gpstime
from gwpy.segments import DataQualityFlag
from . import logger
from . import config
from . import data
from .event import find_events
# plt.rc('text', usetex=True)
##################################################
def gtmin(gt):
"""GPS time rounded to minute from gpstime object"""
return int(gt.gps() / 60) * 60
......@@ -74,17 +78,23 @@ def plot_history(path, lookback='7 days ago', draw_segs=False):
bufs = data.nds_fetch([channel], gtmin(start), gtmin(end))
state_index, state_time = bufs[0].yt()
if np.all(np.isnan(state_index)):
logging.warning("state data [{}] is all nan??".format(channel))
logger.warning("state data [{}] is all nan??".format(channel))
# find events
events = list(find_events(after=start.gps()))
fig = plt.figure(figsize=(16, 4)) #, dpi=80)
fig = plt.figure(figsize=(16, 4))
ax = fig.add_subplot(111)
# plot analyzing
escatter(
[e for e in events if e.analyzing],
s=100, color='orange',
)
# plot failed lock losses
escatter(
[e for e in events if not e.analysis_succeeded],
[e for e in events if not e.analyzing and not e.analysis_succeeded],
s=100, color='red',
)
......@@ -174,6 +184,7 @@ def plot_history(path, lookback='7 days ago', draw_segs=False):
##################################################
def main():
path = sys.argv[1]
plot_history(path)
......
......@@ -12,6 +12,10 @@ def set_thresh_crossing(ax, thresh_crossing, gps, segment):
lw=5,
)
x_fraction = (thresh_crossing-segment[0])/(segment[1]-segment[0])
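# left-align the annotation when the threshold crossing is near the left edge so the label stays inside the axes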
if x_fraction < 0.05:
align = 'left'
else:
align = 'center'
ax.annotate(
'First threshold crossing',
xy=(x_fraction, 1),
......@@ -19,11 +23,12 @@ def set_thresh_crossing(ax, thresh_crossing, gps, segment):
horizontalalignment='center',
verticalalignment='bottom',
bbox=dict(boxstyle="round", fc="w", ec="red", alpha=0.95),
ha=align,
)
def set_rcparams():
"""Sets matplotlib parameters for followups.
"""Standard matplotlib plot styling.
"""
plt.rcParams['font.size'] = 30
......
'''locklost followup analysis plugins
The package consists of functions to be run during a locklost event
followup analysis. Plugin functions should take a LocklossEvent as
argument.
Plugins are registered with the register_plugin() function. Plugins
are currently executed sequentially in a single process, so register
ordering is preserved as a poor-man's dependency tree.
FIXME: figure out better way to express dependencies (e.g. DAG)
'''
import collections
import matplotlib.pyplot as plt
......@@ -5,48 +18,85 @@ from matplotlib import rcParamsDefault
plt.rcParams.update(rcParamsDefault)
FOLLOWUPS = collections.OrderedDict()
def add_follow(mod):
FOLLOWUPS.update([(mod.__name__, mod)])
def register_plugin(func):
PLUGINS.update([(func.__name__, func)])
PLUGINS = collections.OrderedDict()
from .discover import discover_data
add_follow(discover_data)
register_plugin(discover_data)
from .refine import refine_time
add_follow(refine_time)
register_plugin(refine_time)
from .observe import check_observe
add_follow(check_observe)
from .ifo_mode import check_ifo_mode
register_plugin(check_ifo_mode)
from .history import find_previous_state
add_follow(find_previous_state)
from .initial_alignment import initial_alignment_check
register_plugin(initial_alignment_check)
from .saturations import find_saturations
add_follow(find_saturations)
register_plugin(find_saturations)
from .lpy import find_lpy
add_follow(find_lpy)
register_plugin(find_lpy)
from .lsc_asc import plot_lsc_asc
add_follow(plot_lsc_asc)
from .pi import check_pi
register_plugin(check_pi)
from .glitch import analyze_glitches
add_follow(analyze_glitches)
from .violin import check_violin
register_plugin(check_violin)
from .overflows import find_overflows
add_follow(find_overflows)
from .fss_oscillation import check_fss
register_plugin(check_fss)
from .iss import check_iss
register_plugin(check_iss)
from .etm_glitch import check_glitch
register_plugin(check_glitch)
from .ham6_power import power_in_ham6
register_plugin(power_in_ham6)
from .brs import check_brs
add_follow(check_brs)
register_plugin(check_brs)
from .board_sat import check_boards
add_follow(check_boards)
register_plugin(check_boards)
from .wind import check_wind
add_follow(check_wind)
register_plugin(check_wind)
from .seismic import check_seismic
add_follow(check_seismic)
from .overflows import find_overflows
register_plugin(find_overflows)
from .lsc_asc import plot_lsc_asc
register_plugin(plot_lsc_asc)
from .glitch import analyze_glitches
register_plugin(analyze_glitches)
from .darm import plot_darm
register_plugin(plot_darm)
# Check other possible tags to add
from .ads_excursion import check_ads
add_follow(check_ads)
register_plugin(check_ads)
from .sei_bs_trans import check_sei_bs
register_plugin(check_sei_bs)
from .soft_limiters import check_soft_limiters
register_plugin(check_soft_limiters)
from .omc_dcpd import check_omc_dcpd
register_plugin(check_omc_dcpd)
# Add the following at the end because they need to wait for additional data
from .seismic import check_seismic
register_plugin(check_seismic)
from .history import find_previous_state
register_plugin(find_previous_state)
\ No newline at end of file
import sys
import logging
import importlib
import argparse
from .. import set_signal_handlers
from .. import config
from .. import config_logger
from ..event import LocklossEvent
from . import PLUGINS
parser = argparse.ArgumentParser()
parser.add_argument(
'plugin', nargs='?',
)
parser.add_argument(
'event', nargs='?',
)
def main():
set_signal_handlers()
logging.basicConfig(
level='DEBUG',
format=config.LOG_FMT,
)
mpath = sys.argv[1].split('.')
event = LocklossEvent(sys.argv[2])
mod = importlib.import_module('.'+'.'.join(mpath[:-1]), __package__)
func = mod.__dict__[mpath[-1]]
config_logger()
args = parser.parse_args()
if not args.plugin:
for name, func in PLUGINS.items():
print(name)
return
if not args.event:
parser.error("must specifiy event")
try:
func = PLUGINS[args.plugin]
except KeyError:
parser.error(f"unknown plugin: {args.plugin}")
event = LocklossEvent(args.event)
func(event)
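# Illustrative usage, assuming this subpackage is importable as
# locklost.plugins and '1240000000' stands in for a real event ID:
#
#     python3 -m locklost.plugins                       # list plugin names
#     python3 -m locklost.plugins check_brs 1240000000  # run one plugin
#
# With no arguments the registered plugin names are printed; given a plugin
# name and an event, only that plugin is run on the event.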
import logging
import numpy as np
import matplotlib.pyplot as plt
from gwpy.segments import Segment
from .. import logger
from .. import config
from .. import data
from .. import plotutils
#################################################
def check_ads(event):
"""Checks for ADS channels above threshold.
@@ -18,8 +20,8 @@ def check_ads(event):
"""
if event.transition_index[0] != config.GRD_NOMINAL_STATE[1]:
logging.info('lockloss not from nominal low noise')
if event.transition_index[0] < config.ADS_GRD_STATE[0]:
logger.info('lockloss not from state using ADS injections')
return
plotutils.set_rcparams()
@@ -45,9 +47,9 @@ def check_ads(event):
if saturating:
event.add_tag('ADS_EXCURSION')
else:
logging.info('no ADS excursion detected')
fig, ax = plt.subplots(1, figsize=(22,16))
logger.info('no ADS excursion detected')
fig, ax = plt.subplots(1, figsize=(22, 16))
for idx, buf in enumerate(ads_channels):
srate = buf.sample_rate
t = np.arange(segment[0], segment[1], 1/srate)
@@ -89,5 +91,5 @@ def check_ads(event):
fig.tight_layout(pad=0.05)
outfile_plot = 'ads.png'
outpath_plot = event.gen_path(outfile_plot)
outpath_plot = event.path(outfile_plot)
fig.savefig(outpath_plot, bbox_inches='tight')
import logging
import numpy as np
import matplotlib.pyplot as plt
from gwpy.segments import Segment
from .. import logger
from .. import config
from .. import data
from .. import plotutils
##############################################
def check_boards(event):
"""Checks for analog board saturations.
@@ -28,7 +30,7 @@ def check_boards(event):
for buf in board_channels:
srate = buf.sample_rate
t = np.arange(segment[0], segment[1], 1/srate)
before_loss = buf.data[np.where(t<event.gps-config.BOARD_SAT_BUFFER)]
before_loss = buf.data[np.where(t < event.gps-config.BOARD_SAT_BUFFER)]
if any(abs(before_loss) >= config.BOARD_SAT_THRESH):
saturating = True
glitch_idx = np.where(abs(buf.data) > config.BOARD_SAT_THRESH)[0][0]
@@ -38,9 +40,9 @@ def check_boards(event):
if saturating:
event.add_tag('BOARD_SAT')
else:
logging.info('no saturating analog boards')
logger.info('no saturating analog boards')
fig, ax = plt.subplots(1, figsize=(22,16))
fig, ax = plt.subplots(1, figsize=(22, 16))
for idx, buf in enumerate(board_channels):
srate = buf.sample_rate
t = np.arange(segment[0], segment[1], 1/srate)
@@ -78,5 +80,5 @@ def check_boards(event):
fig.tight_layout(pad=0.05)
outfile_plot = 'board_sat.png'
outpath_plot = event.gen_path(outfile_plot)
outpath_plot = event.path(outfile_plot)
fig.savefig(outpath_plot, bbox_inches='tight')
import logging
import numpy as np
import matplotlib.pyplot as plt
from gwpy.segments import Segment
from .. import logger
from .. import config
from .. import data
from .. import plotutils
#################################################
CHANNEL_ENDINGS = [
'100M_300M',
'300M_1',
'1_3',
]
# H1 checks a single status channel that says what combination of BRSX/BRSY is
# being used
if config.IFO == 'H1':
CONFIG = {
'ETMX': {
'skip_states': {65},
'axis': 'RY',
},
'ETMY': {
'skip_states': {60},
'axis': 'RX',
},
}
for station in CONFIG:
CONFIG[station]['state_chan'] = 'H1:GRD-SEI_CONF_REQUEST_N'
CONFIG[station]['skip_states'] |= {10, 17, 45}
# L1 checks a status channel for each individual BRS (ETMs and ITMs) that says
# whether it is being used
elif config.IFO == 'L1':
CONFIG = {
'ITMX': {
'state_chan': 'L1:ISI-ITMX_ST1_SENSCOR_X_FADE_CUR_CHAN_MON',
'axis': 'RY',
},
'ETMX': {
'state_chan': 'L1:ISI-ETMX_ST1_SENSCOR_Y_FADE_CUR_CHAN_MON',
'axis': 'RY',
},
'ITMY': {
'state_chan': 'L1:ISI-ITMY_ST1_SENSCOR_Y_FADE_CUR_CHAN_MON',
'axis': 'RX',
},
'ETMY': {
'state_chan': 'L1:ISI-ETMY_ST1_SENSCOR_Y_FADE_CUR_CHAN_MON',
'axis': 'RX',
},
}
for station in CONFIG:
CONFIG[station]['skip_states'] = {8, 9}
for station in CONFIG:
channels = []
for ending in CHANNEL_ENDINGS:
channels.append('{}:ISI-GND_BRS_{}_{}_BLRMS_{}'.format(config.IFO, station, CONFIG[station]['axis'], ending))
CONFIG[station]['channels'] = channels
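# e.g. with config.IFO == 'H1' the ETMX entry above expands to:
#   CONFIG['ETMX']['channels'] == [
#       'H1:ISI-GND_BRS_ETMX_RY_BLRMS_100M_300M',
#       'H1:ISI-GND_BRS_ETMX_RY_BLRMS_300M_1',
#       'H1:ISI-GND_BRS_ETMX_RY_BLRMS_1_3',
#   ]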
THRESHOLD = 15
SEARCH_WINDOW = [-30, 5]
#################################################
def check_brs(event):
"""Checks for BRS glitches at both end stations.
@@ -20,43 +83,50 @@ def check_brs(event):
plotutils.set_rcparams()
mod_window = [config.BRS_SEARCH_WINDOW[0], config.BRS_SEARCH_WINDOW[1]]
segment = Segment(mod_window).shift(int(event.gps))
for endstation, channel_names in config.BRS_CHANNELS.items():
brs_channels = data.fetch(channel_names, segment)
segment = Segment(SEARCH_WINDOW).shift(int(event.gps))
glitch_count = 0
for station, params in CONFIG.items():
# fetch all data (state and data)
channels = [params['state_chan']] + params['channels']
try:
buf_dict = data.fetch(channels, segment, as_dict=True)
except ValueError:
logger.warning('BRS info not available for {}'.format(station))
continue
# check for proper state
state_buf = buf_dict[params['state_chan']]
t = state_buf.tarray
state = state_buf.data[np.argmin(np.absolute(t-event.gps))]
if state in params['skip_states']:
logger.info('{} not using sensor correction during lockloss'.format(station))
continue
del buf_dict[params['state_chan']]
# look for glitches
max_brs = 0
thresh_crossing = segment[1]
for buf in brs_channels:
for channel, buf in buf_dict.items():
max_brs = max([max_brs, max(buf.data)])
if any(buf.data > config.BRS_THRESH):
glitch_count += 1
srate = buf.sample_rate
t = np.arange(segment[0], segment[1], 1/srate)
glitch_idx = np.where(buf.data > config.BRS_THRESH)[0][0]
glitch_time = t[glitch_idx]
if any(buf.data > THRESHOLD):
logger.info('BRS GLITCH DETECTED in {}'.format(channel))
event.add_tag('BRS_GLITCH')
glitch_idx = np.where(buf.data > THRESHOLD)[0][0]
glitch_time = buf.tarray[glitch_idx]
thresh_crossing = min(glitch_time, thresh_crossing)
if glitch_count > 1:
event.add_tag('BRS_GLITCH')
else:
logging.info('No %s BRS glitching detected' % (endstation))
fig, ax = plt.subplots(1, figsize=(22,16))
for buf in brs_channels:
srate = buf.sample_rate
t = np.arange(segment[0], segment[1], 1/srate)
fig, ax = plt.subplots(1, figsize=(22, 16))
for channel, buf in buf_dict.items():
t = buf.tarray
ax.plot(
t-event.gps,
buf.data,
label=buf.channel,
label=channel,
alpha=0.8,
lw=2,
)
ax.axhline(
config.BRS_THRESH,
THRESHOLD,
linestyle='--',
color='black',
label='BRS glitch threshold',
@@ -68,12 +138,12 @@ def check_brs(event):
ax.grid()
ax.set_xlabel('Time [s] since lock loss at {}'.format(event.gps), labelpad=10)
ax.set_ylabel('RMS Velocity [nrad/s]')
ax.set_ylim(0, max_brs+1)
# ax.set_ylim(0, max_brs+1)
ax.set_xlim(t[0]-event.gps, t[-1]-event.gps)
ax.legend(loc='best')
ax.set_title('%s BRS BLRMS' % (endstation), y=1.04)
ax.set_title('{} BRS BLRMS'.format(station), y=1.04)
fig.tight_layout(pad=0.05)
outfile_plot = 'brs_%s.png' % (endstation)
outpath_plot = event.gen_path(outfile_plot)
outfile_plot = 'brs_{}.png'.format(station)
outpath_plot = event.path(outfile_plot)
fig.savefig(outpath_plot, bbox_inches='tight')