Source code for suboptimumg.sweep.sweeper_1d

import multiprocessing as mp
import sys
import traceback
from io import StringIO
from multiprocessing import Pool
from typing import Callable, Dict

import numpy as np
from tqdm import tqdm

from ..compsim import Competition
from ..compsim.competition_factory import from_data
from ..compsim.models import CompetitionData
from .constants import *
from .models import (
    SweepData1D,
    SweepParamConfig,
    SweepProcessInput1D,
    SweepProcessOutput1D,
)
from .sweep_results_1var import SweepResults1Var
from .utils import create_steps


def _process_sweep_item(input_model: SweepProcessInput1D) -> SweepProcessOutput1D:
    """
    Process a single instance of a competition in an isolated separate process.

    Parameters
    ----------
    input_model : SweepProcessInput1D
        SweepProcessInput1D containing all input parameters

    Returns
    -------
    SweepProcessOutput1D
        Results and metadata from the sweep item

    Notes
    -----
    This function is used as part of a 1D sweep.
    """
    try:
        # Capture warnings in this process
        string_buffer = StringIO()
        old_stdout = sys.stdout
        sys.stdout = string_buffer

        # Create a new Competition instance from the Pydantic data model
        comp = from_data(input_model.comp_data)

        try:
            comp.mycar.modify_params(input_model.var_1_name, input_model.var_1_value)
            for k, v in input_model.dep_vals.items():
                comp.mycar.modify_params(k, v)

            warnings = string_buffer.getvalue()

            comp_res = comp.run()

            return SweepProcessOutput1D(
                idx=input_model.idx,
                accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
                skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
                autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
                endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
                efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
                accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
                skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
                autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
                endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
                warnings=warnings,
                error=None,
            )

        finally:
            sys.stdout = old_stdout

    except Exception as e:
        return SweepProcessOutput1D(
            idx=input_model.idx,
            error=f"Process error: {str(e)}\n{traceback.format_exc()}",
            accel_pts=0,
            skidpad_pts=0,
            autoX_pts=0,
            endurance_pts=0,
            efficiency_pts=0,
            accel_t=0,
            skidpad_t=0,
            autoX_t=0,
            endurance_t=0,
            warnings="",
        )


[docs] class Sweeper1D: """ One-dimensional parameter sweeper. """ def __init__( self, comp_data: CompetitionData, var_1: SweepParamConfig, dependencies: Dict[str, Callable] = {}, ): self.comp_data = comp_data self.comp = from_data(comp_data) self.var_1_name = var_1.name self.var_1_list = create_steps(var_1.min, var_1.max, var_1.steps) # name/function pairs self.dependencies = dependencies # name/list of values self.dep_values: Dict[str, List[float]] = {} for k, func in self.dependencies.items(): # Build out list where (idx) corresponds to a dependent param's value with var_1[idx] adj_values = [] for idx in range(len(self.var_1_list)): adj_values.append(func(self.var_1_list[idx])) self.dep_values[k] = adj_values # Initialize 1D arrays for each event to store results. self.sweep_data = SweepData1D.create(self.var_1_name, self.var_1_list)
[docs] def sweep(self, verbose=False, num_processes=None): """ Run a one-dimensional parameter sweep using multiprocessing. Parameters ---------- verbose : bool, optional Whether to show detailed progress (default is False) num_processes : int, optional Number of processes to use. Defaults to CPU count. Returns ------- SweepResults1Var Results of the 1D sweep operation """ if num_processes is None: num_processes = mp.cpu_count() total_iterations = len(self.var_1_list) print( f"Running 1D Sweep for {total_iterations} competition simulations using {num_processes} processes." ) # Create progress bar progress_bar = tqdm( total=total_iterations, desc="Running simulations", unit="sim", dynamic_ncols=True, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", ) # Prepare arguments for multiprocessing flattened_sweep_inputs = [] for idx, x in enumerate(self.var_1_list): # Create key/val pair for the dependent param values at var_1[idx] single_point_dep_vals = {} for k, l in self.dep_values.items(): single_point_dep_vals[k] = l[idx] flattened_sweep_inputs.append( SweepProcessInput1D( comp_data=self.comp_data, var_1_name=self.var_1_name, var_1_value=x, dep_vals=single_point_dep_vals, idx=idx, ) ) errors = [None] * len(self.var_1_list) try: # Create process pool and run simulations with Pool(processes=num_processes) as pool: for result in pool.imap_unordered( _process_sweep_item, flattened_sweep_inputs ): # Update results at specific indices to maintain order idx = result.idx self.sweep_data.accel_pts[idx] = result.accel_pts self.sweep_data.skidpad_pts[idx] = result.skidpad_pts self.sweep_data.autoX_pts[idx] = result.autoX_pts self.sweep_data.endurance_pts[idx] = result.endurance_pts self.sweep_data.efficiency_pts[idx] = result.efficiency_pts self.sweep_data.accel_t[idx] = result.accel_t self.sweep_data.skidpad_t[idx] = result.skidpad_t self.sweep_data.autoX_t[idx] = result.autoX_t self.sweep_data.endurance_t[idx] = result.endurance_t errors[idx] = result.error if verbose: if result.warnings: progress_bar.write(result.warnings.strip()) progress_bar.set_postfix_str( f"{self.var_1_name}: {self.var_1_list[idx]:.4f}" ) progress_bar.update(1) for idx in range(len(self.var_1_list)): if errors[idx] is not None: print( f"Warning: Sim errored out at {self.var_1_name}={self.var_1_list[idx]:.4f}\n" f"Data is incorrect and graphing may produce unpredictable results.\n" f"{errors[idx]}" ) return SweepResults1Var( self.sweep_data, self.dependencies, ) except Exception as e: progress_bar.write(f"Sweep failed: {str(e)}") raise finally: progress_bar.close()