Commit 57676283 authored by John Douglas Veitch

Refactor the ROQ setup code

parent 50d93067
......@@ -32,6 +32,7 @@
#standard library imports
import sys
import os
import socket
from math import ceil,floor
import cPickle as pickle
......@@ -84,68 +85,18 @@ __author__="Ben Aylott <benjamin.aylott@ligo.org>, Ben Farr <bfarr@u.northwester
__version__= "git id %s"%git_version.id
__date__= git_version.date
from lalinference.lalinference_pipe_utils import guess_url
def email_notify(address,path):
import smtplib
import subprocess
import socket
import os
address=address.split(',')
SERVER="localhost"
USER=os.environ['USER']
HOST=socket.getfqdn()
FROM=USER+'@'+HOST
SUBJECT="LALInference result is ready at "+HOST+"!"
# Guess the web space path for the clusters
fslocation=os.path.abspath(path)
webpath='posplots.html'
if 'public_html' in fslocation:
k='public_html/'
elif 'WWW' in fslocation:
k='WWW/'
elif 'www_html' in fslocation:
k='www_html/'
else:
k=None
if k is not None:
(a,b)=fslocation.split(k)
webpath=os.path.join('~%s'%(USER),b,webpath)
onweb=True
else:
(c,d)=outpath.split(os.environ['USER'])
for k in ['public_html','WWW','www_html']:
trypath=c+os.environ['USER']+'/'+k+d
#Follow symlinks
if os.path.realpath(trypath)==os.path.normpath(outpath):
(a,b)=trypath.split(k)
webpath=os.path.join('~%s'%(USER),b,webpath)
onweb=True
break
else:
webpath=os.path.join(fslocation,'posplots.html')
onweb=False
if 'atlas' in HOST:
url="https://atlas1.atlas.aei.uni-hannover.de/"
elif 'cit' in HOST or 'caltech' in HOST:
url="https://ldas-jobs.ligo.caltech.edu/"
elif 'ligo-wa' in HOST:
url="https://ldas-jobs.ligo-wa.caltech.edu/"
elif 'ligo-la' in HOST:
url="https://ldas-jobs.ligo-la.caltech.edu/"
elif 'uwm' in HOST or 'nemo' in HOST:
url="https://ldas-jobs.phys.uwm.edu/"
elif 'phy.syr.edu' in HOST:
url="https://sugar-jobs.phy.syr.edu/"
elif 'arcca.cf.ac.uk' in HOST:
url="https://geo2.arcca.cf.ac.uk/"
elif 'vulcan' in HOST:
url="https://galahad.aei.mpg.de/"
else:
if onweb:
url="http://%s/"%(HOST)
else:
url=HOST+':'
url=url+webpath
url = guess_url(os.path.join(fslocation,webpath))
TEXT="Hi "+USER+",\nYou have a new parameter estimation result on "+HOST+".\nYou can view the result at "+url+"\n"
cmd='echo "%s" | mail -s "%s" "%s"'%(TEXT,SUBJECT,', '.join(address))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
......
......@@ -23,6 +23,62 @@ import numpy as np
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
def guess_url(fslocation):
    """
    Try to work out the web address of a given path

    The path is mapped onto the user's web space ('~USER/...') when it lies
    inside one of the web directories 'public_html', 'WWW' or 'www_html', or
    when the same location can be reached through one of those directories
    under the user's directory (e.g. via a symlink). Otherwise the path is
    used unchanged. The result is prefixed with the web server associated
    with the local host name, falling back to 'http://HOST/' for web paths
    and 'HOST:' for plain filesystem paths.

    fslocation -- filesystem path (string) of the file or directory

    Returns the guessed URL as a string.
    Raises KeyError if the USER environment variable is not set.
    """
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    home='~%s'%(USER)
    web_dirs=('public_html','WWW','www_html')
    webpath=None

    # Case 1: the path itself names a web directory. As before, only the
    # first name (in priority order) that occurs in the path is considered.
    for k in web_dirs:
        if k in fslocation:
            # partition() copes with repeated or missing separators, where
            # unpacking the result of split() used to raise ValueError
            (a,sep,b)=fslocation.partition(k+'/')
            if sep:
                # strip leading slashes so os.path.join keeps the ~USER prefix
                webpath=os.path.join(home,b.lstrip('/'))
            elif fslocation.endswith(k):
                # the path is the web directory itself
                webpath=os.path.join(home,'')
            break

    # Case 2: the path may be visible through a web directory located
    # directly below the first occurrence of the user name in the path.
    if webpath is None and USER and USER in fslocation:
        (c,sep,d)=fslocation.partition(USER)
        target=os.path.normpath(fslocation)
        for k in web_dirs:
            trypath=c+USER+'/'+k+d
            #Follow symlinks
            if os.path.realpath(trypath)==target:
                webpath=os.path.join(home,d.lstrip('/'))
                break

    onweb=webpath is not None
    if not onweb:
        # Not in web space: report the plain filesystem location
        webpath=fslocation

    # Host-name fragments and the web server that serves that cluster.
    # Order matters: the ligo-wa/ligo-la entries must come before the generic
    # 'caltech' entry, because their host names also contain 'caltech'.
    known_hosts=(
        (('atlas',),"https://atlas1.atlas.aei.uni-hannover.de/"),
        (('ligo-wa',),"https://ldas-jobs.ligo-wa.caltech.edu/"),
        (('ligo-la',),"https://ldas-jobs.ligo-la.caltech.edu/"),
        (('cit','caltech'),"https://ldas-jobs.ligo.caltech.edu/"),
        (('uwm','nemo'),"https://ldas-jobs.phys.uwm.edu/"),
        (('phy.syr.edu',),"https://sugar-jobs.phy.syr.edu/"),
        (('arcca.cf.ac.uk',),"https://geo2.arcca.cf.ac.uk/"),
        (('vulcan',),"https://galahad.aei.mpg.de/"),
    )
    for (fragments,base) in known_hosts:
        if any(fragment in HOST for fragment in fragments):
            url=base
            break
    else:
        # Unknown host: assume it serves its own web space
        if onweb:
            url="http://%s/"%(HOST)
        else:
            url=HOST+':'
    return url+webpath
