lalinference_pipe_utils.py 127 KB
Newer Older
1

2
#flow DAG Class definitions for LALInference Pipeline
3
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang
4

John Douglas Veitch's avatar
John Douglas Veitch committed
5
import itertools
6
import glue
7
from glue import pipeline,segmentsUtils,segments
8
from glue.ligolw import ligolw, lsctables, utils
9
import os
10
import socket
11
from lalapps import inspiralutils
12 13
import uuid
import ast
14
import pdb
John Douglas Veitch's avatar
John Douglas Veitch committed
15
import string
16
from math import floor,ceil,log,pow
17
import sys
18
import random
19
from itertools import permutations
20
import shutil
21
import numpy as np
22
import math
23
from six.moves import range
24 25 26 27 28

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.

29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    if 'public_html' in fslocation:
        k='public_html/'
    elif 'WWW' in fslocation:
        k='WWW/'
    elif 'www_html' in fslocation:
        k='www_html/'
    else:
        k=None
    if k is not None:
        (a,b)=fslocation.split(k)
        webpath=os.path.join('~%s'%(USER),b)
        onweb=True
    else:
49
        (c,d)=fslocation.split(USER,1)
50 51 52 53
        for k in ['public_html','WWW','www_html']:
            trypath=c+os.environ['USER']+'/'+k+d
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
54 55
                #(a,b)=trypath.split(k)
                webpath=os.path.join('~%s'%(USER),d)
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
                onweb=True
                break
            else:
                webpath=fslocation
                onweb=False
    if 'atlas' in HOST:
        url="https://atlas1.atlas.aei.uni-hannover.de/"
    elif 'cit' in HOST or 'caltech' in HOST:
        url="https://ldas-jobs.ligo.caltech.edu/"
    elif 'ligo-wa' in HOST:
        url="https://ldas-jobs.ligo-wa.caltech.edu/"
    elif 'ligo-la' in HOST:
        url="https://ldas-jobs.ligo-la.caltech.edu/"
    elif 'uwm' in HOST or 'nemo' in HOST:
        url="https://ldas-jobs.phys.uwm.edu/"
    elif 'phy.syr.edu' in HOST:
        url="https://sugar-jobs.phy.syr.edu/"
    elif 'arcca.cf.ac.uk' in HOST:
        url="https://geo2.arcca.cf.ac.uk/"
    elif 'vulcan' in HOST:
        url="https://galahad.aei.mpg.de/"
    else:
        if onweb:
          url="http://%s/"%(HOST)
        else:
          url=HOST+':'
    url=url+webpath
    return(url)

John Douglas Veitch's avatar
John Douglas Veitch committed
85 86 87 88
class Event():
  """
  Represents a unique event to run on
  """
  # Shared counter backing new_id(); the original used the Python-2-only
  # bound method itertools.count().next, which is an AttributeError on
  # Python 3.  next() works on both.
  _id_counter = itertools.count()

  @staticmethod
  def new_id():
    """Return the next unused sequential event ID."""
    return next(Event._id_counter)

  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    """
    Build an Event from any one of the supported trigger/injection types.

    trig_time      -- GPS trigger time (overridden if a trigger/injection row is given)
    SimInspiral    -- sim_inspiral injection row
    SimBurst       -- sim_burst injection row
    SnglInspiral   -- single-detector trigger row
    CoincInspiral  -- coincident trigger row
    event_id       -- explicit event ID; auto-generated otherwise
    timeslide_dict -- per-IFO timeslide offsets (default empty)
    GID            -- GraceDB event id string
    ifos           -- list of IFO names to analyse (default empty)
    duration, srate, trigSNR, fhigh, horizon_distance -- optional analysis hints
    """
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    # Avoid mutable default arguments: build fresh containers per instance
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    # A supplied trigger/injection row overrides trig_time and (usually) event_id
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    if self.GID is not None:
        # Use the digits of the GraceDB id as the event id
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}

  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val
John Douglas Veitch's avatar
John Douglas Veitch committed
134 135

# Cache names that denote simulated LAL noise models rather than real frame
# caches (presumably checked elsewhere in the pipeline — not visible here).
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
John Douglas Veitch's avatar
John Douglas Veitch committed
136

