Commit 11a264d8 authored by Kipp Cannon

Merge branch 'snglcoincagain' into 'master'

snglcoinc:  more work on coincidence engine

See merge request !525
parents 459a42ff a92e3d85
Pipeline #35673 passed with stages in 74 minutes and 23 seconds
......@@ -135,11 +135,16 @@ class singlesqueue(object):
# NegInfinity object from the segments library, however, is
# compatible with LIGOTimeGPS.
self.t_complete = NegInfinity
# queue of events that are too new to be certain to be able
# to construct all possible coincs between themselves and
# other queues
# queue of events. the queue's contents are time-ordered,
# but not necessarily complete beyond self.t_complete
self.queue = collections.deque()
# id() --> event mapping for the contents of queue
# id() --> event mapping for the contents of queue. sets
# will be used to track the status of events, e.g. which
# have and haven't been used to form candidates. we don't
# require the event objects to be hashable and suitable for
# inclusion in sets, instead we put their Python IDs into
# the sets, but then we need a way to turn an ID back into
# an event, and that's what this provides
self.index = {}
@property
......@@ -194,12 +199,12 @@ class singlesqueue(object):
# FIXME: this might be more costly than tracking two
# queues and moving events from one to the next in time
# order as .t_complete advances
if events and self.event_time(min(events, key = self.event_time)) < self.t_complete:
raise ValueError("t_complete violation: earliest event is %g, previous t_complete was %g" % (self.event_time(min(events, key = self.event_time)), self.t_complete))
events = list(events)
while self.queue and self.event_time(self.queue[-1]) >= self.t_complete:
events.append(self.queue.pop())
events.sort(key = self.event_time)
if events and self.event_time(events[0]) < self.t_complete:
raise ValueError("t_complete violation: earliest event is %g, previous t_complete was %g" % (self.event_time(events[0]), self.t_complete))
self.queue.extend(events)
# update the marker labelling time up to which the event
......@@ -275,9 +280,15 @@ class multidict(UserDict):
return d[x]
raise KeyError(x)
def __nonzero__(self):
return any(self.dicts)
def __contains__(self, x):
return any(x in d for d in self.dicts)
def __len__(self):
return sum(len(d) for d in self.dicts)
def __iter__(self):
return itertools.chain(*(iter(d) for d in self.dicts))
......@@ -362,7 +373,7 @@ class coincgen_doubles(object):
self.index = multidict(*(queue.index for queue in self.queues.values()))
# pre-compute the light travel time
self.light_travel_time = light_travel_time(*offset_vector)
# Python id()s of events currently in the queue that have
# Python id()s of events currently in the queues that have
# been reported in coincidences
self.used = set()
......@@ -405,8 +416,8 @@ class coincgen_doubles(object):
coincident events up to t, i.e., requiring the time of at
least one of the events in each coincident pair to precede
t. The internal queues are flushed to t. t cannot be
greater than self.t_complete, or it can be the special
value None (see below).
greater than self.t_coinc_complete, or it can be the
special value None (see below).
The order of the IDs in each tuple is alphabetical by
instrument.
......@@ -498,10 +509,10 @@ class coincgen_doubles(object):
flushed_unused.clear()
flushed_unused |= flushed - self.used
# remove the events being flushed from the used set
# remove the events being flushed from the used set. if
# we've been flushed, there better not be anything left
self.used -= flushed
# if we've been flushed, there better not be anything left
assert t is not None or not self.used
......@@ -803,6 +814,7 @@ class TimeSlideGraph(object):
coincgen_doubles_type, offset_vector, coinc_window, min_instruments, time_slide_id = time_slide_id
) for time_slide_id, offset_vector in sorted(offset_vector_dict.items())
)
self.index = multidict(*(node.index for node in self.head))
# the set of the Python id()'s of the events contained in
# the internal queues that have been reported in
# coincidences (including single-detector coincidences if
......@@ -833,6 +845,14 @@ class TimeSlideGraph(object):
return min(node.age for node in self.head)
@property
def t_coinc_complete(self):
"""
The earliest of the graph's head nodes' .t_coinc_complete.
"""
return min(node.t_coinc_complete for node in self.head)
def push(self, instrument, events, t_complete):
"""
Push new events from some instrument into the internal
......@@ -848,22 +868,35 @@ class TimeSlideGraph(object):
node.push(instrument, events, t_complete)
def pull(self, threshold_data, newly_used = None, flushed = None, flushed_unused = None, flush = False, verbose = False):
# FIXME: note that we call each time slide's head node
# with a different t, which means we cannot compare the
# reported unused flushed events from one with another.
# we'll have to find a way to do our own book-keeping here.
def pull(self, threshold_data, newly_used = None, flushed = None, flushed_unused = None, flush = False, coinc_sieve = (lambda events: False), verbose = False):
if verbose:
print("constructing coincs for target offset vectors ...", file=sys.stderr)
# NOTE: we can only rely on the "flushed" and so on event
# sets from the graph head nodes to be compatible with each
# other if all nodes are .pull()ed to the same t.
# otherwise what has been flushed from one node in the
# graph might not yet be flushed from another. we do use a
# single time, but that means the latency is always the
# worst-case scenario: whichever graph node requires
# triggers to be held the longest for the coincidence test
# determines the latency of all graph nodes. online we
# only do a zero-lag analysis, and offline we don't care
# about latency, so for now this is OK, but be aware of
# this limitation if you are tempted to mess about with
# this code's logic.
t = None if flush else self.t_coinc_complete
# flatten ID index for faster performance in loop. NOTE:
# this must be done before calling .pull() on the graph
# because that operation will flush events from the
# internal index, leaving it incomplete for our needs. we
# do it outside the loop over nodes because we'll need it
# when the loop terminates
index = dict(itertools.chain(*(node.index.iteritems() for node in self.head)))
index = dict(self.index.iteritems())
newly_used_ids = set()
sieved_ids = set()
flushed_ids = set()
for n, node in enumerate(self.head, start = 1):
if verbose:
......@@ -872,29 +905,34 @@ class TimeSlideGraph(object):
# coincs contain at least min_instruments events
# because those that don't meet the criteria are
# excluded during coinc construction.
coincs, partial_coincs, _flushed_ids = node.pull((None if flush else node.t_coinc_complete), threshold_data, verbose)
coincs, partial_coincs, _flushed_ids = node.pull(t, threshold_data, verbose)
flushed_ids |= _flushed_ids
for coinc in itertools.chain(coincs, partial_coincs):
newly_used_ids.update(coinc)
# use the index to convert Python IDs back
# to event objects
yield node, tuple(index[event_id] for event_id in coinc)
newly_used_ids -= self.used
self.used |= newly_used_ids
events = tuple(index[event_id] for event_id in coinc)
# apply the coinc sieve test. this lever
# allows calling code to reduce the event
# rate going into output documents
if coinc_sieve(events):
sieved_ids.update(coinc)
else:
newly_used_ids.update(coinc)
yield node, events
if newly_used_ids or sieved_ids:
newly_used_ids -= self.used
sieved_ids -= self.used
self.used |= newly_used_ids | sieved_ids
flushed_unused_ids = flushed_ids - self.used
self.used -= flushed_ids
# if we've been flushed then there can't be any events left
# in the queues
assert not flush or not self.used
assert not flush or not (self.used or self.index)
# use the index to populate newly_used, flushed, and
# flushed_unused with event objects
if newly_used is not None:
newly_used[:] = (index[event_id] for event_id in newly_used_ids)
# FIXME: these next two are broken unless only one time
# slide is being considered. See above. this is OK for
# the inspiral online search which only does a zero-lag
# analysis, but will have to be fixed for offline analyses
if flushed is not None:
flushed[:] = (index[event_id] for event_id in flushed_ids)
if flushed_unused is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment