Source code for perda.core_data_structures.joins

from collections import defaultdict
from typing import Tuple

import numpy as np


def left_join(
    left_ts: np.ndarray,
    left_val: np.ndarray,
    right_ts: np.ndarray,
    right_val: np.ndarray,
    *,
    interpolate: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Left join: keep all left timestamps, match and interpolate right values.

    Parameters
    ----------
    left_ts : np.ndarray
        Timestamps for left series (these are kept exactly). Must be sorted
        in ascending order, because matching uses ``np.searchsorted``.
    left_val : np.ndarray
        Values for left series
    right_ts : np.ndarray
        Timestamps for right series (these will be matched to left)
    right_val : np.ndarray
        Values for right series; must have the same length as ``right_ts``.
    interpolate : bool, optional
        If True, interpolate right values to fill all left timestamps.
        If False, only use matched values (NaN for unmatched).
        Default is True.

    Returns
    -------
    timestamps : np.ndarray
        The left timestamps (unchanged, returned as a copy)
    left_values : np.ndarray
        The left values (unchanged, returned as a copy)
    right_values : np.ndarray
        Right values matched/interpolated to left timestamps

    Raises
    ------
    ValueError
        If either time series is empty, or if ``right_ts`` and ``right_val``
        differ in length.

    Notes
    -----
    Process:

    1. For each right timestamp, find the closest left timestamp
       (ties go to the earlier left timestamp)
    2. If multiple right timestamps map to the same left timestamp,
       average them
    3. Interpolate right values to fill remaining left timestamps
       (if interpolate=True). Left timestamps before the first / after the
       last matched one receive the first / last matched value.

    If no usable (non-NaN) right value exists, ``right_values`` is all NaN.
    """
    if right_ts.size == 0 or left_ts.size == 0:
        raise ValueError("Both time series must be non-empty")
    if len(right_ts) != len(right_val):
        raise ValueError("right_ts and right_val must have the same length")

    timestamps = left_ts.copy()
    left_values = left_val.copy()
    n_left = len(left_ts)
    right_values = np.full(left_ts.shape, np.nan, dtype=float)

    # Insertion point of each right timestamp in the (sorted) left timestamps;
    # the nearest left timestamp is either just before or at that position.
    insert_at = np.searchsorted(left_ts, right_ts)
    after = np.clip(insert_at, 0, n_left - 1)
    before = np.clip(insert_at - 1, 0, n_left - 1)

    # Choose the closer neighbour (the earlier one wins ties).
    dist_before = np.abs(right_ts - left_ts[before])
    dist_after = np.abs(right_ts - left_ts[after])
    closest_idx = np.where(dist_before <= dist_after, before, after)

    # Average all right values that landed on the same left index.
    counts = np.bincount(closest_idx, minlength=n_left)
    sums = np.bincount(closest_idx, weights=right_val, minlength=n_left)
    matched = counts > 0
    right_values[matched] = sums[matched] / counts[matched]

    if interpolate:
        valid = ~np.isnan(right_values)
        # np.interp needs at least one sample point; if every matched value
        # is NaN there is nothing to interpolate from, so keep the NaNs.
        if valid.any():
            right_values = np.interp(
                left_ts, left_ts[valid], right_values[valid]
            )

    return timestamps, left_values, right_values
def outer_join(
    left_ts: np.ndarray,
    left_val: np.ndarray,
    right_ts: np.ndarray,
    right_val: np.ndarray,
    *,
    drop_nan: bool = True,
    fill: float = 0.0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Outer join: union of timestamps with linear interpolation.

    Parameters
    ----------
    left_ts : np.ndarray
        Timestamps for left series (``np.interp`` expects these to be
        increasing)
    left_val : np.ndarray
        Values for left series
    right_ts : np.ndarray
        Timestamps for right series (``np.interp`` expects these to be
        increasing)
    right_val : np.ndarray
        Values for right series
    drop_nan : bool, optional
        If True, drop rows where either series has NaN after interpolation.
        Default is True.
    fill : float, optional
        Fill value for NaNs when drop_nan is False. Only NaNs are replaced;
        infinite values are left untouched. Default is 0.0.

    Returns
    -------
    timestamps : np.ndarray
        Sorted union of all timestamps (rows with NaN removed when
        ``drop_nan`` is True)
    left_values : np.ndarray
        Left values interpolated to union timestamps
    right_values : np.ndarray
        Right values interpolated to union timestamps

    Raises
    ------
    ValueError
        If either time series is empty.

    Notes
    -----
    Union timestamps outside a series' own range take that series' first or
    last value (the default edge behaviour of ``np.interp``), so NaNs in the
    output can only originate from NaNs in the input values.
    """
    if right_ts.size == 0 or left_ts.size == 0:
        raise ValueError("Both time series must be non-empty")

    timestamps = np.union1d(left_ts, right_ts)
    left_values = np.interp(timestamps, left_ts, left_val)
    right_values = np.interp(timestamps, right_ts, right_val)

    left_nan = np.isnan(left_values)
    right_nan = np.isnan(right_values)

    if drop_nan:
        keep_mask = ~(left_nan | right_nan)
        timestamps = timestamps[keep_mask]
        left_values = left_values[keep_mask]
        right_values = right_values[keep_mask]
    else:
        # Replace NaNs only. np.nan_to_num would also rewrite +/-inf to huge
        # finite numbers, which is not what ``fill`` is documented to do.
        left_values = np.where(left_nan, fill, left_values)
        right_values = np.where(right_nan, fill, right_values)

    return timestamps, left_values, right_values
def inner_join(
    left_ts: np.ndarray,
    left_val: np.ndarray,
    right_ts: np.ndarray,
    right_val: np.ndarray,
    *,
    tolerance: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Inner join: keep only left timestamps that have a matching right
    timestamp within tolerance.

    Process:

    1. For each left timestamp, find the closest right timestamp
       (ties go to the earlier right timestamp)
    2. Keep the left timestamp only if the distance is within tolerance
    3. Match right values to the kept left timestamps

    Parameters
    ----------
    left_ts : np.ndarray
        Timestamps for left series
    left_val : np.ndarray
        Values for left series
    right_ts : np.ndarray
        Timestamps for right series. Must be sorted in ascending order,
        because matching uses ``np.searchsorted``.
    right_val : np.ndarray
        Values for right series
    tolerance : float
        Maximum allowed distance between left and right timestamps for a
        match. Timestamps with distance > tolerance are dropped.

    Returns
    -------
    timestamps : np.ndarray
        Subset of left timestamps that have matches within tolerance
    left_values : np.ndarray
        Left values at the matched timestamps
    right_values : np.ndarray
        Value of the nearest right sample for each matched timestamp
        (nearest-neighbour match, no interpolation), as floats

    Raises
    ------
    ValueError
        If either time series is empty.

    Notes
    -----
    When no left timestamp has a right timestamp within ``tolerance``,
    three empty arrays are returned.
    """
    if right_ts.size == 0 or left_ts.size == 0:
        raise ValueError("Both time series must be non-empty")

    # Insertion point of each left timestamp in the (sorted) right
    # timestamps; the nearest right sample is just before or at that position.
    insert_at = np.searchsorted(right_ts, left_ts)
    last = len(right_ts) - 1
    after = np.clip(insert_at, 0, last)
    before = np.clip(insert_at - 1, 0, last)

    dist_before = np.abs(left_ts - right_ts[before])
    dist_after = np.abs(left_ts - right_ts[after])

    # Nearest right index per left timestamp (the earlier one wins ties) and
    # the distance to it.
    nearest = np.where(dist_before <= dist_after, before, after)
    min_dist = np.minimum(dist_before, dist_after)

    # Each left timestamp has exactly one nearest right sample, so a boolean
    # mask is sufficient. It also yields correctly typed empty arrays when
    # nothing is within tolerance.
    within = min_dist <= tolerance

    timestamps = left_ts[within]
    left_values = left_val[within]
    right_values = np.asarray(right_val)[nearest[within]].astype(float)

    return timestamps, left_values, right_values