Source code for suboptimumg.sweep.sweeper_2d

import multiprocessing as mp
import sys
import traceback
from io import StringIO
from multiprocessing import Pool
from typing import Callable, Dict, List, Optional

import numpy as np
from tqdm import tqdm

from ..compsim import Competition
from ..compsim.competition_factory import from_data
from ..compsim.models import CompetitionData
from .constants import *
from .models import (
    SweepData2D,
    SweepParamConfig,
    SweepProcessInput2D,
    SweepProcessOutput2D,
)
from .sweep_results_2var import SweepResults2Var
from .utils import create_steps


def _process_2d_sweep_item(input_model: SweepProcessInput2D) -> SweepProcessOutput2D:
    """
    Process a single 2D sweep item in an isolated separate process.

    A fresh competition is built from ``input_model.comp_data``, the two swept
    parameters and every dependent parameter are applied to ``comp.mycar``, and
    the competition is run. ``sys.stdout`` is redirected to an in-memory buffer
    for the duration of the call and is always restored before returning.

    Parameters
    ----------
    input_model : SweepProcessInput2D
        SweepProcessInput2D containing all input parameters

    Returns
    -------
    SweepProcessOutput2D
        Results and metadata from the 2D sweep item. On success, points and
        times are rounded to ``ROUNDING_PRECISION`` and ``error`` is ``None``.
        If any exception is raised, all numeric fields are ``0``, ``warnings``
        is empty and ``error`` holds the message and traceback; the exception
        itself is not propagated.
    """
    # Capture printed warnings in this process
    string_buffer = StringIO()
    old_stdout = sys.stdout
    sys.stdout = string_buffer

    try:
        # Create a new Competition instance from the Pydantic data model
        comp = from_data(input_model.comp_data)

        comp.mycar.modify_params(input_model.var_1_name, input_model.var_1_value)
        comp.mycar.modify_params(input_model.var_2_name, input_model.var_2_value)
        # Modify all dependent parameters
        for k, v in input_model.dep_vals.items():
            comp.mycar.modify_params(k, v)

        # Snapshot taken before the run: anything printed during comp.run()
        # still goes to the buffer but is not included in the reported warnings.
        warnings = string_buffer.getvalue()

        comp_res = comp.run()

        return SweepProcessOutput2D(
            x_idx=input_model.x_idx,
            y_idx=input_model.y_idx,
            accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
            skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
            autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
            endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
            efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
            accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
            skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
            autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
            endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
            warnings=warnings,
            error=None,
        )

    except Exception as e:
        # Report the failure as data so the caller can record it per grid point.
        return SweepProcessOutput2D(
            x_idx=input_model.x_idx,
            y_idx=input_model.y_idx,
            error=f"Process error: {str(e)}\n{traceback.format_exc()}",
            accel_pts=0,
            skidpad_pts=0,
            autoX_pts=0,
            endurance_pts=0,
            efficiency_pts=0,
            accel_t=0,
            skidpad_t=0,
            autoX_t=0,
            endurance_t=0,
            warnings="",
        )

    finally:
        # Covers everything after the redirect (including building the
        # competition), so a failure at any step cannot leave this process's
        # stdout pointing at the buffer for later work.
        sys.stdout = old_stdout


