Source code for perda.live.cdp_client

"""
Python client library for communicating with the Rust CDP IPC server.
Supports getting and setting values, as well as fetching time-series ranges.
"""

import logging
import socket
import struct
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple, Union

import numpy as np

logger = logging.getLogger(__name__)


class ValueType(Enum):
    """Type tags understood by the CDP IPC protocol.

    Each member's value is the two-bit code carried in the low bits of a
    request's signature byte.
    """

    NUMERIC = 0b00
    FLOAT = 0b01
    BOOL = 0b10
    # Internal use only: signals a GetRanged request rather than a value type.
    _RANGE = 0b11
class CanType(Enum):
    """CAN sample data types, using the discriminants defined by the server."""

    # Boolean
    Bool = 0

    # Unsigned integers
    UInt8 = 1
    UInt16 = 2
    UInt32 = 3
    UInt64 = 4

    # Signed integers
    Int8 = 5
    Int16 = 6
    Int32 = 7
    Int64 = 8

    # Floating point
    Float = 9
    Double = 10
# Lookup table: CanType -> (struct format character, sample width in bytes).
# Widths are computed from struct's standard little-endian sizes, so each one
# always agrees with its format character.
_CAN_TYPE_FMT: dict[CanType, Tuple[str, int]] = {
    can_type: (fmt_char, struct.calcsize(f"<{fmt_char}"))
    for can_type, fmt_char in (
        (CanType.Bool, "B"),
        (CanType.UInt8, "B"),
        (CanType.UInt16, "H"),
        (CanType.UInt32, "I"),
        (CanType.UInt64, "Q"),
        (CanType.Int8, "b"),
        (CanType.Int16, "h"),
        (CanType.Int32, "i"),
        (CanType.Int64, "q"),
        (CanType.Float, "f"),
        (CanType.Double, "d"),
    )
}
class ResponseStatus(Enum):
    """Status codes carried in the first byte of a CDP server response."""

    # Success codes
    GET_SUCCESS = 0
    SET_SUCCESS = 1

    # Error codes
    COMM_ERROR = 2
    INVALID_ACCESS = 3
    INCORRECT_TYPE = 4
    DISCONNECTED = 5
    BADLY_FORMED = 6

    # Success code for a packet of a ranged (time-series) response
    GET_RANGED_SUCCESS = 7
class CDPException(Exception):
    """Root of the CDP client exception hierarchy."""
class CDPProtocolError(CDPException):
    """Signals a protocol-level failure, such as a malformed packet."""
class CDPServerError(CDPException):
    """Error reported by the server through a non-success response status."""

    def __init__(self, status: ResponseStatus, message: str):
        """Create the error.

        Args:
            status: Status code the server responded with
            message: Human-readable description of the failure
        """
        super().__init__(message)
        # Kept so callers can branch on the exact server status.
        self.status = status
@dataclass
class _RangedPacket:
    """Parsed contents of a single GetRangedSuccess UDP packet."""

    var_id: int
    begin: int  # index of the first sample in this packet
    end: int  # index of the last sample in this packet (inclusive)
    can_type: CanType
    storage_interval: int  # milliseconds
    values: np.ndarray
    timestamps: np.ndarray  # milliseconds


# Fixed little-endian header that precedes the packed samples:
# id.id, id.index, df.begin, df.end (u32 each), df.ty (u8), storage_interval (u32).
_RANGED_HEADER = struct.Struct("<IIIIBI")


