Commit c496dc63 authored by Vivien Raymond

Fixed fake-cache and set universe pegasus issues
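In short: DAG generation now skips the Pegasus frame PFN/cache step when a fake-cache is configured in the [lalinference] section, and EngineJob reads the [engine] resume option before set_grid_site() runs, so the Condor universe can be chosen with resume taken into account.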

Original: 90e3131042e386be645bb4cff049c86f758c913a
parent 97f5495b
@@ -91,7 +91,7 @@ if opts.pipedown_db is not None:
# Create the DAG from the configparser object
dag=pipe_utils.LALInferencePipelineDAG(cp,dax=opts.dax,site=opts.grid_site)
-if(opts.dax):
+if((opts.dax) and not cp.has_option('lalinference','fake-cache')):
# Create a text file with the frames listed
pfnfile = dag.create_frame_pfn_file()
peg_frame_cache = inspiralutils.create_pegasus_cache_file(pfnfile)
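For context, a minimal sketch of the ini option this hunk checks for (the value format is an assumption based on typical lalinference_pipe configurations, not shown in this diff); when it is present no real frames are needed, so the frame PFN file and Pegasus cache above are not generated:

[lalinference]
fake-cache = {'H1':'LALSimAdLIGO','L1':'LALSimAdLIGO'}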
@@ -123,11 +123,10 @@ if not opts.dax:
if opts.condor_submit:
import subprocess
from subprocess import Popen
x = subprocess.Popen(['condor_submit_dag',fulldagpath])
x.wait()
if x.returncode==0:
print 'Submitted DAG file'
else:
print 'Unable to submit DAG file'
@@ -74,7 +74,7 @@ def readLValert(SNRthreshold=0,gid=None,flow=40.0,gracedb="gracedb",savepsdpath=
Parse LV alert file, containing coinc, sngl and coinc_event_map tables,
and create a list of Events as input for the pipeline.
Based on Chris Pankow's script
"""
"""
output=[]
from glue.ligolw import utils
from glue.ligolw import lsctables
@@ -139,7 +139,7 @@ def readLValert(SNRthreshold=0,gid=None,flow=40.0,gracedb="gracedb",savepsdpath=
fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = max(dur), srate = srate, trigSNR = trigSNR, fhigh = fhigh)
if(coinc.snr>SNRthreshold): output.append(ev)
print "Found %d coinc events in table." % len(coinc_events)
os.chdir(cwd)
return output
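A minimal usage sketch of readLValert() (the GraceDB id is a placeholder, not from this diff):

# Fetch coincs above SNR 8 for a hypothetical GraceDB event and list trigger times
events = readLValert(SNRthreshold=8, gid='G12345', flow=40.0)
for ev in events:
    print ev.trig_time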
@@ -157,7 +157,7 @@ def open_pipedown_database(database_filename,tmp_space):
if tmp_space:
dbtables.set_temp_store_directory(connection,tmp_space)
dbtables.DBTable_set_connection(connection)
return (connection,working_filename)
def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
@@ -200,7 +200,7 @@ def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpse
fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
fh.close()
return output.values()
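Taken together with open_pipedown_database() above, a minimal sketch (filenames hypothetical):

# Open a pipedown database and pull zero-lag triggers below a CFAR threshold
connection, working_filename = open_pipedown_database('pipedown.sqlite', '/tmp')
events = get_zerolag_pipedown(connection, dumpfile='zerolag.txt', max_cfar=0.01)

Each dumpfile row written above contains: coinc id, ifo, trigger time, timeslide, SNR, chisq and cfar.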
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
"""
@@ -284,7 +284,7 @@ def scan_timefile(timefile):
times.append(float(time))
timefilehandle.close()
return times
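scan_timefile() expects one GPS time per line; a sketch (filename hypothetical):

# gps_times.txt contains lines such as 966384015.0 and 966384615.0
times = scan_timefile('gps_times.txt')  # -> [966384015.0, 966384615.0]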
def get_xml_psds(psdxml,ifos,outpath,end_time=None):
"""
Get a psd.xml.gz file and:
@@ -307,7 +307,7 @@ def get_xml_psds(psdxml,ifos,outpath,end_time=None):
print "ERROR, cannot import pylal.series in bppu/get_xml_psds()\n"
exit(1)
import numpy as np
out={}
if not os.path.isdir(outpath):
os.makedirs(outpath)
@@ -319,7 +319,7 @@ def get_xml_psds(psdxml,ifos,outpath,end_time=None):
got_all=1
for ifo in ifos:
path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
# Check we don't already have that ascii file (e.g. because we are running parallel runs of the same event)
if os.path.isfile(path_to_ascii_psd):
got_all*=1
else:
@@ -329,7 +329,7 @@ def get_xml_psds(psdxml,ifos,outpath,end_time=None):
for ifo in ifos:
out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
return out
# We need to convert the PSD for one or more IFOS. Open the file
if not os.path.isfile(psdxml):
print "ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml
@@ -344,9 +344,9 @@ def get_xml_psds(psdxml,ifos,outpath,end_time=None):
for instrument in xmlpsd.keys():
#name of the ascii file we are going to write the PSD into
path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
# Check we don't already have that ascii file (e.g. because we are running parallel runs of the same event)
if os.path.isfile(path_to_ascii_psd):
continue
# get data for the IFO
ifodata=xmlpsd[instrument]
#check data is not empty
@@ -362,20 +362,20 @@ def get_xml_psds(psdxml,ifos,outpath,end_time=None):
# Fill a two columns array of (freq, psd) and save it in the ascii file
f0=ifodata.f0
deltaF=ifodata.deltaF
combine=[]
for i in np.arange(len(data.data)) :
combine.append([f0+i*deltaF,np.sqrt(data.data[i])])
np.savetxt(path_to_ascii_psd,combine)
ifo=instrument.encode('ascii')
# set node.psds dictionary with the path to the ascii files
out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
return out
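A usage sketch for get_xml_psds() (paths hypothetical): convert a psd.xml.gz into one ASCII file per IFO and get the paths back keyed by IFO.

psds = get_xml_psds('psd.xml.gz', ['H1','L1'], './PSDs')
# psds['H1'] -> './PSDs/H1_psd_<time>.txt'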
def create_pfn_tuple(filename,protocol='file://',site='local'):
return( (os.path.basename(filename),protocol+os.path.abspath(filename),site) )
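For example (path hypothetical):

create_pfn_tuple('/data/H-H1_TEST-100-10.gwf')
# -> ('H-H1_TEST-100-10.gwf', 'file:///data/H-H1_TEST-100-10.gwf', 'local')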
class LALInferencePipelineDAG(pipeline.CondorDAG):
def __init__(self,cp,dax=False,site='local'):
self.subfiles=[]
@@ -463,7 +463,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if len(self.events)==0:
print 'No input events found, please check your config if you expect some events'
self.times=[e.trig_time for e in self.events]
# Set up the segments
if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
(mintime,maxtime)=self.get_required_data(self.times)
@@ -472,12 +472,12 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if not self.config.has_option('input','gps-end-time'):
self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
self.add_science_segments()
# Save the final configuration that is being used
conffilename=os.path.join(self.basepath,'config.ini')
with open(conffilename,'wb') as conffile:
self.config.write(conffile)
# Generate the DAG according to the config given
for event in self.events: self.add_full_analysis(event)
@@ -496,7 +496,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
skyareanode=SkyAreaNode(self.skyareajob)
skyareanode.add_resultspage_parent(p)
self.add_node(skyareanode)
def add_full_analysis(self,event):
if self.engine=='lalinferencenest':
result=self.add_full_analysis_lalinferencenest(event)
@@ -517,7 +517,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
gpsend=self.config.get('input','gps-end-time')
pfnfile=iu.create_frame_pfn_file(self.frtypes,gpsstart,gpsend)
return pfnfile
def get_required_data(self,times):
"""
Calculate the data that will be needed to process all events
@@ -551,7 +551,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
"""
for time in self.times:
self.add_full_analysis(Event(trig_time=time))
def select_events(self):
"""
Read events from the config parser. Understands both ranges and comma separated events, or combinations
@@ -600,7 +600,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if self.config.has_option('input','events'):
selected_events=self.config.get('input','events')
print 'Selected events %s'%(str(selected_events))
if selected_events=='all':
selected_events=None
else:
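As a sketch, the [input] section might select events like this (only 'all' and comma-separated values appear in this diff; the range syntax is an assumption):

[input]
events = all
# or a subset, e.g. events = 0,2,5:10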
@@ -782,7 +782,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
if event.GID is not None:
self.add_gracedb_log_node(respagenode,event.GID)
return True
def add_full_analysis_lalinferencemcmc(self,event):
"""
Generate an end-to-end analysis of a given event
@@ -876,7 +876,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
sciseg.set_df_node(df_node)
self.segments[ifo].append(sciseg)
os.chdir(curdir)
def get_datafind_node(self,ifo,frtype,gpsstart,gpsend):
node=pipeline.LSCDataFindNode(self.datafind_job)
node.set_observatory(ifo[0])
@@ -885,7 +885,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
node.set_end(gpsend)
#self.add_node(node)
return node
def add_engine_node(self,event,ifos=None,extra_options=None):
"""
Add an engine node to the dag. Will find the appropriate cache files automatically.
@@ -996,7 +996,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
# Add the nodes it depends on
for ifokey, seg in node.scisegs.items():
dfnode=seg.get_df_node()
if 1==1:
#if dfnode is not None and dfnode not in self.get_nodes():
if self.config.has_option('lalinference','roq'):
@@ -1061,7 +1061,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
for ifo in ifos:
node.add_var_arg('--'+ifo+'-roqweights '+os.path.join(roqeventpath,'weights_'+ifo+'.dat'))
node.add_input_file(os.path.join(roqeventpath,'weights_'+ifo+'.dat'))
node.add_var_arg('--roqtime_steps '+os.path.join(roqeventpath,'roq_sizes.dat'))
node.add_input_file(os.path.join(roqeventpath,'roq_sizes.dat'))
if self.config.has_option('paths','rom_nodes'):
@@ -1074,7 +1074,7 @@ class LALInferencePipelineDAG(pipeline.CondorDAG):
for (opt,arg) in event.engine_opts.items():
node.add_var_opt(opt,arg)
return node
def add_results_page_node(self,resjob=None,outdir=None,parent=None,extra_options=None,gzip_output=None):
if resjob is None:
resjob=self.results_page_job
@@ -1154,6 +1154,10 @@ class EngineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
pipeline.CondorDAGJob.__init__(self,universe,exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
# Set grid site if needed
+if cp.has_option('engine','resume'):
+self.resume=True
+else:
+self.resume=False
if site:
self.set_grid_site(site)
if site!='local':
@@ -1195,11 +1199,7 @@ class EngineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_ini_opts(cp,'engine')
self.set_stdout_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).out'))
self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).err'))
-if cp.has_option('engine','resume'):
-self.resume=True
-else:
-self.resume=False
def set_grid_site(self,site=None):
"""
Over-load base class method to choose condor universe properly
@@ -1207,9 +1207,12 @@
if site is not None and site!='local':
self.set_universe('vanilla')
else:
-self.set_universe('standard')
+if self.resume:
+self.set_universe('vanilla')
+else:
+self.set_universe('standard')
pipeline.CondorDAGJob.set_grid_site(self,site)
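Presumably the intent of this change: grid-site jobs and, now, resumable local jobs run in the vanilla universe, while the checkpointing standard universe is kept only for non-resumable local runs; this matches the "set universe pegasus issues" part of the commit message.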
def __write_sub_file_mcmc_mpi(self):
"""
Nasty hack to insert the MPI stuff into the arguments
@@ -1231,7 +1234,7 @@ class EngineJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
subfile=open(subfilepath,'w')
subfile.write(outstring)
subfile.close()
class EngineNode(pipeline.CondorDAGNode):
new_id = itertools.count().next
def __init__(self,li_job):
@@ -1265,16 +1268,16 @@ class EngineNode(pipeline.CondorDAGNode):
def set_max_psdlength(self,psdlength):
self.maxlength=psdlength
def set_padding(self,padding):
self.padding=padding
def set_psdstart(self,psdstart):
self.psdstart=psdstart
def set_seed(self,seed):
self.add_var_opt('randomseed',str(seed))
def set_srate(self,srate):
self.add_var_opt('srate',str(srate))
@@ -1301,10 +1304,10 @@ class EngineNode(pipeline.CondorDAGNode):
for i in self.ifos:
tmpst="%s%s-PSD.dat,"%(pathroot,i)
st+=tmpst
self.add_output_file(tmpst[:-1])
st=st[:-1]
self.psdfiles=st
def get_psd_files(self):
return self.psdfiles
@@ -1320,7 +1323,7 @@ class EngineNode(pipeline.CondorDAGNode):
st="%s_snr.txt"%pathroot
self.add_output_file(st)
self.snrfile=st
def get_snr_file(self):
return self.snrfile
@@ -1330,7 +1333,7 @@ class EngineNode(pipeline.CondorDAGNode):
"""
self.__trigtime=float(time)
self.add_var_opt('trigtime',str(time))
def set_event_number(self,event):
"""
Set the event number in the injection XML.
@@ -1338,7 +1341,7 @@ class EngineNode(pipeline.CondorDAGNode):
if event is not None:
self.__event=int(event)
self.add_var_opt('event',str(event))
def set_injection(self,injfile,event):
"""
Set a software injection to be performed.
@@ -1347,7 +1350,7 @@ class EngineNode(pipeline.CondorDAGNode):
self.set_event_number(event)
def get_trig_time(self): return self.__trigtime
def add_fake_ifo_data(self,ifo,sciseg,fake_cache_name,fake_channel_name,timeslide=0):
"""
Dummy method to set up fake data without needing to run datafind
@@ -1359,7 +1362,7 @@ class EngineNode(pipeline.CondorDAGNode):
self.channels[ifo]=fake_channel_name
self.fakedata=True
return 1
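A sketch of how a fake-data node is populated (the node variable, cache and channel labels are assumed); this is the per-node counterpart of the fake-cache ini option guarded in the first hunk:

node.add_fake_ifo_data('H1', sciseg, 'LALSimAdLIGO', 'H1:FAKE-STRAIN')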
def add_ifo_data(self,ifo,sciseg,channelname,timeslide=0):
self.ifos.append(ifo)
self.scisegs[ifo]=sciseg
@@ -1374,7 +1377,7 @@ class EngineNode(pipeline.CondorDAGNode):
self.channels[ifo]=channelname
return 1
else: return 0
def set_cache(self,filename,ifo):
"""
Add a cache file from LIGODataFind. Based on same method from pipeline.AnalysisNode
@@ -1395,7 +1398,7 @@ class EngineNode(pipeline.CondorDAGNode):
if not self.__finaldata:
self._finalize_ifo_data()
pipeline.CondorDAGNode.finalize(self)
def _finalize_ifo_data(self):
"""
Add final list of IFOs and data to analyse to command line arguments.
@@ -1428,7 +1431,7 @@ class EngineNode(pipeline.CondorDAGNode):
self.GPSstart=starttime
self.__GPSend=endtime
length=endtime-starttime
# Now we need to adjust the start time and length to make sure the maximum data length
# is not exceeded.
trig_time=self.get_trig_time()
@@ -1442,7 +1445,7 @@ class EngineNode(pipeline.CondorDAGNode):
#print 'Over-riding start time to user-specified value %f'%(self.GPSstart)
#if self.GPSstart<starttime or self.GPSstart>endtime:
# print 'ERROR: Over-ridden time lies outside of science segment!'
# raise Exception('Bad psdstart specified')
self.add_var_opt('psdstart',str(self.GPSstart))
if self.psdlength is None:
self.psdlength=self.__GPSend-self.GPSstart-2*self.padding-self.seglen-1
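As a worked example (numbers hypothetical): a segment from GPSstart=1000 to GPSend=3048 with padding=16 and seglen=32 gives psdlength = 2048 - 2*16 - 32 - 1 = 1983 seconds.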
@@ -1466,7 +1469,7 @@ class LALInferenceNestNode(EngineNode):
EngineNode.__init__(self,li_job)
self.engine='lalinferencenest'
self.outfilearg='outfile'
def set_output_file(self,filename):
self.nsfile=filename+'.dat'
self.posfile=self.nsfile
@@ -1536,7 +1539,7 @@ class ResultsPageJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_condor_cmd('RequestMemory','2000')
self.add_ini_opts(cp,'resultspage')
# self.add_opt('Nlive',cp.get('analysis','nlive'))
if cp.has_option('results','skyres'):
self.add_opt('skyres',cp.get('results','skyres'))
@@ -1571,7 +1574,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
if event is not None:
self.__event=int(event)
self.add_var_arg('--eventnum '+str(event))
def get_event_number(self):
return self.__event
def set_psd_files(self,st):
@@ -1604,7 +1607,7 @@ class ResultsPageNode(pipeline.CondorDAGNode):
self.add_file_opt('bsn',bsnfile)
def set_header_file(self,headerfile):
self.add_var_arg('--header '+headerfile)
class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
"""
Class defining the coherence test job to be run as part of a pipeline.
@@ -1612,7 +1615,7 @@ class CoherenceTestJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
exe=cp.get('condor','coherencetest')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
self.add_opt('coherent-incoherent','')
self.add_condor_cmd('getenv','True')
self.set_stdout_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).out'))
@@ -1664,7 +1667,7 @@ class MergeNSJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
def __init__(self,cp,submitFile,logdir,dax=False):
exe=cp.get('condor','mergescript')
pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
pipeline.AnalysisJob.__init__(self,cp,dax=dax)
self.set_sub_file(os.path.abspath(submitFile))
self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
@@ -1701,7 +1704,7 @@ class MergeNSNode(pipeline.CondorDAGNode):
self.posfile=file
self.Bfilename=self.posfile+'_B.txt'
self.add_output_file(self.Bfilename)
def get_pos_file(self): return self.posfile
def get_B_file(self): return self.Bfilename
@@ -1731,7 +1734,7 @@ class GraceDBNode(pipeline.CondorDAGNode):
if gid: self.set_gid(gid)
if parent: self.set_parent_resultspage(parent,gid)
self.__finalized=False
def set_page_path(self,path):
"""
Set the path to the results page, after self.baseurl.
@@ -1812,7 +1815,7 @@ class SkyAreaNode(pipeline.CondorDAGNode):
self.add_parent(parent)
if posfile:
self.set_posterior_file(posfile)
def set_posterior_file(self,posfile):
self.add_file_opt('samples',posfile,file_is_output_file=False)
self.posfile=posfile
@@ -1827,7 +1830,7 @@ class SkyAreaNode(pipeline.CondorDAGNode):
def add_resultspage_parent(self,resultspagenode):
self.set_posterior_file(resultspagenode.get_pos_file())
self.set_outdir(resultspagenode.get_output_path())
self.add_parent(resultspagenode)
self.set_injection(resultspagenode.get_injection(),resultspagenode.get_event_number())
class SkyAreaJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
@@ -1844,4 +1847,3 @@ class SkyAreaJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
self.add_condor_cmd('getenv','True')
# Add user-specified options from ini file
self.add_ini_opts(cp,'skyarea')