lalinference_pipe_utils.py 127 KB
Newer Older
1

2
#flow DAG Class definitions for LALInference Pipeline
3
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang
4

John Douglas Veitch's avatar
John Douglas Veitch committed
5
import itertools
6
import glue
7
from glue import pipeline,segmentsUtils,segments
8
import os
9
import socket
10
from lalapps import inspiralutils
11 12
import uuid
import ast
13
import pdb
John Douglas Veitch's avatar
John Douglas Veitch committed
14
import string
15
from math import floor,ceil,log,pow
16
import sys
17
import random
18
from itertools import permutations
19
import shutil
20
import numpy as np
21 22 23 24 25

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    # Which web-served directory, if any, appears in the path?
    webdir=None
    for candidate in ('public_html/','WWW/','www_html/'):
        if candidate.rstrip('/') in fslocation:
            webdir=candidate
            break
    if webdir is not None:
        (head,tail)=fslocation.split(webdir)
        webpath=os.path.join('~%s'%(USER),tail)
        onweb=True
    else:
        # No known web directory in the path; try inserting one after the
        # username component and see if it resolves to the same place.
        (head,tail)=fslocation.split(USER,1)
        for webdir in ['public_html','WWW','www_html']:
            trypath=head+os.environ['USER']+'/'+webdir+tail
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                webpath=os.path.join('~%s'%(USER),tail)
                onweb=True
                break
            else:
                webpath=fslocation
                onweb=False
    # Map the local hostname to the cluster's web server (order matters:
    # e.g. 'cit'/'caltech' must be tested before the ligo-wa/ligo-la hosts).
    host_url_table=[
        (('atlas',),"https://atlas1.atlas.aei.uni-hannover.de/"),
        (('cit','caltech'),"https://ldas-jobs.ligo.caltech.edu/"),
        (('ligo-wa',),"https://ldas-jobs.ligo-wa.caltech.edu/"),
        (('ligo-la',),"https://ldas-jobs.ligo-la.caltech.edu/"),
        (('uwm','nemo'),"https://ldas-jobs.phys.uwm.edu/"),
        (('phy.syr.edu',),"https://sugar-jobs.phy.syr.edu/"),
        (('arcca.cf.ac.uk',),"https://geo2.arcca.cf.ac.uk/"),
        (('vulcan',),"https://galahad.aei.mpg.de/"),
    ]
    url=None
    for (needles,base) in host_url_table:
        if any(n in HOST for n in needles):
            url=base
            break
    if url is None:
        if onweb:
          url="http://%s/"%(HOST)
        else:
          url=HOST+':'
    url=url+webpath
    return(url)

John Douglas Veitch's avatar
John Douglas Veitch committed
82 83 84 85
class Event():
  """
  Represents a unique event to run on
  """
  # Class-level ID generator: each call yields the next integer.
  # Using next() on a stored iterator works on both Python 2 and 3,
  # unlike the Python-2-only bound method itertools.count().next.
  new_id=staticmethod(lambda _counter=itertools.count(): next(_counter))
  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    """
    Create an event from whichever trigger/injection object is available.
    When a trigger or injection row is given it overrides trig_time (and,
    where no explicit event_id was passed, the event ID) below.
    """
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    # Avoid a shared mutable default for the per-IFO timeslide dictionary
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    # Avoid a shared mutable default for the IFO list as well
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    # Injection rows carry their own end time and simulation id
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    # Single-IFO triggers always override the event id with their own
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    # GraceDB IDs like 'G123456': the numeric part becomes the event ID
    if self.GID is not None:
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}
  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val
John Douglas Veitch's avatar
John Douglas Veitch committed
131 132

# NOTE(review): these look like cache labels for simulated/analytic noise
# models rather than real frame types -- confirm against the datafind logic.
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
John Douglas Veitch's avatar
John Douglas Veitch committed
133

