PERDA: Advanced Usage#

This notebook showcases some of PERDA's more advanced features. It is kept up to date with the latest features, so it serves as a good reference for ongoing development.

Frequency Analysis#

[ ]:
# Retrieve DataInstances: parse one CSV log into an Analyzer that the next
# cell queries by variable name.
from perda.analyzer import Analyzer

csvPath = "csv_files/16thApr10-52-00.csv"
aly = Analyzer(csvPath)
[ ]:
# Run the frequency analysis on a single logged variable, addressed by its
# full dotted name.
# NOTE(review): the form of the output (plot vs. printed summary) is not
# visible from this notebook -- see Analyzer.analyze_frequency.
aly.analyze_frequency("bms.stack.thermistors.temperature[38]")

Cross-Log Browsing#

[ ]:
# Folder that the next cell scans for "*.csv" logs.
LOG_FOLDER = "csv_files/LogBrowseTest"

# Variable names used to look up GPS latitude / longitude in each log's data;
# a log that is missing either one is skipped.
LAT_VAR = "pcm.vnav.posLla.latitude"
LON_VAR = "pcm.vnav.posLla.longitude"

# Additional variables passed as `var_names` to plot_multi_log_subplots for
# the cross-log comparison.
EXTRA_VARS = [
    "pcm.wheelSpeeds.frontLeft",
    "ludwig.steeringWheel.angle",
    "pcm.pedals.brakePressure.front",
    "pcm.moc.motor.requestedTorque",
]
[ ]:
from pathlib import Path

import numpy as np

from perda.analyzer import Analyzer
from perda.core_data_structures.deduplication import deduplicate
from perda.core_data_structures import left_join_data_instances

# Collect every CSV in LOG_FOLDER, sorted by path so the logs are processed in
# a stable order.
csv_paths = sorted(Path(LOG_FOLDER).glob("*.csv"))
print(f"Found {len(csv_paths)} CSV logs in {LOG_FOLDER}\n")

# Parallel lists: gps_runs[i] is the filtered GPS track taken from analyzers[i].
gps_runs = []
analyzers = []

for path in csv_paths:
    # Use a loop-specific name so this cell does not rebind the notebook-level
    # `aly` created in the Frequency Analysis section; re-running that section
    # afterwards would otherwise analyse the last log of this folder instead.
    log_aly = Analyzer(str(path), verbose=0)

    if LAT_VAR not in log_aly.data or LON_VAR not in log_aly.data:
        print(f"GPS variables not found in {path.name}, skipping")
        continue

    # --- This portion can be replaced with any other lightweight filtering ---
    # --- currently, this filters out logs without valid GPS fixes          ---
    lat_di = deduplicate(log_aly.data[LAT_VAR])
    lon_di = deduplicate(log_aly.data[LON_VAR])

    # The boolean mask below indexes both arrays, so they must have the same
    # length; left_join_data_instances is presumably what aligns the two
    # signals sample-for-sample -- confirm against its documentation.
    lat_di, lon_di = left_join_data_instances(lat_di, lon_di)

    lat_arr = lat_di.value_np
    lon_arr = lon_di.value_np

    # Keep a sample only when neither coordinate is exactly 0.0 and both lie
    # within the +/-90 (latitude) and +/-180 (longitude) bounds.
    valid = (
        (lat_arr != 0.0)
        & (lon_arr != 0.0)
        & (np.abs(lat_arr) <= 90.0)
        & (np.abs(lon_arr) <= 180.0)
    )
    if not valid.any():
        print(f"No valid GPS fix in {path.name}. skipping")
        continue

    lat_arr = lat_arr[valid]
    lon_arr = lon_arr[valid]
    gps_runs.append((lat_arr, lon_arr, path.name))

    # Only logs that contributed a GPS run are kept for the variable plots.
    analyzers.append(log_aly)
[ ]:
from perda.plotting.subplots import plot_multi_log_subplots
from perda.utils.gps_analysis import plot_gps_comparison


# Build the GPS comparison figure from the (lat, lon, filename) runs gathered
# by the previous cell, then render it.
gps_fig = plot_gps_comparison(runs=gps_runs, title="GPS Log Browser")
gps_fig.show()
[ ]:
# Plot EXTRA_VARS across logs, passing the `.data` of every retained Analyzer.
fig = plot_multi_log_subplots(
    logs=[analyzer.data for analyzer in analyzers],
    var_names=EXTRA_VARS,
    title="Variable Comparison",
)
fig.show()

CSV Compare (diff)#

[ ]:
from perda.analyzer import Analyzer

# NOTE(review): the original note on this line was left unfinished
# ("Currently have no ..."); restore the intended caveat or remove it.
csvBase = "csv_files/2ndMay13-52-38.csv"
csvIncom = "csv_files/2ndMay13-53-29.csv"

# Parse the two CSV logs to compare: the base log and the incoming log.
alyBase = Analyzer(csvBase)
alyIncom = Analyzer(csvIncom)
[ ]:
# Compare base and incoming data. diff() is called on the base Analyzer and
# receives the incoming log's `.data` (not the Analyzer itself); the returned
# object is displayed in the next cell.
graph = alyBase.diff(alyIncom.data)
[ ]:
# Display the comparison object returned by diff().
graph.show()

Live Analysis#

[ ]:
from perda.live import LiveAnalyzer, ValueType

# Create a LiveAnalyzer backed by the data server.
# LiveAnalyzer.local() can be used instead for local IPC.
live = LiveAnalyzer.dataserver()
# Last expression of the cell, so its return value is shown as the cell output.
live.is_connected()
[ ]:
# Read the most current value of a variable. Every read must name a
# ValueType: FLOAT, BOOL or NUMERIC.
print(live.get("pcm.moc.motor.temp", ValueType.NUMERIC))
print(live.get("bms.error", ValueType.BOOL))

# Tune a variable live.
# NOTE(review): presumably this pushes the new value to the connected system,
# so only run this cell when changing the value is intended -- confirm.
live.set("ludwig.tunable.tractionControl.kd", 1.3, ValueType.FLOAT)
[ ]:
# Fetch the last 100 seconds of pack current and pack voltage.
curr = live.fetch("bms.pack.current", time_secs=100)
volt = live.fetch("bms.pack.voltage", time_secs=100)

# Multiply the two fetched results and plot the product as pack power.
power = curr * volt
live.plot(power)
[ ]:
# Plot two voltage variables in one call by passing a list of names.
# time_secs=120 presumably limits the view to the last 120 seconds, as it does
# for fetch() -- confirm.
live.plot(["bms.pack.voltage", "bms.stack.voltage"], time_secs=120)
[ ]:
# Plot a single variable by name. No time_secs is given, so whatever default
# window plot() defines applies (value not shown in this notebook).
live.plot("bms.pack.voltage")