Source code for perda.utils.integrate

import numpy as np
import polars as pl
from numpy import float64
from numpy.typing import NDArray
from scipy.integrate import cumulative_trapezoid
from scipy.signal import savgol_filter

from ..core_data_structures.data_instance import DataInstance
from ..units import MAD_TO_STD, Timescale, convert_time


[docs] def integrate_over_time_range( data_instance: DataInstance, start_time: int = 0, end_time: int = -1, source_time_unit: Timescale = Timescale.MS, target_time_unit: Timescale = Timescale.S, ) -> float: """ Get integral of the value over time using the trapezoidal rule. Parameters ---------- data_instance : DataInstance DataInstance to integrate start_time : int, optional Start time for integration. Default is 0 end_time : int, optional End time for integration. -1 means end of data. Default is -1 source_time_unit : Timescale, optional Time unit of inputs. Default is Timescale.MS target_time_unit : Timescale, optional Target time unit for averaging. Default is Timescale.S Returns ------- float Integral of the value over the time range Notes ----- Uses numpy.trapezoid (trapezoidal rule) for numerical integration of discrete data. """ if len(data_instance.timestamp_np) < 2: return 0.0 ts: NDArray[float64] = convert_time( data_instance.timestamp_np.astype(np.float64), source_time_unit, target_time_unit, ) values = data_instance.value_np.astype(np.float64) # Set actual bounds actual_start_time = max(start_time, ts[0]) actual_end_time = ts[-1] if end_time == -1 else min(end_time, ts[-1]) if actual_start_time >= actual_end_time: return 0.0 # Filter data to the time range mask = (ts >= actual_start_time) & (ts <= actual_end_time) ts_filtered = ts[mask] values_filtered = values[mask] if len(ts_filtered) < 2: return 0.0 # Integrate using trapezoidal rule integral = np.trapezoid(values_filtered, ts_filtered) return float(integral)
[docs] def average_over_time_range( data_instance: DataInstance, start_time: int = 0, end_time: int = -1, source_time_unit: Timescale = Timescale.MS, target_time_unit: Timescale = Timescale.S, ) -> float: """ Get average value over time using integral divided by time range. Parameters ---------- data_instance : DataInstance DataInstance to average start_time : int, optional Start time for averaging. Default is 0 end_time : int, optional End time for averaging. -1 means end of data. Default is -1 source_time_unit : Timescale, optional Time unit of inputs. Default is Timescale.MS target_time_unit : Timescale, optional Target time unit for averaging. Default is Timescale.S Returns ------- float Time-weighted average value over the time range Notes ----- Uses numpy.trapezoid (trapezoidal rule) for numerical integration of discrete data. """ if len(data_instance.timestamp_np) == 1: return float(data_instance.value_np[0]) integral = integrate_over_time_range( data_instance, start_time, end_time, source_time_unit, target_time_unit ) ts = data_instance.timestamp_np.astype(np.float64) actual_start_time = max(start_time, ts[0]) actual_end_time = ts[-1] if end_time == -1 else min(end_time, ts[-1]) actual_start_time = convert_time( actual_start_time, source_time_unit, target_time_unit ) actual_end_time = convert_time(actual_end_time, source_time_unit, target_time_unit) time_range = actual_end_time - actual_start_time return float(integral / time_range) if time_range > 0 else 0.0
[docs] def get_data_slice_by_timestamp( original_instance: DataInstance, start_time: int = 0, end_time: int = -1 ) -> DataInstance: """ Get a new DataInstance with data in [start_time, end_time). Parameters ---------- original_instance : DataInstance Original DataInstance to slice start_time : int, optional Start time (inclusive). Default is 0 end_time : int, optional End time (exclusive). -1 means till end. Default is -1 Returns ------- DataInstance New DataInstance containing only data within the specified time range """ if end_time < 0: mask = original_instance.timestamp_np >= start_time else: mask = (original_instance.timestamp_np >= start_time) & ( original_instance.timestamp_np < end_time ) return DataInstance( timestamp_np=original_instance.timestamp_np[mask], value_np=original_instance.value_np[mask], label=original_instance.label, var_id=original_instance.var_id, )
[docs] def smoothed_filtered_integration( data: DataInstance, source_time_unit: Timescale = Timescale.US, target_time_unit: Timescale = Timescale.S, filter_window_size: int = 10, n_sigmas: float = 3, smoothing_window_len: int = 11, smoothing_poly_order: int = 2, ) -> tuple[NDArray[float64], NDArray[float64], NDArray[float64]]: """Integrate a time-series signal after outlier removal and smoothing. Cleans spikes via rolling MAD-based outlier detection, applies Savitzky-Golay smoothing, then computes the cumulative trapezoidal integral over time. Parameters ---------- data : DataInstance Time-series signal to integrate. source_time_unit : Timescale Time unit of inputs. Default is Timescale.US. target_time_unit : Timescale Target time unit for integration result. Default is Timescale.S. filter_window_size : int Rolling window size used for median and MAD outlier detection. n_sigmas : float Number of scaled MAD units beyond which a point is considered an outlier. smoothing_window_len : int Window length for Savitzky-Golay smoothing filter. smoothing_poly_order : int Polynomial order for Savitzky-Golay smoothing filter. Returns ------- tuple[NDArray[Float64], NDArray[Float64], NDArray[Float64]] A tuple of (timestamps, smoothed values, cumulative integral), all of the same length as the input signal. """ v_np = np.array(data.value_np, dtype=np.float64) t = np.array(data.timestamp_np, dtype=np.float64) v_series = pl.Series(v_np) # Rolling median and MAD (Median Absolute Deviation) rolling_median = v_series.rolling_median( window_size=filter_window_size, min_periods=1, center=True ).to_numpy() rolling_std = v_series.rolling_map( lambda x: np.median(np.abs(x.to_numpy() - np.median(x.to_numpy()))) * MAD_TO_STD, window_size=filter_window_size, min_periods=1, center=True, ).to_numpy() # Identify and replace spikes with NaN is_outlier = np.abs(v_np - rolling_median) > (n_sigmas * rolling_std) v_cleaned = v_np.copy() v_cleaned[is_outlier] = np.nan # Linear interpolation over outlier gaps nans = np.isnan(v_cleaned) if nans.any(): idx = np.arange(len(v_cleaned)) not_nan_idx = idx[~nans] if len(not_nan_idx) > 1: v_cleaned = np.interp(idx, not_nan_idx, v_cleaned[~nans]) elif len(not_nan_idx) == 1: v_cleaned[:] = v_cleaned[not_nan_idx[0]] # Smoothing if len(v_cleaned) > smoothing_window_len: v_smooth = savgol_filter(v_cleaned, smoothing_window_len, smoothing_poly_order) else: v_smooth = v_cleaned v_integrated = cumulative_trapezoid( v_smooth, x=convert_time(t, source_time_unit, target_time_unit), initial=0 ) return t, v_smooth, v_integrated