def _parse_ranged_packet(payload: bytes) -> _RangedPacket:
    """
    Parse the payload of a GetRangedSuccess packet (everything after the 0x7 sig byte).

    Layout (all little-endian):
        4B  id.id             u32
        4B  id.index          u32   (ignored)
        4B  df.begin          u32
        4B  df.end            u32
        1B  df.ty             u8    (CanType discriminant)
        4B  storage_interval  u32   (milliseconds)
        NxS packed samples, N = end - begin + 1, S = sample size of df.ty

    Args:
        payload: Packet bytes following the status byte

    Returns:
        The decoded packet, with values converted to float and timestamps
        reconstructed as (begin + i) * storage_interval milliseconds

    Raises:
        CDPProtocolError: If the payload is shorter than the fixed header,
            carries an unknown CanType discriminant, or its sample data does
            not have the size implied by begin/end and the sample type
    """
    data_offset = _RANGED_HEADER.size  # 21 bytes

    # The whole fixed header (including storage_interval) must be present;
    # checking a shorter length lets struct.error escape for truncated packets.
    if len(payload) < data_offset:
        raise CDPProtocolError(f"GetRanged payload too short: {len(payload)} bytes")

    var_id, _index, begin, end, can_type_raw, storage_interval = (
        _RANGED_HEADER.unpack_from(payload, 0)
    )

    try:
        can_type = CanType(can_type_raw)
    except ValueError as exc:
        raise CDPProtocolError(
            f"Unknown CanType discriminant: {can_type_raw}"
        ) from exc

    fmt_char, sample_size = _CAN_TYPE_FMT[can_type]
    n_samples = end - begin + 1
    expected_data_size = n_samples * sample_size
    actual_data_size = len(payload) - data_offset

    # A negative n_samples (end < begin - 1) also lands here, because the
    # actual size can never be negative.
    if actual_data_size != expected_data_size:
        raise CDPProtocolError(
            f"Data size mismatch: expected {expected_data_size}B, got {actual_data_size}B"
        )

    raw = struct.unpack_from(f"<{n_samples}{fmt_char}", payload, data_offset)
    values = np.array(raw, dtype=float)

    # Reconstruct timestamps in milliseconds from the sample indices.
    timestamps = (np.arange(n_samples, dtype=np.int64) + begin) * storage_interval

    return _RangedPacket(
        var_id=var_id,
        begin=begin,
        end=end,
        can_type=can_type,
        storage_interval=storage_interval,
        values=values,
        timestamps=timestamps,
    )