134
def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False):
  """
  Parse LV alert file, containing coinc, sngl, coinc_event_map.
  and create a list of Events as input for pipeline
  Based on Chris Pankow's script

  Downloads coinc.xml (and optionally psd.xml.gz) for the given GraceDB
  id into basepath, derives per-event duration/sample-rate/horizon
  distance estimates and returns a list of Event objects.
  """
  output=[]
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, GetApproximantFromString, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb, HTTPError
  # gstlal is optional; without it the PSD is stored raw and no horizon
  # distance can be computed from it
  try:
    from gstlal import reference_psd
  except ImportError:
    reference_psd = None
  # Work inside basepath so the downloaded files land there; restored at the end
  cwd=os.getcwd()
  os.chdir(basepath)
  print "Download %s coinc.xml" % gid
  client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  # Index single-IFO triggers by event_id for the coinc_map lookup below
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  psdfileobj = None
  if downloadpsd:
    print "Download %s psd.xml.gz" % gid
    # Best-effort download: a missing PSD is not fatal
    try:
      psdfileobj = client.files(gid, "psd.xml.gz")
    except HTTPError:
      print "Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid
    if psdfileobj is not None:
      if reference_psd is not None:
        xmlpsd = ligolw_utils.load_fileobj(psdfileobj, contenthandler = lalseries.PSDContentHandler)[0]
        psd = lalseries.read_psd_xmldoc(xmlpsd)
        ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
      else:
        open("psd.xml.gz", "w").write(psdfileobj.read())
      # Convert the XML PSD to per-IFO ascii files and use the highest
      # frequency in the file to bound the usable sample rate
      psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
      combine = np.loadtxt(psdasciidic[psdasciidic.keys()[0]])
      srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      # Non-ROQ runs: pick a power-of-two duration covering the chirp and a
      # power-of-two sample rate covering the IMRPhenomD peak frequency
      if roq==False:
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None:
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          # No effective distance: fall back to a PSD-based horizon
          # (only possible when gstlal and a downloaded PSD are available)
          if reference_psd is not None and psdfileobj is not None:
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    # The PSD file's sample rate caps the analysis sample rate
    if srate:
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if psdfileobj is not None:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print "Found %d coinc events in table." % len(coinc_events)
  os.chdir(cwd)
  return output

232 233 234 235 236 237 238
def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database.

    database_filename -- path to the sqlite database (must be readable)
    tmp_space -- scratch directory for the working copy and, when set,
                 for sqlite temporary storage

    Returns (connection, working_filename).
    Raises Exception if the input file is not readable.
    """
    # NOTE: the body previously mixed tab and space indentation, which is a
    # TabError under Python 3; normalised to spaces throughout.
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    # dbtables may copy the database into tmp_space; work on that copy
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
246

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR
    """
    # NOTE: indentation normalised to spaces (the original mixed tabs and
    # spaces, which is a TabError under Python 3).
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Zero-lag coincidences (time_slide_id:1 for lloid) joined to their
    # per-IFO single triggers
    get_coincs = ("SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,"
                  "sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,"
                  "sngl_inspiral.chisq,coinc_inspiral.combined_far "
                  "FROM sngl_inspiral "
                  "JOIN coinc_event_map ON (coinc_event_map.table_name=='sngl_inspiral' "
                  "AND coinc_event_map.event_id==sngl_inspiral.event_id) "
                  "JOIN coinc_event ON (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) "
                  "JOIN coinc_inspiral ON (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) "
                  "WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'")
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    # One Event per coinc id; accumulate the participating IFOs and stats
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    # list() keeps the historical list return type on Python 3 as well
    return list(output.values())

