#!/usr/bin/env python
#
# Copyright (C) 2018--2020, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import print_function, division
import os
import numpy
from astropy.io import fits
import xspec
from ixpeobssim import IXPEOBSSIM_XSPEC
from ixpeobssim.utils.logging_ import logger, abort
from ixpeobssim.irf.caldb import irf_folder_path
from ixpeobssim.utils.os_ import check_input_file
from ixpeobssim.binning.base import peek_binning_algorithm, xBinnedFileBase
from ixpeobssim.utils.matplotlib_ import plt, setup_gca, residual_plot, xStatBox
from ixpeobssim.utils.fmtaxis import fmtaxis
from ixpeobssim.instrument import DU_IDS
from ixpeobssim.utils.matplotlib_ import last_line_color, nlog_errorbars
# pylint: disable=invalid-name
STAT_METHODS = ['chi', 'cstat', 'lstat', 'pgstat', 'pstat', 'whittle']
POLARIMETRIC_ADDITIVE_MODELS = ['apolconst', 'apollin', 'apolpow']
[docs]
class xXspecSpectrumManager(dict):
"""Small facility to keep track of the spectra being loaded into XSPEC.
This is essentially a dict where the type of the binned spectra and the
DU Ids are mapped into the data group in the XSPEC memory, so that we
can easily retrieve the plot data at any time.
"""
[docs]
def register(self, file_path, data_group):
"""Register a spectrum.
Note we peek into the binned file to get an hold on the binning algorithm
and the DU ID.
"""
input_file = xBinnedFileBase(file_path)
bin_alg = input_file.binning_algorithm()
du_id = input_file.du_id()
args = os.path.basename(file_path), bin_alg, du_id, data_group
logger.info('Registering [...]%s (%s, DU %d) at data group %d', *args)
self[self._key(bin_alg, du_id)] = data_group
@staticmethod
def _key(bin_alg, du_id):
"""Build the indexing key from the binning algorithm and the DU ID.
"""
return (bin_alg, du_id)
[docs]
def data_group(self, bin_alg, du_id, default=None):
"""Return the XSPEC data group mapping to a particular binning algorithm
and DU ID.
"""
return self.get(self._key(bin_alg, du_id), default)
[docs]
def spectrum_types(self):
"""Return a tuple with all the binning algorithms containing at
least a spectrum.
"""
return tuple(set([key[0] for key in self]))
# Global variable for the book-keeping of the data sets loaded in memory.
# This is cleared up and re-populated at each load_input_files() call.
_SPECTRUM_MANAGER = xXspecSpectrumManager()
[docs]
class xXspecPlotData:
"""Small container class storing the XSPEC plot data.
Plot data are typically retrieved by function calls to the global xspec.Plot
object, and this contained provides a convenient interface to cache plot
data for later use. The container encapsulates the x and y values, the
errors on the y values and, if available, the model calculated in
correspondence of the x array.
"""
def __init__(self, energy, data, errors, model=None):
"""Constructor.
"""
self.energy = numpy.array(energy)
self.data = numpy.array(data)
self.errors = numpy.array(errors)
if model is not None:
self.model = numpy.array(model)
else:
self.model = None
def __len__(self):
"""Return the lenght of the underlying data vector.
"""
return len(self.energy)
[docs]
def save(self, file_path):
"""To be implemented.
"""
raise NotImplementedError
def __str__(self):
"""String representation.
"""
if self.model is not None:
text = ''
else:
text = ' not'
return '%d point(s) found, model%s available' % (len(self), text)
[docs]
class xXspecFitData:
"""Small containter class to store the output of an XSPEC spectral fit.
This includes the parameter names, best-fit values and errors, the test
statistics and the number of degrees of freedom.
"""
def __init__(self, model):
"""Constructor.
"""
self.test_statistic = xspec.Fit.testStatistic
self.stat_test = xspec.Fit.statTest
self.dof = xspec.Fit.dof
self.__index = 0
self.model_expression = model.expression
self.par_names = []
self.__par_index_dict = {}
self.par_values = []
self.par_errors = []
self.par_low_bounds = []
self.par_high_bounds = []
self.par_status_codes = []
for i in range(model.nParameters):
par = model(i + 1)
self.par_names.append(par.name)
self.__par_index_dict[par.name] = i
self.par_values.append(par.values[0])
self.par_errors.append(par.sigma)
low_bound, high_bound, status_code = par.error
self.par_low_bounds.append(low_bound)
self.par_high_bounds.append(high_bound)
self.par_status_codes.append(status_code)
def __call__(self, identifier):
"""Overloaded method returning the parameter value and error at a given
index.
"""
if isinstance(identifier, str):
assert identifier in self.par_names
identifier = self.__par_index_dict[identifier]
return self.par_values[identifier], self.par_errors[identifier]
[docs]
def par_value(self, identifier):
"""Return the parameter value at a given index.
"""
if isinstance(identifier, str):
assert identifier in self.par_names
identifier = self.__par_index_dict[identifier]
return self.par_values[identifier]
[docs]
def par_error(self, identifier):
"""Return the parameter error at a given index.
"""
if isinstance(identifier, str):
assert identifier in self.par_names
identifier = self.__par_index_dict[identifier]
return self.par_errors[identifier]
def __len__(self):
"""Return the number of parameters in the model.
"""
return len(self.par_names)
def __iter__(self):
"""Implementation of the iteration protocol.
"""
return self
def __next__(self):
"""Implementation of the iteration protocol.
"""
if self.__index < len(self):
i = self.__index
self.__index += 1
return self.par_names[i], self.par_values[i], self.par_errors[i],\
self.par_low_bounds[i], self.par_high_bounds[i], self.par_status_codes[i]
# I don't know that this is a clever thing to do, here---need to
# study the Python iteration protocol better.
self.__index = 0
raise StopIteration()
[docs]
def next(self):
"""Alternative method for Python 2 compatibility.
"""
return self.__next__()
def __str__(self):
"""String formatting.
"""
text = 'Fit model: %s (%s = %.2f / %d)\n' %\
(self.model_expression, self.stat_test, self.test_statistic, self.dof)
for name, value, error, low_bound, high_bound, status_code in self:
if error > 0:
text += '%15s: %.3e +/- %.3e' % (name, value, error)
if low_bound != high_bound:
pos = high_bound - value
neg = value - low_bound
text += ' (+%.3e / -%.3e) %s' % (pos, neg, status_code)
text += '\n'
else:
text += '%15s: %.3e (frozen)\n' % (name, value)
text.strip('\n')
return text
[docs]
def stat_box(self, **kwargs):
"""Return a stat box that can be overlaid onto a given plot.
Note that when the error command has been run after the fit, we do
include the average of the (asymmetric) error bars in the stat box.
"""
stat_box = xStatBox(**kwargs)
stat_box.add_entry('Fit model: %s' % self.model_expression)
stat_box.add_entry('TS (%s) = %.2f / %d dof' %\
(self.stat_test, self.test_statistic, self.dof))
for name, value, error, low_bound, high_bound, status_code in self:
if low_bound != high_bound:
error = 0.5 * (high_bound - low_bound)
stat_box.add_entry(name, value, error)
return stat_box
[docs]
def compare_fit_data(fit_data, target_values, threshold=5.):
"""Compare the best-fit parameter values with the input model.
Arguments
---------
fit_data : xXspecFitData instance
The fit output.
target_values : array_like
The corresponding values from the input model.
threshold : float
The threshold (in units of sigma) for determining the agreement
between the best fit parameters and the model inputs.
Return
------
Zero if the bes-fit parameters agree with the input, and an error code > 0
otherwise.
"""
error_code = 0
logger.info('Comparing fit output with input model...')
for i, (name, value, error, _, _, _) in enumerate(fit_data):
target = target_values[i]
# This is catching possible frozen parameters.
if error > 0:
delta = abs((value - target) / error)
else:
delta = 0
msg = '%s = %.4e +/- %.4e, target = %.4e (%.2f sigma)' %\
(name, value, error, target, delta)
if delta > threshold:
error_code += 1
logger.error(msg)
else:
logger.info(msg)
return error_code
[docs]
def reset():
"""Global reset.
"""
xspec.AllData.clear()
# This is needed to avoid XSPEC prompting the user for, e.g., missing
# response files, which we take care of programmaticaly.
xspec.Xset.allowPrompting = False
[docs]
def add_model_string(key, value):
"""Add a key,value pair of strings to XSPEC's internal database.
(simple wrapper around the xspec.Xset.addModelString() method.)
This database provides a way to pass string values to certain model functions
(e.g., POW_EMIN and POW_EMAX) which are hardcoded to search for "key".
(See the XSPEC manual description for the "xset" command for a table showing
model/key usage.)
If the key,value pair already exists, it will be replaced with the new entries.
"""
logger.info('Setting "%s" to %s in the XSPEC internal database...', key, value)
xspec.Xset.addModelString(key, str(value))
[docs]
def add_model_strings(**kwargs):
"""Facility to add multiple model string to xspec.
"""
for key, value in kwargs.items():
add_model_string(key, value)
[docs]
def select_energy_range(emin=None, emax=None):
"""Apply a global mask on the energy value for the channels to be fitted.
"""
if emin is not None:
logger.info('Ignoring channels below %.3f keV...', emin)
xspec.AllData.ignore('**-%f' % emin)
if emax is not None:
logger.info('Ignoring channels above %.3f keV...', emax)
xspec.AllData.ignore('%f-**' % emax)
[docs]
def load_local_models(package_name='ixpeobssim'):
"""Load all the ixpeobssim local models.
We introduced this function, and called it automatically, version 12.0.0,
based on the idea that whoever had XSPEC installed also had the XSPEC
local models installed. This turned out to be a glaring usability issue,
as there are many possible ways one could be unable to compile the local
models, see https://bitbucket.org/ixpesw/ixpeobssim/issues/350
As of version 12.1.0 we wrap the XSPEC call in a try / except, and happily
proceed even if the local models cannot be loaded.
"""
args = package_name, IXPEOBSSIM_XSPEC
logger.info('Loading XSPEC local models from the "%s" package in %s...', *args)
try:
xspec.AllModels.lmod(*args)
except:
logger.error('Could not XSPEC load local models.')
logger.error('(This might indicate that you need to compile them.)')
logger.error('See the documentation for more info about XSPEC support.')
logger.info('Done.')
[docs]
def setup_fit_model(expression, par_values=None):
"""Create a model for spectral fitting in XSPEC.
Note we abort if the model cannot be created---this can happen, e.g.,
if one is trying to use local models without having compiled them. This
seems as the only sane option, at this point, as the result of moving on
would depend on the internal XSPEC state (is there any active model?) and
this would be likely more confusing to the user than stating straight away
what the problem is.
"""
expression = expression.strip()
logger.info('Setting up model "%s"...', expression)
try:
model = xspec.Model(expression)
except:
# Opsss, cannot create the model...
logger.error('Could not create model "%s"', expression)
logger.error('Did you forget to compile the local models?')
abort('Cannot proceed with fitting')
if par_values is not None:
for i, par_value in enumerate(par_values):
par_index = i + 1
logger.info('Setting parameter %d to %.2f..', par_index, par_value)
model(par_index).values = par_value
# If needed, freeze the phony normalization for additive, purely polarimetric
# models.
if expression in POLARIMETRIC_ADDITIVE_MODELS:
logger.info('Freezing the normalization to 1. for additive model %s...', expression)
# Note that the normalization is the last parameters, and we start
# counting from 1.
parameter = model(model.nParameters)
parameter.values = 1.
parameter.frozen = True
return model
[docs]
def set_parameter(par_index, par_value, model_id=1):
"""Set the model parameter value corresponding to provided index.
"""
model = xspec.AllModels(model_id)
parameter = model(par_index)
parameter.values = par_value
return parameter
[docs]
def fix_parameter(par_index, par_value, model_id=1):
"""Fix a model parameter to a given value.
"""
parameter = set_parameter(par_index, par_value, model_id)
parameter.frozen = True
[docs]
def set_parameter_range(par_index, min_value, max_value, model_id=1):
"""Set the range for a parameter.
"""
model = xspec.AllModels(model_id)
parameter = model(par_index)
values = parameter.values
values[2] = min_value
values[3] = min_value
values[4] = max_value
values[5] = max_value
parameter.values = values
return parameter
[docs]
def fit(stat_method='chi', max_iterations=10, error=True):
"""Perform a spectral fit.
"""
assert stat_method in STAT_METHODS
xspec.Fit.nIterations = max_iterations
xspec.Fit.statMethod = stat_method
xspec.Fit.perform()
if error:
calculate_confidence_intervals()
xspec.Fit.show()
return current_fit_output()
[docs]
def calculate_confidence_intervals(model_id=1, delta_fit_statistics=1.):
"""Calculate the confidence intervals for a fit.
This is running the error XSPEC command, see
https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/node78.html
"""
model = xspec.AllModels(model_id)
arg_string = '%.5f 1-%d' % (delta_fit_statistics, model.nParameters)
xspec.Fit.error(arg_string)
[docs]
def current_fit_output(model_id=1):
"""Return the current fit output.
"""
model = xspec.AllModels(model_id)
return xXspecFitData(model)
[docs]
def retrieve_plot_data(bin_alg, du_id):
"""Retrieve the plot data (and, if available, the corresponding model)
for a given spectrum.
Warning
-------
Note that the plot data within XSPEC are intented to be normalized wrt the
energy---i.e., the input from the FITS table is divided by the width of the
energy bin, and the units get an additional keV^{-1}.
"""
group = _SPECTRUM_MANAGER.data_group(bin_alg, du_id)
if group is None:
return
xspec.Plot.xAxis = 'keV'
xspec.Plot('data')
data = [xspec.Plot.x(group), xspec.Plot.y(group), xspec.Plot.yErr(group)]
try:
data.append(xspec.Plot.model(group))
except:
pass
data = xXspecPlotData(*data)
logger.info('Done, %s', data)
return data
[docs]
def plot_normalized_counts(plot_data, du_id, **kwargs):
"""Plot the normalized counts.
Note that we use full circles for positive values and empty circles for
negative ones, so that we can plot Stokes parameters (that can go negative)
in logarithmic scale.
"""
kwargs.setdefault('zorder', 1)
kwargs.setdefault('ms', 5)
kwargs.setdefault('fmt', 'o')
kwargs['label'] = 'DU %d' % du_id
nlog_errorbars(plot_data.energy, plot_data.data, plot_data.errors, **kwargs)
if plot_data.model is not None:
plt.plot(plot_data.energy, plot_data.model, zorder=2, color='black')
[docs]
def plot_residuals(plot_data):
"""Plot the model residuals.
"""
res = (plot_data.data - plot_data.model) / plot_data.errors
plt.errorbar(plot_data.energy, res, 1., fmt='o', zorder=1, ms=5)
plt.plot(plot_data.energy, numpy.zeros(plot_data.energy.shape),
zorder=2, color='black')
setup_gca(grids=True, **fmtaxis.ene_pulls_sigma)
def _ylabel(bin_alg):
"""Return the units for a given binned spectrum.
"""
label = 'Normalized %s counts' % bin_alg
if bin_alg in ('PHA1', 'PHA1Q', 'PHA1U'):
return '%s [s$^{-1}$ keV$^{-1}$]' % label
if bin_alg in ('PHA1QN', 'PHA1UN'):
return '%s [keV$^{-1}$]' % label
[docs]
def plot(figure_name='XSPEC spectrum', logy=True):
"""Custom, matplotlib-based implementation of the standard XSPEC
spectral-fit plot.
"""
for bin_alg in _SPECTRUM_MANAGER.spectrum_types():
ax1, ax2 = residual_plot('%s %s' % (figure_name, bin_alg))
for du_id in DU_IDS:
data = retrieve_plot_data(bin_alg, du_id)
if data is None:
continue
plt.sca(ax1)
plot_normalized_counts(data, du_id)
plt.sca(ax2)
plot_residuals(data)
plt.sca(ax1)
setup_gca(ylabel=_ylabel(bin_alg), grids=True, logy=logy)
fit_data = current_fit_output()
fit_data.stat_box(position='lower left').plot()
plt.legend()
[docs]
def energy_flux(emin=2., emax=8.):
"""Return the value of the energy flux and of the photon flux between
emin and emax.
"""
xspec.AllModels.calcFlux('%.3f %.3f err' % (emin, emax))
return xspec.AllData(1).flux
[docs]
def sample_spectral_model(expression, parameters, emin=1., emax=12.,
num_points=250, name=None, source_id=1):
"""Sample the values for a generic XSPEC model, given an expression and a set
of parameters.
Most notably, this is used to feed into ixpeobssim time-independent
XSPEC spectral models.
Arguments
---------
expression : str
The model expression string, using full component names.
name : str
The model name.
source_id : int
The source number.
parameters : dict or list
The model parameters.
emin, emax : double
Energy limits, in keV.
num_points : int
The number of points to sample the spectrum.
"""
if name is None:
name = 'Model%d' % (len(xspec.AllModels.sources) + 1)
model = xspec.Model(expression, name, source_id, parameters)
parameter_names = [model(i + 1).name for i in range(model.nParameters)]
# Set the energy grid for sampling the spectrum.
# In XSPEC language, this is effectively the same thing as defining a linear
# binning in energy and then taking the bin centers.
# Since on our end we are interested in spectra at precise energy points, we
# add the proper padding, here, so that in the end the output energy array
# is what we mean with the input parameters---within numerical accuracy.
padding = 0.5 * (emax - emin) / num_points
emin -= padding
emax += padding
binning = numpy.linspace(emin, emax, num_points + 1)
xspec.AllModels.setEnergies("%.12f %.12f %i lin" % (emin, emax, num_points))
xspec.Plot.device = "/null"
xspec.Plot('model %s' % name)
energy = numpy.array(xspec.Plot.x())
# Mind this is the actual flux integrated over the bin and divided by the
# bin width---not the flux calculated at the bin center, so an
# a-posteriori correction might be needed.
flux = numpy.array(xspec.Plot.model())
xspec.AllModels.setEnergies('reset')
return binning, energy, flux, parameter_names