class CDPClient:
    """
    Python client for communicating with the Rust CDP IPC server.

    Requests are fixed-size UDP datagrams: one signature byte, a 56-byte
    access string field and an 8-byte value field.

    Example usage:
        client = CDPClient()
        client.connect("127.0.0.1", 5001)

        # Get a single value
        value = client.get("vehicle.speed", ValueType.FLOAT)

        # Get a time-series range (last 10 seconds)
        di = client.get_range("vehicle.speed", time_secs=10)

        # Set a value
        client.set("vehicle.target_speed", 65.0, ValueType.FLOAT)

        client.disconnect()
    """

    ACCESS_STRING_SIZE = 56
    PACKET_SIZE = 1 + ACCESS_STRING_SIZE + 8  # 65 bytes total

    def __init__(self, timeout: float = 5.0, range_timeout: float = 2.0):
        """
        Initialize the CDP client.

        Args:
            timeout: Socket timeout for get/set requests (seconds)
            range_timeout: Socket timeout applied to each receive while
                collecting ranged response packets (seconds); collection
                stops at the first receive that times out
        """
        self.socket: Optional[socket.socket] = None
        self.server_addr: Optional[Tuple[str, int]] = None
        self.timeout = timeout
        self.range_timeout = range_timeout

    def connect(self, host: str = "127.0.0.1", port: int = 5001) -> None:
        """
        Create the UDP socket and remember the server address.

        Args:
            host: Server host name or IP address
            port: Server UDP port

        Raises:
            CDPException: If the socket cannot be created or configured
        """
        new_socket: Optional[socket.socket] = None
        try:
            new_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            new_socket.settimeout(self.timeout)
        except Exception as e:
            # Do not leave a half-configured socket behind.
            if new_socket is not None:
                new_socket.close()
            raise CDPException(f"Failed to connect to server: {e}") from e

        # Reconnecting must not leak the previously opened socket.
        if self.socket is not None:
            self.socket.close()

        self.socket = new_socket
        self.server_addr = (host, port)
        logger.info(f"Connected to CDP server at {host}:{port}")

    def disconnect(self) -> None:
        """Close the socket, if any, and forget the server address."""
        if self.socket:
            self.socket.close()
            self.socket = None
            self.server_addr = None
            logger.info("Disconnected from CDP server")

    def __enter__(self):
        """Return the client itself; connect() must still be called explicitly."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disconnect when leaving the ``with`` block."""
        self.disconnect()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get(self, access_string: str, value_type: ValueType) -> Union[float, bool, int]:
        """
        Get the latest value for a signal from the CDP server.

        Args:
            access_string: The access path (e.g. "vehicle.speed")
            value_type: Expected type of the value

        Returns:
            The requested value as float, bool, or int

        Raises:
            CDPServerError: If the server answers with an error status
            CDPProtocolError: If the response is malformed
            CDPException: If not connected, the access string is too long,
                the request times out, or sending/receiving fails
        """
        sock, addr = self._check_connected()
        packet = self._build_request_packet(access_string, value_type, mode="get")
        response = self._exchange(sock, addr, packet)
        # Parsed outside the I/O error translation so that CDPServerError and
        # CDPProtocolError reach the caller with their own type.
        return self._parse_get_response(response, value_type)

    def get_range(
        self,
        access_string: str,
        time_secs: int,
    ):
        """
        Request a time-series range of data from the CDP server.

        The server will chunk the response into multiple UDP packets. This
        method collects packets until either the expected sample count is
        reached or a receive times out after range_timeout seconds, then
        assembles them into a single DataInstance.

        Args:
            access_string: The access path (e.g. "vehicle.speed")
            time_secs: How many seconds of history to retrieve (u32)

        Returns:
            A DataInstance with the assembled time-series data

        Raises:
            CDPServerError: If any response packet carries a status other
                than GET_RANGED_SUCCESS
            CDPProtocolError: If a response packet is empty or malformed
            CDPException: If not connected, time_secs is out of range, the
                request cannot be sent, or no packet was received
        """
        from perda.analyzer import DataInstance

        sock, addr = self._check_connected()

        if not (0 < time_secs < 2**32):
            raise CDPException("time_secs must be a positive u32 value")

        packet = self._build_request_packet(
            access_string, ValueType._RANGE, mode="get_range", time_secs=time_secs
        )

        try:
            sock.sendto(packet, addr)
        except Exception as e:
            raise CDPException(f"Failed to send ranged request: {e}") from e

        packets: list[_RangedPacket] = []
        storage_interval: Optional[int] = None
        var_id: Optional[int] = None
        total_samples_received = 0
        expected_samples: Optional[int] = None

        # Switch to range_timeout for collection phase
        sock.settimeout(self.range_timeout)
        try:
            while True:
                try:
                    response, _ = sock.recvfrom(65535)
                except socket.timeout:
                    logger.warning(
                        "get_range timed out waiting for packets; "
                        f"received {total_samples_received} of "
                        f"{expected_samples or '?'} expected samples"
                    )
                    break

                if len(response) < 1:
                    raise CDPProtocolError("Empty response packet")

                # Any non-ranged status during a ranged request is fatal
                status = self._decode_status(response[0])
                if status != ResponseStatus.GET_RANGED_SUCCESS:
                    self._raise_server_error(status)

                pkt = _parse_ranged_packet(response[1:])
                packets.append(pkt)
                total_samples_received += len(pkt.values)

                # Derive expected total samples from first packet
                if storage_interval is None:
                    storage_interval = pkt.storage_interval
                    var_id = pkt.var_id
                    if storage_interval > 0:
                        expected_samples = (time_secs * 1000) // storage_interval
                    else:
                        logger.warning(
                            "storage_interval is zero; terminating on timeout"
                        )

                # Terminate early if we have all expected samples
                if (
                    expected_samples is not None
                    and total_samples_received >= expected_samples
                ):
                    logger.debug(
                        f"get_range complete: {total_samples_received} samples received"
                    )
                    break
        finally:
            # Restore normal timeout
            sock.settimeout(self.timeout)

        if not packets:
            raise CDPException("No ranged response packets received")

        # Sort packets by begin index to ensure chronological order
        packets.sort(key=lambda p: p.begin)

        all_timestamps = np.concatenate([p.timestamps for p in packets])
        all_values = np.concatenate([p.values for p in packets])

        return DataInstance(
            timestamp_np=all_timestamps,
            value_np=all_values,
            label=access_string,
            var_id=var_id,
        )

    def set(
        self,
        access_string: str,
        value: Union[float, bool, int],
        value_type: ValueType,
    ) -> None:
        """
        Set a value on the CDP server.

        Args:
            access_string: The access path (e.g. "vehicle.target_speed")
            value: The value to set (sent to the server as a float64)
            value_type: Type of the value

        Raises:
            CDPServerError: If the server answers with an error status
            CDPProtocolError: If the response is malformed
            CDPException: If not connected, the access string is too long,
                the request times out, or sending/receiving fails
        """
        sock, addr = self._check_connected()
        packet = self._build_request_packet(
            access_string, value_type, mode="set", value=float(value)
        )
        response = self._exchange(sock, addr, packet)
        self._parse_set_response(response)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _check_connected(self) -> Tuple[socket.socket, Tuple[str, int]]:
        """Return (socket, server address), or raise CDPException if not connected."""
        if not self.socket or not self.server_addr:
            raise CDPException("Not connected to server")
        return self.socket, self.server_addr

    def _exchange(
        self, sock: socket.socket, addr: Tuple[str, int], packet: bytes
    ) -> bytes:
        """Send one request datagram and return the raw response datagram.

        Only transport failures are translated here; interpreting the
        response is left to the caller so protocol and server errors keep
        their specific exception types.

        Raises:
            CDPException: On timeout or any other send/receive failure
        """
        try:
            sock.sendto(packet, addr)
            response, _ = sock.recvfrom(1024)
        except socket.timeout as e:
            raise CDPException("Request timed out") from e
        except Exception as e:
            raise CDPException(f"Communication error: {e}") from e
        return response

    def _build_request_packet(
        self,
        access_string: str,
        value_type: ValueType,
        mode: str,  # "get" | "get_range" | "set"
        value: float = 0.0,
        time_secs: int = 0,
    ) -> bytes:
        """Build a request packet according to the protocol.

        Sig byte layout:
            bits 0-1 : ValueType  (0b11 = Range, triggers GetRanged when bit 7 set)
            bit  7   : 1 = Get/GetRanged, 0 = Set

        Args:
            access_string: Access path; at most ACCESS_STRING_SIZE bytes as UTF-8
            value_type: Type tag placed in the low bits of the sig byte
            mode: One of "get", "get_range" or "set"
            value: Value to send, used unless mode is "get_range"
            time_secs: History length in seconds, used when mode is "get_range"

        Returns:
            The PACKET_SIZE-byte request datagram

        Raises:
            CDPException: If the encoded access string is too long
        """
        access_bytes = access_string.encode("utf-8")
        if len(access_bytes) > self.ACCESS_STRING_SIZE:
            raise CDPException(
                f"Access string too long (max {self.ACCESS_STRING_SIZE} bytes)"
            )

        sig = value_type.value & 0b11
        if mode in ("get", "get_range"):
            sig |= 1 << 7
        # mode == "set": no additional bits

        # Zero-initialised, so the unused tail of the access string field
        # stays zero-padded.
        packet = bytearray(self.PACKET_SIZE)
        packet[0] = sig
        packet[1 : 1 + len(access_bytes)] = access_bytes

        value_offset = 1 + self.ACCESS_STRING_SIZE
        if mode == "get_range":
            # time_secs packed as u32 LE into the 8-byte value field
            packet[value_offset:] = struct.pack("<I", time_secs) + b"\x00" * 4
        else:
            packet[value_offset:] = struct.pack("<d", value)

        return bytes(packet)

    def _decode_status(self, status_byte: int) -> ResponseStatus:
        """Map a response status byte to ResponseStatus.

        Raises:
            CDPProtocolError: If the byte is not a known status code
        """
        try:
            return ResponseStatus(status_byte)
        except ValueError as e:
            raise CDPProtocolError(
                f"Unknown response status byte: 0x{status_byte:02x}"
            ) from e

    def _parse_get_response(
        self, response: bytes, value_type: ValueType
    ) -> Union[float, bool, int]:
        """Decode a GET response (status byte followed by a float64 LE value).

        Returns:
            bool for BOOL; for NUMERIC an int when the value is a whole
            number, otherwise a float; a float for every other type

        Raises:
            CDPProtocolError: If the response is empty, has an unknown
                status byte, or is not exactly 9 bytes long
            CDPServerError: If the status is not GET_SUCCESS
        """
        if len(response) < 1:
            raise CDPProtocolError("Response too short")

        status = self._decode_status(response[0])
        if status != ResponseStatus.GET_SUCCESS:
            self._raise_server_error(status)

        if len(response) != 9:
            raise CDPProtocolError(
                f"Invalid GET response length: expected 9, got {len(response)}"
            )

        float_value = struct.unpack("<d", response[1:9])[0]

        if value_type == ValueType.BOOL:
            return bool(float_value)
        elif value_type == ValueType.NUMERIC:
            return int(float_value) if float_value.is_integer() else float_value
        else:
            return float_value

    def _parse_set_response(self, response: bytes) -> None:
        """Validate a SET response.

        Raises:
            CDPProtocolError: If the response is empty or has an unknown
                status byte
            CDPServerError: If the status is not SET_SUCCESS
        """
        if len(response) < 1:
            raise CDPProtocolError("Response too short")

        status = self._decode_status(response[0])
        if status != ResponseStatus.SET_SUCCESS:
            self._raise_server_error(status)

    def _raise_server_error(self, status: ResponseStatus) -> None:
        """Raise CDPServerError for ``status``; this method never returns."""
        error_messages = {
            ResponseStatus.COMM_ERROR: "Communication error with backend",
            ResponseStatus.INVALID_ACCESS: "Invalid access string",
            ResponseStatus.INCORRECT_TYPE: "Incorrect value type",
            ResponseStatus.DISCONNECTED: "Server disconnected from backend",
            ResponseStatus.BADLY_FORMED: "Badly formed request",
        }
        message = error_messages.get(status, f"Unknown server error: {status}")
        raise CDPServerError(status, message)