290
def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR
    """
    # NOTE: indentation normalised to spaces (the original mixed tabs and
    # spaces, which is a TabError under Python 3).  Same query as
    # get_zerolag_lloid except for the zero-lag time_slide id (10049 here).
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    get_coincs = ("SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,"
                  "sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,"
                  "sngl_inspiral.chisq,coinc_inspiral.combined_far "
                  "FROM sngl_inspiral "
                  "JOIN coinc_event_map ON (coinc_event_map.table_name=='sngl_inspiral' "
                  "AND coinc_event_map.event_id==sngl_inspiral.event_id) "
                  "JOIN coinc_event ON (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) "
                  "JOIN coinc_inspiral ON (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) "
                  "WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'")
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    # One Event per coinc id; accumulate the participating IFOs and stats
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    # list() keeps the historical list return type on Python 3 as well
    return list(output.values())
332

333
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
	"""
	Returns a list of Event objects
	with times and timeslide offsets
	"""
	output={}
	if gpsstart is not None: gpsstart=float(gpsstart)
	if gpsend is not None: gpsend=float(gpsend)
	db_segments=[]
	# Science segments analysed by thinca; needed to wrap slid trigger
	# times back onto the analysis ring below
	sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
	db_out = database_connection.cursor().execute(sql_seg_query)
	for d in db_out:
		if d not in db_segments:
			db_segments.append(d)
	seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
	# NOTE(review): db_out_saved appears unused -- confirm before removing
	db_out_saved=[]
	# Get coincidences
	# All non-zero-lag coincidences (zero lag is time_slide_id:10049) with
	# the per-IFO slide offsets joined in
	get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
		    FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
		    == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
		    on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
		    join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
	joinstr = ' and '
	if gpsstart is not None:
		get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
	if gpsend is not None:
		get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
	if max_cfar!=-1:
		get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
	db_out=database_connection.cursor().execute(get_coincs)
	from pylal import SnglInspiralUtils
	extra={}
	for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
		coinc_id=int(coinc_id.split(":")[-1])
		# Find the science segment containing this trigger (Python 2:
		# filter() returns a list here) and apply the ring-wrapped slide
		seg=filter(lambda seg:sngl_time in seg,seglist)[0]
		slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
		if not coinc_id in output.keys():
			output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
			extra[coinc_id]={}
		# Store the per-IFO offset actually applied after ring wrapping
		output[coinc_id].timeslides[ifo]=slid_time-sngl_time
		output[coinc_id].ifos.append(ifo)
		extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
	if dumpfile is not None:
		fh=open(dumpfile,'w')
		for co in output.keys():
			for ifo in output[co].ifos:
				fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
		fh.close()
	return output.values()
382

383 384 385 386 387 388 389 390
def mkdirs(path):
  """
  Helper function. Create *path* (with any missing intermediate
  directories), doing nothing when a writable directory already exists.
  """
  already_usable = os.path.isdir(path) and os.access(path, os.W_OK)
  if not already_usable:
    os.makedirs(path)

John Douglas Veitch's avatar
John Douglas Veitch committed
391 392 393
def chooseEngineNode(name):
  """
  Map an engine name to its pipeline node class.

  Unrecognised names fall back to the generic EngineNode.
  """
  node_class_map = {
      'lalinferencenest': LALInferenceNestNode,
      'lalinferenceburst': LALInferenceBurstNode,
      'lalinferencemcmc': LALInferenceMCMCNode,
      'lalinferencebambi': LALInferenceBAMBINode,
      'lalinferencebambimpi': LALInferenceBAMBINode,
      'lalinferencedatadump': LALInferenceDataDumpNode,
      'bayeswavepsd': BayesWavePSDNode,
  }
  return node_class_map.get(name, EngineNode)

406 407 408
def get_engine_name(cp):
    """
    Return the sampler engine requested in the [analysis] section.

    For engine 'random', pick between the first two candidates using the
    digits of the GraceDB id when one is configured (deterministic),
    otherwise choose at random.
    """
    requested=cp.get('analysis','engine')
    if requested!='random':
        return requested
    candidates=['lalinferencenest','lalinferencemcmc','lalinferencebambimpi']
    if cp.has_option('input','gid'):
        graceid=cp.get('input','gid')
        digits=''.join(ch for ch in graceid if ch.isdigit())
        index=int(digits) % 2
    else:
        index=random.randint(0,1)
    return candidates[index]

419
def scan_timefile(timefile):
    """
    Read a list of GPS times from an ASCII file, one per line.

    Lines not beginning with a number are ignored and duplicate times are
    skipped.  Returns the times as a list of floats, in file order.
    """
    # NOTE: print statements converted to the parenthesised form (identical
    # output on Python 2, valid on Python 3) and the mixed tab/space
    # indentation normalised.
    import re
    p = re.compile(r'[\d.]+')
    times = []
    timefilehandle = open(timefile, 'r')
    for time in timefilehandle:
        # skip comment/blank lines that do not start with a number
        if not p.match(time):
            continue
        if float(time) in times:
            print('Skipping duplicate time %s' % (time))
            continue
        print('Read time %s' % (time))
        times.append(float(time))
    timefilehandle.close()
    return times
434

435 436 437 438
def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Reads it
  2) Checks the psd file contains all the IFO we want to analyze
  3) Writes down the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used a part of the PSD file name

  Returns a dict mapping each IFO name to the path of its ascii PSD file.
  """
  # lal==0 means lal.series imported fine; the lal==1 branch below is a
  # leftover from the old pylal reader where the series layout differed
  lal=1
  from glue.ligolw import utils as ligolw_utils
  try:
    #from pylal import series
    from lal import series as lalseries
    lal=0
  except ImportError:
    print "ERROR, cannot import lal.series in bppu/get_xml_psds()\n"
    exit(1)

  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  # The trigger time becomes part of the output file names (empty if absent)
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  #check we don't already have ALL the psd files #
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the same event)
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    #print "Already have PSD files. Nothing to do...\n"
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out

  # We need to convert the PSD for one or more IFOS. Open the file
  if not os.path.isfile(psdxml):
    print "ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analyze
  for ifo in ifos:
    if not ifo in [i.encode('ascii') for i in xmlpsd.keys()]:
      print "ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml)
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the same event)
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    if lal==0:
      #pylal stores the series in ifodata.data
      data=ifodata
    else:
      # lal stores it in ifodata.data.data
      data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]

    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    ifo=instrument.encode('ascii')
    # set node.psds dictionary with the path to the ascii files
    out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
  return out
