lalinference_pipe_utils.py 127 KB
Newer Older
1

2
#flow DAG Class definitions for LALInference Pipeline
3
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang
4

John Douglas Veitch's avatar
John Douglas Veitch committed
5
import itertools
6
import glue
7
from glue import pipeline,segmentsUtils,segments
8
import os
9
import socket
10
from lalapps import inspiralutils
11 12
import uuid
import ast
13
import pdb
John Douglas Veitch's avatar
John Douglas Veitch committed
14
import string
15
from math import floor,ceil,log,pow
16
import sys
17
import random
18
from itertools import permutations
19
import shutil
20
import numpy as np
21
import math
22 23 24 25 26

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    # Look for a web-served directory marker in the path
    marker=None
    for candidate in ('public_html/','WWW/','www_html/'):
        if candidate.rstrip('/') in fslocation:
            marker=candidate
            break
    if marker is not None:
        (head,tail)=fslocation.split(marker)
        webpath=os.path.join('~%s'%(USER),tail)
        onweb=True
    else:
        # No marker in the path itself: try substituting each known
        # web directory after the user name and follow symlinks
        (head,tail)=fslocation.split(USER,1)
        webpath=fslocation
        onweb=False
        for wwwdir in ('public_html','WWW','www_html'):
            trypath=head+os.environ['USER']+'/'+wwwdir+tail
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                webpath=os.path.join('~%s'%(USER),tail)
                onweb=True
                break
    # Cluster-specific web server roots, checked in order (order matters:
    # e.g. ligo-wa hosts also contain 'caltech')
    host_urls=(
        (('atlas',),"https://atlas1.atlas.aei.uni-hannover.de/"),
        (('cit','caltech'),"https://ldas-jobs.ligo.caltech.edu/"),
        (('ligo-wa',),"https://ldas-jobs.ligo-wa.caltech.edu/"),
        (('ligo-la',),"https://ldas-jobs.ligo-la.caltech.edu/"),
        (('uwm','nemo'),"https://ldas-jobs.phys.uwm.edu/"),
        (('phy.syr.edu',),"https://sugar-jobs.phy.syr.edu/"),
        (('arcca.cf.ac.uk',),"https://geo2.arcca.cf.ac.uk/"),
        (('vulcan',),"https://galahad.aei.mpg.de/"),
    )
    url=None
    for needles,base in host_urls:
        if any(needle in HOST for needle in needles):
            url=base
            break
    if url is None:
        if onweb:
          url="http://%s/"%(HOST)
        else:
          url=HOST+':'
    url=url+webpath
    return(url)

John Douglas Veitch's avatar
John Douglas Veitch committed
83 84 85 86
class Event():
  """
  Represents a unique event to run on.

  At most one trigger/injection source is normally supplied (SimInspiral,
  SimBurst, SnglInspiral, CoincInspiral or a GraceDB id GID); when one is
  given, the trigger time and/or event id are derived from it.
  """
  # Monotonic counter for assigning fresh event ids.
  # NOTE: this was `itertools.count().next`, which is Python-2-only;
  # use the `next()` builtin so it works on both Python 2 and 3.
  _id_counter=itertools.count()

  @staticmethod
  def new_id():
    """Return the next unused auto-generated event id."""
    return next(Event._id_counter)

  def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
    self.trig_time=trig_time
    self.injection=SimInspiral
    self.burstinjection=SimBurst
    self.sngltrigger=SnglInspiral
    # Fresh dict per event — avoids the shared-mutable-default pitfall
    if timeslide_dict is None:
      self.timeslides={}
    else:
      self.timeslides=timeslide_dict
    self.GID=GID
    self.coinctrigger=CoincInspiral
    # Fresh list per event for the same reason
    if ifos is None:
      self.ifos = []
    else:
      self.ifos = ifos
    self.duration = duration
    self.srate = srate
    self.trigSNR = trigSNR
    self.fhigh = fhigh
    self.horizon_distance = horizon_distance
    if event_id is not None:
        self.event_id=event_id
    else:
        self.event_id=Event.new_id()
    # Derive trig_time (and event_id, unless explicitly given) from
    # whichever trigger object was supplied
    if self.injection is not None:
        self.trig_time=self.injection.get_end()
        if event_id is None: self.event_id=int(str(self.injection.simulation_id).split(':')[2])
    if self.burstinjection is not None:
        self.trig_time=self.burstinjection.get_end()
        if event_id is None: self.event_id=int(str(self.burstinjection.simulation_id).split(':')[2])
    if self.sngltrigger is not None:
        self.trig_time=self.sngltrigger.get_end()
        self.event_id=int(str(self.sngltrigger.event_id).split(':')[2])
    if self.coinctrigger is not None:
        self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
    if self.GID is not None:
        # e.g. 'G190425' -> 190425
        self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
    self.engine_opts={}
  def set_engine_option(self,opt,val):
    """
    Can set event-specific options for the engine nodes
    using this option, e.g. ev.set_engine_option('time-min','1083759273')
    """
    self.engine_opts[opt]=val
