# DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang

import itertools
import glue
from glue import pipeline,segmentsUtils,segments
import os
import socket
from lalapps import inspiralutils
import uuid
import ast
import pdb
import string
from math import floor,ceil,log,pow
import sys
import random
from itertools import permutations
import shutil
import numpy as np

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.

def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    if 'public_html' in fslocation:
        k='public_html/'
    elif 'WWW' in fslocation:
        k='WWW/'
    elif 'www_html' in fslocation:
        k='www_html/'
    else:
        k=None
    if k is not None:
        (a,b)=fslocation.split(k)
        webpath=os.path.join('~%s'%(USER),b)
        onweb=True
    else:
        (c,d)=fslocation.split(USER,1)
        for k in ['public_html','WWW','www_html']:
            trypath=c+os.environ['USER']+'/'+k+d
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                #(a,b)=trypath.split(k)
                webpath=os.path.join('~%s'%(USER),d)
                onweb=True
                break
            else:
                webpath=fslocation
                onweb=False
    if 'atlas' in HOST:
        url="https://atlas1.atlas.aei.uni-hannover.de/"
    elif 'cit' in HOST or 'caltech' in HOST:
        url="https://ldas-jobs.ligo.caltech.edu/"
    elif 'ligo-wa' in HOST:
        url="https://ldas-jobs.ligo-wa.caltech.edu/"
    elif 'ligo-la' in HOST:
        url="https://ldas-jobs.ligo-la.caltech.edu/"
    elif 'uwm' in HOST or 'nemo' in HOST:
        url="https://ldas-jobs.phys.uwm.edu/"
    elif 'phy.syr.edu' in HOST:
        url="https://sugar-jobs.phy.syr.edu/"
    elif 'arcca.cf.ac.uk' in HOST:
        url="https://geo2.arcca.cf.ac.uk/"
    elif 'vulcan' in HOST:
        url="https://galahad.aei.mpg.de/"
    else:
        if onweb:
          url="http://%s/"%(HOST)
        else:
          url=HOST+':'
    url=url+webpath
    return(url)
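
# Illustrative example (hypothetical path and username; the URL returned depends on
# the cluster hostname and on whether the path sits under a web-served directory):
#   guess_url('/home/albert.einstein/public_html/LALInference/results')
#   -> e.g. 'https://ldas-jobs.ligo.caltech.edu/~albert.einstein/LALInference/results' on a CIT node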

class Event():
  """
  Represents a unique event to run on
  """
  new_id=itertools.count().next
  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    if self.GID is not None:
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}
  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val
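
# Usage sketch (hypothetical values): build an Event from a trigger time alone and
# attach an event-specific engine option; fields such as ifos and srate are optional.
#   ev = Event(trig_time=1083759273.0, ifos=['H1','L1'], srate=4096.0)
#   ev.set_engine_option('time-min','1083759263')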

dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']

def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False):
  """
  Parse an LV alert file, containing coinc, sngl, coinc_event_map,
  and create a list of Events as input for the pipeline.
  Based on Chris Pankow's script
  """
  output=[]
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, GetApproximantFromString, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb
  try:
    from gstlal import reference_psd
  except ImportError:
    reference_psd = None
  cwd=os.getcwd()
  os.chdir(basepath)
  print "Download %s coinc.xml" % gid
  client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  if downloadpsd:
    print "Download %s psd.xml.gz" % gid
    if reference_psd is not None:
      xmlpsd = ligolw_utils.load_fileobj(client.files(gid, "psd.xml.gz"), contenthandler = lalseries.PSDContentHandler)[0]
      psd = lalseries.read_psd_xmldoc(xmlpsd)
      ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
    else:
      open("psd.xml.gz", "w").write(client.files(gid, "psd.xml.gz").read())
    psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
    combine = np.loadtxt(psdasciidic[psdasciidic.keys()[0]])
    srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      if roq==False:
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None:
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          if reference_psd is not None and downloadpsd:
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    if srate:
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if downloadpsd:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print "Found %d coinc events in table." % len(coinc_events)
  os.chdir(cwd)
  return output
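
# Usage sketch (requires GraceDB access; the GID below is hypothetical):
#   events = readLValert(gid='G000000', threshold_snr=8.0, flow=20.0, basepath='./')
#   for ev in events: print ev.event_id, ev.trig_time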

def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database
    """
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)

def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
	"""
	Returns a list of Event objects
	from the pipedown database. Can dump some stats to dumpfile if given,
	and filter by gpsstart and gpsend to reduce the number, or specify
	max_cfar to select by combined FAR
	"""
	output={}
	if gpsstart is not None: gpsstart=float(gpsstart)
	if gpsend is not None: gpsend=float(gpsend)
	# Get coincs
	get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
		FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
		sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
		join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'\
		"
	if gpsstart is not None:
		get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
	if gpsend is not None:
		get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
	if max_cfar !=-1:
		get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
	if min_cfar != -1:
		get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
	db_out=database_connection.cursor().execute(get_coincs)
	extra={}
	for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
		coinc_id=int(coinc_id.split(":")[-1])
		if not coinc_id in output.keys():
			output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
			extra[coinc_id]={}
		output[coinc_id].timeslides[ifo]=0
		output[coinc_id].ifos.append(ifo)
		extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
	if dumpfile is not None:
		fh=open(dumpfile,'w')
		for co in output.keys():
			for ifo in output[co].ifos:
				fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
		fh.close()
	return output.values()

def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
	"""
	Returns a list of Event objects
	from the pipedown database. Can dump some stats to dumpfile if given,
	and filter by gpsstart and gpsend to reduce the number, or specify
	max_cfar to select by combined FAR
	"""
	output={}
	if gpsstart is not None: gpsstart=float(gpsstart)
	if gpsend is not None: gpsend=float(gpsend)
	# Get coincs
	get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
		FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
		sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
		join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
		WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'\
		"
	if gpsstart is not None:
		get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
	if gpsend is not None:
		get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
	if max_cfar !=-1:
		get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
	if max_cfar !=-1:
		get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
	if min_cfar != -1:
		get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
	db_out=database_connection.cursor().execute(get_coincs)
	extra={}
	for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
		coinc_id=int(coinc_id.split(":")[-1])
		if not coinc_id in output.keys():
			output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
			extra[coinc_id]={}
		output[coinc_id].timeslides[ifo]=0
		output[coinc_id].ifos.append(ifo)
		extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
	if dumpfile is not None:
		fh=open(dumpfile,'w')
		for co in output.keys():
			for ifo in output[co].ifos:
				fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
		fh.close()
	return output.values()
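
# Usage sketch (hypothetical database path): open a pipedown sqlite database and
# select the zero-lag coincidences below a combined FAR threshold.
#   (con, workfile) = open_pipedown_database('pipedown.sqlite', None)
#   events = get_zerolag_pipedown(con, gpsstart=966384015, gpsend=967384015, max_cfar=0.01)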

def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
	"""
	Returns a list of Event objects
	with times and timeslide offsets
	"""
	output={}
	if gpsstart is not None: gpsstart=float(gpsstart)
	if gpsend is not None: gpsend=float(gpsend)
	db_segments=[]
	sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
	db_out = database_connection.cursor().execute(sql_seg_query)
	for d in db_out:
		if d not in db_segments:
			db_segments.append(d)
	seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
	db_out_saved=[]
	# Get coincidences
	get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
		    FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
		    == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
		    on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
		    join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
	joinstr = ' and '
	if gpsstart is not None:
		get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
	if gpsend is not None:
		get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
	if max_cfar!=-1:
		get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
	db_out=database_connection.cursor().execute(get_coincs)
	from pylal import SnglInspiralUtils
	extra={}
	for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
		coinc_id=int(coinc_id.split(":")[-1])
		seg=filter(lambda seg:sngl_time in seg,seglist)[0]
		slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
		if not coinc_id in output.keys():
			output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
			extra[coinc_id]={}
		output[coinc_id].timeslides[ifo]=slid_time-sngl_time
		output[coinc_id].ifos.append(ifo)
		extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
	if dumpfile is not None:
		fh=open(dumpfile,'w')
		for co in output.keys():
			for ifo in output[co].ifos:
				fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
		fh.close()
	return output.values()

def mkdirs(path):
  """
  Helper function. Make the given directory, creating intermediate
  dirs if necessary, and don't complain about it already existing.
  """
  if os.access(path,os.W_OK) and os.path.isdir(path): return
  else: os.makedirs(path)

def chooseEngineNode(name):
  if name=='lalinferencenest':
    return LALInferenceNestNode
  if name=='lalinferenceburst':
    return LALInferenceBurstNode
  if name=='lalinferencemcmc':
    return LALInferenceMCMCNode
  if name=='lalinferencebambi' or name=='lalinferencebambimpi':
    return LALInferenceBAMBINode
  if name=='lalinferencedatadump':
    return LALInferenceDataDumpNode
  if name=='bayeswavepsd':
    return BayesWavePSDNode
  return EngineNode

def get_engine_name(cp):
    name=cp.get('analysis','engine')
    if name=='random':
        engine_list=['lalinferencenest','lalinferencemcmc','lalinferencebambimpi']
        if cp.has_option('input','gid'):
            gid=cp.get('input','gid')
            engine_number=int(''.join(i for i in gid if i.isdigit())) % 2
        else:
            engine_number=random.randint(0,1)
        return engine_list[engine_number]
    else:
        return name

def scan_timefile(timefile):
    """
    Read a list of GPS times from timefile (one per line),
    skipping lines that do not start with a number and ignoring duplicates.
    """
    import re
    p=re.compile('[\d.]+')
    times=[]
    timefilehandle=open(timefile,'r')
    for time in timefilehandle:
      if not p.match(time):
        continue
      if float(time) in times:
        print 'Skipping duplicate time %s'%(time)
        continue
      print 'Read time %s'%(time)
      times.append(float(time))
    timefilehandle.close()
    return times

def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Read it
  2) Check that the psd file contains all the IFOs we want to analyze
  3) Write the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used as part of the PSD file name
  """
  lal=1
  from glue.ligolw import utils as ligolw_utils
  try:
    #from pylal import series
    from lal import series as lalseries
    lal=0
  except ImportError:
    print "ERROR, cannot import lal.series in bppu/get_xml_psds()\n"
    exit(1)

  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  #check we don't already have ALL the psd files #
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    # Check we don't already have that ascii file (e.g. because we are running parallel runs of the same event)
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    #print "Already have PSD files. Nothing to do...\n"
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out
  # We need to convert the PSD for one or more IFOs. Open the file
  if not os.path.isfile(psdxml):
    print "ERROR: cannot open the psd file %s. Exiting...\n"%psdxml
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analyze
  for ifo in ifos:
    if not ifo in [i.encode('ascii') for i in xmlpsd.keys()]:
      print "ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml)
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
    # Check we don't already have that ascii file (e.g. because we are running parallel runs of the same event)
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    if lal==0:
      #pylal stores the series in ifodata.data
      data=ifodata
    else:
      # lal stores it in ifodata.data.data
      data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]

    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    ifo=instrument.encode('ascii')
    # set node.psds dictionary with the path to the ascii files
    out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
  return out
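
# Usage sketch (hypothetical paths): convert a downloaded psd.xml.gz into one ascii
# PSD file per IFO and get back a dictionary mapping each IFO to its file.
#   psd_files = get_xml_psds('psd.xml.gz', ['H1','L1'], './PSDs', end_time=1083759273)
#   print psd_files['H1']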
def get_trigger_chirpmass(gid=None,gracedb="gracedb"):
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  import subprocess

  cwd=os.getcwd()
  subprocess.call([gracedb,"download", gid ,"coinc.xml"])
  xmldoc=ligolw_utils.load_filename("coinc.xml",contenthandler = LIGOLWContentHandler)
  coinctable = lsctables.CoincInspiralTable.get_table(xmldoc)
  coinc_events = [event for event in coinctable]
  sngltable = lsctables.SnglInspiralTable.get_table(xmldoc)
  sngl_events = [event for event in sngltable]
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  for coinc in coinc_events:
    these_sngls = [e for e in sngl_events if e.event_id in [c.event_id for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id] ]
    for e in these_sngls:
      mass1.append(e.mass1)
      mass2.append(e.mass2)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):

  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print "Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n"
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
  ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]
  # below is to construct non-overlapping mc priors for multiple roq mass-bin runs
  '''i=0
  for roq in ordered_roq_paths:
    if i>0:
      # change min, just set to the max of the previous one since we have already aligned it in the previous iteration of this loop
      #mc_priors[roq][0]+= (mc_priors[roq_lengths[i-1]][1]-mc_priors[roq][0])/2.
      mc_priors[roq][0]=mc_priors[ordered_roq_paths[i-1]][1]
    if i<len(roq_paths)-1:
      mc_priors[roq][1]-= (mc_priors[roq][1]- mc_priors[ordered_roq_paths[i+1]][0])/2.
    i+=1'''
  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):

  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  m1_priors = {}
  m2_priors = {}

  if gid is not None and sim_inspiral is not None:
    print "Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n"
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    m1_priors[roq]=[float(roq_params[roq]['mass1min']),float(roq_params[roq]['mass1max'])]
    m2_priors[roq]=[float(roq_params[roq]['mass2min']),float(roq_params[roq]['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  mc_priors_keys_int = map(lambda k : int(k[:-1]), mc_priors.keys())
  roq_min = mc_priors.keys()[np.argmin(mc_priors_keys_int)]
  roq_max = mc_priors.keys()[np.argmax(mc_priors_keys_int)]
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow == None and trigger_mchirp != None:
     if trigger_mchirp >= mc_max:
        scale_factor = 2.**(floor(trigger_mchirp/mc_max))
     if trigger_mchirp <= mc_min:
        scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow != None:
     scale_factor = 20./force_flow
  return scale_factor
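
# Worked example (hypothetical mc_priors built from two ROQ bases, '4s' and '128s'):
#   get_roq_mass_freq_scale_factor({'4s':[12.3,45.0],'128s':[0.9,1.7]}, 30.0) -> 1.0
#   get_roq_mass_freq_scale_factor({'4s':[12.3,45.0],'128s':[0.9,1.7]}, 60.0) -> 2.0 (i.e. 2**floor(60/45))
# With force_flow set, the factor is simply 20./force_flow regardless of the trigger chirp mass.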
def create_pfn_tuple(filename,protocol='file://',site='local'):
    return( (os.path.basename(filename),protocol+os.path.abspath(filename),site) )
def mchirp_from_components(m1, m2):
  return (m1*m2)**(3.0/5.0) / (m1+m2)**(1.0/5.0)
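
# Worked example: for an equal-mass 1.4+1.4 Msun binary,
# mchirp_from_components(1.4, 1.4) = 1.96**0.6 / 2.8**0.2 ~= 1.22 Msun.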

def Query_ROQ_Bounds_Type(path, roq_paths):
  # Assume that parametrization of ROQ bounds is independent of seglen; just look at first one
  import numpy as np
  roq = roq_paths[0]
  params = os.path.join(path,roq,'params.dat')
  roq_params0 = np.genfromtxt(params,names=True)
  roq_names_set = set(roq_params0.dtype.names)
  component_mass_bounds_set = set(['mass1min', 'mass1max', 'mass2min', 'mass2max'])
  chirp_mass_q_bounds_set = set(['chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'])
  if roq_names_set.issuperset(component_mass_bounds_set):
    roq_bounds = 'component_mass'
  elif roq_names_set.issuperset(chirp_mass_q_bounds_set):
    roq_bounds = 'chirp_mass_q'
  else:
    print 'Invalid bounds for ROQ. Either (m1,m2) or (mc,q) bounds are supported.'
    sys.exit(1)
  return roq_bounds

class LALInferencePipelineDAG(pipeline.CondorDAG):
  def __init__(self,cp,dax=False,site='local'):
    self.subfiles=[]
    self.config=cp
    self.engine=get_engine_name(cp)
    self.EngineNode=chooseEngineNode(self.engine)
    self.site=site
    if cp.has_option('paths','basedir'):
      self.basepath=cp.get('paths','basedir')
    else:
      self.basepath=os.getcwd()
      print 'No basepath specified, using current directory: %s'%(self.basepath)
    mkdirs(self.basepath)
    print("Generating LALInference DAG in {0}".format(self.basepath))
    if dax:
        os.chdir(self.basepath)
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
    mkdirs(self.posteriorpath)
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
    if cp.has_option('paths','cachedir'):
      self.cachepath=cp.get('paths','cachedir')
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
    mkdirs(self.cachepath)
    if cp.has_option('paths','logdir'):
      self.logpath=cp.get('paths','logdir')
    else:
      self.logpath=os.path.join(self.basepath,'log')
    mkdirs(self.logpath)
    if cp.has_option('analysis','ifos'):
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
      self.segments[ifo]=[]
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
    self.dq={}
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
    self.channels=ast.literal_eval(cp.get('data','channels'))
    self.use_available_data=False
    self.webdir=cp.get('paths','webdir')
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
    # Set up necessary job files.
    self.prenodes={}
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
    self.datafind_job.add_opt('url-type','file')
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
    if self.config.getboolean('analysis','roq'):
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
        for a in permutations(self.ifos,N):
            ifocombos.append(a)
    for ifos in ifocombos:
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
    self.results_page_job.set_grid_site('local')
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
    self.cotest_results_page_job.set_grid_site('local')
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
    self.coherence_test_job.set_grid_site('local')
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
    self.gracedbjob.set_grid_site('local')
    # Process the input to build list of analyses to do
    self.events=self.setup_from_inputs()

    # Sanity checking
    if len(self.events)==0:
      print 'No input events found, please check your config if you expect some events'
    self.times=[e.trig_time for e in self.events]
    # Set up the segments
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
      (mintime,maxtime)=self.get_required_data(self.times)
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
    self.add_science_segments()
    # Save the final configuration that is being used
    # first to the run dir
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
      self.config.write(conffile)
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)
    # Generate the DAG according to the config given
    for event in self.events: self.add_full_analysis(event)
    self.add_skyarea_followup()
    if self.config.has_option('analysis','upload-to-gracedb'):
      if self.config.getboolean('analysis','upload-to-gracedb'):
        self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
    self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
    self.set_dag_file(os.path.join(self.basepath,self.dagfilename))
    if self.is_dax():
      self.set_dax_file(self.dagfilename)

  def add_skyarea_followup(self):
    # Add skyarea jobs if the executable is given
    # Do one for each results page for now
    if self.config.has_option('condor','skyarea'):
      self.skyareajob=SkyAreaJob(self.config,os.path.join(self.basepath,'skyarea.sub'),self.logpath,dax=self.is_dax())
      respagenodes=filter(lambda x: isinstance(x,ResultsPageNode) ,self.get_nodes())
      if self.engine=='lalinferenceburst':
          prefix='LIB'
      else:
          prefix='LALInference'
      for p in respagenodes:
          skyareanode=SkyAreaNode(self.skyareajob,prefix=prefix)
          skyareanode.add_resultspage_parent(p)
          skyareanode.set_ifos(p.ifos)
          self.add_node(skyareanode)

  def add_full_analysis(self,event):
    if self.engine=='lalinferencenest' or  self.engine=='lalinferenceburst':
      result=self.add_full_analysis_lalinferencenest(event)
    elif self.engine=='lalinferencemcmc':
      result=self.add_full_analysis_lalinferencemcmc(event)
    elif self.engine=='lalinferencebambi' or self.engine=='lalinferencebambimpi':
      result=self.add_full_analysis_lalinferencebambi(event)
    return result

  def create_frame_pfn_file(self):
    """
    Create a pegasus cache file name, using inspiralutils
    """
    import inspiralutils as iu
    gpsstart=self.config.get('input','gps-start-time')
    gpsend=self.config.get('input','gps-end-time')
    pfnfile=iu.create_frame_pfn_file(self.frtypes,gpsstart,gpsend)
    return pfnfile