Skip to content
Snippets Groups Projects
Commit 941fb1db authored by Soichiro Morisaki's avatar Soichiro Morisaki
Browse files

gwcelery/tasks/inference.py: don't download psd.xml.gz, as it is no longer uploaded to GraceDB

parent 512341bc
No related branches found
No related tags found
1 merge request!1055gwcelery/tasks/inference.py: don't download psd.xml.gz, as it is no longer uploaded to GraceDB
......@@ -7,6 +7,9 @@ Changelog
- Don't compute p-astro for PyCBC/AllSky because it now computes
and uploads its own.
- Don't try to download psd.xml.gz in online PE, as it is no longer uploaded
by any search pipelines.
2.0.2 "Flying Icarus" (2022-12-23)
----------------------------------
......
......@@ -12,7 +12,6 @@ import urllib
from celery import group
from gwdatafind import find_urls
import numpy as np
from requests.exceptions import HTTPError
from .. import app
from ..jinja import env
......@@ -194,14 +193,14 @@ def prepare_lalinference_ini(frametype_dict, event, superevent_id):
@app.task(shared=False)
def _setup_dag_for_lalinference(coinc_psd, rundir, event, superevent_id,
def _setup_dag_for_lalinference(coinc, rundir, event, superevent_id,
frametype_dict):
"""Create DAG for a lalinference run and return the path to DAG.
Parameters
----------
coinc_psd : tuple of byte contents
Tuple of the byte contents of ``coinc.xml`` and ``psd.xml.gz``.
coinc : byte contents
Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
rundir : str
The path to a run directory where the DAG file is created.
event : dict
......@@ -219,17 +218,10 @@ def _setup_dag_for_lalinference(coinc_psd, rundir, event, superevent_id,
The path to the .dag file
"""
coinc_contents, psd_contents = coinc_psd
# write down coinc.xml in the run directory
path_to_coinc = os.path.join(rundir, 'coinc.xml')
with open(path_to_coinc, 'wb') as f:
f.write(coinc_contents)
# write down psd.xml.gz
path_to_psd = os.path.join(rundir, 'psd.xml.gz')
with open(path_to_psd, 'wb') as f:
f.write(psd_contents)
f.write(coinc)
# write down and upload ini file
ini_contents = prepare_lalinference_ini(
......@@ -247,7 +239,7 @@ def _setup_dag_for_lalinference(coinc_psd, rundir, event, superevent_id,
try:
subprocess.run(
['lalinference_pipe', '--run-path', rundir,
'--coinc', path_to_coinc, path_to_ini, '--psd', path_to_psd],
'--coinc', path_to_coinc, path_to_ini, '--psd', path_to_coinc],
capture_output=True, check=True)
except subprocess.CalledProcessError as e:
contents = b'args:\n' + json.dumps(e.args[1]).encode('utf-8') + \
......@@ -264,13 +256,13 @@ def _setup_dag_for_lalinference(coinc_psd, rundir, event, superevent_id,
@app.task(shared=False)
def _setup_dag_for_bilby(psd, rundir, event, superevent_id, mode):
def _setup_dag_for_bilby(coinc, rundir, event, superevent_id, mode):
"""Create DAG for a bilby run and return the path to DAG.
Parameters
----------
psd : bytes
The byte contents of coinc.xml or psd.xml.gz containing psd
coinc : bytes
Byte contents of ``coinc.xml``. The PSD is expected to be embedded.
rundir : str
The path to a run directory where the DAG file is created
event : dict
......@@ -294,7 +286,7 @@ def _setup_dag_for_bilby(psd, rundir, event, superevent_id, mode):
path_to_psd = os.path.join(rundir, 'coinc.xml')
with open(path_to_psd, 'wb') as f:
f.write(psd)
f.write(coinc)
path_to_webdir = os.path.join(
app.conf['pe_results_path'], superevent_id, 'bilby'
......@@ -455,13 +447,11 @@ def dag_prepare_task(rundir, event, superevent_id, pe_pipeline,
"""
if pe_pipeline == 'lalinference':
canvas = group(
gracedb.download.si('coinc.xml', event['graceid']),
_download_psd.si(event['graceid'])
) | _setup_dag_for_lalinference.s(rundir, event, superevent_id,
canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
_setup_dag_for_lalinference.s(rundir, event, superevent_id,
frametype_dict)
elif pe_pipeline == 'bilby':
canvas = _download_psd.si(event['graceid']) | \
canvas = gracedb.download.si('coinc.xml', event['graceid']) | \
_setup_dag_for_bilby.s(
rundir, event, superevent_id, kwargs['bilby_mode'])
elif pe_pipeline == 'rapidpe':
......@@ -741,18 +731,6 @@ def dag_finished(rundir, superevent_id, pe_pipeline, **kwargs):
gracedb.create_label.delay('PE_READY', superevent_id)
@gracedb.task(shared=False)
def _download_psd(gid):
"""Download psd and return its byte contents. This task first tries to
download ``psd.xml.gz``, and if it does not exist, this task downloads
``coinc.xml``, assuming it contains psd instead.
"""
try:
return gracedb.download("psd.xml.gz", gid)
except HTTPError:
return gracedb.download("coinc.xml", gid)
@app.task(ignore_result=True, shared=False)
def start_pe(frametype_dict, event, superevent_id, pe_pipeline):
"""Run Parameter Estimation on a given event.
......
......@@ -4,7 +4,6 @@ import subprocess
import json
import pytest
from requests.exceptions import HTTPError
from unittest.mock import Mock
from .. import app
......@@ -132,7 +131,7 @@ def test_prepare_lalinference_ini(monkeypatch, mc, q, answers):
def test_setup_dag_for_lalinference(monkeypatch, tmp_path):
coinc, psd = b'coinc', b'psd'
coinc = b'coinc'
rundir = str(tmp_path)
ini = 'ini'
dag = 'lalinference dag'
......@@ -153,7 +152,7 @@ def test_setup_dag_for_lalinference(monkeypatch, tmp_path):
path_to_psd = cmd[7]
assert os.path.exists(path_to_psd)
with open(path_to_psd, 'rb') as f:
assert f.read() == psd
assert f.read() == coinc
with open(os.path.join(rundir, 'multidag.dag'), 'w') as f:
f.write(dag)
......@@ -166,7 +165,7 @@ def test_setup_dag_for_lalinference(monkeypatch, tmp_path):
monkeypatch.setattr('gwcelery.tasks.gracedb.upload.run', upload)
path_to_dag = inference._setup_dag_for_lalinference(
(coinc, psd), rundir, {}, 'S1234', {})
coinc, rundir, {}, 'S1234', {})
assert os.path.exists(path_to_dag)
with open(path_to_dag, 'r') as f:
......@@ -290,7 +289,7 @@ def test_setup_dag_for_failure(monkeypatch, tmp_path, pipeline):
with pytest.raises(subprocess.CalledProcessError):
if pipeline == 'lalinference':
inference._setup_dag_for_lalinference(
(b'coinc', b'psd'), rundir, event, 'S1234', {})
b'coinc', rundir, event, 'S1234', {})
elif pipeline == 'bilby':
inference._setup_dag_for_bilby(
(b'psd'), rundir, event, 'S1234', 'production')
......@@ -309,7 +308,7 @@ def test_setup_dag_for_failure(monkeypatch, tmp_path, pipeline):
'pipeline', ['lalinference', 'bilby', 'rapidpe', 'my_awesome_pipeline'])
def test_dag_prepare_task(monkeypatch, pipeline):
sid = 'S1234'
coinc, psd = b'coinc', b'psd'
coinc = b'coinc'
event = {'gpstime': 1187008882, 'graceid': 'G1234'}
rundir = 'rundir'
frametype_dict = {'H1': 'H1_llhoft', 'L1': 'L1_llhoft'}
......@@ -319,17 +318,14 @@ def test_dag_prepare_task(monkeypatch, pipeline):
def mock_download(filename, gid):
if filename == 'coinc.xml':
return coinc
elif filename == 'psd.xml.gz':
return psd
def _setup_dag_for_lalinference(c_p, r, e, s, f):
c, p = c_p
assert (c == coinc and p == psd and r == rundir
and e == event and s == sid and f == frametype_dict)
def _setup_dag_for_lalinference(c, r, e, s, f):
assert (c == coinc and r == rundir and e == event and s == sid and
f == frametype_dict)
return path_to_dag
def _setup_dag_for_bilby(p, r, e, s, m):
assert p == psd and r == rundir and e == event and s == sid and \
def _setup_dag_for_bilby(c, r, e, s, m):
assert c == coinc and r == rundir and e == event and s == sid and \
m == kwargs['bilby_mode']
return path_to_dag
......@@ -461,21 +457,6 @@ def test_dag_finished(monkeypatch, tmp_path, pipeline):
inference.dag_finished(rundir, sid, pipeline)
def test_download_psd_failure(monkeypatch):
"""Test if _download_psd tries to download coinc.xml when it fails to
download psd.xml.gz"""
coinc_contents = b'coinc'
def mock_download(filename, gid):
if filename == 'coinc.xml':
return coinc_contents
else:
raise HTTPError
monkeypatch.setattr('gwcelery.tasks.gracedb.download', mock_download)
assert inference._download_psd('G1234') == coinc_contents
def test_start_pe(monkeypatch, tmp_path):
path_to_sub = 'pe.dag.condor.sub'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment