
#flow DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang

import itertools
import glue
from glue import pipeline,segmentsUtils,segments
from glue.ligolw import ligolw, lsctables, utils
import os
import socket
from lalapps import inspiralutils
import uuid
import ast
import pdb
import string
from math import floor,ceil,log,pow
import sys
import random
from itertools import permutations
import shutil
import numpy as np
import math
from six.moves import range

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.

29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    # Look for a directory component that web servers conventionally export
    marker=None
    for candidate in ('public_html/','WWW/','www_html/'):
        if candidate[:-1] in fslocation:
            marker=candidate
            break
    if marker is not None:
        # Path is already under a served directory: map it to ~user/...
        (head,tail)=fslocation.split(marker)
        webpath=os.path.join('~%s'%(USER),tail)
        onweb=True
    else:
        # Otherwise try substituting each served directory name after the
        # username and see whether it resolves (via symlinks) to this path
        (prefix,suffix)=fslocation.split(USER,1)
        for k in ['public_html','WWW','www_html']:
            trypath=prefix+os.environ['USER']+'/'+k+suffix
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                webpath=os.path.join('~%s'%(USER),suffix)
                onweb=True
                break
            else:
                webpath=fslocation
                onweb=False
    # Map known cluster hostnames to their job-hosting web servers
    url=None
    host_map=(
        (('atlas',),"https://atlas1.atlas.aei.uni-hannover.de/"),
        (('cit','caltech'),"https://ldas-jobs.ligo.caltech.edu/"),
        (('ligo-wa',),"https://ldas-jobs.ligo-wa.caltech.edu/"),
        (('ligo-la',),"https://ldas-jobs.ligo-la.caltech.edu/"),
        (('uwm','nemo'),"https://ldas-jobs.phys.uwm.edu/"),
        (('phy.syr.edu',),"https://sugar-jobs.phy.syr.edu/"),
        (('arcca.cf.ac.uk',),"https://geo2.arcca.cf.ac.uk/"),
        (('vulcan',),"https://galahad.aei.mpg.de/"),
    )
    for patterns,base in host_map:
        if any(pat in HOST for pat in patterns):
            url=base
            break
    if url is None:
        if onweb:
          url="http://%s/"%(HOST)
        else:
          url=HOST+':'
    url=url+webpath
    return(url)

John Douglas Veitch's avatar
John Douglas Veitch committed
85 86 87 88
class Event():
  """
  Represents a unique event to run on
  """
  # Shared counter for auto-assigning event ids.
  _id_counter=itertools.count()

  @staticmethod
  def new_id():
    """
    Return the next unique auto-generated event id.
    (Previously implemented as itertools.count().next, which is
    Python-2-only; the .next method was removed in Python 3.)
    """
    return next(Event._id_counter)

  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    """
    Create an event from any one of the supported trigger/injection types.
    When a trigger or injection row is given, trig_time and event_id are
    derived from it, overriding the explicit arguments.
    """
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    # Avoid a shared mutable default for the per-IFO timeslide dict
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    # Avoid a shared mutable default for the IFO list
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    # Trigger/injection rows override the explicit time and id
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    if self.GID is not None:
        # Use the digits of the GraceDB id as the event id
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}

  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val

# NOTE(review): these appear to be placeholder cache tags for LAL
# simulated-noise data rather than real frame caches -- confirm against usage
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
John Douglas Veitch's avatar
John Douglas Veitch committed
136

