import io
import sys
from typing import List, Union
from plotly import graph_objects as go
from ..core_data_structures.data_instance import DataInstance
from ..core_data_structures.single_run_data import SingleRunData
from ..plotting.data_instance_plotter import *
from ..plotting.plotting_constants import *
from ..plotting.subplots import data_instance_subplots
from ..units import Timescale, _from_seconds, _to_seconds
from ..utils.accel_calculator import *
from ..utils.data_summary import single_run_summary
from ..utils.diff import diff
from ..utils.frequency_analysis import analyze_frequency as _analyze_frequency
from ..utils.integrate import smoothed_filtered_integration
from ..utils.preprocessing import PreprocessingStep, apply_preprocessing
from ..utils.search import SearchResult, search
from .csv import *
[docs]
class Analyzer:
    """Primary class for loading and analyzing car log data.

    After loading, all variables live in ``analyzer.data`` (a
    :class:`~perda.core_data_structures.SingleRunData`), which supports
    dictionary-like access returning :class:`~perda.core_data_structures.DataInstance`
    objects.

    Access by name or variable ID
    ------------------------------
    >>> di = aly.data["pcm.wheelSpeeds.frontRight"]  # by cpp_name
    >>> di = aly.data[42]  # by variable ID

    Check membership
    ----------------
    >>> "pcm.wheelSpeeds.frontRight" in aly.data  # True / False

    Read raw arrays
    ---------------
    >>> di.timestamp_np  # NDArray[int64] — timestamps in the log's native unit
    >>> di.value_np  # NDArray[float64] — sample values

    Arithmetic between variables
    ----------------------------
    >>> avg_speed = (aly.data["pcm.wheelSpeeds.frontRight"] + aly.data["pcm.wheelSpeeds.frontLeft"]) / 2.0

    Trim to a time window (timestamps in the log's native unit)
    ------------------------------------------------------------
    >>> di_trimmed = aly.data["pcm.wheelSpeeds.frontRight"].trim(ts_start=10_000, ts_end=30_000)

    Find variables when you don't know the exact name
    --------------------------------------------------
    >>> results = aly.search("front wheel speed")  # prints + returns list[SearchResult]
    >>> di = aly.data[results[0].cpp_name]

    Enumerate all variables with summary stats
    ------------------------------------------
    >>> summaries = aly.variable_summary()  # list[VariableSummary], sorted by name
    >>> [v.cpp_name for v in summaries]
    """
def __init__(
    self,
    filepath: str,
    ts_offset: int = 0,
    parsing_errors_limit: int = 100,
    verbose: int = 1,
    preprocessing: list[PreprocessingStep] | None = None,
) -> None:
    """
    Load a log file and optionally run preprocessing on the parsed data.

    The file is parsed with ``parse_csv`` and the result is stored on
    ``self.data``. When ``preprocessing`` is a non-empty list, the parsed
    data is then passed through ``apply_preprocessing`` and ``self.data``
    is replaced by its result.

    Parameters
    ----------
    filepath : str
        Path to the CSV file containing CAN bus variables.
    ts_offset : int, optional
        Timestamp offset to apply to all data points (forwarded to the
        parser). Default is 0.
    parsing_errors_limit : int, optional
        Maximum number of parsing errors before stopping (forwarded to the
        parser). Default is 100.
    verbose : int, optional
        Verbosity level forwarded to the parser. Default is 1.
    preprocessing : list[PreprocessingStep] | None, optional
        Ordered list of post-parse preprocessing steps to apply. Each step
        is a ``SingleRunData -> SingleRunData`` callable. Steps are skipped
        with a warning if required variables are absent. ``None`` or an
        empty list leaves the parsed data untouched. Default is None.

    Examples
    --------
    >>> from perda.utils.preprocessing import correct_motor_data, correct_steering_angle
    >>> aly = Analyzer("path/to/log.csv", preprocessing=[correct_motor_data])
    >>> aly = Analyzer("path/to/log.csv", preprocessing=[
    ...     correct_motor_data,
    ...     correct_steering_angle(calibration=((1.5, -90.0), (3.0, 0.0), (4.5, 90.0))),
    ... ])
    >>> print(aly)  # lists all available variables
    """
    parser_options = {
        "parsing_errors_limit": parsing_errors_limit,
        "verbose": verbose,
    }
    self.data: SingleRunData = parse_csv(filepath, ts_offset, **parser_options)

    # Nothing further to do when no preprocessing steps were requested.
    if not preprocessing:
        return

    self.data = apply_preprocessing(self.data, preprocessing)
