# Source code for suboptimumg.sweep.array_sweeper

import multiprocessing as mp
import sys
import traceback
from io import StringIO
from multiprocessing import Pool

import numpy as np
from tqdm import tqdm

from ..compsim import Competition
from ..compsim.competition_factory import from_data
from .array_sweep_results import *
from .constants import *
from .models import *


def _process_sweep_item(input_model: ArraySweepProcessInput) -> ArraySweepProcessOutput:
    """
    Process a single sweep item in an isolated separate process.

    A fresh competition is built from ``input_model.comp_data``, the swept
    parameter is set to ``input_model.step`` and the competition is run.
    Any exception is caught and reported through the ``error`` field of the
    returned object instead of being raised, so one failing item does not
    abort the whole sweep.

    Parameters
    ----------
    input_model : ArraySweepProcessInput
        ArraySweepProcessInput containing all input parameters

    Returns
    -------
    ArraySweepProcessOutput
        Results and metadata from the sweep item. On failure every score and
        time is 0, ``plausible`` is False and ``error`` holds the message and
        traceback.
    """
    try:
        # Capture anything printed to stdout in this process
        string_buffer = StringIO()
        old_stdout = sys.stdout
        sys.stdout = string_buffer

        # Everything that runs while stdout is redirected lives inside this
        # try/finally, so stdout is restored even when building the
        # competition fails (pool workers are reused for later items).
        try:
            # Create a new Competition instance for this process
            comp = from_data(input_model.comp_data)

            comp.mycar.modify_params(input_model.var_name, input_model.step)
            # Plausibility checks now handled by Pydantic validators during model creation
            plausible = True

            # Snapshot taken before the run: only output produced while
            # building and modifying the car is reported as warnings. Output
            # printed by comp.run() is swallowed by the buffer.
            warnings = string_buffer.getvalue()

            comp_res = comp.run()

            return ArraySweepProcessOutput(
                var_idx=input_model.var_idx,
                step_idx=input_model.step_idx,
                plausible=plausible,
                accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
                skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
                autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
                endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
                efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
                accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
                skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
                autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
                endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
                warnings=warnings,
                error=None,
            )

        finally:
            sys.stdout = old_stdout

    except Exception as e:
        # Report the failure as data; the parent process decides how to
        # surface it.
        return ArraySweepProcessOutput(
            var_idx=input_model.var_idx,
            step_idx=input_model.step_idx,
            error=f"Process error: {str(e)}\n{traceback.format_exc()}",
            plausible=False,
            accel_pts=0,
            skidpad_pts=0,
            autoX_pts=0,
            endurance_pts=0,
            efficiency_pts=0,
            accel_t=0,
            skidpad_t=0,
            autoX_t=0,
            endurance_t=0,
            warnings="",
        )


