Skip to content
Snippets Groups Projects

Update htcondor sync to record from spool

Merged Gregory Ashton requested to merge improve-sync into master
1 file
+ 85
7
Compare changes
  • Side-by-side
  • Inline
"""
This executable will use rsync to copy the results from remote worker nodes
back to the execute node. The executable is assumed to be run from the submit
directory (e.g., where the .ini file is) and on the submit machine.
bilby_pipe_htcondor_sync can be used to sync jobs running under HTCondor where
transfer-files=True (i.e. they do not use a virtual file system). It will use
rsync to copy the results from remote worker nodes or spool back to the execute
node. The executable is assumed to be run from the submit directory (i.e.,
where the .ini file is) and on the submit machine (i.e. where the job was
submitted from).
"""
import argparse
import glob
import os
import subprocess
from bilby_pipe.utils import logger
@@ -56,7 +59,27 @@ def get_cluster_id(logfile):
logger.info("No cluster ID found in log file")
def rsync_via_ssh(cluster_id, outdir):
    """Attempt to rsync results from a currently-running worker node.

    This method applies when the job is actively executing on a remote worker
    node and condor_ssh_to_job is possible. The method works by using
    condor_ssh_to_job and rsync as described in the HTCondor documentation:
    https://htcondor.readthedocs.io/en/latest/man-pages/condor_ssh_to_job.html.

    Parameters
    ----------
    cluster_id: int
        The HTCondor ClusterId
    outdir: str
        The top-level outdir of the bilby_pipe job

    Returns
    -------
    success: bool
        True if the method was successful.
    """
    sync_path = f"{outdir}/result/"
    target = f"{cluster_id}:{sync_path}"
    # NOTE: the inner quotes around condor_ssh_to_job are intentional — rsync
    # parses the -e value itself, as shown in the condor_ssh_to_job man page.
    cmd = ["rsync", "-v", "-r", "-e", '"condor_ssh_to_job"', target, sync_path]
    out = subprocess.run(cmd, capture_output=True)
    if out.returncode == 0:
        logger.info(f"Synced job {cluster_id}: {out.stdout.decode('utf-8')}")
        return True
    else:
        # Failure here is expected for idle jobs (the spool method is tried
        # next), so log at info rather than warning level.
        logger.info(f"Unable to sync job {cluster_id}: {out.stderr.decode('utf-8')}")
        return False
def rsync_via_spool(cluster_id, outdir):
    """Attempt to rsync job output out of the HTCondor spool directory.

    This method applies when the job is not actively executing on a remote
    worker, but is idle. In this instance, any files produced by the job will
    be stored in the spool (a local directory on the submit machine). This
    method identifies the spool location, based on the cluster_id, and
    attempts to rsync the data.

    Parameters
    ----------
    cluster_id: int
        The HTCondor ClusterId
    outdir: str
        The top-level outdir of the bilby_pipe job

    Returns
    -------
    success: bool
        True if the method was successful.
    """
    outdir = outdir.rstrip("/")
    # HTCondor buckets spool entries by ClusterId modulo 10000
    subdir = cluster_id % 10000
    procid = 0
    # Argument list with shell=False avoids shell quoting/injection issues
    spool_dir = (
        subprocess.check_output(["condor_config_val", "SPOOL"])
        .decode("utf-8")
        .rstrip("\n")
    )
    # Definition of the spool location credit to James Clark
    src = (
        f"{spool_dir}/{subdir}/{procid}/"
        f"cluster{cluster_id}.proc{procid}.subproc0/{outdir}/"
    )
    if not os.path.isdir(src):
        return False
    # Report success from the rsync exit status instead of assuming it worked
    return subprocess.call(["rsync", "-rv", src, outdir]) == 0
# Sync strategies tried in order: first reach running jobs via
# condor_ssh_to_job; if that fails (e.g. the job is idle), fall back to
# copying any produced files out of the local spool.
methods = [rsync_via_ssh, rsync_via_spool]
def main():
@@ -76,4 +144,14 @@ def main():
cluster_id_list = get_cluster_id_list(args.outdir)
for cluster_id in cluster_id_list:
if cluster_id is not None:
run_rsync(cluster_id, args.outdir)
success = False
for method in methods:
success = method(cluster_id, args.outdir)
if success:
break
if success is False:
logger.warning("Failed to obtain data")
if __name__ == "__main__":
main()
Loading