Source code for ixpeobssim.instrument.traj

#!/urs/bin/env python
# Copyright (C) 2019--2020, the ixpeobssim team.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Spacecraft trajecory related functions

from __future__ import print_function, division

from enum import IntEnum
import os
import numbers
import datetime

    from io import BytesIO
    from StringIO import StringIO as BytesIO

import numpy
import matplotlib.path
import matplotlib.patches
import skyfield
import skyfield.api
import skyfield.iokit
from skyfield.timelib import Timescale
from skyfield.iokit import parse_deltat_data
from skyfield.iokit import parse_deltat_preds, parse_leap_seconds
from skyfield.nutationlib import iau2000b
from skyfield.functions import angle_between
from skyfield.units import Angle

from ixpeobssim.utils.environment import SKYFIELD_VERSION
from ixpeobssim.utils.logging_ import logger, abort
from ixpeobssim.utils.profile import timing, xChrono, xMemoryProfiler
from ixpeobssim.utils.matplotlib_ import plt
from ixpeobssim.utils.time_ import met_to_jd, xTimeInterval
from ixpeobssim.evt.gti import xGTIList
from import apply_dithering

RADIUS_EARTH_AU = 4.26352e-5
AU_TO_KM = 1.495978707e8 #1 AU in km

EARTH_RADIUS = 6371.   # km
EARTH_MASS = 5.972e24  # kg
G = 6.67408e-11

_TIMESCALE_IO_ADVICE = "Try opening the same URL in your browser to learn more about the problem."