# --------------------------------------------------------------------------- # Convenience functions # ---------------------------------------------------------------------------
def get_value(
    access_string: str,
    value_type: ValueType,
    host: str = "127.0.0.1",
    port: int = 5001,
    timeout: float = 5.0,
) -> Union[float, bool, int]:
    """Fetch one value using a short-lived client.

    A client is created, connected to ``host``:``port``, queried once, and
    disconnected again when the ``with`` block exits.

    Args:
        access_string: The access path to read
        value_type: Expected type of the value
        host: Server host
        port: Server port
        timeout: Socket timeout for the request (seconds)

    Returns:
        Whatever ``CDPClient.get`` returns for the request
    """
    one_shot = CDPClient(timeout=timeout)
    with one_shot:
        one_shot.connect(host, port)
        result = one_shot.get(access_string, value_type)
    return result
def set_value(
    access_string: str,
    value: Union[float, bool, int],
    value_type: ValueType,
    host: str = "127.0.0.1",
    port: int = 5001,
    timeout: float = 5.0,
) -> None:
    """Write one value using a short-lived client.

    A client is created, connected to ``host``:``port``, used for a single
    set request, and disconnected again when the ``with`` block exits.

    Args:
        access_string: The access path to write
        value: The value to set
        value_type: Type of the value
        host: Server host
        port: Server port
        timeout: Socket timeout for the request (seconds)
    """
    one_shot = CDPClient(timeout=timeout)
    with one_shot:
        one_shot.connect(host, port)
        one_shot.set(access_string, value, value_type)
