= two_epsilon * n: dt = tuple(random_uniform(*window) for window in windows) if all(abs(dt[i] - dt[j]) <= maxdt for i, j, maxdt in ijseq): n += 1 d += 1 self.rate_factors[key] = float(n) / float(d) for instrument in instruments: self.rate_factors[key] *= 2. * self.tau[frozenset((anchor, instrument))] # done computing rate_factors @property def all_instrument_combos(self): """ A tuple of all possible instrument combinations (as frozensets). Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 1) >>> coincrates.all_instrument_combos (frozenset(['V1']), frozenset(['H1']), frozenset(['L1']), frozenset(['V1', 'H1']), frozenset(['V1', 'L1']), frozenset(['H1', 'L1']), frozenset(['V1', 'H1', 'L1'])) >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> coincrates.all_instrument_combos (frozenset(['V1', 'H1']), frozenset(['V1', 'L1']), frozenset(['H1', 'L1']), frozenset(['V1', 'H1', 'L1'])) """ all_instruments = tuple(self.instruments) return tuple(frozenset(instruments) for n in range(self.min_instruments, len(all_instruments) + 1) for instruments in itertools.combinations(all_instruments, n)) def coinc_rates(self, **rates): """ Given the event rates for a collection of instruments, compute the rates at which N-way coincidences occur among them where N >= min_instruments. The return value is a dictionary whose keys are frozensets of instruments and whose values are the rate of coincidences for that set. NOTE: the computed rates are the rates at which coincidences among at least those instruments occur, not the rate at which coincidences among exactly those instruments occur. e.g., considering the H1, L1, V1 network, for the pair H1, L1 the return value is the sum of the rate at which those two instruments form double coincidences and also the rate at which they participate in H1, L1, V1 triple coincidences. See also .strict_coinc_rates(). 
Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> coincrates.coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.003) {frozenset(['V1', 'H1']): 1.9372787960306537e-07, frozenset(['V1', 'H1', 'L1']): 1.0125819710267318e-11, frozenset(['H1', 'L1']): 6.00513846088957e-08, frozenset(['V1', 'L1']): 3.77380092200718e-07} >>> coincrates.coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.002) {frozenset(['V1', 'H1']): 1.291519197353769e-07, frozenset(['V1', 'H1', 'L1']): 6.750546473511545e-12, frozenset(['H1', 'L1']): 6.00513846088957e-08, frozenset(['V1', 'L1']): 2.5158672813381197e-07} >>> coincrates.coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.001) {frozenset(['V1', 'H1']): 6.457595986768845e-08, frozenset(['V1', 'H1', 'L1']): 3.3752732367557724e-12, frozenset(['H1', 'L1']): 6.00513846088957e-08, frozenset(['V1', 'L1']): 1.2579336406690598e-07} """ if set(rates) != self.instruments: raise ValueError("require event rates for %s, got rates for %s" % (", ".join(sorted(self.instruments)), ", ".join(sorted(rates)))) if any(rate < 0. for rate in rates.values()): # they don't have to be non-negative for this # method to succede, but negative values are # nonsensical and other things will certainly fail. # it's best to catch and report the problem as # early in the code path as it can be identified, # so that when the problem is encountered it's # easier to identify the cause raise ValueError("rates must be >= 0") if self.tau and max(rates.values()) * max(self.tau.values()) >= 1.: raise ValueError("events per coincidence window must be << 1: rates = %s, max window = %g" % (rates, max(self.tau.values()))) # compute \mu_{1} * \mu_{2} ... 
\mu_{N} * FACTOR where # FACTOR is the previously-computed proportionality # constant from self.rate_factors coinc_rates = dict(self.rate_factors) for instruments in coinc_rates: for instrument in instruments: coinc_rates[instruments] *= rates[instrument] return coinc_rates def strict_coinc_rates(self, **rates): """ Given the event rates for a collection of instruments, compute the rates at which strict N-way coincidences occur among them where N >= min_instruments. The return value is a dictionary whose keys are frozensets of instruments and whose values are the rate of coincidences for that set. NOTE: the computed rates are the rates at which coincidences occur among exactly those instrument combinations, excluding the rate at which each combination participates in higher-order coincs. e.g., considering the H1, L1, V1 network, for the pair H1, L1 the return value is the rate at which H1, L1 doubles occur, not including the rate at which the H1, L1 pair participates in H1, L1, V1 triples. See also .coinc_rates(). 
Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> coincrates.strict_coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.003) {frozenset(['V1', 'H1']): 1.937177537833551e-07, frozenset(['V1', 'H1', 'L1']): 1.0125819710267318e-11, frozenset(['H1', 'L1']): 6.004125878918543e-08, frozenset(['V1', 'L1']): 3.7736996638100773e-07} >>> coincrates.strict_coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.002) {frozenset(['V1', 'H1']): 1.2914516918890337e-07, frozenset(['V1', 'H1', 'L1']): 6.750546473511545e-12, frozenset(['H1', 'L1']): 6.004463406242219e-08, frozenset(['V1', 'L1']): 2.5157997758733847e-07} >>> coincrates.strict_coinc_rates(H1 = 0.001, L1 = 0.002, V1 = 0.001) {frozenset(['V1', 'H1']): 6.457258459445168e-08, frozenset(['V1', 'H1', 'L1']): 3.3752732367557724e-12, frozenset(['H1', 'L1']): 6.004800933565894e-08, frozenset(['V1', 'L1']): 1.2578998879366924e-07} """ # initialize from the plain coinc rates strict_coinc_rates = self.coinc_rates(**rates) # iterating over the instrument combos from the combo with # the most instruments to the combo with the least # instruments ... for instruments in sorted(strict_coinc_rates, reverse = True, key = lambda instruments: len(instruments)): # ... subtract from its rate the rate at which # combos containing it occur (which, because we're # moving from biggest combo to smallest combo, have # already had the rates of higher order combos # containing themselves subtracted) for key, rate in strict_coinc_rates.items(): if instruments < key: strict_coinc_rates[instruments] -= rate # make sure this didn't produce any negative rates assert all(rate >= 0. for rate in strict_coinc_rates.values()), "encountered negative rate: %s" % strict_coinc_rates return strict_coinc_rates def marginalized_strict_coinc_counts(self, seglists, **rates): """ A dictionary mapping instrument combination (as a frozenset) to the total number of coincidences involving precisely that combination of instruments expected from the background. 
""" if set(seglists) != self.instruments: raise ValueError("require segmentlists for %s, got %s" % (", ".join(sorted(self.instruments)), ", ".join(sorted(seglists)))) # time when exactly a given set of instruments are on livetime = dict((on_instruments, float(abs(seglists.intersection(on_instruments) - seglists.union(self.instruments - on_instruments)))) for on_instruments in self.all_instrument_combos) coinc_count = dict.fromkeys(livetime, 0.0) for on_instruments, T in livetime.items(): for coinc_instruments, rate in self.strict_coinc_rates(**dict((instrument, (rate if instrument in on_instruments else 0.)) for instrument, rate in rates.items())).items(): coinc_count[coinc_instruments] += T * rate return coinc_count def lnP_instruments(self, **rates): """ Given the event rates for a collection of instruments, compute the natural logarithm of the probability that a coincidence is found to involve exactly a given set of instruments. This is equivalent to the ratios of the values in the dictionary returned by .strict_coinc_rates() to their sum. Raises ZeroDivisionError if all coincidence rates are 0. See also .strict_coinc_rates(). 
Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> coincrates.lnP_instruments(H1 = 0.001, L1 = 0.002, V1 = 0.003) {frozenset(['V1', 'H1']): -1.181124067253893, frozenset(['V1', 'H1', 'L1']): -11.040192999777876, frozenset(['H1', 'L1']): -2.352494317162074, frozenset(['V1', 'L1']): -0.5143002401188091} """ strict_coinc_rates = self.strict_coinc_rates(**rates) total_rate = sum(strict_coinc_rates.values()) if total_rate == 0.: raise ZeroDivisionError("all rates are 0") P_instruments = dict((instruments, rate / total_rate) for instruments, rate in strict_coinc_rates.items()) norm = sum(sorted(P_instruments.values())) # safety check: result should be nearly exactly normalized assert abs(1.0 - norm) < 1e-14 return dict((instruments, (math.log(P / norm) if P else NegInf)) for instruments, P in P_instruments.items()) def random_instruments(self, **rates): """ Generator that, given the event rates for a collection of instruments, yields a sequence of two-element tuples each containing a randomly-selected frozen set of instruments and the natural logarithm of the ratio of the rate at which that combination of instruments forms coincidences to the rate at which it is being yielded by this generator. See also .lnP_instruments(). Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> x = iter(coincrates.random_instruments(H1 = 0.001, L1 = 0.002, V1 = 0.003)) >>> x.next() # doctest: +SKIP (frozenset(['H1', 'L1']), -3.738788683913535) """ # guaranteed non-empty lnP_instruments = self.lnP_instruments(**rates) lnN = math.log(len(lnP_instruments)) results = tuple((instruments, lnP - lnN) for instruments, lnP in lnP_instruments.items()) choice = random.choice while 1: yield choice(results) def plausible_toas(self, instruments): """ Generator that yields dictionaries of random event time-of-arrival offsets for the given instruments such that the time-of-arrivals are mutually coincident given the maximum allowed inter-instrument \\Delta t's. 
The values returned are offsets, and would need to be added to some common time to yield absolute arrival times. Example: >>> coincrates = CoincRates(("H1", "L1", "V1"), 0.005, 2) >>> x = iter(coincrates.plausible_toas(("H1", "L1"))) >>> x.next() # doctest: +SKIP {'H1': 0.0, 'L1': -0.010229226372297711} """ instruments = tuple(instruments) if set(instruments) > self.instruments: raise ValueError("not configured for %s" % ", ".join(sorted(set(instruments) - self.instruments))) if len(instruments) < self.min_instruments: raise ValueError("require at least %d instruments, got %d" % (self.min_instruments, len(instruments))) anchor, instruments = instruments[0], instruments[1:] anchor_offset = ((anchor, 0.0),) # don't build inside loop uniform = random.uniform windows = tuple((instrument, -self.tau[frozenset((anchor, instrument))], +self.tau[frozenset((anchor, instrument))]) for instrument in instruments) ijseq = tuple((i, j, self.tau[frozenset((instruments[i], instruments[j]))]) for (i, j) in itertools.combinations(range(len(instruments)), 2)) while 1: dt = tuple((instrument, uniform(lo, hi)) for instrument, lo, hi in windows) if all(abs(dt[i][1] - dt[j][1]) <= maxdt for i, j, maxdt in ijseq): yield dict(anchor_offset + dt) # # ============================================================================= # # Triangulation # # ============================================================================= # class TOATriangulator(object): """ Time-of-arrival triangulator. See section 6.6.4 of "Gravitational-Wave Physics and Astronomy" by Creighton and Anderson. An instance of this class is a function-like object that accepts a tuple of event arival times and returns a tuple providing information derived by solving for the maximum-likelihood source location assuming Gaussian-distributed timing errors. """ def __init__(self, rs, sigmas, v = lal.C_SI): """ Create and initialize a triangulator object. 
rs is a sequence of location 3-vectors, sigmas is a sequence of the timing uncertainties for those locations. Both sequences must be in the same order --- the first sigma in the sequence is interpreted as belonging to the first location 3-vector --- and, of course, they must be the same length. v is the speed at which the wave carrying the signals travels. The rs 3-vectors carry units of distance, the sigmas carry units of time, v carries units of distance/time. What units are used for the three is arbitrary, but they must be mutually consistent. The default value for v in c, the speed of light, in metres/second, therefore the location 3-vectors should be given in metres and the sigmas should be given in seconds unless a value for v is provided with different units. Example: >>> from numpy import array >>> triangulator = TOATriangulator([ ... array([-2161414.92636, -3834695.17889, 4600350.22664]), ... array([ -74276.0447238, -5496283.71971 , 3224257.01744 ]), ... array([ 4546374.099 , 842989.697626, 4378576.96241 ]) ... ], [ ... 0.005, ... 0.005, ... 0.005 ... ]) ... >>> This creates a TOATriangulator instance configured for the LIGO Hanford, LIGO Livingston and Virgo antennas with 5 ms time-of-arrival uncertainties at each location. Note: rs and sigmas may be iterated over multiple times. 
""" assert len(rs) == len(sigmas) assert len(rs) >= 2 self.rs = numpy.vstack(rs) self.sigmas = numpy.array(sigmas) self.v = v # sigma^-2 -weighted mean of locations rbar = sum(self.rs / self.sigmas[:,numpy.newaxis]**2) / sum(1 / self.sigmas**2) # the ith row is r - \bar{r} for the ith location self.R = self.rs - rbar # ith row is \sigma_i^-2 (r_i - \bar{r}) / c M = self.R / (self.v * self.sigmas[:,numpy.newaxis]) self.U, self.S, self.VT = numpy.linalg.svd(M) if len(rs) >= 3: # if the smallest singular value is less than 10^-8 * the # largest singular value, assume the network is degenerate self.singular = abs(self.S.min() / self.S.max()) < 1e-8 else: # len(rs) == 2 self.max_dt = numpy.dot(self.rs[1] - self.rs[0], self.rs[1] - self.rs[0])**.5 / self.v def __call__(self, ts): """ Triangulate the direction to the source of a signal based on a tuple of times when the signal was observed. ts is a sequence of signal arrival times. One arrival time must be provided for each of the observation locations provided when the instance was created, and the units of the arrival times must be the same as the units used for the sequence of sigmas. The return value is a tuple of information derived by solving for the maximum-likelihood source location assuming Gaussian-distributed timing errors. The return value is (n, toa, chi2 / DOF, dt) where n is a unit 3-vector pointing from the co-ordinate origin towards the source of the signal, toa is the time-of-arrival of the signal at the co-ordinate origin, chi2 / DOF is the \\chi^{2} per degree-of-freedom from to the arrival time residuals, and dt is the root-sum-square of the arrival time residuals. Example: >>> from numpy import array >>> from numpy import testing >>> triangulator = TOATriangulator([ ... array([-2161414.92636, -3834695.17889, 4600350.22664]), ... array([ -74276.0447238, -5496283.71971 , 3224257.01744 ]), ... array([ 4546374.099 , 842989.697626, 4378576.96241 ]) ... ], [ ... 0.005, ... 0.005, ... 0.005 ... ]) ... 
>>> n, toa, chi2_per_dof, dt = triangulator([ ... 794546669.429688, ... 794546669.41333, ... 794546669.431885 ... ]) ... >>> n array([[-0.45605637, 0.75800934, 0.46629865], [-0.45605637, 0.75800934, 0.46629865]]) >>> testing.assert_approx_equal(toa, 794546669.4269662) >>> testing.assert_approx_equal(chi2_per_dof, 0.47941941158371465) >>> testing.assert_approx_equal(dt, 0.005996370224459011) NOTE: if len(rs) >= 4, n is a 1x3 array. if len(rs) == 3, n is a 2x3 array. if len(rs) == 2, n is None. NOTE: n is not the source direction but the propagation direction of GW. Therefore, if you want source direction, you have to multiply -1. NOTE: n is represented by earth fixed coordinate, not celestial coordinate. Up to your purpose, you should transform \\phi -> RA. To do it, you can use dir2coord. """ assert len(ts) == len(self.sigmas) # change of t co-ordinate to avoid LIGOTimeGPS overflow t0 = min(ts) ts = numpy.array([float(t - t0) for t in ts]) # sigma^-2 -weighted mean of arrival times tbar = sum(ts / self.sigmas**2) / sum(1 / self.sigmas**2) # the i-th element is ts - tbar for the i-th location tau = (ts - tbar) / self.sigmas tau_prime = numpy.dot(self.U.T, tau)[:3] if len(self.rs) >= 3: if self.singular: # len(rs) == 3 np = numpy.array([tau_prime / self.S, tau_prime / self.S]) try: np[0][2] = math.sqrt(1.0 - np[0][0]**2 - np[0][1]**2) np[1][2] = -math.sqrt(1.0 - np[1][0]**2 - np[1][1]**2) except ValueError: # two point is mergered, n_0 = n_1 np.T[2] = 0.0 np /= math.sqrt(numpy.dot(np[0], np[0])) # compute n from n' n = numpy.array([numpy.zeros(3), numpy.zeros(3)]) n = numpy.dot(self.VT.T, np.T).T # safety check the nomalization of the result assert abs(numpy.dot(n[0], n[0]) - 1.0) < 1e-8 assert abs(numpy.dot(n[1], n[1]) - 1.0) < 1e-8 # arrival time at origin toa = sum((ts - numpy.dot(self.rs, n[0]) / self.v) / self.sigmas**2) / sum(1 / self.sigmas**2) # chi^{2} chi2 = sum((numpy.dot(self.R, n[0]) / (self.v * self.sigmas) - tau)**2) # root-sum-square timing residual 
dt = ts - toa - numpy.dot(self.rs, n[0]) / self.v dt = math.sqrt(numpy.dot(dt, dt)) else: # len(rs) >= 4 def n_prime(l, Stauprime = self.S * tau_prime, S2 = self.S * self.S): return Stauprime / (S2 + l) def secular_equation(l): np = n_prime(l) return numpy.dot(np, np) - 1 # values of l that make the denominator of # n'(l) 0 lsing = -self.S * self.S # least negative of them is used as lower # bound for bisection search root finder # (elements of S are ordered from greatest # to least, so the last element of lsing is # the least negative) l_lo = lsing[-1] # find a suitable upper bound for the root # finder FIXME: in Jolien's original code # l_hi was hard-coded to 1 but we can't # figure out why the root must be <= 1, so # I put this loop to be safe but at some # point it would be good to figure out if # 1.0 can be used because it would allow # this loop to be skipped l_hi = 1.0 while secular_equation(l_lo) / secular_equation(l_hi) > 0: l_lo, l_hi = l_hi, l_hi * 2 # solve for l l = scipy.optimize.brentq(secular_equation, l_lo, l_hi) # compute n' np = n_prime(l) # compute n from n' n = numpy.dot(self.VT.T, np) # safety check the nomalization of the result assert abs(numpy.dot(n, n) - 1.0) < 1e-8 # arrival time at origin toa = sum((ts - numpy.dot(self.rs, n) / self.v) / self.sigmas**2) / sum(1 / self.sigmas**2) # chi^{2} chi2 = sum((numpy.dot(self.R, n) / (self.v * self.sigmas) - tau)**2) # root-sum-square timing residual dt = ts - toa - numpy.dot(self.rs, n) / self.v dt = math.sqrt(numpy.dot(dt, dt)) else: # len(rs) == 2 # s_1 == s_2 == 0 # compute an n' xp = tau_prime[0] / self.S[0] try: np = numpy.array([xp, 0, math.sqrt(1-xp**2)]) except ValueError: # two point is mergered, n_0 = n_1 np = numpy.array([xp, 0, 0]) # compute n from n' n = numpy.dot(self.VT.T, np) # safety check the nomalization of the result assert abs(numpy.dot(n, n) - 1.0) < 1e-8 # arrival time at origin toa = sum((ts - numpy.dot(self.rs, n) / self.v) / self.sigmas**2) / sum(1 / self.sigmas**2) # 
chi^{2} chi2 = sum((numpy.dot(self.R, n) / (self.v * self.sigmas) - tau)**2) # root-sum-square timing residual dt = ts - toa - numpy.dot(self.rs, n) / self.v dt = math.sqrt(numpy.dot(dt, dt)) # set n None n = None # done return n, t0 + toa, chi2 / len(self.sigmas), dt @staticmethod def dir2coord(n, gps): """ This transforms from propagation direction vector to right ascension and declination source co-ordinates. The input is the propagation vector, n, in Earth fixed co-ordinates (x axis through Greenwich merdian and equator, z axis through North Pole), and the time. n does not need to be a unit vector. The return value is the (ra, dec) pair, in radians. NOTE: right ascension is not necessarily in [0, 2\\pi). """ # safety checks if len(n) != 3: raise ValueError("n must be a 3-vector") # normalize n n /= math.sqrt(numpy.dot(n, n)) # transform from propagation to source direction n = -n # compute ra, dec RA = numpy.arctan2(n[1], n[0]) + lal.GreenwichMeanSiderealTime(gps) DEC = numpy.arcsin(n[2]) # done return RA, DEC # # ============================================================================= # # Coincidence Parameter Distributions # # ============================================================================= # # # Base class for parameter distribution densities for use in log likelihood # ratio ranking statistics # class LnLRDensity(object): """ Base class for parameter distribution densities for use in log likelihood ratio ranking statistics. Generally several instances of (subclasses of) this will be grouped together to construct a log likelihood ratio class for use as a ranking statistic in a trigger-based search. For example, as a minimum one would expect one instance for the numerator and another for the denominator, but additional ones might be included in a practical ranking statistic implementation, for example a third might be used for storing a histogram of the candidates observed in a search. 
Typically, the ranking statistic implementation will provide a function to transform a candidate to the arguments to use with the .__call__() implementation, and so in this way a LnLRDensity object is generally only meaningful in the context of the ranking statistic class for which it has been constructed. """ def __call__(self, *args, **kwargs): """ Evaluate. Return the natural logarithm of the density evaluated at the given parameters. """ raise NotImplementedError def __iadd__(self, other): """ Marginalize the two densities. """ raise NotImplementedError def increment(self, *args, **kwargs): """ Increment the counts defining this density at the given parameters. """ raise NotImplementedError def copy(self): """ Return a duplicate copy of this object. """ raise NotImplementedError def finish(self): """ Ensure all internal densities are normalized, and initialize interpolator objects as needed for smooth evaluation. Must be invoked before .__call__() will yield sensible results. NOTE: for some implementations this operation will irreversibly alter the contents of the counts array, for example often this operation will involve the convolution of the counts with a density estimation kernel. If it is necessary to preserve a pristine copy of the counts data, use the .copy() method to obtain a copy of the data, first, and then .finish() the copy. """ raise NotImplementedError def samples(self): """ Generator returning a sequence of parameter values drawn from the distribution density. Some subclasses might choose not to implement this, and those that do might choose to use an MCMC-style sample generator and so the samples should not be assumed to be statistically independent. """ raise NotImplementedError def to_xml(self, name): """ Serialize to an XML fragment and return the root element of the resulting XML tree. Subclasses must chain to this method, then customize the return value as needed. 
""" return ligolw.LIGO_LW({u"Name": u"%s:lnlrdensity" % name}) @classmethod def get_xml_root(cls, xml, name): """ Sub-classes can use this in their overrides of the .from_xml() method to find the root element of the XML serialization. """ name = u"%s:lnlrdensity" % name xml = [elem for elem in xml.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == name] if len(xml) != 1: raise ValueError("XML tree must contain exactly one %s element named %s" % (ligolw.LIGO_LW.tagName, name)) return xml[0] @classmethod def from_xml(cls, xml, name): """ In the XML document tree rooted at xml, search for the serialized LnLRDensity object named name, and deserialize it. The return value is the deserialized LnLRDensity object. """ # Generally implementations should start with something # like this: #xml = cls.get_xml_root(xml, name) #self = cls() #return self raise NotImplementedError # # Likelihood Ratio # # starting from Bayes' theorem: # # P(coinc is a g.w. | its parameters) # P(those parameters | coinc is g.w.) * P(coinc is g.w.) # = ------------------------------------------------------ # P(parameters) # # P(those parameters | coinc is g.w.) * P(coinc is g.w.) # = ------------------------------------------------------------------------- # P(noise params) * P(coinc is not g.w.) + P(inj params) * P(coinc is g.w.) # # P(inj params) * P(coinc is g.w.) # = --------------------------------------------------------------------------- # P(noise params) * [1 - P(coinc is g.w.)] + P(inj params) * P(coinc is g.w.) # # P(inj params) * P(coinc is g.w.) # = ---------------------------------------------------------------------- # P(noise params) + [P(inj params) - P(noise params)] * P(coinc is g.w.) # # this last form above is used below to compute the LHS # # [P(inj params) / P(noise params)] * P(coinc is g.w.) # = -------------------------------------------------------------- # 1 + [[P(inj params) / P(noise params)] - 1] * P(coinc is g.w.) 
#
#                Lambda * P(coinc is g.w.)                      P(inj params)
#   = -----------------------------------  where  Lambda = ---------------
#     1 + (Lambda - 1) * P(coinc is g.w.)                  P(noise params)
#
# Differentiating w.r.t. Lambda shows the derivative is always positive, so
# thresholding on Lambda is equivalent to thresholding on P(coinc is a g.w. |
# its parameters).  The limits:  Lambda=0 --> P(coinc is a g.w. | its
# parameters)=0, Lambda=+inf --> P(coinc is a g.w. | its parameters)=1.  We
# interpret Lambda=0/0 to mean P(coinc is a g.w. | its parameters)=0 since
# although it can't be noise it's definitely not a g.w..  We do not protect
# against NaNs in the Lambda = +inf/+inf case.


class LnLikelihoodRatioMixin(object):
    """
    Mixin class to provide the standard log likelihood ratio methods.
    Intended to be added to the parent classes of a ranking statistic
    class defining .numerator and .denominator attributes that are both
    instances of (subclasses of) the LnLRDensity class.  The ranking
    statistic class will then acquire a .__call__() method allowing it
    to be used as a log likelihood ratio function, and also a
    .ln_lr_samples() method providing importance-weighted sampling of
    the log likelihood ratio distribution in the signal and noise
    (numerator and denominator) populations.
    """
    def __call__(self, *args, **kwargs):
        """
        Return the natural logarithm of the likelihood ratio for
        the given parameters.  The likelihood ratio is P(params |
        signal) / P(params | noise).  The probability that the
        events are the result of a gravitational wave is a
        monotonically increasing function of the likelihood ratio,
        so ranking events from "most like a gravitational wave" to
        "least like a gravitational wave" can be performed by
        calculating the (logarithm of the) likelihood ratios.

        The arguments are passed verbatim to the .__call__()
        methods of the .numerator and .denominator attributes of
        self.

        NOTE:  it is possible for sub-classes to override this
        method, and chain to it if they wish.  There is no
        requirement that this method evaluate the ratio .numerator
        / .denominator, for example it would not invalidate the
        output of .ln_lr_samples() if the computation of the return
        value includes some kind of cuts, or other non-trivial
        logic.
        """
        lnP_signal = self.numerator(*args, **kwargs)
        lnP_noise = self.denominator(*args, **kwargs)
        if math.isinf(lnP_noise) and math.isinf(lnP_signal):
            # need to handle a special case
            if lnP_noise < 0. and lnP_signal < 0.:
                # both probabilities are 0.  "correct"
                # answer is -inf, because if a candidate is
                # in a region of parameter space where the
                # probability of a signal occurring is 0
                # then it is not a signal.  it is also,
                # apparently, not noise, which is curious
                # but irrelevant because we are seeking a
                # result that is a monotonically increasing
                # function of the probability that a
                # candidate is a signal, which is
                # impossible in this part of the parameter
                # space.
                return NegInf
            # all remaining cases are handled correctly by the
            # expression that follows, but one still deserves a
            # warning
            if lnP_noise > 0. and lnP_signal > 0.:
                # both probabilities are +inf.  no correct
                # answer.  NaN will be returned in this
                # case, and it helps to have a record in
                # the log of why that happened.
                warnings.warn("inf/inf encountered")
        return lnP_signal - lnP_noise

    def ln_lr_samples(self, random_params_seq, signal_noise_pdfs = None):
        """
        Generator that yields an unending sequence of 3-element
        tuples.  Each tuple's elements are a value of the natural
        logarithm of the likelihood ratio, the natural logarithm of
        the relative frequency of occurrence of that likelihood
        ratio in the signal population corrected for the relative
        frequency at which the sampler is yielding that value, and
        the natural logarithm of the relative frequency of
        occurrence of that likelihood ratio in the noise population
        similarly corrected for the relative frequency at which the
        sampler is yielding that value.

        The intention is for the first element of each tuple to be
        added to histograms using the two relative frequencies as
        weights, i.e., the two relative frequencies give the number
        of times one should consider this one draw of log
        likelihood ratio to have occurred in the two populations.

        random_params_seq is a sequence (generator is OK) yielding
        3-element tuples whose first two elements provide the *args
        and **kwargs values passed to the numerator and denominator
        density functions, and whose third element is the natural
        logarithm of the probability density from which the
        parameters have been drawn evaluated at the parameters.

        On each iteration, the *args and **kwargs values yielded by
        random_params_seq are passed to our own .__call__() method
        to evaluate the log likelihood ratio at that choice of
        parameter values.  If signal_noise_pdfs is None the
        parameters are also passed to the .__call__() methods of
        our own .numerator and .denominator attributes to obtain
        the signal and noise population densities at those
        parameters.  If signal_noise_pdfs is not None then,
        instead, the parameters are passed to the .__call__()
        methods of its .numerator and .denominator attributes to
        obtain those densities.

        If histogramming the results as described above, the effect
        is to draw parameter values from the signal and noise
        populations defined by signal_noise_pdfs' PDFs but with log
        likelihood ratios evaluated using our own PDFs.
        """
        # choose the PDFs used to weight the samples:  our own by
        # default, or the caller-supplied pair
        if signal_noise_pdfs is None:
            lnP_signal_func = self.numerator
            lnP_noise_func = self.denominator
        else:
            lnP_signal_func = signal_noise_pdfs.numerator
            lnP_noise_func = signal_noise_pdfs.denominator
        for args, kwargs, lnP_params in random_params_seq:
            lnP_signal = lnP_signal_func(*args, **kwargs)
            lnP_noise = lnP_noise_func(*args, **kwargs)
            # importance-weight by dividing out the sampling
            # density (subtracting its logarithm)
            yield self(*args, **kwargs), lnP_signal - lnP_params, lnP_noise - lnP_params