Source code for perda.analyzer.analyzer

import io
import sys
from typing import List, Union

from plotly import graph_objects as go

from ..core_data_structures.data_instance import DataInstance
from ..core_data_structures.single_run_data import SingleRunData
from ..plotting.data_instance_plotter import *
from ..plotting.plotting_constants import *
from ..units import Timescale, mph_seconds_to_meters
from ..utils.accel_calculator import *
from ..utils.data_summary import single_run_summary
from ..utils.diff import diff
from ..utils.frequency_analysis import analyze_frequency as _analyze_frequency
from ..utils.integrate import smoothed_filtered_integration
from ..utils.search import search
from .csv import *


[docs] class Analyzer: """Primary class for loading and analyzing car log data.""" def __init__( self, filepath: str, ts_offset: int = 0, parsing_errors_limit: int = 100, ) -> None: """ Initialize a new analyzer instance. Parameters ---------- filepath : str Path to the CSV file containing CAN bus variables ts_offset : int, optional Timestamp offset to apply to all data points. Default is 0 parsing_errors_limit : int, optional Maximum number of parsing errors before stopping. Default is 100 parse_unit : Timescale | str | None, optional Logging timestamp unit for parsing. If None, auto-detect from header: lines ending with "v2.0" use us, otherwise ms. Examples -------- >>> aly = Analyzer("path/to/log.csv") >>> print(aly) # lists all available variables """ self.data: SingleRunData = parse_csv( filepath, ts_offset, parsing_errors_limit=parsing_errors_limit, ) def __str__(self) -> str: """Return a summary of all variables in the loaded run data.""" old_stdout = sys.stdout buffer = io.StringIO() sys.stdout = buffer single_run_summary(self.data) output = buffer.getvalue() buffer.close() sys.stdout = old_stdout return output
[docs] def search(self, query: str) -> None: """ Natural language search for available variables in the parsed data. Parameters ---------- query : str Search query Examples -------- >>> aly.search("front wheel speed") """ search(self.data, query)
[docs] def plot( self, var_1: Union[str, int, DataInstance, List[Union[str, int, DataInstance]]], var_2: ( Union[str, int, DataInstance, List[Union[str, int, DataInstance]]] | None ) = None, ts_start: float | None = None, ts_end: float | None = None, title: str | None = None, y_label_1: str | None = None, y_label_2: str | None = None, show_legend: bool = True, font_config: FontConfig = DEFAULT_FONT_CONFIG, layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG, vline_config: VLineConfig = DEFAULT_VLINE_CONFIG, ) -> go.Figure: """ Display variables from the parsed data on an interactive Plotly plot. Concat boundaries (if any) are automatically shown as vertical lines. Parameters ---------- var_1 : Union[str, int, DataInstance, List[Union[str, int, DataInstance]]] Variable(s) to plot on the left y-axis. Can be variable name(s), variable ID(s), or DataInstance(s) var_2 : Union[str, int, DataInstance, List[Union[str, int, DataInstance]]] | None, optional Optional variable(s) to plot on the right y-axis. Can be variable name(s), variable ID(s), or DataInstance(s). ts_start : float | None, optional Start of the time window in seconds. Data points before this time are excluded. Default is None (no lower bound). ts_end : float | None, optional End of the time window in seconds. Data points after this time are excluded. Default is None (no upper bound). title : str | None, optional y_label_1 : str | None, optional Label for left y-axis (or only y-axis if no right input). y_label_2 : str | None, optional Label for right y-axis. show_legend : bool, optional Whether to show plot legends. Default is True font_config : FontConfig, optional Font configuration for plot elements. Default is DEFAULT_FONT_CONFIG layout_config : LayoutConfig, optional Layout configuration for plot dimensions. Default is DEFAULT_LAYOUT_CONFIG vline_config : VLineConfig, optional Visual configuration for concat boundary lines. Default is DEFAULT_VLINE_CONFIG. Examples -------- >>> fig = aly.plot("pcm.wheelSpeeds.frontRight") >>> fig = aly.plot(["pcm.wheelSpeeds.frontRight", "pcm.wheelSpeeds.frontLeft"], title="Front Wheel Speeds") >>> fig = aly.plot("pcm.moc.motor.requestedTorque", "pcm.wheelSpeeds.frontRight", ts_start=10.0, ts_end=30.0) >>> fig.show() """ # Normalize left input to List[DataInstance] var_1_norm = self._normalize_input(var_1) # Divisor to convert raw timestamps to seconds (used for vlines and time range filter) unit = self.data.timestamp_unit divisor = 1e6 if unit == Timescale.US else 1e3 if unit == Timescale.MS else 1.0 # Convert concat boundaries to seconds for the plotter vlines: List[float] | None = None if self.data.concat_boundaries: vlines = [b / divisor for b in self.data.concat_boundaries] # Apply time range filter if specified (convert seconds → raw units for trim) if ts_start is not None or ts_end is not None: start_raw = ts_start * divisor if ts_start is not None else None end_raw = ts_end * divisor if ts_end is not None else None var_1_norm = [di.trim(start_raw, end_raw) for di in var_1_norm] if var_2 is not None: # Normalize right input to List[DataInstance] var_2_norm = self._normalize_input(var_2) if ts_start is not None or ts_end is not None: var_2_norm = [di.trim(start_raw, end_raw) for di in var_2_norm] return plot_dual_axis( left_data_instances=var_1_norm, right_data_instances=var_2_norm, title=title, left_y_axis_title=y_label_1, right_y_axis_title=y_label_2, show_legend=show_legend, font_config=font_config, layout_config=layout_config, timestamp_unit=self.data.timestamp_unit, vlines=vlines, vline_config=vline_config, ) else: return plot_single_axis( data_instances=var_1_norm, title=title, y_axis_title=y_label_1, show_legend=show_legend, font_config=font_config, layout_config=layout_config, timestamp_unit=self.data.timestamp_unit, vlines=vlines, vline_config=vline_config, )
[docs] def diff( self, server_data: SingleRunData, timestamp_tolerance_ms: int = 2, diff_rtol: float = 1e-3, diff_atol: float = 1e-3, ) -> go.Figure: """ Compute the differences between the current data (assumed to be from RPI) and server data. Parameters ---------- server_data : SingleRunData The server data to compare against. timestamp_tolerance_ms : int, optional Timestamp tolerance used to match points between streams. diff_rtol : float, optional Relative tolerance for value comparison (numpy.isclose). diff_atol : float, optional Absolute tolerance for value comparison (numpy.isclose). Examples -------- >>> fig = aly.diff(server_data) >>> fig.show() """ return diff( self.data, server_data, timestamp_tolerance_ms=timestamp_tolerance_ms, diff_rtol=diff_rtol, diff_atol=diff_atol, )
[docs] def analyze_frequency( self, var: Union[str, int], expected_frequency_hz: float | None = None, gap_threshold_multiplier: float = 2.0, font_config: FontConfig = DEFAULT_FONT_CONFIG, layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG, plot_config: ScatterHistogramPlotConfig = DEFAULT_SCATTER_HISTOGRAM_PLOT_CONFIG, ) -> go.Figure: """ Analyse the sampling frequency of a variable and return a diagnostic figure. Prints a summary to stdout and returns a Plotly figure with two subplots: instantaneous frequency over time and an inter-sample interval histogram. Parameters ---------- var : Union[str, int] Variable name or ID to look up in the parsed data. expected_frequency_hz : float | None, optional Nominal expected sampling frequency in Hz for error and gap diagnostics. Default is None. gap_threshold_multiplier : float, optional Intervals exceeding this multiple of the expected (or median) interval are flagged as gaps. Default is 2.0. font_config : FontConfig, optional Font sizes for plot elements. Default is DEFAULT_FONT_CONFIG. layout_config : LayoutConfig, optional Plot dimensions and margins. Default is DEFAULT_LAYOUT_CONFIG. Returns ------- go.Figure Plotly figure with frequency diagnostics. Examples -------- >>> fig = aly.analyze_frequency("ams.stack.thermistors.temperature[38]", expected_frequency_hz=100) >>> fig.show() """ di = self.data[var] return _analyze_frequency( di, expected_frequency_hz=expected_frequency_hz, source_time_unit=self.data.timestamp_unit, gap_threshold_multiplier=gap_threshold_multiplier, font_config=font_config, layout_config=layout_config, plot_config=plot_config, )
def _normalize_input( self, input_data: Union[str, int, DataInstance, List[Union[str, int, DataInstance]]], ) -> List[DataInstance]: """ Normalize various input types to a list of DataInstances. """ if isinstance(input_data, DataInstance): return [input_data] elif isinstance(input_data, list): return [ item if isinstance(item, DataInstance) else self.data[item] for item in input_data ] else: return [self.data[input_data]]
[docs] def get_accel_times(self) -> list[AccelSegmentResult]: """ Get acceleration times from the analyzer. Returns ------- list[AccelSegmentResult] List of acceleration segment results. Examples -------- >>> results = aly.get_accel_times() >>> for r in results: ... print(r) """ speed_obj = ( self.data["pcm.wheelSpeeds.frontRight"] + self.data["pcm.wheelSpeeds.frontLeft"] ) / 2.0 signal_obj = detect_accel_event( torque_obj=self.data["pcm.moc.motor.requestedTorque"], speed_obj=speed_obj ) time_arr, _, distance = smoothed_filtered_integration( data=speed_obj, source_time_unit=self.data.timestamp_unit ) distance_obj = DataInstance( timestamp_np=time_arr, value_np=mph_seconds_to_meters(distance), label="Distance", ) return compute_accel_results( signal_obj=signal_obj, distance_obj=distance_obj, source_time_unit=self.data.timestamp_unit, )