Commit 09adb9f5 authored by Kipp Cannon

snglcoinc: new stream-based coincidence engine

- this patch replaces snglcoinc's coincidence engine with a native
  streaming engine.  the old implementation was written at a time when it
  was believed that time slides were the focus of coincidence analyses, and
  was optimized for performing very large numbers of time slides.  we now
  know time slides are not how good noise models are constructed and that
  it is better to optimize for high trigger rates so as to allow for the
  lowest possible thresholds.  that means optimizing not for a high offset
  vector count but, instead, for low ram use and a low comparison operation
  count, i.e., adopting a "streaming model":  events flow into the
  comparison engine and coincidences flow out, a relatively small number of
  time shift offset vectors are considered, and events are only held in
  memory until they are no longer needed to construct coincidences.
parent ae94509a
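The pattern the updated drivers below follow is, roughly:  construct a snglcoinc.TimeSlideGraph from a search-specific coincgen_doubles class, push each instrument's time-ordered events into it, and pull coincidences back out.  What follows is a minimal sketch of that pattern, not the actual driver code;  the event source and the record_coinc() helper are placeholders, and the exact keyword defaults of pull() are assumed, while the TimeSlideGraph constructor arguments and the push()/pull() calls are taken from the diff below.

from lalburst import snglcoinc

# construct the offset vector assembly graph.  coincgen_doubles is the
# search-specific double generator (burst and inspiral versions appear in
# the diff below), offset_vectors maps time_slide_id to offset vector, and
# delta_t is the coincidence window
graph = snglcoinc.TimeSlideGraph(coincgen_doubles, offset_vectors, delta_t, min_instruments = 2)

# push each instrument's time-ordered events into the graph together with a
# time up to which that instrument's event stream is known to be complete
for instrument, events in events_by_instrument.items():	# placeholder source
	graph.push(instrument, events, max(event.peak for event in events))

# pull the coincidences back out.  flush = True drains the queues at end of
# stream;  in a live analysis push() and pull() are interleaved so that
# events are only held in memory while they can still form coincidences
for node, events in graph.pull(delta_t, flush = True):
	record_coinc(node.time_slide_id, node.offset_vector, events)	# placeholder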
......@@ -132,19 +132,19 @@ if options.coincidence_segments is not None:
# for the purposes of the coinc segs feature, the coinc
# time is minimum of event peak times. this is a fast, and
# guaranteed reproducible definition
return len(events) < min_instruments or min(event.peak + offset_vector[event.ifo] for event in events) not in coinc_segs
return min(event.peak + offset_vector[event.ifo] for event in events) not in coinc_segs
else:
def coinc_segs_ntuple_comparefunc(events, offset_vector, min_instruments = options.min_instruments):
return len(events) < min_instruments
def coinc_segs_ntuple_comparefunc(*args):
return False
if options.coincidence_algorithm == "excesspower":
EventListType = burca.ExcessPowerEventList
coincgen_doubles = burca.ep_coincgen_doubles
ntuple_comparefunc = coinc_segs_ntuple_comparefunc
CoincTables = burca.ExcessPowerCoincTables
CoincDef = burca.ExcessPowerBBCoincDef
elif options.coincidence_algorithm == "stringcusp":
EventListType = burca.StringEventList
coincgen_doubles = burca.string_coincgen_doubles
ntuple_comparefunc = lambda *args: coinc_segs_ntuple_comparefunc(*args) or burca.StringCuspCoincTables.ntuple_comparefunc(*args)
CoincTables = burca.StringCuspCoincTables
CoincDef = burca.StringCuspBBCoincDef
......@@ -195,13 +195,21 @@ for n, filename in enumerate(filenames):
# Run coincidence algorithm.
#
if options.coincidence_algorithm == "excesspower":
delta_t = 2. * burca.ep_coincgen_doubles.get_coincs.max_edge_peak_delta(lsctables.SnglBurstTable.get_table(xmldoc))
elif options.coincidence_algorithm == "stringcusp":
delta_t = options.threshold
else:
raise Exception("should never get here")
burca.burca(
xmldoc = xmldoc,
process_id = process.process_id,
EventListType = EventListType,
coincgen_doubles = coincgen_doubles,
CoincTables = CoincTables,
coinc_definer_row = CoincDef,
threshold = options.threshold,
delta_t = delta_t,
ntuple_comparefunc = ntuple_comparefunc,
verbose = options.verbose
)
......
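A note on the delta_t computed above for the excess power search, assuming the intent matches the window arithmetic in ep_coincgen_doubles.get_coincs further below:  two time-frequency tiles can only be coincident if their extents overlap, so the largest possible peak-to-peak separation is the sum of the two events' peak-to-edge extents, each bounded by the table-wide maximum, hence the factor of two.  A hedged sketch of the same computation over plain rows (sngl_burst_rows is a placeholder name):

def max_edge_peak_delta(events):
	# largest difference between any event's peak time and either edge of
	# its time-frequency tile;  0 for an empty list
	if not events:
		return 0.0
	return max(max(float(e.peak - e.start), float(e.start + e.duration - e.peak)) for e in events)

# two tiles that barely touch can have peaks separated by at most the sum of
# their individual peak-to-edge extents, so twice the maximum bounds it
delta_t = 2. * max_edge_peak_delta(sngl_burst_rows)	# sngl_burst_rows: placeholder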
......@@ -28,12 +28,12 @@ from __future__ import print_function
from bisect import bisect_left, bisect_right
import itertools
import math
import sys
from glue.ligolw import lsctables
from glue.ligolw.utils import process as ligolw_process
from . import snglcoinc
......@@ -158,6 +158,8 @@ class StringCuspCoincTables(snglcoinc.CoincTables):
@staticmethod
def ntuple_comparefunc(events, offset_vector, disallowed = frozenset(("H1", "H2"))):
# disallow H1,H2 only coincs
# FIXME: implement this in the ranking statistic, not like
# this
return set(event.ifo for event in events) == disallowed
def coinc_rows(self, process_id, time_slide_id, events):
......@@ -169,7 +171,7 @@ class StringCuspCoincTables(snglcoinc.CoincTables):
#
# =============================================================================
#
# Event List Management
# Coincidence Generators
#
# =============================================================================
#
......@@ -180,75 +182,75 @@ class StringCuspCoincTables(snglcoinc.CoincTables):
#
class ExcessPowerEventList(snglcoinc.EventList):
"""
A customization of the EventList class for use with the excess
power search.
"""
def make_index(self):
"""
Sort events by peak time so that a bisection search can
retrieve them. Note that the bisection search relies on
the __cmp__() method of the SnglBurst row class having
previously been set to compare the event's peak time to a
LIGOTimeGPS.
"""
# sort by peak time
self.sort(key = lambda event: event.peak)
# for the events in this list, record the largest
# difference between an event's peak time and either its
# start or stop times
if self:
self.max_edge_peak_delta = max(max(float(event.peak - event.start), float(event.start + event.duration - event.peak)) for event in self)
else:
# max() doesn't like empty lists
self.max_edge_peak_delta = 0
class ep_coincgen_doubles(snglcoinc.coincgen_doubles):
class singlesqueue(snglcoinc.coincgen_doubles.singlesqueue):
@staticmethod
def event_time(event):
return event.peak
@staticmethod
def comparefunc(a, offseta, b, light_travel_time, ignored):
if abs(a.central_freq - b.central_freq) > (a.bandwidth + b.bandwidth) / 2:
return True
astart = a.start + offseta
bstart = b.start
if astart > bstart + b.duration + light_travel_time:
# a starts after the end of b
return True
class get_coincs(object):
@staticmethod
def max_edge_peak_delta(events):
# the largest difference between any event's peak
# time and either its start or stop times
if not events:
return 0.0
return max(max(float(event.peak - event.start), float(event.start + event.duration - event.peak)) for event in events)
@staticmethod
def comparefunc(a, offseta, b, light_travel_time):
if abs(a.central_freq - b.central_freq) > (a.bandwidth + b.bandwidth) / 2:
return True
astart = a.start + offseta
bstart = b.start
if astart > bstart + b.duration + light_travel_time:
# a starts after the end of b
return True
if bstart > astart + a.duration + light_travel_time:
# b starts after the end of a
return True
if bstart > astart + a.duration + light_travel_time:
# b starts after the end of a
return True
# time-frequency times intersect
return False
# time-frequency times intersect
return False
def __init__(self, events):
self.events = events
# for this instance, replace the method with the
# pre-computed value
self.max_edge_peak_delta = self.max_edge_peak_delta(events)
def get_coincs(self, event_a, offset_a, light_travel_time, ignored):
# event_a's peak time
peak = event_a.peak
def __call__(self, event_a, offset_a, light_travel_time, ignored):
# event_a's peak time
peak = event_a.peak
# difference between event_a's peak and start times
dt = float(peak - event_a.start)
# difference between event_a's peak and start times
dt = float(peak - event_a.start)
# largest difference between event_a's peak time and either
# its start or stop times
dt = max(dt, event_a.duration - dt)
# largest difference between event_a's peak time
# and either its start or stop times
dt = max(dt, event_a.duration - dt)
# add our own max_edge_peak_delta and the light travel time
# between the two instruments (when done, if event_a's peak
# time differs by more than this much from the peak time of
# an event in this list then it is *impossible* for them to
# be coincident)
dt += self.max_edge_peak_delta + light_travel_time
# add our own max_edge_peak_delta and the light
# travel time between the two instruments (when
# done, if event_a's peak time differs by more than
# this much from the peak time of an event in this
# list then it is *impossible* for them to be
# coincident)
dt += self.max_edge_peak_delta + light_travel_time
# apply time shift
peak += offset_a
# apply time shift
peak += offset_a
# extract the subset of events from this list that
# pass coincidence with event_a (use bisection
# searches for the minimum and maximum allowed peak
# times to quickly identify a subset of the full
# list)
return [event_b for event_b in self.events[bisect_left(self.events, peak - dt) : bisect_right(self.events, peak + dt)] if not self.comparefunc(event_a, offset_a, event_b, light_travel_time)]
# extract the subset of events from this list that pass
# coincidence with event_a (use bisection searches for the
# minimum and maximum allowed peak times to quickly
# identify a subset of the full list)
return [event_b for event_b in self[bisect_left(self, peak - dt) : bisect_right(self, peak + dt)] if not self.comparefunc(event_a, offset_a, event_b, light_travel_time, ignored)]
#
......@@ -256,25 +258,15 @@ class ExcessPowerEventList(snglcoinc.EventList):
#
class StringEventList(snglcoinc.EventList):
"""
A customization of the EventList class for use with the string
search.
"""
def make_index(self):
"""
Sort events by peak time so that a bisection search can
retrieve them. Note that the bisection search relies on
the __cmp__() method of the SnglBurst row class having
previously been set to compare the event's peak time to a
LIGOTimeGPS.
"""
self.sort(key = lambda event: event.peak)
class string_coincgen_doubles(ep_coincgen_doubles):
class get_coincs(object):
def __init__(self, events):
self.events = events
def get_coincs(self, event_a, offset_a, light_travel_time, threshold):
peak = event_a.peak + offset_a
coinc_window = threshold + light_travel_time
return self[bisect_left(self, peak - coinc_window) : bisect_right(self, peak + coinc_window)]
def __call__(self, event_a, offset_a, light_travel_time, threshold):
peak = event_a.peak + offset_a
coinc_window = threshold + light_travel_time
return self.events[bisect_left(self.events, peak - coinc_window) : bisect_right(self.events, peak + coinc_window)]
#
......@@ -289,10 +281,11 @@ class StringEventList(snglcoinc.EventList):
def burca(
xmldoc,
process_id,
EventListType,
coincgen_doubles,
CoincTables,
coinc_definer_row,
threshold,
delta_t,
ntuple_comparefunc = lambda events, offset_vector: False,
min_instruments = 2,
verbose = False
......@@ -306,26 +299,28 @@ def burca(
coinc_tables = CoincTables(xmldoc, coinc_definer_row)
#
# build the event list accessors, populated with events from those
# processes that can participate in a coincidence
# construct offset vector assembly graph
#
eventlists = snglcoinc.EventListDict(EventListType, lsctables.SnglBurstTable.get_table(xmldoc), instruments = set(coinc_tables.time_slide_table.getColumnByName("instrument")))
time_slide_graph = snglcoinc.TimeSlideGraph(coincgen_doubles, coinc_tables.time_slide_index, delta_t, min_instruments = min_instruments, verbose = verbose)
#
# construct offset vector assembly graph
# collect events.
#
time_slide_graph = snglcoinc.TimeSlideGraph(coinc_tables.time_slide_index, min_instruments = min_instruments, verbose = verbose)
sngl_burst_table = lsctables.SnglBurstTable.get_table(xmldoc)
for instrument, events in itertools.groupby(sorted(sngl_burst_table, key = lambda row: row.ifo), lambda event: event.ifo):
events = tuple(events)
time_slide_graph.push(instrument, events, max(event.peak for event in events))
#
# retrieve all coincidences, apply the final n-tuple compare func
# and record the survivors
#
for node, coinc in time_slide_graph.get_coincs(eventlists, threshold, verbose = verbose):
if not ntuple_comparefunc(coinc, node.offset_vector):
coinc_tables.append_coinc(*coinc_tables.coinc_rows(process_id, node.time_slide_id, coinc))
for node, events in time_slide_graph.pull(threshold, flush = True, verbose = verbose):
if not ntuple_comparefunc(events, node.offset_vector):
coinc_tables.append_coinc(*coinc_tables.coinc_rows(process_id, node.time_slide_id, events))
#
# done
......
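The two burst generators above share one interface, which the inspiral generator further below also follows:  a coincgen_doubles subclass supplies a singlesqueue class that knows how to read an event's time, and a get_coincs class that is constructed from one instrument's time-ordered events and, when called, returns those of them coincident with an event from another instrument.  A hedged, generic sketch of that interface for a hypothetical search whose events carry a .time attribute (the attribute name and the window arithmetic are placeholders;  the class layout and call signatures follow the code above):

from bisect import bisect_left, bisect_right
from lalburst import snglcoinc

class example_coincgen_doubles(snglcoinc.coincgen_doubles):
	class singlesqueue(snglcoinc.coincgen_doubles.singlesqueue):
		@staticmethod
		def event_time(event):
			# the time used to order and retire events in the queue
			return event.time	# placeholder attribute

	class get_coincs(object):
		def __init__(self, events):
			# events arrive time-ordered from the singlesqueue;
			# index them however suits the search
			self.events = events
			self.times = [event.time for event in events]

		def __call__(self, event_a, offset_a, light_travel_time, delta_t):
			# return the events from this instrument that are
			# coincident with event_a after applying offset_a
			t = event_a.time + offset_a
			window = delta_t + light_travel_time
			return self.events[bisect_left(self.times, t - window) : bisect_right(self.times, t + window)]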
This diff is collapsed.
......@@ -34,8 +34,8 @@ import time
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue import offsetvector
import lal
from lalburst import offsetvector
from lalburst import snglcoinc
......@@ -220,78 +220,84 @@ class InspiralCoincTables(snglcoinc.CoincTables):
#
# =============================================================================
#
# Event List Management
# Coincidence Generator Implementation
#
# =============================================================================
#
class InspiralEventList(snglcoinc.EventList):
"""
A customization of the EventList class for use with the inspiral
search.
"""
@staticmethod
def template(event):
"""
Returns an immutable hashable object (it can be used as a
dictionary key) uniquely identifying the template that
produced the given event.
"""
return event.mass1, event.mass2, event.spin1x, event.spin1y, event.spin1z, event.spin2x, event.spin2y, event.spin2z
def make_index(self):
"""
Sort events into bins according to their template so that a
dictionary look-up can retrieve all triggers from a given
template. Then sort bins by end time so that a bisection
search can retrieve triggers from a template within a
window of time. Note that the bisection search relies on
the __cmp__() method of the SnglInspiral row class having
previously been set to compare the event's end time to a
LIGOTimeGPS.
"""
self.index = {}
for event in self:
self.index.setdefault(self.template(event), []).append(event)
for events in self.index.values():
events.sort(key = lambda event: event.end)
def get_coincs(self, event_a, offset_a, light_travel_time, delta_t):
#
# event_a's end time, shifted to be with respect to end
# times in this list.
#
end = event_a.end + offset_a
#
# the coincidence window
#
coincidence_window = light_travel_time + delta_t
class coincgen_doubles(snglcoinc.coincgen_doubles):
class singlesqueue(snglcoinc.coincgen_doubles.singlesqueue):
@staticmethod
def event_time(event):
return event.end
class get_coincs(object):
@staticmethod
def template(event):
"""
Returns an immutable hashable object (it can be
used as a dictionary key) uniquely identifying the
template that produced the given event.
"""
return event.mass1, event.mass2, event.spin1x, event.spin1y, event.spin1z, event.spin2x, event.spin2y, event.spin2z
def __init__(self, events):
"""
Sort events into bins according to their template
so that a dictionary look-up can retrieve all
triggers from a given template. The events must be
provided in time order so that a bisection search
can retrieve triggers from a template within a
window of time. Note that the bisection search
relies on the __cmp__() method of the SnglInspiral
row class having previously been set to compare the
event's end time to a LIGOTimeGPS.
"""
self.index = {}
index_setdefault = self.index.setdefault
template = self.template
for event in events:
index_setdefault(template(event), []).append(event)
def __call__(self, event_a, offset_a, light_travel_time, delta_t):
#
# event_a's end time, shifted to be with respect to
# end times in this list.
#
end = event_a.end + offset_a
#
# the coincidence window
#
coincidence_window = light_travel_time + delta_t
#
# extract the subset of events from this list that
# pass coincidence with event_a. use a bisection
# search for the minimum allowed end time and a
# brute-force scan for the maximum allowed end
# time. because the number of events in the
# coincidence window is generally quite small, the
# brute-force scan has a lower expected operation
# count than a second bisection search to find the
# upper bound in the sequence
#
#
# extract the subset of events from this list that pass
# coincidence with event_a. use a bisection search for the
# minimum allowed end time and a brute-force scan for the
# maximum allowed end time. because the number of events
# in the coincidence window is generally quite small, the
# brute-force scan has a lower expected operation count
# than a second bisection search to find the upper bound in
# the sequence
#
try:
events = self.index[self.template(event_a)]
except KeyError:
# that template didn't produce any events in this
# instrument. this is rare given the SNR thresholds
# typical today so trapping the exception is more
# efficient than testing
return ()
stop = end + coincidence_window
return tuple(itertools.takewhile(lambda event: event.end <= stop, events[bisect_left(events, end - coincidence_window):]))
try:
events = self.index[self.template(event_a)]
except KeyError:
# that template didn't produce any events
# in this instrument. this is rare given
# the SNR thresholds typical today so
# trapping the exception is more efficient
# than testing
return ()
stop = end + coincidence_window
return tuple(itertools.takewhile(lambda event: event.end <= stop, events[bisect_left(events, end - coincidence_window):]))
#
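The bisection-plus-takewhile retrieval in the __call__() above relies on the candidate list within a coincidence window being short, so a forward scan for the upper bound beats a second bisection.  A minimal, self-contained illustration of that idiom with plain floats standing in for event end times:

from bisect import bisect_left
import itertools

def in_window(times, centre, window):
	# times is sorted.  bisect locates the first entry >= centre - window,
	# then a linear scan stops at the first entry > centre + window;  for
	# the handful of candidates typical of a coincidence window the scan
	# is cheaper than a second bisection for the upper bound
	start = bisect_left(times, centre - window)
	return list(itertools.takewhile(lambda t: t <= centre + window, times[start:]))

print(in_window([10.0, 10.4, 10.9, 11.5, 13.0], 11.0, 0.6))	# [10.4, 10.9, 11.5]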
......@@ -326,8 +332,14 @@ def ligolw_thinca(
coinc_tables = InspiralCoincTables(xmldoc, coinc_definer_row)
#
# build the event list accessors. apply vetoes by excluding events
# from the lists that fall in vetoed segments
# construct offset vector assembly graph
#
time_slide_graph = snglcoinc.TimeSlideGraph(coincgen_doubles, coinc_tables.time_slide_index, delta_t, min_instruments = min_instruments, verbose = verbose)
#
# collect events. apply vetoes by excluding events from the lists
# that fall in vetoed segments
#
sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc)
......@@ -336,12 +348,9 @@ def ligolw_thinca(
if seglists is not None:
# don't do in-place
seglists = seglists - veto_segments
#
# construct offset vector assembly graph
#
time_slide_graph = snglcoinc.TimeSlideGraph(coinc_tables.time_slide_index, min_instruments = min_instruments, verbose = verbose)
for instrument, events in itertools.groupby(sorted(sngl_inspiral_table, key = lambda row: row.ifo), lambda event: event.ifo):
events = tuple(events)
time_slide_graph.push(instrument, events, max(event.end for event in events))
#
# retrieve all coincidences, apply the final n-tuple compare func
......@@ -349,11 +358,7 @@ def ligolw_thinca(
#
gps_time_now = float(lal.UTCToGPS(time.gmtime()))
for node, events in time_slide_graph.get_coincs(
snglcoinc.EventListDict(InspiralEventList, sngl_inspiral_table, instruments = set(coinc_tables.time_slide_table.getColumnByName("instrument"))),
delta_t,
verbose = verbose
):
for node, events in time_slide_graph.pull(delta_t, flush = True, verbose = verbose):
if not ntuple_comparefunc(events, node.offset_vector):
coinc, coincmaps, coinc_inspiral = coinc_tables.coinc_rows(process_id, node.time_slide_id, events, seglists = seglists)
if likelihood_func is not None:
......