[docs] class xSkyfieldLoader(skyfield.iokit.Loader): def __init__(self, verbose=True, expire=True): """Overloaded timescale loader, pointing to IXPEOBSSIM_DATA for all the data files to be dowloaded locally. """ skyfield.iokit.Loader.__init__(self, IXPEOBSSIM_DATA, verbose, expire) @staticmethod def _get_data(file_name): """Drop-in replacement function for the pkgutils.get_data() method used in the original timescale implementation. This is just picking up the file from the proper folder in the ixpeobssim installation, rather than from the skyfield installation. See for more details. """ file_path = os.path.join(IXPEOBSSIM_INSTRUMENT_DATA, file_name) with open(file_path, 'rb') as input_file: data = return data
[docs] def timescale(self, delta_t=None, builtin=False): """Open or download the three timescale files, returning a `Timescale`. This is an almost verbatim re-implementation of the timescale() method in skyfield/, the only difference being the locations from where we retrieve the data. """'Running custom skyfield loader...') if builtin:'Using builtin timescale ancillary data (from %s)...',\ IXPEOBSSIM_INSTRUMENT_DATA) b = self._get_data('') # Hack to accommodate a change in skyfield 1.26. # See issue #309. if SKYFIELD_VERSION < '1.26': expiration_date, data = parse_deltat_data(BytesIO(b)) else: data = parse_deltat_data(BytesIO(b)) # End of hack. b = self._get_data('deltat.preds') # Hack to accommodate a change in skyfield 1.26. # See issue #309. if SKYFIELD_VERSION < '1.26': expiration_date, preds = parse_deltat_preds(BytesIO(b)) else: preds = parse_deltat_preds(BytesIO(b)) # End of hack. data_end_time = data[0, -1] i = numpy.searchsorted(preds[0], data_end_time, side='right') delta_t_recent = numpy.concatenate([data, preds[:,i:]], axis=1) b = self._get_data('Leap_Second.dat') expiration_date, arrays = parse_leap_seconds(BytesIO(b)) leap_dates, leap_offsets = arrays return Timescale(delta_t_recent, leap_dates, leap_offsets)'Using downloaded timescale ancillary data (from %s)...',\ if delta_t is not None: delta_t_recent = numpy.array(((-1e99, 1e99), (delta_t, delta_t))) else: try: data = self('') preds = self('deltat.preds') except IOError as e: e.args = (e.args[0] + _TIMESCALE_IO_ADVICE,) + e.args[1:] raise data_end_time = data[0, -1] i = numpy.searchsorted(preds[0], data_end_time, side='right') delta_t_recent = numpy.concatenate([data, preds[:,i:]], axis=1) try: leap_dates, leap_offsets = self('Leap_Second.dat') except IOError as e: e.args = (e.args[0] + _TIMESCALE_IO_ADVICE,) + e.args[1:] raise return Timescale(delta_t_recent, leap_dates, leap_offsets)
[docs] class xSAABoundary(matplotlib.path.Path): """Small container class representing the SAA boundary. The main purpose of this class is to encapsulate the algorithm to figure out whether any given point on the Earth surface is inside the SAA polygon or not. You can glance through the related issue at to get an account of the underlying discussion, but essentially we opted for the SAA class to be a subclass of matplotlib.path.Path since the latter provides all the functionality that we need (includeing plotting), implemented efficiently and without bringing in yet another dependence. """ def __init__(self, file_name='saa_polygon.txt'): """Constructor. """ file_path = os.path.join(IXPEOBSSIM_INSTRUMENT_DATA, file_name)'Loading SAA boundaries from %s...', file_path) vertices = numpy.loadtxt(file_path) vertices = numpy.vstack((vertices, vertices[0])) matplotlib.path.Path.__init__(self, vertices, closed=True)
[docs] def plot(self, plot_vertices=True, **kwargs): """Plot the polygon. """ kwargs.setdefault('facecolor', 'orange') kwargs.setdefault('edgecolor', 'black') kwargs.setdefault('alpha', 0.5) patch = matplotlib.patches.PathPatch(self, **kwargs) plt.gca().add_patch(patch) if plot_vertices: plt.plot(*numpy.hsplit(self.vertices, 2), 'o', color=kwargs.get('edgecolor'))
[docs] def contains(self, lon, lat): """Return a boolean mask signaling the pairs of coordinates inside the polyogon. """ # Support the function call with scalar arguments. if isinstance(lon, numbers.Number) and isinstance(lat, numbers.Number): return self.contains_point((lon, lat)) # Handle array arguments---note the additional "s" in the underlying call. return self.contains_points(list(zip(lon, lat)))
def __str__(self): """String formatting. """ text = 'SAA boundary: ' for lon, lat in self.vertices: text += '(%.2f, %.2f)--' % (lon, lat) return text.strip('-')
[docs] class xTLE: """Small utility function to create TLE strings for the purpose of our simulation. See for some background information. Mind this is not meant to be a complete interface, which would be silly---what we are after is really the ability to generate a fictional TLE for a simple circular orbit with arbitrary altitude and inclination. We purposedly do not provide the ability to write ouput file in order not to pollute the environment---this is something that should remain internal to the simulation. And, for completeness, here is the IXPE TLE pulled out from shortly after launch. 1 49954U 21121A 21351.00640149 .00001120 00000-0 35770-4 0 9994 2 49954 0.2300 281.7657 0011347 134.4260 303.9164 14.90740926 1166 """ @staticmethod def _internationl_designator(year=2021, launch_no=0, launch_piece='A'): """ 4 10–11 International Designator (last two digits of launch year) 5 12–14 International Designator (launch number of the year) 6 15–17 International Designator (piece of the launch) """ return '%s%03d%s' % (str(year)[-2:], launch_no, launch_piece.ljust(3)) @staticmethod def _epoch(year=2021, day=0.): """ 7 19–20 Epoch Year (last two digits of year) 8 21–32 Epoch (day of the year and fractional portion of the day) For reasons that now escape me this used to be set on the basis of the start time of the simulation (i.e.,, and it used to be different for each run, which prevented the user from being able to have the same GTIs twice, even if the start_date and duration of the observation did not change. Since the precise epoch of the TLE isn't really important---but getting reproducible results is---we set the epoch to a fictional point in time, which is the same for every run. """ return '%s%012.8f' % (str(year)[-2:], day) @staticmethod def _format_inclination(inclination): """ """ if inclination <= 10.: return '%.4f' % inclination return '%.3f' % inclination @staticmethod def _mean_motion(altitude, correction_factor=0.9996733350090976): """Calculate the mean motion given the altitude over the earth surface. This is just using the elementary equation of motion for circular orbits, and the small correction factor was determined after the fact to fix a ~10 km residual error on the actual altitude. """ r = 1000. * (EARTH_RADIUS + altitude) T = numpy.sqrt(4. * numpy.pi**2. * r**3. / G / EARTH_MASS) return correction_factor * 86400. / T
[docs] @staticmethod def lines(inclination=0.23, altitude=601.1, catalog_no=49954, launch_year=2021, launch_no=121, launch_piece='A', launch_day=351.00640149, ballistic_coefficient='.00001120', drag_term='35770-4', element_number=9994, ascending_node=281.7657, eccentricity='0011347', perigee=134.4260, mean_anomaly=303.9164): """ """ intl_designator = xTLE._internationl_designator(launch_year, launch_no, launch_piece) epoch = xTLE._epoch(launch_year, launch_day) line1 = '1 %05dU %s %s %s 00000-0 %s 0 %4d' %\ (catalog_no, intl_designator, epoch, ballistic_coefficient, drag_term, element_number) inclination = xTLE._format_inclination(inclination) mean_motion = xTLE._mean_motion(altitude) line2 = '2 %05d %s %3.4f %s %3.4f %3.4f %.8f 1166' %\ (catalog_no, inclination, ascending_node, eccentricity, perigee, mean_anomaly, mean_motion) return line1, line2
[docs] class xIXPETrajectory: """A class to characterize the spacecraft's orbit and trajectory for ixpeobssim. This is supposed to keep track of the necessary elements to keep track of the state of the observatory and generate the GTI. The TLE data are read from the proper file in $IXPEOBSSIM_INSTRUMENT_DATA (which satellite precisely is controlled by the satellite_name argument). The planetary and lunar ephemeris are taken from `JPL <>`_. While JPL distributes and maintain an extensive set of files, in order to avoid large downloads at runtime, we have packaged or own trimmed versions of DE405 and DE430t within the 2000--2040 time span into the ixpeobssim/instrument/data folder. The trimming has been performed using the jplephem utility described at The generation of the timescale information takes advantage of all the skyfield machinery. Note that, since the default skyfield method do not reasily offer any flexibility in terms of where the ancillary files are located, we have implemented our own Loader class that behaves as the the original with two notable exceptions: * the builtin timescale ancillary files (when necessary) are loaded from IXPEOBSSIM_INSTRUMENT_DATA, rather than from the sjyfield installation; * the updated timescale ancillary files (when necessary) are downloaded (and read from) IXPEOBSSIM_DATA. Args ---- ephemeris_file_name : str The ephemeris file name (in the IXPEOBSSIM_INSTRUMENT_DATA folder). builtin_timescale: bool Flag passed to the skyfield timescale generation routine (if True the static files shipped with ixpeobssim are used, otehrwise they are downloaded). """ def __init__(self, ephemeris_file_name='de430t_2000_2040.bsp', min_sun_angle=65., max_sun_angle=115., builtin_timescale=True): """Constructor. """ self.min_sun_angle = min_sun_angle self.max_sun_angle = max_sun_angle # Load the timescale. self.timescale = xSkyfieldLoader().timescale(builtin=builtin_timescale) # Load the SAA boundary. self.saa_boundary = xSAABoundary() # Load the satellite TLE. self.satellite = skyfield.api.EarthSatellite(*xTLE.lines(), 'IXPE', self.timescale) logger.debug(self.satellite) # Load the planetary ephemeris. ephem_file_path = os.path.join(IXPEOBSSIM_INSTRUMENT_DATA, ephemeris_file_name)'Loading planet ephemeris from %s...', ephem_file_path) self.planets = skyfield.api.load_file(ephem_file_path)
[docs] @staticmethod def load_tle(tle_file_name='tle_data_science.txt', satellite_name='AGILE'): """ Load TLE data for a given satellite. Note the tle() method is deprecated with an entertaining comment DEPRECATED: in a misguided attempt to be overly convenient, this routine builds an unweildy dictionary of satellites with keys of two different Python types: integer keys for satellite numbers, and string keys for satellite names. It even lists satellites like ``ISS (ZARYA)`` twice, in case the user wants to look them up by a single name like ``ZARYA``. What a mess. Users should instead call the simple ``tle_file()`` method, and themselves build any dictionaries they need. Args ---- tle_file_name : str The name of the file (in the IXPEOBSSIM_INSTRUMENT_DATA folder) containing the TLE data. satellite_name : str The name of the satellite to be looked for in the TLE data ancillary files. In absence of sensible TLE data for IXPE, NUSTAR and AGILE are two scientific satellites in near-equatorial orbit that can be used for our purpose. """ tle_file_path = os.path.join(IXPEOBSSIM_INSTRUMENT_DATA, tle_file_name)'Loading IXPE TLE data from %s...', tle_file_path) satellites = { s for s in skyfield.api.load.tle_file(tle_file_path)} try: return satellites[satellite_name] except KeyError: abort('Cannot retrieve TLE data for satellite %s' % satellite_name)
def __del__(self): """This is to close the underlying ephemeris file---I am even unsure this is a good idea. See for more information. """ self.planets.spk.close()
[docs] def met_to_ut(self, met, use_iau2000b=True): """Convert MET(s) to skyfield.Time object(s), using the proper underlying timescale. See for all the gory details of the iau2000b magic! That really saved the day. """ ut = self.timescale.ut1_jd(met_to_jd(met)) if use_iau2000b: ut._nutation_angles = iau2000b( return ut
[docs] def geocentric(self, met): """Return the geocentric position at a given MET. """ ut = self.met_to_ut(met) return
[docs] def position(self, met): """Return the position (longitude and latitude in decimal degrees) of the satellite at a given MET. """ pos = self.geocentric(met).subpoint() return pos.longitude.degrees, pos.latitude.degrees
[docs] def elevation(self, met): """Return the altitude of the satellite (in km) at a given MET. """ return self.geocentric(met).subpoint()
[docs] def in_saa(self, met): """Return whether the satellite is in the SAA at a given time. """ lon, lat = self.position(met) return self.saa_boundary.contains(lon, lat)
[docs] def target_planet_angle(self, met, target_ra, target_dec, planet_name): """Return the angular separation between a target RA and DEC and a planet at a given MET time. Warning ------- For now the angular separation is calculated from a geocentric (i.e., not spacecraft) position, which should be sufficient for our needs. """ # Support the call for scalar met values. if isinstance(met, numbers.Number): met = numpy.array([met]) ut = self.met_to_ut(met) # RA for target_pos needs to be given in RA hours, not degrees # (15 degrees per hour of RA) # target_pos = skyfield.api.position_of_radec(target_ra / 15., target_dec) # The above version is for a future version of skyfield with better # documentation and a more sensible default distance. # It is implemented identically using the current (as of 1.20) version below. # The distance argument corresponds to 1 Gpc in AU. target_pos = skyfield.api.position_from_radec(target_ra / 15., target_dec, distance=206264806247096.38) planet_pos = self.planets['earth'].at(ut).observe(self.planets[planet_name]) # Hack to maintain backward compatibility with older versions of skyfield. if SKYFIELD_VERSION < '1.23': return planet_pos.separation_from(target_pos).degrees # In newer version of skyfield we seem to have broadcasting issues, see # target_pos = planet_pos = if target_pos.ndim == 1 and planet_pos.ndim == 2: target_pos = target_pos.reshape((3, 1)) return Angle(radians=angle_between(target_pos, planet_pos)).degrees
[docs] def target_sun_angle(self, met, target_ra, target_dec): """Return the angular separation between a target RA and DEC and the Sun at a given MET. """ return self.target_planet_angle(met, target_ra, target_dec, 'sun')
[docs] def target_moon_angle(self, met, target_ra, target_dec): """Return the angular separation between a target RA and DEC and the Moon at a given MET. """ return self.target_planet_angle(met, target_ra, target_dec, 'moon')
[docs] def sun_constrained(self, met, target_ra, target_dec): """Return whether a given target at a given time cannot be observed due to the Sun constraints. """ sun_angle = self.target_sun_angle(met, target_ra, target_dec) return numpy.logical_or(sun_angle < self.min_sun_angle, sun_angle > self.max_sun_angle)
[docs] def target_occulted(self, met, target_ra, target_dec, limiting_altitude = 200.): """Return whether a given target is occulted at a given time. """ # Support the call for scalar met values. if isinstance(met, numbers.Number): met = numpy.array([met]) ut = self.met_to_ut(met) #This target position is by default at a distance of 1 gigaparsec #Much too far to worry about parallax or other effects but also far enough #to cause floating point precision errors if the vector algebra isn't done carefully #target_pos = skyfield.positionlib.position_of_radec(target_ra / 15., target_dec, t=ut,epoch=self.timescale.J2000) #The distance corresponds to 1 Gpc in AU target_pos = skyfield.api.position_from_radec(target_ra / 15., target_dec, distance = 206264806247096.38 ) target_ecliptic = target_pos.ecliptic_position().au #The target_ecliptic position has a total length/distance of 1e6 AU. #which corresponds to ~ 4.8 kpc #The actual distance to the source past 4.8 pc is irrelevant to #the occultation calculation earth_ecliptic = self.planets['earth'].at(ut).ecliptic_position().au sat_ecliptic = self.geocentric(met).ecliptic_position().au limit_dist = (EARTH_RADIUS + limiting_altitude) / AU_TO_KM #print(limit_dist) dist_perp_list = [] dist_sattarg_list = [] dist_targearth_list = [] ttt_list = [] n_times = numpy.shape(ut)[0] #At each time we calculate the minimum perpendicular distance #from the Earth position to the vector connecting #target ra/dec and satellite #Parameterizing the vector #v = sat + elciptic * t #where t is an arbitary parameter #we can calculate the minimum perpendicular distance to the Earth position #and value of t when it reaches that position (denoted at ttt in the code). #If the minimum perpendicular distance is within R_Earth + limit_altitude #and ttt > 0 (meaning the minimum distance is reached as a vector goes from #satellite to target then the Earth is occulting the observation. #All math is given at this website # for j in range(n_times): this_earth = earth_ecliptic[:,j] this_sat = sat_ecliptic[:,j] + this_earth # sat_ecliptic is natively a geocentric coordinate, while the others #are are Solar Sys baycentric diff_earth_sat = this_earth - this_sat l_earth_sat = numpy.sqrt(,diff_earth_sat)) diff_earth_targ = this_earth - target_ecliptic l_earth_targ = numpy.sqrt(,diff_earth_targ)) diff_targ_sat = target_ecliptic - this_sat l_targ_sat = numpy.sqrt(, diff_targ_sat)) #This is the (x0-x1) X ( x0-x2) version of the min dist formula cross = numpy.cross(diff_earth_sat,diff_earth_targ) lcross = numpy.sqrt(,cross)) dist_perp = lcross / l_targ_sat #Floating point precision is an issue for this calculation since the # diff_eath_sat << diff_earth_targ. So we actually calculate the value # of the parameter t where it reaches minimum distance. ttt = -1.* -1. * diff_earth_sat, diff_targ_sat) / l_targ_sat**2 dist_perp_list.append(dist_perp) ttt_list.append(ttt) dist_perp_arr = numpy.array(dist_perp_list) ttt_arr = numpy.array(ttt_list) #Minimum perpendicular distance between Earth position and target RA/DEC #crosses the Earth perp_mask = dist_perp_arr < limit_dist #The Earth is closer to the target RA/DEC than the satellite front_mask = ttt_arr > 0. #True means is occulted, and False means the RA/DEC is not occulted. return numpy.logical_and(perp_mask,front_mask)
[docs] @staticmethod def equispaced_met_grid(start_met, stop_met, approximate_step): """Return an equally-spaced grid of MET values according to the input argument. Note that the array returned ranges exactly from start_met to stop_met, and the step size is rounded to guarantee that. """ num_steps = int((stop_met - start_met) / approximate_step) + 1 return numpy.linspace(start_met, stop_met, num_steps)
@staticmethod def _interleave_arrays(a1, a2): """Interleave two numpy arrays, inserting the midpoint for each pair. This is used for our poor-man binary search of the SAA transit times and the appearance/disappearance of the target source. We typically pass two series of MET values, and return the interleaved array, as well as the maximum "error" on the midpoint, i.e., the absolute difference between the highest and lowest value in each input pair. Shamelessly inspired from """ assert a1.size == a2.size assert (a2 > a1).all() out = numpy.empty((3 * a1.size,), dtype=a1.dtype) out[0::3] = a1 out[1::3] = 0.5 * (a1 + a2) out[2::3] = a2 out = numpy.unique(out) max_error = (0.5 * (a2 - a1)).max() return out, max_error def _generic_binary_search(self, start_met, stop_met, function, args=(), initial_step=100., precision=0.001): """Generic binary search to find transitions in time into a function of the satellite coordinates. Args ---- met : array_like The initial array of MET values. pos : 2-element tuple (lon, lat) of array_like The initial tuple of position arrays. function : callable The function(lon, lat) we want to apply the binary search to. args : tuple The optional additional arguments to be passed to the function. precision : float The minimum precision (in s) to which we want to calculate the transition times. """ met = self.equispaced_met_grid(start_met, stop_met, initial_step) err = 2 * precision while err >= precision: # Apply the releant function for the binary search. mask = function(met, *args) # Calculate the indices where the value of the function flips from # True to False or vice-versa. idx = numpy.argwhere(numpy.diff(mask)).squeeze() # Handle the case where the index mask is empty---i.e. the are no # flips in the objective function over the met array. This means that # we either return the entire MET interval or nothing. if idx.size == 0: # The function is true at the beginning and true for the entire # [start_met, stop_met] interval. if mask[0] == True: return [(start_met, stop_met)] # The function is never True in the parent interval. return [] # Create a new array with only the MET values around the flips, adding # the midpoint for the next step of the binary search. # At this point met is a one-dimensional array containing triplets # (low, mid, high) of values, where mid is our best estimate of the # transition and err is the maximum of (high - low), representing # the worst case maximum error. met, err = self._interleave_arrays(met[idx], met[idx + 1]) # Take the midpoints of the final triplets--that's all we care about. met = met[1::3] # We now evaluate the position and the mask *right after* the transition # to figure out whether we are entering or exiting. entrance = function(met + precision, *args) # Take care of the corner cases where the first transition is an exit or # the last transition is an entrance. Note that we cannot test the # equality of the elements of a numpy boolean array with is, need to use ==. if entrance[0] == False: met = numpy.insert(met, 0, start_met) if entrance[-1] == True: met = numpy.append(met, stop_met) return met @staticmethod def _array_to_epochs(met): """Small convenince function to transform an array of MET values into a list of epoch, i.e., a list of (start, stop) two-elements tuples. This is turning, e.g., [1., 2., 3.] into [(1., 2.), (2., 3.), (3., 4.)] """ return [(met[i], met[i + 1]) for i in range(0, len(met), 2)]
[docs] def saa_mets(self, start_met, stop_met, initial_step=100., precision=0.001): """Return the list of METs for which we cross the SAA boundaries within a given time interval. """ args = start_met, stop_met, self.in_saa, (), initial_step, precision return self._generic_binary_search(*args)
[docs] def saa_epochs(self, start_met, stop_met, initial_step=100., precision=0.001): """Calculate the SAA epochs. """ args = start_met, stop_met return self._array_to_epochs(self.saa_mets(*args, initial_step, precision))
[docs] def earth_occultation_mets(self, start_met, stop_met, target_ra, target_dec, initial_step=100., precision=0.001): """Return the list of METS for which the Earth occultation status is changing. """ args = start_met, stop_met, self.target_occulted, (target_ra, target_dec) return self._generic_binary_search(*args, initial_step, precision)
[docs] def earth_occultation_epochs(self, start_met, stop_met, target_ra, target_dec, initial_step=100., precision=0.001): """Calculate the epochs when a given target is occulted by the Earth. """ args = start_met, stop_met, target_ra, target_dec, initial_step, precision return self._array_to_epochs(self.earth_occultation_mets(*args))
[docs] def sun_constraint_mets(self, start_met, stop_met, target_ra, target_dec, initial_step=1000., precision=0.001): """Calculate the list of METs for which the Sun observability changes. """ args = start_met, stop_met, self.sun_constrained, (target_ra, target_dec) return self._generic_binary_search(*args, initial_step, precision)
[docs] def sun_constraint_epochs(self, start_met, stop_met, target_ra, target_dec, initial_step=1000., precision=0.001): """Calculate the epochs when observations are inhibited by the Sun constraints. """ args = start_met, stop_met, target_ra, target_dec, initial_step, precision return self._array_to_epochs(self.sun_constraint_mets(*args))
[docs] def timeline_mets(self, start_met, stop_met, target_ra, target_dec, saa, occult, initial_step=100., precision=0.001): """Return a list of all the relevant MET values for the calculation of the observation timeline, i.e., those that pertain to the SAA and to the Earth occultation. The calculation is started with the observaton start and stop met, and the SAA and Earth occultation MET values are addedd if necessary. The function returns separate lists for the SAA and Earth occultation MET values, as well as a sorted logical OR of the two with no, diplicates, since all is needed downstream to calculate the timeline. """ mets = numpy.array([start_met, stop_met]) saa_mets = numpy.array([]) occult_mets = numpy.array([]) if saa: args = start_met, stop_met, initial_step, precision saa_mets = self.saa_mets(*args) mets = numpy.append(mets, saa_mets) if occult: args = start_met, stop_met, target_ra, target_dec, initial_step, precision occult_mets = self.earth_occultation_mets(*args) mets = numpy.append(mets, occult_mets) return numpy.unique(mets), saa_mets, occult_mets
[docs] @staticmethod def complement_epochs(epochs, start_met, stop_met): """Return the complement to a list of epochs. """ # If the list of epochs is empty, then just return a list with a single # epoch covering the entire observation. if len(epochs) == 0: return [(start_met, stop_met)] met = numpy.array(tuple(_met for epoch in epochs for _met in epoch)) if met[0] > start_met: met = numpy.insert(met, 0, start_met) else: met = met[1:] if met[-1] < stop_met: met = numpy.append(met, stop_met) else: met = met[:-1] return [(met[i], met[i + 1]) for i in range(0, len(met), 2)]
@staticmethod def _epochs_to_arrays(epochs): """Tranform a list of epochs, i.e., a list of length-2 tuples of the form (t1, t2) into a pair of arrays suitable for the determination of the GTIs. .. warning: This is only used in the gti_list() class method, which is now deprecated, and can be removed when, and if, the latter is removed. """ # The first array contains all the epoch bounds, unpacked from the # original tuples. met = numpy.array(tuple(_met for epoch in epochs for _met in epoch)) # The second array signal whether each of the MET value signal an # enter into the epoch (+1) or an exit from the epoch (-1). sign = numpy.array([1, -1] * len(epochs)) return met, sign
[docs] def gti_list(self, start_met, stop_met, target_ra, target_dec, saa=True, occult=True, initial_step=100., precision=0.001): """Calculate the GTIs, i.e., the ephocs where we are not in the SAA and the target is not occulted by the Earth. While this method seems (and is, in fact) quite convoluted, we decided to calculate separately the SAA and Earth occultation epochs, when the instrument is not taking data, and merge them at the end to extract the good time intervals. The reasoning is that, while the former epochs are quite regular over the timescale of an observation---which allows to perform a sensible binary search starting from a reasonably corse grid (say with a 100 s step)---when we put in AND the two we get all kind of resonances, to the point that the GTIs can be arbitrarily small, and we would need a much finer (and computationally intensive) initial grid in order not to introduce measurable errors. .. warning: The xObservationTimeline class provide a fresh rewite of the GTI logics, in a simpler way that allows to keep track of the precise status of the observatory at each point in time. This function is temporarely kept for debugging reason, but will be effectively removed. Args ---- start_met : float The initial MET for the observation. stop_met : float The final MET for the observation. target_ra : float The right ascension of the target source in decimal degrees. target_dec : float The declination of the target source in decimal degrees. saa : bool Consider the SAA passages in the GTI calculation. occult : bool Consider the Earth occultation in the GTI calculation. initial_step : float The step (in s) for the initial equispacedtime time grid used to seed the binary search. (This can slow things down when too small.) precision : float The worst-case precision of the GTI calculation (i.e., the stop condition for the binary search) """ # Trivial case: we return a list containing a single GTI covering the # entire observation :-) saa_epochs = [] occult_epochs = [] if saa is False and occult is False: return xGTIList(start_met, stop_met, (start_met, stop_met)), saa_epochs, occult_epochs # If we're here we got something to do. Prepare the data structures. met = numpy.array([], dtype=float) sign = numpy.array([], dtype=int) epochs_list = [] # Collect the information we need about the parts of the observation # to be trimmed out. if saa is True: args = start_met, stop_met, initial_step, precision saa_epochs = self.saa_epochs(*args) epochs_list.append(saa_epochs) if occult is True: args = start_met, stop_met, target_ra, target_dec, initial_step, precision occult_epochs = self.earth_occultation_epochs(*args) epochs_list.append(occult_epochs) # Loop over the list of epochs where we are not taking data and # merge them into a unique array of MET values, keeping track of the # enter/exit flags. for epochs in epochs_list: _met, _sign = self._epochs_to_arrays(epochs) met = numpy.append(met, _met) sign = numpy.append(sign, _sign) # Handle the case where the MET list is empty---this means that we # have a single time interval covering the entire observation. if met.size == 0: return xGTIList(start_met, stop_met, (start_met, stop_met)), saa_epochs, occult_epochs # Sort the MET values and the signs idx = numpy.argsort(met) met = met[idx] sign = sign[idx] # We still have the potential issue of overlapping MET values, e.g., # when at the end of the observation we are occulted *and* in the SAA, # so that stop_met occurs twice at the end of the array. met, idx = numpy.unique(met, return_index=True) sign = sign[idx] # Final loop over the transitions. We keep track of the sum of the sign # values, and every time we hit zero, we're entering a GTI. sign_sum = 0 gti_list = xGTIList(start_met, stop_met) # If the first epoch entrance is after the start MET, we have a GTI # right away. if met[0] > start_met: gti_list.append_gti(start_met, met[0]) # If the last epoch exit is before the stop MET, then we need to append # the stop MET itself to the MET list in order to avoid an index error # at the last step of the loop. if met[-1] < stop_met: met = numpy.append(met, stop_met) for i, _sign in enumerate(sign): sign_sum += _sign if sign_sum == 0 and i < len(met) - 1: gti_list.append_gti(met[i], met[i + 1]) return gti_list, saa_epochs, occult_epochs
[docs] class xTimelineEpoch(xTimeInterval): """Small convenience class to encapsulate an "epoch" within a given observation. In this context an epoch is the datum of a start_met, a stop_met, and a series of bit flags indicating whether the observatory is, e.g., in the SAA or occulted by the Earth. See """ def __init__(self, start_met, stop_met, in_saa, occulted): """Constructor. """ xTimeInterval.__init__(self, start_met, stop_met) self.in_saa = in_saa self.occulted = occulted
[docs] def bitmask(self): """Return a bit mask corresponding to the observatory status during the epoch. """ return self.in_saa << 1 | self.occulted
[docs] def shrink(self, start_padding, stop_padding): """Create a new epoch where the bounds are shrinked by given amounts on either side. The "shrinked" epoch is inhering the SAA and occultation flags. Note that the original object is left untouched, i.e., the new epoch is created in place. """ start_met = self.start_met + start_padding stop_met = self.stop_met - stop_padding return self.__class__(start_met, stop_met, self.in_saa, self.occulted)
[docs] def isgti(self): """Return True if the epoch is a good time interval, i.e., we're not in the SAA and we're not occulted by the Earth. """ return not self.in_saa and not self.occulted
[docs] def isocti(self): """Return True of the epoch is a good on-orbit calibration time interval, i.e., we are occulted by the Earth but not in the SAA. """ return self.occulted and not self.in_saa
def __str__(self): """String formatting. """ return 'Epoch %s [b{:02b}]'.format(xTimeInterval.__str__(self), self.bitmask())
[docs] class xObservationTimeline(xTimeInterval): """Small container class encapsulating the detailed planning, in terms of GTIs, calibration intervals and SAA passages, for a specific observation (i.e., a portion of the mission with a specific, fixed pointing). This is complete rewrite of the code that used to live into the gti_list() method of the xIXPETrajectory class, ini response to The xIXPETrajectory class is still responsible for all of the complex logics and calculation to detect the relevant MET values for the SAA passages and the Earth occultation, and these values are turned into a list of xTimelineEpoch objects to be consumed downstream. This class provides a coherent and unified interface to extract good time intervals and onboard calibration intervals, with optional padding on either end and with the possibility of enforcing a minimum duration (after the padding). """ def __init__(self, start_met, stop_met, target_ra, target_dec, saa, occult): """Constructor. """ xTimeInterval.__init__(self, start_met, stop_met) self.target_ra = target_ra self.target_dec = target_dec self.trajectory = xIXPETrajectory() args = start_met, stop_met, target_ra, target_dec, saa, occult self.epochs = self._calculate_epochs(*self.trajectory.timeline_mets(*args))
[docs] def epoch_data(self): """Return the epoch data in a form that can be used to fill the TIMELINE extension in the output FITS files. """ start = numpy.array([epoch.start_met for epoch in self.epochs]) stop = numpy.array([epoch.stop_met for epoch in self.epochs]) in_saa = numpy.array([epoch.in_saa for epoch in self.epochs]) occulted = numpy.array([epoch.occulted for epoch in self.epochs]) return (start, stop, in_saa, occulted)
@staticmethod def _align_met(met, grid_step, ceil=False): """Align a MET value, or an array of MET value, with a pre-existing grid of fixed steps. This is used to generate the SC data on a regular time gird with a given step. """ met = int(met / grid_step) * grid_step if ceil: met += grid_step return met
[docs] def sc_data(self, time_step, dither_params=None, saa=True, occult=True): """Return the spacecraft data to be written into the SC_DATA optional extension of the output photon list. Spacecraft data are instantaneous data (e.g., longitude, latitude, elevation) evaluated on a regular grid with constant spacing. Rather than with the start or the stop of the observation, the grid is aligned with MET = 0, and padded as necessary on both ends to include the entire span of the observation. The relevant data are returned into the form of a Python dictionary of the form {column_name: value_array} that can be used directly to fill the columns of the output binary table. Arguments: ----------- time_step: float Interval used to save the SC information dither_params : (amplitude, pa, px, py) tuple, optional The parameters for the dithering of the observatory. The dithering is not applied if this is set to None. saa : bool Flag indicating whether the time intervals in the SAA should be flagged---the corresponding column with be identically zero if this is false. occult : bool Flag indicating whether the time intervals when the target is occulted should be flagged---the corresponding column with be identically zero if this is false. """ met0 = self._align_met(self.start_met, time_step) met1 = self._align_met(self.stop_met, time_step, ceil=True) met_grid = numpy.arange(met0, met1 + 0.5 * time_step, time_step) ra_pnt, dec_pnt = apply_dithering(met_grid, self.target_ra, self.target_dec, dither_params) lon, lat = self.trajectory.position(met_grid) elevation = self.trajectory.elevation(met_grid) sun_angle = self.trajectory.target_sun_angle(met_grid, ra_pnt, dec_pnt) if saa: in_saa = self.trajectory.in_saa(met_grid) else: in_saa = numpy.zeros(met_grid.shape, dtype=int) if occult: args = met_grid, self.target_ra, self.target_dec target_occulted = self.trajectory.target_occulted(*args) else: target_occulted = numpy.zeros(met_grid.shape, dtype=int) return {'MET': met_grid, 'RA_PNT': ra_pnt, 'DEC_PNT': dec_pnt, 'LAT_GEO': lat, 'LON_GEO': lon, 'ALT_GEO': elevation, 'SUN_ANGLE': sun_angle, 'IN_SAA': in_saa, 'TARGET_OCCULT': target_occulted}
@staticmethod def _bisect_odd(array_, value): """Return whether a single value is bisecting a given array in a odd index. This is useful to figure out whether we are in the SAA and/or occulted at any given time, by just bisecting the corresponding array of MET values. """ return numpy.searchsorted(array_, value) % 2 == 1 @staticmethod def _calculate_epochs(mets, saa_mets, occult_mets): """Loop over the relevant MET values and populate the list of epochs for the observation. Note that, for each ecpoch, we assign the SAA and Earth occultation flags by bisecting the corresponding MET list with the epoch center. """ epochs = [] for i, start in enumerate(mets[:-1]): stop = mets[i + 1] met0 = 0.5 * (start + stop) in_saa = xObservationTimeline._bisect_odd(saa_mets, met0) occulted = xObservationTimeline._bisect_odd(occult_mets, met0) epochs.append(xTimelineEpoch(start, stop, in_saa, occulted)) return epochs
[docs] def filter_epochs(self, min_duration=0., start_padding=0., stop_padding=0.): """Filter the epochs by duration. """ assert min_duration >= 0. min_duration += start_padding + stop_padding return [epoch for epoch in self.epochs if epoch.duration > min_duration]
[docs] def gti_list(self, min_duration=0., start_padding=0., stop_padding=0.): """Return the list of the good time intervals (GTI). The GTIs are defined as those when we are not in the SAA and we are not occulted by the Earth. Note the return value of this function is not a plain old list, but a xGTIList object. """ epochs = self.filter_epochs(min_duration, start_padding, stop_padding) gtis = [epoch.bounds() for epoch in epochs if epoch.isgti()] return xGTIList(self.start_met, self.stop_met, *gtis)
[docs] def octi_list(self, min_duration=0., start_padding=0., stop_padding=0.): """Return the list of the onboard calibration time intervals (OCTI). The OCTIs are defined as those when we are occulted by the Earth, but outside the SAA. """ epochs = self.filter_epochs(min_duration, start_padding, stop_padding) return [epoch.bounds() for epoch in epochs if epoch.isocti()]