Source code for perda.core_data_structures.split_helpers

from .single_run_data import DataInstance, SingleRunData


[docs] def trim_single_run_data( data: SingleRunData, ts_start: float, ts_end: float, ) -> SingleRunData: """Return a new SingleRunData with every variable trimmed to [ts_start, ts_end]. Parameters ---------- data : SingleRunData ts_start, ts_end : float Timestamps in the same unit as data.timestamp_unit. Returns ------- SingleRunData Fresh object; the original is not mutated. """ trimmed: dict[int, DataInstance] = { var_id: di.trim(ts_start, ts_end) for var_id, di in data.id_to_instance.items() } return SingleRunData( id_to_instance=trimmed, cpp_name_to_id=dict(data.cpp_name_to_id), id_to_cpp_name=dict(data.id_to_cpp_name), id_to_descript=dict(data.id_to_descript), total_data_points=sum(len(di.value_np) for di in trimmed.values()), data_start_time=int(ts_start), data_end_time=int(ts_end), timestamp_unit=data.timestamp_unit, concat_boundaries=[], )
[docs] def split_single_run_data( data: SingleRunData, split_timestamps: list[float], ) -> list[SingleRunData]: """Split a SingleRunData into segments defined by a list of boundary timestamps. Each consecutive pair of timestamps in ``split_timestamps`` defines one segment. The result is keyed by 1-based segment number. Parameters ---------- data : SingleRunData split_timestamps : list[float] Ordered boundary timestamps in the same unit as data.timestamp_unit. Must contain at least 2 values. Returns ------- List[SingleRunData] """ if len(split_timestamps) < 2: raise ValueError(f"Need at least 2 boundary timestamps") segments = [] for t_start, t_end in zip(split_timestamps[:-1], split_timestamps[1:]): segments.append(trim_single_run_data(data, t_start, t_end)) return segments