class Sweeper2D:
    """
    2D parameter sweeper.

    Builds the grid of values for two swept parameters, pre-computes the value
    of every dependent parameter at each grid point, and runs one competition
    simulation per grid point across a pool of worker processes.
    """

    def __init__(
        self,
        comp_data: CompetitionData,
        var_1: SweepParamConfig,
        var_2: SweepParamConfig,
        dependencies: Optional[Dict[str, Callable]] = None,
    ):
        """
        Set up the sweep grid and dependent-parameter values.

        Parameters
        ----------
        comp_data : CompetitionData
            Competition data passed to ``from_data`` here and forwarded to
            every worker input.
        var_1 : SweepParamConfig
            First swept parameter; its ``name``, ``min``, ``max`` and ``steps``
            are used to build the first grid axis.
        var_2 : SweepParamConfig
            Second swept parameter; used the same way for the second axis.
        dependencies : dict of str to callable, optional
            Name/function pairs. Each function is called as
            ``func(var_1_value, var_2_value)`` and its result is the value of
            the named dependent parameter at that grid point. Defaults to no
            dependencies.
        """
        self.comp_data = comp_data
        self.comp = from_data(comp_data)

        self.var_1_name = var_1.name
        self.var_1_list = create_steps(var_1.min, var_1.max, var_1.steps)
        self.var_2_name = var_2.name
        self.var_2_list = create_steps(var_2.min, var_2.max, var_2.steps)

        # name/function pairs
        # A None sentinel replaces a `{}` default so instances created without
        # dependencies do not share one dict object.
        self.dependencies = {} if dependencies is None else dependencies

        # name/matrix of values
        # Entry [x_idx][y_idx] of each matrix is the dependent param's value
        # with var_1[x_idx] and var_2[y_idx].
        self.dep_values: Dict[str, List[List[float]]] = {
            name: [
                [func(x_val, y_val) for y_val in self.var_2_list]
                for x_val in self.var_1_list
            ]
            for name, func in self.dependencies.items()
        }

        # Initialize 2D arrays for each event to store results.
        # Dim 1 = var 1, dim 2 = var 2.
        self.sweep_data = SweepData2D.create(
            self.var_1_name, self.var_1_list, self.var_2_name, self.var_2_list
        )

    def _build_sweep_inputs(self):
        """Return one SweepProcessInput2D per grid point, var_1-major order."""
        flattened_sweep_inputs = []
        for x_idx, x in enumerate(self.var_1_list):
            for y_idx, y in enumerate(self.var_2_list):
                # Key/val pairs for the dependent param values at
                # var_1[x_idx] and var_2[y_idx]
                single_point_dep_vals = {
                    k: matrix[x_idx][y_idx] for k, matrix in self.dep_values.items()
                }
                flattened_sweep_inputs.append(
                    SweepProcessInput2D(
                        comp_data=self.comp_data,
                        var_1_name=self.var_1_name,
                        var_1_value=x,
                        var_2_name=self.var_2_name,
                        var_2_value=y,
                        dep_vals=single_point_dep_vals,
                        x_idx=x_idx,
                        y_idx=y_idx,
                    )
                )
        return flattened_sweep_inputs

    def sweep(self, verbose=False, num_processes=None):
        """
        Run a two-dimensional parameter sweep (grid sweep) using multiprocessing.

        Results arrive in arbitrary order and are written into
        ``self.sweep_data`` at the indices carried by each result. Grid points
        whose worker reported an error are listed on stdout after the sweep;
        their stored values are whatever the worker returned for them.

        Parameters
        ----------
        verbose : bool, optional
            Whether to show detailed progress (default is False)
        num_processes : int, optional
            Number of processes to use. Defaults to CPU count.

        Returns
        -------
        SweepResults2Var
            Results of the 2D sweep operation

        Raises
        ------
        Exception
            Any exception raised while running the pool or collecting results
            is re-raised after "Sweep failed: ..." is written to the progress bar.
        """
        if num_processes is None:
            num_processes = mp.cpu_count()

        total_iterations = len(self.var_1_list) * len(self.var_2_list)
        print(
            f"Running Grid Sweep for {total_iterations} competition simulations using {num_processes} processes."
        )

        # Prepare arguments for multiprocessing before the progress bar exists,
        # so a failure here cannot leave an unclosed bar behind.
        flattened_sweep_inputs = self._build_sweep_inputs()

        errors = [[None] * len(self.var_2_list) for _ in range(len(self.var_1_list))]

        # Create progress bar
        progress_bar = tqdm(
            total=total_iterations,
            desc="Running simulations",
            unit="sim",
            dynamic_ncols=True,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
        )

        try:
            # Create process pool and run simulations
            with Pool(processes=num_processes) as pool:
                for result in pool.imap_unordered(
                    _process_2d_sweep_item, flattened_sweep_inputs
                ):
                    # Update results at specific indices to maintain order
                    x_idx = result.x_idx
                    y_idx = result.y_idx

                    self.sweep_data.accel_pts[x_idx][y_idx] = result.accel_pts
                    self.sweep_data.skidpad_pts[x_idx][y_idx] = result.skidpad_pts
                    self.sweep_data.autoX_pts[x_idx][y_idx] = result.autoX_pts
                    self.sweep_data.endurance_pts[x_idx][y_idx] = result.endurance_pts
                    self.sweep_data.efficiency_pts[x_idx][y_idx] = result.efficiency_pts
                    self.sweep_data.accel_t[x_idx][y_idx] = result.accel_t
                    self.sweep_data.skidpad_t[x_idx][y_idx] = result.skidpad_t
                    self.sweep_data.autoX_t[x_idx][y_idx] = result.autoX_t
                    self.sweep_data.endurance_t[x_idx][y_idx] = result.endurance_t

                    errors[x_idx][y_idx] = result.error

                    if result.warnings:
                        # Use the bar's writer so the message does not corrupt it
                        progress_bar.write(result.warnings.strip())

                    if verbose:
                        progress_bar.set_postfix_str(
                            f"{self.var_1_name}: {self.var_1_list[x_idx]:.4f}, {self.var_2_name}: {self.var_2_list[y_idx]:.4f}"
                        )

                    progress_bar.update(1)

            # Verify results
            for x_idx, x_val in enumerate(self.var_1_list):
                for y_idx, y_val in enumerate(self.var_2_list):
                    if errors[x_idx][y_idx] is not None:
                        print(
                            f"Warning: Sim errored out at {self.var_1_name}={x_val:.4f}, "
                            f"{self.var_2_name}={y_val:.4f}.\n"
                            f"Data is incorrect and graphing may produce unpredictable results.\n"
                            f"{errors[x_idx][y_idx]}"
                        )

            return SweepResults2Var(
                self.sweep_data,
                self.dependencies,
            )

        except Exception as e:
            progress_bar.write(f"Sweep failed: {str(e)}")
            raise
        finally:
            progress_bar.close()