import multiprocessing as mp
import sys
import traceback
from io import StringIO
from multiprocessing import Pool
from typing import Callable, Dict, List, Optional

import numpy as np
from tqdm import tqdm

from ..compsim import Competition
from ..compsim.competition_factory import from_data
from ..compsim.models import CompetitionData
from .constants import *
from .models import (
    SweepData2D,
    SweepParamConfig,
    SweepProcessInput2D,
    SweepProcessOutput2D,
)
from .sweep_results_2var import SweepResults2Var
from .utils import create_steps
def _process_2d_sweep_item(input_model: SweepProcessInput2D) -> SweepProcessOutput2D:
    """
    Process a single 2D sweep item in an isolated separate process.

    A fresh competition is built from ``input_model.comp_data``, the two swept
    parameters and every dependent parameter are applied to ``comp.mycar``,
    the competition is run, and the rounded results are packed into a
    ``SweepProcessOutput2D``.

    ``sys.stdout`` is redirected into an in-memory buffer for the duration of
    the call and is always restored before returning, including when building
    the competition fails. This function never raises for ordinary errors:
    any ``Exception`` is reported through the ``error`` field of the result.

    Parameters
    ----------
    input_model : SweepProcessInput2D
        SweepProcessInput2D containing all input parameters

    Returns
    -------
    SweepProcessOutput2D
        Results and metadata from the 2D sweep item. On failure every numeric
        field is 0, ``warnings`` is empty and ``error`` holds the exception
        message followed by the formatted traceback.
    """
    try:
        # Capture anything printed in this process instead of letting it
        # reach the real stdout.
        string_buffer = StringIO()
        old_stdout = sys.stdout
        sys.stdout = string_buffer
        try:
            # Build the competition *inside* the try/finally: if from_data()
            # raises, stdout must still be restored, otherwise the reused
            # worker process would keep printing into this dead buffer.
            comp = from_data(input_model.comp_data)

            comp.mycar.modify_params(input_model.var_1_name, input_model.var_1_value)
            comp.mycar.modify_params(input_model.var_2_name, input_model.var_2_value)

            # Modify all dependent parameters
            for k, v in input_model.dep_vals.items():
                comp.mycar.modify_params(k, v)

            # Snapshot taken before the run: only text printed while building
            # the competition and applying parameters is reported as warnings.
            # Output produced by comp.run() is captured but not returned.
            # NOTE(review): confirm that dropping run-time output is intended.
            warnings = string_buffer.getvalue()

            comp_res = comp.run()

            return SweepProcessOutput2D(
                x_idx=input_model.x_idx,
                y_idx=input_model.y_idx,
                accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
                skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
                autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
                endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
                efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
                accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
                skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
                autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
                endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
                warnings=warnings,
                error=None,
            )
        finally:
            sys.stdout = old_stdout
    except Exception as e:
        # Report the failure through the result object so the caller can keep
        # consuming the remaining grid points.
        return SweepProcessOutput2D(
            x_idx=input_model.x_idx,
            y_idx=input_model.y_idx,
            error=f"Process error: {e}\n{traceback.format_exc()}",
            accel_pts=0,
            skidpad_pts=0,
            autoX_pts=0,
            endurance_pts=0,
            efficiency_pts=0,
            accel_t=0,
            skidpad_t=0,
            autoX_t=0,
            endurance_t=0,
            warnings="",
        )
