Source code for ixpeobssim.srcmodel.ephemeris

# Copyright (C) 2015--2022, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Module containing ephemeris utilities, classes and functions
"""

from __future__ import print_function, division

import numpy
import matplotlib.pyplot as plt
import astropy.units as u
from astropy.coordinates import solar_system_ephemeris, get_body_barycentric
from astropy.time import Time

from ixpeobssim.core.spline import xInterpolatedUnivariateSpline
from ixpeobssim.utils.time_ import met_to_mjd, mjd_to_met, days_to_seconds


# pylint: disable=invalid-name, too-many-arguments, too-many-locals, no-member
# pylint: disable=too-many-instance-attributes


[docs] def get_earth_barycentric_ephemeris(met): """Return earth barycentric vector given a set of terrestrial times referred to MET in seconds. Args ---- met : array_like Array of terrestrial times in MET Returns ------- array Return the earth barycentric vector """ #Set ephemeris solar_system_ephemeris.set('builtin') #ERFA library Tt = Time(met_to_mjd(met), format='mjd', scale='tt') #Barycentric (ICRS) position of the body in cartesian coordinates earth_b = get_body_barycentric('earth', Tt) #earth vector e_v = earth_b.get_xyz() e_v = e_v.to(u.meter).value earth_vector = numpy.matrix.transpose(numpy.array(e_v)) return earth_vector
[docs] def get_eccentric_anomaly(t_, orb_eph, periods=100, samples=10, plot=False): """At different time of emission of pulsar we calculate eccentric anomaly of binary system. Eccentric Anomaly in radiants. Args ---- t_ : array float Array of times at which evaluate the eccentric anomaly orb_eph : class xOrbitalEphemeris Orbital ephemeris periods : int Number of periods to use to evaluate the eccentric anomaly samples : int Number of bins in the eccentric anomaly evaluation plot : Boolean If True plot the inverse function Returns ------- class spline returns the eccentric anomaly as a function of pulsar emission time """ # EA sampling EA_ = numpy.linspace(0, periods * 2 * numpy.pi, samples) # Angular velocity, assumed almost the same for all over observation n_ = 2 * numpy.pi * orb_eph.omega_orb(t_.min()) # We solve the inverse of toes function, using spline method toes_ = (EA_ - orb_eph.ecc * numpy.sin(EA_)) / n_ + orb_eph.t_orbital fmt = dict(xlabel='Ecc Anomaly [rad]', ylabel='TOES [s]') toes_splinefunc = xInterpolatedUnivariateSpline(EA_, toes_, **fmt) EA_splinefunc = toes_splinefunc.inverse() if plot: toes_splinefunc.plot() plt.figure() EA_splinefunc.plot() return EA_splinefunc(t_)
[docs] def read_par_file(parfile_path): """Method to read a parameter file and to get pulsar parameters. The following parameters are accepted: * PEPOCH Epoch of period/frequency (MJD) * POSEPOCH Epoch of position measuremet (MJD) * F0 Pulsar rotation frequency (s^-1) * F1 Pulsar rotation frequency derivative (s^-2) * F2 Pulsar rotation frequency second derivative (s^-3) * P0 Pulsar period (s). * P1 Pulsar period derivative (10^-15). * DM Dispersion measure (pc cm^-3) * A1 Projected pulsar semi-major axis of orbit (lt-s) * E/ECC Eccentricity of orbit * T0 Epoch of periastron passage of orbit (MJD) * TASC Epoch of ascending node passage (MJD) * PB Period of orbit (days) * OM Longitude of periastron passage (deg) * EPS1 First Laplace parameter [E x sin(OM)] * EPS2 Second Laplace parameter [E x cos(OM)] * EPS1DOT Time derivative of EPS1 * EPS2DOT Time derivative of EPS2 * OMDOT Rate of periastron advance (deg/yr) * PBDOT Rate of change of orbital period * XDOT Rate of change of projected semi-major axis * ECCDOT Rate of change of eccentricity For further information refer to the TEMPO2 user manual. .. note:: Parameter files are commonly used in pulsar timing analysis, because they contain precise information about pulsar position, spin-down rate, orbital parameters (if in binary systems), etc. The structure matches with a dictionary fill-in method. Args ---- parfile_path : str Path to the .par file containing the ephemeris Returns ------- dictionary Return a dictionary containing the pulsar parameters """ assert parfile_path.endswith('.par') parameters = {} with open(parfile_path) as input_file: rows = input_file.readlines() for line in [row.strip() for row in rows]: # Skip comments if line[0] == '#': continue elements = line.split() # Skip blank lines if len(elements) == 0: continue _key = elements[0] if len(elements) == 2: try: parameters[_key] = float(elements[1]) except ValueError: parameters[_key] = elements[1] # Some lines have errors if len(elements) == 3: if elements[2] not in ['0', '1']: try: parameters[_key] = float(elements[1]) parameters[_key + '_ERR'] = float(elements[2]) except ValueError: pass else: try: parameters[_key] = float(elements[1]) except ValueError: pass if len(elements) == 4: try: parameters[_key] = float(elements[1]) parameters[_key + '_ERR'] = float(elements[3]) except ValueError: pass return parameters
[docs] def time_list(pulse_shape, start_met, ephemeris, n_events, duration): """Time events extracted with the acceptance rejection method. Args ---- pulse_shape : Pulse profile shape from pulse_pol_from_harmonics_spline in ixpeobssim.srcmodel.polarization start_met : float Starting time ephemeris : xOrbitalEphemeris object The orbital ehemeris to be used. n_events : integer Number of events to simulate duration : float Time of observation """ times = numpy.random.uniform(start_met, start_met + duration, n_events * 3) times = numpy.sort(times) # Timing model applied to the times ph = phase_function(times, start_met, ephemeris.nu(start_met), ephemeris.nudot(start_met)) ph = ph - numpy.floor(ph) max_ = numpy.amax(pulse_shape(ph)) # Randomly flat distributed amplitude for pulse shape amplitudes = numpy.random.uniform(0., max_, n_events * 3) # Comparison check and selection of good amplitude values good = amplitudes < pulse_shape(ph) return times[good][:n_events]
[docs] class xEphemeris: """Convenience class encapsulating the ephemeris for a periodic source. This is meant to capture effects up to the second time derivative of the frequency. The basic task of the class is to handle the conversion from mission elapsed time to phase and vice versa. This is performed through the met_to_phase(), and phase_to_met() interfaces. Note that, in this context, by `phase` we mean a free-running quantity, while by `pulse phase` we mean the fractional part of the former, in the [0, 1[ interval. The direct transformation (met to phase or pulse phase) is performed through a simple taylor expansion .. math:: \\phi = \\nu_0 (t - t_0) + \\frac{1}{2} \\dot\\nu_0 (t - t_0)^2 + \\frac{1}{6} \\ddot\\nu (t - t_0)^3 while the inverse (phase to met) is done through an interpolating spline. In addition, given a pulse shape in the form of a xUnivariateGenerator, the class is able to extract a set of random arrays of met and pulse phase via the rvs() method. This is used in ixpeobssim.srcmodel.roi for simulating periodic sources. Args ---- met0 : float The reference mission elapsed time. nu0 : float Rotational frequency at met0. nudot0 : float First detivative of the rotational frequency at met0. nuddot : float Second derivative of the rotational frequency (constant). """ def __init__(self, met0, nu0, nudot0=0., nuddot=0.): """Constructor. """ self.met0 = met0 self.nu0 = nu0 self.nudot0 = nudot0 self.nuddot = nuddot
[docs] def dict(self): """Return the ephemeris content in the form of a dictionary. This is useful, e.g., when calling xpphase passing an ephemeris object to set the fields programmatically. The reason why we're wrapping this and not using __dict__ directly is to be able to accommodate possible future changes in either the class layout or the options for the external programs. """ return self.__dict__
def _dt(self, met): """Return the time difference wrt the reference met. """ return met - self.met0
[docs] def nu(self, met): """Return the source frequency at a given time. """ dt = self._dt(met) return self.nu0 + self.nudot0 * dt + self.nuddot * (dt**2.) / 2.
[docs] def nudot(self, met): """Return the source first derivative of frequency at a given time. """ dt = self._dt(met) return self.nudot0 + self.nuddot * dt
[docs] def period(self, met): """Return the source period at a given time. """ return 1. / self.nu(met)
[docs] def met_to_phase(self, met): """Return the phase for a given value of array of MET values. Note the phase, here, is free-running, i.e. not bound between 0 and 1. For a simple ephemeris with zero time derivatives this is returning a straight line with a slope equal to the (constant) frequency. """ dt = self._dt(met) return self.nu0 * dt + self.nudot0 * (dt**2.) / 2. + self.nuddot * (dt**3.) / 6.
[docs] def fold(self, met, start_met, phi0=0.): """Fold the MET values to return the corresponding arrays of pulse phase. .. warning:: This is creating a new xEphemeris object with the reference MET set to start_met, and the frequency derivatives changed accordingly, and then calling the met_to_phase() method of this new object. This is supposed to roundtrip with the rvs() method to a decent accuracy, and is therefore the interface used in the xpphase application. Note, however, that there is a lot of numerical (more or less random) rounding going on under the hood, and this is not intended for "professional" use. (In a sense, the function internals are tweaked specifically for the numerical noise to play in such a way that we're making approximately the same errors in rvs() and fold()). """ met0 = start_met nu0 = self.nu(start_met) nudot0 = self.nudot(start_met) phase = xEphemeris(met0, nu0, nudot0, self.nuddot).met_to_phase(met) pulse_phase = numpy.mod(phase + phi0, 1) return pulse_phase
[docs] def phase_spline(self, start_met, duration, num_points=1000): """Return an interpolated univariate spline with the phase values for a given time range. This is mainly used in the inverted fashion to convert from phase to met. """ met = numpy.linspace(start_met, start_met + duration, num_points) fmt = dict(xlabel='Time [s]', ylabel='Phase') return xInterpolatedUnivariateSpline(met, self.met_to_phase(met), **fmt)
[docs] def phase_spline_inverse(self, start_met, duration, num_points=1000): """Return the inverse of the phase spline for a given time range. This can effectively be used for calculating the proper mission elapsed times starting from a bunch of phase values. """ return self.phase_spline(start_met, duration, num_points).inverse()
def _zero_order_met_guess(self, phase): """Provide a zero-order guess of the mission elapsed time(s) corresponding to a particolar value or array of phases. """ return phase / self.nu0 + self.met0
[docs] def phase_to_met(self, phase, start_met=None, duration=None, num_points=1000): """Return the mission elapsed times for given value or array of phases. This is essentially the inverse of phase(), and the two are supposed to roundtrip with each other. The reason why this function is slightly more complicated is that the former cannot be inverted analytically, and since the source frequency changes with time, it is not trivial to map directly the minimum and maximum phase to the corresponding met values, which is needed to create the interpolating spline for the conversion. The start_met and stop_met arguments are provided to control the spline operating the conversion, and should be provided if available---in this case the span of the spline is optimal and it is guaranteed that no extrapolation is needed. Both arguments default to None, in which case a zero-order guess is made as to what their value shoule be. Args ---- phase : array_like The phase value(s) to be converted in met. start_met : float The starting met value for the spline operating the conversion. duration : float The overall span for the spline operating the conversion. num_points : int The number of points for spline operating the conversion. """ if start_met is None: start_met = self._zero_order_met_guess(phase.min()) if duration is None: duration = self._zero_order_met_guess(phase.max()) - start_met return self.phase_spline_inverse(start_met, duration, num_points)(phase)
[docs] def rvs(self, pulse_profile, start_met, duration, num_events): """Return two (sorted) arrays of pulse phase and met for a given pulse profile, observation interval and number of events. Args ---- pulse_profile : xUnivariateGenerator instance The pulse target profile. start_met : float The initial met for the observation. duration : float The duration of the observation. num_events : float The total number of events to be simulated. """ # Calculate the number of periods spanned by the target met interval. # Note that we split the calculation into the integer and fractional # parts, that need to be handled separately. phase_min = self.met_to_phase(start_met) phase_max = self.met_to_phase(start_met + duration) delta_phase = phase_max - phase_min reminder, num_periods = numpy.modf(delta_phase) # Calculate the number of events to be extracted in the last fraction # of a period. num_extra_events = numpy.round(num_events * reminder / (delta_phase)).astype(int) # Step 1: handle the first n full periods: extract two random numbers, # one for the (integral) number of periods and one for the (fractional) # pulse phase. period = numpy.random.randint(0, num_periods, num_events - num_extra_events) pulse_phase = pulse_profile.rvs(period.shape) phase = phase_min + period + pulse_phase # Step 2: if the target duration does not span exactly an integer number # of periods, we take care of the last fractional part. if num_extra_events > 0: _pulse_phase = pulse_profile.rvs_bounded(num_extra_events, rvmax=reminder) pulse_phase = numpy.append(pulse_phase, _pulse_phase) phase = numpy.append(phase, phase_min + num_periods + _pulse_phase) # Transfor the phase to MET values. met = self.phase_to_met(phase, start_met, duration) # Sort the event times (and phases accordingly). mask = numpy.argsort(met) pulse_phase = pulse_phase[mask] met = met[mask] # And we're good to go! return pulse_phase, met
[docs] @classmethod def from_file(cls, parfile_path): """Read a set of attributes given a parameter file. """ par_dict = read_par_file(parfile_path) params = {} params['met0'] = mjd_to_met(par_dict['PEPOCH']) params['nu0'] = par_dict['F0'] params['nudot0'] = par_dict.get('F1', 0.) params['nuddot'] = par_dict.get('F2', 0.) return cls(**params)
def __str__(self): """String formatting. """ return '%s %s' % (self.__class__.__name__, self.__dict__)
[docs] def phase_function(met, met0, nu0, nudot0=0., nuddot=0.): """Return the rotational phase for a given observational time. Warning ------- This function is deprecated in favor of the xEphemeris.met_to_phase() class method, and is (temporarily) kept for backward compatibility. """ return xEphemeris(met0, nu0, nudot0, nuddot).met_to_phase(met)
[docs] class xOrbitalEphemeris(xEphemeris): """Convenience class encapsulating an orbital ephemeris. Arguments --------- met0 : float Epoch of period/frequency (MET) nu0 : float Pulsar rotation frequency (s^-1) t_orbital : float Epoch of ascending node passage or Epoch of periastron passage of orbit (MET) p_orbital : float Period of orbit (s) axis_proj : float Projected pulsar semi-major axis of orbit (lt-s) ecc : float Eccentricity of orbit lon_periast : float Longitude of periastron passage (deg) nudot0 : float Pulsar rotation frequency derivative (s^-2) nuddot : float Pulsar rotation frequency second derivative (s^-3) p_orbitaldot : float Rate of change of orbital period eps1 : float First Laplace parameter [E x sin(lon_periest)] eps2 : float Second Laplace parameter [E x cos(lon_periest)] model : Binary model (BT/ELL1/DD/MSS) """ def __init__(self, met0, nu0, t_orbital, p_orbital, axis_proj, ecc=0., lon_periast=0., nudot0=0., nuddot=0., p_orbitaldot=0., eps1=None, eps2=None, model=None): """Constructor """ xEphemeris.__init__(self, met0, nu0, nudot0, nuddot) self.t_orbital = t_orbital self.p_orbital = p_orbital self.p_orbitaldot = p_orbitaldot self.ecc = ecc self.axis_proj = axis_proj self.lon_periast = lon_periast self.eps1 = eps1 self.eps2 = eps2 self.model = model
[docs] def omega_orb(self, met): """Return the inverse of the orbit period at a given time. """ dt = self._dt(met) return 1. / self.p_orbital - 0.5 * (self.p_orbitaldot / (self.p_orbital**2.)) * dt
[docs] @classmethod def from_file(cls, parfile_path): """Read a set of attributes given a parameter file. """ par_dict = read_par_file(parfile_path) params = {} params['met0'] = mjd_to_met(par_dict['PEPOCH']) params['nu0'] = par_dict['F0'] params['nudot0'] = par_dict.get('F1', 0.) params['nuddot'] = par_dict.get('F2', 0.) t0 = par_dict.get('T0') params['t_orbital'] = mjd_to_met(par_dict.get('TASC', t0)) params['p_orbital'] = days_to_seconds(par_dict['PB']) params['axis_proj'] = par_dict['A1'] # in lt-s params['p_orbitaldot'] = par_dict.get('PBDOT', 0.) params['ecc'] = par_dict.get('ECC', 0.) params['lon_periast'] = par_dict.get('OM', 0.) params['eps1'] = par_dict.get('EPS1', None) params['eps2'] = par_dict.get('EPS2', None) params['model'] = par_dict['BINARY'] return cls(**params)