Source code for astropy_timeseries.sampled
# Licensed under a 3-clause BSD style license - see LICENSE.rst
from copy import deepcopy
import numpy as np
from astropy.table import groups, QTable, Table
from astropy.time import Time, TimeDelta
from astropy import units as u
from astropy.units import Quantity
from .core import BaseTimeSeries
__all__ = ['TimeSeries']
[docs]class TimeSeries(BaseTimeSeries):
_require_time_column = False
def __init__(self, data=None, time=None, time_delta=None, n_samples=None, **kwargs):
"""
"""
super().__init__(data=data, **kwargs)
# FIXME: this is because for some operations, an empty time series needs
# to be created, then columns added one by one. We should check that
# when columns are added manually, time is added first and is of the
# right type.
if data is None and time is None and time_delta is None:
self._required_columns = ['time']
return
# First if time has been given in the table data, we should extract it
# and treat it as if it had been passed as a keyword argument.
if data is not None:
# TODO: raise error if also passed explicily and inconsistent
n_samples = len(self)
if 'time' in self.colnames:
if time is None:
time = self.columns['time']
self.remove_column('time')
else:
raise TypeError("'time' has been given both in the table and as a keyword argument")
if time is None:
raise TypeError("'time' has not been specified")
if not isinstance(time, Time):
time = Time(time)
if time_delta is not None and not isinstance(time_delta, (Quantity, TimeDelta)):
raise TypeError("'time_delta' should be a Quantity or a TimeDelta")
if isinstance(time_delta, TimeDelta):
time_delta = time_delta.sec * u.s
if time.isscalar:
# We interpret this as meaning that time is that of the first
# sample and that the interval is given by time_delta.
if time_delta is None:
raise TypeError("'time' is scalar, so 'time_delta' is required")
if time_delta.isscalar:
time_delta = np.repeat(time_delta, n_samples)
time_delta = np.cumsum(time_delta)
time_delta = np.roll(time_delta, 1)
time_delta[0] = 0. * u.s
time = time + time_delta
else:
if len(self.colnames) > 0 and len(time) != len(self):
raise ValueError("Length of 'time' ({0}) should match "
"data length ({1})".format(len(time), n_samples))
if time_delta is not None:
raise TypeError("'time_delta' should not be specified since "
"'time' is an array")
self.add_column(time, index=0, name='time')
self.add_index('time')
@property
def time(self):
"""
The time values.
"""
return self['time']
[docs] def fold(self, period=None, midpoint_epoch=None):
"""
Return a new TimeSeries folded with a period and midpoint epoch.
Parameters
----------
period : `~astropy.units.Quantity`
The period to use for folding
midpoint_epoch : `~astropy.time.Time`
The time to use as the midpoint epoch, at which the relative
time offset will be 0. Defaults to the first time in the time
series.
"""
folded = self.copy()
folded.remove_column('time')
if midpoint_epoch is None:
midpoint_epoch = self.time[0]
else:
midpoint_epoch = Time(midpoint_epoch)
period_sec = period.to_value(u.s)
relative_time_sec = ((self.time - midpoint_epoch).sec + period_sec / 2) % period_sec - period_sec / 2
folded_time = TimeDelta(relative_time_sec * u.s)
folded.add_column(folded_time, name='time', index=0)
return folded
def __getitem__(self, item):
if self._is_list_or_tuple_of_str(item):
if 'time' not in item:
out = QTable([self[x] for x in item],
meta=deepcopy(self.meta),
copy_indices=self._copy_indices)
out._groups = groups.TableGroups(out, indices=self.groups._indices,
keys=self.groups._keys)
return out
return super().__getitem__(item)
[docs] def add_columns(self, *args, **kwargs):
result = super().add_columns(*args, **kwargs)
if len(self.indices) == 0 and 'time' in self.colnames:
self.add_index('time')
return result
[docs] @classmethod
def from_pandas(self, df):
"""
Convert a :class:`~pandas.DataFrame` to a
:class:`astropy_timeseries.TimeSeries`.
"""
from pandas import DataFrame, DatetimeIndex
if not isinstance(df, DataFrame):
raise TypeError("Input should be a pandas DataFrame")
if not isinstance(df.index, DatetimeIndex):
raise TypeError("DataFrame does not have a DatetimeIndex")
# TODO: determine how user can specify time scale
time = Time(df.index)
# Create table without the time column
table = Table.from_pandas(df)
return TimeSeries(time=time, data=table)
[docs] def to_pandas(self):
"""
Convert this time series to a :class:`~pandas.DataFrame` with a
:class:`~pandas.DatetimeIndex` index.
"""
# Extract table without time column
table = self[[x for x in self.colnames if x != 'time']]
# First make a normal pandas dataframe
df = Table(table).to_pandas()
# Set index
df.set_index(self.time.datetime64, inplace=True)
return df