137
def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False,service_url=None):
  """
  Parse LV alert file, containing coinc, sngl, coinc_event_map.
  and create a list of Events as input for pipeline
  Based on Chris Pankow's script

  Downloads coinc.xml (and optionally psd.xml.gz) for the given GraceDB id
  into basepath, and returns a list of Event objects, one per coinc,
  with per-event duration, sample rate, fhigh and horizon distance filled
  in where they can be computed.
  """
  output=[]
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, GetApproximantFromString, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb, HTTPError
  try:
    from gstlal import reference_psd
  except ImportError:
    # gstlal is optional; without it we cannot compute horizon distances
    # from the PSD, but everything else still works.
    reference_psd = None
  cwd=os.getcwd()
  os.chdir(basepath)
  print("Download %s coinc.xml" % gid)
  if service_url is None:
    client = GraceDb()
  else:
    client = GraceDb(service_url=service_url)
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  psdfileobj = None
  if downloadpsd:
    print("Download %s psd.xml.gz" % gid)
    try:
      psdfileobj = client.files(gid, "psd.xml.gz")
    except HTTPError:
      print("Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid)
    if psdfileobj is not None:
      if reference_psd is not None:
        xmlpsd = ligolw_utils.load_fileobj(psdfileobj, contenthandler = lalseries.PSDContentHandler)[0]
        psd = lalseries.read_psd_xmldoc(xmlpsd)
        ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
      else:
        # The payload is gzipped bytes: write in binary mode and close the
        # handle (the previous bare text-mode open() leaked and fails on py3)
        with open("psd.xml.gz", "wb") as psdfh:
          psdfh.write(psdfileobj.read())
      psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
      # py3: dict views are not indexable, so take the first key explicitly
      combine = np.loadtxt(psdasciidic[list(psdasciidic.keys())[0]])
      srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      if roq==False:
        # Segment length: chirp time rounded up to a power of two (>= 8 s)
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None and not math.isnan(e.eff_distance):
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          # No effective distance available: fall back to computing the
          # horizon distance from the downloaded PSD (needs gstlal)
          if reference_psd is not None and psdfileobj is not None:
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    if srate:
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if psdfileobj is not None:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print("Found %d coinc events in table." % len(coinc_events))
  os.chdir(cwd)
  return output

235 236 237 238 239 240 241
def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database.
    Returns (connection, working_filename), where working_filename is the
    (possibly temporary) copy of the database that was actually opened.
    Raises Exception if database_filename is not readable.
    """
    # The original mixed tab- and space-indented lines here, which is a
    # TabError under Python 3; indentation is now uniform spaces.
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
249

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number, or specify
    max_cfar to select by combined FAR
    """
    # Indentation normalized to spaces: the original mixed tabs and spaces,
    # which is a TabError under Python 3.
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs (zero-lag time slide id is 1 for lloid databases)
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # Group single-IFO rows into one Event per coinc id
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        # Dump per-IFO trigger stats; use a context manager so the file
        # is closed even if a write fails
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list() so callers can index the result on Python 3 as well
    return list(output.values())

293
def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number, or specify
    max_cfar to select by combined FAR
    """
    # Indentation normalized to spaces: the original mixed tabs and spaces,
    # which is a TabError under Python 3.
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs (zero-lag time slide id is 10049 for pipedown databases)
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # Group single-IFO rows into one Event per coinc id
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        # Dump per-IFO trigger stats; use a context manager so the file
        # is closed even if a write fails
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list() so callers can index the result on Python 3 as well
    return list(output.values())
335

336
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
    """
    Returns a list of Event objects
    with times and timeslide offsets
    """
    # Indentation normalized to spaces: the original mixed tabs and spaces,
    # which is a TabError under Python 3.
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Collect the analysis segments from the thinca search summary
    db_segments=[]
    sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
    db_out = database_connection.cursor().execute(sql_seg_query)
    for d in db_out:
        if d not in db_segments:
            db_segments.append(d)
    seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
    db_out_saved=[]
    # Get coincidences (everything except the zero-lag slide id 10049)
    get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
        == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
        on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
        join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
    joinstr = ' and '
    if gpsstart is not None:
        get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
    if max_cfar!=-1:
        get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    # Timeslide functionality requires obsolete pylal - will be removed
    import pylal
    from pylal import SnglInspiralUtils
    extra={}
    for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        # py3 fix: filter() returns an iterator and cannot be indexed,
        # so materialize the matching segments first (still IndexError
        # if no segment contains the trigger time, as before)
        seg=[s for s in seglist if sngl_time in s][0]
        slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=slid_time-sngl_time
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        # Dump per-IFO trigger stats; use a context manager so the file
        # is closed even if a write fails
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list() so callers can index the result on Python 3 as well
    return list(output.values())
387

388 389 390 391 392 393 394 395
def mkdirs(path):
  """
  Helper function. Make the given directory, creating intermediate
  dirs if necessary, and don't complain about it already existing.
  """
  usable = os.path.isdir(path) and os.access(path, os.W_OK)
  if not usable:
    os.makedirs(path)

John Douglas Veitch's avatar
John Douglas Veitch committed
396 397 398
def chooseEngineNode(name):
  """
  Map an engine name from the config to the corresponding node class.
  Unknown names fall through to the generic EngineNode.
  Names are referenced lazily on purpose: only the class matching the
  requested engine needs to be defined at call time.
  """
  if name=='lalinferencenest':
    return LALInferenceNestNode
  if name=='lalinferenceburst':
    return LALInferenceBurstNode
  if name=='lalinferencemcmc':
    return LALInferenceMCMCNode
  if name=='lalinferencedatadump':
    return LALInferenceDataDumpNode
  if name=='bayeswavepsd':
    return BayesWavePSDNode
  return EngineNode

409 410 411
def get_engine_name(cp):
    """
    Return the sampler engine name from the [analysis] section.
    If it is 'random', choose between nest and mcmc: deterministically
    from the GraceDB id digits when available, otherwise at random.
    """
    name=cp.get('analysis','engine')
    if name!='random':
        return name
    engine_list=['lalinferencenest','lalinferencemcmc']
    if cp.has_option('input','gid'):
        gid=cp.get('input','gid')
        selection=int(''.join(ch for ch in gid if ch.isdigit())) % 2
    else:
        selection=random.randint(0,1)
    return engine_list[selection]

422
def scan_timefile(timefile):
    """
    Read a list of GPS times from an ASCII file, one per line.
    Lines that do not start with a number are skipped, as are duplicate
    times. Returns the times as a list of floats, in file order.
    """
    # Indentation normalized: the original tab-indented the continue/print
    # lines inside a space-indented block, a TabError under Python 3.
    import re
    p=re.compile(r'[\d.]+')
    times=[]
    # Context manager guarantees the handle is closed
    with open(timefile,'r') as timefilehandle:
        for time in timefilehandle:
            if not p.match(time):
                continue
            if float(time) in times:
                print('Skipping duplicate time %s'%(time))
                continue
            print('Read time %s'%(time))
            times.append(float(time))
    return times
437

438 439 440 441
def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Reads it
  2) Checks the psd file contains all the IFO we want to analyze
  3) Writes down the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used a part of the PSD file name
  Returns a dict mapping each IFO name to its ascii PSD file path.
  """
  from glue.ligolw import utils as ligolw_utils
  try:
    from lal import series as lalseries
  except ImportError:
    print("ERROR, cannot import lal.series in bppu/get_xml_psds()\n")
    raise

  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  # Check we don't already have ALL the psd files
  # (e.g. because we are running parallel runs of the same event)
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    # Already have PSD files. Nothing to do.
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out

  # We need to convert the PSD for one or more IFOS. Open the file
  if not os.path.isfile(psdxml):
    print("ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml)
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analize.
  # py3 fix: the .encode('ascii') calls produced bytes, so the str/bytes
  # comparison (and later str+bytes concatenation) always failed.
  for ifo in ifos:
    if ifo not in xmlpsd.keys():
      print("ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml))
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the same event)
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]
    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    # set node.psds dictionary with the path to the ascii files
    out[instrument]=os.path.join(outpath,instrument+'_psd_'+time+'.txt')
  return out
516

517
def get_trigger_chirpmass(gid=None,gracedb="gracedb",service_url=None):
  """
  Download coinc.xml for the given GraceDB id and return the trigger
  chirp mass, asserting that every single-IFO trigger reports the same
  component masses. The temporary coinc.xml is removed afterwards.
  """
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  from ligo.gracedb.rest import GraceDb
  cwd=os.getcwd()
  if service_url is None:
    client = GraceDb()
  else:
    client = GraceDb(service_url=service_url)
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  for coinc in coinc_events:
    matching_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    for sngl in matching_sngls:
      mass1.append(sngl.mass1)
      mass2.append(sngl.mass2)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

548
def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None, service_url=None):
  """
  Load the chirp-mass prior bounds for each ROQ basis directory and
  determine the trigger chirp mass.
  Fills roq_params in place with the parsed params.dat tables and returns
  (mc_priors, trigger_mchirp). trigger_mchirp comes from GraceDB (gid) or
  from the given sim_inspiral row, and is None if neither is supplied.
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
  # Sorted bases (longest seglen first); kept for the key= contract even
  # though the non-overlap adjustment that used it has been retired.
  ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]

  # The original tab-indented the final else branch, a TabError under
  # Python 3; indentation is now uniform spaces.
  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid=gid,service_url=service_url)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

582
def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None,service_url=None):
  """
  Load the component-mass prior bounds for each ROQ basis directory and
  determine the trigger chirp mass (from GraceDB or a sim_inspiral row).
  Fills roq_params in place and returns (m1_priors, m2_priors,
  trigger_mchirp); trigger_mchirp is None if no source is given.
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  m1_priors = {}
  m2_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for basis in roq_paths:
    table = np.genfromtxt(os.path.join(path, basis, 'params.dat'), names=True)
    roq_params[basis] = table
    m1_priors[basis] = [float(table['mass1min']), float(table['mass1max'])]
    m2_priors[basis] = [float(table['mass2min']), float(table['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid,service_url=service_url)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

608
def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  """
  Return the mass/frequency rescaling factor for ROQ bases whose chirp-mass
  range does not cover trigger_mchirp (keys of mc_priors look like '4s',
  '8s', ...: the trailing 's' is stripped to get the segment length).
  If force_flow is given it overrides everything with 20Hz/force_flow;
  otherwise the factor is 1 when the trigger lies inside the covered range.
  """
  # py3 fix: map() returns an iterator and dict.keys() is not indexable,
  # so materialize both before calling argmin/argmax.
  keys = list(mc_priors.keys())
  seglens = [int(k[:-1]) for k in keys]
  roq_min = keys[int(np.argmin(seglens))]
  roq_max = keys[int(np.argmax(seglens))]
  # Shortest basis covers the highest masses and vice versa
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow is None and trigger_mchirp is not None:
     if trigger_mchirp >= mc_max:
        scale_factor = 2.**(floor(trigger_mchirp/mc_max))
     if trigger_mchirp <= mc_min:
        scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow is not None:
     scale_factor = 20./force_flow
  return scale_factor
623

624 625
def create_pfn_tuple(filename,protocol='file://',site='local'):
    """Build a (LFN, PFN, site) tuple for registering a file with Pegasus."""
    logical_name = os.path.basename(filename)
    physical_name = protocol + os.path.abspath(filename)
    return (logical_name, physical_name, site)
626

Vivien Raymond's avatar
Vivien Raymond committed
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
def mchirp_from_components(m1, m2):
  """Return the chirp mass of a binary with component masses m1 and m2."""
  total_mass = m1 + m2
  reduced_product = (m1*m2)**(3.0/5.0)
  return reduced_product / total_mass**(1.0/5.0)

def Query_ROQ_Bounds_Type(path, roq_paths):
  """
  Determine whether the ROQ bases bound (m1, m2) or (mc, q).
  Assume that parametrization of ROQ bounds is independent of seglen;
  just look at the first basis directory's params.dat.
  Returns 'component_mass' or 'chirp_mass_q'; exits on anything else.
  """
  import numpy as np
  first_basis = roq_paths[0]
  params_file = os.path.join(path, first_basis, 'params.dat')
  column_names = set(np.genfromtxt(params_file, names=True).dtype.names)
  if column_names >= {'mass1min', 'mass1max', 'mass2min', 'mass2max'}:
    roq_bounds = 'component_mass'
  elif column_names >= {'chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'}:
    roq_bounds = 'chirp_mass_q'
  else:
    print('Invalid bounds for ROQ. Ether (m1,m2) or (mc,q) bounds are supported.')
    sys.exit(1)
  return roq_bounds

648
class LALInferencePipelineDAG(pipeline.CondorDAG):
649
  def __init__(self,cp,dax=False,site='local'):
650 651
    self.subfiles=[]
    self.config=cp
652
    self.engine=get_engine_name(cp)
John Douglas Veitch's avatar
John Douglas Veitch committed
653
    self.EngineNode=chooseEngineNode(self.engine)
654
    self.site=site
655
    if cp.has_option('paths','basedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
656
      self.basepath=cp.get('paths','basedir')
657 658
    else:
      self.basepath=os.getcwd()
659
      print('No basepath specified, using current directory: %s'%(self.basepath))
660
    mkdirs(self.basepath)
661
    print("Generating LALInference DAG in {0}".format(self.basepath))
662 663
    if dax:
        os.chdir(self.basepath)
664
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
John Douglas Veitch's avatar
John Douglas Veitch committed
665
    mkdirs(self.posteriorpath)
666 667 668
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
669
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
670
    if cp.has_option('paths','cachedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
671
      self.cachepath=cp.get('paths','cachedir')
672 673
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
674
    mkdirs(self.cachepath)
675
    if cp.has_option('paths','logdir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
676
      self.logpath=cp.get('paths','logdir')
677 678
    else:
      self.logpath=os.path.join(self.basepath,'log')
679
    mkdirs(self.logpath)
680
    if cp.has_option('analysis','ifos'):
681
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
682 683 684
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
685 686 687 688
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
689
      self.segments[ifo]=[]
690 691 692
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
693
    self.dq={}
694
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
695
    self.channels=ast.literal_eval(cp.get('data','channels'))
John Douglas Veitch's avatar
John Douglas Veitch committed
696
    self.use_available_data=False
697
    self.webdir=cp.get('paths','webdir')
698 699 700 701
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
702
    # Set up necessary job files.
703
    self.prenodes={}
704
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
705
    self.datafind_job.add_opt('url-type','file')
706 707 708
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
709 710 711 712
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
713
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
714
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
715 716
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
717
    if self.config.getboolean('analysis','roq'):
718 719 720 721 722
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
723 724 725 726 727
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
728 729 730 731
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
732
        for a in permutations(self.ifos,N):
733 734
            ifocombos.append(a)
    for ifos in ifocombos:
735
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
736
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
737
    self.results_page_job.set_grid_site('local')
738
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
739
    self.cotest_results_page_job.set_grid_site('local')
740 741 742 743 744 745 746 747
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
748
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
749
    self.coherence_test_job.set_grid_site('local')
750
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
751
    self.gracedbjob.set_grid_site('local')
752 753
    self.mapjob = SkyMapJob(cp, os.path.join(self.basepath,'skymap.sub'), self.logpath)
    self.plotmapjob = PlotSkyMapJob(cp, os.path.join(self.basepath,'plotskymap.sub'),self.logpath)
754
    # Process the input to build list of analyses to do
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
755
    self.events=self.setup_from_inputs()
756

757
    # Sanity checking
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
758
    if len(self.events)==0:
759
      print('No input events found, please check your config if you expect some events')
760
    self.times=[e.trig_time for e in self.events]
761

762
    # Set up the segments
763
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
764
      (mintime,maxtime)=self.get_required_data(self.times)
765 766 767 768
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
769
    self.add_science_segments()
770

771
    # Save the final configuration that is being used
772
    # first to the run dir
773 774
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
John Douglas Veitch's avatar
John Douglas Veitch committed
775
      self.config.write(conffile)
776 777 778 779
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)
780

781
    # Generate the DAG according to the config given
782
    for event in self.events: self.add_full_analysis(event)
783 784 785
    if self.config.has_option('analysis','upload-to-gracedb'):
      if self.config.getboolean('analysis','upload-to-gracedb'):
        self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)