PERDA: Advanced Usage#
This notebook showcases some of the more advanced features of PERDA. It is kept up to date with the latest features, so it also serves as a good reference for ongoing development.
Frequency Analysis#
[ ]:
# Retrieve DataInstances: build an Analyzer from a single CSV log.
from perda.analyzer import Analyzer
# Relative path of the CSV log to load.
csvPath = "csv_files/16thApr10-52-00.csv"
# `aly` is the Analyzer used by the frequency-analysis cell that follows.
aly = Analyzer(csvPath)
[ ]:
# Run the frequency analysis on one variable, selected by its full name in the log.
aly.analyze_frequency("bms.stack.thermistors.temperature[38]")
Cross-Log Browsing#
[ ]:
# Folder that is scanned for "*.csv" logs by the browsing cell below.
LOG_FOLDER = "csv_files/LogBrowseTest"
# Variable names looked up in each log's data for the GPS latitude / longitude.
LAT_VAR = "pcm.vnav.posLla.latitude"
LON_VAR = "pcm.vnav.posLla.longitude"
# Extra variable names passed as `var_names` to plot_multi_log_subplots.
EXTRA_VARS = [
"pcm.wheelSpeeds.frontLeft",
"ludwig.steeringWheel.angle",
"pcm.pedals.brakePressure.front",
"pcm.moc.motor.requestedTorque",
]
[ ]:
from pathlib import Path
import numpy as np
from perda.analyzer import Analyzer
from perda.core_data_structures.deduplication import deduplicate
from perda.core_data_structures import left_join_data_instances
# Collect every CSV log found directly inside LOG_FOLDER, in sorted path order.
csv_paths = sorted(Path(LOG_FOLDER).glob("*.csv"))
print(f"Found {len(csv_paths)} CSV logs in {LOG_FOLDER}\n")

# Parallel lists: gps_runs[i] is the filtered GPS track taken from analyzers[i].
gps_runs = []
analyzers = []

for path in csv_paths:
    # A dedicated loop name is used on purpose: binding `aly` here would
    # overwrite the Analyzer created in the Frequency Analysis section, and
    # re-running that section afterwards would silently use the wrong log.
    log_aly = Analyzer(str(path), verbose=0)

    # Logs that lack either GPS variable cannot be placed on the map.
    if LAT_VAR not in log_aly.data or LON_VAR not in log_aly.data:
        print(f"GPS variables not found in {path.name}, skipping")
        continue

    # --- This portion can be replaced with any other lightweight filtering ---
    # --- currently, this filters out logs without valid GPS fixes ---
    lat_di = deduplicate(log_aly.data[LAT_VAR])
    lon_di = deduplicate(log_aly.data[LON_VAR])
    # NOTE(review): the join presumably aligns both series sample-for-sample so
    # that the element-wise masks below line up; confirm against the helper.
    lat_di, lon_di = left_join_data_instances(lat_di, lon_di)
    lat_arr = lat_di.value_np
    lon_arr = lon_di.value_np

    # Keep a sample only when neither coordinate is exactly 0.0 and both lie
    # inside the legal latitude (+/-90) and longitude (+/-180) ranges.
    valid = (
        (lat_arr != 0.0)
        & (lon_arr != 0.0)
        & (np.abs(lat_arr) <= 90.0)
        & (np.abs(lon_arr) <= 180.0)
    )
    if not valid.any():
        print(f"No valid GPS fix in {path.name}. skipping")
        continue

    lat_arr = lat_arr[valid]
    lon_arr = lon_arr[valid]

    # Record the filtered track together with the Analyzer it came from.
    gps_runs.append((lat_arr, lon_arr, path.name))
    analyzers.append(log_aly)
[ ]:
from perda.plotting.subplots import plot_multi_log_subplots
from perda.utils.gps_analysis import plot_gps_comparison
# Draw the GPS tracks collected in `gps_runs` as one comparison figure and display it.
gps_fig = plot_gps_comparison(runs=gps_runs, title="GPS Log Browser")
gps_fig.show()
[ ]:
# Plot the EXTRA_VARS variables for every log that was kept by the browsing loop.
fig = plot_multi_log_subplots(
    logs=[analyzer.data for analyzer in analyzers],
    var_names=EXTRA_VARS,
    title="Variable Comparison",
)
fig.show()
CSV Compare (diff)#
[ ]:
from perda.analyzer import Analyzer
# Note: Currently have no
# NOTE(review): the note above is cut off mid-sentence; complete or remove it.
# Paths of the two logs to compare: the base log and the incoming log.
csvBase = "csv_files/2ndMay13-52-38.csv"
csvIncom = "csv_files/2ndMay13-53-29.csv"
# Parse the two CSV files so they can be compared.
alyBase = Analyzer(csvBase)
alyIncom = Analyzer(csvIncom)
[ ]:
# Compare base and incoming data: diff() is called on the base Analyzer with the
# incoming Analyzer's data; the result is displayed with .show() in the next cell.
graph = alyBase.diff(alyIncom.data)
[ ]:
# Display the object returned by alyBase.diff(...).
graph.show()
Live Analysis#
[ ]:
from perda.live import LiveAnalyzer, ValueType
# Create the LiveAnalyzer via the data server; LiveAnalyzer.local() can be chosen instead for local IPC.
live = LiveAnalyzer.dataserver()
# Last expression of the cell, so its return value is shown as the cell output.
live.is_connected()
[ ]:
# Read the most current value of a variable.
# A ValueType must always be specified; it can be FLOAT, BOOL or NUMERIC.
print(live.get("pcm.moc.motor.temp", ValueType.NUMERIC))
print(live.get("bms.error", ValueType.BOOL))

# Tune a variable live by writing a new value to it.
live.set("ludwig.tunable.tractionControl.kd", 1.3, ValueType.FLOAT)
[ ]:
# Fetch the last 100 seconds of pack current and pack voltage.
curr = live.fetch("bms.pack.current", time_secs=100)
volt = live.fetch("bms.pack.voltage", time_secs=100)

# Multiply the two fetched results (current * voltage) and plot the product.
power = curr * volt
live.plot(power)
[ ]:
# Plot several variables in one call by passing a list of variable names.
# NOTE(review): time_secs=120 presumably selects the last 120 seconds, as it does for fetch(); confirm.
live.plot(["bms.pack.voltage", "bms.stack.voltage"], time_secs=120)
[ ]:
# Plot a single variable by name, without passing an explicit time_secs.
live.plot("bms.pack.voltage")