#flow DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang

import itertools
import glue
from glue import pipeline,segmentsUtils,segments
import os
import socket
from lalapps import inspiralutils
import uuid
import ast
import pdb
import string
from math import floor,ceil,log,pow
import sys
import random
from itertools import permutations
import shutil
import numpy as np
import math
from six.moves import range

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    # Directories conventionally served as ~user/ by LDG web servers,
    # checked in this order.
    served_dirs=['public_html','WWW','www_html']
    marker=None
    for candidate in served_dirs:
        if candidate in fslocation:
            marker=candidate+'/'
            break
    if marker is not None:
        # Path contains a served directory: everything after it is the
        # web-visible part.
        tail=fslocation.split(marker)[1]
        webpath=os.path.join('~%s'%(USER),tail)
        onweb=True
    else:
        # Otherwise try substituting each served dir after the user name
        # and see whether a symlink makes it point back at fslocation.
        (prefix,suffix)=fslocation.split(USER,1)
        webpath=fslocation
        onweb=False
        for candidate in served_dirs:
            trypath=prefix+os.environ['USER']+'/'+candidate+suffix
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                webpath=os.path.join('~%s'%(USER),suffix)
                onweb=True
                break
    # Map the cluster FQDN to its public web frontend; order matters
    # (e.g. 'cit'/'caltech' must be tested before the ligo-wa/ligo-la
    # hosts, matching the original precedence).
    host_urls=[
        (('atlas',),"https://atlas1.atlas.aei.uni-hannover.de/"),
        (('cit','caltech'),"https://ldas-jobs.ligo.caltech.edu/"),
        (('ligo-wa',),"https://ldas-jobs.ligo-wa.caltech.edu/"),
        (('ligo-la',),"https://ldas-jobs.ligo-la.caltech.edu/"),
        (('uwm','nemo'),"https://ldas-jobs.phys.uwm.edu/"),
        (('phy.syr.edu',),"https://sugar-jobs.phy.syr.edu/"),
        (('arcca.cf.ac.uk',),"https://geo2.arcca.cf.ac.uk/"),
        (('vulcan',),"https://galahad.aei.mpg.de/"),
    ]
    url=None
    for needles,base in host_urls:
        if any(needle in HOST for needle in needles):
            url=base
            break
    if url is None:
        if onweb:
            url="http://%s/"%(HOST)
        else:
            url=HOST+':'
    return(url+webpath)

class Event():
  """
  Represents a unique event to run on
  """
  # Monotonic counter backing auto-generated event ids.
  # NOTE: the original used itertools.count().next, which is
  # Python-2-only and raises AttributeError under Python 3.
  _id_counter=itertools.count()

  @staticmethod
  def new_id():
    """Return the next auto-generated integer event id."""
    return next(Event._id_counter)

  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    """
    Build an event from whichever trigger/injection object is supplied.
    The trigger time and event id are derived from the first available
    source in this precedence order: explicit arguments, SimInspiral,
    SimBurst, SnglInspiral, CoincInspiral, then GID (digits of the
    GraceDB id).
    """
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    # Avoid a shared mutable default for the timeslide dictionary
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    # Trigger/injection objects override the explicit trig_time and,
    # unless an event_id was passed, the event id too.
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    if self.GID is not None:
        # Use the digits of the GraceDB id (e.g. 'G190412' -> 190412)
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}

  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val
