Commit d2474455 authored by Leo P. Singer

Add posterior samples to FITS sky map postprocessing

This moves development from the original repository:
https://github.com/farr/skyarea

Some minor changes occurred in the course of refactoring:
 * The contents of the module `sky_area.sky_area_clustering` moved
   to `lalinference.kde`.
 * The Python script `run_sky_area` was renamed to
   `lalinference_samples_to_fits` and the command line interface
   was ported from `optparse` to `argparse`.
parent 8129345a
Pipeline #11976 passed with stages in 25 minutes and 37 seconds
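As a quick orientation for the relocated module, here is a minimal sketch of the new import path and a typical construction call, assembled from the code added in this commit (the input file name is hypothetical):

    import numpy as np
    from lalinference import io
    from lalinference.kde import Clustered2DSkyKDE

    # Read posterior samples and build a clustered sky KDE from (ra, dec),
    # mirroring what lalinference_samples_to_fits does below.
    data = io.read_samples('posterior_samples.hdf5')  # hypothetical file name
    pts = np.column_stack((data['ra'], data['dec']))
    skypost = Clustered2DSkyKDE(pts, trials=5, multiprocess=False)
    skymap = skypost.as_healpix()  # multi-order Table with UNIQ, PROBDENSITY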
@@ -34,6 +34,7 @@ pybin_scripts = \
imrtgr_imr_consistency_test \
lalinference_average_skymaps \
lalinference_pipe \
lalinference_samples_to_fits \
cbcBayesBurstPostProc \
cbcBayesCombinePosteriors \
cbcBayesCompPos \
......
#
# kde.py: KDE utilities for density estimation in unusual topologies.
#
# Copyright 2012 Will M. Farr <will.farr@ligo.org>
# Modified 2017 Leo P. Singer <leo.singer@ligo.org> to handle 1D KDEs
# gracefully.
# Copyright (C) 2012-2017 Will M. Farr <will.farr@ligo.org>
# Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
@@ -21,9 +18,28 @@
#
from __future__ import division
from six.moves import copyreg
from six import with_metaclass
from functools import partial
from astropy.coordinates import SkyCoord
from astropy.table import Table
from astropy.utils.console import ProgressBar
from astropy.utils.misc import NumpyRNGContext
import healpy as hp
import logging
import numpy as np
from scipy.stats import gaussian_kde
from . import distance
from .bayestar import moc
from .eigenframe import EigenFrame
log = logging.getLogger()
__all__ = ('BoundedKDE', 'Clustered2DSkyKDE', 'Clustered3DSkyKDE',
'Clustered2Plus1DSkyKDE')
class BoundedKDE(gaussian_kde):
"""Density estimation using a KDE on bounded domains.
@@ -111,3 +127,408 @@ class BoundedKDE(gaussian_kde):
construct the KDE that have a lower KDE density than ``pt``."""
return np.count_nonzero(self(self.dataset) < self(pt)) / self.n
def km_assign(mus, cov, pts):
"""Implements the assignment step in the k-means algorithm. Given a
set of centers, ``mus``, a covariance matrix used to produce a
metric on the space, ``cov``, and a set of points, ``pts`` (shape
``(npts, ndim)``), assigns each point to its nearest center,
returning an array of indices of shape ``(npts,)`` giving the
assignments.
"""
k = mus.shape[0]
n = pts.shape[0]
dists = np.zeros((k, n))
for i, mu in enumerate(mus):
dx = pts - mu
try:
dists[i, :] = np.sum(dx * np.linalg.solve(cov, dx.T).T, axis=1)
except np.linalg.LinAlgError:
dists[i, :] = np.nan
return np.nanargmin(dists, axis=0)
def km_centroids(pts, assign, k):
"""Implements the centroid-update step of the k-means algorithm.
Given a set of points, ``pts``, of shape ``(npts, ndim)``, and an
assignment of each point to a region, ``assign``, and the number
of means, ``k``, returns an array of shape ``(k, ndim)`` giving
the centroid of each region.
"""
mus = np.zeros((k, pts.shape[1]))
for i in range(k):
sel = assign == i
if np.sum(sel) > 0:
mus[i, :] = np.mean(pts[sel, :], axis=0)
else:
mus[i, :] = pts[np.random.randint(pts.shape[0]), :]
return mus
def k_means(pts, k):
"""Implements k-means clustering on the set of points.
:param pts: Array of shape ``(npts, ndim)`` giving the points on
which k-means is to operate.
:param k: Positive integer giving the number of regions.
:return: ``(centroids, assign)``, where ``centroids`` is an ``(k,
ndim)`` array giving the centroid of each region, and ``assign``
is a ``(npts,)`` array of integers between 0 (inclusive) and k
(exclusive) indicating the assignment of each point to a region.
"""
assert pts.shape[0] > k, 'must have more points than means'
cov = np.cov(pts, rowvar=0)
mus = np.random.permutation(pts)[:k, :]
assign = km_assign(mus, cov, pts)
while True:
old_mus = mus
old_assign = assign
mus = km_centroids(pts, assign, k)
assign = km_assign(mus, cov, pts)
if np.all(assign == old_assign):
break
return mus, assign
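# For example, given pts of shape (500, 2), calling
#     centroids, assign = k_means(pts, 3)
# returns a (3, 2) array of cluster centroids and a (500,) array of integer
# labels in [0, 3), computed with the Mahalanobis distance defined by
# km_assign.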
def _cluster(cls, pts, trials, i, seed):
k = i // trials
if k == 0:
raise ValueError('Expected at least one cluster')
try:
if k == 1:
assign = np.zeros(len(pts), dtype=np.intp)
else:
with NumpyRNGContext(i + seed):
_, assign = k_means(pts, k)
obj = cls(pts, assign=assign)
except np.linalg.LinAlgError:
return -np.inf,
else:
return obj.bic, k, obj.kdes
class _mapfunc(object):
def __init__(self, func):
self._func = func
def __call__(self, i_arg):
i, arg = i_arg
return i, self._func(arg)
class ClusteredKDE(object):
def __init__(self, pts, max_k=40, trials=5, assign=None,
multiprocess=False):
self.multiprocess = multiprocess
if assign is None:
log.info('clustering ...')
# Make sure that each thread gets a different random number state.
# We start by drawing a random integer s in the main thread, and
# then the i'th subprocess will seed itself with the integer i + s.
#
# The seed must be an unsigned 32-bit integer. The largest job index i
# is (max_k + 1) * trials - 1, so s must be drawn from the interval
# [0, 2**32 - (max_k + 1) * trials) so that every i + s stays in range.
seed = np.random.randint(0, 2**32 - (max_k + 1) * trials)
func = partial(_cluster, type(self), pts, trials, seed=seed)
self.bic, self.k, self.kdes = max(
self._map(func, range(trials, (max_k + 1) * trials)),
key=lambda items: items[:2])
else:
# Build KDEs for each cluster, skipping degenerate clusters
self.kdes = []
npts, ndim = pts.shape
self.k = assign.max() + 1
for i in range(self.k):
sel = (assign == i)
cluster_pts = pts[sel, :]
# Equivalent to but faster than len(set(pts))
# FIXME: replace with the following in Numpy >= 1.13.0:
# nuniq = len(np.unique(cluster_pts, axis=0))
nuniq = len(np.unique(
np.ascontiguousarray(cluster_pts).view(
'V{}'.format(ndim * pts.dtype.itemsize))))
# Skip if there are fewer unique points than dimensions
if nuniq <= ndim:
continue
try:
kde = gaussian_kde(cluster_pts.T)
except (np.linalg.LinAlgError, ValueError):
# If there are fewer unique points than degrees of freedom,
# then the KDE will fail because the covariance matrix is
# singular. In that case, don't bother adding that cluster.
pass
else:
self.kdes.append(kde)
# Calculate BIC
# The number of parameters is:
#
# * ndim for each centroid location
#
# * (ndim+1)*ndim/2 Kernel covariances for each cluster
#
# * one weighting factor for the cluster (minus one for the
# overall constraint that the weights must sum to one)
nparams = self.k*ndim + self.k*((ndim+1)*(ndim)/2) + self.k - 1
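# Worked example: for k = 2 clusters in ndim = 3 dimensions, this gives
# 2*3 centroid coordinates + 2*6 covariance entries + (2 - 1) weights = 19.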
with np.errstate(divide='ignore'):
self.bic = (
np.sum(np.log(self.eval_kdes(pts))) -
nparams/2.0*np.log(npts))
def eval_kdes(self, pts):
pts = pts.T
return sum(w * kde(pts) for w, kde in zip(self.weights, self.kdes))
def __call__(self, pts):
return self.eval_kdes(pts)
@property
def weights(self):
"""Get the cluster weights: the fraction of the points within each
cluster."""
w = np.asarray([kde.n for kde in self.kdes])
return w / np.sum(w)
def _map(self, func, items):
# FIXME: ProgressBar.map(..., multiprocess=True) uses imap_unordered,
# but we want the result to come back in order. This should be fixed,
# or at least correctly documented, in Astropy.
if self.multiprocess:
_, result = zip(*sorted(ProgressBar.map(_mapfunc(func),
list(enumerate(items)),
multiprocess=True)))
return list(result)
else:
return ProgressBar.map(func, items, multiprocess=False)
class SkyKDE(ClusteredKDE):
@classmethod
def transform(cls, pts):
"""Override in sub-classes to transform points."""
raise NotImplementedError
def __init__(self, pts, max_k=40, trials=5, assign=None,
multiprocess=False):
if assign is None:
pts = self.transform(pts)
super(SkyKDE, self).__init__(
pts, max_k=max_k, trials=trials, assign=assign,
multiprocess=multiprocess)
def __call__(self, pts):
return super(SkyKDE, self).__call__(self.transform(pts))
def _bayestar_adaptive_grid(self, top_nside=16, rounds=8):
"""Implement of the BAYESTAR adaptive mesh refinement scheme as
described in Section VI of Singer & Price 2016, PRD, 93, 024013
(http://dx.doi.org/10.1103/PhysRevD.93.024013).
FIXME: Consider refactoring BAYESTAR itself to perform the adaptation
step in Python.
"""
top_npix = hp.nside2npix(top_nside)
nrefine = top_npix // 4
cells = zip([0] * nrefine, [top_nside // 2] * nrefine, range(nrefine))
for iround in range(rounds - 1):
print('adaptive refinement round {} of {} ...'.format(
iround + 1, rounds - 1))
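# The key p / nside**2 is the density at the pixel center times the pixel
# area (which scales as 1/nside**2), i.e. an estimate of the probability
# contained in each cell; the nrefine highest-probability cells are then
# split into their four NESTED children.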
cells = sorted(cells, key=lambda p_n_i: p_n_i[0] / p_n_i[1]**2)
new_nside, new_ipix = np.transpose([
(nside * 2, ipix * 4 + i)
for _, nside, ipix in cells[-nrefine:] for i in range(4)])
theta, phi = hp.pix2ang(new_nside, new_ipix, nest=True)
ra = phi
dec = 0.5 * np.pi - theta
p = self(np.column_stack((ra, dec)))
cells[-nrefine:] = zip(p, new_nside, new_ipix)
return cells
def as_healpix(self):
"""Returns a HEALPix multi-order map of the posterior density."""
post, nside, ipix = zip(*self._bayestar_adaptive_grid())
post = np.asarray(list(post))
nside = np.asarray(list(nside))
ipix = np.asarray(list(ipix))
# Make sure that sky map is normalized (it should be already)
post /= np.sum(post * hp.nside2pixarea(nside))
# Convert from NESTED to UNIQ pixel indices
order = np.log2(nside).astype(int)
uniq = moc.nest2uniq(order.astype(np.int8), ipix.astype(np.uint64))
# Done!
return Table([uniq, post], names=['UNIQ', 'PROBDENSITY'])
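# This multi-order map can be flattened to a fixed-resolution HEALPix map
# with lalinference.bayestar.sky_map.rasterize, as the
# lalinference_samples_to_fits script below does before writing FITS output.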
# We have to put in some hooks to make instances of Clustered2DSkyKDE picklable
# because we dynamically create subclasses with different values of the 'frame'
# class variable. This gets even trickier because we need both the class and
# instance objects to be picklable.
class _Clustered2DSkyKDEMeta(type):
"""Metaclass to make dynamically created subclasses of Clustered2DSkyKDE
picklable."""
def _Clustered2DSkyKDEMeta_pickle(cls):
"""Pickle dynamically created subclasses of Clustered2DSkyKDE."""
return type, (cls.__name__, cls.__bases__, {'frame': cls.frame})
# Register function to pickle subclasses of Clustered2DSkyKDE.
copyreg.pickle(_Clustered2DSkyKDEMeta, _Clustered2DSkyKDEMeta_pickle)
def _Clustered2DSkyKDE_factory(name, frame):
"""Unpickle instances of dynamically created subclasses of
Clustered2DSkyKDE.
FIXME: In Python 3, we could make this a class method of Clustered2DSkyKDE.
Unfortunately, Python 2 is picky about pickling bound class methods."""
new_cls = type(name, (Clustered2DSkyKDE,), {'frame': frame})
return super(Clustered2DSkyKDE, Clustered2DSkyKDE).__new__(new_cls)
class Clustered2DSkyKDE(with_metaclass(_Clustered2DSkyKDEMeta, SkyKDE)):
r"""Represents a kernel-density estimate of a sky-position PDF that has
been decomposed into clusters, using a different kernel for each
cluster.
The estimated PDF is
.. math::
p\left( \vec{\theta} \right) = \sum_{i = 0}^{k-1} \frac{N_i}{N}
\sum_{\vec{x} \in C_i} N\left[\vec{x}, \Sigma_i\right]\left( \vec{\theta}
\right)
where :math:`C_i` is the set of points belonging to cluster
:math:`i`, :math:`N_i` is the number of points in this cluster, and
:math:`\Sigma_i` is the optimally-converging KDE covariance
associated with cluster :math:`i`.
The number of clusters, :math:`k`, is chosen to maximize the `BIC
<http://en.wikipedia.org/wiki/Bayesian_information_criterion>`_
for the given set of points being drawn from the clustered KDE.
The points are assigned to clusters using the k-means algorithm,
with a decorrelated metric. The overall clustering behavior is
similar to the well-known `X-Means
<http://www.cs.cmu.edu/~dpelleg/download/xmeans.pdf>`_ algorithm.
"""
frame = None
@classmethod
def transform(cls, pts):
pts = SkyCoord(*pts.T, unit='rad').transform_to(cls.frame).spherical
return np.column_stack((pts.lon.rad, np.sin(pts.lat.rad)))
def __new__(cls, pts, *args, **kwargs):
frame = EigenFrame.for_coords(SkyCoord(*pts.T, unit='rad'))
name = '{:s}_{:x}'.format(cls.__name__, id(frame))
new_cls = type(name, (cls,), {'frame': frame})
return super(Clustered2DSkyKDE, cls).__new__(new_cls)
def __reduce__(self):
"""Pickle instances of dynamically created subclasses of
Clustered2DSkyKDE."""
factory_args = self.__class__.__name__, self.frame
return _Clustered2DSkyKDE_factory, factory_args, self.__dict__
def eval_kdes(self, pts):
base = super(Clustered2DSkyKDE, self).eval_kdes
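# Evaluate the KDE at the requested azimuths and at copies shifted by
# +/- 2*pi so that the density estimate wraps correctly across the
# phi = 0 boundary.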
dphis = (0.0, 2.0*np.pi, -2.0*np.pi)
phi, z = pts.T
return sum(base(np.column_stack((phi+dphi, z))) for dphi in dphis)
class Clustered3DSkyKDE(SkyKDE):
"""Like :class:`Clustered2DSkyKDE`, but clusters in 3D
space. Can compute volumetric posterior density (per cubic Mpc),
and also produce HEALPix maps of the mean and standard deviation
of the log-distance."""
@classmethod
def transform(cls, pts):
return SkyCoord(*pts.T, unit='rad').cartesian.xyz.value.T
def __call__(self, pts, distances=False):
"""Given an array of positions in RA, DEC, compute the marginal sky
posterior and optionally the conditional distance parameters."""
func = partial(distance.cartesian_kde_to_moments,
datasets=[_.dataset for _ in self.kdes],
inverse_covariances=[_.inv_cov for _ in self.kdes],
weights=self.weights)
probdensity, mean, std = zip(*self._map(func, self.transform(pts)))
if distances:
mu, sigma, norm = distance.moments_to_parameters(mean, std)
return probdensity, mu, sigma, norm
else:
return probdensity
def posterior_spherical(self, pts):
"""Evaluate the posterior probability density in spherical polar
coordinates, as a function of (ra, dec, distance)."""
return super(Clustered3DSkyKDE, self).__call__(pts)
def as_healpix(self):
"""Returns a HEALPix multi-order map of the posterior density
and conditional distance distribution parameters."""
m = super(Clustered3DSkyKDE, self).as_healpix()
order, ipix = moc.uniq2nest(m['UNIQ'])
nside = 2 ** order.astype(int)
theta, phi = hp.pix2ang(nside, ipix.astype(np.int64), nest=True)
p = np.column_stack((phi, 0.5 * np.pi - theta))
print('evaluating distance layers ...')
_, m['DISTMU'], m['DISTSIGMA'], m['DISTNORM'] = self(p, distances=True)
return m
class Clustered2Plus1DSkyKDE(Clustered3DSkyKDE):
"""A hybrid sky map estimator that uses a 2D clustered KDE for the marginal
distribution as a function of (RA, Dec) and a 3D clustered KDE for the
conditional distance distribution."""
def __init__(self, pts, max_k=40, trials=5, assign=None,
multiprocess=False):
if assign is None:
self.twod = Clustered2DSkyKDE(
pts, max_k=max_k, trials=trials, assign=assign,
multiprocess=multiprocess)
super(Clustered2Plus1DSkyKDE, self).__init__(
pts, max_k=max_k, trials=trials, assign=assign,
multiprocess=multiprocess)
def __call__(self, pts, distances=False):
probdensity = self.twod(pts)
if distances:
base = super(Clustered2Plus1DSkyKDE, self)
_, distmu, distsigma, distnorm = base.__call__(pts, distances=True)
return probdensity, distmu, distsigma, distnorm
else:
return probdensity
def posterior_spherical(self, pts):
"""Evaluate the posterior probability density in spherical polar
coordinates, as a function of (ra, dec, distance)."""
base = super(Clustered2Plus1DSkyKDE, self)
return self(pts) * base.posterior_spherical(pts) / base.__call__(pts)
#
# Copyright (C) 2011-2017 Will M. Farr <will.farr@ligo.org>
# Leo P. Singer <leo.singer@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Generate a FITS sky map file from posterior samples using clustering and
kernel density estimation.
"""
if __name__ == '__main__':
# Command line interface.
from argparse import FileType, SUPPRESS
from lalinference.bayestar.command import ArgumentParser, DirType
parser = ArgumentParser()
parser.add_argument('samples', type=FileType('rb'), metavar='SAMPLES.hdf5',
help='posterior samples file')
# Only present for backward compatibility with --samples syntax
parser.add_argument('--samples', action='store_false', dest='_ignored',
help=SUPPRESS)
parser.add_argument('--outdir', '-o', default='.', type=DirType(),
help='output directory')
parser.add_argument('--fitsoutname', default='skymap.fits.gz',
metavar='SKYMAP.fits[.gz]',
help='filename for the FITS file')
parser.add_argument('--loadpost', type=FileType('rb'),
metavar='SKYPOST.obj',
help='filename for pickled posterior state')
parser.add_argument('--maxpts', type=int,
help='maximum number of posterior points to use')
parser.add_argument('--trials', type=int, default=5,
help='number of trials at each clustering number '
'[default: %(default)s]')
parser.add_argument('--disable-distance-map', dest='enable_distance_map',
action='store_false', default=True,
help='disable output of HEALPix map of distance'
' estimates')
parser.add_argument('-j', '--jobs', action='store_true',
help='Use multiple threads')
parser.add_argument('--seed', type=int, help='use specified random seed '
'[default: use system entropy]')
parser.add_argument('--objid', help='event ID to store in FITS header')
args = parser.parse_args()
# Late imports
from lalinference import io
from lalinference.bayestar.sky_map import rasterize
from lalinference import InferenceVCSInfo as vcs_info
from astropy.table import Table
from astropy.time import Time
import numpy as np
import os
import sys
import six.moves.cPickle as pickle
from lalinference.kde import Clustered2Plus1DSkyKDE, Clustered2DSkyKDE
import logging
log = logging.getLogger()
log.info('reading samples')
try:
data = io.read_samples(args.samples.name)
except IOError:
# FIXME: remove this code path once we support only HDF5
data = Table.read(args.samples, format='ascii')
if args.maxpts is not None:
# Take a random subsample of the chain. Note that if
# args.maxpts >= len(data), this just selects a random
# permutation of the entire array.
log.info('taking random subsample of chain')
if args.seed is not None:
# use fixed seed so that results are reproducible
np.random.seed(args.seed)
data = data[np.random.choice(len(data), min(args.maxpts, len(data)), replace=False)]
try:
dist = data['dist']
except KeyError:
try:
dist = data['distance']
except KeyError:
dist = None
if args.loadpost is None:
if dist is None:
if args.enable_distance_map:
parser.error("The posterior samples file '{0}' does not have "
"a distance column named 'dist' or 'distance'. "
"Cannot generate distance map.".format(
args.samples.name))
pts = np.column_stack((data['ra'], data['dec']))
else:
pts = np.column_stack((data['ra'], data['dec'], dist))
if args.enable_distance_map:
cls = Clustered2Plus1DSkyKDE
else:
cls = Clustered2DSkyKDE
skypost = cls(pts, trials=args.trials, multiprocess=args.jobs)
log.info('pickling')
with open(os.path.join(args.outdir, 'skypost.obj'), 'wb') as out:
pickle.dump(skypost, out)
else:
skypost = pickle.load(args.loadpost)
skypost.multiprocess = args.jobs
log.info('making skymap')
hpmap = rasterize(skypost.as_healpix())
hpmap.meta['creator'] = parser.prog
hpmap.meta['origin'] = 'LIGO/Virgo'
hpmap.meta['gps_creation_time'] = Time.now().gps
hpmap.meta['vcs_info'] = vcs_info
hpmap.meta['history'] = [