Source code for perda.core_data_structures.resampling

from enum import Enum
from typing import Tuple

import numpy as np
from numpy.typing import NDArray
from scipy.interpolate import interp1d


class ResampleMethod(str, Enum):
    """
    Supported ways of interpolating a series onto a new set of timestamps.

    Members are also plain strings, so each one compares equal to its
    lowercase value (e.g. ``ResampleMethod.ZOH == "zoh"``).
    """

    # Straight-line interpolation between neighbouring samples.
    LINEAR = "linear"
    # Zero-order hold: carry the previous sample's value forward.
    ZOH = "zoh"
    # Take the value of whichever sample is closest in time.
    NEAREST = "nearest"
    # Cubic interpolation through the samples.
    CUBIC = "cubic"
def _interpolate(
    target: NDArray,
    src_t: NDArray,
    src_v: NDArray,
    method: ResampleMethod,
) -> NDArray:
    """
    Evaluate a sampled signal at a new set of timestamps.

    Targets that fall outside the source time range take the first or last
    source value, whichever end they fall beyond.

    Parameters
    ----------
    target : NDArray
        Timestamps at which values are wanted (float64)
    src_t : NDArray
        Timestamps of the available samples (float64)
    src_v : NDArray
        Sample values, one per entry of ``src_t``
    method : ResampleMethod
        Which interpolation scheme to apply

    Returns
    -------
    NDArray
        One interpolated value per entry of ``target``

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported strategies
    """
    # Linear needs no scipy machinery; np.interp already clamps at the edges.
    if method == ResampleMethod.LINEAR:
        return np.interp(target, src_t, src_v)

    # Every remaining strategy maps onto a scipy ``interp1d`` kind.
    if method == ResampleMethod.ZOH:
        kind = "previous"
    elif method == ResampleMethod.NEAREST:
        kind = "nearest"
    elif method == ResampleMethod.CUBIC:
        # Fewer than four samples: use linear interpolation instead.
        if len(src_t) < 4:
            return np.interp(target, src_t, src_v)
        kind = "cubic"
    else:
        raise ValueError(f"Unknown resample method '{method}'")

    # Out-of-range targets are filled with the first / last source value
    # rather than raising or yielding NaN.
    edge_values = (src_v[0], src_v[-1])
    interpolator = interp1d(
        src_t,
        src_v,
        kind=kind,
        bounds_error=False,
        fill_value=edge_values,
    )
    return interpolator(target)
def resample_to_freq(
    ts: NDArray,
    val: NDArray,
    freq_hz: float,
    timestamp_divisor: float,
    method: ResampleMethod = ResampleMethod.LINEAR,
) -> Tuple[NDArray, NDArray]:
    """
    Re-express a time series on an evenly spaced timestamp grid.

    The grid starts at the first source timestamp and advances in steps of
    ``timestamp_divisor / freq_hz`` raw units. It stops before the last
    source timestamp, which is therefore not a grid point itself.

    Parameters
    ----------
    ts : NDArray
        Timestamps of the input samples (int64)
    val : NDArray
        Input sample values
    freq_hz : float
        Desired output rate in Hz
    timestamp_divisor : float
        Number of raw timestamp units in one second
        (e.g. 1e6 for microseconds)
    method : ResampleMethod, optional
        Interpolation scheme to use. Default is LINEAR.

    Returns
    -------
    target_ts : NDArray
        Evenly spaced timestamps (int64)
    resampled_val : NDArray
        Values interpolated at ``target_ts``
    """
    # Spacing between grid points, in raw timestamp units.
    step = timestamp_divisor / freq_hz

    # Build the grid in floating point, then truncate to integer timestamps.
    float_grid = np.arange(ts[0], ts[-1], step, dtype=np.float64)
    target_ts = float_grid.astype(np.int64)

    # Interpolate at the truncated integer grid (cast back to float64), so
    # the returned values correspond exactly to the returned timestamps.
    query_times = target_ts.astype(np.float64)
    source_times = ts.astype(np.float64)
    resampled_val = _interpolate(query_times, source_times, val, method)

    return target_ts, resampled_val