# Cache names that do not correspond to real frame files.
# NOTE(review): the LAL* names suggest these select analytic/simulated
# noise models rather than frame data — confirm against the consumers
# of this list elsewhere in the pipeline.
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False):
  """
  Parse LV alert file, containing coinc, sngl, coinc_event_map.
  and create a list of Events as input for pipeline
  Based on Chris Pankow's script

  Downloads coinc.xml (and optionally psd.xml.gz) for GraceDB event
  `gid` into `basepath`, then builds one Event per coinc row with
  per-event duration, sample rate, fhigh and horizon distance estimates.
  """
  output=[]
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb, HTTPError
  # gstlal is optional; without it we skip PSD-based horizon distances
  try:
    from gstlal import reference_psd
  except ImportError:
    reference_psd = None
  cwd=os.getcwd()
  os.chdir(basepath)
  print("Download %s coinc.xml" % gid)
  client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  psdfileobj = None
  if downloadpsd:
    print("Download %s psd.xml.gz" % gid)
    try:
      psdfileobj = client.files(gid, "psd.xml.gz")
    except HTTPError:
      print("Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid)
    if psdfileobj is not None:
      if reference_psd is not None:
        xmlpsd = ligolw_utils.load_fileobj(psdfileobj, contenthandler = lalseries.PSDContentHandler)[0]
        psd = lalseries.read_psd_xmldoc(xmlpsd)
        ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
      else:
        open("psd.xml.gz", "w").write(psdfileobj.read())
      psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
      # list(...) needed: dict views are not subscriptable under python3
      combine = np.loadtxt(psdasciidic[list(psdasciidic.keys())[0]])
      srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      # Without ROQ, pick power-of-two duration/srate from the template
      if roq==False:
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None and not math.isnan(e.eff_distance):
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          # No usable effective distance: estimate from the PSD instead
          if reference_psd is not None and psdfileobj is not None:
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    if srate:
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if psdfileobj is not None:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print("Found %d coinc events in table." % len(coinc_events))
  os.chdir(cwd)
  return output

def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database.

    Copies the database to tmp_space (if given) via glue's dbtables and
    returns (connection, working_filename), where working_filename is
    the path actually opened. Raises if the input file is unreadable.

    Note: the original body mixed tab- and space-indentation, which is a
    TabError under Python 3; indentation is normalised here.
    """
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs; zero-lag events carry time_slide_id 1 in lloid databases
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # coinc ids look like 'coinc_event:coinc_event_id:N'
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list(...) so callers get a real list under python3 as well
    return list(output.values())

def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from pipedown data base. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number or specify
    max_cfar to select by combined FAR
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs; zero-lag events carry time_slide_id 10049 in pipedown
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # coinc ids look like 'coinc_event:coinc_event_id:N'
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list(...) so callers get a real list under python3 as well
    return list(output.values())
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
    """
    Returns a list of Event objects
    with times and timeslide offsets

    Reads the thinca search_summary segments, selects all non-zero-lag
    coincidences, and un-slides each single-detector time onto its ring
    segment; the applied offset is recorded in Event.timeslides per ifo.
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    db_segments=[]
    sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
    db_out = database_connection.cursor().execute(sql_seg_query)
    for d in db_out:
        if d not in db_segments:
            db_segments.append(d)
    seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
    # Get coincidences (everything NOT on the zero-lag slide id 10049)
    get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
            FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
            == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
            on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
            join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
    joinstr = ' and '
    if gpsstart is not None:
        get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
    if max_cfar!=-1:
        get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    from pylal import SnglInspiralUtils
    extra={}
    for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        # comprehension instead of filter(...)[0]: filter objects are not
        # subscriptable under python3
        seg=[s for s in seglist if sngl_time in s][0]
        slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=slid_time-sngl_time
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        with open(dumpfile,'w') as fh:
            for co in output.keys():
                for ifo in output[co].ifos:
                    fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
    # list(...) so callers get a real list under python3 as well
    return list(output.values())
384

def mkdirs(path):
  """
  Helper function. Make the given directory, creating intermediate
  dirs if necessary, and don't complain about it already existing.
  """
  # EAFP: attempt creation and tolerate the already-exists case. This
  # also removes the check-then-create race of the original version,
  # which could crash when a parallel job created the directory first.
  try:
    os.makedirs(path)
  except OSError:
    if not os.path.isdir(path):
      raise

John Douglas Veitch's avatar
John Douglas Veitch committed
def chooseEngineNode(name):
  """
  Return the pipeline node class implementing the named engine.
  Unknown engine names fall back to the generic EngineNode class.
  """
  known_nodes={
      'lalinferencenest':LALInferenceNestNode,
      'lalinferenceburst':LALInferenceBurstNode,
      'lalinferencemcmc':LALInferenceMCMCNode,
      'lalinferencebambi':LALInferenceBAMBINode,
      'lalinferencebambimpi':LALInferenceBAMBINode,
      'lalinferencedatadump':LALInferenceDataDumpNode,
      'bayeswavepsd':BayesWavePSDNode,
  }
  return known_nodes.get(name,EngineNode)

def get_engine_name(cp):
    """
    Return the engine name from the [analysis] section of config cp.
    The special value 'random' selects between nest and mcmc:
    deterministically from the digits of the GraceDB id when one is
    configured, otherwise by a coin flip.
    """
    name=cp.get('analysis','engine')
    if name!='random':
        return name
    engine_list=['lalinferencenest','lalinferencemcmc','lalinferencebambimpi']
    if cp.has_option('input','gid'):
        gid=cp.get('input','gid')
        digits=''.join(i for i in gid if i.isdigit())
        engine_number=int(digits) % 2
    else:
        engine_number=random.randint(0,1)
    return engine_list[engine_number]

def scan_timefile(timefile):
    """
    Read a list of GPS times from a text file, one per line.

    Lines not starting with a number are skipped, as are duplicate
    times. Returns the times as floats, in first-appearance order.

    Fixes: the original mixed tab- and space-indentation (TabError
    under Python 3), used a non-raw regex string, and left the file
    handle unclosed on error.
    """
    import re
    time_pattern=re.compile(r'[\d.]+')
    times=[]
    with open(timefile,'r') as timefilehandle:
        for time in timefilehandle:
            if not time_pattern.match(time):
                continue
            if float(time) in times:
                print('Skipping duplicate time %s'%(time))
                continue
            print('Read time %s'%(time))
            times.append(float(time))
    return times
436

def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Reads it
  2) Checks the psd file contains all the IFO we want to analyze
  3) Writes down the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used a part of the PSD file name

  Returns a dict mapping each ifo name to the path of its ascii PSD file.
  """
  # lal flag: 1 means the legacy (pylal) storage layout, 0 means
  # lal.series. Since the import below exits the process on failure,
  # lal is always 0 past the try block and only the lal==0 branch
  # below is ever taken.
  lal=1
  from glue.ligolw import utils as ligolw_utils
  try:
    #from pylal import series
    from lal import series as lalseries
    lal=0
  except ImportError:
    print("ERROR, cannot import lal.series in bppu/get_xml_psds()\n")
    exit(1)

  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  # NOTE: 'time' here shadows no import in this file; it is the trigtime
  # string used in the output file names.
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  #check we don't already have ALL the psd files #
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    #print "Already have PSD files. Nothing to do...\n"
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out

  # We need to convert the PSD for one or more IFOS. Open the file
  if not os.path.isfile(psdxml):
    print("ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml)
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analize
  # NOTE(review): .encode('ascii') is a str->str no-op on python 2 but
  # yields bytes on python 3 — this comparison and the path joins below
  # assume python 2 string semantics; confirm before porting.
  for ifo in ifos:
    if not ifo in [i.encode('ascii') for i in xmlpsd.keys()]:
      print("ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml))
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    if lal==0:
      #pylal stores the series in ifodata.data
      data=ifodata
    else:
      # lal stores it in ifodata.data.data
      data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]

    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    ifo=instrument.encode('ascii')
    # set node.psds dictionary with the path to the ascii files
    out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
  return out
523

def get_trigger_chirpmass(gid=None,gracedb="gracedb"):
  """
  Return the chirp mass of the trigger template for GraceDB event gid.

  Downloads coinc.xml into the working directory, checks that every
  single-detector trigger reports identical component masses, computes
  mchirp = (m1*m2)^(3/5) / (m1+m2)^(1/5), and removes the temporary
  coinc.xml before returning.
  """
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from ligo.gracedb.rest import GraceDb
  client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  for coinc in coinc_events:
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    for e in these_sngls:
      mass1.append(e.mass1)
      mass2.append(e.mass2)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Return per-ROQ-basis chirp-mass prior bounds and the trigger chirp mass.

  Reads params.dat under each entry of roq_paths (mutating roq_params in
  place with the parsed tables) and collects [chirpmassmin, chirpmassmax]
  for each basis. The trigger chirp mass comes from GraceDB when gid is
  given, from sim_inspiral.mchirp when that is given, and is None
  otherwise. gid and sim_inspiral are mutually exclusive; sim_inspiral
  must already point at the right row.

  Fixes: the gid/sim_inspiral branches mixed tab- and space-indentation
  (TabError under Python 3); a dead commented-out bin-overlap algorithm
  and the unused ordered_roq_paths list it needed were removed.
  """
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