class Sweeper2D:
    """
    2D parameter sweeper.

    Evaluates a competition at every point of the grid spanned by two swept
    parameters. Dependent parameters are computed once per grid point from the
    two swept values and applied together with them.

    Parameters
    ----------
    comp_data : CompetitionData
        Competition description; passed to ``from_data`` here and forwarded
        unchanged to every worker.
    var_1 : SweepParamConfig
        First swept parameter; its ``name``, ``min``, ``max`` and ``steps``
        are read.
    var_2 : SweepParamConfig
        Second swept parameter; same fields as ``var_1``.
    dependencies : Dict[str, Callable], optional
        Maps a dependent parameter name to a function called as
        ``func(var_1_value, var_2_value)`` that returns the parameter's value
        at that grid point. Defaults to no dependencies.
    """

    # Per-point result fields copied from a worker output into ``sweep_data``.
    _RESULT_FIELDS = (
        "accel_pts",
        "skidpad_pts",
        "autoX_pts",
        "endurance_pts",
        "efficiency_pts",
        "accel_t",
        "skidpad_t",
        "autoX_t",
        "endurance_t",
    )

    def __init__(
        self,
        comp_data: CompetitionData,
        var_1: SweepParamConfig,
        var_2: SweepParamConfig,
        dependencies: Optional[Dict[str, Callable]] = None,
    ):
        self.comp_data = comp_data
        self.comp = from_data(comp_data)

        self.var_1_name = var_1.name
        self.var_1_list = create_steps(var_1.min, var_1.max, var_1.steps)
        self.var_2_name = var_2.name
        self.var_2_list = create_steps(var_2.min, var_2.max, var_2.steps)

        # name/function pairs. A fresh dict is created per instance so that
        # sweepers never share one mutable default object.
        self.dependencies = {} if dependencies is None else dependencies

        # name/matrix of values, where matrix[x_idx][y_idx] is the dependent
        # param's value with var_1[x_idx] and var_2[y_idx]
        self.dep_values: Dict[str, List[List[float]]] = {
            name: [[func(x, y) for y in self.var_2_list] for x in self.var_1_list]
            for name, func in self.dependencies.items()
        }

        # Initialize 2D arrays for each event to store results. Dim 1 = var 1, dim 2 = var 2.
        self.sweep_data = SweepData2D.create(
            self.var_1_name, self.var_1_list, self.var_2_name, self.var_2_list
        )

    def _build_sweep_inputs(self):
        """Return one ``SweepProcessInput2D`` per grid point, var_1-major order."""
        return [
            SweepProcessInput2D(
                comp_data=self.comp_data,
                var_1_name=self.var_1_name,
                var_1_value=x,
                var_2_name=self.var_2_name,
                var_2_value=y,
                # Dependent param values at var_1[x_idx] and var_2[y_idx]
                dep_vals={
                    k: matrix[x_idx][y_idx] for k, matrix in self.dep_values.items()
                },
                x_idx=x_idx,
                y_idx=y_idx,
            )
            for x_idx, x in enumerate(self.var_1_list)
            for y_idx, y in enumerate(self.var_2_list)
        ]

    def _store_result(self, result):
        """Copy every result field of ``result`` into ``sweep_data`` at its grid index."""
        for field in self._RESULT_FIELDS:
            getattr(self.sweep_data, field)[result.x_idx][result.y_idx] = getattr(
                result, field
            )

    def _report_errors(self, errors):
        """Print a warning for every grid point whose entry in ``errors`` is not None."""
        for x_idx, row in enumerate(errors):
            for y_idx, error in enumerate(row):
                if error is not None:
                    print(
                        f"Warning: Sim errored out at {self.var_1_name}={self.var_1_list[x_idx]:.4f}, "
                        f"{self.var_2_name}={self.var_2_list[y_idx]:.4f}.\n"
                        f"Data is incorrect and graphing may produce unpredictable results.\n"
                        f"{error}"
                    )

    def sweep(self, verbose=False, num_processes=None):
        """
        Run a two-dimensional parameter sweep (grid sweep) using multiprocessing.

        Results arrive in arbitrary order and are written into ``sweep_data``
        at the indices carried by each result. Grid points whose simulation
        failed are reported with a printed warning after all points finish;
        they do not abort the sweep.

        Parameters
        ----------
        verbose : bool, optional
            Whether to show detailed progress (default is False)
        num_processes : int, optional
            Number of processes to use. Defaults to CPU count.

        Returns
        -------
        SweepResults2Var
            Results of the 2D sweep operation

        Raises
        ------
        Exception
            Any exception raised while running the pool is re-raised after
            "Sweep failed: ..." has been written to the progress bar.
        """
        if num_processes is None:
            num_processes = mp.cpu_count()

        total_iterations = len(self.var_1_list) * len(self.var_2_list)
        print(
            f"Running Grid Sweep for {total_iterations} competition simulations using {num_processes} processes."
        )

        # Prepare arguments for multiprocessing. This happens before the
        # progress bar exists so a failure here cannot leave it unclosed.
        flattened_sweep_inputs = self._build_sweep_inputs()
        errors = [[None] * len(self.var_2_list) for _ in range(len(self.var_1_list))]

        # Create progress bar
        progress_bar = tqdm(
            total=total_iterations,
            desc="Running simulations",
            unit="sim",
            dynamic_ncols=True,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
        )
        try:
            # Create process pool and run simulations
            with Pool(processes=num_processes) as pool:
                for result in pool.imap_unordered(
                    _process_2d_sweep_item, flattened_sweep_inputs
                ):
                    # Update results at specific indices to maintain order
                    x_idx, y_idx = result.x_idx, result.y_idx
                    self._store_result(result)
                    errors[x_idx][y_idx] = result.error

                    if result.warnings:
                        progress_bar.write(result.warnings.strip())
                    if verbose:
                        progress_bar.set_postfix_str(
                            f"{self.var_1_name}: {self.var_1_list[x_idx]:.4f}, {self.var_2_name}: {self.var_2_list[y_idx]:.4f}"
                        )
                    progress_bar.update(1)

            # Verify results
            self._report_errors(errors)

            return SweepResults2Var(
                self.sweep_data,
                self.dependencies,
            )
        except Exception as e:
            progress_bar.write(f"Sweep failed: {e}")
            raise
        finally:
            progress_bar.close()