import numpy as np
import polars as pl
from tqdm import tqdm
from ..core_data_structures.data_instance import DataInstance
from ..core_data_structures.single_run_data import SingleRunData
from ..units import Timescale
def _parse_mapping_line(line: str) -> tuple[int, str, str]:
    """Parse one ``Value ...`` mapping line into ``(var_id, cpp_name, descript)``.

    Accepted formats (after the ``"Value "`` prefix):
        ``Description (cpp.name): id`` -> descript and cpp_name both set
        ``cpp.name: id``               -> descript is ``""``

    Raises
    ------
    ValueError
        If the variable ID is not an integer or the cpp_name is empty.
    """
    # Split the ID off from the RIGHT: a description may itself contain ": ",
    # which would corrupt a left-to-right split.
    name_part, _, id_part = line[6:].strip().rpartition(": ")
    var_id = int(id_part)  # raises ValueError on malformed lines
    # Check format: Value Desc (cpp.name): id | Value cpp.name: id
    if "(" in name_part and ")" in name_part:
        open_idx = name_part.rfind("(")
        close_idx = name_part.rfind(")")
        if open_idx < close_idx:
            cpp_name = name_part[open_idx + 1 : close_idx].strip()
            descript = name_part[:open_idx].strip()
        else:
            # Mismatched parentheses ("...)...(" ) — treat whole part as the name.
            cpp_name = name_part.strip()
            descript = ""
    else:
        cpp_name = name_part.strip()
        descript = ""
    if not cpp_name:
        raise ValueError(f"Empty cpp_name in mapping line: {line.strip()}")
    return var_id, cpp_name, descript


def parse_csv(
    file_path: str,
    ts_offset: int = 0,
    parsing_errors_limit: int = 100,
) -> SingleRunData:
    """
    Parse CSV file and return SingleRunData model.

    The file layout is: one header line (a ``v2.0`` suffix selects microsecond
    timestamps, otherwise milliseconds), a block of ``Value name: id`` variable
    mappings, then ``timestamp,var_id,value`` data rows.

    Parameters
    ----------
    file_path : str
        Path to the CSV file to parse
    ts_offset : int, optional
        Offset added to every timestamp after parsing. Default is 0
    parsing_errors_limit : int, optional
        Maximum number of data-row parsing errors before aborting.
        Values <= 0 disable the limit. Default is 100

    Returns
    -------
    SingleRunData
        Parsed data structure containing all variables

    Raises
    ------
    Exception
        If the error limit is reached, or no valid data rows remain.
    """
    # Maps variable ID to variable name / description
    id_to_cpp_name: dict[int, str] = {}
    id_to_descript: dict[int, str] = {}
    with open(file_path, "r") as f:
        # Header line carries the timestamp-unit marker.
        header_line = f.readline()
        parse_unit = (
            Timescale.US if header_line.rstrip().endswith("v2.0") else Timescale.MS
        )
        print(f"Header: {header_line.rstrip()}")
        print(f"Timestamp unit: {parse_unit.value}")
        # Block 1: Variable ID/Name pairs
        pbar = tqdm(desc="Reading variable ID mappings", unit=" lines", initial=2)
        skip_rows = 1  # header line; grows to cover the whole mapping block
        line = f.readline()
        while line and line.startswith("Value "):
            pbar.update(1)
            skip_rows += 1
            try:
                var_id, cpp_name, descript = _parse_mapping_line(line)
                if var_id in id_to_cpp_name:
                    print(
                        f"Warning: Duplicate variable ID {var_id} at line {pbar.n}. Overwriting previous name."
                    )
                id_to_cpp_name[var_id] = cpp_name
                id_to_descript[var_id] = descript
            except Exception as e:
                # Best-effort: a bad mapping line is reported but does not abort.
                print(f"Error parsing variable ID/Name pair at line {pbar.n}: {e}")
            line = f.readline()
        pbar.close()
    # Block 2: Read data with Polars, Block 3: Sort — all in one step
    print("Reading and sorting data...")
    df = pl.read_csv(
        file_path,
        skip_rows=skip_rows,
        has_header=False,
        new_columns=["timestamp", "var_id", "value"],
        schema={"column_1": pl.Int64, "column_2": pl.Int32, "column_3": pl.Float64},
        ignore_errors=True,  # malformed cells become nulls, counted below
        glob=False,
    )
    # Rows with any null column are parse failures (ignore_errors=True above).
    parsing_errors = len(
        df.filter(
            df["timestamp"].is_null() | df["var_id"].is_null() | df["value"].is_null()
        )
    )
    if parsing_errors_limit > 0 and parsing_errors >= parsing_errors_limit:
        raise Exception("Too many data parsing errors encountered.")
    df = (
        df.drop_nulls()
        .with_columns((pl.col("timestamp") + ts_offset).alias("timestamp"))
        .sort(["var_id", "timestamp"])
    )
    if df.is_empty():
        raise Exception("No valid data points found after parsing.")
    total_data_points = len(df)
    data_start_time = int(df["timestamp"].min())
    data_end_time = int(df["timestamp"].max())
    # Build per-variable numpy arrays from grouped Polars data
    var_arrays: dict[int, tuple] = {}
    for (var_id,), group in df.group_by(["var_id"], maintain_order=True):
        var_arrays[int(var_id)] = (
            group["timestamp"].to_numpy(),
            group["value"].to_numpy(),
        )
    # Format data as DataInstances; variables with no data rows get empty arrays.
    id_to_instance: dict[int, DataInstance] = {}
    cpp_name_to_id: dict[str, int] = {}
    for var_id in tqdm(id_to_cpp_name, desc="Creating DataInstances"):
        name = id_to_cpp_name[var_id]
        descript = id_to_descript[var_id]
        cpp_name_to_id[name] = var_id
        timestamps_np, values_np = var_arrays.get(
            var_id, (np.array([], dtype=np.int64), np.array([], dtype=np.float64))
        )
        id_to_instance[var_id] = DataInstance(
            timestamp_np=timestamps_np,
            value_np=values_np,
            label=descript,
            var_id=var_id,
            cpp_name=name,
        )
    # Create and return SingleRunData model
    print(f"CSV parsing complete with {parsing_errors} parsing errors.")
    return SingleRunData(
        id_to_instance=id_to_instance,
        cpp_name_to_id=cpp_name_to_id,
        id_to_cpp_name=id_to_cpp_name,
        id_to_descript=id_to_descript,
        total_data_points=total_data_points,
        data_start_time=data_start_time,
        data_end_time=data_end_time,
        timestamp_unit=parse_unit,
    )