Source code for astropy_timeseries.downsample

import warnings

import numpy as np
from astropy import units as u
from astropy.utils.exceptions import AstropyUserWarning

from .sampled import TimeSeries
from .binned import BinnedTimeSeries

__all__ = ['simple_downsample']


def reduceat(array, indices, function):
    """
    Manual reduceat functionality for cases where Numpy functions don't have a reduceat
    """
    result = [function(array[indices[i]:indices[i+1]]) for i in range(len(indices) - 1)]
    result.append(function(array[indices[-1]:]))
    return np.array(result)


[docs]def simple_downsample(time_series, time_bin_size, func=None, time_bin_start=None, n_bins=None): """ Downsample a time series by binning values into bins with a fixed size, using a single function Parameters ---------- time_series : :class:`~astropy_timeseries.TimeSeries` The time series to downsample. time_bin_size : `~astropy.units.Quantity` The time interval for the binned time series func : callable, optional The function to use for combining points in the same bin. Defaults to np.nanmean. time_bin_start : `~astropy.time.Time`, optional The start time for the binned time series. Defaults to the first time in the sampled time series. n_bins : int, optional The number of bins to use. Defaults to the number needed to fit all the original points. Returns ------- binned_time_series : :class:`~astropy_timeseries.BinnedTimeSeries` The downsampled time series. """ if not isinstance(time_series, TimeSeries): raise TypeError("time_series should be a TimeSeries") bin_size_sec = time_bin_size.to_value(u.s) # Use the table sorted by time sorted = time_series.iloc[:] # Determine start time if needed if time_bin_start is None: time_bin_start = sorted.time[0] # Find the relative time since the start time, in seconds relative_time_sec = (sorted.time - time_bin_start).sec # Determine the number of bins if needed if n_bins is None: n_bins = int(np.ceil(relative_time_sec[-1] / bin_size_sec)) if func is None: func = np.nanmedian # Determine the bins relative_bins_sec = np.cumsum(np.hstack([0, np.repeat(bin_size_sec, n_bins)])) bins = time_bin_start + relative_bins_sec * u.s # Find the subset of the table that is inside the bins keep = ((relative_time_sec >= relative_bins_sec[0]) & (relative_time_sec < relative_bins_sec[-1])) subset = sorted[keep] # Figure out which bin each row falls in - the -1 is because items # falling in the first bins will have index 1 but we want that to be 0 indices = np.searchsorted(relative_bins_sec, relative_time_sec[keep]) - 1 # Create new binned time series binned = BinnedTimeSeries(time_bin_start=bins[:-1], time_bin_end=bins[-1]) # Determine rows where values are defined groups = np.hstack([0, np.nonzero(np.diff(indices))[0] + 1]) # Find unique indices to determine which rows in the final time series # will not be empty. unique_indices = np.unique(indices) # Add back columns for colname in subset.colnames: if colname == 'time': continue values = subset[colname] # FIXME: figure out how to avoid the following, if possible if not isinstance(values, (np.ndarray, u.Quantity)): warnings.warn("Skipping column {0} since it has a mix-in type", AstropyUserWarning) continue data = np.ma.zeros(n_bins, dtype=values.dtype) data.mask = 1 if isinstance(values, u.Quantity): data[unique_indices] = u.Quantity(reduceat(values.value, groups, func), values.unit, copy=False) else: data[unique_indices] = reduceat(values, groups, func) data.mask[unique_indices] = 0 binned[colname] = data return binned