# Copyright (C) 2015--2022, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Base classes for binned data products.
"""
from __future__ import print_function, division
from collections.abc import Iterable
from functools import reduce
import numpy
from ixpeobssim.core.fitsio import xPrimaryHDU, read_hdu_list_in_memory
from ixpeobssim.evt.event import xEventFile
from ixpeobssim.irf import load_arf
from ixpeobssim.utils.astro import build_wcs
from ixpeobssim.utils.logging_ import logger, abort
from ixpeobssim.utils.units_ import arcmin_to_arcsec, arcsec_to_degrees
# pylint: disable=invalid-name, too-many-arguments, no-member
[docs]
class xEventBinningBase:
"""Base class for the event binning.
This is essentially opening an event file and keeping track of the
xEventFile object, along all the keyword arguments passed to the
constructor.
"""
INTENT = None
SUPPORTED_KWARGS = None
def __init__(self, file_path, **kwargs):
"""Constructor.
"""
self.event_file = xEventFile(file_path)
self.irf_name = kwargs.get('irfname')
if self.irf_name is None:
self.irf_name = self.event_file.irf_name()
self.kwargs = kwargs
self.process_kwargs()
[docs]
def process_kwargs(self):
"""Check the keyword arguments and if the output file is not set, create
a path on the fly, based on the event file and the binning algorithm.
"""
if self.get('suffix') is None:
suffix = self.get('algorithm').lower()
else:
suffix = self.get('suffix')
evfile = self.event_file.file_path()
outfile = evfile.replace('.fits', '_%s.fits' % suffix)
self.set('outfile', outfile)
[docs]
def process_image_ref_kwargs(self):
"""Set the xref and yref values for the output image (in sky) coordinates.
If either xref or yref are not specifified, the center of the ROI specified
in the proper extension is assumed.
"""
# Retieve the coordinates of the ROI center.
xref, yref = self.event_file.wcs_reference()
if self.get('xref') is None:
self.set('xref', xref)
if self.get('yref') is None:
self.set('yref', yref)
# And if we still haven't succeded, we give up.
if None in (self.get('xref'), self.get('yref')):
logger.error('Cannot retrieve the ROI center information.')
abort('Please set the --xref and --yref command-line switches explicitely')
@staticmethod
def _image_wcs_kwargs():
"""Return the list of valid keywords for the _build_image_wcs() method.
"""
return ['npix', 'pixsize', 'proj', 'xref', 'yref']
@staticmethod
def _build_image_wcs(default_img_side=17.5, **kwargs):
"""Build the WCS object for the output image.
This is a convenicence function meant to be used by all the binning
algorithms needing to create programmatically a WCS object to operate
in sky coordinates.
Arguments
---------
default_img_side : float
The default size of the output image in arcmin.
kwargs : dict
All the command-line keyword arguments passed to xpbin.
"""
ra0 = kwargs.get('xref', 0.)
dec0 = kwargs.get('yref', 0.)
nside = kwargs.get('npix', 200)
# Retrieve the pixel size...
pixel_size = kwargs.get('pixsize', None)
# ...fall back to the default image size if the pixel size is not set.
# (Mind the image size in arcmin, and the pixel size is in arcsec.)
if pixel_size is None:
pixel_size = arcmin_to_arcsec(default_img_side) / nside
# Convert from arcsec to degrees.
pixel_size = arcsec_to_degrees(pixel_size)
projection = kwargs.get('proj', 'TAN')
return build_wcs(ra0, dec0, nside, pixel_size, projection)
@staticmethod
def _pixelize_skycoords(ra, dec, wcs_, origin=0, offset=0.5):
"""Convert sky-coordinates into pixel coordinates for a given wcs.
This is supposed to be the main function for handling the sky-to-pixel
conversion in binned object, and is fundamentally different from the
astropy.WCS.wcs_world2pix() method called under the hood, in that
it provides a series of digitized values suitable to be passed to the
numpy.histogramdd() machinery, along with the proper binning to be
used downstream. For this very purpose, an offset in the (0, 1) open
interval (0.5 by default) is added to the return values, so that the
x and y output values do not lie on the bin edges when used to create
histograms downstream. Note that if the offset is set to zero, this will
introduce an offset in the output maps, see the test_wcs.py unit test
for more information.
Note that the origin argument is largely irrelevant as long as the proper
binning (i.e., the one returned by the function) is used downstream, see
https://docs.astropy.org/en/stable/api/astropy.wcs.WCS.html#astropy.wcs.WCS.wcs_world2pix
for more information about the origin.
Arguments
---------
ra : array_like
The array of input ra values.
dec : array_like
The array of input dec values.
wcs_ : astropy.wcs.WCS opject
The WCS object handling the conversion behind the scenes.
origin : 0 or 1
The origin for the conversion. See
https://docs.astropy.org/en/stable/api/astropy.wcs.WCS.html#astropy.wcs.WCS.wcs_world2pix
for more details.
offset : float
Constant offset to be added to the output x and y arrays.
"""
# Pack the input coordinates in the proper form for the calculation.
coords = numpy.vstack((ra, dec)).transpose()
# Call the underlying astropy.WCS.wcs_world2pix() method.
pix = wcs_.wcs_world2pix(coords, origin)
# Unpack the output of the conversion---note that we are swapping the
# axis to match the FITS convension, since this is going to be stored
# into a FITS file. This avoid a transposition at the read-back time.
x = pix[:, 1] + offset
y = pix[:, 0] + offset
# Since we have the wcs *and* the origin used to do the world-to-pixel
# conversion, this is *the* right place to construct the binning to
# be used along with x and y downstream.
nx, ny = wcs_.array_shape
xbinning = numpy.linspace(origin, nx + origin, nx + 1)
ybinning = numpy.linspace(origin, ny + origin, ny + 1)
return x, y, (xbinning, ybinning)
[docs]
def get(self, key, default=None):
"""Convenience method to address the keyword aguments.
"""
return self.kwargs.get(key, default)
[docs]
def set(self, key, value):
"""Convenience method to set keyword arguments.
"""
logger.info('Setting %s to %s...', key, value)
self.kwargs[key] = value
[docs]
def weight_data(self):
"""Retrieve the weights from the underlying event file.
This encapsulates the simple logic used downstream by the binning algorithms
supporting weights, i.e., if the command-line argument `weights` is False
the weights are all one, while if it is True returns they are the content
of the column indicated by the `weightcol` command-line argument.
"""
if not self.get('weights'):
logger.info('Performing un-weighted analysis...')
return numpy.full(self.event_file.num_events(), 1.)
col_name = self.get('weightcol')
logger.info('Retrieving weights from column "%s"...', col_name)
return self.event_file.event_data[col_name]
[docs]
def check_pcube_weighting_scheme(self, aeff):
"""Simple check on the weighting scheme being used.
For more background information, see
https://bitbucket.org/ixpesw/ixpeobssim/issues/573 and
https://bitbucket.org/ixpesw/ixpeobssim/issues/613
"""
weight_scheme = aeff.weighting_scheme()
if self.get('weights') and weight_scheme != 'SIMPLE':
logger.error('Wrong arf weighting scheme (%s) for %s with options %s',
weight_scheme, self.__class__.__name__, self.kwargs)
logger.info('See https://bitbucket.org/ixpesw/ixpeobssim/issues/573 for more info.')
raise RuntimeError('Cannot create binned data product')
[docs]
def load_aeff_for_polarization_analysis(self):
"""Load the proper arf file for a model-independent polarization analysis
(i.e., to be used for polarization cubes, maps and map cubes).
This is loading the proper arf file making sure that, when weights are
used, the SIMPLE weighting prescription is picked.
"""
aeff = load_arf(self.irf_name, self.event_file.du_id(),
simple_weighting=self.get('weights'), gray_filter=self.get('grayfilter'))
self.check_pcube_weighting_scheme(aeff)
return aeff
[docs]
def build_primary_hdu(self, data=None, header=None):
"""Build the primary HDU for the output file.
"""
primary_hdu = xPrimaryHDU(data, header, 'xpbin.py')
primary_hdu.setup_header(self.event_file.primary_keywords())
primary_hdu.add_keyword('BINALG', self.get('algorithm'),
'the binning algorithm used')
primary_hdu.add_comment('%s run with kwargs %s' %\
(self.__class__.__name__, self.kwargs))
return primary_hdu
[docs]
@staticmethod
def read_binning_from_file(file_path):
"""Read a custom binning from file and return a numpy array.
"""
return numpy.loadtxt(file_path)
[docs]
@staticmethod
def equipopulated_binning(num_bins, data, min_val=None, max_val=None):
"""Create an equipopulated binning based on the values of a generic
data column.
Arguments
---------
num_bins : int
The number of bins
data : array
The underlying data to be used for the binning
min_val : float (optional)
The minimum data value to be used for the binning
max_val : float (optional)
The maximum data value to be used for the binning
"""
if min_val is None:
min_val = data.min()
if max_val is None:
max_val = data.max()
mask = (data >= min_val) * (data <= max_val)
data = data[mask]
data.sort()
binning = [min_val]
for i in range(1, num_bins):
index = int(i * len(data) / float(num_bins))
binning.append(data[index])
binning.append(max_val)
return numpy.array(binning)
# pylint: disable=inconsistent-return-statements
[docs]
@staticmethod
def make_binning(bin_alg, min_val=None, max_val=None, num_bins=None,
bin_list=None, bin_data=None, bin_file=None):
"""Generic function to define a binning with several possible
different algorithms.
"""
assert bin_alg in ['LIN', 'LOG', 'LIST', 'EQP', 'FILE']
if bin_alg == 'LIN':
assert min_val is not None
assert max_val is not None
assert num_bins is not None
return numpy.linspace(min_val, max_val, num_bins + 1)
if bin_alg == 'LOG':
assert min_val is not None and min_val > 0.
assert max_val is not None
assert num_bins is not None
return numpy.logspace(numpy.log10(min_val), numpy.log10(max_val), num_bins + 1)
if bin_alg == 'EQP':
assert num_bins is not None
assert bin_data is not None
return xEventBinningBase.equipopulated_binning(num_bins, bin_data, min_val, max_val)
if bin_alg == 'FILE':
assert bin_file is not None
return xEventBinningBase.read_binning_from_file(bin_file)
if bin_alg == 'LIST':
assert bin_list is not None
return numpy.array(bin_list, 'd')
@staticmethod
def _energy_binning_kwargs():
"""Return the list of valid keywords for the _build_image_wcs() method.
"""
return ['emin', 'emax', 'ebins', 'ebinalg', 'ebinning', 'ebinfile']
[docs]
@staticmethod
def make_energy_binning(bin_data=None, **kwargs):
"""Small convenience function for energy binning.
"""
args = (kwargs.get('ebinalg'), kwargs.get('emin'), kwargs.get('emax'),
kwargs.get('ebins'), kwargs.get('ebinning'), bin_data,
kwargs.get('ebinfile'))
binning = xEventBinningBase.make_binning(*args)
logger.info('Energy binning: %s', binning)
return binning
[docs]
@staticmethod
def bin_centers(bin_edges):
"""Return an array of bin centers given an array of bin edges.
Arguments
---------
bin_edges : 1-d array of length (n + 1).
The array with the bin edges.
Returns
-------
1-d array of length n.
The array with the values of the bin centers.
"""
assert bin_edges.ndim == 1
return 0.5 * (bin_edges[:-1] + bin_edges[1:])
[docs]
@staticmethod
def bin_widths(bin_edges):
"""Return an array of bin widths given an array of bin edges.
Arguments
---------
bin_edges : 1-d array of length (n + 1).
The array with the bin edges.
Returns
-------
1-d array of length n.
The array with the values of the bin widths.
"""
assert bin_edges.ndim == 1
return bin_edges[1:] - bin_edges[:-1]
[docs]
def bin_(self):
"""Do-nothing method to be reimplemented in the derived classes.
"""
raise NotImplementedError
[docs]
def write_output_file(self, hdu_list, overwrite=True):
"""Write the binned data to the output file.
"""
hdu_list.info()
logger.info('Writing %s binned data to %s...', self.get('algorithm'),
self.get('outfile'))
hdu_list.writeto(self.get('outfile'), overwrite=overwrite)
logger.info('Done.')
[docs]
class xBinnedFileBase:
"""Base class for binned files.
"""
# pylint: disable=protected-access
def __init__(self, file_path):
"""Constructor.
This is the basic interface to binned FITS files written by xpbin.
There are two different ways to instantiate class object: (i) call the
constructor given a file path; (ii) call the class method
from_file_list() passing a list of file paths (in which case the
contents of the different files are summed up together according to the
specific implementation of the __add__() method for any particular
class).
Derived classes are responsible for reimplementing the _read_data()
method, where all the relevant information is extracted from the FITS
file itself. This was originally implemented because the underlying
FITS file was closed right away in the constructor, but this prevented
any easy implementation of a method to save to disk any binned data
product. So the FITS file is now open for the entire lifetime of the
object, but the machinery for the internal storage of the data is
retained, as is now pervasive in the code and it would be too disruptive
to change it.
"""
# Mind we Initialize this to avoid isses in the destructor, where we
# clean things up.
self.__data_dict = {}
self.hdu_list = read_hdu_list_in_memory(file_path)
self.hdu_list.info()
self.primary_header = self.hdu_list['PRIMARY'].header
self._read_data()
[docs]
@classmethod
def from_file_list(cls, file_list):
"""Method to instance the binned class from a list of files.
"""
assert isinstance(file_list, Iterable)
return reduce(cls.__iadd__, [cls(file_path) for file_path in file_list])
def _read_data(self):
"""Do-nothing method to be reimplemented in the derived classes.
"""
def _read_binary_table_data(self, extension_name):
"""Convenience function retrieving all the data from a binary table.
This is looping over all the columns of the binary table identified
by a given extension name and inderting all the corresponding arrays
into the self.__data_dict dictionary.
"""
data = self.hdu_list[extension_name].data
for col in data.columns:
self.__data_dict[col.name] = col.array
def _read_image_data(self, extension_name):
"""Convenience function retrieving data from an image.
"""
self.__data_dict[extension_name] = self.hdu_list[extension_name].data
def _weighted_average(self, other, col_name, weights_col_name, default=0., invert_w2=False):
"""Convenience function to calculate the weighted average of two
different binned object for a particular column.
This comes handy when summing binned files, e.g., for the average
energy or effective modulation factor.
Note we make all efforts to avoid any zero-division error.
"""
v1 = self.__data_dict[col_name]
w1 = self.__data_dict[weights_col_name]
mask1 = w1 > 0.
v2 = other.__data_dict[col_name]
w2 = other.__data_dict[weights_col_name]
mask2 = w2 > 0.
if invert_w2:
w2 = -w2
# Initialize the array holding the weighted average to the default value.
val = numpy.full(v1.shape, default)
# Both weights are non-zero---we can do the actual weighted average.
mask = numpy.logical_and(mask1, mask2)
val[mask] = (v1 * w1 + v2 * w2)[mask] / (w1 + w2)[mask]
# Weights for other are zero.
mask = numpy.logical_and(mask1, numpy.logical_not(mask2))
val[mask] = v1[mask]
# Weights for self are zero.
mask = numpy.logical_and(numpy.logical_not(mask1), mask2)
val[mask] = v2[mask]
assert numpy.isfinite(val).all()
return val
def __getattr__(self, name):
"""Proxy method to retrieve the underlying data as if they were class
members.
Warning
-------
I guess I was feeling clever when I first implemented this, but a
definite side-effect is the necessity of an overly complicated
__setattr__() method to make sure that the underlying __data_dict
class member is properly updated when we change the table values---which
we do all the time, e.g., when summing binned products.
Note that, as of version 12, this only works if the attribute name is
spelled upper-case. This is intented to make more obvious in the source
code when we are trying to access data through this __getattr__ hook,
vs. normal member access---I think it was a terrible mistake to make this
case-insensitive to start with. Note I added some basic logic to give a
sensible error message when a user try an old-style, lower-case access.
"""
if not name.isupper():
logger.error('%s has no attribute %s.', self.__class__.__name__, name)
if name.upper() in self.__data_dict:
msg = 'Please use the upper-case form "%s" (new in version 12.0.0).'
logger.info(msg, name.upper())
abort()
return self.__data_dict[name]
def __setattr__(self, name, value):
"""Proxy method to set an entry in the underlying data as if they were
class members.
Warning
-------
This is admittedly quite terrible, but I only realize the need for an
automatic mechanism for updating __data_dict very late in the game, and
now the data access by name is all over the place.
A couple of things to remember:
* within this function we need to access the class member via a call to
the parent object.__getattribute__() in order to avoid an infinite
recursion;
* the __data_dict class member does not always exist in the life of the
object, and therefore we need to encapsulate the call into a
try/except block.
"""
try:
_data_dict = object.__getattribute__(self, '_xBinnedFileBase__data_dict')
if name in _data_dict:
_data_dict[name] = value
# Update the underlyng HDU list---this is important to have
# the thing always up to date in case you want to write it
# back to file at some point. This presents the additional
# challenge that we have to map back the __data_dict class
# member to the actual structure in the underlying FITS files.
# The following hack only works as long as we are dealing with
# either image extensions or binary tables (at position 1).
# This might be another reason why we might want to refactor the
# entire xBinnedFileBase class at some point.
try:
self.hdu_list[name].data = value
except KeyError:
self.hdu_list[1].data[name] = value
except AttributeError:
pass
object.__setattr__(self, name, value)
[docs]
def set_data(self, name, value):
"""Add a (key, value) pair to the underlying __data_dict class member.
Warning
-------
This is yet another side effect of the perverse __getattr__ / __setattr__
mechanism we have put in place for this class---if we calculate a
quantity dinamically for a binned object and we still want to be able
to access it with the same rules, we have to manually add it to the
dictionary.
"""
assert name.isupper()
self.__data_dict[name] = value
def _check_iadd(self, other, same_shape=(), same_values=()):
"""Convenience method to check that two binned object are suitable for
addition.
Typically we want to make sure that the two object are of the same class
and that the underlying data arrays have the same shape. Occasionally,
we also require that some of the data, e.g., the EBONDS, are identical.
This is a convenient hook to have for the implementation of the
__iadd__() dunder of the subclasses.
"""
assert isinstance(other, self.__class__)
for key in same_shape:
s1 = self.__data_dict[key].shape
s2 = other.__data_dict[key].shape
if s1 != s2:
logger.error('Arrays have different shapes (%s vs %s) for key "%s"', s1, s2, key)
abort('Binned files cannot be combined consistently')
for key in same_values:
v1 = self.__data_dict[key]
v2 = other.__data_dict[key]
if not numpy.array_equal(v1, v2):
logger.error('Arrays have different values for key "%s"', key)
logger.error('First array:\n%s', v1)
logger.error('Second array:\n%s', v2)
try:
delta = v1 - v2
logger.error('Difference:\n%s', delta)
except ValueError as e:
logger.error('Cannot calculate difference: %s', e)
abort('Binned files cannot be combined consistently')
[docs]
def ontime(self, scale=1.e-3):
"""Return the nominal ontime for the binned file.
Arguments
---------
scale : float
A scale factor, by default 1.e-3 (i.e., ontime in ks).
"""
return self.primary_header['ONTIME'] * scale
[docs]
def binning_algorithm(self):
"""Return the binning algorithm used to create the file.
"""
return self.primary_header['BINALG']
[docs]
def du_id(self):
"""Return the DU ID for the binned file.
.. versionadded:: 12.0.0
Warning
-------
This was added in version 12 of ixpeobssim after
https://bitbucket.org/ixpesw/ixpeobssim/issues/327
"""
key = 'DETNAM'
try:
return int(self.primary_header[key][-1])
except KeyError:
logger.error('The binned file primary header has no %s keyword.', key)
logger.error('(Note we did not propagate %s to binned file prior to version 12)', key)
return None
# pylint: disable=unused-argument
[docs]
def plot(self, *args, **kwargs):
"""Do nothing method.
"""
logger.info('%s.plot() not implemented, yet.', self.__class__.__name__)
[docs]
def write(self, file_path, overwrite=True):
"""Write the binned data to the output file.
"""
logger.info('Writing %s object to %s...', self.__class__.__name__, file_path)
self.hdu_list.writeto(file_path, overwrite=overwrite)
logger.info('Done.')
def __str__(self):
"""String formatting.
"""
return '%s content:\n%s' % (self.__class__.__name__, self.__data_dict)
[docs]
def peek_binning_algorithm(file_path):
"""Open a binned file and peek at the underlying algorithm
"""
return xBinnedFileBase(file_path).binning_algorithm()