Source code for perda.utils.preprocessing

from typing import Callable, Union

import numpy as np
from numpy import float64
from numpy.typing import NDArray

from ..core_data_structures.data_instance import DataInstance, left_join_data_instances
from ..core_data_structures.single_run_data import SingleRunData
from ..units import in_to_m, mph_to_m_per_s

DEFAULT_GEAR_RATIO: float = 5.6
DEFAULT_TIRE_RADIUS_IN: float = 7.85

DEFAULT_VECTORNAV_BODY_VEL_X = "pcm.vnav.velocityBody.x"
DEFAULT_VECTORNAV_BODY_VEL_Y = "pcm.vnav.velocityBody.y"
DEFAULT_VECTORNAV_BODY_VEL_Z = "pcm.vnav.velocityBody.z"
DEFAULT_VECTORNAV_YAW = "pcm.vnav.yawPitchRoll.yaw"
DEFAULT_VECTORNAV_NED_VEL_N = "velN"
DEFAULT_VECTORNAV_NED_VEL_E = "velE"
DEFAULT_VECTORNAV_NED_VEL_D = "velD"

DEFAULT_WHEELSPEED_FR = "pcm.wheelSpeeds.frontRight"
DEFAULT_WHEELSPEED_FL = "pcm.wheelSpeeds.frontLeft"
DEFAULT_WHEELSPEED_RR = "pcm.wheelSpeeds.backRight"
DEFAULT_WHEELSPEED_RL = "pcm.wheelSpeeds.backLeft"

DEFAULT_MOTOR_RPM = "pcm.moc.motor.angularSpeed"
DEFAULT_MOTOR_WHEELSPEED = "pcm.moc.motor.wheelSpeed"

DEFAULT_STEERING_RAW = "ludwig.steeringWheel.raw"
DEFAULT_STEERING_ANGLE = "ludwig.steeringWheel.angle"

# Default 3-point voltage->angle calibration for the steering pot.
SteeringCalibration = tuple[tuple[float, float], ...]
DEFAULT_STEERING_CALIBRATION: SteeringCalibration = (
    (1.86, -97.0),  # max left
    (2.93, 0.0),  # zero
    (3.96, 97.0),  # max right
)

# Type alias for a preprocessing step function that takes and returns a SingleRunData instance.
PreprocessingStep = Callable[[SingleRunData], SingleRunData]


