# Copyright (C) 2015--2022, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Binned data structures pertaining to the (spectro-)polarimetric analysis.
"""
from __future__ import print_function, division
import os
import numbers
import astropy
from astropy.io import fits
import numpy
import scipy
from ixpeobssim.binning.base import xEventBinningBase, xBinnedFileBase
from ixpeobssim.binning.fmt import xBinTableHDUPHA1, xBinTableHDUPCUBE, xBinTableHDUEBOUNDS
from ixpeobssim.core.fitsio import xFITSImageBase
from ixpeobssim.core.hist import xScatterPlot
from ixpeobssim.core.stokes import xModelStokesParameters
from ixpeobssim.evt.align import align_stokes_parameters
from ixpeobssim.evt.kislat2015 import xStokesAnalysis
from ixpeobssim.irf import load_modf
from ixpeobssim.irf.caldb import irf_file_path
from ixpeobssim.irf.ebounds import NUM_CHANNELS
from ixpeobssim.utils.astro import region_compound
from ixpeobssim.utils.logging_ import logger, abort
from ixpeobssim.utils.matplotlib_ import plt, setup_gca, nlog_errorbars,\
plot_arrows, plot_ellipse, setup_gca_stokes
from ixpeobssim.utils.units_ import arcmin_to_arcsec
# pylint: disable=invalid-name, no-member, attribute-defined-outside-init
# pylint: disable=too-many-locals, too-many-instance-attributes
[docs]
class xEventBinningPHA1Base(xEventBinningBase):
"""Base class for PHA1 binning.
"""
ANCRFILE = None
XFLT0001 = None
SUPPORTED_KWARGS = ['mc', 'weights', 'weightcol', 'grayfilter']
def __init__(self, file_path, **kwargs):
"""Overloaded constructor.
"""
xEventBinningBase.__init__(self, file_path, **kwargs)
if self.irf_name is None:
abort('Please set --irfname command-line switch explicitely')
self.binning = numpy.linspace(0, NUM_CHANNELS, NUM_CHANNELS + 1) - 0.5
self.channels = numpy.arange(NUM_CHANNELS)
self.livetime = self.event_file.livetime()
def _binned_data(self, stokes_weights=None):
"""Base binning algorithm to be specialized by derived classes.
Args
----
stokes_weights : array_like, optional
The weights to be assigned to the histogram entries to distinguish
between the PHA1 (None), PHA1U (2. * sin(2. * phi)) and
PHA1Q (2. * cos(2. * phi)) binning mode. Note these should not
be confused with the actual event weights in an ensemble weighted
analysis---the latter are automaically picked up in the function body
depending on the command-line arguments passed to the app.
Note that, in terms of assigning errors to the output spectra when the
weights are not None, we use the standard formula for weighted
histograms, where the errors are just the square root of the sum of the
weights squared, see e.g.
www.hep.uiuc.edu/e687/memos/weight_err.ps
"""
# Retrieve the pi.
pi = self.event_file.pi_data(self.get('mc'))
# Stokes weights, i.e., I, Q or U?
if stokes_weights is None:
stokes_weights = numpy.full(pi.shape, 1.)
# Event weights.
event_weights = self.weight_data()
# Full weights.
weights = stokes_weights * event_weights
# Accumulate the necessary histograms.
w, _ = numpy.histogram(pi, bins=self.binning, weights=weights)
w2, _ = numpy.histogram(pi, bins=self.binning, weights=weights**2)
ew, _ = numpy.histogram(pi, bins=self.binning, weights=event_weights)
ew2, _ = numpy.histogram(pi, bins=self.binning, weights=event_weights**2.)
# Calculate the "normalization" for our weight prescription.
# Note the protection against zero-division errors, see
# https://bitbucket.org/ixpesw/ixpeobssim/issues/470/runtime-warning-in-xpbin
N = numpy.zeros(ew.shape)
mask = ew2 != 0
N[mask] = ew[mask] / ew2[mask] / self.livetime
# And we're good to go :-)
rate = N * w
error = N * numpy.sqrt(w2)
return self.channels, rate, error
def _normalized_binned_data(self, weights):
"""Normalized version of the binning routine for Q/I and U/I normalized
Stokes parameters.
Note that some care must be taken in performing the division and
propagating the uncertainties in order to avoid zero-division runtime
errors.
For the uncertainties we carefully avoid propagating the relative errors,
to avoid problems with points where uq == 0, duq != 0 and duq/uq is nan.
For the bins where i == 0 we currently explicitly set both the value and
the uncertainty of the normalized Stokes parameter to 0, whith the
intent of doing the right thing when we rebin spectra.
Warning
-------
We might want to put some more thought into what the error should be
when i == 0, because in this case, if we don't rebin, we have potential
problems downstream with the chisquare calculation (although one might
argue that in this case the chisquare is not the right statistics to
start with.)
"""
# Retrieve the releval un-normalized spectra.
_, uq, duq = self._binned_data(weights)
_, i, di = self._binned_data(None)
# Initialize the normalized rate and associated errors.
rate = numpy.full(i.shape, 0.)
error = numpy.full(i.shape, 0.)
# Mask to avoid runtime errors due to zero division downstream.
mask = i > 0.
# Restric all the calculation to the bins with at least 1 I count.
uq = uq[mask]
duq = duq[mask]
i = i[mask]
di = di[mask]
# Overerwrite with the proper values the portion of the output array
# where i > 0.
#
# Note that we propagate the absolute error as, if we were to
# perform the error propagation through the relative errors, we
# could be bitten by occasional data points where uq == 0, duq/uq is nan,
# and yet the point itself is perfectly legitimate.
rate[mask] = uq / i
error[mask] = numpy.sqrt((uq * di)**2. + (i * duq)**2.) / (i**2.)
return self.channels, rate, error
[docs]
def binned_data(self):
"""Method to be overloaded by derived classes.
"""
raise NotImplementedError
[docs]
def bin_(self):
"""Overloaded method.
"""
primary_hdu = self.build_primary_hdu()
data = self.binned_data()
spec_hdu = xBinTableHDUPHA1(data)
spec_hdu.setup_header(self.event_file.primary_keywords())
du_id = self.event_file.du_id()
keywords = [
('EXPOSURE', self.event_file.livetime()),
('RESPFILE', irf_file_path(self.irf_name, du_id, 'rmf')),
('ANCRFILE', irf_file_path(self.irf_name, du_id, self.ANCRFILE,
gray_filter=self.get('grayfilter'))),
('XFLT0001', self.XFLT0001)
]
spec_hdu.setup_header(keywords)
hdu_list = fits.HDUList([primary_hdu, spec_hdu])
self.write_output_file(hdu_list)
[docs]
class xEventBinningPHA1(xEventBinningPHA1Base):
"""Original algorithm for PHA1 files.
"""
INTENT = 'count spectrum'
ANCRFILE = 'arf'
XFLT0001 = 'Stokes:0'
[docs]
def binned_data(self):
"""Overloaded binning algorithm.
"""
return xEventBinningPHA1Base._binned_data(self, None)
[docs]
class xEventBinningPHA1Q(xEventBinningPHA1Base):
"""Class for PHA1 binning.
"""
INTENT = 'Stokes Q spectrum'
ANCRFILE = 'mrf'
XFLT0001 = 'Stokes:1'
[docs]
def binned_data(self):
"""Overloaded binning algorithm.
"""
return xEventBinningPHA1Base._binned_data(self, self.event_file.q_data())
[docs]
class xEventBinningPHA1U(xEventBinningPHA1Base):
"""Subclass for creating Stokes U spectra.
"""
INTENT = 'Stokes U spectrum'
ANCRFILE = 'mrf'
XFLT0001 = 'Stokes:2'
[docs]
def binned_data(self):
"""Overloaded binning algorithm.
"""
return xEventBinningPHA1Base._binned_data(self, self.event_file.u_data())
[docs]
class xEventBinningPHA1QN(xEventBinningPHA1Q):
"""Subclass for creating normalized Stokes Q spectra.
"""
INTENT = 'normalized Stokes Q spectrum (Q/I)'
ANCRFILE = 'modf'
[docs]
def binned_data(self):
"""Overloaded binning algorithm.
"""
return self._normalized_binned_data(self.event_file.q_data())
[docs]
class xEventBinningPHA1UN(xEventBinningPHA1U):
"""Subclass for creating normalized Stokes U spectra.
"""
INTENT = 'normalized Stokes U spectrum (U/I)'
ANCRFILE = 'modf'
[docs]
def binned_data(self):
"""Overloaded binning algorithm.
"""
return self._normalized_binned_data(self.event_file.u_data())
[docs]
class xBinnedCountSpectrum(xBinnedFileBase):
"""Binned count spectrum.
"""
def _read_data(self):
"""Overloaded method.
"""
# pylint: disable=no-member
self.spectrum_header = self.hdu_list['SPECTRUM'].header
self._read_binary_table_data(xBinTableHDUPHA1.NAME)
def __iadd__(self, other):
""" Overloaded method for PHA1 binned file addition.
"""
# pylint: disable=no-member, attribute-defined-outside-init
self._check_iadd(other, ('RATE', 'STAT_ERR'), ('CHANNEL', ))
self.RATE += other.RATE
self.STAT_ERR = numpy.sqrt(self.STAT_ERR**2. + other.STAT_ERR**2.)
return self
[docs]
def respfile(self):
"""Return the value of the RESPFILE keyword in the SPECTRUM extension
header, stripped from the first part of the pathe (i.e., the file name
only).
"""
return os.path.basename(self.spectrum_header['RESPFILE'])
[docs]
def plot(self, **kwargs):
"""Overloaded plot method.
"""
# pylint: disable=no-member, arguments-differ
ylabel = kwargs.pop('ylabel', 'Rate [Hz]')
grids = kwargs.pop('grids', True)
logy = kwargs.pop('logy', True)
kwargs.setdefault('fmt', 'o')
nlog_errorbars(self.CHANNEL, self.RATE, self.STAT_ERR, **kwargs)
setup_gca(xlabel='Channel', xmax=self.CHANNEL.max(), ylabel=ylabel, grids=grids, logy=logy)
[docs]
class xEventBinningPCUBE(xEventBinningBase):
"""Class for PCUBE binning.
.. versionadded:: 12.0.0
"""
INTENT = 'polarization cube (in energy layers)'
SUPPORTED_KWARGS = ['mc', 'acceptcorr', 'weights', 'weightcol', 'grayfilter'] + \
xEventBinningBase._energy_binning_kwargs()
[docs]
def bin_(self):
"""Overloaded method.
"""
if self.irf_name is None:
abort('Please set --irfname command-line switch explicitely')
energy = self.event_file.energy_data(self.get('mc'))
ebinning = self.make_energy_binning(energy, **self.kwargs)
q, u = self.event_file.stokes_data()
modf = load_modf(self.irf_name, self.event_file.du_id())
aeff = self.load_aeff_for_polarization_analysis()
analysis = xStokesAnalysis(q, u, energy, modf, aeff, self.event_file.livetime(),
self.weight_data(), self.get('acceptcorr'))
table = analysis.polarization_table(ebinning)
data = [table[key] for key in xBinTableHDUPCUBE.spec_names()]
primary_hdu = self.build_primary_hdu()
bin_table_hdu = xBinTableHDUPCUBE(data)
bin_table_hdu.setup_header(self.event_file.primary_keywords())
gti_hdu = self.event_file.hdu_list['GTI']
hdu_list = fits.HDUList([primary_hdu, bin_table_hdu, gti_hdu])
self.write_output_file(hdu_list)
[docs]
class xBinnedPolarizationCube(xBinnedFileBase):
"""Read-mode interface to a PCUBE FITS file.
.. versionadded:: 12.0.0
"""
def _read_data(self):
"""Overloaded method.
"""
self._read_binary_table_data(xBinTableHDUPCUBE.NAME)
[docs]
def backscal(self):
"""Return the value of the BACKSCAL header keyword, if present.
"""
try:
return self.primary_header['BACKSCAL']
except KeyError:
logger.warning('Polarization cube has no BACKSCAL header keyword set')
return None
def __check_compat(self, other):
"""Check the basic polarization cube data structure before attempting
to do operations with other polarization cubes.
"""
same_shape = xBinTableHDUPCUBE.POL_COL_NAMES
same_values = ('ENERG_LO', 'ENERG_HI')
self._check_iadd(other, same_shape, same_values)
def __recalculate_derived(self):
"""Recalculate all the derived quantities after arithmetic operations.
"""
# Recalculate the normalized Stokes parameters, and propagate the errors.
args = self.I, self.Q, self.U, self.MU, self.W2
self.QN, self.UN, self.I_ERR, self.Q_ERR, self.U_ERR, self.QN_ERR, self.UN_ERR, \
self.QUN_COV, self.P_VALUE, self.CONFID, self.SIGNIF = \
xStokesAnalysis.calculate_stokes_errors(*args)
# Update the MDP.
self.MDP_99 = xStokesAnalysis.calculate_mdp99(self.MU, self.I, self.W2)
self.N_EFF, self.FRAC_W = xStokesAnalysis.calculate_n_eff(self.COUNTS, self.I, self.W2)
# Recalculate the polarization degree and angle.
self.PD, self.PD_ERR, self.PA, self.PA_ERR = \
xStokesAnalysis.calculate_polarization(*args, degrees=True)
def __iadd__(self, other):
"""Overloaded method for PCUBE binned data addition.
"""
self.__check_compat(other)
self.E_MEAN = self._weighted_average(other, 'E_MEAN', 'I')
self.MU = self._weighted_average(other, 'MU', 'I')
self.COUNTS += other.COUNTS
self.W2 += other.W2
self.I += other.I
self.Q += other.Q
self.U += other.U
self.__recalculate_derived()
return self
def __isub__(self, other):
"""Overloaded method for polarization cube subtraction.
"""
self.__check_compat(other)
self.E_MEAN = self._weighted_average(other, 'E_MEAN', 'I', invert_w2=True)
self.MU = self._weighted_average(other, 'MU', 'I', invert_w2=True)
self.COUNTS -= other.COUNTS
# Note W2 must always be added.
self.W2 += other.W2
self.I -= other.I
self.Q -= other.Q
self.U -= other.U
# Propagate the uncertainties in the polarization cube difference.
# This was added in response to https://bitbucket.org/ixpesw/ixpeobssim/issues/614
# and should be considered as a short-term fix, waiting for a full
# refactoring of the polarization cube arithmetic.
self.I_ERR = numpy.sqrt(self.I_ERR**2. + other.I_ERR**2.)
self.Q_ERR = numpy.sqrt(self.Q_ERR**2. + other.Q_ERR**2.)
self.U_ERR = numpy.sqrt(self.U_ERR**2. + other.U_ERR**2.)
args = self.I, self.Q, self.U, self.I_ERR, self.Q_ERR, self.U_ERR
self.QN, self.UN, self.QN_ERR, self.UN_ERR = \
xStokesAnalysis.normalized_stokes_parameters(*args)
self.QUN_COV = xStokesAnalysis.stokes_covariance(self.I, self.QN, self.UN, self.W2)
self.P_VALUE, self.CONFID, self.SIGNIF = \
xStokesAnalysis.significance(self.Q, self.U, self.Q_ERR, self.U_ERR)
self.MDP_99 = xStokesAnalysis.calculate_mdp99(self.MU, self.I, self.W2)
self.N_EFF, self.FRAC_W = xStokesAnalysis.calculate_n_eff(self.COUNTS, self.I, self.W2)
self.PD, self.PD_ERR, self.PA, self.PA_ERR = \
xStokesAnalysis.calculate_polarization_sub(*args, degrees=True)
return self
def __imul__(self, other):
"""Overloaded method for polarization cube multiplication by a scalar.
"""
assert isinstance(other, numbers.Number)
# Need to be careful, here, as the counts are supposed to be integer.
counts = (self.COUNTS * other + 0.5).astype(int)
self.COUNTS = counts
# Note the square in W2!
self.W2 *= other**2
self.I *= other
self.Q *= other
self.U *= other
self.__recalculate_derived()
return self
[docs]
def polarization(self):
"""Return the polarization information in the form of a four element
tuple (PD, PD_err, PA, PA_err) of arrays.
"""
return self.PD, self.PD_ERR, self.PA, self.PA_ERR
def _energy_scatter_plot(self, y, dy, ylabel=None):
"""Make a scatter plot of a given quanity vs. energy.
Basic function for plot_polarization_degree() and plot_polarization_angle().
"""
x = self.E_MEAN
dx = [self.E_MEAN - self.ENERG_LO, self.ENERG_HI - self.E_MEAN]
return xScatterPlot(x, y, dy, dx, xlabel='Energy [keV]', ylabel=ylabel)
[docs]
def plot_polarization_degree(self, min_num_sigmas=2., **kwargs):
"""Plot the polarization degree as a function of energy.
.. warning::
This is deprecated in favor of the standard plots of the Stokes parameters.
"""
ylabel = 'Polarization degree'
limits = (self.PD / self.PD_ERR) < min_num_sigmas
PD = self.PD + limits * min_num_sigmas * self.PD_ERR
scatter = self._energy_scatter_plot(PD, self.PD_ERR, ylabel)
scatter.plot(uplims=limits, **kwargs)
[docs]
def plot_polarization_angle(self, **kwargs):
"""Plot the polarization angle as a function of energy.
.. warning::
This is deprecated in favor of the standard plots of the Stokes parameters.
"""
ylabel = 'Polarization angle [$^\\circ$]'
scatter = self._energy_scatter_plot(self.PA, self.PA_ERR, ylabel)
scatter.plot(**kwargs)
[docs]
def plot(self, **kwargs):
"""Default plotting for the polarization cubes.
.. warning::
This should be considered in evolution, and the API might change.
This is plotting the polarization cube in normalized Stokes parameters
space, with custom grids to help reading out the polarization degree and
angle.
A brief explanation of the supporte keywords arduments.
Arguments
---------
side : float, optional
The absolute value of the maximum Q/I and U/I to be displayed
(note the aspect ratio of the plot is set to 'equal', and the
plot is assumed to be squared). By default this is driven by the
maximum value of the polarization degree over the polarization cube.
pd_grid : array_like
The polarization degree radial grid in correspondence of which the
gridding circumpherences are plotted to guide the eye.
pd_grid_label_angle : float
The angle at which the text labels for the PD grids are plotted.
pa_grid_step : float
The step of the tangential grid in the polarization angle.
sigma_levels : array_like
The sigma levels at which the ellipses repesenting the normalized
Stokes parameter contours are plotted.
sigma_ls : tuple of the same size as sigma_levels
The line styles corresponding to sigma levels---by default the
innermost ellipse is solid and the others are dashed.
colors : array_like or str, optional
The colors for the elliptical contours (by deafult the proper color
for the specific DU in the polarization cube is picked).
label : str, optional
Optional label to be displayed in the legend---this is only used if
the marker boolean flag is True and only associated to the first
layer in the cube.
marker : bool
If True, a marker is plotted at the center of the elliptical contours.
marker_size : float
The size for the optional markers.
annotate_energies : bool
If true, the energy layers are annotated with nice arrows and text
labels (note the positioning is still fragile).
setup_axes : bool
Call the underlying setup_gca_stokes() hook at the end. (This flag
is provided to allow disingaging multiple grid plotting in case one
wants to overlay multiple polarization cubes.)
"""
# Cache all the relevant command-line arguments.
side = kwargs.get('side', None)
pd_grid = kwargs.get('pd_grid')
pd_grid_label_angle = kwargs.get('pd_grid_label_angle', 45.)
pa_grid_step = kwargs.get('pa_grid_step', 30.)
sigma_levels = kwargs.get('sigma_levels', (1., 2., 3.))
sigma_ls = kwargs.get('sigma_ls', None)
colors = kwargs.get('colors', None)
label = kwargs.get('label', None)
marker = kwargs.get('marker', True)
marker_size = kwargs.get('marker_size', 4)
annotate_energies = kwargs.get('annotate_energies', True)
setup_axes = kwargs.get('setup_axes', True)
# Setup the default side---this is just the maximum of the polarization
# degree across the cube plus twice the associated sigma.
if side is None:
index = numpy.argmax(self.PD)
side = min(self.PD[index] + 2. * self.PD_ERR[index], 1.)
# Setup the radial grid for the polarization angle. In order to have a
# sensible default, here, we judge based on the plot side and try and
# setup a sensible regular grid.
if pd_grid is None:
if side <= 0.05:
step = 0.01
elif side <= 0.1:
step = 0.02
elif side <= 0.25:
step = 0.05
else:
step = 0.1
pd_grid = numpy.arange(step, side, step)
# If the color is not set, default to the color for the DU.
if colors is None:
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
elif isinstance(colors, str):
colors = [colors] * len(self.QN)
# Default line style is solid for the first level, and dashed for the others.
if sigma_ls is None:
sigma_ls = ['solid'] + ['dashed'] * (len(sigma_levels) - 1)
# Loop over the actual data.
args = self.QN, self.UN, self.QN_ERR, self.UN_ERR, self.ENERG_LO, self.ENERG_HI, colors
for i, (q, u, dq, du, emin, emax, color) in enumerate(zip(*args)):
# Plot all the contours relative to the desired sigma levels.
for scale, ls in zip(sigma_levels, sigma_ls):
plot_ellipse((q, u), 2. * scale * dq, 2. * scale * du, color=color,
ls=ls, zorder=10)
# Plot the markers.
if marker:
_kwargs = dict(ms=marker_size, color=color, zorder=10)
# For the first layer, we also set the label, so that we can
# effectively use a legend downstream.
if i == 0:
plt.plot(q, u, 'o', label=label, **_kwargs)
else:
plt.plot(q, u, 'o', **_kwargs)
# Annotations for the energy layers.
if annotate_energies:
label = '%.2f-%.2f keV' % (emin, emax)
if q > 0:
xtext = q - 0.05 + 0.02 * i
else:
xtext = q + 0.05 - 0.02 * i
if u > 0:
ytext = u - 0.05 - 0.02 * i
else:
ytext = u + 0.05 + 0.02 * i
plt.gca().annotate(label, xy=(q, u), xycoords='data', xytext=(xtext, ytext),
textcoords='data', size='small', ha='center', zorder=10, color=color,
arrowprops=dict(arrowstyle="->", color=color, lw=0.75,
shrinkA=5, shrinkB=1, patchA=None, patchB=None,
connectionstyle='angle3,angleA=90,angleB=0'))
# Finally, set up the axes.
if setup_axes:
setup_gca_stokes(side, pd_grid, pd_grid_label_angle, pa_grid_step)
[docs]
def as_table(self):
"""Return the polarization cube as an astropy table.
"""
col_names = ['Quantity']
col_names += ['%.2f--%.2f keV' % (emin, emax) for emin, emax in \
zip(self.ENERG_LO, self.ENERG_HI)]
col_types = [str] + [float] * (len(col_names) - 1)
table = astropy.table.Table(names=col_names, dtype=col_types)
for col_name, *_ in xBinTableHDUPCUBE.DATA_SPECS[2:]:
table.add_row([col_name] + list(self.__getattr__(col_name)))
return table
def __str__(self):
"""String formatting.
"""
return '%s content:\n%s' % (self.__class__.__name__, self.as_table())
[docs]
class xEventBinningMDPMAPCUBE(xEventBinningBase):
"""Class for MDPMAPCUBE binning.
"""
INTENT = 'MDP map cube in sky coordinates and energy layers'
SUPPORTED_KWARGS = ['mc', 'acceptcorr', 'weights', 'weightcol', 'grayfilter'] + \
xEventBinningBase._image_wcs_kwargs() + xEventBinningBase._energy_binning_kwargs()
EXTENSIONS_NAMES = xBinTableHDUPCUBE.MDP_COL_NAMES
[docs]
def process_kwargs(self):
"""Overloaded method.
"""
xEventBinningBase.process_kwargs(self)
self.process_image_ref_kwargs()
@staticmethod
def _binned_data(stokes_analysis, *args, **kwargs):
"""Retrieve the necessary data from the underlying xStokesAnalysis object.
"""
return stokes_analysis.mdp_map_cube(*args, **kwargs)
[docs]
def bin_(self):
"""Overloaded method.
"""
if self.irf_name is None:
abort('Please set --irfname command-line switch explicitely')
# Retrieve the relevant columns in the input event file.
ra, dec = self.event_file.sky_position_data(self.get('mc'))
energy = self.event_file.energy_data(self.get('mc'))
ebinning = self.make_energy_binning(energy, **self.kwargs)
q, u = self.event_file.stokes_data()
# Load the necessary response functions.
modf = load_modf(self.irf_name, self.event_file.du_id())
aeff = self.load_aeff_for_polarization_analysis()
analysis = xStokesAnalysis(q, u, energy, modf, aeff, self.event_file.livetime(),
self.weight_data(), self.get('acceptcorr'))
# Prepare the arrays and binning for mapping the polarization.
wcs_ = self._build_image_wcs(**self.kwargs)
x, y, xybinning = self._pixelize_skycoords(ra, dec, wcs_)
# Note the energy axis is up front in order to be able to view the
# images with fv at a later stage.
binning = (ebinning, *xybinning)
binned_data = self._binned_data(analysis, x, y, binning)
# Prepare the output HDU list.
header = wcs_.to_header()
hdu_list = fits.HDUList()
hdu = self.build_primary_hdu(header=header)
hdu_list.append(hdu)
for ext_name in self.EXTENSIONS_NAMES:
data = binned_data[ext_name]
hdu_list.append(fits.ImageHDU(data, header=header, name=ext_name))
logger.info('Creating EBOUNDS...')
ebounds = xBinTableHDUEBOUNDS((ebinning[:-1], ebinning[1:]))
ebounds.setup_header(self.event_file.primary_keywords())
hdu_list.append(ebounds)
self.write_output_file(hdu_list)
[docs]
class xBinnedMDPMapCube(xBinnedFileBase):
"""Read-mode interface to a MDPMAPCUBE FITS file.
"""
EXTENSIONS_NAMES = xEventBinningMDPMAPCUBE.EXTENSIONS_NAMES
def _read_data(self):
"""Overloaded method.
"""
self._img_header = self.hdu_list['I'].header
self.wcs = astropy.wcs.WCS(self._img_header)
for ext_name in self.EXTENSIONS_NAMES:
self._read_image_data(ext_name)
self._read_binary_table_data(xBinTableHDUEBOUNDS.NAME)
def __iadd__(self, other):
"""Overloaded method for binned data addition.
"""
same_shape = xBinTableHDUPCUBE.MDP_COL_NAMES
same_values = ('ENERG_LO', 'ENERG_HI')
self._check_iadd(other, same_shape, same_values)
self.E_MEAN = self._weighted_average(other, 'E_MEAN', 'I')
self.COUNTS += other.COUNTS
self.MU = self._weighted_average(other, 'MU', 'I')
self.W2 += other.W2
self.I += other.I
self.MDP_99 = xStokesAnalysis.calculate_mdp99(self.MU, self.I, self.W2)
self.N_EFF, self.FRAC_W = xStokesAnalysis.calculate_n_eff(self.COUNTS, self.I, self.W2)
return self
[docs]
def map_shape(self):
"""Return the shape of the underlying sky-maps.
Mind the underlying arrays are all 3-dimensional cubes with the energy
binning as the first dimension---so it's the last two that we care about.
(And, for completeness: since the arrays are all the same, we use I
for convenience.)
"""
return self.I.shape[1:]
[docs]
def pixel_size(self):
"""Return the pixel size of the underlying spatial map.
"""
cdelt1, cdelt2, _ = self.wcs.wcs.get_cdelt()
assert abs(cdelt1) == abs(cdelt2)
return abs(cdelt1)
[docs]
def ds9_region_mask(self, region_list, region_slice=None):
"""Return the (spatial) array map corresponding to a given ds9 region list.
If there is more than one region in the region list, by default the
output mask corresponds to the logical or of all the regions.
The region_slice optional argument allows to restrict the mask to a
subset of the regions.
Arguments
---------
region_list :
The region list defining the mask
region_slice : int or slice (optional)
An optional slice designator to select a subset of the region list.
"""
if region_slice is not None:
if isinstance(region_slice, int):
region_slice = slice(region_slice, region_slice + 1)
region_list = region_list[region_slice]
region = region_compound(region_list)[0].to_pixel(self.wcs)
return region.to_mask().to_image(self.map_shape()).astype(bool)
@staticmethod
def _sum_image_pixels(data, spatial_mask, energy_layer=0):
"""Sum one of the underlying image extentions over a given spatial
mask and energy slice.
"""
return data[energy_layer][spatial_mask].sum()
[docs]
def sum_pixels(self, spatial_mask, energy_layer=0):
"""Sum the relevant quantities over a given spatial mask and energy slice.
"""
args = spatial_mask, energy_layer
I = self._sum_image_pixels(self.I, *args)
counts = self._sum_image_pixels(self.COUNTS, *args)
mu = self._sum_image_pixels(self.MU * self.I, *args)
if I > 0.:
mu /= I
W2 = self._sum_image_pixels(self.W2, *args)
mdp = xStokesAnalysis.calculate_mdp99(mu, I, W2)
return {'COUNTS': counts, 'I': I, 'MU': mu, 'W2': W2, 'MDP_99': mdp}
[docs]
def energy_binning(self):
"""Return the underlying energy binning in the form of a numpy array.
Note the array has one more element than the underlying ENERG_LO and
ENERG_HI arrays, i.e., if the cube is binned in two energy layers,
say 2--4 keV and 4--8 keV, the energy binning is [2. 4. 8.].
"""
return numpy.append(self.ENERG_LO, self.ENERG_HI[-1])
[docs]
def num_energy_layers(self):
"""Return the number of energy layers in the cube.
"""
return len(self.ENERG_LO)
[docs]
def energy_range(self, energy_layer):
"""Return the energy range, i.e., (emin, emax) in keV, for a given
energy layer.
"""
return self.ENERG_LO[energy_layer], self.ENERG_HI[energy_layer]
[docs]
def energy_label(self, energy_layer):
"""Return a text label corresponding to the energy layer (e.g, to
identify the energy range on a plot).
"""
return '%.2f--%.2f keV' % self.energy_range(energy_layer)
def _plot_layer(self, data, energy_layer, **kwargs):
"""Delegated function to make a plot of one of the underlying image
extensions in *one* energy layer of the cube.
This essentially calculates the proper slice of the input data and all
the necessary arguments to be passed to xFITSImageBase for doing the
actual plot.
Note this function is *not* creating any matplotlib canvas---it is left
to the caller to do that.
Arguments
---------
data : array
The underlying data array. This can be the array corresponding to
one of the underlying image extensions of the cube, or any
array of the same shape.
energy_layer : int
The identifier of the energy layer in the cube.
kwargs : dict
Additional keyword arguments being passed to the underlying
xFITSImageBase.make_plot() call.
"""
assert energy_layer in range(0, self.num_energy_layers())
data = data[energy_layer, :, :]
slices = ('x', 'y', energy_layer)
xFITSImageBase.make_plot(data, self.wcs, slices=slices, **kwargs)
xFITSImageBase.add_label(self.energy_label(energy_layer), y=0.92)
def _plot_base(self, data, energy_layers, mask, figname, prefix=None,
zlabel=None, post_plot_hook=None, **kwargs):
"""Delegated base function to plot multiple energy layers of the same
image extension.
"""
# pylint: disable=arguments-differ
# Do some magic with the energy_layers argument.
if energy_layers is None:
energy_layers = list(range(0, self.num_energy_layers()))
elif isinstance(energy_layers, int):
energy_layers = [energy_layers]
# If we are passing a mask, we create a copy of the data array (not to)
# mess up with the underlying data, and explicitely set to zero all the
# values that are not in the mask.
if mask is not None:
data = data.copy()
data[numpy.logical_not(mask)] = 0.
# Loop over the energy layers.
if zlabel is None:
zlabel = figname
kwargs.setdefault('zlabel', zlabel)
for energy_layer in energy_layers:
figure_name = '%s %s' % (figname, self.energy_label(energy_layer))
if prefix is not None:
figure_name = '%s %s' % (prefix, figure_name)
plt.figure(figure_name)
self._plot_layer(data, energy_layer, **kwargs)
if post_plot_hook is not None:
post_plot_hook(energy_layer, mask)
[docs]
def plot_mdp_map(self, energy_layers=None, prefix=None, **kwargs):
"""Plot the MDP map.
"""
self._plot_base(self.MDP_99, energy_layers, None, 'MDP', prefix, 'MDP (99% CL)', **kwargs)
[docs]
def plot(self, prefix=None):
"""Plot the data.
"""
self.plot_mdp_map(prefix=prefix)
def __str__(self):
"""String formatting.
"""
text = xBinnedFileBase.__str__(self)
for ext_name in self.EXTENSIONS_NAMES:
val = ('%s' % self.__getattr__(ext_name)).replace('\n', '')
text += '\n%s: %s' % (ext_name, val)
text += '\nEnergy bounds:'
for emin, emax in zip(self.ENERG_LO, self.ENERG_HI):
text += '\n- %.3f--%.3f keV' % (emin, emax)
return text
[docs]
class xEventBinningMDPMAP(xEventBinningMDPMAPCUBE):
"""Class for MDPMAP binning.
"""
INTENT = 'MDP map in sky coordinates'
SUPPORTED_KWARGS = ['mc', 'acceptcorr', 'weights', 'weightcol', 'grayfilter',
'emin', 'emax'] + xEventBinningBase._image_wcs_kwargs()
[docs]
def process_kwargs(self):
"""Overloaded method.
Note we have to call the method of the base class closest in the
inheritance hierarchy, to make sure that all the ingredients for the
WCS object generation are properly setup.
"""
xEventBinningMDPMAPCUBE.process_kwargs(self)
self.set('ebins', 1)
[docs]
class xEventBinningPMAPCUBE(xEventBinningMDPMAPCUBE):
"""Class for PMAPCUBE binning.
"""
INTENT = 'polarization map cube in sky coordinates and energy layers'
SUPPORTED_KWARGS = xEventBinningMDPMAPCUBE.SUPPORTED_KWARGS
EXTENSIONS_NAMES = xBinTableHDUPCUBE.POL_COL_NAMES
@staticmethod
def _binned_data(stokes_analysis, *args, **kwargs):
"""Retrieve the necessary data from the underlying xStokesAnalysis object.
"""
return stokes_analysis.polarization_map_cube(*args, **kwargs)
[docs]
class xEventBinningPMAP(xEventBinningPMAPCUBE):
"""Class for PMAP binning.
"""
INTENT = 'polarization map in sky coordinates'
SUPPORTED_KWARGS = xEventBinningMDPMAP.SUPPORTED_KWARGS
[docs]
def process_kwargs(self):
"""Overloaded method.
Note we have to call the method of the base class closest in the
inheritance hierarchy, to make sure that all the ingredients for the
WCS object generation are properly setup.
"""
xEventBinningPMAPCUBE.process_kwargs(self)
self.set('ebins', 1)
[docs]
class xBinnedPolarizationMapCube(xBinnedMDPMapCube):
"""Read-mode interface to a PMAPCUBE FITS file.
"""
EXTENSIONS_NAMES = xEventBinningPMAPCUBE.EXTENSIONS_NAMES
def __recalculate(self):
"""
"""
# Recalculate the normalized Stokes parameters, and propagate the errors.
args = self.I, self.Q, self.U, self.MU, self.W2
self.QN, self.UN, self.I_ERR, self.Q_ERR, self.U_ERR, self.QN_ERR, self.UN_ERR, \
self.QUN_COV, self.P_VALUE, self.CONFID, self.SIGNIF = \
xStokesAnalysis.calculate_stokes_errors(*args)
# Recalculate the polarization degree and angle.
self.PD, self.PD_ERR, self.PA, self.PA_ERR = \
xStokesAnalysis.calculate_polarization(*args, degrees=True)
self.MDP_99 = xStokesAnalysis.calculate_mdp99(self.MU, self.I, self.W2)
def __iadd__(self, other):
"""Overloaded method for binned data addition.
"""
self._check_iadd(other, ('Q', 'U'))
xBinnedMDPMapCube.__iadd__(self, other)
self.Q += other.Q
self.U += other.U
self.__recalculate()
return self
[docs]
def convolve(self, kernel):
"""Convolve the polarization cube with a generic binned kernel.
"""
for layer in range(self.num_energy_layers()):
self.I[layer] = scipy.signal.convolve2d(self.I[layer], kernel, mode='same')
self.Q[layer] = scipy.signal.convolve2d(self.Q[layer], kernel, mode='same')
self.U[layer] = scipy.signal.convolve2d(self.U[layer], kernel, mode='same')
self.W2[layer] = scipy.signal.convolve2d(self.W2[layer], kernel, mode='same')
self.MU[layer] = scipy.signal.convolve2d(self.MU[layer], kernel, mode='same') /\
kernel.sum()
self.__recalculate()
[docs]
def align(self):
"""Align the polarization direction to the radial direction on a bin by
bin basis.
Warning
-------
This will need to be reviewed when we fix issue #597.
"""
x0, y0, _ = self.wcs.wcs.crpix
_, dec0, _ = self.wcs.wcs.crval
shape = self.map_shape()
x, y = numpy.indices(shape)
dx = (x - x0) * numpy.cos(numpy.radians(dec0))
dy = y - y0
phi0 = numpy.arctan2(dy, -dx)
q0 = xStokesAnalysis.stokes_q(phi0, weights=None)
u0 = xStokesAnalysis.stokes_u(phi0, weights=None)
self.Q, self.U = align_stokes_parameters(self.Q, self.U, q0, u0)
[docs]
def radial_profile(self, n=10, layer=0):
"""Create a radial polarization profile of the source.
Warning
-------
This needs to be cleaned up and properly documented.
"""
x0, y0, _ = self.wcs.wcs.crpix
shape = self.map_shape()
_, pixel_size, _ = self.wcs.wcs.cdelt
x, y = numpy.indices(shape)
profile = []
for r0 in range(n, shape[0] // 2 - n):
r = numpy.sqrt((x - x0)**2. + (y - y0)**2)
mask = numpy.logical_and(r > r0 - n, r < r0 + n)
data = self.sum_pixels(mask, layer)
data['RADIUS'] = r0 * pixel_size * 60.
profile.append(data)
return profile
[docs]
def sum_pixels(self, spatial_mask, energy_layer=0):
"""Sum the relevant quantities over a given spatial mask and energy slice.
"""
args = spatial_mask, energy_layer
data = xBinnedMDPMapCube.sum_pixels(self, *args)
data['Q'] = self._sum_image_pixels(self.Q, *args)
data['U'] = self._sum_image_pixels(self.U, *args)
args = data['I'], data['Q'], data['U'], data['MU'], data['W2']
pd, pd_err, pa, pa_err = xStokesAnalysis.calculate_polarization(*args, degrees=True)
data.update({'PD': pd, 'PD_ERR': pd_err, 'PA': pa, 'PA_ERR': pa_err})
qn, un, i_err, q_err, u_err, qn_err, un_err, qun_cov, p_value, confid, signif = \
xStokesAnalysis.calculate_stokes_errors(*args)
data.update({'QN': qn, 'UN': un, 'I_ERR': i_err, 'Q_ERR': q_err, 'U_ERR': u_err,
'QN_ERR': qn_err, 'UN_ERR': un_err, 'QUN_COV': qun_cov, 'P_VALUE': p_value,
'CONFID': confid, 'SIGNIF': signif})
return data
[docs]
def calculate_significance_mask(self, num_sigma=2., intensity_percentile=0.):
"""Utitlity function to calculate a mask on the underlying pixel cube
where the polarization degree and angle can be reliably plotted.
The calculation is based on two different metrics:
* the value of I in each given pixel, compared with a given percentile
of the overall I distribution across the cube;
* the number of sigma that the measured polarization degree differs
from either 0 or 1 (if the measurement is too close to 0 or 1 and
misses the physical bounds by less than a few error bars, then it
is effectively not a measurement).
Note the logic is positive---i.e., we're passing the mask identifying
the *good* pixels. This is typically negated downstream to set to
zero all the other pixels.
Arguments
---------
num_sigma : float
The number of standard deviations representing the significance of
the measurement of the polarization degree.
intensity_percentile : float
the threshold on the I value for the pixels, based on the percentile
of the I distribution across all the pixels.
"""
i_mask = self.I >= numpy.percentile(self.I[self.I > 0], intensity_percentile)
pd_mask_lo = self.PD >= num_sigma * self.PD_ERR
pd_mask_hi = self.PD <= 1. - num_sigma * self.PD_ERR
# We need this, in case we set the errors to zero for bins with no counts.
err_mask = numpy.logical_and(self.PD_ERR > 0., self.PA_ERR > 0.)
return numpy.logical_and.reduce((i_mask, pd_mask_lo, pd_mask_hi, err_mask))
[docs]
def plot_stokes_parameters(self, energy_layers=None, prefix=None, **kwargs):
"""Plot the Stokes parameters.
"""
self._plot_base(self.I, energy_layers, None, 'Stokes I', prefix, **kwargs)
self._plot_base(self.Q, energy_layers, None, 'Stokes Q', prefix, **kwargs)
self._plot_base(self.U, energy_layers, None, 'Stokes U', prefix, **kwargs)
[docs]
def plot_significance(self, energy_layers=None, prefix=None, **kwargs):
"""Plot the significance.
"""
self._plot_base(self.SIGNIF, energy_layers, None, 'Significance', prefix, **kwargs)
[docs]
def normalized_stokes_parameters(self):
"""Calculate the normalized Stokes parameters, i.e., Q/I and U/I.
"""
q = xModelStokesParameters.normalize(self.Q, self.I)
u = xModelStokesParameters.normalize(self.U, self.I)
return q, u
[docs]
def plot_normalized_stokes_parameters(self, energy_layers=None, num_sigma=1.,
intensity_percentile=0.05, prefix=None,
**kwargs):
"""Plot the normalized Stokes parameters.
"""
q, u = self.normalized_stokes_parameters()
mask = self.calculate_significance_mask(num_sigma, intensity_percentile)
self._plot_base(q, energy_layers, mask, 'Stokes Q/I', prefix, **kwargs)
self._plot_base(u, energy_layers, mask, 'Stokes U/I', prefix, **kwargs)
def _calculate_sky_grid(self):
"""Calculate the sky grid corresponding to the center of the pixels
in the underlying map extensions.
Note that this could be cached and calculated exactly once, but I
doubt it would make any practical difference.
"""
nx, ny = self.map_shape()
x, y = numpy.meshgrid(numpy.arange(nx), numpy.arange(ny))
ra, dec, _ = self.wcs.wcs_pix2world(x, y, [0], 0)
return ra, dec
def _overlay_arrows(self, energy_layer, mask=None, **kwargs):
"""Method to overlay the arrows of the polarization information.
"""
x, y = self._calculate_sky_grid()
args = self.PD[energy_layer], self.PA[energy_layer]
dx, dy = xModelStokesParameters.pdpa_to_xy(*args, degrees=True)
if mask is not None:
mask = mask[energy_layer]
x, y, dx, dy = x[mask], y[mask], dx[mask], dy[mask]
plot_arrows((x, y), (dx, dy), **kwargs)
[docs]
def plot_polarization_degree(self, energy_layers=None, num_sigma=2.,
arrows=True, prefix=None, **kwargs):
"""Plot the polarization degree.
"""
mask = self.calculate_significance_mask(num_sigma, 0.)
if arrows:
hook = self._overlay_arrows
else:
hook = None
self._plot_base(self.PD, energy_layers, mask, 'Polarization degree',
prefix, post_plot_hook=hook, **kwargs)
[docs]
def plot_polarization_angle(self, energy_layers=None, num_sigma=2.,
prefix=None, **kwargs):
"""Plot the polarization angle.
"""
mask = self.calculate_significance_mask(num_sigma, 0.)
self._plot_base(self.PA, energy_layers, mask, 'Polarization angle',
prefix, 'Polarization angle [$^\\circ$]', **kwargs)
[docs]
def save_to_ds9(self, output_file_path, num_sigma=2., intensity_percentile=0.,
line_color='white'):
"""Save the polarization arrows to a ds9 region.
"""
# pylint: disable=line-too-long
_x, _y = self._calculate_sky_grid()
sigma_mask = self.calculate_significance_mask(num_sigma, intensity_percentile)
for energy_layer in range(self.num_energy_layers()):
pd, pa = self.PD[energy_layer], self.PA[energy_layer]
mask = sigma_mask[energy_layer]
x = _x[mask]
y = _y[mask]
masked_pd = pd[mask]
masked_pa = pa[mask]
file_path = '%s_%d.reg' % (output_file_path, energy_layer)
logger.info('Saving the polarization values ds9 region file to %s...', file_path)
with open(file_path, 'w') as ds9_region_file:
ds9_region_file.write('# Region file for ixpe polarization degree and angle at %s sigma significance\n' % num_sigma)
ds9_region_file.write('global color=%s dashlist=8 3 width=1 font="helvetica 10 normal roman" select=1 highlite=1 dash=0 fixed=0 edit=1 move=1 delete=1 include=1 source=1\n fk5\n' % line_color)
# Multiplying the pd value by 60 because the vector length in the
# ds9 region is in arcseconds.
v_length = arcmin_to_arcsec(masked_pd)
for x_pos, y_pos, vl, pa in zip(x, y, v_length, masked_pa):
ds9_region_file.write('#vector(%s,%s,%s",%s) vector=0\n' %\
(x_pos, y_pos, vl, pa))
[docs]
def plot(self, prefix=None):
"""Plot everything.
"""
self.plot_stokes_parameters(prefix=prefix)
self.plot_normalized_stokes_parameters(prefix=prefix)
self.plot_polarization_degree(prefix=prefix)
self.plot_polarization_angle(prefix=prefix)