# Source code for perda.utils.frequency_analysis

import numpy as np
from numpy import float64
from numpy.typing import NDArray
from plotly import graph_objects as go

from ..constants import DELIMITER, title_block
from ..core_data_structures.data_instance import DataInstance
from ..plotting.plotting_constants import *
from ..plotting.scatter_histogram_plotter import plot_scatter_and_histogram
from ..units import Timescale


def _s_conversion_factor(time_unit: Timescale) -> float:
    """Give the multiplier that turns a value in ``time_unit`` into seconds.

    ``Timescale.US`` maps to 1e-6 and ``Timescale.MS`` to 1e-3; every other
    unit falls through to a factor of 1.0.
    """
    if time_unit == Timescale.US:
        factor = 1e-6
    elif time_unit == Timescale.MS:
        factor = 1e-3
    else:
        factor = 1.0
    return factor


def analyze_frequency(
    data_instance: DataInstance,
    expected_frequency_hz: float | None = None,
    source_time_unit: Timescale = Timescale.MS,
    gap_threshold_multiplier: float = 2.0,
    font_config: FontConfig = DEFAULT_FONT_CONFIG,
    layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG,
    plot_config: ScatterHistogramPlotConfig = DEFAULT_SCATTER_HISTOGRAM_PLOT_CONFIG,
) -> go.Figure:
    """
    Analyse the sampling frequency of a DataInstance and return a diagnostic figure.

    Prints a summary of frequency statistics and gap detection, then returns a
    figure with two subplots: instantaneous frequency over time and a frequency
    histogram.

    Parameters
    ----------
    data_instance : DataInstance
        The DataInstance whose logging cadence to analyse.
    expected_frequency_hz : float | None, optional
        Nominal expected sampling frequency in Hz. When provided, additional
        diagnostics (expected sample count, expected frequency, reference
        lines) are included. Non-positive values are treated as not provided.
        Default is None.
    source_time_unit : Timescale, optional
        Timestamp unit used in ``data_instance.timestamp_np``. Default is ms.
    gap_threshold_multiplier : float, optional
        An interval is flagged as a gap when it exceeds this multiple of the
        expected interval (if ``expected_frequency_hz`` is given) or the median
        interval. Default is 2.0.
    font_config : FontConfig, optional
        Font sizes for plot elements. Default is DEFAULT_FONT_CONFIG.
    layout_config : LayoutConfig, optional
        Plot dimensions and margins. Default is DEFAULT_LAYOUT_CONFIG.
    plot_config : ScatterHistogramPlotConfig, optional
        Colors and histogram bin count. Default is
        DEFAULT_SCATTER_HISTOGRAM_PLOT_CONFIG.

    Returns
    -------
    go.Figure
        Plotly figure with frequency time-series and frequency histogram
        subplots. An empty figure is returned when fewer than two samples are
        available.

    Examples
    --------
    >>> fig = analyze_frequency(di, expected_frequency_hz=100)
    >>> fig.show()
    """
    label = data_instance.label or f"var_id={data_instance.var_id}"

    # At least two timestamps are needed to form a single interval.
    if len(data_instance) < 2:
        print(f"{label}: insufficient data points (need >= 2).")
        return go.Figure()

    # Normalise once so that the gap baseline, the printed diagnostics and the
    # reference lines all agree on whether an expected frequency is in effect.
    expected_hz = (
        expected_frequency_hz
        if expected_frequency_hz is not None and expected_frequency_hz > 0
        else None
    )

    to_seconds = _s_conversion_factor(source_time_unit)
    ts = data_instance.timestamp_np.astype(np.float64)
    # Each interval is placed at the time of its first sample, measured from
    # the first timestamp, so ts_s has one element fewer than ts.
    ts_s = (ts[:-1] - ts[0]) * to_seconds
    dt_s = np.diff(ts) * to_seconds

    if expected_hz is not None:
        baseline_interval = 1.0 / expected_hz
    else:
        baseline_interval = float(np.median(dt_s))

    gap_threshold = gap_threshold_multiplier * baseline_interval
    n_gaps = int(np.sum(dt_s > gap_threshold))

    # NOTE: duplicate timestamps (zero-length intervals) produce infinite
    # values here; they are passed through unchanged.
    freq: NDArray[float64] = 1.0 / dt_s

    # Span from the first to the last sample. Using ts_s here would drop the
    # final interval because ts_s stops at the second-to-last timestamp.
    total_duration_s = float((ts[-1] - ts[0]) * to_seconds)

    mean_freq = float(np.mean(freq))
    median_freq = float(np.median(freq))
    std_freq = float(np.std(freq))
    min_freq = float(np.min(freq))
    max_freq = float(np.max(freq))
    p5 = float(np.percentile(freq, 5))
    p95 = float(np.percentile(freq, 95))

    value_width = 10
    label_width = 16
    sub_width = label_width - 4

    if expected_hz is not None:
        # N evenly spaced samples span N - 1 intervals, hence the + 1.
        expected_samples = int(round(expected_hz * total_duration_s)) + 1
        samples_note = f" (Expected: {expected_samples})"
        freq_note = f" (Expected: {expected_hz:{value_width}.3f} Hz)"
        line_label = f"Expected {expected_hz} Hz"
    else:
        samples_note = ""
        freq_note = ""
        line_label = None

    print(str(data_instance))
    print(DELIMITER)
    print(f"{'Duration:':<{label_width}} {total_duration_s:{value_width}.3f} s")
    print(
        f"{'Total samples:':<{label_width}} {len(data_instance):{value_width}d}"
        + samples_note
    )
    print("Frequency (Hz)" + freq_note)
    print(
        f" {'Mean:':<{sub_width}} {mean_freq:{value_width}.3f} {'Median:':<{sub_width}} {median_freq:.3f}"
    )
    print(
        f" {'Std dev:':<{sub_width}} {std_freq:{value_width}.3f} {'Min:':<{sub_width}} {min_freq:.3f} {'Max:':<{sub_width}} {max_freq:.3f}"
    )
    print(
        f" {'P5:':<{sub_width}} {p5:{value_width}.3f} {'P95:':<{sub_width}} {p95:.3f}"
    )
    print("Gaps")
    print(f" {'Threshold:':<{sub_width}} {gap_threshold} s")
    print(f" {'Detected:':<{sub_width}} {n_gaps}")

    return plot_scatter_and_histogram(
        x=ts_s,
        y=freq,
        title=f"Frequency Analysis — {label}",
        scatter_title="Instantaneous Frequency over Time",
        histogram_title="Instantaneous Frequency Distribution",
        x_label="Time (s)",
        y_label="Frequency (Hz)",
        scatter_name="Freq (Hz)",
        histogram_name="Freq (Hz)",
        hline=expected_hz,
        hline_label=line_label,
        vline=expected_hz,
        vline_label=line_label,
        font_config=font_config,
        layout_config=layout_config,
        plot_config=plot_config,
    )