521

522 523 524
def get_trigger_chirpmass(gid=None,gracedb="gracedb"):
  """
  Download coinc.xml for a GraceDB event and return the trigger chirp mass.

  Asserts that all participating IFOs report identical component masses,
  then computes the chirp mass from the common (mass1, mass2) pair.
  """
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  import subprocess

  cwd=os.getcwd()
  # Fetch the coinc table via the gracedb command-line client
  subprocess.call([gracedb,"download", gid ,"coinc.xml"])
  xmldoc=ligolw_utils.load_filename("coinc.xml",contenthandler = LIGOLWContentHandler)
  coinc_rows = list(lsctables.CoincInspiralTable.get_table(xmldoc))
  sngl_rows = list(lsctables.SnglInspiralTable.get_table(xmldoc))
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  # Collect component masses from every single-IFO trigger belonging to
  # each coincidence
  for coinc in coinc_rows:
    member_ids = [c.event_id for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    for sngl in sngl_rows:
      if sngl.event_id in member_ids:
        mass1.append(sngl.mass1)
        mass2.append(sngl.mass2)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

555
def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Read the chirp-mass prior bounds for each ROQ basis directory.

  path -- base directory containing one sub-directory per ROQ basis
  roq_paths -- list of basis sub-directory names (each holds params.dat)
  roq_params -- dict, filled in place with the parsed params.dat records
  key -- sort key applied to roq_params.items() when ordering the bases
  gid / sim_inspiral -- mutually exclusive trigger sources used to derive
  the trigger chirp mass (None when neither is given)

  Returns (mc_priors, trigger_mchirp).
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
  # Bases ordered by `key`, largest first; currently only consumed by the
  # disabled prior-trimming code below, kept so `key` is still exercised.
  ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]
  # below is to construct non-overlapping mc priors for multiple roq mass-bin runs
  # (disabled)
  #i=0
  #for roq in ordered_roq_paths:
  #  if i>0:
  #    # change min, just set to the max of the previous one since we have already aligned it in the previous iteration of this loop
  #    #mc_priors[roq][0]+= (mc_priors[roq_lengths[i-1]][1]-mc_priors[roq][0])/2.
  #    mc_priors[roq][0]=mc_priors[ordered_roq_paths[i-1]][1]
  #  if i<len(roq_paths)-1:
  #    mc_priors[roq][1]-= (mc_priors[roq][1]- mc_priors[ordered_roq_paths[i+1]][0])/2.
  #  i+=1
  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