137
def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False,service_url=None):
  """
  Parse LV alert file, containing coinc, sngl, coinc_event_map.
  and create a list of Events as input for pipeline
  Based on Chris Pankow's script

  Downloads coinc.xml (and optionally psd.xml.gz) for GraceDB event `gid`
  into `basepath` and returns a list of Event objects, one per coinc row.
  """
  output=[]
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, GetApproximantFromString, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb, HTTPError
  try:
    from gstlal import reference_psd
  except ImportError:
    # gstlal is optional; without it we cannot compute horizon distances from a PSD
    reference_psd = None
  cwd=os.getcwd()
  os.chdir(basepath)
  print("Download %s coinc.xml" % gid)
  if service_url is None:
    client = GraceDb()
  else:
    client = GraceDb(service_url=service_url)
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  psdfileobj = None
  if downloadpsd:
    print("Download %s psd.xml.gz" % gid)
    try:
      psdfileobj = client.files(gid, "psd.xml.gz")
    except HTTPError:
      print("Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid)
    if psdfileobj is not None:
      if reference_psd is not None:
        xmlpsd = ligolw_utils.load_fileobj(psdfileobj, contenthandler = lalseries.PSDContentHandler)[0]
        psd = lalseries.read_psd_xmldoc(xmlpsd)
        ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
      else:
        # Write the downloaded bytes in binary mode and close the handle;
        # the original used text mode and leaked the file object.
        with open("psd.xml.gz", "wb") as psdfh:
          psdfh.write(psdfileobj.read())
      psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
      # list() is required on Python 3, where dict.keys() is not indexable
      combine = np.loadtxt(psdasciidic[list(psdasciidic.keys())[0]])
      srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      if roq==False:
        # Next-power-of-two segment length and sampling rate from the template
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None and not math.isnan(e.eff_distance):
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          if reference_psd is not None and psdfileobj is not None:
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    if srate:
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if psdfileobj is not None:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print("Found %d coinc events in table." % len(coinc_events))
  os.chdir(cwd)
  return output

235 236 237 238 239 240 241
def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database

    database_filename -- path to the sqlite database (must be readable)
    tmp_space         -- scratch directory for the working copy (may be None)
    Returns (connection, working_filename).
    Raises Exception if the file is not readable.
    """
    if not os.access(database_filename,os.R_OK):
        # The original indented this line with a tab, a TabError on Python 3
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
249

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR

    The original body mixed tabs and spaces, which is a TabError on
    Python 3; indentation is normalised here.
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs (zerolag only: time_slide_id 1 for lloid databases)
    get_coincs = ("SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,"
        "sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,"
        "sngl_inspiral.chisq,coinc_inspiral.combined_far "
        "FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' "
        "and coinc_event_map.event_id == sngl_inspiral.event_id) "
        "join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) "
        "join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) "
        "WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'")
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            # One Event per coinc; per-IFO rows accumulate into it below
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    return output.values()

293
def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR

    The original body mixed tabs and spaces, which is a TabError on
    Python 3; indentation is normalised here.
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs (zerolag only: time_slide_id 10049 for pipedown databases)
    get_coincs = ("SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,"
        "sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,"
        "sngl_inspiral.chisq,coinc_inspiral.combined_far "
        "FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' "
        "and coinc_event_map.event_id == sngl_inspiral.event_id) "
        "join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) "
        "join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) "
        "WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'")
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            # One Event per coinc; per-IFO rows accumulate into it below
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    return output.values()
335

336
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
    """
    Returns a list of Event objects
    with times and timeslide offsets

    Fixes relative to the original: mixed tab/space indentation (a TabError
    on Python 3), filter(...)[0] (filter is an iterator on Python 3), and
    the obsolete pylal import is now lazy so it is only needed when
    timeslide coincidences are actually present.
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Reconstruct the thinca analysis segments so slid times can be wrapped
    db_segments=[]
    sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
    db_out = database_connection.cursor().execute(sql_seg_query)
    for d in db_out:
        if d not in db_segments:
            db_segments.append(d)
    seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
    # Get coincidences (everything except the zerolag time_slide id)
    get_coincs=("SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,"
        "sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,"
        "sngl_inspiral.chisq,coinc_inspiral.combined_far "
        "FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' "
        "and coinc_event_map.event_id == sngl_inspiral.event_id) "
        "join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) "
        "join time_slide on (time_slide.time_slide_id == coinc_event.time_slide_id "
        "and time_slide.instrument==sngl_inspiral.ifo) "
        "join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) "
        "where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'")
    joinstr = ' and '
    if gpsstart is not None:
        get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
    if max_cfar!=-1:
        get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # Timeslide functionality requires obsolete pylal - will be removed.
        # Imported inside the loop (cheap after the first time) so databases
        # with no timeslide rows work without pylal installed.
        from pylal import SnglInspiralUtils
        coinc_id=int(coinc_id.split(":")[-1])
        # Segment containing this trigger; list comprehension rather than
        # filter()[0] so it also works on Python 3
        seg=[s for s in seglist if sngl_time in s][0]
        slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=slid_time-sngl_time
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    return output.values()
387

388 389 390 391 392 393 394 395
def mkdirs(path):
  """
  Helper function. Make the given directory, creating intermediate
  dirs if necessary, and don't complain about it already existing.
  """
  already_usable = os.path.isdir(path) and os.access(path, os.W_OK)
  if not already_usable:
    os.makedirs(path)

John Douglas Veitch's avatar
John Douglas Veitch committed
396 397 398
def chooseEngineNode(name):
  """
  Map an engine name string to the corresponding pipeline node class.
  Unrecognised names fall back to the generic EngineNode.
  """
  if name=='lalinferencenest':
    return LALInferenceNestNode
  elif name=='lalinferenceburst':
    return LALInferenceBurstNode
  elif name=='lalinferencemcmc':
    return LALInferenceMCMCNode
  elif name in ('lalinferencebambi','lalinferencebambimpi'):
    return LALInferenceBAMBINode
  elif name=='lalinferencedatadump':
    return LALInferenceDataDumpNode
  elif name=='bayeswavepsd':
    return BayesWavePSDNode
  else:
    return EngineNode

411 412 413
def get_engine_name(cp):
    """
    Return the sampler engine name from the [analysis] section of cp.
    The special value 'random' picks deterministically from the digits of
    the GraceDB id when one is configured, otherwise at random.
    """
    name = cp.get('analysis', 'engine')
    if name != 'random':
        return name
    engine_list = ['lalinferencenest', 'lalinferencemcmc', 'lalinferencebambimpi']
    if cp.has_option('input', 'gid'):
        gid = cp.get('input', 'gid')
        engine_number = int(''.join(c for c in gid if c.isdigit())) % 2
    else:
        engine_number = random.randint(0, 1)
    return engine_list[engine_number]

424
def scan_timefile(timefile):
    """
    Read a list of GPS times from a text file, one per line.
    Non-numeric lines are skipped and duplicate times are dropped
    (with a message).  Returns the times as a list of floats.

    Fixes relative to the original: tab-indented lines inside a
    space-indented body (a TabError on Python 3) and an unclosed
    file handle; the regex is now a raw string.
    """
    import re
    p = re.compile(r'[\d.]+')
    times = []
    with open(timefile, 'r') as timefilehandle:
        for time in timefilehandle:
            if not p.match(time):
                continue
            if float(time) in times:
                print('Skipping duplicate time %s' % (time))
                continue
            print('Read time %s' % (time))
            times.append(float(time))
    return times
439

440 441 442 443
def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Reads it
  2) Checks the psd file contains all the IFO we want to analyze
  3) Writes down the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used a part of the PSD file name

  Returns a dict mapping each IFO name to the path of its ascii PSD file.
  Exits the process (sys.exit(1)) if the psd file is missing or does not
  cover all requested IFOs.
  """
  from glue.ligolw import utils as ligolw_utils
  try:
    from lal import series as lalseries
  except ImportError:
    print("ERROR, cannot import lal.series in bppu/get_xml_psds()\n")
    raise
  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  # The trigger time becomes part of the ascii file name (empty if not given)
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  #check we don't already have ALL the psd files #
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    #print "Already have PSD files. Nothing to do...\n"
    # All ascii PSDs already exist: just return their paths without re-reading
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out

  # We need to convert the PSD for one or more IFOS. Open the file
  if not os.path.isfile(psdxml):
    print("ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml)
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analize
  # NOTE(review): .encode('ascii') returns bytes on Python 3, so these
  # comparisons and filename concatenations assume Python 2 str semantics —
  # verify before running under Python 3.
  for ifo in ifos:
    if not ifo in [i.encode('ascii') for i in xmlpsd.keys()]:
      print("ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml))
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]

    # Frequencies are reconstructed from f0 and deltaF of the frequency series
    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    ifo=instrument.encode('ascii')
    # set node.psds dictionary with the path to the ascii files
    out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
  return out
518

519
def get_trigger_chirpmass(gid=None,gracedb="gracedb",service_url=None):
  """
  Download coinc.xml for GraceDB event `gid` and return the trigger
  chirp mass computed from the (IFO-identical) template masses.

  gid         -- GraceDB event id
  gracedb     -- unused here; kept for interface compatibility
  service_url -- alternative GraceDB service URL (default server if None)

  Writes and then removes a temporary coinc.xml in the current directory.
  Raises AssertionError if the single-IFO triggers report different masses.
  """
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  from ligo.gracedb.rest import GraceDb
  # (the original also stored os.getcwd() in an unused local; removed)
  if service_url is None:
    client = GraceDb()
  else:
    client = GraceDb(service_url=service_url)
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    for e in these_sngls:
      mass1.append(e.mass1)
      mass2.append(e.mass2)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

550
def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None, service_url=None):
  """
  Read the chirp-mass prior bounds for each ROQ basis and determine the
  trigger chirp mass.

  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row

  path         -- directory containing one sub-directory per ROQ basis
  roq_paths    -- list of basis sub-directory names
  roq_params   -- dict, filled in-place with the params.dat of each basis
  key          -- sort key for ordering roq_params items
  gid          -- GraceDB id to fetch the trigger chirp mass from (optional)
  sim_inspiral -- sim_inspiral row supplying the chirp mass (optional)

  Returns (mc_priors, trigger_mchirp); trigger_mchirp is None when neither
  gid nor sim_inspiral is given.

  Fix: the final `trigger_mchirp = None` was tab-indented inside a
  space-indented body, a TabError on Python 3.
  """
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
  ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]
  # below is to construct non-overlapping mc priors for multiple roq mass-bin runs
  '''i=0
  for roq in ordered_roq_paths:
    if i>0:
      # change min, just set to the max of the previous one since we have already aligned it in the previous iteration of this loop
      #mc_priors[roq][0]+= (mc_priors[roq_lengths[i-1]][1]-mc_priors[roq][0])/2.
      mc_priors[roq][0]=mc_priors[ordered_roq_paths[i-1]][1]
    if i<len(roq_paths)-1:
      mc_priors[roq][1]-= (mc_priors[roq][1]- mc_priors[ordered_roq_paths[i+1]][0])/2.
    i+=1'''
  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid=gid,service_url=service_url)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

584
def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None,service_url=None):
  """
  Read the component-mass prior bounds for each ROQ basis directory and
  determine the trigger chirp mass.
  XML and GID cannot be given at the same time;
  sim_inspiral must already point at the right row.
  Returns (m1_priors, m2_priors, trigger_mchirp).
  """
  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  m1_priors = {}
  m2_priors = {}
  for roq in roq_paths:
    table = np.genfromtxt(os.path.join(path, roq, 'params.dat'), names=True)
    roq_params[roq] = table
    m1_priors[roq] = [float(table['mass1min']), float(table['mass1max'])]
    m2_priors[roq] = [float(table['mass2min']), float(table['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid,service_url=service_url)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

610
def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  """
  Compute the mass/frequency rescaling factor to apply to an ROQ basis so
  it covers the trigger chirp mass (or a forced low-frequency cutoff).

  mc_priors      -- dict mapping seglen labels like '4s' to [mc_min, mc_max]
  trigger_mchirp -- trigger chirp mass, or None
  force_flow     -- if given, scale to achieve this f_low instead

  Fix: the original indexed mc_priors.keys() with np.argmin/argmax of a
  map() object — dict views are not indexable and map() is a lazy iterator
  on Python 3, so it crashed there.  Materialise both as lists first.
  """
  keys = list(mc_priors.keys())
  # Numeric seglen of each basis, e.g. '4s' -> 4
  mc_priors_keys_int = [int(k[:-1]) for k in keys]
  # Shortest basis has the highest chirp masses and vice versa
  roq_min = keys[np.argmin(mc_priors_keys_int)]
  roq_max = keys[np.argmax(mc_priors_keys_int)]
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow is None and trigger_mchirp is not None:
    if trigger_mchirp >= mc_max:
      scale_factor = 2.**(floor(trigger_mchirp/mc_max))
    if trigger_mchirp <= mc_min:
      scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow is not None:
    scale_factor = 20./force_flow
  return scale_factor
625

626 627
def create_pfn_tuple(filename, protocol='file://', site='local'):
    """Return a (basename, physical-file-name URL, site) tuple for a file."""
    base = os.path.basename(filename)
    pfn = protocol + os.path.abspath(filename)
    return (base, pfn, site)
628

Vivien Raymond's avatar
Vivien Raymond committed
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
def mchirp_from_components(m1, m2):
  """Return the chirp mass for component masses m1 and m2."""
  total_mass = m1 + m2
  reduced_product = m1 * m2
  return reduced_product**(3.0/5.0) / total_mass**(1.0/5.0)

def Query_ROQ_Bounds_Type(path, roq_paths):
  """
  Report how the ROQ bases are parametrized: 'component_mass' when
  params.dat carries (m1, m2) bounds, 'chirp_mass_q' for (mc, q) bounds.
  Assume that parametrization of ROQ bounds is independent of seglen;
  just look at the first basis directory.  Exits on unrecognised bounds.
  """
  import numpy as np
  params_path = os.path.join(path, roq_paths[0], 'params.dat')
  available = set(np.genfromtxt(params_path, names=True).dtype.names)
  if available.issuperset(set(['mass1min', 'mass1max', 'mass2min', 'mass2max'])):
    return 'component_mass'
  if available.issuperset(set(['chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'])):
    return 'chirp_mass_q'
  print('Invalid bounds for ROQ. Ether (m1,m2) or (mc,q) bounds are supported.')
  sys.exit(1)

650
class LALInferencePipelineDAG(pipeline.CondorDAG):
651
  def __init__(self,cp,dax=False,site='local'):
652 653
    self.subfiles=[]
    self.config=cp
654
    self.engine=get_engine_name(cp)
John Douglas Veitch's avatar
John Douglas Veitch committed
655
    self.EngineNode=chooseEngineNode(self.engine)
656
    self.site=site
657
    if cp.has_option('paths','basedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
658
      self.basepath=cp.get('paths','basedir')
659 660
    else:
      self.basepath=os.getcwd()
661
      print('No basepath specified, using current directory: %s'%(self.basepath))
662
    mkdirs(self.basepath)
663
    print("Generating LALInference DAG in {0}".format(self.basepath))
664 665
    if dax:
        os.chdir(self.basepath)
666
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
John Douglas Veitch's avatar
John Douglas Veitch committed
667
    mkdirs(self.posteriorpath)
668 669 670
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
671
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
672
    if cp.has_option('paths','cachedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
673
      self.cachepath=cp.get('paths','cachedir')
674 675
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
676
    mkdirs(self.cachepath)
677
    if cp.has_option('paths','logdir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
678
      self.logpath=cp.get('paths','logdir')
679 680
    else:
      self.logpath=os.path.join(self.basepath,'log')
681
    mkdirs(self.logpath)
682
    if cp.has_option('analysis','ifos'):
683
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
684 685 686
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
687 688 689 690
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
691
      self.segments[ifo]=[]
692 693 694
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
695
    self.dq={}
696
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
697
    self.channels=ast.literal_eval(cp.get('data','channels'))
John Douglas Veitch's avatar
John Douglas Veitch committed
698
    self.use_available_data=False
699
    self.webdir=cp.get('paths','webdir')
700 701 702 703
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
704
    # Set up necessary job files.
705
    self.prenodes={}
706
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
707
    self.datafind_job.add_opt('url-type','file')
708 709 710
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
711 712 713 714
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
715
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
716
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
717 718
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
719
    if self.config.getboolean('analysis','roq'):
720 721 722 723 724
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
725 726 727 728 729
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
730 731 732 733
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
734
        for a in permutations(self.ifos,N):
735 736
            ifocombos.append(a)
    for ifos in ifocombos:
737
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
738
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
739
    self.results_page_job.set_grid_site('local')
740
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
741
    self.cotest_results_page_job.set_grid_site('local')
742 743 744 745 746 747 748 749
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
750
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
751
    self.coherence_test_job.set_grid_site('local')
752
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
753
    self.gracedbjob.set_grid_site('local')
754
    # Process the input to build list of analyses to do
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
755
    self.events=self.setup_from_inputs()
756

757
    # Sanity checking
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
758
    if len(self.events)==0:
759
      print('No input events found, please check your config if you expect some events')
760
    self.times=[e.trig_time for e in self.events]
761

762
    # Set up the segments
763
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
764
      (mintime,maxtime)=self.get_required_data(self.times)
765 766 767 768
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
769
    self.add_science_segments()
770

771
    # Save the final configuration that is being used
772
    # first to the run dir
773 774
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
John Douglas Veitch's avatar
John Douglas Veitch committed
775
      self.config.write(conffile)
776 777 778 779
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)
780

781
    # Generate the DAG according to the config given
782
    for event in self.events: self.add_full_analysis(event)
783
    self.add_skyarea_followup()