def get_range_value(
    access_string: str,
    time_secs: int,
    host: str = "127.0.0.1",
    port: int = 5001,
    timeout: float = 5.0,
    range_timeout: float = 2.0,
):
    """Fetch a time-series range as a DataInstance using a short-lived client.

    Args:
        access_string: The access path to read
        time_secs: How many seconds of history to retrieve
        host: Server host
        port: Server port
        timeout: Socket timeout for get/set requests (seconds)
        range_timeout: Timeout used while collecting ranged packets (seconds)

    Returns:
        Whatever ``CDPClient.get_range`` returns for the request
    """
    one_shot = CDPClient(timeout=timeout, range_timeout=range_timeout)
    with one_shot:
        one_shot.connect(host, port)
        series = one_shot.get_range(access_string, time_secs)
    return series
if __name__ == "__main__":
    # Demo: read one value and one time-series range from a local server.
    logging.basicConfig(level=logging.INFO)

    with CDPClient() as client:
        client.connect(host="127.0.0.1")

        # Single value example
        try:
            temp = client.get("bms.board.glvTemp", ValueType.FLOAT)
            print(f"Current temp: {temp}")
        except CDPServerError as e:
            print(f"Server error: {e} (status: {e.status})")
        except CDPException as e:
            # Timeouts and transport failures carry no server status.
            print(f"Request failed: {e}")

        # Ranged example — last 30 seconds of motor temperature.
        # get_range takes only the access string and time_secs; passing a
        # ValueType as well raises TypeError.
        try:
            di = client.get_range("motor.temp", time_secs=30)
            print(f"Got {len(di.value_np)} samples for '{di.label}'")
        except CDPServerError as e:
            print(f"Server error: {e} (status: {e.status})")
        except CDPException as e:
            print(f"Request failed: {e}")