def __str__(self) -> str:
    """Return a summary of all variables in the loaded run data.

    ``single_run_summary`` is called with ``self.data`` and whatever it
    writes to ``sys.stdout`` is captured and returned as a string.

    Returns
    -------
    str
        The captured summary text.
    """
    # Function-scope import so this method does not depend on a change to
    # the module's import block.
    from contextlib import redirect_stdout

    # The context managers restore sys.stdout and close the buffer even if
    # single_run_summary raises; a manual swap without try/finally would
    # leave stdout pointing at a closed buffer in that case.
    with io.StringIO() as buffer, redirect_stdout(buffer):
        single_run_summary(self.data)
        return buffer.getvalue()
[docs]
def search(self, query: str, top_n: int = 10) -> list[SearchResult]:
    """
    Run a natural-language search over the variables in the parsed data.

    Delegates to the module-level ``search`` helper, which prints the
    matching results to stdout and returns them for programmatic use.

    Parameters
    ----------
    query : str
        Free-text search query (e.g. "front wheel speed").
    top_n : int
        Maximum number of results to return and display (default 10).

    Returns
    -------
    list[SearchResult]
        Top matches in descending relevance order (at most ``top_n`` entries).
        Each entry has ``rank``, ``score``, ``var_id``, ``cpp_name``,
        and ``descript``.

    Examples
    --------
    >>> results = aly.search("front wheel speed")
    >>> results = aly.search("front wheel speed", top_n=5)
    >>> names = [r.cpp_name for r in results]
    """
    # Inside a method body the bare name ``search`` resolves to the imported
    # module-level function, not to this method.
    matches = search(self.data, query, top_n)
    return matches