[docs] class PatchNedVelocityLambda: """Backing class for the ``patch_ned_velocity`` preprocessing step.""" def __init__( self, body_vel_x: str = DEFAULT_VECTORNAV_BODY_VEL_X, body_vel_y: str = DEFAULT_VECTORNAV_BODY_VEL_Y, body_vel_z: str = DEFAULT_VECTORNAV_BODY_VEL_Z, yaw: str = DEFAULT_VECTORNAV_YAW, ned_vel_n: str = DEFAULT_VECTORNAV_NED_VEL_N, ned_vel_e: str = DEFAULT_VECTORNAV_NED_VEL_E, ned_vel_d: str = DEFAULT_VECTORNAV_NED_VEL_D, ) -> None: self.body_vel_x = body_vel_x self.body_vel_y = body_vel_y self.body_vel_z = body_vel_z self.yaw = yaw self.ned_vel_n = ned_vel_n self.ned_vel_e = ned_vel_e self.ned_vel_d = ned_vel_d def __call__( self, data: SingleRunData | None = None, *, body_vel_x: str | None = None, body_vel_y: str | None = None, body_vel_z: str | None = None, yaw: str | None = None, ned_vel_n: str | None = None, ned_vel_e: str | None = None, ned_vel_d: str | None = None, ) -> Union[SingleRunData, "PatchNedVelocityLambda"]: """Apply the step to ``data``, or return a reconfigured copy when called with keyword args. Parameters ---------- data : SingleRunData, optional Run data to transform in-place. Omit when reconfiguring. body_vel_x : str, optional Channel incorrectly holding NED north; overwritten with body-frame forward velocity. body_vel_y : str, optional Channel incorrectly holding NED east; overwritten with body-frame right velocity. body_vel_z : str, optional Channel incorrectly holding NED down; left unchanged (NED down == FRD down). yaw : str, optional Yaw channel in degrees used to rotate NED -> FRD. ned_vel_n : str, optional Output channel name for the preserved raw NED north velocity. ned_vel_e : str, optional Output channel name for the preserved raw NED east velocity. ned_vel_d : str, optional Output channel name for the preserved raw NED down velocity. Returns ------- SingleRunData or PatchNedVelocityLambda Transformed data when ``data`` is passed; reconfigured step otherwise. """ if any( v is not None for v in ( body_vel_x, body_vel_y, body_vel_z, yaw, ned_vel_n, ned_vel_e, ned_vel_d, ) ): return PatchNedVelocityLambda( body_vel_x=body_vel_x or self.body_vel_x, body_vel_y=body_vel_y or self.body_vel_y, body_vel_z=body_vel_z or self.body_vel_z, yaw=yaw or self.yaw, ned_vel_n=ned_vel_n or self.ned_vel_n, ned_vel_e=ned_vel_e or self.ned_vel_e, ned_vel_d=ned_vel_d or self.ned_vel_d, ) assert data is not None required = [self.body_vel_x, self.body_vel_y, self.body_vel_z, self.yaw] missing = [v for v in required if v not in data] if missing: print(f"WARNING: patch_ned_velocity skipped — missing variables: {missing}") return data vel_n1, vel_e1, vel_d1, yaw_deg = left_join_data_instances( data[self.body_vel_x], [data[self.body_vel_y], data[self.body_vel_z], data[self.yaw]], ) yaw_rad = np.radians(yaw_deg.value_np) data[self.ned_vel_n] = DataInstance( timestamp_np=vel_n1.timestamp_np, value_np=vel_n1.value_np.copy(), label="NED North velocity (raw)", cpp_name=self.ned_vel_n, ) data[self.ned_vel_e] = DataInstance( timestamp_np=vel_e1.timestamp_np, value_np=vel_e1.value_np.copy(), label="NED East velocity (raw)", cpp_name=self.ned_vel_e, ) data[self.ned_vel_d] = DataInstance( timestamp_np=vel_d1.timestamp_np, value_np=vel_d1.value_np.copy(), label="NED Down velocity (raw)", cpp_name=self.ned_vel_d, ) cos_y = np.cos(yaw_rad) sin_y = np.sin(yaw_rad) data[self.body_vel_x] = DataInstance( timestamp_np=vel_n1.timestamp_np, value_np=vel_n1.value_np * cos_y + vel_e1.value_np * sin_y, label=data[self.body_vel_x].label, cpp_name=self.body_vel_x, ) # forward data[self.body_vel_y] = DataInstance( timestamp_np=vel_e1.timestamp_np, value_np=-vel_n1.value_np * sin_y + vel_e1.value_np * cos_y, label=data[self.body_vel_y].label, cpp_name=self.body_vel_y, ) # right # vel_z (down) is identical in NED and FRD — no change needed print( f"patch_ned_velocity: preserved raw NED in {self.ned_vel_n}/{self.ned_vel_e}/{self.ned_vel_d}, rotated {len(vel_n1)} points to body frame" ) return data
[docs] class ConvertWheelspeedsToMPerSLambda: """Backing class for the ``convert_wheelspeeds_to_m_per_s`` preprocessing step.""" def __init__( self, wheelspeed_fr: str = DEFAULT_WHEELSPEED_FR, wheelspeed_fl: str = DEFAULT_WHEELSPEED_FL, wheelspeed_rr: str = DEFAULT_WHEELSPEED_RR, wheelspeed_rl: str = DEFAULT_WHEELSPEED_RL, ) -> None: self.wheelspeed_fr = wheelspeed_fr self.wheelspeed_fl = wheelspeed_fl self.wheelspeed_rr = wheelspeed_rr self.wheelspeed_rl = wheelspeed_rl def __call__( self, data: SingleRunData | None = None, *, wheelspeed_fr: str | None = None, wheelspeed_fl: str | None = None, wheelspeed_rr: str | None = None, wheelspeed_rl: str | None = None, ) -> Union[SingleRunData, "ConvertWheelspeedsToMPerSLambda"]: """Apply the step to ``data``, or return a reconfigured copy when called with keyword args. Parameters ---------- data : SingleRunData, optional Run data to transform in-place. Omit when reconfiguring. wheelspeed_fr : str, optional Variable name for the front-right wheel speed channel (expected in mph). wheelspeed_fl : str, optional Variable name for the front-left wheel speed channel (expected in mph). wheelspeed_rr : str, optional Variable name for the rear-right wheel speed channel (expected in mph). wheelspeed_rl : str, optional Variable name for the rear-left wheel speed channel (expected in mph). Returns ------- SingleRunData or ConvertWheelspeedsToMPerSLambda Transformed data when ``data`` is passed; reconfigured step otherwise. """ if any( v is not None for v in (wheelspeed_fr, wheelspeed_fl, wheelspeed_rr, wheelspeed_rl) ): return ConvertWheelspeedsToMPerSLambda( wheelspeed_fr=wheelspeed_fr or self.wheelspeed_fr, wheelspeed_fl=wheelspeed_fl or self.wheelspeed_fl, wheelspeed_rr=wheelspeed_rr or self.wheelspeed_rr, wheelspeed_rl=wheelspeed_rl or self.wheelspeed_rl, ) assert data is not None cols = [ self.wheelspeed_fr, self.wheelspeed_fl, self.wheelspeed_rr, self.wheelspeed_rl, ] missing = [v for v in cols if v not in data] if missing: print( f"WARNING: convert_wheelspeeds_to_m_per_s skipped — missing variables: {missing}" ) return data for col in cols: di = data[col] backup_name = col + "_mph" if backup_name not in data: data[backup_name] = DataInstance( timestamp_np=di.timestamp_np, value_np=di.value_np.copy(), label=(di.label or col) + " (mph backup)", cpp_name=backup_name, ) data[col] = DataInstance( timestamp_np=di.timestamp_np, value_np=mph_to_m_per_s(di.value_np), label=di.label, cpp_name=col, ) print( f"convert_wheelspeeds_to_m_per_s: converted {len(cols)} channels mph -> m/s, backups in *_mph" ) return data
[docs] class CorrectMotorDataLambda: """Backing class for the ``correct_motor_data`` preprocessing step.""" def __init__( self, gear_ratio: float = DEFAULT_GEAR_RATIO, tire_radius_in: float = DEFAULT_TIRE_RADIUS_IN, motor_rpm: str = DEFAULT_MOTOR_RPM, motor_wheelspeed: str = DEFAULT_MOTOR_WHEELSPEED, ) -> None: self.gear_ratio = gear_ratio self.tire_radius_in = tire_radius_in self.motor_rpm = motor_rpm self.motor_wheelspeed = motor_wheelspeed def __call__( self, data: SingleRunData | None = None, *, gear_ratio: float | None = None, tire_radius_in: float | None = None, motor_rpm: str | None = None, motor_wheelspeed: str | None = None, ) -> Union[SingleRunData, "CorrectMotorDataLambda"]: """Apply the step to ``data``, or return a reconfigured copy when called with keyword args. Parameters ---------- data : SingleRunData, optional Run data to transform in-place. Omit when reconfiguring. gear_ratio : float, optional Final-drive gear ratio. tire_radius_in : float, optional Loaded tire radius in inches. motor_rpm : str, optional Variable name for the motor angular speed channel. motor_wheelspeed : str, optional Variable name to write the derived driven-wheel linear speed (m/s). Returns ------- SingleRunData or CorrectMotorDataLambda Transformed data when ``data`` is passed; reconfigured step otherwise. """ if any( v is not None for v in (gear_ratio, tire_radius_in, motor_rpm, motor_wheelspeed) ): return CorrectMotorDataLambda( gear_ratio=gear_ratio if gear_ratio is not None else self.gear_ratio, tire_radius_in=( tire_radius_in if tire_radius_in is not None else self.tire_radius_in ), motor_rpm=motor_rpm or self.motor_rpm, motor_wheelspeed=motor_wheelspeed or self.motor_wheelspeed, ) assert data is not None if self.motor_rpm not in data: print( f"WARNING: correct_motor_data skipped — missing variable: {self.motor_rpm}" ) return data di = data[self.motor_rpm] raw_rpm: NDArray[float64] = di.value_np.astype(np.float64) backup_name = self.motor_rpm + "_raw" if backup_name not in data: data[backup_name] = DataInstance( timestamp_np=di.timestamp_np, value_np=raw_rpm.copy(), label="Motor RPM raw (pre-flip)", cpp_name=backup_name, ) flipped = -raw_rpm data[self.motor_rpm] = DataInstance( timestamp_np=di.timestamp_np, value_np=flipped, label=di.label, cpp_name=self.motor_rpm, ) tire_radius_m = in_to_m(self.tire_radius_in) wheel_speed: NDArray[float64] = ( flipped * 2.0 * np.pi * tire_radius_m / (60.0 * self.gear_ratio) ) data[self.motor_wheelspeed] = DataInstance( timestamp_np=di.timestamp_np, value_np=wheel_speed, label="Driven wheel speed from motor RPM (m/s)", cpp_name=self.motor_wheelspeed, ) print( f"correct_motor_data: flipped {self.motor_rpm} sign, added {self.motor_wheelspeed} " f"(ratio={self.gear_ratio}, r={self.tire_radius_in} in)" ) return data
[docs] class CorrectSteeringAngleLambda: """ Backing class for the ``correct_steering_angle`` preprocessing step. Fits a polynomial through ``calibration`` points (voltage, angle) at construction time, then rewrites the angle channel from the raw voltage channel on each call. """ def __init__( self, calibration: SteeringCalibration = DEFAULT_STEERING_CALIBRATION, steering_raw: str = DEFAULT_STEERING_RAW, steering_angle: str = DEFAULT_STEERING_ANGLE, ) -> None: pts = sorted(calibration, key=lambda p: p[0]) if len(pts) < 2: raise ValueError("calibration needs at least 2 (voltage, angle) points.") self.pts = pts self.steering_raw = steering_raw self.steering_angle = steering_angle volts: NDArray[float64] = np.array([p[0] for p in pts], dtype=np.float64) angles: NDArray[float64] = np.array([p[1] for p in pts], dtype=np.float64) self.deg = 2 if len(pts) >= 3 else 1 self.coeffs: NDArray[float64] = np.polyfit(volts, angles, self.deg) def __call__( self, data: SingleRunData | None = None, *, calibration: SteeringCalibration | None = None, steering_raw: str | None = None, steering_angle: str | None = None, ) -> Union[SingleRunData, "CorrectSteeringAngleLambda"]: """Apply the step to ``data``, or return a reconfigured copy when called with keyword args. Parameters ---------- data : SingleRunData, optional Run data to transform in-place. Omit when reconfiguring. calibration : SteeringCalibration, optional Sequence of ``(voltage, angle_deg)`` pairs. A degree-2 polynomial is fit when >= 3 points are provided, linear otherwise. steering_raw : str, optional Variable name for the raw potentiometer voltage channel. steering_angle : str, optional Variable name to write the recomputed angle (degrees). If this channel already exists, its old values are backed up under ``<steering_angle>_original``. Returns ------- SingleRunData or CorrectSteeringAngleLambda Transformed data when ``data`` is passed; reconfigured step otherwise. """ if any(v is not None for v in (calibration, steering_raw, steering_angle)): return CorrectSteeringAngleLambda( calibration=calibration or tuple(self.pts), steering_raw=steering_raw or self.steering_raw, steering_angle=steering_angle or self.steering_angle, ) assert data is not None if self.steering_raw not in data: print( f"WARNING: correct_steering_angle skipped — missing variable: {self.steering_raw}" ) return data raw_di = data[self.steering_raw] raw_volts: NDArray[float64] = raw_di.value_np.astype(np.float64) recomputed: NDArray[float64] = np.polyval(self.coeffs, raw_volts) backup_name = self.steering_angle + "_original" if self.steering_angle in data and backup_name not in data: orig = data[self.steering_angle] data[backup_name] = DataInstance( timestamp_np=orig.timestamp_np, value_np=orig.value_np.copy(), label=(orig.label or self.steering_angle) + " (original)", cpp_name=backup_name, ) data[self.steering_angle] = DataInstance( timestamp_np=raw_di.timestamp_np, value_np=recomputed, label="Steering angle recomputed from raw voltage (deg)", cpp_name=self.steering_angle, ) cal_str = ", ".join(f"({v:.2f}V, {a:+.1f}°)" for v, a in self.pts) print( f"correct_steering_angle: recomputed {self.steering_angle} from {self.steering_raw} " f"using deg-{self.deg} fit ({cal_str})\n" ) return data
patch_ned_velocity = PatchNedVelocityLambda() """Preprocessing step: fix VectorNav NED-in-body-frame bug. Corrects a firmware bug where ``velocityBody.x/y/z`` contains NED-frame velocities instead of body-frame (FRD) velocities. The raw NED values are preserved as new channels (``velN``, ``velE``, ``velD``), and the body velocity channels are overwritten with the correctly rotated values using the yaw angle. Use directly in a preprocessing pipeline, or call with keyword arguments to get a reconfigured copy that operates on non-default variable names. Examples -------- >>> Analyzer(preprocessing=[patch_ned_velocity]) >>> Analyzer(preprocessing=[patch_ned_velocity(yaw="my.custom.yaw")]) >>> Analyzer(preprocessing=[patch_ned_velocity(body_vel_x="my.vel.x", ned_vel_n="my.ned.n")]) """ convert_wheelspeeds_to_m_per_s = ConvertWheelspeedsToMPerSLambda() """Preprocessing step: convert all four wheel speed channels from mph to m/s. Each channel is converted in-place; the original mph values are preserved as ``<channel_name>_mph`` backup channels (only created once — idempotent on repeated calls). Use directly in a preprocessing pipeline, or call with keyword arguments to get a reconfigured copy that operates on non-default variable names. Examples -------- >>> Analyzer(preprocessing=[convert_wheelspeeds_to_m_per_s]) >>> Analyzer(preprocessing=[convert_wheelspeeds_to_m_per_s(wheelspeed_fr="my.ws.fr")]) """ correct_motor_data = CorrectMotorDataLambda() """Preprocessing step: flip motor RPM sign and derive driven wheel speed. The motor reports RPM with the sign inverted (negative when driving forward). This step: 1. Preserves the original RPM as ``<motor_rpm>_raw``. 2. Flips the sign of the RPM channel in-place. 3. Derives a driven wheel linear speed (m/s) from RPM, gear ratio, and tire radius, and writes it to ``motor_wheelspeed``. Use directly in a preprocessing pipeline, or call with keyword arguments to get a reconfigured copy with different physical constants or variable names. Examples -------- >>> Analyzer(preprocessing=[correct_motor_data]) >>> Analyzer(preprocessing=[correct_motor_data(gear_ratio=6.2, tire_radius_in=8.0)]) """ correct_steering_angle = CorrectSteeringAngleLambda() """Preprocessing step: recompute steering angle from raw potentiometer voltage. Fits a polynomial through a set of ``(voltage, angle_deg)`` calibration points, then evaluates it on the raw voltage channel to produce a corrected angle. This corrects for sensor drift relative to the previously stored angle channel. If the angle channel already exists, the old values are preserved as ``<steering_angle>_original`` before being overwritten. The backup is only created once — idempotent on repeated calls. Use directly in a preprocessing pipeline, or call with keyword arguments to get a reconfigured copy. Examples -------- >>> Analyzer(preprocessing=[correct_steering_angle]) >>> Analyzer(preprocessing=[correct_steering_angle(calibration=((1.5, -90.0), (3.0, 0.0), (4.5, 90.0)))]) >>> Analyzer(preprocessing=[correct_steering_angle(steering_raw="my.raw")]) >>> Analyzer(preprocessing=[correct_steering_angle(calibration=my_cal, steering_angle="my.angle")]) """
[docs] def apply_preprocessing( data: SingleRunData, steps: list[PreprocessingStep], ) -> SingleRunData: """Run a sequence of preprocessing steps on a SingleRunData instance. Parameters ---------- data : SingleRunData Parsed run data to preprocess. steps : list of PreprocessingStep Ordered list of ``SingleRunData -> SingleRunData`` callables to apply. Returns ------- SingleRunData The same object, modified in-place and returned for chaining. Examples -------- >>> apply_preprocessing(data, [correct_motor_data, convert_wheelspeeds_to_m_per_s]) >>> apply_preprocessing(data, [ ... correct_motor_data(gear_ratio=6.2, tire_radius_in=8.0), ... correct_steering_angle(((1.5, -90.0), (3.0, 0.0), (4.5, 90.0))), ... ]) """ for step in steps: data = step(data) return data