perda.utils.filtering#

perda.utils.filtering.apply_sos_filter(signal, sos, order)[source]#

Apply a second-order-section filter with NaN masking.

Parameters:
  • signal (NDArray[float64]) – Input signal, may contain NaN.

  • sos (NDArray[float64]) – Second-order sections from scipy.signal.butter.

  • order (int) – Filter order (used to compute minimum valid sample threshold).

Returns:

Filtered signal with NaN preserved at original NaN positions, or None if there are too few valid samples to filter.

Return type:

NDArray[float64] | None

perda.utils.filtering.compute_fft(di, source_time_unit=Timescale.MS, distance_di=None)[source]#

Compute the real FFT magnitude spectrum of a DataInstance.

NaN values are dropped before the transform. The signal is mean-subtracted to suppress the DC component.

Parameters:
  • di (DataInstance) – Signal to transform.

  • source_time_unit (Timescale, optional) – Timestamp unit. Used only in time-domain mode. Default is Timescale.MS.

  • distance_di (DataInstance | None, optional) – If provided, the FFT is computed in the spatial domain and the sample spacing is derived from cumulative distance in meters (interpolated onto di’s timestamp grid). Default is None (time domain, result in Hz).

Return type:

tuple[ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]]]

Returns:

  • frequencies (NDArray[float64]) – Frequency axis values.

  • magnitudes (NDArray[float64]) – FFT magnitude (abs(rfft(signal - mean))).

Examples

>>> freqs, mags = compute_fft(di)
>>> fig = plot_fft_spectrum([freqs], [mags], [di.label])
perda.utils.filtering.lowpass_filter(di, cutoff_hz, source_time_unit=Timescale.MS, order=4)[source]#

Apply a Butterworth lowpass filter in the time domain.

Returns a new DataInstance (or list thereof) with filtered values. Original timestamps and metadata are preserved. NaN positions in the input remain NaN in the output.

To re-filter at a different cutoff without stacking, pass the original (pre-filter) DataInstance again:

di_original = aly.data["pcm.wheelSpeeds.frontRight"]
di_10hz = lowpass_filter(di_original, cutoff_hz=10.0)
di_5hz  = lowpass_filter(di_original, cutoff_hz=5.0)   # not stacked
Parameters:
  • di (DataInstance | list[DataInstance]) – Input signal(s). Lists are processed independently.

  • cutoff_hz (float) – Cutoff frequency in Hz.

  • source_time_unit (Timescale, optional) – Timestamp unit of di.timestamp_np. Default is Timescale.MS.

  • order (int, optional) – Butterworth filter order. The effective order is 2× due to forward-backward (sosfiltfilt) application. Default is 4.

Returns:

New DataInstance(s) with filtered value_np.

Return type:

DataInstance | list[DataInstance]

Examples

>>> di_filtered = lowpass_filter(di, cutoff_hz=10.0)
>>> di_filtered = lowpass_filter([di_a, di_b], cutoff_hz=5.0)
perda.utils.filtering.lowpass_filter_by_distance(di, distance_di, cutoff_freq_per_meter, order=4)[source]#

Apply a Butterworth lowpass filter in the spatial domain.

The sample rate is derived from a cumulative-distance DataInstance rather than timestamps. distance_di is interpolated onto each signal’s timestamp grid before use, so the two need not share the same grid.

Parameters:
  • di (DataInstance | list[DataInstance]) – Input signal(s) to filter.

  • distance_di (DataInstance) – Cumulative distance in meters on any timestamp grid.

  • cutoff_freq_per_meter (float) – Cutoff frequency in cycles per meter (1/m).

  • order (int, optional) – Butterworth filter order. Default is 4.

Returns:

New DataInstance(s) with spatially filtered value_np, on the original signal timestamp grid.

Return type:

DataInstance | list[DataInstance]

Examples

>>> di_filtered = lowpass_filter_by_distance(di, distance_di, cutoff_freq_per_meter=0.05)
perda.utils.filtering.zscore_filter(di, window_s, threshold, source_time_unit=Timescale.MS)[source]#

Remove outliers using a rolling-window z-score and interpolate gaps.

For each sample, a z-score is computed relative to the local rolling mean and standard deviation. Samples with |z| > threshold are replaced with NaN and then linearly interpolated.

Parameters:
  • di (DataInstance | list[DataInstance]) – Input signal(s).

  • window_s (float) – Rolling window size in seconds

  • threshold (float) – Z-score threshold; samples exceeding this are masked.

  • source_time_unit (Timescale, optional) – Timestamp unit. Default is Timescale.MS.

Returns:

New DataInstance(s) with outliers replaced by linear interpolation.

Return type:

DataInstance | list[DataInstance]

Examples

>>> di_clean = zscore_filter(di, window_s=1.0, threshold=3.0)