class ArraySweeper:
    """
    Sweeps a list of variables at a list of percent steps.

    Notes
    -----
    Each variable is swept independently. The percent steps are relative to
    the default value of the variable. The 1D sweep calls this class with a
    single variable and a list of percent steps.
    """

    # Per-step fields copied from each worker result into the sweep output.
    _RESULT_FIELDS = (
        "plausible",
        "accel_pts",
        "skidpad_pts",
        "autoX_pts",
        "endurance_pts",
        "efficiency_pts",
        "accel_t",
        "skidpad_t",
        "autoX_t",
        "endurance_t",
    )

    def __init__(
        self,
        comp_data: CompetitionData,
        var_list: list,
        var_percent_step_list: np.ndarray = SWEEP_VAR_PERCENT_STEP_LIST,
    ):
        """
        Prepare the values to sweep for every variable.

        Parameters
        ----------
        comp_data : CompetitionData
            Competition description; a competition is built from it here to
            read the base parameter values, and it is handed to every worker.
        var_list : list
            Names of the car parameters to sweep.
        var_percent_step_list : array-like, optional
            Relative steps; each swept value is ``base_value * (1 + step)``
            rounded to 4 decimals. Any array-like (e.g. a plain list) is
            accepted.
        """
        # Accept plain sequences too: `1 + <list>` would raise a TypeError.
        # Existing ndarrays pass through unchanged.
        var_percent_step_list = np.asanyarray(var_percent_step_list)

        self.comp_data = comp_data
        self.comp = from_data(comp_data)
        self.var_percent_step_list = var_percent_step_list

        self.sweep_inputs = []
        sweep_outputs = []
        for var in var_list:
            base_value = self.comp.mycar.get_current_params(var)
            sweep_values = np.round(base_value * (1 + var_percent_step_list), 4)

            self.sweep_inputs.append({"name": var, "sweep_values": sweep_values})
            # Create strongly-typed SweepData1D
            sweep_outputs.append(SweepData1D.create(var, sweep_values))

        # Store percent steps (shared across all variables)
        self.sweep_outputs = ArraySweepData(
            percent_steps=var_percent_step_list, sweep_outputs=sweep_outputs
        )

    def _build_process_inputs(self):
        """Return one ArraySweepProcessInput per (variable, step) pair, flattened."""
        return [
            ArraySweepProcessInput(
                comp_data=self.comp_data,
                var_name=var["name"],
                step=step,
                var_idx=var_idx,
                step_idx=step_idx,
            )
            for var_idx, var in enumerate(self.sweep_inputs)
            for step_idx, step in enumerate(var["sweep_values"])
        ]

    def _store_result(self, result):
        """Write one worker result into its (var_idx, step_idx) output slot."""
        sweep_output = self.sweep_outputs.sweep_outputs[result.var_idx]
        # Results arrive out of order; indexing by step keeps them aligned
        # with the swept values.
        for field in self._RESULT_FIELDS:
            getattr(sweep_output, field)[result.step_idx] = getattr(result, field)

    def sweep(self, verbose=False, num_processes=None):
        """
        Run the sweep using multiprocessing.

        Simulations that fail inside a worker do not abort the sweep; they
        are recorded with the values returned by the worker and reported on
        stdout once all simulations have finished.

        Parameters
        ----------
        verbose : bool, optional
            Whether to show detailed progress (default is False)
        num_processes : int, optional
            Number of processes to use. Defaults to CPU count.

        Returns
        -------
        ArraySweepResults
            Results of the sweep operation
        """
        if num_processes is None:
            num_processes = mp.cpu_count()

        total_iterations = len(self.sweep_inputs) * len(self.var_percent_step_list)

        print(
            f"Running Array Sweeper for {total_iterations} competition simulations using {num_processes} processes."
        )

        # Prepare arguments for multiprocessing - only pass necessary data.
        # One entry per simulation (length == total_iterations). Built before
        # the progress bar exists so a failure here cannot leave it open.
        flattened_sweep_inputs = self._build_process_inputs()

        # Create progress bar
        progress_bar = tqdm(
            total=total_iterations,
            desc="Running simulations",
            unit="sim",
            dynamic_ncols=True,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
        )

        try:
            errors_log = []  # Track errors separately

            # Create process pool and run simulations
            with Pool(processes=num_processes) as pool:
                for result in pool.imap_unordered(
                    _process_sweep_item, flattened_sweep_inputs
                ):
                    var_idx = result.var_idx
                    step_idx = result.step_idx

                    # Update results at specific indices to maintain order
                    self._store_result(result)

                    if result.error is not None:
                        errors_log.append((var_idx, step_idx, result.error))

                    if verbose:
                        if result.warnings:
                            progress_bar.write(result.warnings.strip())
                        swept = self.sweep_inputs[var_idx]
                        progress_bar.set_postfix_str(
                            f"Variable: {swept['name']}, Step: {swept['sweep_values'][step_idx]:.4f}"
                        )

                    progress_bar.update(1)

            # Report any errors that occurred
            for var_idx, step_idx, error in errors_log:
                swept = self.sweep_inputs[var_idx]
                print(
                    f"Warning: Sim errored out for {swept['name']} "
                    f"at value {swept['sweep_values'][step_idx]:.4f}.\n"
                    f"Data is incorrect and graphing may produce unpredictable results."
                )
                print(error)

        except Exception as e:
            progress_bar.write(f"Sweep failed: {str(e)}")
            raise

        finally:
            progress_bar.close()

        return ArraySweepResults(self.sweep_outputs)