Skip to content
Snippets Groups Projects
Commit 0b9ee941 authored by Gregory Ashton's avatar Gregory Ashton
Browse files

Create Dag class and add testing

parent 7e50d818
No related branches found
No related tags found
No related merge requests found
from .main import Input # noqa
from .main import Dag # noqa
......@@ -116,6 +116,84 @@ class Input(object):
raise ValueError('Input X509 not a file or not understood')
class Dag(object):
def __init__(self, inputs, job_logs='logs'):
""" A class to handle the creation and building of a DAG
Parameters
----------
inputs: bilby_pipe.Input
An object holding the inputs built from the command-line/ini
jobs_logs: str
A path relative to the `inputs.outdir` to store per-job logs
"""
self.dag = pycondor.Dagman(name=inputs.label, submit=inputs.outdir)
self.inputs = inputs
self.job_logs = job_logs
self.create_jobs()
self.build_submit()
def create_jobs(self):
""" Create all the condor jobs and add them to the dag """
for job in self.jobs:
self._create_job(**job)
@property
def jobs(self):
""" A list of dictionaries enumerating all the main jobs to generate
The keys of each dictionary should be the keyword arguments to
`self._create_jobs()`
"""
jobs = []
jobs.append(dict(detectors=self.inputs.include_detectors))
if self.inputs.coherence_test:
for detector in self.inputs.include_detectors:
jobs.append(dict(detectors=[detector]))
return jobs
def _create_job(self, detectors):
""" Create a condor job and add it to the dag
Parameters
----------
detectors: list, str
A list of the detectors to include, e.g. `['H1', 'L1']`
"""
if not isinstance(detectors, list):
raise ValueError("`detectors must be a list")
job_logs_path = os.path.join(self.inputs.outdir, self.job_logs)
error = job_logs_path
log = job_logs_path
output = job_logs_path
submit = self.inputs.outdir
extra_lines = 'accounting_group={}'.format(self.inputs.accounting)
extra_lines += '\nx509userproxy={}'.format(self.inputs.x509userproxy)
arguments = '--ini {}'.format(self.inputs.ini)
name = self.inputs.label + '_' + ''.join(detectors)
arguments += ' --detectors {}'.format(' '.join(detectors))
arguments += ' ' + ' '.join(self.inputs.unknown_args)
pycondor.Job(
name=name, executable=self.inputs.executable,
extra_lines=extra_lines, output=output, log=log, error=error,
submit=submit, arguments=arguments, dag=self.dag)
def build_submit(self):
""" Build the dag, optionally submit them if requested in inputs """
if self.inputs.submit:
raise NotImplementedError(
"This method is currently failing for unknown reasons")
self.dag.build_submit()
else:
self.dag.build()
def set_up_argument_parsing():
parser = configargparse.ArgParser(
usage='Generate submission scripts for the job',
......@@ -149,33 +227,6 @@ def set_up_argument_parsing():
return args, unknown_args
def create_job_per_detector_set(inputs, dag, detectors):
error = log = output = os.path.join(inputs.outdir, 'logs')
submit = inputs.outdir
extra_lines = 'accounting_group={}'.format(inputs.accounting)
extra_lines += '\nx509userproxy={}'.format(inputs.x509userproxy)
arguments = '--ini {}'.format(inputs.ini)
name = inputs.label + '_' + ''.join(detectors)
if isinstance(detectors, list):
detectors = ' '.join(detectors)
arguments += ' --detectors {}'.format(detectors)
arguments += ' ' + ' '.join(inputs.unknown_args)
pycondor.Job(
name=name, executable=inputs.executable, extra_lines=extra_lines,
output=output, log=log, error=error, submit=submit,
arguments=arguments, dag=dag)
def main():
inputs = Input(*set_up_argument_parsing())
dag = pycondor.Dagman(name=inputs.label, submit=inputs.outdir)
if inputs.coherence_test:
for detector in inputs.include_detectors:
create_job_per_detector_set(inputs, dag, detector)
create_job_per_detector_set(inputs, dag, inputs.include_detectors)
if inputs.submit:
raise NotImplementedError(
"This method is currently failing for unknown reasons")
dag.build_submit()
else:
dag.build()
Dag(inputs)
import os
import unittest
from argparse import Namespace
import copy
import bilby_pipe
class TestDag(unittest.TestCase):
def setUp(self):
self.directory = os.path.abspath(os.path.dirname(__file__))
self.test_args = Namespace(
ini='file.ini', submit=False, outdir='outdir', label='label',
accounting='accounting.group', include_detectors='H1',
coherence_test=False, executable_library=self.directory,
executable='executable.py', exe_help=False,
X509=os.path.join(self.directory, 'X509.txt'))
self.test_unknown_args = ['--argument', 'value']
self.inputs = bilby_pipe.Input(self.test_args, self.test_unknown_args)
def tearDown(self):
del self.test_args
del self.inputs
def test_job_logs(self):
dag = bilby_pipe.Dag(self.inputs, job_logs='test')
self.assertEqual(dag.job_logs, 'test')
def test_jobs_creation(self):
test_args = copy.copy(self.test_args)
test_args.include_detectors = 'H1 L1'
test_args.coherence_test = True
inputs = bilby_pipe.Input(test_args, self.test_unknown_args)
dag = bilby_pipe.Dag(inputs)
self.assertEqual(dag.jobs, [dict(detectors=['H1', 'L1']),
dict(detectors=['H1']),
dict(detectors=['L1'])])
#def test_build_submit(self):
# test_args = copy.copy(self.test_args)
# test_args.submit = True
# inputs = bilby_pipe.Input(test_args, self.test_unknown_args)
# dag = bilby_pipe.Dag(inputs)
# dag.build_submit()
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment