# Copyright (C) 2022, the ixpeobssim team.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Data structures related to the good time intervals.
"""
from __future__ import print_function, division
from enum import IntEnum
import numpy
from ixpeobssim.utils.logging_ import logger
# pylint: disable=invalid-name
[docs]
class UnexpectedEdgeType(RuntimeError):
"""
Custom exception raised when an unexpected sequence of GTI edges is
encountered (e.g. two GTI start in a row without a GTI stop in between).
"""
pass
[docs]
class xGTIList(list):
"""Class representing a list of good time interval.
The interfaces are fairly minimal, but since we use this in quite
different places, it was handy to collect the useful stuff in one place.
"""
def __init__(self, start_met, stop_met, *gtis):
"""
"""
list.__init__(self)
self.start_met = start_met
self.stop_met = stop_met
self.span = self.stop_met - self.start_met
for start, stop in gtis:
self.append_gti(start, stop)
[docs]
@classmethod
def from_arrays(cls, start_met, stop_met, tstarts, tstops):
"""
Initialize from two arrays (of start and stop).
This is essentially converting the two arrays of start and stop to
an array of tuples (start, stop), as required by the constructor of
the class.
"""
gtis = []
for start, stop in zip(tstarts, tstops):
gtis.append([start, stop])
return cls(start_met, stop_met, *gtis)
@staticmethod
def validate_intervals(start, stop):
if len(start) != len(stop):
raise ValueError('start/stop length mismatch')
if not numpy.all(start < stop):
raise ValueError('non-positive interval detected')
[docs]
def append_gti(self, start, stop):
"""Append a new GTI to the list.
"""
assert start >= self.start_met
assert stop <= self.stop_met
self.append((start, stop))
[docs]
def total_good_time(self):
"""Return the total good time.
"""
return sum(stop - start for start, stop in self)
[docs]
def start_mets(self):
"""Return the value of the start MET values for the GTI.
"""
return [start for start, stop in self]
[docs]
def stop_mets(self):
"""Return the value of the stop MET values for the GTI.
"""
return [stop for start, stop in self]
[docs]
def all_mets(self):
"""Return all the MET values corresponding to the start or stop of the
GTIs in the list.
Note that Python supports the call to sum() with start set by keyword
argument only since version 3.8, so we are refraining from that, here.
"""
return sum([[start, stop] for start, stop in self], [])
[docs]
def complement(self):
"""Return the logical complement of the GTI list in the relevant time
span, i.e., the list of time intervals between the start and the end
of observation that are *not* good time intervals.
These are the ones where we do not take celestial data and we can, e.g.,
take calibration data.
See https://stackoverflow.com/questions/16789776/ for the use of the
iterator over the list.
"""
# This is an iterator to all the bounds of the GTIs except for the
# start of the first GTI and the end of the last.
iterator = iter(self.all_mets()[1:-1])
return [(start, next(iterator)) for start in iterator]
[docs]
def gti_mask(self, time_):
"""
Return a boolean mask selecting the times falling inside the GTI in the
list.
Args
---------
time_ : array of times to select
"""
mask = numpy.zeros(time_.shape, dtype=bool)
for (start, stop) in self:
mask[numpy.logical_and(time_ >= start, time_ <= stop)] = True
return mask
[docs]
def filter_event_times(self, time_):
"""Filter a given array of event times and return a reduced array
only containing the times within the good time intervals in the list,
along with the corresponding boolean mask. The latter, in turn, can be
used to filter ancillary related columns, e.g., the phase in simulations
of periodic sources.
"""
logger.info('Filtering %d event times according to the GTIs...', len(time_))
mask = self.gti_mask(time_)
time_ = time_[mask]
logger.info('Done, %d entries remaining.', len(time_))
return time_, mask
[docs]
def remove_bti(self, bti_start, bti_stop):
"""
Update Good Time Intervals (GTIs) by removing Bad Time Intervals (BTIs)
from them.
Args:
bti_start (numpy.ndarray): Array of start times for BTIs.
bti_stop (numpy.ndarray): Array of stop times for BTIs.
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: Arrays of start and stop times for
the new GTIs.
"""
gti_start, gti_stop = self.start_mets(), self.stop_mets()
self.validate_intervals(bti_start, bti_stop)
helper = xGTIListMergerHelper()
def merge_rule():
""" BTI are treated as negative GTIs
"""
return helper.in_old_gti and not helper.in_new_gti
return helper.combine_intervals(gti_start, gti_stop, bti_start,
bti_stop, merge_rule)
[docs]
def merge_gti(self, new_gti_start, new_gti_stop):
"""
Update Good Time Intervals (GTIs) by performing the *intersection* with
the given GTIs.
Args:
new_gti_start (numpy.ndarray): Array of start times for GTIs.
new_gti_stop (numpy.ndarray): Array of stop times for GTIs.
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: Arrays of start and stop times
for the intersection GTIs.
"""
gti_start, gti_stop = self.start_mets(), self.stop_mets()
self.validate_intervals(new_gti_start, new_gti_stop)
helper = xGTIListMergerHelper()
def merge_rule():
return helper.in_old_gti and helper.in_new_gti
return helper.combine_intervals(gti_start, gti_stop, new_gti_start,
new_gti_stop, merge_rule)
def __str__(self):
"""String formatting.
"""
text = 'List of good time intervals (%.3f s total over %.3f s span):' %\
(self.total_good_time(), self.span)
for i, (start, stop) in enumerate(self):
text = '%s\n[%3d] (%.3f--%.3f) or (%.3f--%.3f)' %\
(text, i + 1, start, stop, start - self.start_met,
stop - self.start_met)
return text
[docs]
class xGTIListMergerHelper:
""" Helper class encapsulating most of the logic for combining two sets of
GTIs. Provides internal flags for keeping track of whether we are inside
the OLD/NEW GTIs and the logic for switching state based on edge detection.
"""
[docs]
class EdgeType(IntEnum):
"""
Enumeration representing the types of edges that can occur in Good
Time Intervals (GTIs) and Bad Time Intervals (BTIs).
Members:
OLD_GTI_START: Beginning of a good time interval.
OLD_GTI_STOP: End of a good time interval.
NEW_GTI_START: Beginning of a bad time interval.
NEW_GTI_STOP: End of a bad time interval.
"""
OLD_GTI_START = 0
OLD_GTI_STOP = 1
NEW_GTI_START = 2
NEW_GTI_STOP = 3
EDGE_PRIORITY = {
EdgeType.NEW_GTI_STOP: 0,
EdgeType.OLD_GTI_STOP: 1,
EdgeType.NEW_GTI_START: 2,
EdgeType.OLD_GTI_START: 3,
}
def __init__(self):
"""
"""
self.in_old_gti = False
self.in_new_gti = False
def _error_msg(self, edge_type):
return f'Unexpected edge type {edge_type.name} when '\
f'"in_old_gti" is {self.in_old_gti} and '\
f'"in_new_gti" is {self.in_new_gti}'
[docs]
def update_state(self, edge):
""" Change the internal state based on the given edge type. Rise an
appropriate error in case of erroneous transitions (e.g. encoutering a
OLD_GTI_START when already inside a OLD_GTI).
"""
if edge == self.EdgeType.OLD_GTI_START:
if self.in_old_gti:
raise UnexpectedEdgeType(self._error_msg(edge))
self.in_old_gti = True
elif edge == self.EdgeType.OLD_GTI_STOP:
if not self.in_old_gti:
raise UnexpectedEdgeType(self._error_msg(edge))
self.in_old_gti = False
elif edge == self.EdgeType.NEW_GTI_START:
if self.in_new_gti:
raise UnexpectedEdgeType(self._error_msg(edge))
self.in_new_gti = True
elif edge == self.EdgeType.NEW_GTI_STOP:
if not self.in_new_gti:
raise UnexpectedEdgeType(self._error_msg(edge))
self.in_new_gti = False
[docs]
def sweep_intervals(self, edge_times, edge_types, is_active):
"""
Generic sweep-line engine for interval algebra.
Intervals are half-open: [start, stop)
The user provided function is_active defines when the resulting GTIs
should be considered active.
Args
----------
edge_times : ndarray
edge_types : ndarray
is_active : callable() -> bool
Returns True when output GTI should be active.
Returns
-------
start, stop : ndarray
"""
# Sort edges by (time, priority)
order = numpy.lexsort((
[self.EDGE_PRIORITY[t] for t in edge_types],
edge_times
))
edge_times = edge_times[order]
edge_types = edge_types[order]
start = []
stop = []
prev_active = False
for t, etype in zip(edge_times, edge_types):
self.update_state(etype)
active = is_active()
if not prev_active and active:
start.append(t)
elif prev_active and not active:
stop.append(t)
prev_active = active
if prev_active:
raise RuntimeError('Unclosed interval at end of sweep')
return numpy.array(start), numpy.array(stop)
[docs]
def combine_intervals(self, old_start, old_stop, new_start, new_stop,
rule):
""" Combine the given intervals based on the given rule. This function
mostly prepare the input arrays for sweep_intervals().
"""
edge_times = numpy.hstack((old_start, old_stop, new_start, new_stop))
edge_types = numpy.asarray(
[self.EdgeType.OLD_GTI_START] * len(old_start) +
[self.EdgeType.OLD_GTI_STOP] * len(old_stop) +
[self.EdgeType.NEW_GTI_START] * len(new_start) +
[self.EdgeType.NEW_GTI_STOP] * len(new_stop)
)
return self.sweep_intervals(edge_times, edge_types, rule)
[docs]
class xSimpleGTIList(xGTIList):
"""Subclass of xGTIList with a single GTI.
"""
def __init__(self, start_met, stop_met):
"""Constructor.
"""
xGTIList.__init__(self, start_met, stop_met, (start_met, stop_met))
[docs]
class xUberGTIList(xGTIList):
"""Subclass of xGTIList with a single GTI including all the MET from -inf to inf.
"""
def __init__(self):
"""Constructor.
"""
start_met = -numpy.inf
stop_met = numpy.inf
xGTIList.__init__(self, start_met, stop_met, (start_met, stop_met))