Vivien Raymond's avatar
Vivien Raymond committed
def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Return per-ROQ-basis component-mass prior bounds and the trigger chirp mass.

  Reads params.dat under each entry of roq_paths (mutating roq_params in
  place) and collects [mass1min, mass1max] and [mass2min, mass2max] for
  each basis. The trigger chirp mass comes from GraceDB when gid is
  given, from sim_inspiral.mchirp when that is given, and is None
  otherwise. gid and sim_inspiral are mutually exclusive; sim_inspiral
  must already point at the right row.
  """
  m1_priors = {}
  m2_priors = {}

  if gid is not None and sim_inspiral is not None:
    # Fixed: this message previously named get_roq_mchirp_priors
    print("Error in get_roq_component_mass_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    m1_priors[roq]=[float(roq_params[roq]['mass1min']),float(roq_params[roq]['mass1max'])]
    m2_priors[roq]=[float(roq_params[roq]['mass2min']),float(roq_params[roq]['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  """
  Compute the mass/frequency rescaling factor for the ROQ bases.

  mc_priors maps basis names of the form '<seglen>s' (e.g. '4s') to
  [chirpmassmin, chirpmassmax]; the shortest segment covers the highest
  chirp masses and vice versa. When the trigger chirp mass falls outside
  the covered range, a power of 2 (above) or of 2/3.2 (below) scaling is
  returned. force_flow overrides this with 20/force_flow.

  Fixes python-3 incompatibilities: a map() object was fed to np.argmin
  and dict.keys() was indexed; both now go through explicit lists.
  """
  names = list(mc_priors.keys())
  seglens = [int(name[:-1]) for name in names]
  roq_min = names[np.argmin(seglens)]
  roq_max = names[np.argmax(seglens)]
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow is None and trigger_mchirp is not None:
     if trigger_mchirp >= mc_max:
        scale_factor = 2.**(floor(trigger_mchirp/mc_max))
     if trigger_mchirp <= mc_min:
        scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow is not None:
     scale_factor = 20./force_flow
  return scale_factor
630

def create_pfn_tuple(filename,protocol='file://',site='local'):
    """Build the (LFN, PFN, site) triple for a physical file name entry."""
    logical_name = os.path.basename(filename)
    physical_name = protocol + os.path.abspath(filename)
    return (logical_name, physical_name, site)
def mchirp_from_components(m1, m2):
  """Return the chirp mass for component masses m1 and m2."""
  total_mass = m1 + m2
  mass_product = m1 * m2
  return mass_product**(3.0/5.0) / total_mass**(1.0/5.0)

def Query_ROQ_Bounds_Type(path, roq_paths):
  """
  Report whether the ROQ bases are bounded in component masses
  ('component_mass') or in chirp mass and mass ratio ('chirp_mass_q').
  Exits with an error for any other parametrization.
  """
  # Assume that parametrization of ROQ bounds is independent of seglen; just look at first one
  import numpy as np
  first_params = os.path.join(path, roq_paths[0], 'params.dat')
  available = set(np.genfromtxt(first_params, names=True).dtype.names)
  if available >= {'mass1min', 'mass1max', 'mass2min', 'mass2max'}:
    return 'component_mass'
  if available >= {'chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'}:
    return 'chirp_mass_q'
  print('Invalid bounds for ROQ. Ether (m1,m2) or (mc,q) bounds are supported.')
  sys.exit(1)

655
class LALInferencePipelineDAG(pipeline.CondorDAG):
656
  def __init__(self,cp,dax=False,site='local'):
657 658
    self.subfiles=[]
    self.config=cp
659
    self.engine=get_engine_name(cp)
John Douglas Veitch's avatar
John Douglas Veitch committed
660
    self.EngineNode=chooseEngineNode(self.engine)
661
    self.site=site
662
    if cp.has_option('paths','basedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
663
      self.basepath=cp.get('paths','basedir')
664 665
    else:
      self.basepath=os.getcwd()
666
      print('No basepath specified, using current directory: %s'%(self.basepath))
667
    mkdirs(self.basepath)
668
    print("Generating LALInference DAG in {0}".format(self.basepath))
669 670
    if dax:
        os.chdir(self.basepath)
671
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
John Douglas Veitch's avatar
John Douglas Veitch committed
672
    mkdirs(self.posteriorpath)
673 674 675
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
676
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
677
    if cp.has_option('paths','cachedir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
678
      self.cachepath=cp.get('paths','cachedir')
679 680
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
681
    mkdirs(self.cachepath)
682
    if cp.has_option('paths','logdir'):
John Douglas Veitch's avatar
John Douglas Veitch committed
683
      self.logpath=cp.get('paths','logdir')
684 685
    else:
      self.logpath=os.path.join(self.basepath,'log')
686
    mkdirs(self.logpath)
687
    if cp.has_option('analysis','ifos'):
688
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
689 690 691
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
692 693 694 695
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
696
      self.segments[ifo]=[]
697 698 699
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
700
    self.dq={}
701
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
702
    self.channels=ast.literal_eval(cp.get('data','channels'))
John Douglas Veitch's avatar
John Douglas Veitch committed
703
    self.use_available_data=False
704
    self.webdir=cp.get('paths','webdir')
705 706 707 708
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
709
    # Set up necessary job files.
710
    self.prenodes={}
711
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
712
    self.datafind_job.add_opt('url-type','file')
713 714 715
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
716 717 718 719
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
720
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
721
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
722 723
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
724
    if self.config.getboolean('analysis','roq'):
725 726 727 728 729
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
730 731 732 733 734
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
735 736 737 738
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
739
        for a in permutations(self.ifos,N):
740 741
            ifocombos.append(a)
    for ifos in ifocombos:
742
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
743
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
744
    self.results_page_job.set_grid_site('local')
745
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
746
    self.cotest_results_page_job.set_grid_site('local')
747 748 749 750 751 752 753 754
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
755
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
756
    self.coherence_test_job.set_grid_site('local')
757
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
758
    self.gracedbjob.set_grid_site('local')
759
    # Process the input to build list of analyses to do
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
760
    self.events=self.setup_from_inputs()
761

762
    # Sanity checking
John Douglas Veitch's avatar
Working  
John Douglas Veitch committed
763
    if len(self.events)==0:
764
      print('No input events found, please check your config if you expect some events')
765
    self.times=[e.trig_time for e in self.events]
766

767
    # Set up the segments
768
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
769
      (mintime,maxtime)=self.get_required_data(self.times)
770 771 772 773
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
774
    self.add_science_segments()
775

776
    # Save the final configuration that is being used
777
    # first to the run dir
778 779
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
John Douglas Veitch's avatar
John Douglas Veitch committed
780
      self.config.write(conffile)
781 782 783 784
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)
785

786
    # Generate the DAG according to the config given
787
    for event in self.events: self.add_full_analysis(event)
788
    self.add_skyarea_followup()
789 790 791
    if self.config.has_option('analysis','upload-to-gracedb'):
      if self.config.getboolean('analysis','upload-to-gracedb'):
        self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
792
    self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
793
    self.set_dag_file(os.path.join(self.basepath,self.dagfilename))
794 795 796 797 798