Source code for ixpeobssim.srcmodel.magnetar

#!/urs/bin/env python
#
# Copyright (C) 2020--2021, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Interface to the magnetar tabular models by Roberto Turolla and Roberto Taverna.
"""

from __future__ import print_function, division

import os
from enum import IntEnum

import numpy
from astropy.io import fits

from ixpeobssim import IXPEOBSSIM_AUXFILES, auxfiles_missing
from ixpeobssim.utils.logging_ import logger, abort
from ixpeobssim.core.fitsio import xPrimaryHDU
from ixpeobssim.core.fitting import power_law_analytical_fit, linear_analytical_fit
from ixpeobssim.core.spline import xInterpolatedBivariateSpline
from ixpeobssim.core.stokes import xModelStokesParameters
from ixpeobssim.utils.units_ import erg_to_keV, keV_to_erg
from ixpeobssim.utils.misc import pairwise

# pylint: disable=invalid-name, too-many-arguments, no-member, too-many-locals
# pylint: disable=too-many-instance-attributes, too-few-public-methods


[docs] def parse_legacy_data(file_path, pad_bounds=True, emin=1., emax=12.): """Parse a legacy data file, predating the glorious, full model table. .. warning:: This is obsolete and, in principle, there should be no reason to use it except for testing. Here is an example of the underlying data format: .. code-block:: 89.9000 60.0000 0.500000 0.340000 0 0.00100000 0.699132 -0.300000 -0.273469 4456.25 0.776281 73.0709 -0.273469 -0.246939 834.500 0.786651 73.3261 The basic rules are: * the first line contains the model parameters: chi, xi, delta_phi, and beta; * then comes a variable number of blocks where the first (3-elements) row indicates the index and range of the bin phase and the following (5-element) rows encapsulate the spectral and polarimetric properties. More specifically, each 5-element row includes: * log10(energy_lo); * log10(energy_hi); * flux (arbitrary units---the number of counts in the underlying MC); * polarization degree; * polarization angle in decimal degrees. """ phase, energy, flux, pol_deg, pol_ang = ([] for i in range(5)) # Loop over the input file. with open(file_path, 'r') as input_file: input_file.readline() for line in input_file: values = [float(item) for item in line.split()] if len(values) == 3: _, lo, hi = values phase.append(0.5 * (lo + hi) / (2. * numpy.pi)) if len(values) == 5: loglo, loghi, f, pd, pa = values e = 0.5 * (10.**loglo + 10.**loghi) if e < emin or e > emax: continue energy.append(e) flux.append(f) pol_deg.append(pd) pol_ang.append(pa) # Convert the quantities to numpy arrays. num_phase_bins = len(phase) num_energy_bins = int(len(energy) / num_phase_bins) shape = (num_phase_bins, num_energy_bins) phase = numpy.array(phase) # Need only one copy of the array of energies... energy = numpy.array(energy[:num_energy_bins]) # ...and reshape the others appropriately. flux = numpy.array(flux).reshape(shape) pol_deg = numpy.array(pol_deg).reshape(shape) pol_ang = numpy.array(pol_ang).reshape(shape) if pad_bounds: phase, flux, pol_deg, pol_ang = _pad_arrays(phase, flux, pol_deg, pol_ang) return phase, energy, flux, pol_deg, pol_ang
def _pad_arrays(phase, *arrays): """Pad the relevant arrays so that the resulting interpolator is well-behaved in 0 and 1. .. warning:: This is only used for parsing legacy data and, in principle, there should be no reason to use it except for testing. """ phase = numpy.hstack([phase[-1] - 1., phase, phase[0] + 1.]) arrays = [numpy.vstack([array_[-1, :], array_, array_[0, :]]) for array_ in arrays] return [phase] + arrays
[docs] def parse_data_file(file_path, *target_params, num_phase_bins=9, num_energy_bins=49): """Parse the content of an input data file. The basic data structure is as follows: * a first line with the 4 parameter values; * a line with (phase_bin, phase_min, phase_max) * a series of lines with (logemin, logemax, I, Q/I, U/I) .. code-block:: 89.9000 89.9000 1.40000 0.200000 1 0.0010000000 0.69913174 -0.300000 -0.273469 852.250 0.0217493 -0.0900019 -0.273469 -0.246939 725.750 0.0282958 -0.0814848 ... 0.946939 0.973469 78.0000 0.0389240 -0.0909807 0.973469 1.00000 198.250 0.0437026 -0.0840274 2 0.69913174 1.3972635 ... """ _parse_line = lambda line: tuple([float(item) for item in line.split()]) logger.info('Parsing input file %s...', file_path) # Initialize the arrays for the output data. phase = numpy.zeros((num_phase_bins, )) energy_lo = numpy.zeros((num_energy_bins, )) energy_hi = numpy.zeros((num_energy_bins, )) I = numpy.zeros((num_phase_bins, num_energy_bins)) Q = numpy.zeros((num_phase_bins, num_energy_bins)) U = numpy.zeros((num_phase_bins, num_energy_bins)) # Read the actual file. with open(file_path, 'r') as input_file: # Make sure the first lines agrees with the target parameters from the file name. line = input_file.readline() if len(target_params) > 0: assert _parse_line(line) == target_params phase_bin = 0 energy_bin = num_energy_bins for line in input_file: # A line with 3 values, this signals the beginning of a new phase bin. values = _parse_line(line) if len(values) == 3: phase_index, phase_min, phase_max = values assert phase_index == phase_bin + 1 assert energy_bin == num_energy_bins phase[phase_bin] = 0.5 * (phase_min + phase_max) phase_bin += 1 energy_bin = 0 # A line with 5 values contains the I, q and u values for a given energy bin. elif len(values) == 5: loge_min, loge_max, _I, _q, _u = values I[phase_bin - 1][energy_bin] = _I Q[phase_bin - 1][energy_bin] = _q * _I U[phase_bin - 1][energy_bin] = _u * _I if phase_bin == 1: energy_lo[energy_bin] = 10.**loge_min energy_hi[energy_bin] = 10.**loge_max else: assert 10.**loge_min == energy_lo[energy_bin] assert 10.**loge_max == energy_hi[energy_bin] energy_bin += 1 assert phase_bin == num_phase_bins return phase, energy_lo, energy_hi, I, Q, U
[docs] def package_model_table(root_folder_path, model_name): """Build a model table package in OGIP-compliant FITS format given an archive of text files containing the Stokes spectra as a function of energy and phase. """ logger.info('Packaging magnetar model table...') # Folder structure for the different physical models. folder_dict = { xMagnetarModelsT2020.ATMOSPHERE: 'atmosphere', xMagnetarModelsT2020.BLACKBODY: 'blackbody', xMagnetarModelsT2020.SOLID_SURFACE_FREE_IONS: 'solid(free)', xMagnetarModelsT2020.SOLID_SURFACE_FIXED_IONS: 'solid(fix)' } # Convenience function to construct the file name. def _file_name(chi, xi, delta_phi, beta): """Build the file name corresponding to a given set of parammeters. """ delta_phi = '%02d' % (delta_phi * 10.) beta = ('%03d' % (beta * 100)).rstrip('0') return 'dataphaseen(%.2f-%.2f-%s-%s).dat' % (chi, xi, delta_phi, beta) # Loop over the input files and fill the spectra arrays. num_spectra = xParameterSpaceT2020.NUMBSPECTRA num_energy_bins = len(xParameterSpaceT2020.ENERGY_LO) num_phase_bins = len(xParameterSpaceT2020.PHASE) paramval = numpy.zeros((num_spectra, 6)) intpspec_I = numpy.zeros((num_spectra, num_energy_bins)) intpspec_Q = numpy.zeros((num_spectra, num_energy_bins)) intpspec_U = numpy.zeros((num_spectra, num_energy_bins)) logger.info('Looping over input files...') i = 0 for model, folder_name in folder_dict.items(): folder_path = os.path.join(root_folder_path, folder_name) for chi in xParameterSpaceT2020.CHI: for xi in xParameterSpaceT2020.XI: for delta_phi in xParameterSpaceT2020.DELTA_PHI: for beta in xParameterSpaceT2020.BETA: params = chi, xi, delta_phi, beta file_name = _file_name(*params) file_path = os.path.join(folder_path, file_name) assert os.path.exists(file_path) _phase, _elo, _ehi, I, Q, U = parse_data_file(file_path, *params) assert numpy.allclose(_phase, xParameterSpaceT2020.PHASE) assert numpy.allclose(_elo, xParameterSpaceT2020.ENERGY_LO) assert numpy.allclose(_ehi, xParameterSpaceT2020.ENERGY_HI) cols = ( numpy.full(num_phase_bins, model), numpy.full(num_phase_bins, chi), numpy.full(num_phase_bins, xi), numpy.full(num_phase_bins, delta_phi), numpy.full(num_phase_bins, beta), _phase ) _paramval = numpy.column_stack(cols) idx = slice(i * num_phase_bins, (i + 1) * num_phase_bins) paramval[idx,:] = _paramval intpspec_I[idx,:] = I intpspec_Q[idx,:] = Q intpspec_U[idx,:] = U i += 1 logger.info('Done, %d file(s) parsed.', i) logger.info('Writing output files...') def _model_keywords(hduclas2=None, hduvers='1.1.0'): """Small nested function for the common keywords for the model headers. """ keywords = [ ('HDUCLASS', 'OGIP'), ('HDUCLAS1', 'XSPEC TABLE MODEL'), ('HDUVERS', hduvers), ] if hduclas2 is not None: keywords.insert(-2, ('HDUCLAS2', hduclas2)) return keywords def _primary_hdu(model_name, model_type): """Small nested function for the primary header. See https://bitbucket.org/ixpesw/ixpeobssim/issues/438 """ header_keywords = _model_keywords() +\ [ ('MODLNAME', '%s_%s' % (model_name, model_type.lower()), 'Table model name'), ('MODLUNIT', '', 'Table model units'), ('REDSHIFT', False, 'Add redshift parameter?'), ('ADDMODEL', True, 'Is model additive?'), ('LOELIMIT', 0., 'Value of model below tabulated energies'), ('HIELIMIT', 0., 'Value of model above tabulated energies') ] return xPrimaryHDU(keywords=header_keywords) # PARAMETERS columns = [ fits.Column(name='NAME', array=xParameterSpaceT2020.NAME, format='12A'), fits.Column(name='METHOD', array=numpy.zeros((6,), dtype=int), format='J'), fits.Column(name='INITIAL', array=xParameterSpaceT2020.INITIAL, format='E'), fits.Column(name='DELTA', array=xParameterSpaceT2020.DELTA, format='E'), fits.Column(name='MINIMUM', array=xParameterSpaceT2020.MINIMUM, format='E'), fits.Column(name='BOTTOM', array=xParameterSpaceT2020.MINIMUM, format='E'), fits.Column(name='TOP', array=xParameterSpaceT2020.MAXIMUM, format='E'), fits.Column(name='MAXIMUM', array=xParameterSpaceT2020.MAXIMUM, format='E'), fits.Column(name='NUMBVALS', array=xParameterSpaceT2020.NUMBVALS, format='J'), fits.Column(name='VALUE', array=xParameterSpaceT2020.VALUE, format='PE') ] parameters = fits.BinTableHDU.from_columns(columns, name='PARAMETERS') for key, value in _model_keywords('PARAMETERS', '1.0.0'): parameters.header.set(key, value) parameters.header.set('NINTPARM', 6) parameters.header.set('NADDPARM', 0) # ENERGIES columns = [ fits.Column(name='ENERG_LO', array=xParameterSpaceT2020.ENERGY_LO, format='D'), fits.Column(name='ENERG_HI', array=xParameterSpaceT2020.ENERGY_HI, format='D') ] energies = fits.BinTableHDU.from_columns(columns, name='ENERGIES') for key, value in _model_keywords('ENERGIES', '1.0.0'): energies.header.set(key, value) # SPECTRA for spec, label in zip((intpspec_I, intpspec_Q, intpspec_U), ('I', 'Q', 'U')): columns = [ fits.Column(name='PARAMVAL', array=paramval, format='6E'), fits.Column(name='INTPSPEC', array=spec, format='49E') ] spectra = fits.BinTableHDU.from_columns(columns, name='SPECTRA') for key, value in _model_keywords('SPECTRA', '1.0.0'): spectra.header.set(key, value) # Build the actual HDU list. args = model_name, label hdu_list = fits.HDUList([_primary_hdu(*args), parameters, energies, spectra]) file_path = 'magnetar_rcs_model_table_2020MNRAS4925057T_qedoff_%s.fits' % label logger.info('Writing %s to %s...', label, file_path) hdu_list.writeto(file_path, overwrite=True) logger.info('Done.')
[docs] class xMagnetarModelsT2020(IntEnum): """Enum class encapsulating the physical models in Taverna et al. 2020 https://ui.adsabs.harvard.edu/link_gateway/2020MNRAS.492.5057T/EPRINT_PDF * ``ATMOSPHERE = 1`` * ``BLACKBODY = 2`` * ``SOLID_SURFACE_FREE_IONS = 3`` * ``SOLID_SURFACE_FIXED_IONS = 4`` """ ATMOSPHERE = 1 BLACKBODY = 2 SOLID_SURFACE_FREE_IONS = 3 SOLID_SURFACE_FIXED_IONS = 4
[docs] @classmethod def has_value(cls, value): """Convenience method that can be used downstrem to check the validity of a model passed as an integer. https://stackoverflow.com/questions/43634618 """ return value in cls._value2member_map_
[docs] class xParameterSpaceT2020: """Small namespace to encapsulate the standard grid of model parameters. """ NAME = numpy.array(['Model', 'Chi (deg)', 'xi (deg)', 'delta_phi (r', 'beta', 'phase (rad)']) MODEL = [ xMagnetarModelsT2020.ATMOSPHERE, xMagnetarModelsT2020.BLACKBODY, xMagnetarModelsT2020.SOLID_SURFACE_FREE_IONS, xMagnetarModelsT2020.SOLID_SURFACE_FIXED_IONS ] CHI = numpy.array( [0.1, 15, 30, 45, 60, 75, 89.9, 105, 120, 135, 150, 165, 179.9] ) XI = numpy.array( [0.1, 15, 30, 45, 60, 75, 85, 89.9] ) DELTA_PHI = numpy.array( [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4] ) BETA = numpy.array( [0.2, 0.3, 0.34, 0.4, 0.5, 0.6, 0.7] ) PHASE = numpy.array( [0.35006587, 1.04819762, 1.74632935, 2.4444611, 3.1425929, 3.84072455, 4.53885635, 5.23698815, 5.9351197] ) INITIAL = numpy.array([1., 60., 30., 0.5, 0.34, numpy.pi]) DELTA = numpy.array([1., 5., 5., 0.1, 0.04, 0.1]) MINIMUM = [min(item) for item in (MODEL, CHI, XI, DELTA_PHI, BETA, PHASE)] MAXIMUM = [max(item) for item in (MODEL, CHI, XI, DELTA_PHI, BETA, PHASE)] NUMBVALS = [len(item) for item in (MODEL, CHI, XI, DELTA_PHI, BETA, PHASE)] NUMBSPECTRA = numpy.prod(NUMBVALS) VALUE = [MODEL, CHI, XI, DELTA_PHI, BETA, PHASE] ENERGY_LO = numpy.array( [0.50118723, 0.53275925, 0.56631883, 0.60199377, 0.63991457, 0.68022564, 0.72307609, 0.76862410, 0.81704297, 0.86851135, 0.92322190, 0.98137886, 1.04319933, 1.10891409, 1.17876844, 1.25302345, 1.33195637, 1.41585898, 1.50505025, 1.59986007, 1.70063847, 1.80776927, 1.92164429, 2.04269723, 2.17137583, 2.30815516, 2.45355614, 2.60811057, 2.77240708, 2.94705336, 3.13269418, 3.33003655, 3.53981039, 3.76279016, 3.99982509, 4.25178211, 4.51962081, 4.80433187, 5.10696639, 5.42867697, 5.77064024, 6.13415859, 6.52057658, 6.93132081, 7.36795560, 7.83207793, 8.32545543, 8.84991297, 9.40738678] ) ENERGY_HI = numpy.append(ENERGY_LO[1:], 10.)
[docs] class xMagnetarTableModelComponentT2020: """Interface to the magnetar tabular models. The models are coming as three separate FITS files for the I, Q, and U Stokes parameters, the basic structure of each one being, e.g.: .. code-block:: No. Name Ver Type Cards Dimensions Format 0 PRIMARY 1 PrimaryHDU 15 () 1 PARAMETERS 1 BinTableHDU 47 6R x 10C [12A, J, E, E, E, E, E, E, J, PE(13)] 2 ENERGIES 1 BinTableHDU 21 49R x 2C [D, D] 3 SPECTRA 1 BinTableHDU 21 314496R x 2C [6E, 49E] (The actual dimensions of the cards might change, but the very fundamental structure of the files is fixed by the OGIP standard, I believe.) The ``PARAMETERS`` binary table encapsulate the following model parameters: * `Model`: the underlying Physical model, see the MagnetarModel enum * `chi`: angle between the line of sight and the rotation angle of the star. * `xi`: angle between the magnetic axis ans the rotation angle of the star. * `delta_phi`: twist angle. * `beta`: electron velocity in units of c. * `phase`: the phase bins. For illustration purposes, in the initial implementation we have 4 physical models x 13 chi x 8 xi x 12 delta_phi x 7 beta x 9 phase bins, that is, 314496 combinations. Spelling things out explicitely: .. code-block:: Model -> [1, 2, 3, 4] chi -> [0.1, 15, 30, 45, 60, 75, 89.9, 105, 120, 135, 150, 165, 179.9] xi -> [0.1, 15, 30, 45, 60, 75, 85, 89.9] delta_phi -> [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4] beta -> [0.2, 0.3, 0.34, 0.4, 0.5, 0.6, 0.7] phase -> [0.350066, 1.0482, 1.74633, 2.44446, 3.14259, 3.84072, 4.53886, 5.23699, 5.93512] The ``ENERGY`` binary extensions encapsulates the energy binning, and in the initial implementations has 49 bins between ~0.5 and 10 keV. Finally, the glorious ``SPECTRA`` extensions contains the spectra for all the possible configurations. It comes with two columns, the first of which contains the 6 model parameters and the second the 49 spectral values. Note the ``SPECTRA`` extension is filled by looping over the model parameters in reverse order, i.e., from `phase` to `Model`. """ def __init__(self, file_path, normalize=False): """Open the tabular model FITS files and cache the necessary data for use. """ logger.info('Loading tabular models from %s...', file_path) with fits.open(file_path) as hdu_list: hdu_list.info() # Cache the necessary stuff so that we can close the underlying # FITS file and move on. self.par_names = tuple(hdu_list['PARAMETERS'].data['NAME']) self.par_values = tuple(hdu_list['PARAMETERS'].data['VALUE']) self.energ_lo = hdu_list['ENERGIES'].data['ENERG_LO'] self.energ_hi = hdu_list['ENERGIES'].data['ENERG_HI'] self.spectral_params = hdu_list['SPECTRA'].data['PARAMVAL'] self.spectral_data = hdu_list['SPECTRA'].data['INTPSPEC'] # Normalize by the bin width in energy---this is needed for the I # component. if normalize: self.spectral_data /= (self.energ_hi - self.energ_lo) # Calculate the shape of the table---this is essentially the number # of values each of the parameters can take, e.g., (4, 13, 8, 12, 7, 9). self.shape = tuple([len(_value) for _value in self.par_values]) # Auxiliary variable to retrieve the spectra by parameter index. # This is essentially the cumulative product of the last part of the # shape in reverse order, e.g., (78624, 6048, 756, 63, 9). self._cumshape = numpy.flip(numpy.cumprod(self.shape[::-1]))[1:] # Is this the right thing? Need to talk with Roberto^2 and see # what they are actually doing when they generate the table. self.energy = 0.5 * (self.energ_lo + self.energ_hi) # And, sice we're at it, cache the phase grid for later use. Note that # the phase in the model runs from 0 to 2pi, while for the ixpeobsim # purpopses we do need a number between 0 and 1. self.phase = self.par_values[-1] / (2. * numpy.pi) # Trim the first and the last energy points in the energy grid and all # the related spectra, as in the underlying Monte Carlo simulation # the outesrmost bins act as underflow/overflow accumulators, and # the corresponding values are therefore unphysical. self.energy = self.energy[1:][:-1] self.spectral_data = self.spectral_data[:, 1:][:, :-1] logger.info('Done.\n%s', self) def __str__(self): """String formatting. """ text = 'Table shape %s\n' % str(self.shape) for name, values in zip(self.par_names, self.par_values): text += '%s: %s\n' % (name, values) return text.strip('\n')
[docs] def num_phase_bins(self): """Return the number of phase bins. """ return self.shape[-1]
[docs] def parameters(self, *indices): """Retrieve the parameter values for a given set of indices. """ return tuple(self.par_values[i][j] for i, j in enumerate(indices))
[docs] def spectrum_data(self, *indices): """Retrieve the spectrum data for a given series of indices. Note that the function requires to pass *all* the indices, *except* for the last one, i.e., the one referring to the bin phase, and the return value contains the data for all the bin phases. """ assert len(indices) == len(self.shape) - 1 index = (numpy.array(indices) * self._cumshape).sum() slice_ = slice(index, index + self.num_phase_bins()) return self.spectral_data[slice_]
@staticmethod def _nearest_index(array_, value): """Find the index of the nearest array element to the target value. Note that, since we are only using this to bisect the arrays of the parameter values, we're not doing a binary search. See https://stackoverflow.com/questions/2566412 for a more extensive discussion. """ return numpy.abs(array_ - value).argmin()
[docs] def nearest_indices(self, *params): """Return the indices of the slice of the underlying multi-dimensional array corresponding to the model that comes closest to the input parameters. """ par_values = self.par_values[:-1] return tuple([self._nearest_index(a, v) for a, v in zip(par_values, params)])
[docs] def nearest(self, *params): """Return the slice of the underlying multi-dimensional array corresponding to the model that come closest to the input parameters. """ indices = self.nearest_indices(*params) return self.spectrum_data(*indices)
@staticmethod def _combinations(values): """Build an array with all the combinations of a given set of iterables. This is used when interpolating the model table, where we bisect the all the parameter grids and we have to build a weighted average of all possible combinations. See https://stackoverflow.com/questions/1208118 for more details. """ return numpy.array(numpy.meshgrid(*values)).T.reshape(-1, len(values))
[docs] def interpolate_indices(self, *params, threshold=1.e-6): """Interpolate the underlying SPECTRA extension at a given set of input parameters. This performs a simple linear interpolation, where the first parameter (the actual model) is treated in a special fashion in that is not interpolated at all---in fact the function requires that the value passed as an argument to the function is one of the four valis values. For all the other parameters, the corresponding grids of tabulated values are bisected at the target value and the two nearest elements are weighted proportinally to the relative distance of the target. """ model, *params = params model = int(model) assert xMagnetarModelsT2020.has_value(model) par_values = self.par_values[1:-1] indices = [] weights = [] # Bisect the parameter grids. for value, grid in zip(params, par_values): i = numpy.searchsorted(grid, value) indices.append((i - 1, i)) lo, hi = grid[i - 1], grid[i] w = (hi - value) / (hi - lo) # Small hack to avoid numerical artifacts due to the fact that the # parameter grids are saved in single precision, while everywhere # else we are operating in double precision. Essentially if the # weights are too close to either 0 or 1, we attribute that to simple # numerical rounding. if w < threshold: w = 0. elif 1. - w < threshold: w = 1. weights.append((w, 1. - w)) # Create all the combination of the relevant parameters and weights. indices = self._combinations(indices) weights = self._combinations(weights) # Multiply the weights along the proper axis to come up with one value # for each parameter set. weights = numpy.prod(weights, 1) # Get rid of the combinations that will not contribute to the weighted # average. mask = weights > 0. indices = indices[mask] weights = weights[mask] # Prepend the model to the indices. model_indices = numpy.full((len(indices), 1), model - 1) indices = numpy.hstack((model_indices, indices)) return indices, weights
[docs] def weighted_average(self, indices, weights): """Calculate the weighted average for a given set of entries in the underlying SPECTRA table. """ data = numpy.full((self.shape[-1], len(self.energy)), 0.) for i, w in zip(indices, weights): data += self.spectrum_data(*i) * w return data
[docs] def interpolate(self, *params): """Interpolate the underlying SPECTRA table to a given set of parameters. """ indices, weights = self.interpolate_indices(*params) return self.weighted_average(indices, weights)
[docs] class xMagnetarTableModelT2020: """Full description of a magnetar spectro-polarimetric model. """ I_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_I.fits' Q_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_Q.fits' U_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_U.fits' def __init__(self): """Constructor. """ if self.auxfiles_missing(): abort() # Note we need to keep track of the I spectrum in two flavors: # * I0: the un-normalized spectrum, straight from the input tables, # that is needed to normalize Q and U downstream; # * I: the normalized spectrum, where the bin contents are divided by # the bin width to get the right spectrum in cm^{-2} s^{-1} keV^{-1} # # See https://bitbucket.org/ixpesw/ixpeobssim/issues/453 # for more details about the matter. self._I0 = self._load_component(self.I_FILE_NAME) self._I = self._load_component(self.I_FILE_NAME, normalize=True) self._Q = self._load_component(self.Q_FILE_NAME) self._U = self._load_component(self.U_FILE_NAME) # Minimal check on the compatibility of the components. assert numpy.allclose(self._I.energy, self._Q.energy) assert numpy.allclose(self._I.energy, self._U.energy) assert numpy.allclose(self._I.phase, self._Q.phase) assert numpy.allclose(self._I.phase, self._U.phase) # Cache the params for the energy and phase binning. self.energy = self._I.energy self.phase = self._I.phase self.par_names = self._I.par_names self.par_values = self._I.par_values @staticmethod def _load_component(file_name, normalize=False): """Load a model component. """ file_path = os.path.join(IXPEOBSSIM_AUXFILES, file_name) return xMagnetarTableModelComponentT2020(file_path, normalize)
[docs] @classmethod def auxfiles_missing(cls): """Convenience function to check for missing auxiliary files. Note this is a classmethod, so it can be invoked without an instance of the class, which can be handy, e.g., for skipping unit tests. """ return auxfiles_missing(cls.I_FILE_NAME, cls.Q_FILE_NAME, cls.U_FILE_NAME)
def _nearest_data(self, model, chi, xi, delta_phi, beta): """Finde the underlying spectrum that comes closest to the input parameters. """ assert xMagnetarModelsT2020.has_value(model) params = model, chi, xi, delta_phi, beta indices = self._I.nearest_indices(*params) return self._I0.spectrum_data(*indices),\ self._I.spectrum_data(*indices),\ self._Q.spectrum_data(*indices),\ self._U.spectrum_data(*indices) def _interpolate_data(self, model, chi, xi, delta_phi, beta): """Interpolate the underlying Stokes spectra. """ assert xMagnetarModelsT2020.has_value(model) params = model, chi, xi, delta_phi, beta indices, weights = self._I.interpolate_indices(*params) return self._I0.weighted_average(indices, weights),\ self._I.weighted_average(indices, weights),\ self._Q.weighted_average(indices, weights),\ self._U.weighted_average(indices, weights) @staticmethod def _pad_phase_data(data): """Small conveinence function to pad spectral data in phase, see the documentation of xMagnetarTableModelT2020._pad_phase() for details. """ data0 = 0.5 * (data[0] + data[-1]) return numpy.concatenate(([data[-1], data0], data, [data0, data[0]])) @staticmethod def _pad_phase(phase, I0, I, Q, U): """Pad the phase grid and the Stokes data to guarantee a proper continuity condition at the bounduaries. Mond the underlying phase grid runs from the the center of the first phase bin (> 0.) to the center of the last phase bin (< 1.), so we have to do something to avoid bogus extrapolations near 0 and 1. The basic paddind strategy is to add two points on the left, at max(phase) - 1. and 0., and two points to the right, at 1. and min(phase) + 1., so that the new grids covers the range max(phase) - 1. (< 0) to min(phase) + 1. (> 1.). The spectral data are padded accordingly, averaging the first and last phase points to set the value at 0. and 1. Note that, with this prescription, using a spline of order k = 2 for the phase axis when building the relevant 2-dimensional spline guarantees that the values at 0. and 1. are identical---i.e., you have no jumps when the phase rolls over. """ phase = numpy.concatenate(([phase.max() - 1., 0.], phase, [1., phase.min() + 1.])) I0 = xMagnetarTableModelT2020._pad_phase_data(I0) I = xMagnetarTableModelT2020._pad_phase_data(I) Q = xMagnetarTableModelT2020._pad_phase_data(Q) U = xMagnetarTableModelT2020._pad_phase_data(U) return phase, I0, I, Q, U @staticmethod def _pad_energy(energy, I, q, u, emax=15., num_points=10, fit_emin=6., fit_emax=10.): """Pad the energy grid and Stokes data above the maximum energy (~9 keV) so we do not rely on the naive (constrant) 2-d spline extrapolation. This turned out to be more complicated than originally anticipates as, while it's reasonably easy to fit the high-energy part of I with a power law and use that to extrapolate, this doesn't really work for Q and U, that can go negative. The strategy we adopt is a two-step one, namely: * we fit I in each phase bin with a power law between fit_emin and fit_emax, and we use the fit to extrapolate above fit_emax; * we fit q = Q/I and u = U/I in each phase bin with a straight line between fit_emin and fit_emax, and we use the fit to extrapolate above fit_emax. """ num_phase_bins, _ = I.shape mask = numpy.logical_and(energy >= fit_emin, energy <= fit_emax) pad_energy = numpy.linspace(fit_emax, emax, num_points) # Small anonymous function to select the part of the I, Q or U # two-dimensional arrays to be used for the fitting, which requires some # numpy magic. mask2d = numpy.tile(mask, (num_phase_bins, 1)) _subselect = lambda data: data[mask2d].reshape((num_phase_bins, len(x))) # First step: fit the high-energy part of the I data with a power law # and pad them with the fit extrapolation. x = energy[mask] y = _subselect(I).T norm, index = power_law_analytical_fit(x, y) norm = numpy.tile(norm, (num_points, 1)).T index = numpy.tile(index, (num_points, 1)).T I_padding = norm * pad_energy**index # Second step: extrapolate Q/I and U/I. def _qupadding(data): """Small convenience function to fit Q and U and extrapolate to the lower-end of the energy padding. """ y = _subselect(data).T m, q = linear_analytical_fit(x, y) m = numpy.tile(m, (num_points, 1)).T q = numpy.tile(q, (num_points, 1)).T _pad = m * pad_energy + q return _pad q = numpy.hstack((q, _qupadding(q))) u = numpy.hstack((u, _qupadding(u))) # Note that I goes last so that we have access to the original array # up to the last minute, and we can use it, e.g., to estimate a proxy # of the errors in _qupadding. I = numpy.hstack((I, I_padding)) energy = numpy.hstack((energy, numpy.linspace(fit_emax, emax, num_points))) return energy, I, q, u @staticmethod def _build_splines(energy, phase, I0, I, Q, U, integral_flux, emin, emax, pad_phase=True, pad_energy=True): """Build a set of three splines that can be used by ixpeobssim to run a simulation. Note that, With our prescrition on the padding, using a spline of order 2 for the phase axis guarantees continuity when the phase rolls over, so we don't expose ky in the public API. Likewise, the spline order over the energy axis is hard-coded. The flags for disabling the padding in phase and energy are here mainly for testing purposes---so that we can disengage the padding and compare the spline interpolation with the underlying data directly. """ kx = 2 ky = 2 if pad_phase: phase, I0, I, Q, U = xMagnetarTableModelT2020._pad_phase(phase, I0, I, Q, U) q = Q / I0 u = U / I0 if pad_energy: energy, I, q, u = xMagnetarTableModelT2020._pad_energy(energy, I, q, u) pd = xModelStokesParameters.polarization_degree(q, u) pa = xModelStokesParameters.polarization_angle(q, u) fmt = dict(kx=kx, ky=ky, xlabel='Energy [keV]', ylabel='Phase', zlabel='Flux [cm$^{-2}$ s$^{-1}$ keV$^{-1}$]') spec_spline = xInterpolatedBivariateSpline(energy, phase, I.T, **fmt) if integral_flux is not None: # Normalize the spectral spline to the correct integral flux. # Note that we need a custom spline for the energy flux, here---see # https://bitbucket.org/ixpesw/ixpeobssim/issues/471 espec_spline = xInterpolatedBivariateSpline(energy, phase, (energy * I).T) norm = erg_to_keV(integral_flux / espec_spline.integral(emin, emax, 0., 1.)) spec_spline = spec_spline.scale(norm) fmt.update({'zlabel': 'Polarization degree'}) pol_deg_spline = xInterpolatedBivariateSpline(energy, phase, pd.T, **fmt) fmt.update({'zlabel': 'Polarization angle [rad]'}) pol_ang_spline = xInterpolatedBivariateSpline(energy, phase, pa.T, **fmt) return spec_spline, pol_deg_spline, pol_ang_spline @staticmethod def _wrap_splines(spec_spline, pol_deg_spline, pol_ang_spline): """Wrap a set of splines in such a way they have the right signature for ixpeobssim to use them. Note that we do slightly more than just wrapping the splines, as, for the case of the polarization degree, we also make sure that the values returned lie within the physical range [0., 1.]. This is necessary because the underlying models are noisy, and when interpolating with a cubic spline it is possible to undershoot or overshoot the physical bounds. """ def spec(E, t): """Nested spectrum definition. """ return spec_spline(E, t) def pol_deg(E, t, ra=None, dec=None): """Nested polarization degree definition. Note we are clipping the values to the [0., 1.] interval. """ # pylint: disable=unused-argument return numpy.clip(pol_deg_spline(E, t), 0., 1.) def pol_ang(E, t, ra=None, dec=None): """Nested polarization angle definition. """ # pylint: disable=unused-argument return pol_ang_spline(E, t) return spec, pol_deg, pol_ang
[docs] def nearest_splines(self, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10., pad_phase=True, pad_energy=True): """Return a set of model splines for the nearest set of parameters in the underlying model table. """ data = self._nearest_data(model, chi, xi, delta_phi, beta) args = self.energy, self.phase, *data, integral_flux, emin, emax, pad_phase, pad_energy return self._build_splines(*args)
[docs] def interpolated_splines(self, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10., pad_phase=True, pad_energy=True): """Return a set of model splines interpolating the underlying model table to the target parammeters. """ data = self._interpolate_data(model, chi, xi, delta_phi, beta) args = self.energy, self.phase, *data, integral_flux, emin, emax, pad_phase, pad_energy return self._build_splines(*args)
[docs] def nearest(self, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10.): """Return a set of wraps of the nearest splines ready to be used in ixpeobssim. """ args = model, chi, xi, delta_phi, beta, integral_flux, emin, emax splines = self.nearest_splines(*args) return self._wrap_splines(*splines)
[docs] def interpolate(self, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10.): """Return a set of wraps of the interpolated splines ready to be used in ixpeobssim. """ args = model, chi, xi, delta_phi, beta, integral_flux, emin, emax splines = self.interpolated_splines(*args) return self._wrap_splines(*splines)
[docs] def energy_spectrum(self, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10.): """Return a bivariate spline representing the energy spectrum. This is essentially retrieving the photon spectrum (as a bivariate spline) and multiplying the z array with the proper tiling of the x array (i.e., the energy). """ args = model, chi, xi, delta_phi, beta, integral_flux, emin, emax spec_spline, _, _ = self.interpolated_splines(*args) z = spec_spline.z * numpy.tile(spec_spline.x, (len(spec_spline.y), 1)).T return xInterpolatedBivariateSpline(spec_spline.x, spec_spline.y, z)
[docs] def phase_resolved_integral_flux(self, phase_binning, model, chi, xi, delta_phi, beta, integral_flux, emin=2., emax=10.): """Return the integral flux of the model (in erg/cm^2) in arbitrary phase bins. """ espec = self.energy_spectrum(model, chi, xi, delta_phi, beta, integral_flux, emin, emax) flux = numpy.array([espec.integral(emin, emax, min_, max_) / (max_ - min_) \ for (min_, max_) in pairwise(phase_binning)]) return keV_to_erg(flux)
[docs] class xMagnetarTableModelT2020QedOff(xMagnetarTableModelT2020): """Full description of a magnetar spectro-polarimetric model no QED effects. """ I_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_qedoff_I.fits' Q_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_qedoff_Q.fits' U_FILE_NAME = 'magnetar_rcs_model_table_2020MNRAS4925057T_qedoff_U.fits'
if __name__ == '__main__': package_model_table('/data/temp/qedoff', 'Magnetar_1708_qedoff')