#
# Copyright (C) 2015 Qi Chu
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from collections import deque
import threading
import sys
import StringIO
from shutil import copyfile
import httplib
import math
import subprocess
import re
import time
import numpy
import os
import fcntl
import logging
import pdb

# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
import pygtk
pygtk.require("2.0")
import gobject
gobject.threads_init()
import pygst
pygst.require('0.10')
import gst

try:
	from ligo.gracedb.rest import GraceDb
except ImportError:
	print >>sys.stderr, "warning: gracedb import failed, program will crash if gracedb uploads are attempted"
	GraceDb = None

from glue import iterutils
from glue import segments
from glue.ligolw import ligolw
from glue.ligolw import dbtables
from glue.ligolw import ilwd
from glue.ligolw import table
from glue.ligolw import lsctables
from glue.ligolw import array as ligolw_array
from glue.ligolw import param as ligolw_param
from glue.ligolw import utils as ligolw_utils

from glue.ligolw.utils import ligolw_sqlite
from glue.ligolw.utils import ligolw_add
from glue.ligolw.utils import process as ligolw_process
from glue.ligolw.utils import search_summary as ligolw_search_summary
from glue.ligolw.utils import segments as ligolw_segments

import lal
from lal import LIGOTimeGPS

from gstlal import bottle
from gstlal import reference_psd
from gstlal.pipemodules.postcohtable import postcoh_table_def 
from gstlal.pipemodules.postcohtable import postcohtable
from gstlal.pipemodules import pipe_macro

lsctables.LIGOTimeGPS = LIGOTimeGPS

#
# =============================================================================
#
#						 glue.ligolw Content Handlers
#
# =============================================================================
#


class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass
ligolw_array.use_in(LIGOLWContentHandler)
ligolw_param.use_in(LIGOLWContentHandler)
lsctables.use_in(LIGOLWContentHandler)

#
class SegmentDocument(object):
	def __init__(self, ifos, verbose = False):

		self.get_another = lambda: SegmentDocument(ifos, verbose = verbose)

		self.filename = None
		#
		# build the XML document
		#

		self.xmldoc = ligolw.Document()
		self.xmldoc.appendChild(ligolw.LIGO_LW())

		self.process = ligolw_process.register_to_xmldoc(self.xmldoc, "gstlal_inspiral_postcohspiir_online", {})
		self.segtype = pipe_macro.ONLINE_SEG_TYPE_NAME
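		# re.findall('..', ifos) splits the ifos string (e.g. "H1L1V1") into two-character detector names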
		self.seglistdict = {self.segtype: segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in re.findall('..', ifos))}


	def set_filename(self, filename):
		self.filename = filename

	def write_output_file(self, verbose = False):
		assert self.filename is not None
		with ligolw_segments.LigolwSegments(self.xmldoc, self.process) as llwsegments:
			for segtype, one_type_dict in self.seglistdict.items():
				llwsegments.insert_from_segmentlistdict(one_type_dict, name = segtype, comment = "SPIIR postcoh snapshot")
		ligolw_process.set_process_end_time(self.process)
		ligolw_utils.write_filename(self.xmldoc, self.filename, gz = (self.filename or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None)

#
class PostcohDocument(object):
	def __init__(self, verbose = False):

		self.get_another = lambda: PostcohDocument(verbose = verbose)

		self.filename = None

		#
		# build the XML document
		#

		self.xmldoc = ligolw.Document()
		self.xmldoc.appendChild(ligolw.LIGO_LW())

		# FIXME: process table, search summary table
		# FIXME: should be implemented as lsctables.PostcohInspiralTable
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(postcoh_table_def.PostcohInspiralTable))

	def set_filename(self, filename):
		self.filename = filename

	def write_output_file(self, verbose = False):
		assert self.filename is not None
		ligolw_utils.write_filename(self.xmldoc, self.filename, gz = (self.filename or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None)



class OnlinePerformer(object):

	def __init__(self, parent_lock):
		# setup bottle routes
		bottle.route("/latency_history.txt")(self.web_get_latency_history)

		self.latency_history = deque(maxlen = 1000)
		self.parent_lock = parent_lock

	def web_get_latency_history(self):
		with self.parent_lock:
			# first one in the list is sacrificed for a time stamp
			for time, latency, cohsnr, cmbchisq in self.latency_history:
				yield "%f %e %f %f\n" % (time, latency, cohsnr, cmbchisq)

	def update_eye_candy(self, candidate):
		latency_val = (float(candidate.end), float(lal.UTCToGPS(time.gmtime()) - candidate.end), candidate.cohsnr, candidate.cmbchisq)
		self.latency_history.append(latency_val)


class FAPUpdater(object):
	def __init__(self, path, input_prefix_list, ifos, output_list_string = None, collect_walltime_string = None, verbose = None):
		self.path = path
		self.input_prefix_list = input_prefix_list
		self.ifos = ifos
		self.procs_combine_stats = []
		self.procs_update_fap_stats = []
		self.output = []
		if output_list_string is not None:
			self.output = output_list_string.split(",")

		self.collect_walltime = []
		self.rm_fnames = []
		if collect_walltime_string is not None:
			times = collect_walltime_string.split(",")
			for itime in times:
				self.collect_walltime.append(int(itime))

		self.combine_duration = 86400 * 2	# seconds; merge per-bank stats files once their summed walltime reaches two days
		self.max_nstats_perbank = 3
		# FIXME: fixed number of banks per job
		self.max_nbank_perjob = 10
		# set the limit for maximum input string length
		# when the number of banks reaches 140, it will give you a signal 7 error in OPA2
		# FIXME: hard-coded; the first entry in collect_walltime is assumed to be the longest
		self.max_nstats_formargi = (self.collect_walltime[0]/self.combine_duration + self.max_nstats_perbank + 1)* self.max_nbank_perjob
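		# e.g. assuming the longest collection walltime is one week (604800 s):
		# (604800/172800 + 3 + 1) * 10 = 70 stats files at most are passed to the marginalization command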

		if self.output and len(self.output) != len(self.collect_walltime):
			raise ValueError("number of input walltimes does not match the number of input filenames: %s does not match %s" % (collect_walltime_string, output_list_string))

		self.verbose = verbose


	def wait_last_process_finish(self, procs):
		if len(procs) > 0:
			for proc in procs:
				if proc.poll() is None:
					(stdoutdata, stderrdata) = proc.communicate()
					if proc.returncode != 0:
						print >> sys.stderr, "last process return code", proc.returncode, stderrdata
		
		# delete all update processes when they are finished
		del procs[:]

	def get_fnames(self, keyword):
		# both update_fap_stats and combine_stats need to access the latest cleaned-up
		# stats files, so make sure the files marked for removal have been removed

		self.wait_last_process_finish(self.procs_combine_stats)
		# remove files that have been combined from last process
		try:
			map(lambda x: os.remove(x), self.rm_fnames)
			self.rm_fnames = []
		except OSError:
			print >> sys.stderr, "remove files failed, rm_fnames %s" % ', '.join(self.rm_fnames)
			return None

		ls_fnames = os.listdir(str(self.path))
		grep_fnames = [fname for fname in ls_fnames if keyword in fname]
		# remove file names that contain "next" which are temporary files
		valid_fnames = [one_fname for one_fname in grep_fnames if not re.search("next", one_fname)]
		return valid_fnames

	def get_valid_bankstats(self, ls_fnames, boundary):
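		# stats filenames look like e.g. "bank16_stats_1187008882_1800.xml.gz": splitting on "_",
		# field [-2] is the GPS start time; keep files starting after `boundary` and skip temporary "*next" files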
		valid_fnames = []
		for ifname in ls_fnames:
			ifname_split = ifname.split("_")
			# FIXME: hard coded the stats name e.g. bank16_stats_1187008882_1800.xml.gz
			if len(ifname_split)> 1 and ifname[-4:] != "next" and ifname_split[-2].isdigit() and int(ifname_split[-2]) > boundary:
				valid_fnames.append("%s/%s" % (self.path, ifname))
		return valid_fnames

	def update_fap_stats(self, cur_buftime):

		logging.info("update fap %d" % cur_buftime)
		self.wait_last_process_finish(self.procs_update_fap_stats)
		# list all the files in the path
		#nprefix = len(self.input_prefix_list[0].split("_"))
		# FIXME: hard-coded keyword, assuming names like e.g. bank16_stats_1187008882_1800.xml.gz
		ls_fnames = self.get_fnames("stats")
		if ls_fnames is None or len(ls_fnames) == 0:
			return

		for (i, collect_walltime) in enumerate(self.collect_walltime):
			boundary = cur_buftime - collect_walltime
			# find the files within the collection time
			valid_fnames = self.get_valid_bankstats(ls_fnames, boundary)

			# if we have reached the limit for maximum input string length;
			# when the number of banks reaches 140, it will give you a signal 7 error in OPA2
			while len(valid_fnames) > self.max_nstats_formargi:
				logging.info("update fap: %d stats files for marginalization, over the input string length limit, combining" % len(valid_fnames))
				self.combine_stats()
				ls_fnames = self.get_fnames("stats")
				valid_fnames = self.get_valid_bankstats(ls_fnames, boundary)

			if len(valid_fnames) > 0:
				input_for_cmd = ",".join(valid_fnames)
				# execute the cmd in a different process
				proc = self.call_calcfap(self.output[i], input_for_cmd, self.ifos, collect_walltime, verbose = self.verbose)
				self.procs_update_fap_stats.append(proc)

	def call_calcfap(self, fout, fin, ifos, walltime, update_pdf = True, verbose = False):
		cmd = []
		cmd += ["gstlal_cohfar_calc_fap"]
		cmd += ["--input", fin]
		cmd += ["--input-format", "stats"]
		cmd += ["--output", fout]
		cmd += ["--ifos", ifos]
		if update_pdf:
			cmd += ["--update-pdf"]
		logging.info("%s" % cmd)
		proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
		return proc

	# combine per-bank stats files once their accumulated walltime reaches combine_duration
	def combine_stats(self):

		logging.info("combine_stats")
		# list all per-bank stats files in the output path
		ls_fnames = self.get_fnames("bank")
		if ls_fnames is None or len(ls_fnames) == 0:
			return
	
		# FIXME: decode information assuming fixed stats name e.g. bank16_stats_1187008882_1800.xml.gz
		# decode to {'16': ['bank16_stats_1187008882_1800.xml.gz', ..]}
		stats_dict = {}
		for ifname in ls_fnames:
			this_bankid = ifname.split('_')[0][4:]
			stats_dict.setdefault(this_bankid, []).append(ifname)
	
		if '' in stats_dict.keys():
			del stats_dict['']
	
		for bankid,bank_fnames in stats_dict.items():
			collected_fnames = []
			for one_bank_fname in bank_fnames: 
				this_walltime = int(one_bank_fname.split('.')[-3].split('_')[-1])
				collected_walltimes = list(map(lambda x: int(os.path.split(x)[-1].split('_')[-1].split('.')[0]), collected_fnames))
				total_collected_walltime = sum(collected_walltimes)
				if this_walltime >= self.combine_duration:
					continue
				elif len(collected_fnames) >= self.max_nstats_perbank or total_collected_walltime >= self.combine_duration:
					start_banktime = int(os.path.split(collected_fnames[0])[-1].split('_')[-2])
					fout = "%s/bank%s_stats_%d_%d.xml.gz" % (self.path, bankid, start_banktime, total_collected_walltime)

					proc = self.call_calcfap(fout, ','.join(collected_fnames), self.ifos, total_collected_walltime, update_pdf = False, verbose = self.verbose)
					self.procs_combine_stats.append(proc)
					# mark to remove collected_fnames
					for frm in collected_fnames:
						self.rm_fnames.append(frm)
					collected_fnames = []
					collected_fnames.append("%s/%s" % (self.path, one_bank_fname))
				else:
					collected_fnames.append("%s/%s" % (self.path, one_bank_fname))

class FinalSink(object):
	def __init__(self, channel_dict, process_params, pipeline, need_online_perform, path, output_prefix, output_name, far_factor, cluster_window = 0.5, snapshot_interval = None, fapupdater_interval = None, cohfar_accumbackground_output_prefix = None, cohfar_accumbackground_output_name = None, fapupdater_output_fname = None, fapupdater_collect_walltime_string = None, singlefar_veto_thresh = 0.01, chisq_ratio_veto_thresh = 8.0, gracedb_far_threshold = None, gracedb_group = "Test", gracedb_search = "LowMass", gracedb_pipeline = "spiir", gracedb_service_url = "https://gracedb.ligo.org/api/", gracedb_offline_annote = None, output_skymap = 0, superevent_thresh = 3.8e-7, opa_cohsnr_thresh = 8, verbose = False):
		#
		# initialize
		#
		self.lock = threading.Lock()
		self.verbose = verbose
		self.pipeline = pipeline
		self.is_first_buf = True
		self.is_first_event = True
		self.channel_dict = channel_dict
		self.ifos = lsctables.ifos_from_instrument_set(channel_dict.keys()).replace(",", "") # format: "H1L1V1"

		# cluster parameters
		self.cluster_window = cluster_window
		self.candidate = None
		self.cluster_boundary = None
		self.need_candidate_check = False
		self.cur_event_table = lsctables.New(postcoh_table_def.PostcohInspiralTable)
		self.chisq_ratio_thresh = chisq_ratio_veto_thresh
		self.superevent_thresh = superevent_thresh
		# FIXME: hard-coded opa_thresh; all triggers with FAR below this threshold are also
		# required to have cohsnr >= opa_cohsnr_thresh, otherwise they are not uploaded.
		# opa_thresh is chosen as 1e-6 so as not to launch lalinference jobs unnecessarily
		self.opa_thresh = 1e-6
		self.opa_cohsnr_thresh = opa_cohsnr_thresh

		self.nevent_clustered = 0
		self.singlefar_veto_thresh = singlefar_veto_thresh

		# gracedb parameters
		self.far_factor = far_factor
		self.gracedb_far_threshold = gracedb_far_threshold
		self.gracedb_group = gracedb_group
		self.gracedb_search = gracedb_search
		self.gracedb_pipeline = gracedb_pipeline
		self.gracedb_service_url = gracedb_service_url
		if gracedb_offline_annote:
			self.gracedb_offline_annote = True
		else:
			self.gracedb_offline_annote = False
		if GraceDb:
			self.gracedb_client = GraceDb(gracedb_service_url)

		# keep a record of segments; this document is snapshotted periodically.
		# our segments are determined by whether the incoming buffer is flagged as a GAP
		self.seg_document = SegmentDocument(self.ifos)

		# the postcoh doc stores clustered postcoh triggers and is snapshotted
		self.postcoh_document = PostcohDocument()
		self.postcoh_table = postcoh_table_def.PostcohInspiralTable.get_table(self.postcoh_document.xmldoc)

		# deprecated: save the last 30s zerolags to help check the significance of current candidate
		# hard-coded to be 30s to be consistent with iDQ range
		# self.lookback_event_table = lsctables.New(postcoh_table_def.PostcohInspiralTable)
		# self.lookback_window = 30
		# self.lookback_boundary = None
		# coinc doc to be uploaded to gracedb
		self.coincs_document = CoincsDocFromPostcoh(path, process_params, channel_dict)
		# get the values needed for skymap uploads accompanying the trigger uploads
		for param in process_params:
			if param == 'cuda_postcoh_detrsp_fname':
				self.cuda_postcoh_detrsp_fname = process_params[param]
			if param == 'cuda_postcoh_output_skymap':
				self.cuda_postcoh_output_skymap = process_params[param]
		# snapshot parameters
		self.path = path
		self.output_prefix = output_prefix
		self.output_name = output_name
		self.snapshot_interval = snapshot_interval
		self.thread_snapshot = None
		self.thread_snapshot_segment = None
		self.t_snapshot_start = None
		self.snapshot_duration = None
		# set logging to report status
		self.log_fname = "%s/finalsink_debug_log" % (path)
		logging.basicConfig(filename=self.log_fname, format='%(asctime)s %(message)s', level = logging.DEBUG)

		# background updater
		self.total_duration = None
		self.t_start = None
		self.t_fapupdater_start = None
		self.fapupdater_interval = fapupdater_interval
		self.fapupdater = FAPUpdater(path = path, input_prefix_list = cohfar_accumbackground_output_prefix, output_list_string = fapupdater_output_fname, collect_walltime_string = fapupdater_collect_walltime_string, ifos = self.ifos, verbose = self.verbose)

		# online information performer
		self.need_online_perform = need_online_perform
		self.onperformer = OnlinePerformer(parent_lock = self.lock)

		# trigger control
		self.trigger_control_doc = "trigger_control.txt"
		if not os.path.exists(self.trigger_control_doc):
			file(self.trigger_control_doc, 'w').close()
		self.last_trigger = []
		self.last_submitted_trigger = []
		self.last_trigger.append((0, 1))
		self.last_submitted_trigger.append((0, 1))

		# skymap
		self.output_skymap = output_skymap
		self.thread_upload_skymap = None

	def __pass_test(self, candidate):
		if self.candidate.far <= 0.0:
			return False

		# just submit it if it is a low-significance trigger
		if self.candidate.far < self.gracedb_far_threshold and self.candidate.far > self.superevent_thresh:
			return True

		if self.candidate.far < self.opa_thresh and self.candidate.cohsnr < self.opa_cohsnr_thresh:
			# print "suppressed", self.candidate.cohsnr, self.candidate.far
			return False

		# FIXME: any two of the sngl fars need to be < singlefar_veto_thresh 
		# single far veto for high-significance trigger
		# add an upper limit on the chisq of uploaded events (hard-coded): an ifo is counted
		# as active only if its chisq is non-zero and below 3
		ifo_active = [self.candidate.chisq_H != 0 and self.candidate.chisq_H < 3, self.candidate.chisq_L != 0 and self.candidate.chisq_L < 3, self.candidate.chisq_V != 0 and self.candidate.chisq_V < 3]
		ifo_fars_ok=[self.candidate.far_h < self.singlefar_veto_thresh and self.candidate.far_h > 0., self.candidate.far_l < self.singlefar_veto_thresh and self.candidate.far_l > 0., self.candidate.far_v < self.singlefar_veto_thresh and self.candidate.far_v > 0. ]
		ifo_chisqs=[self.candidate.chisq_H,self.candidate.chisq_L,self.candidate.chisq_V]
		if self.candidate.far < self.superevent_thresh:
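			# require at least two active ifos with single-detector FARs below singlefar_veto_thresh,
			# and all pairwise chisq ratios among the active ifos below chisq_ratio_thresh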
			return sum([i for (i,v) in zip(ifo_fars_ok,ifo_active) if v])>=2 and all((lambda x: [i1/i2 < self.chisq_ratio_thresh for i1 in x for i2 in x])([i for (i,v) in zip(ifo_chisqs,ifo_active) if v]))


	def appsink_new_buffer(self, elem):
		with self.lock:
			buf = elem.emit("pull-buffer")
			if buf.flag_is_set(gst.BUFFER_FLAG_GAP):
				logging.info("buf gap at %d" % buf.timestamp)
				return
			buf_timestamp = LIGOTimeGPS(0, buf.timestamp)
			newevents = postcohtable.GSTLALPostcohInspiral.from_buffer(buf)
			self.need_candidate_check = False

			if len(newevents) == 0:
				return

			# NOTE: the first entry is only used to update the segments; it is not a real event
			participating_ifos = re.findall('..', newevents[0].ifos)
			buf_seg = segments.segment(buf_timestamp, buf_timestamp + LIGOTimeGPS(0, buf.duration))
			for segtype, one_type_dict in self.seg_document.seglistdict.items():
				for ifo in one_type_dict.keys():
					if ifo in participating_ifos:
						this_seglist = one_type_dict[ifo]
						this_seglist = this_seglist + segments.segmentlist([buf_seg])
						this_seglist.coalesce()
						one_type_dict[ifo] = this_seglist

			# remove the first event entry
			newevents = newevents[1:]
			nevent = len(newevents)

			# initialization
			if self.is_first_buf:
				self.t_snapshot_start = buf_timestamp
				self.t_fapupdater_start = buf_timestamp
				self.t_start = buf_timestamp
				self.is_first_buf = False

			if self.is_first_event and nevent > 0:
				self.cluster_boundary = buf_timestamp + self.cluster_window
				self.is_first_event = False

			# extend newevents to cur_event_table
			self.cur_event_table.extend(newevents)

			if self.cluster_window == 0:
				self.postcoh_table.extend(newevents)
				del self.cur_event_table[:]

			# NOTE: only consider clustered triggers for uploading to gracedb
			# check if the new events extend past the cluster boundary
			# this loop will exit when the cluster_boundary is incremented to be > the buf_timestamp, see plot in self.cluster()

			while self.cluster_window > 0 and self.cluster_boundary and buf_timestamp > self.cluster_boundary:
				self.cluster(self.cluster_window)

				if self.need_candidate_check:
					self.nevent_clustered += 1
					self.__set_far(self.candidate)
					self.postcoh_table.append(self.candidate)
					if self.gracedb_far_threshold and self.__pass_test(self.candidate):
						self.__do_gracedb_alert(self.candidate)
					if self.need_online_perform:
						self.onperformer.update_eye_candy(self.candidate)
					self.candidate = None
					self.need_candidate_check = False

			# dump zerolag candidates when interval is reached
			self.snapshot_duration = buf_timestamp - self.t_snapshot_start
			if self.snapshot_interval is not None and self.snapshot_duration >= self.snapshot_interval:
				snapshot_filename = self.get_output_filename(self.output_prefix, self.output_name, self.t_snapshot_start, self.snapshot_duration)
				self.snapshot_output_file(snapshot_filename)
				self.snapshot_segment_file(self.t_snapshot_start, self.snapshot_duration)
				self.t_snapshot_start = buf_timestamp
				self.nevent_clustered = 0
				# also combine background_stats files so we don't end up with too many files
				self.fapupdater.combine_stats()

			# do calcfap when interval is reached
			fapupdater_duration = buf_timestamp - self.t_fapupdater_start
			if self.fapupdater_interval is not None and fapupdater_duration >= self.fapupdater_interval:
				self.fapupdater.update_fap_stats(buf_timestamp)
				self.t_fapupdater_start = buf_timestamp


	def __select_head_event(self):
		# the head event is the event with the smallest end time
		#assert len(self.cur_event_table) != 0
		if len(self.cur_event_table) == 0:
			return None

		head_event = self.cur_event_table[0]
		for row in self.cur_event_table:
			if row.end < head_event.end:
				head_event = row	
		return head_event

	def cluster(self, cluster_window):
		# send candidate to be gracedb checked only when:
		# time ->->->->
		#                     |buf_timestamp
		#          ___________(cur_table)
		#                |boundary
		#           |candidate to be gracedb checked = peak of cur_table < boundary
		#                  |candidate remain = peak of cur_table > boundary
		# afterwards:
		#                     |buf_timestamp
		#                 ____(cur_table cleaned)
		#                           |boundary incremented

		# always choose the head event to test its end against boundary
		if self.candidate is None:
			self.candidate = self.__select_head_event()

		# make sure the candidate is within the boundary
		if self.candidate is None or self.candidate.end > self.cluster_boundary:
			self.cluster_boundary = self.cluster_boundary + cluster_window
			self.candidate = None # so we can reselect a candidate next time
			return
		# the first event in cur_event_table
		peak_event = self.__select_head_event()
		# find the max cohsnr event within the boundary of cur_event_table
		for row in self.cur_event_table:
			if row.end <= self.cluster_boundary and row.cohsnr > peak_event.cohsnr:
				peak_event = row

		# cur_table is empty and we do have a candidate, so need to check the candidate
		if peak_event is None:
			# no event within the boundary, candidate is the peak, update boundary
			self.cluster_boundary = self.cluster_boundary + cluster_window
			self.need_candidate_check = True
			return

		if peak_event.end <= self.cluster_boundary and peak_event.cohsnr > self.candidate.cohsnr:
			self.candidate = peak_event
			iterutils.inplace_filter(lambda row: row.end > self.cluster_boundary, self.cur_event_table)
			# update boundary
			self.cluster_boundary = self.candidate.end + cluster_window
			self.need_candidate_check = False
		else:
			iterutils.inplace_filter(lambda row: row.end > self.cluster_boundary, self.cur_event_table)
			# update boundary
			self.cluster_boundary = self.cluster_boundary + cluster_window
			self.need_candidate_check = True

	def __set_far(self, candidate):
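		# take the most conservative (largest) FAR over the available background collection
		# periods (2 h, 1 d, 1 week, by column naming convention) and scale it by far_factor,
		# which is presumably the trials factor for the number of parallel jobs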
		candidate.far = (max(candidate.far_2h, candidate.far_1d, candidate.far_1w)) * self.far_factor
		candidate.far_h = (max(candidate.far_h_2h, candidate.far_h_1d, candidate.far_h_1w)) * self.far_factor
		candidate.far_l = (max(candidate.far_l_2h, candidate.far_l_1d, candidate.far_l_1w)) * self.far_factor
		candidate.far_v = (max(candidate.far_v_2h, candidate.far_v_1d, candidate.far_v_1w)) * self.far_factor

	# def __lookback_far(self, candidate):
		# FIXME: hard-code to check event that's < 5e-7
		# if candidate.far > 5e-7:
		#	 return
		# else:
		#	 count_events = sum((lookback_event.far < 1e-4) for lookback_event in self.lookback_event_table)
		#	 if count_events > 1:
		#		 # FAR estimation is not valide for this period, increase the FAR
		#		 # FIXME: should derive FAR from count_events
		#		  candidate.far = 9.99e-6

			# all_snr_H = self.lookback_event_table.getColumnByName('snglsnr_H')
			# all_snr_L = self.lookback_event_table.getColumnByName('snglsnr_L')
			# all_snr_V = self.lookback_event_table.getColumnByName('snglsnr_V')
			# all_chisq_H = self.lookback_event_table.getColumnByName('chisq_H')
			# all_chisq_L = self.lookback_event_table.getColumnByName('chisq_L')
			# all_chisq_V = self.lookback_event_table.getColumnByName('chisq_V')
			# count_better_H = sum((snr > candidate.snglsnr_H && chisq < candidate.chisq_H) for (snr, chisq) in zip(all_snr_H, allchisq_H)) 
			# count_better_L = sum((snr > candidate.snglsnr_L && chisq < candidate.chisq_L) for (snr, chisq) in zip(all_snr_L, allchisq_L)) 
			# count_better_V = sum((snr > candidate.snglsnr_V && chisq < candidate.chisq_V) for (snr, chisq) in zip(all_snr_V, allchisq_V)) 
			# if count_better_H > 0 or count_better_L > 0 or count_better_V > 0:
			#	 candidate.far = 9.99e-6
	
	
	def __need_trigger_control(self, trigger):
		# do trigger control
		# FIXME: implement a sql solution for node communication ?
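		# each line of the control doc has the form "end_time,far,is_submitted" (see the writes below)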

		with open(self.trigger_control_doc, "r") as f:
			content = f.read().splitlines()

		is_submitted_idx = -1
		if len(content) > 0:
			(last_time, last_far, is_submitted) = content[-1].split(",")
			last_time = float(last_time)
			last_far = float(last_far)
			while is_submitted == "0" and len(content) + is_submitted_idx > 0:
				is_submitted_idx = is_submitted_idx - 1
				(last_time, last_far, is_submitted) = content[is_submitted_idx].split(",")
			last_time = float(last_time)
			last_far = float(last_far)
		else:
			last_time = self.last_trigger[-1][0]
			last_far = self.last_trigger[-1][1]

		last_submitted_time = last_time
		last_submitted_far = last_far

		trigger_is_submitted = 0

		# suppress the trigger
		# if it is not substantially more significant than the last trigger
		# or if it is not more significant than the last submitted trigger
		# FIXME: what if there are two adjacent significant events
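		# as implemented: suppress if the trigger is within 50 s of last_time and its FAR is not at
		# least a factor of two below last_far, or within 100 s of the last submitted trigger and its
		# FAR is not at least a factor of two below the last submitted FAR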
		if ((abs(float(trigger.end) - last_time) < 50 and abs(trigger.far/last_far) > 0.5)) or (abs(float(trigger.end) - float(last_submitted_time)) < 100 and trigger.far > last_submitted_far*0.5) :
			print >> sys.stderr, "trigger controlled, time %f, FAR %e, last_far %e, last_submitted time %f, last_submitted far %e" % (float(trigger.end), trigger.far, last_far, last_submitted_time, last_submitted_far)
			self.last_trigger.append((trigger.end, trigger.far))
			line = "%f,%e,%d\n" % (float(trigger.end), trigger.far, trigger_is_submitted)
			with open(self.trigger_control_doc, "a") as f:
				f.write(line)
			return True
		
		print >> sys.stderr, "trigger passed, time %f, FAR %e, last_far %e, last_submitted time %f, last_submitted_far %e" % (float(trigger.end), trigger.far, last_far, last_submitted_time, last_submitted_far)

		trigger_is_submitted = 1
		#self.last_trigger.append((trigger.end, trigger.far))
		#self.last_submitted_trigger.append((trigger.end, trigger.far))
		line = "%f,%e,%d\n" % (float(trigger.end), trigger.far, trigger_is_submitted)
		with open(self.trigger_control_doc, "a") as f:
			f.write(line)
	
		return False

	def __do_gracedb_alert(self, trigger):

		if self.__need_trigger_control(trigger):
			return
			
		# do alerts
		gracedb_ids = []
		common_messages = []

		self.coincs_document.assemble_tables(trigger)
		xmldoc = self.coincs_document.xmldoc
		filename = "%s_%s_%d_%d.xml" % (trigger.ifos, trigger.end_time, trigger.bankid, trigger.tmplt_idx)
		#
		# construct message and send to gracedb.
		# we go through the intermediate step of
		# first writing the document into a string
		# buffer in case there is some safety in
		# doing so in the event of a malformed
		# document;  instead of writing directly
		# into gracedb's input pipe and crashing
		# part way through.
		#
		message = StringIO.StringIO()
		#message2 = file(filename, "w")
		#pdb.set_trace()
		ligolw_utils.write_fileobj(xmldoc, message, gz = False)
		ligolw_utils.write_filename(xmldoc, filename, gz = False, trap_signals = None)
		xmldoc.unlink()
	
		print >>sys.stderr, "sending %s to gracedb ..." % filename
		gracedb_upload_itrial = 1
		# FIXME: make this optional from cmd line?
		while gracedb_upload_itrial < 10:
			try:
				resp = self.gracedb_client.createEvent(self.gracedb_group, self.gracedb_pipeline, filename, filecontents = message.getvalue(), search = self.gracedb_search, offline = self.gracedb_offline_annote)
				resp_json = resp.json()
				if resp.status != httplib.CREATED:
					print >>sys.stderr, "gracedb upload of %s failed" % filename
				else:
					print >>sys.stderr, "event assigned grace ID %s" % resp_json["graceid"]
					gracedb_ids.append(resp_json["graceid"])
					break
			except Exception as e:
				print(e)
				gracedb_upload_itrial += 1
		#else:
		#  proc = subprocess.Popen(("/bin/cp", "/dev/stdin", filename), stdin = subprocess.PIPE)
		#  proc.stdin.write(message.getvalue())
		#  proc.stdin.flush()
		#  proc.stdin.close()
		message.close()

		gracedb_upload_itrial = 1
		# write a log to explain far
		#for gid in gracedb_ids:

		# delete the xmldoc and get a new empty one for next upload
		coincs_document = self.coincs_document.get_another()
		del self.coincs_document
		self.coincs_document = coincs_document
		if not gracedb_ids:
			print >>sys.stderr, "gracedb upload of %s failed completely" % filename
			return
		gid = gracedb_ids[0]
		log_message = "Optimal ra and dec from this coherent pipeline: (%f, %f) in degrees" % (trigger.ra, trigger.dec)
		while gracedb_upload_itrial < 10:
			try:
				resp = self.gracedb_client.writeLog(gid, log_message , filename = None, tagname = "analyst_comments")
				if resp.status != httplib.CREATED:
					print >>sys.stderr, "gracedb upload of log failed"
				else:
					break
			except:
				gracedb_upload_itrial += 1

		# upload skymap if skymap_fname of the triggers is not empty
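		# NOTE: skymap uploading is currently disabled ("if False" below); the original condition is kept in the comment for reference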
		#if len(trigger.skymap_fname) > 0:
		if False:
			# make sure the last round of output dumping is finished 
			if self.thread_upload_skymap is not None and self.thread_upload_skymap.isAlive():
				self.thread_upload_skymap.join()
	
			# free the last used memory
			del self.thread_upload_skymap
			# start new thread
			self.thread_upload_skymap = threading.Thread(target = upload_skymap, args =(self.gracedb_client, gid, trigger.ifos, trigger.skymap_fname, trigger.end_time, self.output_skymap, self.cuda_postcoh_detrsp_fname, self.verbose, ))
			self.thread_upload_skymap.start()
		

		if self.verbose:
			print >>sys.stderr, "retrieving PSDs from whiteners and generating psd.xml.gz ..."
		psddict = {}
		#FIXME: for more complex detector names
		instruments = re.findall('..', trigger.ifos)
		for instrument in instruments:
			elem = self.pipeline.get_by_name("lal_whiten_%s" % instrument)
			data = numpy.array(elem.get_property("mean-psd"))
			psddict[instrument] = lal.CreateREAL8FrequencySeries(
				name = "PSD",
				epoch = LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0),
				f0 = 0.0,
				deltaF = elem.get_property("delta-f"),
				sampleUnits = lal.Unit("s strain^2"),	# FIXME:  don't hard-code this
				length = len(data)
			)
			psddict[instrument].data.data = data
		fobj = StringIO.StringIO()
		reference_psd.write_psd_fileobj(fobj, psddict, gz = True)
		common_messages.append(("strain spectral densities", "psd.xml.gz", "psd", fobj.getvalue()))


		#
		# do PSD and ranking data file uploads
		#

		while common_messages:
			message, filename, tag, contents = common_messages.pop()
			gracedb_upload_itrial = 1
			gid = gracedb_ids[0]
			while gracedb_upload_itrial < 10:
				try:
					resp = self.gracedb_client.writeLog(gid, message, filename = filename, filecontents = contents, tagname = tag)
					resp_json = resp.json()
					if resp.status != httplib.CREATED:
						print >>sys.stderr, "gracedb upload of %s for ID %s failed" % (filename, gid)
					else:
						break
				except:
					gracedb_upload_itrial += 1

	def get_output_filename(self, output_prefix, output_name, t_snapshot_start, snapshot_duration):
		if output_prefix is not None:
			fname = "%s_%d_%d.xml.gz" % (output_prefix, t_snapshot_start, snapshot_duration)
			return fname
		assert output_name is not None
		return output_name

	def snapshot_segment_file(self, t_snapshot_start, duration, verbose = False):
		filename = "%s/%s_SEGMENTS_%d_%d.xml.gz" % (self.path, self.ifos, t_snapshot_start, duration)
		logging.info("snapshotting %s" % filename)
		# make sure the last round of output dumping is finished 
		if self.thread_snapshot_segment is not None and self.thread_snapshot_segment.isAlive():
			self.thread_snapshot_segment.join()
	
		# free the last used memory
		del self.thread_snapshot_segment
		# hand the current segment document over to the writer thread (no deep copy is made)
		seg_document_cpy = self.seg_document
		seg_document_cpy.set_filename(filename)
		# free thread context
		# start new thread
		self.thread_snapshot_segment = threading.Thread(target = seg_document_cpy.write_output_file, args = (verbose, ))
		self.thread_snapshot_segment.start()

		# set a new document for seg_document
		seg_document = self.seg_document.get_another()
		# remember to delete the old seg doc
		del self.seg_document
		self.seg_document = seg_document

	def snapshot_output_file(self, filename, verbose = False):
		# make sure the last round of output dumping is finished 
		logging.info("snapshotting %s" % filename)
		if self.thread_snapshot is not None and self.thread_snapshot.isAlive():
			self.thread_snapshot.join()
	
		# hand the current postcoh document over to the writer thread (no deep copy is made)
		postcoh_document_cpy = self.postcoh_document
		postcoh_document_cpy.set_filename(filename)
		# free thread context
		del self.thread_snapshot
		self.thread_snapshot = threading.Thread(target = postcoh_document_cpy.write_output_file, args = (verbose, ))
		self.thread_snapshot.start()

		# set a new document for postcoh_document
		postcoh_document = self.postcoh_document.get_another()
		# remember to delete the old postcoh doc
		del self.postcoh_table
		del self.postcoh_document
		self.postcoh_document = postcoh_document
		self.postcoh_table = postcoh_table_def.PostcohInspiralTable.get_table(self.postcoh_document.xmldoc)

	def __wait_internal_process_finish(self):
		if self.thread_snapshot is not None and self.thread_snapshot.isAlive():
			self.thread_snapshot.join()

		if self.thread_snapshot_segment is not None and self.thread_snapshot_segment.isAlive():
			self.thread_snapshot_segment.join()
	
		if self.thread_upload_skymap is not None and self.thread_upload_skymap.isAlive():
			self.thread_upload_skymap.join()

		self.fapupdater.wait_last_process_finish(self.fapupdater.procs_update_fap_stats)
		self.fapupdater.wait_last_process_finish(self.fapupdater.procs_combine_stats)

	def write_output_file(self, filename = None, verbose = False):
		self.__wait_internal_process_finish()
		self.__write_output_file(filename, verbose = verbose)

	def __write_output_file(self, filename = None, verbose = False):
		if filename is not None:
			self.postcoh_document.set_filename(filename)
		self.postcoh_document.write_output_file(verbose = verbose)
		# FIXME: hard-coded segment filename
		if self.t_snapshot_start:
			seg_filename = "%s/%s_SEGMENTS_%d_%d.xml.gz" % (self.path, self.ifos, self.t_snapshot_start, self.snapshot_duration)
		else:
			seg_filename = "%s/%s_SEGMENTS.xml.gz" % (self.path, self.ifos)
		self.seg_document.set_filename(seg_filename)
		self.seg_document.write_output_file(verbose = verbose)

class CoincsDocFromPostcoh(object):
	sngl_inspiral_columns = ("process_id", "ifo", "end_time", "end_time_ns", "eff_distance", "coa_phase", "mass1", "mass2", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "sigmasq", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "event_id", "Gamma0", "Gamma1")

	def __init__(self, url, process_params, channel_dict, comment = None, verbose = False):
		#
		# build the XML document
		#
		self.get_another = lambda: CoincsDocFromPostcoh(url = url, process_params = process_params, channel_dict = channel_dict, comment = comment, verbose = verbose)
	
		self.channel_dict = channel_dict
		self.url = url
		self.xmldoc = ligolw.Document()
		self.xmldoc.appendChild(ligolw.LIGO_LW())
		self.process = ligolw_process.register_to_xmldoc(self.xmldoc, u"gstlal_inspiral_postcohspiir_online", process_params, comment = comment, ifos = channel_dict.keys())
	
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SnglInspiralTable, columns = self.sngl_inspiral_columns))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincDefTable))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincTable))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincMapTable))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.TimeSlideTable))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincInspiralTable))
		self.xmldoc.childNodes[-1].appendChild(lsctables.New(postcoh_table_def.PostcohInspiralTable))

	# path here is the job id
	def assemble_tables(self, trigger):
		self.assemble_snglinspiral_table(trigger)
		coinc_def_table = lsctables.CoincDefTable.get_table(self.xmldoc)
		coinc_table = lsctables.CoincTable.get_table(self.xmldoc)
		coinc_inspiral_table = lsctables.CoincInspiralTable.get_table(self.xmldoc)
		postcoh_table = postcoh_table_def.PostcohInspiralTable.get_table(self.xmldoc)

		row = coinc_def_table.RowType()
		row.search = "inspiral"
		row.description = "sngl_inspiral<-->sngl_inspiral coincidences"
		row.coinc_def_id = "coinc_definer:coinc_def_id:3"
		row.search_coinc_type = 0
		coinc_def_table.append(row)

		row = coinc_table.RowType()
		row.coinc_event_id = "coinc_event:coinc_event_id:1"
		row.instruments = ','.join(re.findall('..',trigger.ifos)) #FIXME: for more complex detector names
		row.nevents = 2
		row.process_id = self.process.process_id
		row.coinc_def_id = "coinc_definer:coinc_def_id:3"
		row.time_slide_id = "time_slide:time_slide_id:6"
		row.likelihood = 0
		coinc_table.append(row)
		
		row = coinc_inspiral_table.RowType()
		row.false_alarm_rate = trigger.fap
		row.mchirp = trigger.mchirp 
		row.minimum_duration = trigger.template_duration
		row.mass = trigger.mtotal
		row.end_time = trigger.end_time
		row.coinc_event_id = "coinc_event:coinc_event_id:1"
		#Manoj: add Network SNR to sngl_inspiral table instead of Coherent SNR. Change is reflected on gracedb event page.
		#row.snr = trigger.cohsnr
		network_snr2 = 0
		for ifo in re.findall('..', trigger.ifos):
			network_snr2 += (getattr(trigger, "snglsnr_%s" % ifo[0]))**2
		network_snr = numpy.sqrt(network_snr2)	# network_snr = sqrt(H**2 + L**2 + V**2)
		row.snr = network_snr
		row.end_time_ns = trigger.end_time_ns
		row.combined_far = trigger.far
		row.ifos = ','.join(re.findall('..',trigger.ifos)) #FIXME: for more complex detector names
		coinc_inspiral_table.append(row)

		self.assemble_coinc_map_table(trigger)
		self.assemble_time_slide_table(trigger)

		postcoh_table.append(trigger)

	def assemble_coinc_map_table(self, trigger):

		coinc_map_table = lsctables.CoincMapTable.get_table(self.xmldoc)
		iifo = 0
		# FIXME: hard-coded ifo length
		for ifo in re.findall('..', trigger.ifos):
			row = coinc_map_table.RowType()
			row.event_id = "sngl_inspiral:event_id:%d" % iifo
			row.table_name = "sngl_inspiral"
			row.coinc_event_id = "coinc_event:coinc_event_id:1"
			coinc_map_table.append(row)
			iifo += 1

	def assemble_time_slide_table(self, trigger):

		time_slide_table = lsctables.TimeSlideTable.get_table(self.xmldoc)
		# FIXME: hard-coded ifo length
		for ifo in re.findall('..', trigger.ifos):
			row = time_slide_table.RowType()
			row.instrument = ifo
			row.time_slide_id = "time_slide:time_slide_id:6"
			row.process_id = self.process.process_id
			row.offset = 0
			time_slide_table.append(row)


	def assemble_snglinspiral_table(self, trigger):
		sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(self.xmldoc)
		for standard_column in ("process_id", "ifo", "search", "channel", "end_time", "end_time_ns", "end_time_gmst", "impulse_time", "impulse_time_ns", "template_duration", "event_duration", "amplitude", "eff_distance", "coa_phase", "mass1", "mass2", "mchirp", "mtotal", "eta", "kappa", "chi", "tau0", "tau2", "tau3", "tau4", "tau5", "ttotal", "psi0", "psi3", "alpha", "alpha1", "alpha2", "alpha3", "alpha4", "alpha5", "alpha6", "beta", "f_final", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "cont_chisq", "cont_chisq_dof", "sigmasq", "rsqveto_duration", "Gamma0", "Gamma1", "Gamma2", "Gamma3", "Gamma4", "Gamma5", "Gamma6", "Gamma7", "Gamma8", "Gamma9", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "event_id"):
			try:
				sngl_inspiral_table.appendColumn(standard_column)
			except ValueError:
				# already has it
				pass

		# FIXME: hard-coded ifo len == 2
		iifo = 0
		for ifo in re.findall('..', trigger.ifos):
			row = sngl_inspiral_table.RowType()
			# Setting the individual row
			row.process_id = self.process.process_id
			row.ifo =  ifo
			row.search = self.url
			row.channel = self.channel_dict[ifo]