Commit 59e2ff87 authored by Cort Posnansky

dtdphi dag: parallelize final job and implement site profile requirements

parent aa8ef64e
Merge request !429: dtdphi dag: parallelize final job and implement site profile requirements
@@ -18,10 +18,16 @@
 import sys
 import numpy
+import argparse
 from gstlal.stats.inspiral_extrinsics import TimePhaseSNR
+parser = argparse.ArgumentParser(description = 'combine dt dphi snr ratio pdfs')
+parser.add_argument("--output", metavar = 'filename', type = str, help = 'The output file name', default = "inspiral_dtdphi_pdf.h5")
+parser.add_argument('files', metavar = 'filename', type = str, nargs='+', help = 'The input pdf filenames')
+options = parser.parse_args()
 # Read in and combine all of the input files
-files = sys.argv[1:]
+files = options.files
 TPS = TimePhaseSNR.from_hdf5(files[0], files[1:])
 # compute the normalization
@@ -54,4 +60,4 @@ for i in range(len(time["H1"])):
 TPS.norm = norm
 # write the result to disk
-TPS.to_hdf5("inspiral_dtdphi_pdf.h5")
+TPS.to_hdf5(options.output)
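
The combine script above now reads its inputs through argparse instead of raw sys.argv. As a rough illustration (not part of the commit), this is how the new interface parses a typical chunked invocation; the pdf filenames below are hypothetical placeholders:

import argparse

# Mirror of the options added in the hunk above.
parser = argparse.ArgumentParser(description = 'combine dt dphi snr ratio pdfs')
parser.add_argument("--output", metavar = 'filename', type = str, help = 'The output file name', default = "inspiral_dtdphi_pdf.h5")
parser.add_argument('files', metavar = 'filename', type = str, nargs='+', help = 'The input pdf filenames')

# A round-one job names its chunk's output explicitly; the final job can rely on the default.
options = parser.parse_args(["--output", "inspiral_dtdphi_pdf_combined_0_to_50", "pdf_0.h5", "pdf_1.h5", "pdf_2.h5"])
assert options.output == "inspiral_dtdphi_pdf_combined_0_to_50"
assert options.files == ["pdf_0.h5", "pdf_1.h5", "pdf_2.h5"]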
@@ -22,6 +22,7 @@ import os
 from gstlal.dags import Argument, Option, DAG
 from gstlal.dags.layers import Layer, Node
+from gstlal.dags import profiles
 parser = argparse.ArgumentParser(description = 'generate a dt dphi covariance matrix and tree data to replace share/inspiral_dtdphi_pdf.h5')
@@ -36,16 +37,18 @@ parser.add_argument('--s1', type = float, default = 0., help = 'primary (z) spin')
 parser.add_argument('--s2', type = float, default = 0., help = 'secondary (z) spin')
 parser.add_argument('--flow', type = float, default = 10., help = 'Low frequency cut-off. Default 10 Hz')
 parser.add_argument('--fhigh', type = float, default = 1024., help = 'High frequency cut-off. Default 1024 Hz')
+parser.add_argument('--profile', type = str, help = 'The site profile to use for condor submit requirements')
 parser.add_argument('--singularity-image', metavar = "filename", help = 'If set, uses the Singularity image provided as the build environment and sets Singularity-specific condor options.')
 args = parser.parse_args()
 if args.flow >= args.fhigh:
-	raise ValueError("flow cannot be greater than fhigh")
+	raise ValueError("flow must be less than fhigh")
 dag = DAG("dt_dphi")
 dag.create_log_dir()
-requirements = {
+requirements_list = []
+submit_opts = {
 	"want_graceful_removal": "True",
 	"kill_sig": "15",
 	"accounting_group_user": getpass.getuser(),
@@ -53,23 +56,36 @@ requirements = {
 	"environment": '"HDF5_USE_FILE_LOCKING=FALSE"'
 }
+if args.profile:
+	profile_dict = profiles.load_profile(profile=args.profile)
+	requirements_list.extend(profile_dict['requirements'])
 if args.singularity_image:
-	requirements['+SingularityImage'] = '"{}"'.format(args.singularity_image)
-	requirements['transfer_executable'] = False
-	requirements['get_env'] = False
-	requirements['requirements'] = "(HAS_SINGULARITY=?=True)"
+	submit_opts['+SingularityImage'] = f'"{args.singularity_image}"'
+	submit_opts['transfer_executable'] = False
+	submit_opts['get_env'] = False
+	requirements_list.append("(HAS_SINGULARITY=?=True)")
+if len(requirements_list) > 0:
+	submit_opts['requirements'] = " && ".join(requirements_list)
 cov_layer = Layer(
 	"gstlal_inspiral_compute_dtdphideff_cov_matrix",
-	requirements={"request_cpus": 1, "request_memory": "1GB", "request_disk": "1GB", **requirements},
+	requirements={"request_cpus": 1, "request_memory": "1GB", "request_disk": "1GB", **submit_opts},
 )
 marg_layer = Layer(
 	"gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs",
-	requirements={"request_cpus": 1, "request_memory": "5GB", "request_disk": "1GB",**requirements},
+	requirements={"request_cpus": 1, "request_memory": "5GB", "request_disk": "1GB",**submit_opts},
 )
+add_round_one_layer = Layer(
+	"gstlal_inspiral_add_dt_dphi_snr_ratio_pdfs",
+	name="add_pdfs_round_one",
+	requirements={"request_cpus": 1, "request_memory": "4GB", "request_disk": "5GB",**submit_opts},
+)
-add_layer = Layer(
+add_round_two_layer = Layer(
 	"gstlal_inspiral_add_dt_dphi_snr_ratio_pdfs",
-	requirements={"request_cpus": 1, "request_memory": "4GB", "request_disk": "10GB",**requirements},
+	name="add_pdfs_round_two",
+	requirements={"request_cpus": 1, "request_memory": "4GB", "request_disk": "6GB",**submit_opts},
 )
 cov_layer += Node(
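
For context on the hunk above: the old single requirements dict is split into submit_opts (per-job condor submit options) and requirements_list (classad expressions that are ANDed into one condor requirements string). A small sketch of how a profile-supplied requirement and the Singularity requirement end up combined; the profile contents below are made up, whereas in the dag they come from profiles.load_profile:

# Hypothetical stand-in for profiles.load_profile(profile=args.profile)['requirements'].
profile_requirements = ["(IS_GLIDEIN=?=True)"]

requirements_list = []
requirements_list.extend(profile_requirements)
requirements_list.append("(HAS_SINGULARITY=?=True)")

submit_opts = {"want_graceful_removal": "True", "kill_sig": "15"}
if len(requirements_list) > 0:
	submit_opts['requirements'] = " && ".join(requirements_list)

# submit_opts['requirements'] is now "(IS_GLIDEIN=?=True) && (HAS_SINGULARITY=?=True)"
assert submit_opts['requirements'] == "(IS_GLIDEIN=?=True) && (HAS_SINGULARITY=?=True)"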
@@ -104,11 +120,24 @@ for start in range(0, 3345408, num):
 	marg_files.append(marg_file)
 dag.attach(marg_layer)
-add_layer += Node(
-	inputs = Argument("inputs", marg_files),
-	outputs = Argument("dtdphi-pdf", "inspiral_dtdphi_pdf.h5", suppress=True)
+# Add dtdphi pdfs in chunks of 50
+files_per_job = 50
+intermediate_files = []
+for i in range(0, len(marg_files), files_per_job):
+	output_filename = f"inspiral_dtdphi_pdf_combined_{i}_to_{i+files_per_job}"
+	add_round_one_layer += Node(
+		inputs = Argument("inputs", marg_files[i:i+files_per_job]),
+		outputs = Option("output", output_filename)
+	)
+	intermediate_files.append(output_filename)
+dag.attach(add_round_one_layer)
+# Add intermediate dtdphi pdfs
+add_round_two_layer += Node(
+	inputs = Argument("inputs", intermediate_files),
+	outputs = Option("output", "inspiral_dtdphi_pdf.h5")
 )
-dag.attach(add_layer)
+dag.attach(add_round_two_layer)
 dag.write_dag("dt_dphi.dag")
 dag.write_script("dt_dphi.sh")
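
The last hunk replaces the single job that added every marginalization pdf with a two-round reduction: round one combines the per-job pdfs in chunks of 50, round two combines the resulting intermediates into the final inspiral_dtdphi_pdf.h5. A standalone sketch of that plan in plain Python, independent of the gstlal DAG API; the helper name and the example file count are illustrative only:

def chunked_combine_plan(marg_files, files_per_job=50, final_output="inspiral_dtdphi_pdf.h5"):
	"""Return (round_one, round_two) descriptions of a two-stage combine."""
	round_one = []
	intermediate_files = []
	for i in range(0, len(marg_files), files_per_job):
		output_filename = f"inspiral_dtdphi_pdf_combined_{i}_to_{i+files_per_job}"
		# each round-one job adds at most files_per_job input pdfs
		round_one.append((marg_files[i:i+files_per_job], output_filename))
		intermediate_files.append(output_filename)
	# the single round-two job adds the intermediates into the final pdf
	round_two = (intermediate_files, final_output)
	return round_one, round_two

# e.g. 120 hypothetical inputs -> 3 round-one jobs (50, 50, 20 inputs) and 1 round-two job
round_one, round_two = chunked_combine_plan([f"marg_{k}.h5" for k in range(120)])
assert len(round_one) == 3 and len(round_two[0]) == 3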