perda.utils.filtering
- perda.utils.filtering.apply_sos_filter(signal, sos, order)
Apply a second-order-section filter with NaN masking.
- Parameters:
signal (NDArray[float64]) – Input signal, may contain NaN.
sos (NDArray[float64]) – Second-order sections from scipy.signal.butter.
order (int) – Filter order (used to compute the minimum valid sample threshold).
- Returns:
Filtered signal with NaN preserved at original NaN positions, or None if there are too few valid samples to filter.
- Return type:
NDArray[float64] | None
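The NaN-masking behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the helper name `masked_sos_filter` is hypothetical, the use of `sosfiltfilt` is assumed from the `lowpass_filter` description, and the minimum-sample threshold is a guess based on SciPy's default edge padding for a Butterworth design.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt


def masked_sos_filter(signal, sos, order):
    """Hypothetical stand-in for apply_sos_filter; filters only non-NaN samples."""
    valid = ~np.isnan(signal)
    # Assumed threshold: sosfiltfilt's default padding needs more than about
    # 3 * (order + 1) samples for a Butterworth design.
    min_valid = 3 * (order + 1) + 1
    if valid.sum() < min_valid:
        return None
    out = np.full(signal.shape, np.nan)
    # Filter the valid samples as one contiguous run, then restore positions.
    out[valid] = sosfiltfilt(sos, signal[valid])
    return out


fs = 100.0  # assumed sample rate in Hz
t = np.arange(200) / fs
x = np.sin(2 * np.pi * 2.0 * t)
x[50] = np.nan
sos = butter(4, 10.0, btype="low", fs=fs, output="sos")

y = masked_sos_filter(x, sos, order=4)             # NaN stays at index 50
too_short = masked_sos_filter(x[:5], sos, order=4)  # too few samples
```

Note that closing up the gaps before filtering treats the valid samples as evenly spaced, which is only reasonable when NaN runs are short.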
- perda.utils.filtering.compute_fft(di, source_time_unit=Timescale.MS, distance_di=None)
Compute the real FFT magnitude spectrum of a DataInstance.
NaN values are dropped before the transform. The signal is mean-subtracted to suppress the DC component.
- Parameters:
di (DataInstance) – Signal to transform.
source_time_unit (Timescale, optional) – Timestamp unit. Used only in time-domain mode. Default is Timescale.MS.
distance_di (DataInstance | None, optional) – If provided, the FFT is computed in the spatial domain and the sample spacing is derived from cumulative distance in meters (interpolated onto di’s timestamp grid). Default is None (time domain, result in Hz).
- Return type:
tuple[ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]]]
- Returns:
frequencies (NDArray[float64]) – Frequency axis values.
magnitudes (NDArray[float64]) – FFT magnitude (abs(rfft(signal - mean))).
Examples
>>> freqs, mags = compute_fft(di)
>>> fig = plot_fft_spectrum([freqs], [mags], [di.label])
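The time-domain computation described above (drop NaN, subtract the mean, take the real FFT magnitude) can be sketched on plain NumPy arrays. The helper name `fft_magnitude` is hypothetical, and estimating the sample spacing from the median timestamp step is an assumption; the documentation does not say how the library derives it.

```python
import numpy as np


def fft_magnitude(values, timestamps_ms):
    """Hypothetical stand-in for compute_fft operating on plain arrays."""
    keep = ~np.isnan(values)
    v = values[keep]
    t_s = timestamps_ms[keep] / 1000.0
    # Assumption: near-uniform sampling, spacing taken as the median step.
    dt = float(np.median(np.diff(t_s)))
    mags = np.abs(np.fft.rfft(v - v.mean()))
    freqs = np.fft.rfftfreq(v.size, d=dt)
    return freqs, mags


t_ms = np.arange(1000, dtype=float) * 10.0            # 100 Hz sampling
sig = 3.0 + np.sin(2 * np.pi * 5.0 * t_ms / 1000.0)   # 5 Hz tone on a DC offset
sig[10] = np.nan                                      # one missing sample
freqs, mags = fft_magnitude(sig, t_ms)
peak_hz = freqs[np.argmax(mags)]                      # lands near the 5 Hz tone
```

Because the mean is removed first, the zero-frequency bin is close to zero and the constant offset does not dominate the spectrum.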
- perda.utils.filtering.lowpass_filter(di, cutoff_hz, source_time_unit=Timescale.MS, order=4)
Apply a Butterworth lowpass filter in the time domain.
Returns a new DataInstance (or list thereof) with filtered values. Original timestamps and metadata are preserved. NaN positions in the input remain NaN in the output.
To re-filter at a different cutoff without stacking, pass the original (pre-filter) DataInstance again:
di_original = aly.data["pcm.wheelSpeeds.frontRight"]
di_10hz = lowpass_filter(di_original, cutoff_hz=10.0)
di_5hz = lowpass_filter(di_original, cutoff_hz=5.0)  # not stacked
- Parameters:
di (DataInstance | list[DataInstance]) – Input signal(s). Lists are processed independently.
cutoff_hz (float) – Cutoff frequency in Hz.
source_time_unit (Timescale, optional) – Timestamp unit of di.timestamp_np. Default is Timescale.MS.
order (int, optional) – Butterworth filter order. The effective order is 2× due to forward-backward (sosfiltfilt) application. Default is 4.
- Returns:
New DataInstance(s) with filtered value_np.
- Return type:
DataInstance | list[DataInstance]
Examples
>>> di_filtered = lowpass_filter(di, cutoff_hz=10.0)
>>> di_filtered = lowpass_filter([di_a, di_b], cutoff_hz=5.0)
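The forward-backward Butterworth design named above can be sketched directly with SciPy on a synthetic signal. The sample rate and test signal are assumptions for illustration; the library derives the sample rate from the DataInstance timestamps, and how it does so is not shown here.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt

fs = 200.0        # assumed sample rate in Hz
cutoff_hz = 10.0
order = 4

t = np.arange(0, 5.0, 1.0 / fs)
slow = np.sin(2 * np.pi * 2.0 * t)          # 2 Hz component, below the cutoff
fast = 0.5 * np.sin(2 * np.pi * 40.0 * t)   # 40 Hz component, above the cutoff

# Design in second-order sections, then run forward and backward.
sos = butter(order, cutoff_hz, btype="low", fs=fs, output="sos")
filtered = sosfiltfilt(sos, slow + fast)

# Compare against the slow component away from the edges.
core = slice(100, -100)
residual = np.max(np.abs(filtered[core] - slow[core]))
```

Running the filter in both directions cancels the phase shift and squares the magnitude response, so the attenuation at the cutoff is about 6 dB rather than the 3 dB of a single pass.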
- perda.utils.filtering.lowpass_filter_by_distance(di, distance_di, cutoff_freq_per_meter, order=4)
Apply a Butterworth lowpass filter in the spatial domain.
The sample rate is derived from a cumulative-distance DataInstance rather than timestamps. distance_di is interpolated onto each signal’s timestamp grid before use, so the two need not share the same grid.
- Parameters:
di (DataInstance | list[DataInstance]) – Input signal(s) to filter.
distance_di (DataInstance) – Cumulative distance in meters on any timestamp grid.
cutoff_freq_per_meter (float) – Cutoff frequency in cycles per meter (1/m).
order (int, optional) – Butterworth filter order. Default is 4.
- Returns:
New DataInstance(s) with spatially filtered value_np, on the original signal timestamp grid.
- Return type:
DataInstance | list[DataInstance]
Examples
>>> di_filtered = lowpass_filter_by_distance(di, distance_di, cutoff_freq_per_meter=0.05)
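The spatial-domain idea above can be sketched with plain arrays: interpolate cumulative distance onto the signal's timestamps, derive a spatial sample rate, and design the filter in cycles per meter. Using the mean distance step as a uniform spacing is an assumption that only holds at roughly constant speed; the documentation does not state how the library handles varying speed. All array names are illustrative.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt

# Signal sampled every 10 ms; distance logged on a coarser 100 ms grid.
sig_t_ms = np.arange(0.0, 10_000.0, 10.0)
dist_t_ms = np.arange(0.0, 10_001.0, 100.0)
dist_m = 20.0 * dist_t_ms / 1000.0   # constant 20 m/s, cumulative meters

# Interpolate cumulative distance onto the signal's timestamp grid.
dist_on_sig = np.interp(sig_t_ms, dist_t_ms, dist_m)

# Assumption: treat the mean distance step as a uniform spatial spacing.
step_m = float(np.mean(np.diff(dist_on_sig)))
samples_per_meter = 1.0 / step_m

long_wave = np.sin(2 * np.pi * 0.01 * dist_on_sig)    # 0.01 cycles/m
ripple = 0.3 * np.sin(2 * np.pi * 0.5 * dist_on_sig)  # 0.5 cycles/m

# Cutoff of 0.05 cycles/m keeps the long wave and removes the ripple.
sos = butter(4, 0.05, btype="low", fs=samples_per_meter, output="sos")
smoothed = sosfiltfilt(sos, long_wave + ripple)

core = slice(300, -300)
residual = np.max(np.abs(smoothed[core] - long_wave[core]))
```

The output stays on the signal's own timestamp grid, since only the sample-rate definition changes, not the sample positions.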
- perda.utils.filtering.zscore_filter(di, window_s, threshold, source_time_unit=Timescale.MS)
Remove outliers using a rolling-window z-score and interpolate gaps.
For each sample, a z-score is computed relative to the local rolling mean and standard deviation. Samples with |z| > threshold are replaced with NaN and then linearly interpolated.
- Parameters:
di (DataInstance | list[DataInstance]) – Input signal(s).
window_s (float) – Rolling window size in seconds.
threshold (float) – Z-score threshold; samples exceeding this are masked.
source_time_unit (Timescale, optional) – Timestamp unit. Default is
Timescale.MS.
- Returns:
New DataInstance(s) with outliers replaced by linear interpolation.
- Return type:
DataInstance | list[DataInstance]
Examples
>>> di_clean = zscore_filter(di, window_s=1.0, threshold=3.0)
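The rolling z-score procedure above can be sketched on a plain array. The helper name `rolling_zscore_clean` is hypothetical, the window is given as a sample count rather than seconds, and the centered window is an assumption; the documentation does not say whether the library's window is centered or trailing. The sketch also assumes the input contains no NaN.

```python
import numpy as np


def rolling_zscore_clean(values, window, threshold):
    """Hypothetical stand-in for zscore_filter on a plain array."""
    half = window // 2
    z = np.zeros(values.size)
    for i in range(values.size):
        # Centered window, truncated at the array edges (assumed behavior).
        lo, hi = max(0, i - half), min(values.size, i + half + 1)
        local = values[lo:hi]
        std = local.std()
        if std > 0:
            z[i] = (values[i] - local.mean()) / std
    cleaned = values.astype(float).copy()
    outliers = np.abs(z) > threshold
    cleaned[outliers] = np.nan
    # Fill the masked samples by linear interpolation over their neighbors.
    idx = np.arange(values.size)
    good = ~np.isnan(cleaned)
    cleaned[~good] = np.interp(idx[~good], idx[good], cleaned[good])
    return cleaned, outliers


ramp = 0.1 * np.arange(100, dtype=float)
ramp[40] += 50.0   # inject a single spike
cleaned, outliers = rolling_zscore_clean(ramp, window=21, threshold=3.0)
```

A single spike inflates the local standard deviation, so its z-score is bounded by roughly the square root of the window length; very short windows combined with high thresholds may never flag anything.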