#!/usr/bin/env python
#
# Copyright (C) 2018--2020, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Histogram facilities.
"""
from __future__ import print_function, division
import numbers
from astropy.io import fits
import matplotlib
import numpy
import scipy.stats
import scipy.signal
from ixpeobssim.core.fitting import fit_histogram, USE_ABSOLUTE_SIGMA
from ixpeobssim.instrument.gpd import gpd_map_binning, GPD_PHYSICAL_HALF_SIDE_X,\
GPD_PHYSICAL_HALF_SIDE_Y
from ixpeobssim.utils.matplotlib_ import plt, setup_gca, draggable_colorbar
from ixpeobssim.utils.logging_ import logger
# pylint: disable=invalid-name, too-many-arguments, attribute-defined-outside-init, too-many-instance-attributes
[docs]
class xHistogramBase:
"""Base class for an n-dimensional histogram.
This interface to histograms is profoundly different for the minimal
numpy/matplotlib approach, where histogramming methods return bare
vectors of bin edges and counts. The main underlying ideas are
* we keep track of the bin contents, the bin entries and the sum of the
weights squared (the latter for the purpose of calculating the errors);
* we control the axis label and the plotting styles;
* we provide two separate interfaces, fill() and set_content(), to
fill the histogram from either unbinned or binned data;
* we support the basic arithmetics (addition, subtraction and multiplication
by a scalar);
* we support full data persistence (I/O) in FITS format.
Note that this base class is not meant to be instantiated directly, and
the interfaces to concrete histograms of specific dimensionality are
defined in the sub-classes.
Parameters
----------
binning : n-tuple of array
the bin edges on the different axes.
labels : n-tuple of strings
the text labels for the different axes.
"""
PLOT_OPTIONS = dict()
def __init__(self, binning, labels):
"""Constructor.
"""
assert len(labels) == len(binning) + 1
self.binning = tuple(binning)
self.labels = list(labels)
self.shape = tuple(len(bins) - 1 for bins in self.binning)
self.num_axes = len(self.binning)
self.content = numpy.zeros(shape=self.shape, dtype=float)
self.entries = numpy.zeros(shape=self.shape, dtype=float)
self.sumw2 = numpy.zeros(shape=self.shape, dtype=float)
[docs]
def fill(self, *data, weights=None):
"""Fill the histogram from unbinned data.
Note this method is returning the histogram instance, so that the function
call can be chained.
"""
if weights is None:
weights = numpy.ones(data[0].shape, dtype=float)
elif isinstance(weights, numbers.Number):
weights = numpy.full(data[0].shape, weights, dtype=float)
data = numpy.vstack(data).T
content, _ = numpy.histogramdd(data, bins=self.binning, weights=weights)
entries, _ = numpy.histogramdd(data, bins=self.binning)
sumw2, _ = numpy.histogramdd(data, bins=self.binning, weights=weights**2.)
self.content += content
self.entries += entries
self.sumw2 += sumw2
return self
[docs]
def set_content(self, content, entries=None, errors=None):
"""Set the bin contents programmatically from binned data.
Note this method is returning the histogram instance, so that the function
call can be chained.
"""
assert content.shape == self.shape
self.content = content
if entries is not None:
assert entries.shape == self.shape
self.entries = entries
if errors is not None:
self.set_errors(errors)
return self
[docs]
def errors(self):
"""Return the bin errors.
"""
return numpy.sqrt(self.sumw2)
[docs]
def set_errors(self, errors):
"""
"""
assert errors.shape == self.shape
self.sumw2 = errors**2.
[docs]
def bin_centers(self, axis=0):
"""Return the bin centers for a specific axis.
"""
return 0.5 * (self.binning[axis][1:] + self.binning[axis][:-1])
[docs]
def bin_widths(self, axis=0):
"""Return the bin widths for a specific axis.
"""
return numpy.diff(self.binning[axis])
def _axis_complement(self, axis=0):
"""Return a tuple of integers identifying all the histogram axes
but the one passed as an arguments. If, e.g, num_axes is 3,
calling _axis_complement(1) will return (0, 2).
This is used to sum and/or average on all the histogram dimensions
except for a given one, see, e.g., the mean() and rms() methods below.
"""
return tuple(ax for ax in range(self.num_axes) if ax != axis)
[docs]
def mean(self, axis=0):
"""Calculate the (binned) mean eastimate along a given axis.
"""
content = numpy.sum(self.content, axis=self._axis_complement(axis))
return numpy.sum(self.bin_centers(axis) * content) / self.sum()
[docs]
def rms(self, axis=0):
"""Calculate the (binned) rms eastimate along a given axis.
.. warning::
Mind this is not using anything clever (e.g. the Welford algorithm)
and is not particularly numerically stable.
"""
bin_centers = self.bin_centers(axis)
content = numpy.sum(self.content, axis=self._axis_complement(axis))
_sum = numpy.sum(bin_centers * content) / self.sum()
_sum2 = numpy.sum(bin_centers**2. * content) / self.sum()
return numpy.sqrt(_sum2 - _sum**2.)
[docs]
@staticmethod
def bisect(binning, values, side='left'):
"""Return the indices corresponding to a given array of values for a
given binning.
"""
return numpy.searchsorted(binning, values, side) - 1
[docs]
def find_bin(self, *coords):
"""Find the bin corresponding to a given set of "physical" coordinates
on the histogram axes.
This returns a tuple of integer indices that can be used to address
the histogram content.
"""
return tuple([self.bisect(binning, value) for binning, value in zip(self.binning, coords)])
[docs]
def find_bin_value(self, *coords):
"""Find the histogram content corresponding to a given set of "physical"
coordinates on the histogram axes.
"""
return self.content[self.find_bin(*coords)]
[docs]
def num_entries(self):
"""Return the number of entries in the histogram.
"""
return self.entries.sum()
[docs]
def sum(self, axis=None):
"""return the sum of weights in the histogram.
"""
return self.content.sum(axis)
[docs]
def empty_copy(self):
"""Create an empty copy of a histogram.
"""
return self.__class__(*self.binning, *self.labels)
[docs]
def copy(self):
"""Create a full copy of a histogram.
"""
hist = self.empty_copy()
hist.set_content(self.content.copy(), self.entries.copy())
return hist
def __add__(self, other):
"""Histogram addition.
"""
hist = self.empty_copy()
hist.set_content(self.content + other.content,
self.entries + other.entries,
numpy.sqrt(self.sumw2 + other.sumw2))
return hist
def __sub__(self, other):
"""Histogram subtraction.
"""
hist = self.empty_copy()
hist.set_content(self.content - other.content,
self.entries + other.entries,
numpy.sqrt(self.sumw2 + other.sumw2))
return hist
def __mul__(self, value):
"""Histogram multiplication by a scalar.
Args
----
value : array_like
The scale factor for the multiplication---must be either a scalar
or an array of the same shape of the histogram content.
"""
hist = self.empty_copy()
hist.set_content(self.content * value, self.entries, self.errors() * value)
return hist
def __rmul__(self, value):
"""Histogram multiplication by a scalar.
"""
return self.__mul__(value)
[docs]
def set_axis_label(self, axis, label):
"""Set the label for a given axis.
"""
self.labels[axis] = label
def _plot(self, **kwargs):
"""No-op plot() method, to be overloaded by derived classes.
"""
raise NotImplementedError('_plot() not implemented for %s' % self.__class__.__name__)
[docs]
def plot(self, **kwargs):
"""Plot the histogram.
"""
for key, value in self.PLOT_OPTIONS.items():
kwargs.setdefault(key, value)
self._plot(**kwargs)
setup_gca(xmin=self.binning[0][0], xmax=self.binning[0][-1],
xlabel=self.labels[0], ylabel=self.labels[1])
[docs]
@staticmethod
def label_keyword(axis):
"""Header keyword for the axis labels.
"""
return 'LABEL%d' % axis
[docs]
@staticmethod
def entries_hdu_name():
"""Extension name for the bin entries.
"""
return 'ENTRIES'
[docs]
@staticmethod
def sumw2_hdu_name():
"""Extension name for the bin entries.
"""
return 'SUMW2'
[docs]
@staticmethod
def binning_hdu_name(axis):
"""Extension name for the axis binnings.
"""
return 'BINNING%d' % axis
[docs]
@staticmethod
def binning_col_name():
"""Column name for the axis binnings.
"""
return 'EDGES'
[docs]
@classmethod
def from_file(cls, file_path):
""" Load the histogram from a FITS file.
Note that we transpose the image data back at read time---see the
comment in the save() method.
"""
logger.info('Loading histogram from %s...', file_path)
with fits.open(file_path) as hdu_list:
content = hdu_list['Primary'].data.T
entries = hdu_list[cls.entries_hdu_name()].data.T
errors = numpy.sqrt(hdu_list[cls.sumw2_hdu_name()].data.T)
num_axes = len(content.shape)
edges = [hdu_list[cls.binning_hdu_name(i)].data[cls.binning_col_name()] \
for i in range(num_axes)]
labels = [hdu_list['Primary'].header[cls.label_keyword(i)] for i in range(num_axes + 1)]
hist = cls(*edges, *labels)
hist.set_content(content, entries, errors)
return hist
[docs]
def save(self, file_path, overwrite=True, **header_keywords):
""" Save the histogram (with edges) to a FITS file.
Note that all the image data are transposed so that the thing can be
correctly visualized with standard FITS viewers---at least in two
dimensions.
"""
logger.info('Saving histogram to %s...', file_path)
# Save the histogram content in a series of images.
hdu_list = [
fits.PrimaryHDU(self.content.T),
fits.ImageHDU(self.entries.T, name=self.entries_hdu_name()),
fits.ImageHDU(self.sumw2.T, name=self.sumw2_hdu_name())
]
for key, value in header_keywords.items():
hdu_list[0].header.set(key, value)
for i, label in enumerate(self.labels):
hdu_list[0].header.set(self.label_keyword(i), label)
for i, binning in enumerate(self.binning):
col = fits.Column(name=self.binning_col_name(), array=binning, format='E')
hdu = fits.BinTableHDU.from_columns([col])
hdu.name = self.binning_hdu_name(i)
hdu_list.append(hdu)
hdu_list = fits.HDUList(hdu_list)
hdu_list.writeto(file_path, overwrite=overwrite)
[docs]
class xHistogram1d(xHistogramBase):
"""Container class for one-dimensional histograms.
"""
PLOT_OPTIONS = dict(lw=1.25, alpha=0.4, histtype='stepfilled')
def __init__(self, xbins, xlabel='', ylabel='Entries/bin'):
"""Constructor.
"""
xHistogramBase.__init__(self, (xbins, ), [xlabel, ylabel])
def _plot(self, **kwargs):
"""Overloaded make_plot() method.
"""
plt.hist(self.bin_centers(0), self.binning[0], weights=self.content, **kwargs)
[docs]
def errorbar_data(self):
"""Return the x, y, dy arrays that can be used to build a scatter plot
(with errors) from the histogram.
"""
return self.bin_centers(0), self.content, self.errors()
[docs]
def errorbar(self, **kwargs):
"""Plot the histogram as a scatter plot.
"""
kwargs.setdefault('fmt', 'o')
x, y, dy = self.errorbar_data()
plt.errorbar(x, y, dy, **kwargs)
setup_gca(xlabel=self.labels[0], ylabel=self.labels[1])
[docs]
def scatter_plot(self):
"""Turn the histogram into a scatter plot.
.. warning::
This is to be removed.
"""
return xScatterPlot(self.bin_centers(0), self.content, xlabel=self.labels[0])
[docs]
def gaussian_kde_smooth(self, bandwidth=2.):
"""Create a copy of the histogram where the weights are smoothed with a
gaussian kernel density estimatore.
Args
----
bandwidth : float
The sigma of the gaussian kernel, in (fractional) number of bins.
"""
n = int(5. * bandwidth + 0.5)
kernel_domain = numpy.linspace(-n, n, 2 * n + 1)
kernel = scipy.stats.norm(0, bandwidth).pdf(kernel_domain)
hist = self.copy()
hist.content = numpy.convolve(self.content, kernel, 'same')
return hist
[docs]
def fit(self, model, p0=None, sigma=None, xmin=-numpy.inf, xmax=numpy.inf,
absolute_sigma=USE_ABSOLUTE_SIGMA, check_finite=True, method=None,
verbose=True, **kwargs):
"""Fit the histogram with a model.
"""
return fit_histogram(model, self, p0, sigma, xmin, xmax, absolute_sigma,
check_finite, method, verbose, **kwargs)
[docs]
class xScatterPlot:
"""Small class encapsulating a scatter plot.
Technically speaking, this would not belong here, as a scatter plot is not
strictly related to any of the histogram classes, but a 1-dimensional
histogram with errors can techincally be turned into a scatter plot,
so we introduce the concept here for completeness.
.. warning::
Consider removing this class.
"""
def __init__(self, x, y, dy=None, dx=None, xlabel=None, ylabel=None):
"""Constructor.
"""
self.x = x
self.y = y
self.dy = dy
self.dx = dx
self.labels = (xlabel, ylabel)
[docs]
def plot(self, **kwargs):
"""Plot the scatter plot.
"""
kwargs.setdefault('fmt', 'o')
kwargs.setdefault('markeredgecolor', 'black')
plt.errorbar(self.x, self.y, self.dy, self.dx, **kwargs)
setup_gca(xlabel=self.labels[0], ylabel=self.labels[1])
[docs]
class xHistogram2d(xHistogramBase):
"""Container class for two-dimensional histograms.
"""
PLOT_OPTIONS = dict(cmap=plt.get_cmap('hot'))
def __init__(self, xbins, ybins, xlabel='', ylabel='', zlabel='Entries/bin'):
"""Constructor.
"""
xHistogramBase.__init__(self, (xbins, ybins), [xlabel, ylabel, zlabel])
def _plot(self, logz=False, **kwargs):
"""Overloaded make_plot() method.
"""
x, y = (v.flatten() for v in numpy.meshgrid(self.bin_centers(0), self.bin_centers(1)))
bins = self.binning
w = self.content.T.flatten()
if logz:
# Hack for a deprecated functionality in matplotlib 3.3.0
# Parameters norm and vmin/vmax should not be used simultaneously
# If logz is requested, we intercent the bounds when created the norm
# and refrain from passing vmin/vmax downstream.
vmin = kwargs.pop('vmin', None)
vmax = kwargs.pop('vmax', None)
kwargs.setdefault('norm', matplotlib.colors.LogNorm(vmin, vmax))
plt.hist2d(x, y, bins, weights=w, **kwargs)
colorbar = draggable_colorbar()
if self.labels[2] is not None:
colorbar.set_label(self.labels[2])
[docs]
def gaussian_kde_smooth(self, bandwidth=(2., 2.)):
"""Create a copy of the histogram where the weights are smoothed with a
gaussian kernel density estimatore.
.. warning::
This is essentially untested.
Args
----
bandwidth : 2-element tuple float
The sigma of the gaussian kernel, in (fractional) number of bins.
"""
nx, ny = [int(5. * bw + 0.5) for bw in bandwidth]
dx = numpy.linspace(-nx, nx, 2 * nx + 1)
dy = numpy.linspace(-ny, ny, 2 * ny + 1)
kernel_domain = numpy.dstack(numpy.meshgrid(dx, dy))
sigma = numpy.array([[bandwidth[0], 0.], [0., bandwidth[1]]])
kernel = scipy.stats.multivariate_normal((0., 0.), sigma).pdf(kernel_domain)
hist = self.copy()
hist.content = scipy.signal.convolve2d(self.content, kernel, 'same')
return hist
[docs]
def hslice(self, bin_):
"""Return the horizontal slice for a given bin.
"""
hist = xHistogram1d(self.binning[0], self.labels[0])
hist.set_content(self.content[:, bin_], self.entries[:, bin_])
return hist
[docs]
def hslices(self):
"""Return a list of all the horizontal slices.
"""
return tuple(self.hslice(bin_) for bin_ in range(self.shape[1]))
[docs]
def hbisect(self, y):
"""Return the horizontal slice corresponding to a given y value.
"""
return self.hslice(self.bisect(self.binning[1], y))
[docs]
def vslice(self, bin_):
"""Return the vertical slice for a given bin.
"""
hist = xHistogram1d(self.binning[1], self.labels[1])
hist.set_content(self.content[bin_], self.entries[bin_])
return hist
[docs]
def vslices(self):
"""Return a list of all the vertical slices.
"""
return tuple(self.vslice(bin) for bin in range(self.shape[0]))
[docs]
def vbisect(self, x):
"""Return the vertical slice corresponding to a given y value.
"""
return self.vslice(self.bisect(self.binning[0], x))
[docs]
class xModulationCube2d(xHistogram2d):
"""Specialized class for modulations cubes.
"""
def __init__(self, xbins, ybins):
"""Constructor.
"""
labels = ['Energy [keV]', '$\\phi$ [rad]', 'Entries/bin']
xHistogram2d.__init__(self, xbins, ybins, *labels)
[docs]
class xGpdMap2d(xHistogram2d):
"""2-dimensional GPD map.
"""
def __init__(self, nside=10, zlabel='Entries/bin'):
"""Constructor.
"""
edges = gpd_map_binning(GPD_PHYSICAL_HALF_SIDE_X, GPD_PHYSICAL_HALF_SIDE_Y, nside)
labels = ['x [mm]', 'y [mm]', zlabel]
xHistogram2d.__init__(self, *edges, *labels)
[docs]
class xHistogram3d(xHistogramBase):
"""Container class for three-dimensional histograms.
"""
def __init__(self, xbins, ybins, zbins, xlabel='', ylabel='', zlabel='',
wlabel='Entries/bin'):
"""Constructor.
"""
xHistogramBase.__init__(self, (xbins, ybins, zbins), [xlabel, ylabel, zlabel, wlabel])
[docs]
class xGpdMap3d(xHistogram3d):
"""Three-dimensional histogram where the first two axes represent the
GPD active area in Physical coordinates.
"""
def __init__(self, nside, zbins, zlabel='', wlabel='Entries/bin'):
"""Constructor.
"""
edges = gpd_map_binning(GPD_PHYSICAL_HALF_SIDE_X, GPD_PHYSICAL_HALF_SIDE_Y, nside)
xHistogram3d.__init__(self, *edges, zbins, 'x [mm]', 'y [mm]', zlabel, wlabel)