[docs]
def plot(
    self,
    var_1: Union[str, int, DataInstance, List[Union[str, int, DataInstance]]],
    var_2: (
        Union[str, int, DataInstance, List[Union[str, int, DataInstance]]] | None
    ) = None,
    ts_start: float | None = None,
    ts_end: float | None = None,
    title: str | None = None,
    y_label_1: str | None = None,
    y_label_2: str | None = None,
    show_legend: bool = True,
    font_config: FontConfig = DEFAULT_FONT_CONFIG,
    layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG,
    vline_config: VLineConfig = DEFAULT_VLINE_CONFIG,
) -> go.Figure:
    """
    Display variables from the parsed data on an interactive Plotly plot.

    With only ``var_1`` given, a single-axis figure is built via
    ``plot_single_axis``; when ``var_2`` is also given, a dual-axis figure
    is built via ``plot_dual_axis``. Concat boundaries (if any) are
    converted to seconds and handed to the plotter as vertical lines.

    Parameters
    ----------
    var_1 : Union[str, int, DataInstance, List[Union[str, int, DataInstance]]]
        Variable(s) to plot on the left y-axis. Can be variable name(s),
        variable ID(s), or DataInstance(s).
    var_2 : Union[str, int, DataInstance, List[Union[str, int, DataInstance]]] | None, optional
        Optional variable(s) to plot on the right y-axis. Can be variable
        name(s), variable ID(s), or DataInstance(s).
    ts_start : float | None, optional
        Start of the time window in seconds. Data points before this time
        are excluded. Default is None (no lower bound).
    ts_end : float | None, optional
        End of the time window in seconds. Data points after this time are
        excluded. Default is None (no upper bound).
    title : str | None, optional
        Title forwarded to the plotter.
    y_label_1 : str | None, optional
        Label for left y-axis (or only y-axis if no right input).
    y_label_2 : str | None, optional
        Label for right y-axis.
    show_legend : bool, optional
        Whether to show plot legends. Default is True.
    font_config : FontConfig, optional
        Font configuration for plot elements.
    layout_config : LayoutConfig, optional
        Layout configuration for plot dimensions.
    vline_config : VLineConfig, optional
        Visual configuration for concat boundary lines.

    Returns
    -------
    go.Figure
        The figure returned by the single- or dual-axis plotter.

    Examples
    --------
    >>> fig = aly.plot("pcm.wheelSpeeds.frontRight")
    >>> fig = aly.plot(["pcm.wheelSpeeds.frontRight", "pcm.wheelSpeeds.frontLeft"], title="Front Wheel Speeds")
    >>> fig = aly.plot("pcm.moc.motor.requestedTorque", "pcm.wheelSpeeds.frontRight", ts_start=10.0, ts_end=30.0)
    >>> # Plot a derived DataInstance (e.g. average of two signals)
    >>> avg_speed = (aly.data["pcm.wheelSpeeds.frontRight"] + aly.data["pcm.wheelSpeeds.frontLeft"]) / 2.0
    >>> fig = aly.plot(avg_speed)
    >>> fig.show()
    """
    left_traces = self._normalize_input(var_1)
    unit = self.data.timestamp_unit

    # The plotter expects boundary positions in seconds, not raw log units.
    boundaries = self.data.concat_boundaries
    vlines: List[float] | None = (
        [_to_seconds(boundary, unit) for boundary in boundaries]
        if boundaries
        else None
    )

    # ``trim`` works in raw log units, so convert the second-based bounds.
    windowed = not (ts_start is None and ts_end is None)
    start_raw = None if ts_start is None else _from_seconds(ts_start, unit)
    end_raw = None if ts_end is None else _from_seconds(ts_end, unit)

    def _window(instances):
        # Leave the list untouched when no time window was requested.
        if not windowed:
            return instances
        return [instance.trim(start_raw, end_raw) for instance in instances]

    left_traces = _window(left_traces)

    # Keyword arguments common to both plotter entry points.
    shared_options = dict(
        title=title,
        show_legend=show_legend,
        font_config=font_config,
        layout_config=layout_config,
        timestamp_unit=unit,
        vlines=vlines,
        vline_config=vline_config,
    )

    if var_2 is None:
        return plot_single_axis(
            data_instances=left_traces,
            y_axis_title=y_label_1,
            **shared_options,
        )

    right_traces = _window(self._normalize_input(var_2))
    return plot_dual_axis(
        left_data_instances=left_traces,
        right_data_instances=right_traces,
        left_y_axis_title=y_label_1,
        right_y_axis_title=y_label_2,
        **shared_options,
    )
