Commit fdfeb418 authored by Adam Mercer

Merge branch 'snglcoinc_work' into 'master'

snglcoinc.py:  another bug fix

See merge request !1668
parents dda420b7 c158b236
Pipeline #283525 passed with stages in 179 minutes and 13 seconds
@@ -303,6 +303,7 @@ def burca(
    delta_t,
    ntuple_comparefunc = lambda events, offset_vector: False,
    min_instruments = 2,
+   incremental = True,
    verbose = False
):
    #
@@ -324,11 +325,25 @@
    #
    sngl_burst_table = lsctables.SnglBurstTable.get_table(xmldoc)
-   for instrument, events in itertools.groupby(sorted(sngl_burst_table, key = lambda row: row.ifo), lambda event: event.ifo):
-       time_slide_graph.push(instrument, tuple(events), PosInfinity)
+   if not incremental:
+       # normal version: push everything into the graph, then
+       # pull out all coincs in one operation below using the
+       # final flush
+       for instrument, events in itertools.groupby(sorted(sngl_burst_table, key = lambda row: row.ifo), lambda event: event.ifo):
+           time_slide_graph.push(instrument, tuple(events), PosInfinity)
+   else:
+       # slower diagnostic version. simulate an online
+       # incremental analysis by pushing events into the graph in
+       # time order and collecting candidates as we go. we still
+       # do the final flush operation below.
+       for instrument, events in itertools.groupby(sorted(sngl_burst_table, key = lambda row: (row.peak, row.ifo)), lambda event: event.ifo):
+           events = tuple(events)
+           if time_slide_graph.push(instrument, events, max(event.peak for event in events)):
+               for node, events in time_slide_graph.pull(coinc_sieve = ntuple_comparefunc):
+                   coinc_tables.append_coinc(*coinc_tables.coinc_rows(process_id, node.time_slide_id, events, "sngl_burst"))
    #
-   # retrieve all coincidences.
+   # retrieve all remaining coincidences.
    #
    for node, events in time_slide_graph.pull(coinc_sieve = ntuple_comparefunc, flush = True):
......
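The hunk above replaces burca()'s single unconditional push with two driver modes. Below is a minimal sketch of the two patterns, where `graph`, `events`, `record` and `timeof` are hypothetical stand-ins for the real TimeSlideGraph, event rows and coinc_tables machinery; only .push()/.pull() and their `flush`/`coinc_sieve` arguments come from the diff itself.

```python
from itertools import groupby

def batch_mode(graph, events, record, sieve):
    # push everything at once; an infinite t_complete marks each
    # queue as never waiting for more data
    for ifo, evs in groupby(sorted(events, key = lambda e: e.ifo), lambda e: e.ifo):
        graph.push(ifo, tuple(evs), float("inf"))
    # a single flushing pull then yields every candidate
    for node, coinc in graph.pull(coinc_sieve = sieve, flush = True):
        record(node, coinc)

def incremental_mode(graph, events, record, sieve, timeof):
    # feed events in time order, as an online analysis would see them
    for ifo, evs in groupby(sorted(events, key = lambda e: (timeof(e), e.ifo)), lambda e: e.ifo):
        evs = tuple(evs)
        # .push() reports whether the graph's coincidence-complete
        # time advanced; only then is a pull worthwhile
        if graph.push(ifo, evs, max(timeof(e) for e in evs)):
            for node, coinc in graph.pull(coinc_sieve = sieve):
                record(node, coinc)
    # the final flush still collects whatever remains queued
    for node, coinc in graph.pull(coinc_sieve = sieve, flush = True):
        record(node, coinc)
```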
@@ -43,7 +43,7 @@ import random
import scipy.optimize
from scipy import spatial
import sys
-from collections import Counter, UserDict
+from collections import ChainMap, Counter
import warnings
@@ -69,54 +69,6 @@ from .git_version import version as __version__
#
-class multidict(UserDict):
-    """
-    Read-only dictionary view into a collection of dictionaries.
-
-    Example:
-
-    >>> x = {"a": 10., "b": 100.}
-    >>> y = {"c": 1000., "d": 10000.}
-    >>> z = multidict(x, y)
-    >>> z["a"]
-    10.0
-    >>> z["c"]
-    1000.0
-    >>> "d" in z
-    True
-    >>> "e" in z
-    False
-    """
-    def __init__(self, *dicts):
-        self.dicts = dicts
-        if not self.dicts:
-            raise ValueError("len(dicts) must be > 0")
-
-    def __getitem__(self, x):
-        for d in self.dicts:
-            if x in d:
-                return d[x]
-        raise KeyError(x)
-
-    def __nonzero__(self):
-        return any(self.dicts)
-
-    def __contains__(self, x):
-        return any(x in d for d in self.dicts)
-
-    def __len__(self):
-        return sum(len(d) for d in self.dicts)
-
-    def __iter__(self):
-        return itertools.chain(*(iter(d) for d in self.dicts))
-
-    def items(self):
-        return itertools.chain(*(d.items() for d in self.dicts))
-
-    def keys(self):
-        return list(self)
-
def light_travel_time(instrument1, instrument2):
    """
    Compute and return the time required for light to travel through
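The class deleted above is retired in favour of the standard library's collections.ChainMap, imported in the previous hunk. A short demonstration that ChainMap reproduces the behaviour the docstring documented; like the removed class, it is a live view into the underlying dicts:

```python
from collections import ChainMap

x = {"a": 10., "b": 100.}
y = {"c": 1000., "d": 10000.}
z = ChainMap(x, y)

assert z["a"] == 10.0 and z["c"] == 1000.0
assert "d" in z and "e" not in z
assert len(z) == 4 and sorted(z) == ["a", "b", "c", "d"]

# a live view: changes to the underlying dicts are visible immediately,
# which is exactly what the id() --> event indexes below rely on
x["e"] = 1.
assert "e" in z
```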
@@ -553,33 +505,23 @@
        """
        t is the time up to which the calling code is in the
        process of constructing all available n-tuple coincident
-        candidates. The return value is a pair of tuples which,
-        together, contain all events from the queue whose
-        time-shifted end times are up to (not including) t +
-        max_coinc_window, which are all the events from this queue
-        that could form a coincidence with an event up to (not
-        including) t.
-
-        The first of the two tuples contains the events from the
-        queue that will be flushed and not reported again. These
-        are the events from this queue that themselves precede t.
-        The second contains the additional events in the queue that
-        will be retained and reported again in future .pull()
-        requests. They are reported separately so that the calling
-        code can use the distinction to track which events have
-        been used to form candidates and which have not been (doing
-        so requires us to tell it which events we are now finished
-        with, so it can decide their fate).
+        candidates. The return value is a tuple of all events from
+        the queue whose time-shifted end times are up to (not
+        including) t + max_coinc_window, which are all the events
+        from this queue that could form a coincidence with an event
+        up to (not including) t.

        t is defined with respect to the time-shifted event times.

-        The contents of both tuples are time-ordered as defined by
-        .event_time(), and all events in the first tuple precede
-        all events in the second tuple, i.e., concatenating the
-        tuples is also a time-ordered sequence.
+        The contents of the returned tuple are time-ordered as
+        defined by .event_time().

        Assuming that t cannot go backwards, any of the reported
        events that cannot be used again are removed from the
        internal queue.

        If t is None, then the queue is completely flushed. All
        remaining events are pulled from the queue and reported in
-        the first tuple, .t_complete is reset to -inf and all other
+        the tuple, .t_complete is reset to -inf and all other
        internal state is reset. After calling .pull() with t =
        None, the object's state is equivalent to its initial state
        and it is ready to process a new stream of events.
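The docstring change above removes the old two-tuple (flushed, retained) split: .pull() now returns a single time-ordered tuple. A toy model of the timing logic, under the simplifying assumption that events are bare floats (their time-shifted times); `toy_queue` is illustrative only, not the real singlesqueue:

```python
from bisect import bisect_left

class toy_queue(object):
    def __init__(self, max_coinc_window):
        self.max_coinc_window = max_coinc_window
        self.queue = []          # time-ordered "events"

    def push(self, times):
        self.queue = sorted(self.queue + list(times))

    def pull(self, t):
        if t is None:
            # complete flush: report and forget everything
            events, self.queue = tuple(self.queue), []
            return events
        # events preceding t are finished with and removed ...
        n = bisect_left(self.queue, t)
        flushed, self.queue = tuple(self.queue[:n]), self.queue[n:]
        # ... but events up to t + max_coinc_window can still pair
        # with pre-t events in other detectors, so they are reported
        # too, while staying queued for future .pull() calls
        return flushed + tuple(self.queue[:bisect_left(self.queue, t + self.max_coinc_window)])

q = toy_queue(0.5)
q.push([1.0, 2.0, 2.4, 3.0])
assert q.pull(2.2) == (1.0, 2.0, 2.4)   # 2.4 is reported but retained
assert q.pull(None) == (2.4, 3.0)       # the flush reports it again
```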
@@ -594,7 +536,7 @@
            del self.queue[:]
            self.index.clear()
            self.t_complete = segments.NegInfinity
-            return events, ()
+            return events
        # unshift back to our events' time scale
        t -= self.offset
@@ -616,8 +558,7 @@
        # can form coincidences with things in other detectors up
        # to t. this is computing time F for this event list in
        # the description above.
-        fornexttime_events = tuple(entry.event for entry in self.queue[:bisect_left(self.queue, t + self.max_coinc_window)])
-        return flushed_events, fornexttime_events
+        return flushed_events + tuple(entry.event for entry in self.queue[:bisect_left(self.queue, t + self.max_coinc_window)])

class coincgen_doubles(object):
@@ -718,12 +659,17 @@
        """
        if len(offset_vector) != 2:
            raise ValueError("offset_vector must name exactly 2 instruments (got %d)" % len(offset_vector))
+        # the coincidence window for our pair of detectors
        self.coinc_window = coinc_windows[frozenset(offset_vector)]
-        max_coinc_window = max(coinc_windows.values())
+        if self.coinc_window < 0.:
+            raise ValueError("coinc_window < 0 (%g)" % self.coinc_window)
+        # the largest coincidence window between any pair of
+        # detectors in the network
+        max_coinc_window = max(coinc_windows.values())
        if max_coinc_window < 0.:
            raise ValueError("max(coinc_window) < 0 (%g)" % max_coinc_window)
        # initialize the singlesqueue objects, one for each of our
        # detectors
        # FIXME: the max_coinc_window is the largest of all
        # coincidence windows, but as described above in the notes
        # about this algorithm we only need it to be the largest of
@@ -733,10 +679,7 @@
        # coincidence overhead
        self.queues = dict((instrument, self.singlesqueue(offset, max_coinc_window)) for instrument, offset in offset_vector.items())

        # view into the id() --> event indexes of the queues
-        self.index = multidict(*(queue.index for queue in self.queues.values()))
-
-        # Python id()s of events currently in the queues that have
-        # been reported in coincidences
-        self.used = set()
+        self.index = ChainMap(*(queue.index for queue in self.queues.values()))

    @property
    def offset_vector(self):
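For context, the coinc_windows mapping validated in the constructor above is keyed by frozenset instrument pairs. A hedged sketch of how such a mapping could plausibly be assembled from inter-detector light travel times plus the delta_t tuning parameter the drivers take; `make_coinc_windows` is a hypothetical helper, not part of the module:

```python
from itertools import combinations

def make_coinc_windows(instruments, delta_t, light_travel_time):
    # one window per unordered detector pair, keyed the way
    # coincgen_doubles.__init__() looks them up
    return dict(
        (frozenset(pair), light_travel_time(*pair) + delta_t)
        for pair in combinations(sorted(instruments), 2)
    )

# e.g. make_coinc_windows(("H1", "L1", "V1"), 0.005, light_travel_time)
# produces three entries, one per pair, each at least the light
# travel time between the two sites
```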
@@ -771,7 +714,7 @@
        """
        self.queues[instrument].push(events, t_complete)

-    def doublesgen(self, eventsa, offset_a, eventsb):
+    def doublesgen(self, eventsa, offset_a, eventsb, singles_ids):
        """
        For internal use only.
        """
@@ -792,24 +735,29 @@
        else:
            unswap = lambda a, b: (a, b)

+        # initialize the singles_ids set
+        singles_ids.clear()
+        singles_ids.update(id(event) for event in eventsb)
+        singles_ids_add = singles_ids.add
+        singles_ids_discard = singles_ids.discard
+
        # for each event in list A, iterate over events from the
        # other list that are coincident with the event, and return
        # the pairs. while doing this, collect the IDs of events
        # that are used in coincidences in a set for later logic.
        coinc_window = self.coinc_window
-        used = set()
-        used_add = used.add
        queueb_get_coincs = self.get_coincs(eventsb)
        for eventa in eventsa:
            matches = queueb_get_coincs(eventa, offset_a, coinc_window)
+            eventa = id(eventa)
            if matches:
-                eventa = id(eventa)
-                used_add(eventa)
                for eventb in matches:
                    eventb = id(eventb)
-                    used_add(eventb)
+                    singles_ids_discard(eventb)
                    yield unswap(eventa, eventb)
-        self.used |= used
+            else:
+                singles_ids_add(eventa)
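The rewritten loop above classifies every event as it is examined: pairs are yielded, and events that pair with nothing end up in singles_ids, replacing the old persistent self.used set. A self-contained sketch of that bookkeeping, with floats standing in for events and a plain window test standing in for the get_coincs() machinery:

```python
def toy_doublesgen(eventsa, eventsb, coinc_window, singles_ids):
    # until proven otherwise, every B event is a single
    singles_ids.clear()
    singles_ids.update(id(event) for event in eventsb)
    for eventa in eventsa:
        matches = [b for b in eventsb if abs(eventa - b) <= coinc_window]
        eventa = id(eventa)
        if matches:
            for eventb in matches:
                singles_ids.discard(id(eventb))  # it participates in a pair
                yield (eventa, id(eventb))
        else:
            singles_ids.add(eventa)              # paired with nothing

a, b = [1.0, 5.0], [1.2, 9.0]
singles = set()
pairs = sorted(toy_doublesgen(a, b, 0.5, singles))
assert len(pairs) == 1                  # (1.0, 1.2) is the only pair
assert singles == {id(a[1]), id(b[1])}  # 5.0 and 9.0 paired with nothing
```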
    def pull(self, t, singles_ids):
        """
@@ -832,9 +780,8 @@
        The order of the IDs in each tuple is alphabetical by
        instrument.

-        The Python IDs of events flushed from the queues that have
-        not been reported in a coincident pair are placed in the
-        singles_ids set.
+        The Python IDs of events that are not reported in a
+        coincident pair are placed in the singles_ids set.

        NOTE: the singles_ids parameter passed to this function
        must be a set or set-like object. It must support Python set
@@ -848,32 +795,18 @@
        instrumenta, instrumentb = sorted(self.queues)
        offset_a = self.queues[instrumenta].offset - self.queues[instrumentb].offset

-        # retrieve the event lists. these are event objects, not
-        # their python IDs. the .pull() methods will safety check
-        # t, we don't have to do it here. t is the
-        # t_coinc_complete parameter in the algorithm description
-        # above, but the singlesqueues take care of the timing
-        # logic for us, we just pass the t along to them.
-        flusheda, fornexttimea = self.queues[instrumenta].pull(t)
-        flushedb, fornexttimeb = self.queues[instrumentb].pull(t)
-
-        # construct coincident pairs. the pairs are the python IDs
-        # of the coincident events, not the events themselves.
-        yield from self.doublesgen(flusheda + fornexttimea, offset_a, flushedb + fornexttimeb)
-
-        # populate the singles_ids set and remove the events being
-        # flushed from the used set. if we've been flushed, there
-        # better not be anything left
-        singles_ids.clear()
-        singles_ids.update(id(event) for event in flusheda)
-        singles_ids.update(id(event) for event in flushedb)
-        flushed = singles_ids.copy()
-        singles_ids -= self.used
-        self.used -= flushed
-        assert t is not None or not self.used
+        # retrieve the event lists to be considered for coincidence
+        # using .pull() on the singlesqueues. these return
+        # sequences of event objects, not their python IDs. the
+        # .pull() methods will safety check t against their
+        # time-shifted t_completes, we don't have to do it here.
+        # from the event sequences, we construct coincident pairs.
+        # the pairs are the python IDs of the coincident events,
+        # not the events themselves. while doing this, the
+        # singles_ids set is populated with python IDs of events
+        # that do not form pairs.
+        return sorted(self.doublesgen(self.queues[instrumenta].pull(t), offset_a, self.queues[instrumentb].pull(t), singles_ids))

#
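A hedged usage sketch of the pair-level contract documented above, assuming `dbl` is a constructed coincgen_doubles that has had events pushed into it and `t` is a coincidence-complete time. The index is snapshotted first because pulling flushes events out of the live view (the same precaution TimeSlideGraph.pull() takes further below):

```python
index = dict(dbl.index.items())   # snapshot before pulling
singles_ids = set()
for ida, idb in dbl.pull(t, singles_ids):
    # the returned IDs are Python id()s, alphabetical by instrument;
    # the snapshot maps them back to the event objects
    eventa, eventb = index[ida], index[idb]
    print("pair:", eventa, eventb)

# after the pull, singles_ids holds the id()s of events that were
# reported but participated in no pair
unpaired = [index[eventid] for eventid in singles_ids]
```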
@@ -904,8 +837,14 @@
        if len(offset_vector) > 2:
            self.components = tuple(TimeSlideGraphNode(coincgen_doubles_type, component_offset_vector, coinc_windows, min_instruments) for component_offset_vector in self.component_offsetvectors(offset_vector, len(offset_vector) - 1))

            # view into the id() --> event indexes of the
-            # nodes
-            self.index = multidict(*(node.index for node in self.components))
+            # nodes. we assume they are all ChainMap
+            # instances and, for performance reasons, flatten
+            # the hierarchy one level. because this is also
+            # done at the lower levels, it ensures that,
+            # globally, the ChainMap hierarchy doesn't grow
+            # in depth as the graph is constructed. it's only
+            # ever a ChainMap of dicts, not a ChainMap of
+            # ChainMaps of ...
+            self.index = ChainMap(*(index for node in self.components for index in node.index.maps))
        elif len(offset_vector) == 2:
            self.components = (coincgen_doubles_type(offset_vector, coinc_windows),)
            self.index = self.components[0].index
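The comment above hinges on ChainMap exposing its constituent mappings through .maps. A small demonstration of the flattening, with plain dicts standing in for the per-queue id() indexes:

```python
from collections import ChainMap

a, b, c, d = {1: "a"}, {2: "b"}, {3: "c"}, {4: "d"}
left = ChainMap(a, b)     # index of one component node
right = ChainMap(c, d)    # index of another component node

# naive composition nests the views one level deeper per graph level
nested = ChainMap(left, right)
assert nested.maps == [left, right]

# splicing in .maps instead keeps the hierarchy at depth one
flat = ChainMap(*(m for node_index in (left, right) for m in node_index.maps))
assert flat.maps == [a, b, c, d]   # a ChainMap of dicts, never of ChainMaps
assert flat[3] == "c"
```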
@@ -984,7 +923,7 @@
            node.push(instrument, events, t_complete)
        return self.t_coinc_complete != t_before

-    def pull(self, t, verbose = False):
+    def pull(self, t):
        """
        Using the events contained in the internal queues construct
        and return all coincidences they participate in. It is
@@ -1062,24 +1001,19 @@
        #
        if len(self.offset_vector) == 2:
-            if verbose:
-                print("\tconstructing %s ..." % str(self.offset_vector), file=sys.stderr)
-
            #
            # search for and record coincidences. coincs is a
            # sorted tuple of event ID pairs, where each pair
            # of IDs is, itself, ordered alphabetically by
            # instrument name. here, for the two-instrument
            # case, the "partial coincs" are any singles that
-            # failed to form coincidences, which corresponds to
-            # the "flushed_unused" parameter of the
-            # coincgen_doubles' .pull() method, but we convert
-            # the set into a set of 1-element tuples so they
-            # take the form of 1-detector coincs
+            # failed to form coincidences, but we convert the
+            # set into a set of 1-element tuples so they take
+            # the form of 1-detector coincs
            #

            singles_ids = set()
-            coincs = tuple(sorted(self.components[0].pull(t, singles_ids)))
+            coincs = tuple(self.components[0].pull(t, singles_ids))
            return coincs, (set((eventid,) for eventid in singles_ids) if self.keep_partial else set())

        #
@@ -1095,7 +1029,7 @@
        # first collect all coincs and partial coincs from the
        # component nodes in the graph
-        component_coincs_and_partial_coincs = tuple(component.pull(t, verbose = verbose) for component in self.components)
+        component_coincs_and_partial_coincs = tuple(component.pull(t) for component in self.components)
        component_coincs = tuple(elem[0] for elem in component_coincs_and_partial_coincs)
        if self.keep_partial:
@@ -1143,8 +1077,6 @@
            partial_coincs = set()
        del component_coincs_and_partial_coincs

-        if verbose:
-            print("\tassembling %s ..." % str(self.offset_vector), file=sys.stderr)
-
        # magic: we can form all n-instrument coincs by knowing
        # just three sets of the (n-1)-instrument coincs no matter
        # what n is (n > 2).
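A hedged three-detector illustration of the claim: with the pair coincs for (H,L), (H,V) and (L,V) in hand, every (H,L,V) triple can be assembled by matching on the shared event and confirming the third projection. Events are reduced to bare IDs here; the real code generalizes this to arbitrary n:

```python
def triples(hl_coincs, hv_coincs, lv_coincs):
    # index the (H,V) pairs by their H event for O(1) candidate lookup
    hv_by_h = {}
    for h, v in hv_coincs:
        hv_by_h.setdefault(h, []).append(v)
    lv_set = set(lv_coincs)
    for h, l in hl_coincs:
        for v in hv_by_h.get(h, ()):
            # candidate (h, l, v): its (H,L) and (H,V) projections
            # exist by construction; confirm the (L,V) projection
            if (l, v) in lv_set:
                yield (h, l, v)

hl = {("h1", "l1"), ("h2", "l2")}
hv = {("h1", "v1")}
lv = {("l1", "v1")}
assert set(triples(hl, hv, lv)) == {("h1", "l1", "v1")}
```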
@@ -1263,7 +1195,7 @@
        )

        # view into the id() --> event indexes of the graph nodes
-        self.index = multidict(*(node.index for node in self.head))
+        self.index = ChainMap(*(node.index for node in self.head))

        # the set of the Python id()'s of the events contained in
        # the internal queues that have formed coincident
@@ -1331,16 +1263,12 @@
        return any([node.push(instrument, events, t_complete) for node in self.head if instrument in node.offset_vector])

-    def pull(self, newly_reported = None, flushed = None, flushed_unused = None, flush = False, coinc_sieve = None, event_collector = None, verbose = False):
-        if verbose:
-            print("constructing coincs for target offset vectors ...", file=sys.stderr)
-
+    def pull(self, newly_reported = None, flushed = None, flushed_unused = None, flush = False, coinc_sieve = None, event_collector = None):
        # flatten ID index for faster performance in loop. NOTE:
-        # this must be done before calling .pull() on the graph
-        # because that operation will flush events from the
-        # internal index, leaving it incomplete for our needs. we
-        # do it outside the loop over nodes because we'll need it
-        # when the loop terminates
+        # this is also done to freeze the contents of the index.
+        # calling .pull() on the graph nodes flushes events from
+        # the internal queues which removes them from the index,
+        # but we need the index to turn IDs back into events.
        index = dict(self.index.items())

        # default coinc sieve
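The snapshot in `index = dict(self.index.items())` matters because the ChainMap is a live view: pulling the nodes empties the per-queue dicts underneath it. A minimal demonstration of the failure the comment is guarding against:

```python
from collections import ChainMap

queue_index = {101: "event A"}      # stands in for one queue's id() index
live_view = ChainMap(queue_index)   # stands in for self.index

snapshot = dict(live_view.items())  # the freeze performed above
queue_index.clear()                 # what flushing the queue does

assert 101 not in live_view         # the live view has lost the entry ...
assert snapshot[101] == "event A"   # ... but the snapshot still resolves IDs
```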
@@ -1363,9 +1291,6 @@
            event_time = node.event_time
            offset_vector = node.offset_vector
-            if verbose:
-                print("%d/%d: %s" % (n, len(self.head), str(node.offset_vector)), file=sys.stderr)
            if flush:
                t = None
                candidate_seg = segments.segment(node.previous_t, segments.PosInfinity)
@@ -1377,7 +1302,7 @@
            # coincs contain at least min_instruments events
            # because those that don't meet the criteria are
            # excluded during coinc construction.
-            for event_ids in itertools.chain(*node.pull(t, verbose)):
+            for event_ids in itertools.chain(*node.pull(t)):
                # use the index to convert Python IDs back
                # to event objects
                events = tuple(index[event_id] for event_id in event_ids)
......
@@ -267,10 +267,9 @@ def ligolw_thinca(
    seglists,
    ntuple_comparefunc = None,
    veto_segments = None,
-    likelihood_func = None,
-    fapfar = None,
    min_instruments = 2,
    coinc_definer_row = InspiralCoincDef,
+    incremental = True,
    verbose = False
):
    #
@@ -297,26 +296,34 @@
        sngl_inspiral_table = (event for event in sngl_inspiral_table if event.ifo not in veto_segments or event.end not in veto_segments[event.ifo])
        # don't do in-place
        seglists = seglists - veto_segments

-    for instrument, events in itertools.groupby(sorted(sngl_inspiral_table, key = lambda row: row.ifo), lambda event: event.ifo):
-        events = tuple(events)
-        time_slide_graph.push(instrument, events, max(event.end for event in events))
-
    #
-    # retrieve all coincidences, apply the final n-tuple compare func
-    # and record the survivors
+    # push into coincidence graph and collect candidates
    #

-    for node, events in time_slide_graph.pull(coinc_sieve = ntuple_comparefunc, flush = True, verbose = verbose):
-        coinc, coincmaps, coinc_inspiral = coinc_tables.coinc_rows(process_id, node.time_slide_id, events, seglists = seglists)
-        if likelihood_func is not None:
-            coinc.likelihood = likelihood_func(events, node.offset_vector)
-        if fapfar is not None:
-            # FIXME: add proper columns to
-            # store these values in
-            coinc_inspiral.combined_far = fapfar.far_from_rank(coinc.likelihood)
-            coinc_inspiral.false_alarm_rate = fapfar.fap_from_rank(coinc.likelihood)
-        # finally, append coinc to tables
-        coinc_tables.append_coinc(coinc, coincmaps, coinc_inspiral)
+    if not incremental:
+        # normal version: push everything into the graph, then
+        # pull out all coincs in one operation below using the
+        # final flush
+        for instrument, events in itertools.groupby(sorted(sngl_inspiral_table, key = lambda row: row.ifo), lambda event: event.ifo):
+            time_slide_graph.push(instrument, tuple(events), PosInfinity)
+    else:
+        # slower diagnostic version. simulate an online
+        # incremental analysis by pushing events into the graph in
+        # time order and collecting candidates as we go. we still
+        # do the final flush operation below.
+        for instrument, events in itertools.groupby(sorted(sngl_inspiral_table, key = lambda row: (row.end, row.ifo)), lambda event: event.ifo):
+            events = tuple(events)
+            if time_slide_graph.push(instrument, events, max(event.end for event in events)):
+                for node, events in time_slide_graph.pull(coinc_sieve = ntuple_comparefunc):
+                    coinc_tables.append_coinc(*coinc_tables.coinc_rows(process_id, node.time_slide_id, events, seglists = seglists))
+
+    #
+    # retrieve all remaining coincidences.
+    #
+
+    for node, events in time_slide_graph.pull(coinc_sieve = ntuple_comparefunc, flush = True):
+        coinc_tables.append_coinc(*coinc_tables.coinc_rows(process_id, node.time_slide_id, events, seglists = seglists))

    #
    # done
......
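Since the comments describe incremental = True as a slower diagnostic equivalent of the normal path, the natural check is that both modes leave identical candidates behind. A hedged test sketch; `run_thinca` is a hypothetical harness that loads a fresh copy of a document, runs ligolw_thinca(xmldoc, ..., incremental = flag), and returns the set of recorded candidate event-ID tuples:

```python
def modes_agree(run_thinca):
    # same document through both code paths
    batch = run_thinca(incremental = False)
    diagnostic = run_thinca(incremental = True)
    # discovery order differs (candidates surface as soon as they are
    # complete in incremental mode) but the final sets must not
    return batch == diagnostic

# assert modes_agree(run_thinca)
```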