Source code for perda.utils.accel_calculator

import numpy as np
from pydantic import BaseModel, Field

from ..core_data_structures.data_instance import DataInstance
from ..units import *


[docs] class AccelSegmentResult(BaseModel): """Result of a single segment marked as an acceleration test""" start_time: float = Field(description="Timestamp when the segment began.") time_to_dist: float = Field( description="Time to travel `dist_reached` meters from segment start." ) dist_reached: float = Field(description="Distance of the segment in meters.") timescale: Timescale = Field(description="Time unit for the output times.") def __str__(self) -> str: """Return a one-line summary of this acceleration segment result.""" return ( f"Accel at {self.start_time:.2f}s: " f"reached {self.dist_reached}m in {self.time_to_dist:.3f}s" )
[docs] def detect_accel_event( torque_obj: DataInstance, speed_obj: DataInstance, torque_threshold: float = 100, speed_threshold: float = 0.5, ) -> DataInstance: """Detect acceleration events based on torque and speed thresholds. An event is active when torque exceeds `torque_threshold` and speed exceeds `speed_threshold`, and ends when speed drops back to or below `speed_threshold`. Parameters ---------- torque_obj : DataInstance Time-series of motor torque values. speed_obj : DataInstance Time-series of wheel speed values. The output is aligned to these timestamps. torque_threshold : float, optional Minimum torque (in Nm) required to trigger an acceleration event. Default is 100. speed_threshold : float, optional Speed value used as the trigger floor and reset condition. Default is 0.5. Returns ------- DataInstance Binary signal (0.0 or 1.0) on `speed_obj` timestamps, labeled "Accel Event", where 1.0 indicates an active acceleration event. Examples -------- >>> signal = detect_accel_event(torque_di, speed_di) """ torque_interp = np.interp( speed_obj.timestamp_np, torque_obj.timestamp_np, torque_obj.value_np ) speed = speed_obj.value_np signal_values = np.zeros_like(speed) active = False for i in range(len(speed)): if not active: if torque_interp[i] > torque_threshold and speed[i] > speed_threshold: active = True else: if speed[i] <= speed_threshold: active = False if active: signal_values[i] = 1.0 return DataInstance( timestamp_np=speed_obj.timestamp_np, value_np=signal_values, label="Accel Event" )
[docs] def compute_accel_results( signal_obj: DataInstance, distance_obj: DataInstance, target_dist: float = 75, source_time_unit: Timescale = Timescale.MS, target_time_unit: Timescale = Timescale.S, ) -> list[AccelSegmentResult]: """Compute time-to-distance results for each acceleration event. Parameters ---------- signal_obj : DataInstance Binary accel event signal (0.0/1.0), typically from `detect_accel_event`. distance_obj : DataInstance Cumulative distance signal in meters. target_dist : float, optional Target distance in meters. Default is 75. source_time_unit : Timescale, optional Time unit of input timestamps. Default is Timescale.MS. target_time_unit : Timescale, optional Time unit for output times. Default is Timescale.S. Returns ------- list[AccelSegmentResult] One result per qualifying segment. Examples -------- >>> results = compute_accel_results(signal_di, distance_di) >>> for r in results: ... print(r) """ sig = signal_obj.value_np time = signal_obj.timestamp_np diff_arr = np.diff(sig, prepend=0) start_indices = np.where(diff_arr == 1)[0] end_indices = np.where(diff_arr == -1)[0] results = [] for start_idx in start_indices: t_start = time[start_idx] future_ends = end_indices[end_indices > start_idx] t_end_signal = time[future_ends[0]] if len(future_ends) > 0 else time[-1] dist_at_start = np.interp( t_start, distance_obj.timestamp_np, distance_obj.value_np ) dist_at_signal_end = np.interp( t_end_signal, distance_obj.timestamp_np, distance_obj.value_np ) if dist_at_signal_end - dist_at_start < target_dist: continue target_absolute_dist = dist_at_start + target_dist mask = distance_obj.timestamp_np >= t_start future_t = distance_obj.timestamp_np[mask] future_d = distance_obj.value_np[mask] if future_d[-1] >= target_absolute_dist: t_target_hit = np.interp(target_absolute_dist, future_d, future_t) results.append( AccelSegmentResult( start_time=convert_time( t_start, source_time_unit, target_time_unit ), time_to_dist=convert_time( t_target_hit - t_start, source_time_unit, target_time_unit ), dist_reached=target_dist, timescale=target_time_unit, ) ) return results