[docs]
def subplots(
    self,
    rows: List[
        Union[
            str,
            int,
            DataInstance,
            List[Union[str, int, DataInstance]],
        ]
    ],
    title: str | None = None,
    row_y_labels: List[str | None] | None = None,
    ts_start: float | None = None,
    ts_end: float | None = None,
    show_legend: bool = True,
    layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG,
    font_config: FontConfig = DEFAULT_FONT_CONFIG,
) -> go.Figure:
    """
    Plot multiple variables as stacked subplots on a shared time axis.

    Each entry in ``rows`` becomes one subplot row. Pass a list of
    variables for a row to overlay multiple signals on the same panel,
    or a single variable for a dedicated panel.

    Parameters
    ----------
    rows : List[str | int | DataInstance | List[str | int | DataInstance]]
        One entry per subplot row (top to bottom). Each entry may be a
        single variable (name, ID, or DataInstance) or a list of variables
        to overlay on that row.
    title : str | None, optional
        Figure-level title. Default is None.
    row_y_labels : List[str | None] | None, optional
        Y-axis label for each row. ``None`` entries fall back to the
        DataInstance labels. Must match the length of ``rows`` when
        provided. Default is None.
    ts_start : float | None, optional
        Start of the time window in seconds. Data before this time is
        excluded from all rows. Default is None (no lower bound).
    ts_end : float | None, optional
        End of the time window in seconds. Data after this time is
        excluded from all rows. Default is None (no upper bound).
    show_legend : bool, optional
        Whether to show the figure legend. Default is True.
    layout_config : LayoutConfig, optional
        Figure dimensions and spacing.
    font_config : FontConfig, optional
        Font sizes for plot elements.

    Returns
    -------
    go.Figure
        The figure returned by ``data_instance_subplots``.

    Examples
    --------
    >>> fig = aly.subplots(["pcm.wheelSpeeds.frontRight", "pcm.moc.motor.requestedTorque"])
    >>> fig = aly.subplots(
    ...     rows=[
    ...         ["pcm.wheelSpeeds.frontRight", "pcm.wheelSpeeds.frontLeft"],
    ...         "pcm.moc.motor.requestedTorque",
    ...     ],
    ...     title="Run Overview",
    ...     row_y_labels=["Wheel Speed (mph)", "Torque (Nm)"],
    ...     ts_start=5.0,
    ...     ts_end=30.0,
    ... )
    >>> fig.show()
    """
    unit = self.data.timestamp_unit

    # ``trim`` works in raw log units, so convert the second-based bounds.
    start_raw = None if ts_start is None else _from_seconds(ts_start, unit)
    end_raw = None if ts_end is None else _from_seconds(ts_end, unit)
    needs_trim = not (start_raw is None and end_raw is None)

    def _prepare_row(entry):
        # Resolve one row entry to DataInstances, windowing them if asked.
        instances = self._normalize_input(entry)
        if needs_trim:
            instances = [inst.trim(start_raw, end_raw) for inst in instances]
        return instances

    return data_instance_subplots(
        rows=[_prepare_row(entry) for entry in rows],
        title=title,
        row_y_labels=row_y_labels,
        show_legend=show_legend,
        layout_config=layout_config,
        font_config=font_config,
        timestamp_unit=unit,
    )
[docs]
def diff(
    self,
    server_data: SingleRunData,
    timestamp_tolerance_s: float = 0.002,
    diff_rtol: float = 1e-3,
    diff_atol: float = 1e-3,
    diff_plot_config: DiffPlotConfig = DEFAULT_DIFF_PLOT_CONFIG,
    layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG,
    font_config: FontConfig = DEFAULT_FONT_CONFIG,
) -> go.Figure:
    """
    Compare the current data (assumed to be from RPI) against server data.

    Delegates to the module-level ``diff`` helper with ``self.data`` as
    the first stream and ``server_data`` as the second.

    Parameters
    ----------
    server_data : SingleRunData
        The server data to compare against.
    timestamp_tolerance_s : float, optional
        Timestamp tolerance in seconds used to match points between streams.
        Defaults to 0.002 (2 ms).
    diff_rtol : float, optional
        Relative tolerance for value comparison (numpy.isclose).
    diff_atol : float, optional
        Absolute tolerance for value comparison (numpy.isclose).
    diff_plot_config : DiffPlotConfig, optional
        Plot configuration forwarded to the diff helper.
    layout_config : LayoutConfig, optional
        Layout configuration forwarded to the diff helper.
    font_config : FontConfig, optional
        Font configuration forwarded to the diff helper.

    Returns
    -------
    go.Figure
        The figure returned by the diff helper.

    Examples
    --------
    >>> fig = aly.diff(server_data)
    >>> fig.show()
    """
    matching_options = dict(
        timestamp_tolerance_s=timestamp_tolerance_s,
        diff_rtol=diff_rtol,
        diff_atol=diff_atol,
    )
    presentation_options = dict(
        diff_plot_config=diff_plot_config,
        layout_config=layout_config,
        font_config=font_config,
    )
    # Inside a method body the bare name ``diff`` resolves to the imported
    # module-level function, not to this method.
    return diff(
        self.data,
        server_data,
        **matching_options,
        **presentation_options,
    )