John Douglas Veitch's avatar
John Douglas Veitch committed
132 133

# Cache labels that denote simulated (dummy) data rather than real frame
# caches — presumably the LAL analytic-noise models; verify against the
# users of this list before relying on that.
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']
John Douglas Veitch's avatar
John Douglas Veitch committed
134

135
def readLValert(threshold_snr=None,gid=None,flow=20.0,gracedb="gracedb",basepath="./",downloadpsd=True,roq=False):
  """
  Parse LV alert file, containing coinc, sngl, coinc_event_map.
  and create a list of Events as input for pipeline
  Based on Chris Pankow's script

  threshold_snr: single-ifo SNR threshold used when estimating a horizon
      distance (no horizon estimate is made when this is None)
  gid: GraceDB event id whose coinc.xml (and optionally psd.xml.gz) is fetched
  flow: lower frequency cutoff (Hz) for chirp-time / horizon estimates
  gracedb: not used by this function (kept for caller compatibility)
  basepath: directory the downloaded files are written into
  downloadpsd: if True, attempt to fetch psd.xml.gz from GraceDB
  roq: if True, skip the per-template duration/srate calculation

  Returns a list of Event objects, one per coinc in the downloaded table.
  """
  output=[]
  # Local imports: these heavyweight/optional dependencies are only needed
  # when this function is actually called.
  from glue.ligolw import utils as ligolw_utils
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from lal import series as lalseries
  import lal
  from lalsimulation import SimInspiralChirpTimeBound, GetApproximantFromString, IMRPhenomDGetPeakFreq
  from ligo.gracedb.rest import GraceDb, HTTPError
  # gstlal is optional; without it the horizon-distance-from-PSD path is skipped
  try:
    from gstlal import reference_psd
  except ImportError:
    reference_psd = None
  # Work inside basepath so the downloaded files land there; restored at the end
  cwd=os.getcwd()
  os.chdir(basepath)
  print("Download %s coinc.xml" % gid)
  client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coinc_events = lsctables.CoincInspiralTable.get_table(xmldoc)
  # Index the single-ifo triggers by event id for the coinc_event_map lookups below
  sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  ifos = sorted(coinc_events[0].instruments)
  trigSNR = coinc_events[0].snr
  # Parse PSD
  srate_psdfile=16384
  fhigh=None
  psdfileobj = None
  if downloadpsd:
    print("Download %s psd.xml.gz" % gid)
    try:
      psdfileobj = client.files(gid, "psd.xml.gz")
    except HTTPError:
      # Best effort: the pipeline can estimate the PSD itself
      print("Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid)
    if psdfileobj is not None:
      if reference_psd is not None:
        xmlpsd = ligolw_utils.load_fileobj(psdfileobj, contenthandler = lalseries.PSDContentHandler)[0]
        psd = lalseries.read_psd_xmldoc(xmlpsd)
        ligolw_utils.write_filename(xmlpsd, "psd.xml.gz", gz = True)
      else:
        # Without gstlal just save the raw download
        open("psd.xml.gz", "w").write(psdfileobj.read())
      # Convert the XML PSDs to per-ifo ascii files and infer the PSD's
      # sampling rate from the highest frequency bin
      psdasciidic = get_xml_psds(os.path.realpath("./psd.xml.gz"),ifos,os.path.realpath('./PSDs'),end_time=None)
      # NOTE(review): dict.keys()[0] is Python-2-only; under Python 3 this
      # would need list(psdasciidic.keys())[0]
      combine = np.loadtxt(psdasciidic[psdasciidic.keys()[0]])
      srate_psdfile = pow(2.0, ceil( log(float(combine[-1][0]), 2) ) ) * 2
  coinc_map = lsctables.CoincMapTable.get_table(xmldoc)
  for coinc in coinc_events:
    # Single-ifo triggers participating in this coinc
    these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
    dur=[]
    srate=[]
    horizon_distance=[]
    for e in these_sngls:
      if roq==False:
        # Segment length: next power of two >= chirp time + 2s padding (min 8s);
        # sampling rate: next power of two >= 2 * ringdown peak frequency
        chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
        fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
        dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
        srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
      # determine horizon distance
      if threshold_snr is not None:
        if e.eff_distance is not None and not math.isnan(e.eff_distance):
          # Scale the effective distance out to the SNR threshold (capped at 2x)
          if e.snr > threshold_snr:
            horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
          else:
            horizon_distance.append(2 * e.eff_distance)
        else:
          # No usable effective distance: fall back to a PSD-based estimate
          if reference_psd is not None and psdfileobj is not None:
            # fstop has not been calculated if roq is True
            if not roq==False:
              fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
            HorizonDistanceObj = reference_psd.HorizonDistance(f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0, m1 = e.mass1, m2 = e.mass2)
            horizon_distance.append(HorizonDistanceObj(psd[e.ifo], snr = threshold_snr)[0])
    if srate:
      # Never exceed the sampling rate of the downloaded PSD
      if max(srate)<srate_psdfile:
        srate = max(srate)
      else:
        srate = srate_psdfile
        if psdfileobj is not None:
          fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
    else:
      srate = None
    if dur:
      duration = max(dur)
    else:
      duration = None
    horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
    ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
             trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
    output.append(ev)

  print("Found %d coinc events in table." % len(coinc_events))
  os.chdir(cwd)
  return output

233 234 235 236 237 238 239
def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database.

    database_filename: path to the sqlite database file (must be readable)
    tmp_space: scratch directory for the working copy / temp store, or a
        falsy value to work on the file in place

    Returns (connection, working_filename) where working_filename is the
    path of the (possibly copied) database actually opened.
    Raises Exception if the input file is not readable.
    """
    # Original mixed tabs and spaces here, which is a TabError on Python 3.
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from glue.ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
247

248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Return a list of Event objects for the zero-lag coincidences in a
    pipedown/lloid database (time_slide_id == 1).

    dumpfile: if given, write per-ifo SNR/chisq/cfar stats to this path
    gpsstart, gpsend: optional GPS bounds on the coinc end time
    max_cfar, min_cfar: optional cuts on combined FAR (-1 disables a cut)
    """
    # Original body mixed tabs and spaces (TabError on Python 3); indentation
    # normalised and dict-view return wrapped in list() for Python 3.
    output={}
    extra={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Zero-lag coincs carry time_slide_id 1 in lloid databases
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # 'coinc_event:coinc_event_id:N' -> N
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return list(output.values())

291
def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Return a list of Event objects for the zero-lag coincidences in a
    pipedown database (time_slide_id == 10049).

    dumpfile: if given, write per-ifo SNR/chisq/cfar stats to this path
    gpsstart, gpsend: optional GPS bounds on the coinc end time
    max_cfar, min_cfar: optional cuts on combined FAR (-1 disables a cut)
    """
    # Original body mixed tabs and spaces (TabError on Python 3); indentation
    # normalised and dict-view return wrapped in list() for Python 3.
    output={}
    extra={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Zero-lag coincs carry time_slide_id 10049 in pipedown databases
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        # 'coinc_event:coinc_event_id:N' -> N
        coinc_id=int(coinc_id.split(":")[-1])
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return list(output.values())
333

334
def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
    """
    Return a list of Event objects for the time-slide (background)
    coincidences in a pipedown database, with trigger times slid on the
    analysis ring and per-ifo slide offsets recorded in Event.timeslides.

    dumpfile: if given, write per-ifo SNR/chisq/cfar stats to this path
    gpsstart, gpsend: optional GPS bounds on the coinc end time
    max_cfar: optional upper cut on combined FAR (-1 disables the cut)
    """
    # Original body mixed tabs and spaces (TabError on Python 3); it also used
    # filter(...)[0], which fails on Python 3 where filter returns an iterator.
    output={}
    extra={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Reconstruct the analysis segments from the thinca search summaries
    db_segments=[]
    sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
    db_out = database_connection.cursor().execute(sql_seg_query)
    for d in db_out:
        if d not in db_segments:
            db_segments.append(d)
    seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
    # Get coincidences: everything except the zero-lag slide id 10049
    get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
            FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
            == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
            on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
            join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
    joinstr = ' and '
    if gpsstart is not None:
        get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
    if max_cfar!=-1:
        get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    from pylal import SnglInspiralUtils
    for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        # Segment containing this trigger; apply the slide on that ring
        seg=[s for s in seglist if sngl_time in s][0]
        slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
        if coinc_id not in output:
            output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=slid_time-sngl_time
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return list(output.values())
383

384 385 386 387 388 389 390 391
def mkdirs(path):
  """
  Helper function. Make the given directory, creating intermediate
  dirs if necessary, and don't complain about it already existing.
  """
  already_usable = os.path.isdir(path) and os.access(path, os.W_OK)
  if not already_usable:
    os.makedirs(path)

John Douglas Veitch's avatar
John Douglas Veitch committed
392 393 394
def chooseEngineNode(name):
  """
  Return the pipeline node class corresponding to an engine name.
  Unrecognised names fall back to the generic EngineNode.
  """
  # Lazy dispatch table: the class name is only resolved for the chosen engine
  dispatch = {
    'lalinferencenest': lambda: LALInferenceNestNode,
    'lalinferenceburst': lambda: LALInferenceBurstNode,
    'lalinferencemcmc': lambda: LALInferenceMCMCNode,
    'lalinferencebambi': lambda: LALInferenceBAMBINode,
    'lalinferencebambimpi': lambda: LALInferenceBAMBINode,
    'lalinferencedatadump': lambda: LALInferenceDataDumpNode,
    'bayeswavepsd': lambda: BayesWavePSDNode,
  }
  factory = dispatch.get(name)
  if factory is not None:
    return factory()
  return EngineNode

407 408 409
def get_engine_name(cp):
    """
    Return the engine name from the [analysis] section of config cp.
    'random' is resolved to a concrete engine: deterministically from the
    digits of the GraceDB id when [input] gid is set, randomly otherwise.
    """
    name = cp.get('analysis', 'engine')
    if name != 'random':
        return name
    choices = ['lalinferencenest', 'lalinferencemcmc', 'lalinferencebambimpi']
    if cp.has_option('input', 'gid'):
        gid = cp.get('input', 'gid')
        # Same gid always maps to the same engine
        idx = int(''.join(ch for ch in gid if ch.isdigit())) % 2
    else:
        idx = random.randint(0, 1)
    return choices[idx]

420
def scan_timefile(timefile):
    """
    Read a list of GPS times from `timefile`, one number per line.

    Lines not starting with a number are ignored; duplicate times are
    skipped with a warning. Returns the times as floats, in file order.
    """
    # Original mixed tabs and spaces (TabError on Python 3) and never
    # closed the file handle on error; use a with-block instead.
    import re
    p = re.compile(r'[\d.]+')
    times = []
    with open(timefile, 'r') as timefilehandle:
        for time in timefilehandle:
            if not p.match(time):
                continue
            if float(time) in times:
                print('Skipping duplicate time %s'%(time))
                continue
            print('Read time %s'%(time))
            times.append(float(time))
    return times
435

436 437 438 439
def get_xml_psds(psdxml,ifos,outpath,end_time=None):
  """
  Get a psd.xml.gz file and:
  1) Reads it
  2) Checks the psd file contains all the IFO we want to analyze
  3) Writes down the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
  Input:
    psdxml: psd.xml.gz file
    ifos: list of ifos used for the analysis
    outpath: path where the ascii PSD will be written to
    (end_time): trigtime for this event. Will be used a part of the PSD file name

  Returns a dict mapping each ifo name to the path of its ascii PSD file.
  """
  # Legacy flag: 0 means lal.series imported successfully (the pylal
  # fallback import is commented out below, so in practice this is always
  # 0 or the process exits).
  lal=1
  from glue.ligolw import utils as ligolw_utils
  try:
    #from pylal import series
    from lal import series as lalseries
    lal=0
  except ImportError:
    print("ERROR, cannot import lal.series in bppu/get_xml_psds()\n")
    exit(1)

  out={}
  if not os.path.isdir(outpath):
    os.makedirs(outpath)
  # The trigger time is embedded in the output file names (empty if not given)
  if end_time is not None:
    time=repr(float(end_time))
  else:
    time=''
  #check we don't already have ALL the psd files #
  got_all=1
  for ifo in ifos:
    path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      got_all*=1
    else:
      got_all*=0
  if got_all==1:
    #print "Already have PSD files. Nothing to do...\n"
    # All files already exist: return their paths without re-reading the XML
    for ifo in ifos:
      out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out

  # We need to convert the PSD for one or more IFOS. Open the file
  if not os.path.isfile(psdxml):
    print("ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml)
    sys.exit(1)
  xmlpsd =  lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml,contenthandler = lalseries.PSDContentHandler))
  # Check the psd file contains all the IFOs we want to analize
  # NOTE(review): .encode('ascii') is a Python-2 idiom — under Python 3 it
  # yields bytes and these comparisons/concatenations would break; confirm
  # the targeted Python version before reusing this.
  for ifo in ifos:
    if not ifo in [i.encode('ascii') for i in xmlpsd.keys()]:
      print("ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml))
      sys.exit(1)
  #loop over ifos in psd xml file
  for instrument in xmlpsd.keys():
    #name of the ascii file we are going to write the PSD into
    path_to_ascii_psd=os.path.join(outpath,instrument.encode('ascii')+'_psd_'+time+'.txt')
    # Check we don't already have that ascii (e.g. because we are running parallel runs of the save event
    if os.path.isfile(path_to_ascii_psd):
      continue
    # get data for the IFO
    ifodata=xmlpsd[instrument]
    #check data is not empty
    if ifodata is None:
      continue
    # we have data. Get psd array
    if lal==0:
      #pylal stores the series in ifodata.data
      data=ifodata
    else:
      # lal stores it in ifodata.data.data
      data=ifodata.data
    # Fill a two columns array of (freq, psd) and save it in the ascii file
    f0=ifodata.f0
    deltaF=ifodata.deltaF

    combine=[]

    for i in np.arange(len(data.data.data)) :
      combine.append([f0+i*deltaF,data.data.data[i]])
    np.savetxt(path_to_ascii_psd,combine)
    ifo=instrument.encode('ascii')
    # set node.psds dictionary with the path to the ascii files
    out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
  return out
522

523 524 525
def get_trigger_chirpmass(gid=None,gracedb="gracedb"):
  """
  Download coinc.xml for GraceDB event `gid` and return the trigger
  chirp mass computed from the template component masses, after checking
  that the template masses agree across all participating detectors.
  """
  from glue.ligolw import lsctables
  from glue.ligolw import ligolw
  from glue.ligolw import utils as ligolw_utils
  class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
  lsctables.use_in(LIGOLWContentHandler)
  from ligo.gracedb.rest import GraceDb
  cwd=os.getcwd()
  gdb_client = GraceDb()
  xmldoc = ligolw_utils.load_fileobj(gdb_client.files(gid, "coinc.xml"), contenthandler = LIGOLWContentHandler)[0]
  ligolw_utils.write_filename(xmldoc, "coinc.xml")
  coincs = lsctables.CoincInspiralTable.get_table(xmldoc)
  sngl_by_id = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(xmldoc))
  cmap = lsctables.CoincMapTable.get_table(xmldoc)
  mass1 = []
  mass2 = []
  for coinc in coincs:
    matching = [sngl_by_id[c.event_id] for c in cmap if c.coinc_event_id == coinc.coinc_event_id]
    mass1.extend(s.mass1 for s in matching)
    mass2.extend(s.mass2 for s in matching)
  # check that trigger masses are identical in each IFO
  assert len(set(mass1)) == 1
  assert len(set(mass2)) == 1

  m1 = mass1[0]
  m2 = mass2[0]
  mchirp = (m1*m2)**(3./5.) / ( (m1 + m2)**(1./5.) )
  os.remove("coinc.xml")

  return mchirp

554
def get_roq_mchirp_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Load the chirp-mass prior bounds for each ROQ basis directory.

  path: base directory holding one sub-directory per ROQ basis
  roq_paths: list of ROQ sub-directory names, each containing a params.dat
  roq_params: dict filled in-place with the parsed params.dat arrays
  key: sort key applied to roq_params.items() when ordering the bases
  gid, sim_inspiral: mutually exclusive trigger sources used to obtain the
      trigger chirp mass (trigger_mchirp is None when neither is given)

  Returns (mc_priors, trigger_mchirp) where mc_priors maps each basis name
  to [chirpmassmin, chirpmassmax].
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  mc_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    params=os.path.join(path,roq,'params.dat')
    roq_params[roq]=np.genfromtxt(params,names=True)
    mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
  # Currently unused: ordering retained for the (disabled) non-overlapping
  # prior construction that used to trim adjacent mass-bin boundaries.
  ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]

  # Original trailing if/elif/else mixed tabs and spaces, a TabError on
  # Python 3; indentation normalised.
  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return mc_priors, trigger_mchirp

Vivien Raymond's avatar
Vivien Raymond committed
588 589 590 591 592 593 594 595
def get_roq_component_mass_priors(path, roq_paths, roq_params, key, gid=None,sim_inspiral=None):
  """
  Load the component-mass prior bounds for each ROQ basis directory.

  Fills roq_params in-place with the parsed params.dat arrays and returns
  (m1_priors, m2_priors, trigger_mchirp), where each priors dict maps a
  basis name to [min, max] and trigger_mchirp comes from the GraceDB id or
  the sim_inspiral row (None if neither is supplied).
  """
  ## XML and GID cannot be given at the same time
  ## sim_inspiral must already point at the right row
  m1_priors = {}
  m2_priors = {}

  if gid is not None and sim_inspiral is not None:
    print("Error in get_roq_mchirp_priors, cannot use both gid and sim_inspiral\n")
    sys.exit(1)

  for roq in roq_paths:
    table = np.genfromtxt(os.path.join(path, roq, 'params.dat'), names=True)
    roq_params[roq] = table
    m1_priors[roq] = [float(table['mass1min']), float(table['mass1max'])]
    m2_priors[roq] = [float(table['mass2min']), float(table['mass2max'])]

  if gid is not None:
    trigger_mchirp = get_trigger_chirpmass(gid)
  elif sim_inspiral is not None:
    trigger_mchirp = sim_inspiral.mchirp
  else:
    trigger_mchirp = None

  return m1_priors, m2_priors, trigger_mchirp

614
def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
  """
  Compute the mass/frequency rescaling factor for ROQ bases.

  mc_priors: dict mapping seglen labels (e.g. '4s') to [mc_min, mc_max];
      the trailing character of each key is stripped and the rest parsed
      as an integer seglen
  trigger_mchirp: trigger chirp mass, or None
  force_flow: if given, rescale so the nominal 20 Hz lower cutoff moves
      to this frequency (takes precedence over the chirp-mass scaling)
  """
  # Original used mc_priors.keys()[...] and fed a map() iterator to
  # np.argmin, both of which fail on Python 3; use plain lists instead.
  keys = list(mc_priors.keys())
  key_ints = [int(k[:-1]) for k in keys]
  # Shortest seglen covers the highest chirp masses and vice versa
  roq_min = keys[key_ints.index(min(key_ints))]
  roq_max = keys[key_ints.index(max(key_ints))]
  mc_max = mc_priors[roq_min][1]
  mc_min = mc_priors[roq_max][0]
  scale_factor = 1.
  if force_flow is None and trigger_mchirp is not None:
     if trigger_mchirp >= mc_max:
        scale_factor = 2.**(floor(trigger_mchirp/mc_max))
     if trigger_mchirp <= mc_min:
        scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
  elif force_flow is not None:
     scale_factor = 20./force_flow
  return scale_factor
629

630 631
def create_pfn_tuple(filename,protocol='file://',site='local'):
    """Return a (logical name, physical URL, site) tuple for `filename`."""
    logical_name = os.path.basename(filename)
    physical_url = protocol + os.path.abspath(filename)
    return (logical_name, physical_url, site)
632

Vivien Raymond's avatar
Vivien Raymond committed
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
def mchirp_from_components(m1, m2):
  """Return the chirp mass (m1*m2)^(3/5) / (m1+m2)^(1/5)."""
  product = m1 * m2
  total = m1 + m2
  return product**(3.0/5.0) / total**(1.0/5.0)

def Query_ROQ_Bounds_Type(path, roq_paths):
  """
  Determine whether the ROQ bases are parametrized by component masses
  ('component_mass') or by chirp mass and mass ratio ('chirp_mass_q').
  Only the first basis is inspected, assuming the parametrization is
  independent of segment length; exits on unrecognised bounds.
  """
  import numpy as np
  first = roq_paths[0]
  table = np.genfromtxt(os.path.join(path, first, 'params.dat'), names=True)
  names = set(table.dtype.names)
  if names >= {'mass1min', 'mass1max', 'mass2min', 'mass2max'}:
    return 'component_mass'
  if names >= {'chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'}:
    return 'chirp_mass_q'
  print('Invalid bounds for ROQ. Ether (m1,m2) or (mc,q) bounds are supported.')
  sys.exit(1)

654
class LALInferencePipelineDAG(pipeline.CondorDAG):
655
  def __init__(self,cp,dax=False,site='local'):
    """
    Build the LALInference analysis DAG described by the parsed
    configuration `cp` (a ConfigParser-style object).

    cp   : configuration with at least [paths], [analysis], [datafind],
           [data] and [condor] sections as read below.
    dax  : if True, build a Pegasus DAX instead of a Condor DAG
           (changes working directory to basedir).
    site : grid site label passed through to the engine jobs.
    """
    self.subfiles=[]
    self.config=cp
    self.engine=get_engine_name(cp)
    # Node class matching the chosen engine (e.g. MCMC vs nested sampling)
    self.EngineNode=chooseEngineNode(self.engine)
    self.site=site
    # Working directory: [paths] basedir, falling back to the CWD
    if cp.has_option('paths','basedir'):
      self.basepath=cp.get('paths','basedir')
    else:
      self.basepath=os.getcwd()
      print('No basepath specified, using current directory: %s'%(self.basepath))
    mkdirs(self.basepath)
    print("Generating LALInference DAG in {0}".format(self.basepath))
    if dax:
        os.chdir(self.basepath)
    self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
    mkdirs(self.posteriorpath)
    # DAG log file gets a UUID so concurrent pipelines do not collide
    daglogdir=cp.get('paths','daglogdir')
    mkdirs(daglogdir)
    self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
    super(LALInferencePipelineDAG,self).__init__(self.daglogfile,dax=dax)
    # Cache and log directories default to subdirs of basepath
    if cp.has_option('paths','cachedir'):
      self.cachepath=cp.get('paths','cachedir')
    else:
      self.cachepath=os.path.join(self.basepath,'caches')
    mkdirs(self.cachepath)
    if cp.has_option('paths','logdir'):
      self.logpath=cp.get('paths','logdir')
    else:
      self.logpath=os.path.join(self.basepath,'log')
    mkdirs(self.logpath)
    # IFO list is given as a python-literal string, e.g. "['H1','L1']"
    if cp.has_option('analysis','ifos'):
      self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
    else:
      self.ifos=['H1','L1','V1']
    self.segments={}
    if cp.has_option('datafind','veto-categories'):
      self.veto_categories=cp.get('datafind','veto-categories')
    else: self.veto_categories=[]
    for ifo in self.ifos:
      self.segments[ifo]=[]
    # Per-event node bookkeeping, filled in as analyses are added
    self.computeroqweightsnode={}
    self.bayeslinenode={}
    self.bayeswavepsdnode={}
    self.dq={}
    # Frame types and channel names, also python-literal dicts keyed by IFO
    self.frtypes=ast.literal_eval(cp.get('datafind','types'))
    self.channels=ast.literal_eval(cp.get('data','channels'))
    self.use_available_data=False
    self.webdir=cp.get('paths','webdir')
    # Seed for simulated data generation (None means no fake data seed)
    if cp.has_option('analysis','dataseed'):
      self.dataseed=cp.getint('analysis','dataseed')
    else:
      self.dataseed=None
    # Set up necessary job files.
    self.prenodes={}
    self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config,dax=self.is_dax())
    self.datafind_job.add_opt('url-type','file')
    # If running on OSG use its datafind server
    if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
        self.datafind_job.add_opt('server','datafind.ligo.org')
    # Accounting tags required on LDG clusters
    if cp.has_option('condor','accounting_group'):
      self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
    if cp.has_option('condor','accounting_group_user'):
      self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
    self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
    # Pre-engine job dumps the data so downstream jobs (ROQ/BayesLine) can use it
    self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,dax=self.is_dax())
    self.preengine_job.set_grid_site('local')
    self.preengine_job.set_universe('vanilla')
    # Optional ROQ weight computation job
    if self.config.getboolean('analysis','roq'):
      self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath,dax=self.is_dax())
      self.computeroqweights_job.set_grid_site('local')
    # Optional BayesLine PSD estimation job
    if self.config.has_option('condor','bayesline'):
      self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath,dax=self.is_dax())
      self.bayesline_job.set_grid_site('local')
    # Optional BayesWave PSD jobs, one per IFO
    self.bayeswavepsd_job={}
    if self.config.has_option('condor','bayeswave'):
      for ifo in self.ifos:
         self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath,dax=self.is_dax())
         self.bayeswavepsd_job[ifo].set_grid_site('local')
    # Need to create a job file for each IFO combination
    self.engine_jobs={}
    ifocombos=[]
    for N in range(1,len(self.ifos)+1):
        for a in permutations(self.ifos,N):
            ifocombos.append(a)
    for ifos in ifocombos:
        self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine,dax=self.is_dax(), site=site)
    # Results pages: one for the main run, one for the coherence-test runs
    self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath,dax=self.is_dax())
    self.results_page_job.set_grid_site('local')
    self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath,dax=self.is_dax())
    self.cotest_results_page_job.set_grid_site('local')
    # MCMC output needs an extra combine step before merging; nested sampling does not
    if self.engine=='lalinferencemcmc':
        self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath,dax=self.is_dax())
        self.combine_job.set_grid_site('local')
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='mcmc')
        self.merge_job.set_grid_site('local')
    else:
        self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,dax=self.is_dax(),engine='nest')
        self.merge_job.set_grid_site('local')
    self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath,dax=self.is_dax())
    self.coherence_test_job.set_grid_site('local')
    self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath,dax=self.is_dax())
    self.gracedbjob.set_grid_site('local')
    # Process the input to build list of analyses to do
    self.events=self.setup_from_inputs()

    # Sanity checking
    if len(self.events)==0:
      print('No input events found, please check your config if you expect some events')
    self.times=[e.trig_time for e in self.events]

    # Set up the segments
    # If the GPS window is not fully specified, derive it from the event times
    if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
      (mintime,maxtime)=self.get_required_data(self.times)
      if not self.config.has_option('input','gps-start-time'):
        self.config.set('input','gps-start-time',str(int(floor(mintime))))
      if not self.config.has_option('input','gps-end-time'):
        self.config.set('input','gps-end-time',str(int(ceil(maxtime))))
    self.add_science_segments()

    # Save the final configuration that is being used
    # first to the run dir
    conffilename=os.path.join(self.basepath,'config.ini')
    with open(conffilename,'wb') as conffile:
      self.config.write(conffile)
    # then a copy alongside the web output, if a webdir is configured
    if self.config.has_option('paths','webdir'):
      mkdirs(self.config.get('paths','webdir'))
      with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'wb') as conffile:
        self.config.write(conffile)

    # Generate the DAG according to the config given
    for event in self.events: self.add_full_analysis(event)
    self.add_skyarea_followup()
    if self.config.has_option('analysis','upload-to-gracedb'):
      if self.config.getboolean('analysis','upload-to-gracedb'):
        self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
    # DAG filename encodes the analysed GPS window
    self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
    self.set_dag_file(os.path.join(self.basepath,self.dagfilename))
    if self.is_dax():
      self.set_dax_file(self.dagfilename)

  def add_skyarea_followup(self):
    # Add skyarea jobs if the executable is given
    # Do one for each results page for now
    if self.config.has_option('condor','skyarea'):