Source code for ixpeobssim.instrument.charging

#!/urs/bin/env python
#
# Copyright (C) 2019, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Charging-related facilities.
"""

from __future__ import print_function, division

import numpy
from astropy.io import fits

from ixpeobssim.core.fitsio import xPrimaryHDU, xBinTableHDUBase
from ixpeobssim.core.hist import xGpdMap3d, xHistogramBase, xHistogram2d
from ixpeobssim.instrument.gpd import GPD_PHYSICAL_HALF_SIDE_X, GPD_PHYSICAL_HALF_SIDE_Y
from ixpeobssim.utils.logging_ import logger, abort


# pylint: disable=invalid-name, no-member, too-many-arguments, too-many-ancestors
# pylint: disable=too-many-locals


DEFAULT_K_C = 1.2e5
DEFAULT_TAU_D = 1.1e5
DEFAULT_DELTA_MAX = 0.067


[docs] def charging_tau(energy_flux, k_c=DEFAULT_K_C): """Return the time constant for the GEM charging in absence of discharge. """ return k_c / energy_flux
[docs] def effective_tau(energy_flux, k_c=DEFAULT_K_C, tau_d=DEFAULT_TAU_D): """Return the effective time constant for the GEM in an arbitrary state. """ return 1. / (1. / charging_tau(energy_flux, k_c) + 1. / tau_d)
[docs] def asymptotic_gain_delta(energy_flux, k_c=DEFAULT_K_C, tau_d=DEFAULT_TAU_D, delta_max=DEFAULT_DELTA_MAX): """Return the asymptotic gain decrease for the GEM in an arbitrary state. """ return -delta_max / (1. + charging_tau(energy_flux, k_c) / tau_d)
[docs] def gain_profile(energy_flux, k_c=DEFAULT_K_C, tau_d=DEFAULT_TAU_D): """Return a lambda to generate a gain profile, given some model parameters. to plot the actual time-profile of the gain. """ tau = effective_tau(energy_flux, k_c, tau_d) delta = asymptotic_gain_delta(energy_flux, k_c, tau_d) return lambda t: 1. + delta * (1. - numpy.exp(-t / tau))
[docs] def calculate_gain(time_, energy_flux, initial_gain=1., k_c=DEFAULT_K_C, tau_d=DEFAULT_TAU_D, delta_max=DEFAULT_DELTA_MAX): """Calculate the gain as a function of time for a given input energy flux. The time grid and the corresponding energy flux are passed as numpy arrays (of the same shape). The system is integrated using finite differences. """ gain = [initial_gain] for dt, r in zip(numpy.diff(time_), energy_flux[1:]): g = gain[-1] delta = 1. - g g -= r * delta_max / k_c * (1 - delta / delta_max) * dt g += delta / tau_d * dt gain.append(g) gain = numpy.array(gain) return gain
[docs] class xChargingPrimaryHDU(xPrimaryHDU): """Charging map file primary header. """ HEADER_KEYWORDS = [ ('TELESCOP', 'IXPE' , 'Telescope name'), ('INSTRUME', 'GPD' , 'Instrument name'), ('DETNAM' , 'N/A' , 'Name of the logical detector unit'), ('DET_ID' , 'N/A' , 'Name of the physical detector unit'), ('CREATOR' , 'N/A' , 'Creator app'), ('ORIGIN' , 'IXPE team', 'Organization responsible for the data'), ('DATE' , 'N/A' , 'File creation date') ] HEADER_COMMENTS = []
[docs] class xBinTableHDUCharging(xBinTableHDUBase): """Binary table description for the CHRG_MAP extension of the observation output files. """ NAME = 'CHRG_MAP' HEADER_KEYWORDS = [ ('VERSION' , 1 , 'Extension version number'), ('CVSD0001', 'N/A', 'Date when this file should first be used'), ('CVST0001', 'N/A', 'Time of day when this file should first be used'), ('NUM_BINS', 'N/A', 'Number of bins per side of the map') ] HEADER_COMMENTS = ['This extension provides a map of the detector '\ 'charging status expressed as a fraction of its maximum value.'] DATA_SPECS = [ ('BINX', 'I', None, 'Index for the x coordinate'), ('BINY', 'I', None, 'Index for the y coordinate'), ('SLOW', 'D', None, 'Parameters for the charging slow component'), ('FAST', 'D', None, 'Parameters for the charging fast component') ]
[docs] def read_charging_map(file_path): """ Open a charging map file and read the values for the slow and fast component. We get the size of the arrays from the header.""" logger.info('Reading charging map from %s...', file_path) charging_map = fits.open(file_path)['CHRG_MAP'] nside = charging_map.header['NUM_BINS'] # Initialize the initial slow and fast values as 2d arrays filled with zeroes charging_fast = numpy.full((nside, nside), 0.) charging_slow = numpy.full((nside, nside), 0.) # We fill the two arrays without doing any assumption on the order of the # values in the FITS files, but using explicitly the index from the BINX # and BINY columns. This is slower but safer, as it will work even if we # change the ordering in the input file. data_fast = charging_map.data['FAST'] data_slow = charging_map.data['SLOW'] binx = charging_map.data['BINX'] biny = charging_map.data['BINY'] for i, (x, y) in enumerate(zip(binx, biny)): # Note: indexes of numpy are (row, column), so y goes first charging_fast[y, x] = data_fast[i] charging_slow[y, x] = data_slow[i] logger.info('Done.') return charging_fast, charging_slow
[docs] def create_charging_map_extension(fast_map, slow_map=None, start_date='N/A', start_time='N/A', version=1): """ Create the CHRG_MAP extension for a charging map file. """ if fast_map.ndim != 2: abort('Charging maps must be 2-D arrays.') nside = fast_map.shape[0] if nside != fast_map.shape[1]: abort('Charging map dimensions must be the same on both axes') # Note how we use numpy.tile and numpy.repeat to get the right sequence # respectively for the rows and the columns binx = numpy.tile(numpy.arange(nside), nside) biny = numpy.repeat(numpy.arange(nside), nside) if slow_map is None: slow_map = numpy.full(fast_map.shape, 0.) slow = slow_map.flatten() fast = fast_map.flatten() charging_hdu = xBinTableHDUCharging(data=[binx, biny, slow, fast]) keywords = { 'VERSION' : version, 'CVSD0001' : start_date, 'CVST0001' : start_time, 'NUM_BINS' : nside } for key, value in keywords.items(): charging_hdu.set_keyword(key, value) return charging_hdu
[docs] def read_charging_parameters(input_file_path): """Open a charging parameters calibration file, read the parameters and return them. Parameters ---------- input_file_path : string the path to the input FITS file storing the charging parameters """ hdu_list = fits.open(input_file_path) params = hdu_list['CHRG_PAR'].data[0] return params['KC_FAST'], params['TD_FAST'], params['DM_FAST'], \ params['KC_SLOW'], params['TD_SLOW'], params['DM_SLOW']
[docs] class xEnergyFluxCube(xGpdMap3d): """Main data structure for the charging-induced gain correction. """ def __init__(self, nside, tedges, zlabel='Mission Elapsed Time [s]', wlabel='Energy flux [keV mm$^{-2}$ s$^{-1}$]'): """Overloaded constructor. """ xGpdMap3d.__init__(self, nside, tedges, zlabel=zlabel, wlabel=wlabel) # Cache the pixel area, used to normalize the flux at fill() time. self.pixel_area = (GPD_PHYSICAL_HALF_SIDE_X / nside) * (GPD_PHYSICAL_HALF_SIDE_Y / nside) self.__gain_data = None self.charging_map = None
[docs] def xbinning(self): """Return the histogram binning on the detx axis. """ return self.binning[0]
[docs] def ybinning(self): """Return the histogram binning on the dety axis. """ return self.binning[1]
[docs] def tbinning(self): """Return the histogram binning on the time axis (aka z-axis). """ return self.binning[2]
#pylint: disable=arguments-differ
[docs] def fill(self, detx, dety, time_, energy, deadtime_correction=None, **kwargs): """Overloaded fill() method. Here we essentially take care of the energy weighting and the normalization by the elapsed time and the pixel area. This is not completely trivial as the mean energy must be calculated in the same time slices used to bin the original histogram, to cope with the possibility that the source spectrum---or the event flux on the detector, for whatever reason---changes with time. Warning ------- The deadtime correction is passed as a single number, i.e., averaged over the full data set. """ # Do an initial filling of the three-dimensional histogram with the # number of events per space-time cube. xGpdMap3d.fill(self, detx, dety, time_, **kwargs) # Create a temporary 2-dimensional histogram in time and energy to # calculate the average event energy in appropriate time slices. tbinning = self.tbinning() # Add some padding to the energy binning to deal with the case # where we are passing a monochromatic line. pad = 0.1 ebinning = numpy.linspace(energy.min() - pad, energy.max() + pad, 501) energy_hist = xHistogram2d(tbinning, ebinning).fill(time_, energy) # Calculate the mean energy in each time slice. ebin_centers = energy_hist.bin_centers(1) weights = energy_hist.content energy_sum = weights.sum(axis=1) mean_energy = (ebin_centers * weights).sum(axis=1) / energy_sum mask = energy_sum > 0. mean_energy[~mask] = 0. # And now we can calculate the actual energy flux per unit area and # overwrite the histogram content. Let's go step by step. # First comes the rates in counts per second. flux = self.content / numpy.diff(tbinning) if deadtime_correction is not None: # Next the deadtime correction---note that, in general, the deadtime # correction will depend on time, as the counting rate does. # Calculate the ovearall *measured* rate on the GPD for each time bin. overall_flux = flux.sum(axis=0).sum(axis=0) # Translate the average deadtime correction coefficient in the input # FITS file into the corresponding average dead time per event. dead_time = (1. - deadtime_correction) / overall_flux.mean() # Calculate the corrected rate in every time bin using the usual # formula (note that overall_rate is an array, here). flux /= (1. - overall_flux * dead_time) # And, finally, the product with the average energy and the # normalization to the pixel area. flux *= mean_energy / self.pixel_area # And, it goes without saying, update the histogram. self.set_content(flux)
[docs] def time_slice(self, tmin=None, tmax=None): """Return a time slice of the irradiation history, between generic bounds. By default this is returning the average energy flux per unit area over the entire input sample. Note that the times are adjusted to the closest bih edges. """ # Convert the physical times into the corresponding bin edges on the # time axis. if tmin is None: _min = 0 else: _min = numpy.searchsorted(self.tbinning(), tmin) if tmax is None: _max = -1 else: _max = numpy.searchsorted(self.tbinning(), tmax) # Create an empty hyper-histogram to hold the map of the energy flux # per unit area. hist = xHistogram2d(*self.binning[0:2], *self.labels[0:2], self.labels[3]) # Do the appropriate averaging of the glorious three-dimensional weights # in order to have the appropariate bidimensional slice. flux = self.content[:, :, _min:_max].mean(axis=2) # Set the contents and return the histogram. hist.set_content(flux) return hist
[docs] @staticmethod def step(flux, dt, dg, k_c, delta_max, tau_d): """ Compute a step of the charging model (for either the slow or fast process). For a description of the charging parameters see the calculate_gain_data() method Parameters ---------- flux: float or array the value of the input energy flux for this step (ADC counts/mm^2/s) dt : float the time step (in seconds) dg : float or array the current gain drop due to this process (fractional) """ if (delta_max == 0) or (k_c == 0): return numpy.full(flux.shape, 0.) # Add the effect of the charging. delta = flux * delta_max / k_c * (1. - dg / delta_max) if tau_d != 0: # ...and next the discharge. delta -= (dg / tau_d) return delta * dt
[docs] def calculate_gain_data(self, k_c_fast=DEFAULT_K_C, tau_d_fast=DEFAULT_TAU_D, delta_max_fast=DEFAULT_DELTA_MAX, initial_dg_fast=None, k_c_slow=0., tau_d_slow=0., delta_max_slow=0., initial_dg_slow=None): """Calculate the expected space-time profile of the gain for a given illumination history. We are adopting a charging model with two components: one fast and one slow. Both are regulated by the same equation, which includes a rate-dependent charging and a constant discharging, with the caracteristic time for the two processes (fast and slow) separated by roughly an order of magnitude. The system is integrated using finite differences. The returned gain profile is computed on the times defined by self.tbinning() and has the same spatial dimensions as self.content Arguments ---------- k_c_fast : float the charging constant for the fast process (ADC counts/mm^2) tau_d_fast : float the discharge time constant for the fast process (seconds) delta_max_fast : float maximum fractional gain drop for the fast process initial_dg_fast : float or array initial fraction of gain drop due to the fast process. If an array, the value is given for each spatial bin - thus the dimension must match the spatial dimensions of self.content. If None, we assume initial full discharge for the fast component. k_c_slow : float as k_c_fast, but for the slow process tau_d_slow : float as tau_d_fast, but for the slow process delta_max_slow : float as delta_max_fast, but for the slow process initial_dg_slow : float as initial_dg_fast, but for the slow process """ # The shape of the gain profile: spatial dimensions as self.content # and time axis (the last one) as self.time_grid tbinning = self.tbinning() gain_shape = self.content.shape[:-1] + (tbinning.shape[0], ) # initial_dg_fast and initial_dg_slow have the same spatial dimensions # of the gain if initial_dg_fast is None: initial_dg_fast = numpy.full(gain_shape[:-1], 0.) if initial_dg_slow is None: initial_dg_slow = numpy.full(gain_shape[:-1], 0.) # Calculate the initial gain drop across the active surface. delta_g_fast = initial_dg_fast * delta_max_fast delta_g_slow = initial_dg_slow * delta_max_slow # Initialize the gain self.__gain_data = numpy.full(gain_shape, 1.) # Note: selecting the last axis no matter the rank with numpy '...' # syntax self.__gain_data[..., 0] = self.__gain_data[..., 0] - delta_g_fast - \ delta_g_slow # Cache the time differences for the loop delta_tbinning = numpy.diff(tbinning) # Time loop. Note the clever way to loop over the last dimentions of the # 3d array holding the energy flux, i.e., taking the transpose # (by default numpy uses the first axis to iterate). # https://stackoverflow.com/questions/1589706/ # This has no effect for 1-d arrays for i, (dt, f) in enumerate(zip(delta_tbinning, self.content.T)): # Because of the transpose 'trick' we have to take the transpose # of the flux 'f' here, for consistency (otherwise the other # dimensions are messed up). See the discussion at # https://bitbucket.org/ixpesw/ixpeobssim/issues/389 # for the need of a transpose of the flux here. dg_fast = self.step(f.T, dt, delta_g_fast, k_c_fast, delta_max_fast, tau_d_fast) dg_slow = self.step(f.T, dt, delta_g_slow, k_c_slow, delta_max_slow, tau_d_slow) delta_g_fast += dg_fast delta_g_slow += dg_slow self.__gain_data[..., i+1] = self.__gain_data[..., i] - dg_fast - dg_slow if delta_max_slow > 0.: delta_g_slow /= delta_max_slow if delta_max_fast > 0.: delta_g_fast /= delta_max_fast self.charging_map = [delta_g_fast, delta_g_slow] return self.__gain_data
[docs] def gain_map(self, t): """Return a two-dimensional histogram with the gain map at a given time. """ hist = xHistogram2d(*self.binning[0:2], *self.labels[0:2], 'Relative gain') k = xHistogramBase.bisect(self.tbinning(), t) gain = self.__gain_data[:, :, k] hist.set_content(gain) return hist
[docs] def gain(self, detx, dety, time_, **charging_params): """Return the expected corrected gain for a list of events at specific detector positions and times. """ if self.__gain_data is None: self.calculate_gain_data(**charging_params) i = xHistogramBase.bisect(self.xbinning(), detx) j = xHistogramBase.bisect(self.ybinning(), dety) # Here we specify that the side is 'rigth' to prevent the very first # element from being assigned to the last time bin k = xHistogramBase.bisect(self.tbinning(), time_, side='right') return self.__gain_data[i, j, k + 1]
def _plot(self): """Overloaded plotting method. """ self.time_slice().plot()