[docs]
def analyze_frequency(
    self,
    var: Union[str, int],
    expected_frequency_hz: float | None = None,
    gap_threshold_multiplier: float = 2.0,
    font_config: FontConfig = DEFAULT_FONT_CONFIG,
    layout_config: LayoutConfig = DEFAULT_LAYOUT_CONFIG,
    plot_config: ScatterHistogramPlotConfig = DEFAULT_SCATTER_HISTOGRAM_PLOT_CONFIG,
) -> go.Figure:
    """
    Analyse the sampling frequency of a variable and return a diagnostic figure.

    Prints a summary to stdout and returns a Plotly figure with two subplots:
    instantaneous frequency over time and an inter-sample interval histogram.

    Parameters
    ----------
    var : Union[str, int]
        Variable name or ID to look up in the parsed data.
    expected_frequency_hz : float | None, optional
        Nominal expected sampling frequency in Hz for error and gap diagnostics.
        Default is None.
    gap_threshold_multiplier : float, optional
        Intervals exceeding this multiple of the expected (or median) interval
        are flagged as gaps. Default is 2.0.
    font_config : FontConfig, optional
        Font sizes for plot elements.
    layout_config : LayoutConfig, optional
        Plot dimensions and margins.
    plot_config : ScatterHistogramPlotConfig, optional
        Plot configuration forwarded to the frequency analysis helper.

    Returns
    -------
    go.Figure
        Plotly figure with frequency diagnostics.

    Examples
    --------
    >>> fig = aly.analyze_frequency("ams.stack.thermistors.temperature[38]", expected_frequency_hz=100)
    >>> fig.show()
    """
    # The lookup is the first argument, so it is still evaluated before the
    # timestamp unit is read.
    return _analyze_frequency(
        self.data[var],
        expected_frequency_hz=expected_frequency_hz,
        source_time_unit=self.data.timestamp_unit,
        gap_threshold_multiplier=gap_threshold_multiplier,
        font_config=font_config,
        layout_config=layout_config,
        plot_config=plot_config,
    )
def _normalize_input(
    self,
    input_data: Union[str, int, DataInstance, List[Union[str, int, DataInstance]]],
) -> List[DataInstance]:
    """
    Resolve a variable reference, or a list of them, to DataInstances.

    A ``DataInstance`` is passed through unchanged; any other value is
    used as a key into ``self.data``. A single (non-list) input yields a
    one-element list.

    Parameters
    ----------
    input_data : Union[str, int, DataInstance, List[Union[str, int, DataInstance]]]
        One variable reference or a list of them.

    Returns
    -------
    List[DataInstance]
        The resolved DataInstances, in input order.
    """

    def _resolve(entry):
        # Already-built instances are used as is; anything else is a key.
        return entry if isinstance(entry, DataInstance) else self.data[entry]

    # A DataInstance is always treated as a single item, checked before the
    # list test.
    is_collection = not isinstance(input_data, DataInstance) and isinstance(
        input_data, list
    )
    entries = input_data if is_collection else [input_data]
    return [_resolve(entry) for entry in entries]
[docs]
def get_accel_times(self) -> list[AccelSegmentResult]:
    """
    Detect acceleration runs in the log and compute their acceleration times.

    The two front wheel-speed signals are averaged into one speed signal.
    That signal, together with the requested motor torque, is passed to
    ``detect_accel_event``. The speed signal is also integrated with
    ``smoothed_filtered_integration`` to obtain distance, and both results
    are passed to ``compute_accel_results``.

    Returns
    -------
    list[AccelSegmentResult]
        List of acceleration segment results.

    Examples
    --------
    >>> results = aly.get_accel_times()
    >>> for r in results:
    ...     print(r)
    """
    front_right = self.data["pcm.wheelSpeeds.frontRight"]
    front_left = self.data["pcm.wheelSpeeds.frontLeft"]
    mean_front_speed = (front_right + front_left) / 2.0

    requested_torque = self.data["pcm.moc.motor.requestedTorque"]
    accel_signal = detect_accel_event(
        torque_obj=requested_torque, speed_obj=mean_front_speed
    )

    # Only the time axis and the integrated distance are needed; the middle
    # element of the returned triple is ignored.
    integration = smoothed_filtered_integration(
        data=mean_front_speed, source_time_unit=self.data.timestamp_unit
    )
    time_axis, _, travelled = integration

    distance_signal = DataInstance(
        timestamp_np=time_axis,
        value_np=travelled,
        label="Distance",
    )
    return compute_accel_results(
        signal_obj=accel_signal,
        distance_obj=distance_signal,
        source_time_unit=self.data.timestamp_unit,
    )