class Event():
"""
Represents a unique event to run on
......@@ -2648,8 +2704,8 @@ class GraceDBJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.set_stdout_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).err'))
self.add_condor_cmd('getenv','True')
self.baseurl=cp.get('paths','baseurl')
self.basepath=cp.get('paths','webdir')
self.baseurl=guess_url(self.basepath)
class GraceDBNode(pipeline.CondorDAGNode):
"""
......
......@@ -39,10 +39,6 @@ parser.add_option("-I","--injections",action="store",type="string",default=None,
parser.add_option("-B","--burst_injections",action="store",type="string",default=None,help="SimBurst table for LIB injections",metavar="INJFILE.xml")
parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
parser.add_option("--pegasus-submit",action="store_true",default=False,help="Automatically submit the pegasus dax")
parser.add_option("-x", "--dax",action="store_true",default=False, help="Delete the ligo_data_find jobs and populate frame LFNs in the DAX")
parser.add_option("-G", "--grid-site",action="store",type="string",metavar="SITE", help="Specify remote site in conjunction with --dax option. e.g. --grid-site=creamce for Bologna cluster.\
Supported options are: creamce and local",default=None)
(opts,args)=parser.parse_args()
......@@ -134,10 +130,6 @@ if len(args)!=1:
print 'Error: must specify one ini file'
sys.exit(1)
if opts.condor_submit and opts.pegasus_submit:
print 'Error: Please only specify one of --condor-submit or --pegasus-submit'
sys.exit(1)
inifile=args[0]
cp=ConfigParser.SafeConfigParser()
......@@ -200,13 +192,18 @@ variations.update(add_variations(cp, 'analysis', 'roq'))
roq_paths=[]
def setup_roq(cp):
"""
Generates cp objects with the different ROQs applied
"""
use_roq=False
if cp.has_option('paths','roq_b_matrix_directory'):
if not cp.has_option('analysis','roq'):
print("Warning: If you are attempting to enable ROQ by specifying paths.roq_b_matrix_directory,\
please use analysis.roq in your config file in future. Enabling ROQ.")
cp.set('analysis','roq',True)
if not cp.getboolean('analysis','roq'): return
if not cp.getboolean('analysis','roq'):
yield cp
raise StopIteration()
from numpy import genfromtxt, array
path=cp.get('paths','roq_b_matrix_directory')
if not os.path.isdir(path):
......@@ -276,43 +273,32 @@ def setup_roq(cp):
for mc_prior in mc_priors:
mc_priors[mc_prior] = array(mc_priors[mc_prior])*roq_mass_freq_scale_factor
# write the master configparser
cur_basedir = cp.get('paths','basedir')
masterpath=os.path.join(cur_basedir,'config.ini')
with open(masterpath,'w') as cpfile:
cp.write(cpfile)
# Create an outer dag to wrap the sub-dags
outerdaglog=os.path.join(daglogdir,'lalinference_multi_'+str(uuid.uuid1())+'.log')
outerdag=pipeline.CondorDAG(outerdaglog,dax=opts.dax)
outerdag.set_dag_file(os.path.join(cp.get('paths','basedir'),'multidag'))
setup_roq(cp)
master_cp=cp
for cp in generate_variations(master_cp,variations):
basepath=cp.get('paths','basedir')
# Copy injection file into place as paths outside basedir are inaccessible
if cp.has_option('input','injection-file'):
injpath=cp.get('input','injection-file')
myinjpath=os.path.join(basepath,os.path.basename(injpath))
os.link(injpath, myinjpath)
cp.set('input','injection-file',myinjpath)
for roq in roq_paths:
basedir = cp.get('paths','basedir')
this_cp = ConfigParser.ConfigParser()
this_cp.read(masterpath)
basedir = this_cp.get('paths','basedir')
for dirs in 'basedir','daglogdir','webdir':
mkdirs(cp.get('paths',dirs))
mkdirs(this_cp.get('paths',dirs))
# do the appropriate hacks for ROQ
if not os.path.isdir(os.path.join(basedir,roq)):
os.makedirs(os.path.join(basedir,roq))
for p in dict(cp.items('paths')).keys():
for p in dict(this_cp.items('paths')).keys():
#current value
if 'webdir' in p or 'url' in p or 'basedir' in p or 'daglogdir' in p:
out=cp.get('paths',p)
if 'webdir' in p or 'basedir' in p or 'daglogdir' in p:
out=this_cp.get('paths',p)
# append approximant prefix
subpath=os.path.join(out,sampler,app,roq)
cp.set('paths',p,subpath)
subpath=os.path.join(out,roq)
this_cp.set('paths',p,subpath)
mkdirs(subpath)
path=cp.get('paths','roq_b_matrix_directory')
path=this_cp.get('paths','roq_b_matrix_directory')
thispath=os.path.join(path,roq)
cp.set('paths','roq_b_matrix_directory',thispath)
this_cp.set('paths','roq_b_matrix_directory',thispath)
flow=roq_params[roq]['flow'] / roq_mass_freq_scale_factor
srate=2.*roq_params[roq]['fhigh'] / roq_mass_freq_scale_factor
if srate > 8192:
......@@ -320,107 +306,67 @@ for cp in generate_variations(master_cp,variations):
seglen=roq_params[roq]['seglen'] * roq_mass_freq_scale_factor
# params.dat uses the convention q>1 so our q_min is the inverse of their qmax
cp.set('engine','srate',str(srate))
cp.set('engine','seglen',str(seglen))
if cp.has_option('lalinference','flow'):
tmp=cp.get('lalinference','flow')
tmp=eval(tmp)
ifos=tmp.keys()
this_cp.set('engine','srate',str(srate))
this_cp.set('engine','seglen',str(seglen))
if this_cp.has_option('lalinference','flow'):
tmp=this_cp.get('lalinference','flow')
tmp=eval(tmp)
ifos=tmp.keys()
else:
tmp={}
ifos=eval(cp.get('analysis','ifos'))
for i in ifos:
tmp[i]=flow
cp.set('lalinference','flow',str(tmp))
tmp={}
ifos=eval(this_cp.get('analysis','ifos'))
for i in ifos:
tmp[i]=flow
this_cp.set('lalinference','flow',str(tmp))
if roq_bounds == 'chirp_mass_q':
mc_min=mc_priors[roq][0]*roq_mass_freq_scale_factor
mc_max=mc_priors[roq][1]*roq_mass_freq_scale_factor
# params.dat uses the convention q>1 so our q_min is the inverse of their qmax
q_min=1./float(roq_params[roq]['qmax'])
cp.set('engine','chirpmass-min',str(mc_min))
cp.set('engine','chirpmass-max',str(mc_max))
cp.set('engine','q-min',str(q_min))
cp.set('engine','comp-min', str(max(roq_params[roq]['compmin'] * roq_mass_freq_scale_factor, mc_min * pow(1+q_min, 1./5.) * pow(q_min, 2./5.))))
cp.set('engine','comp-max', str(mc_max * pow(1+q_min, 1./5.) * pow(q_min, -3./5.)))
mc_min=mc_priors[roq][0]*roq_mass_freq_scale_factor
mc_max=mc_priors[roq][1]*roq_mass_freq_scale_factor
# params.dat uses the convention q>1 so our q_min is the inverse of their qmax
q_min=1./float(roq_params[roq]['qmax'])
this_cp.set('engine','chirpmass-min',str(mc_min))
this_cp.set('engine','chirpmass-max',str(mc_max))
this_cp.set('engine','q-min',str(q_min))
this_cp.set('engine','comp-min', str(max(roq_params[roq]['compmin'] * roq_mass_freq_scale_factor, mc_min * pow(1+q_min, 1./5.) * pow(q_min, 2./5.))))
this_cp.set('engine','comp-max', str(mc_max * pow(1+q_min, 1./5.) * pow(q_min, -3./5.)))
elif roq_bounds == 'component_mass':
m1_min = m1_priors[roq][0]
m1_max = m1_priors[roq][1]
m2_min = m2_priors[roq][0]
m2_max = m2_priors[roq][1]
cp.set('engine','mass1-min',str(m1_min))
cp.set('engine','mass1-max',str(m1_max))
cp.set('engine','mass2-min',str(m2_min))
cp.set('engine','mass2-max',str(m2_max))
local_work_dir=cp.get('paths','daglogdir')
# Create the DAG from the configparser object
dag=pipe_utils.LALInferencePipelineDAG(cp,dax=opts.dax,site=opts.grid_site)
if((opts.dax) and not cp.has_option('lalinference','fake-cache')):
# Create a text file with the frames listed
pfnfile = dag.create_frame_pfn_file()
peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
else:
peg_frame_cache = '/dev/null'
m1_min = m1_priors[roq][0]
m1_max = m1_priors[roq][1]
m2_min = m2_priors[roq][0]
m2_max = m2_priors[roq][1]
this_cp.set('engine','mass1-min',str(m1_min))
this_cp.set('engine','mass1-max',str(m1_max))
this_cp.set('engine','mass2-min',str(m2_min))
this_cp.set('engine','mass2-max',str(m2_max))
yield this_cp
raise StopIteration()
# A directory to store the DAX temporary files
execdir=os.path.join(local_work_dir,'lalinference_pegasus_'+str(uuid.uuid1()))
olddir=os.getcwd()
if opts.grid_site is not None:
site='local,'+opts.grid_site
else:
site=None
# Create the DAX scripts
if opts.dax:
dag.prepare_dax(tmp_exec_dir=execdir,grid_site=site,peg_frame_cache=peg_frame_cache)
# Ugly hack to replace pegasus.transfer.links=true in the pegasus.properties files created by pipeline.py
# Turns off the creation of links for files on the local file system. We use pegasus.transfer.links=false
# to make sure we have a copy of the data in the runing directory (useful when the data comes from temporary
# low latency buffer).
if cp.has_option('analysis','pegasus.transfer.links'):
if cp.get('analysis','pegasus.transfer.links')=='false':
lines=[]
with open('pegasus.properties') as fin:
for line in fin:
line = line.replace('pegasus.transfer.links=true', 'pegasus.transfer.links=false')
lines.append(line)
with open('pegasus.properties','w') as fout:
for line in lines:
fout.write(line)
if cp.has_option('condor','accounting_group'):
lines=[]
with open('sites.xml') as fin:
for line in fin:
if '<profile namespace="condor" key="getenv">True</profile>' in line:
line=line+' <profile namespace="condor" key="accounting_group">'+cp.get('condor','accounting_group')+'</profile>\n'
lines.append(line)
with open('sites.xml','w') as fout:
for line in lines:
fout.write(line)
if cp.has_option('condor','accounting_group_user'):
lines=[]
with open('sites.xml') as fin:
for line in fin:
if '<profile namespace="condor" key="getenv">True</profile>' in line:
line=line+' <profile namespace="condor" key="accounting_group_user">'+cp.get('condor','accounting_group_user')+'</profile>\n'
lines.append(line)
with open('sites.xml','w') as fout:
for line in lines:
fout.write(line)
dagjob=pipeline.CondorDAGManJob(os.path.join(cp.get('paths','basedir'),dag.get_dag_file()),
cp.get('paths','basedir'))
dagnode=pipeline.CondorDAGManNode(dagjob)
outerdag.add_node(dagnode)
dag.write_sub_files()
dag.write_dag()
dag.write_script()
if(opts.dax):
# Create a text file with the frames listed
pfnfile = outerdag.create_frame_pfn_file()
peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
# Build the outer DAG; every sub-DAG generated below is attached to it as a node
outerdaglog=os.path.join(daglogdir,'lalinference_multi_'+str(uuid.uuid1())+'.log')
outerdag=pipeline.CondorDAG(outerdaglog,dax=False)
outerdag.set_dag_file(os.path.join(cp.get('paths','basedir'),'multidag'))

# Keep a handle on the original configuration: 'cp' is rebound by the loop
master_cp=cp
# One pass per configuration produced by generate_variations()
for cp in generate_variations(master_cp,variations):
    basepath=cp.get('paths','basedir')
    has_injections=cp.has_option('input','injection-file')
    if has_injections:
        # Paths outside basedir are inaccessible, so hard-link the injection
        # file into basedir and point the configuration at the link
        injpath=cp.get('input','injection-file')
        myinjpath=os.path.join(basepath,os.path.basename(injpath))
        os.link(injpath, myinjpath)
        cp.set('input','injection-file',myinjpath)

    # setup_roq() yields the configuration(s) to turn into sub-DAGs
    for this_cp in setup_roq(cp):
        # Create the DAG from the configparser object
        dag=pipe_utils.LALInferencePipelineDAG(this_cp,dax=False)
        # Wrap the sub-DAG in a DAGMan job run from the sub-DAG's basedir
        dagfile=os.path.join(this_cp.get('paths','basedir'),dag.get_dag_file())
        dagjob=pipeline.CondorDAGManJob(dagfile,this_cp.get('paths','basedir'))
        outerdag.add_node(pipeline.CondorDAGManNode(dagjob))
        # Write out the submit files, DAG file and script for this sub-DAG
        dag.write_sub_files()
        dag.write_dag()
        dag.write_script()

# Finally write out the outer DAG that ties the sub-DAGs together
outerdag.write_sub_files()
outerdag.write_dag()
......@@ -428,8 +374,6 @@ outerdag.write_script()
# End of program
print 'Successfully created DAG file.'
if not opts.dax:
print 'Now run condor_submit_dag %s\n'%(outerdag.get_dag_file())
if opts.condor_submit:
import subprocess
......@@ -444,14 +388,3 @@ if opts.condor_submit:
else:
print 'Unable to submit DAG file'
if opts.pegasus_submit:
import subprocess
from subprocess import Popen
os.chdir(os.path.abspath(cp.get('paths','basedir')))
x = subprocess.Popen('./pegasus_submit_dax')
x.wait()
if x.returncode==0:
print 'Submitted DAX file'
else:
print 'Unable to submit DAX file'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment