Source code for astropy_timeseries.binned
# Licensed under a 3-clause BSD style license - see LICENSE.rst
from copy import deepcopy
import numpy as np
from astropy.table import groups, QTable
from astropy.time import Time, TimeDelta
from astropy import units as u
from astropy.units import Quantity
from .core import BaseTimeSeries
__all__ = ['BinnedTimeSeries']
[docs]class BinnedTimeSeries(BaseTimeSeries):
_require_time_column = False
def __init__(self, data=None, time_bin_start=None, time_bin_end=None,
time_bin_size=None, n_bins=None, **kwargs):
super().__init__(data=data, **kwargs)
# FIXME: this is because for some operations, an empty time series needs
# to be created, then columns added one by one. We should check that
# when columns are added manually, time is added first and is of the
# right type.
if (data is None and time_bin_start is None and time_bin_end is None and
time_bin_size is None and n_bins is None):
self._required_columns = ['time_bin_start', 'time_bin_size']
return
# First if time_bin_start and time_bin_end have been given in the table data, we
# should extract them and treat them as if they had been passed as
# keyword arguments.
if 'time_bin_start' in self.colnames:
if time_bin_start is None:
time_bin_start = self.columns['time_bin_start']
self.remove_column('time_bin_start')
else:
raise TypeError("'time_bin_start' has been given both in the table "
"and as a keyword argument")
if 'time_bin_size' in self.colnames:
if time_bin_size is None:
time_bin_size = self.columns['time_bin_size']
self.remove_column('time_bin_size')
else:
raise TypeError("'time_bin_size' has been given both in the table "
"and as a keyword argument")
if time_bin_start is None:
raise TypeError("'time_bin_start' has not been specified")
if time_bin_end is None and time_bin_size is None:
raise TypeError("Either 'time_bin_size' or 'time_bin_end' should be specified")
if not isinstance(time_bin_start, Time):
time_bin_start = Time(time_bin_start)
if time_bin_end is not None and not isinstance(time_bin_end, Time):
time_bin_end = Time(time_bin_end)
if time_bin_size is not None and not isinstance(time_bin_size, (Quantity, TimeDelta)):
raise TypeError("'time_bin_size' should be a Quantity or a TimeDelta")
if isinstance(time_bin_size, TimeDelta):
time_bin_size = time_bin_size.sec * u.s
if time_bin_start.isscalar:
# We interpret this as meaning that this is the start of the
# first bin and that the bins are contiguous. In this case,
# we require time_bin_size to be specified.
if time_bin_size is None:
raise TypeError("'time_bin_start' is scalar, so 'time_bin_size' is required")
if time_bin_size.isscalar:
if data is not None:
# TODO: raise error if also passed explicily and inconsistent
n_bins = len(self)
time_bin_size = np.repeat(time_bin_size, n_bins)
time_delta = np.cumsum(time_bin_size)
time_bin_end = time_bin_start + time_delta
# Now shift the array so that the first entry is 0
time_delta = np.roll(time_delta, 1)
time_delta[0] = 0. * u.s
# Make time_bin_start into an array
time_bin_start = time_bin_start + time_delta
else:
if len(self.colnames) > 0 and len(time_bin_start) != len(self):
raise ValueError("Length of 'time_bin_start' ({0}) should match "
"table length ({1})".format(len(time_bin_start), len(self)))
if time_bin_end is not None:
if time_bin_end.isscalar:
times = time_bin_start.copy()
times[:-1] = times[1:]
times[-1] = time_bin_end
time_bin_end = times
time_bin_size = (time_bin_end - time_bin_start).sec * u.s
elif time_bin_size is None:
raise TypeError("Either 'time_bin_size' or 'time_bin_end' should be specified")
self.add_column(time_bin_start, index=0, name='time_bin_start')
self.add_index('time_bin_start')
if time_bin_size.isscalar:
time_bin_size = np.repeat(time_bin_size, len(self))
self.add_column(time_bin_size, index=1, name='time_bin_size')
@property
def time_bin_start(self):
"""
The start times of all the time bins.
"""
return self['time_bin_start']
@property
def time_bin_center(self):
"""
The center times of all the time bins.
"""
return self['time_bin_start'] + self['time_bin_size'] * 0.5
@property
def time_bin_end(self):
"""
The end times of all the time bins.
"""
return self['time_bin_start'] + self['time_bin_size']
@property
def time_bin_size(self):
"""
The sizes of all the time bins.
"""
return self['time_bin_size']
def __getitem__(self, item):
if self._is_list_or_tuple_of_str(item):
if 'time_bin_start' not in item or 'time_bin_size' not in item:
out = QTable([self[x] for x in item],
meta=deepcopy(self.meta),
copy_indices=self._copy_indices)
out._groups = groups.TableGroups(out, indices=self.groups._indices,
keys=self.groups._keys)
return out
return super().__getitem__(item)