Vivien Raymond's avatar
Vivien Raymond committed
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Read the component-mass prior bounds for each ROQ basis directory.

  path -- base directory containing one sub-directory per ROQ basis
  roq_paths -- list of basis sub-directory names (each holds params.dat)
  roq_params -- dict, filled in place with the parsed params.dat records
  key -- accepted for signature parity with get_roq_mchirp_priors (unused)
  gid / sim_inspiral -- mutually exclusive trigger sources used to derive
  the trigger chirp mass (None when neither is given)

  Returns (m1_priors, m2_priors, trigger_mchirp).
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  m1_priors = {}
  m2_priors = {}

  if gid is not None and sim_inspiral is not None:
    # Bug fix: the message previously named get_roq_mchirp_priors
    print("Error in get_roq_component_mass_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    m1_priors[roq]=[float(roq_params[roq]['mass1min']),float(roq_params[roq]['mass1max'])]
    m2_priors[roq]=[float(roq_params[roq]['mass2min']),float(roq_params[roq]['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

615
def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  """
  Compute the mass/frequency rescaling factor for an ROQ run.

  mc_priors -- dict of chirp-mass bounds keyed by seglen label (e.g. '4s')
  trigger_mchirp -- trigger chirp mass, or None
  force_flow -- when given, overrides everything: scale = 20 Hz / force_flow

  Returns 1.0 when no rescaling is needed.
  """
  # Keys are seglen labels like '4s'; strip the trailing character to order
  # numerically.  min()/max() with a key work on both Python 2 and 3,
  # unlike the previous indexing of dict.keys() (py3 TypeError).
  roq_min = min(mc_priors, key=lambda k: int(k[:-1]))
  roq_max = max(mc_priors, key=lambda k: int(k[:-1]))
  # Shortest basis covers the highest masses and vice versa
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow is None and trigger_mchirp is not None:
     if trigger_mchirp >= mc_max:
        scale_factor = 2.**(floor(trigger_mchirp/mc_max))
     if trigger_mchirp <= mc_min:
        scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow is not None:
     scale_factor = 20./force_flow
  return scale_factor
630

631 632
def create_pfn_tuple(filename,protocol='file://',site='local'):
    """Return a (logical-name, physical-URL, site) triple for a Pegasus PFN entry."""
    logical_name = os.path.basename(filename)
    physical_url = protocol + os.path.abspath(filename)
    return (logical_name, physical_url, site)
633

Vivien Raymond's avatar
Vivien Raymond committed
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654
def mchirp_from_components(m1, m2):
  """Return the chirp mass for component masses *m1* and *m2*."""
  total = m1 + m2
  reduced = m1 * m2 / total
  # M_chirp = eta^(3/5) * M_total == (m1*m2)^(3/5) / (m1+m2)^(1/5)
  return reduced**(3.0/5.0) * total**(2.0/5.0)

def Query_ROQ_Bounds_Type(path, roq_paths):
  """
  Determine whether the ROQ bases are parametrised in component masses
  or in (chirp mass, q), by inspecting the column names of the first
  basis' params.dat.

  Returns 'component_mass' or 'chirp_mass_q'; exits on anything else.
  """
  # Assume that parametrization of ROQ bounds is independent of seglen; just look at first one
  import numpy as np
  roq = roq_paths[0]
  params = os.path.join(path,roq,'params.dat')
  roq_params0 = np.genfromtxt(params,names=True)
  roq_names_set = set(roq_params0.dtype.names)
  component_mass_bounds_set = set(['mass1min', 'mass1max', 'mass2min', 'mass2max'])
  chirp_mass_q_bounds_set = set(['chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'])
  if roq_names_set.issuperset(component_mass_bounds_set):
    roq_bounds = 'component_mass'
  elif roq_names_set.issuperset(chirp_mass_q_bounds_set):
    roq_bounds = 'chirp_mass_q'
  else:
    # print() form works identically on Python 2 and 3
    print('Invalid bounds for ROQ. Ether (m1,m2) or (mc,q) bounds are supported.')
    sys.exit(1)
  return roq_bounds

655
class LALInferencePipelineDAG(pipeline.CondorDAG):
  def __init__(self,cp,dax=False,site='local'):
    """
    Build the full LALInference DAG from a parsed configuration.

    cp   : ConfigParser holding the analysis configuration
    dax  : if True, generate a Pegasus DAX instead of a plain Condor DAG
    site : grid site label passed through to the engine jobs

    Reads paths/IFOs/options from cp, creates one Condor job definition per
    task type (datafind, pre-engine data dump, engine per IFO combination,
    ROQ weights, BayesLine, BayesWave PSD, results pages, merge/combine,
    coherence test, GraceDB upload), then adds a full analysis for every
    input event found by setup_from_inputs().
    """
    self.subfiles=[]
    self.config=cp
    self.engine=get_engine_name(cp)
    # EngineNode is the node *class* matching the chosen engine executable.
    self.EngineNode=chooseEngineNode(self.engine)
    self.site=site
    # Base directory for all generated files; default to the CWD.
    if cp.has_option('paths','basedir'):
      self.basepath=cp.get('paths','basedir')
    else:
      self.basepath=os.getcwd()
      print 'No basepath specified, using current directory: %s'%(self.basepath)
    mkdirs(self.basepath)
    print("Generating LALInference DAG in {0}".format(self.basepath))
    # Pegasus DAX generation expects to run from the base directory.
    if dax:
        os.chdir(self.basepath)
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
    mkdirs(self.posteriorpath)
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    # Unique log file name per invocation via uuid1.
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
    # Cache and log directories, with defaults under the base path.
    if cp.has_option('paths','cachedir'):
      self.cachepath=cp.get('paths','cachedir')
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
    mkdirs(self.cachepath)
    if cp.has_option('paths','logdir'):
      self.logpath=cp.get('paths','logdir')
    else:
      self.logpath=os.path.join(self.basepath,'log')
    mkdirs(self.logpath)
    # IFO list, e.g. ['H1','L1','V1']; stored in the config as a Python
    # literal, hence ast.literal_eval.
    if cp.has_option('analysis','ifos'):
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
      self.segments[ifo]=[]
    # Per-event node bookkeeping, filled in as analyses are added.
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
    self.dq={}
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
    self.channels=ast.literal_eval(cp.get('data','channels'))
    self.use_available_data=False
    self.webdir=cp.get('paths','webdir')
    # Seed for simulated data, if requested; None means real data.
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
    # Set up necessary job files.
    self.prenodes={}
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
    self.datafind_job.add_opt('url-type','file')
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
    # Pre-engine job dumps the data that downstream jobs (ROQ weights,
    # BayesLine, BayesWave) consume.
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
    if self.config.getboolean('analysis','roq'):
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
    # One BayesWave PSD job per IFO, keyed by IFO name.
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
        for a in permutations(self.ifos,N):
            ifocombos.append(a)
    for ifos in ifocombos:
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
    self.results_page_job.set_grid_site('local')
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
    self.cotest_results_page_job.set_grid_site('local')
    # MCMC output needs an extra combine step before merging; nested
    # sampling output is merged directly.
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
    self.coherence_test_job.set_grid_site('local')
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
    self.gracedbjob.set_grid_site('local')
    # Process the input to build list of analyses to do
    self.events=self.setup_from_inputs()
    # Sanity checking
    if len(self.events)==0:
      print 'No input events found, please check your config if you expect some events'
    self.times=[e.trig_time for e in self.events]
    # Set up the segments: derive missing GPS start/end times from the
    # data requirements of the trigger times, then add science segments.
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
      (mintime,maxtime)=self.get_required_data(self.times)
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
    self.add_science_segments()
    # Save the final configuration that is being used
    # first to the run dir
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
      self.config.write(conffile)
    # ... and a copy to the web directory, if one is configured.
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)
    # Generate the DAG according to the config given
    for event in self.events: self.add_full_analysis(event)
    self.add_skyarea_followup()
    # NOTE(review): only the first event's sky map is uploaded to GraceDB
    # here — presumably one event per GraceDB run; confirm with callers.
    if self.config.has_option('analysis','upload-to-gracedb'):
      if self.config.getboolean('analysis','upload-to-gracedb'):
        self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
    self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
    self.set_dag_file(os.path.join(self.basepath,self.dagfilename))
    if self.is_dax():
      self.set_dax_file(self.dagfilename)

  def add_skyarea_followup(self):
    """
    Attach a sky-area follow-up node to every results page node,
    provided a [condor] skyarea executable is configured.
    """
    if not self.config.has_option('condor','skyarea'):
      return
    self.skyareajob=SkyAreaJob(self.config,os.path.join(self.basepath,'skyarea.sub'),self.logpath,dax=self.is_dax())
    # Label outputs by sampler family.
    prefix='LIB' if self.engine=='lalinferenceburst' else 'LALInference'
    pagenodes=[node for node in self.get_nodes() if isinstance(node,ResultsPageNode)]
    for pagenode in pagenodes:
        followup=SkyAreaNode(self.skyareajob,prefix=prefix)
        followup.add_resultspage_parent(pagenode)
        followup.set_ifos(pagenode.ifos)
        self.add_node(followup)
  def add_full_analysis(self,event):
814
    if self.engine=='lalinferencenest' or  self.engine=='lalinferenceburst':
815
      result=self.add_full_analysis_lalinferencenest(event)
816
    elif self.engine=='lalinferencemcmc':
817
      result=self.add_full_analysis_lalinferencemcmc(event)
818
    elif self.engine=='lalinferencebambi' or self.engine=='lalinferencebambimpi':