import multiprocessing as mp
import sys
import traceback
from contextlib import redirect_stdout
from io import StringIO
from multiprocessing import Pool
from typing import Callable, Dict, List, Optional

import numpy as np
from tqdm import tqdm

from ..compsim import Competition
from ..compsim.competition_factory import from_data
from ..compsim.models import CompetitionData
from .constants import *
from .models import (
    SweepData1D,
    SweepParamConfig,
    SweepProcessInput1D,
    SweepProcessOutput1D,
)
from .sweep_results_1var import SweepResults1Var
from .utils import create_steps
def _process_sweep_item(input_model: SweepProcessInput1D) -> SweepProcessOutput1D:
    """
    Process a single instance of a competition in an isolated separate process.

    A fresh competition is built from ``input_model.comp_data``, the swept
    parameter and every dependent parameter are applied to ``comp.mycar``,
    and the competition is run. Anything printed to stdout while the
    competition is being built and configured is returned in the
    ``warnings`` field of the output.

    Parameters
    ----------
    input_model : SweepProcessInput1D
        SweepProcessInput1D containing all input parameters

    Returns
    -------
    SweepProcessOutput1D
        Results and metadata from the sweep item. If any exception is raised,
        every numeric field is 0, ``warnings`` is empty and ``error`` holds
        the exception message followed by the formatted traceback.

    Notes
    -----
    This function is used as part of a 1D sweep. It never raises for
    ``Exception`` subclasses; failures are reported through ``error``.
    """
    try:
        captured = StringIO()
        # redirect_stdout restores sys.stdout on every exit path, including a
        # failure while the competition is being constructed.
        with redirect_stdout(captured):
            # Create a new Competition instance from the Pydantic data model
            comp = from_data(input_model.comp_data)

            comp.mycar.modify_params(input_model.var_1_name, input_model.var_1_value)
            for param_name, param_value in input_model.dep_vals.items():
                comp.mycar.modify_params(param_name, param_value)

            # Snapshot taken before run(): output printed during the run
            # itself is swallowed by the buffer and not reported.
            # NOTE(review): confirm that dropping run() output is intended.
            warnings = captured.getvalue()

            comp_res = comp.run()

            return SweepProcessOutput1D(
                idx=input_model.idx,
                accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
                skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
                autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
                endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
                efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
                accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
                skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
                autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
                endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
                warnings=warnings,
                error=None,
            )
    except Exception as e:
        # Worker boundary: report the failure as data so one bad sweep point
        # does not abort the whole pool.
        return SweepProcessOutput1D(
            idx=input_model.idx,
            error=f"Process error: {e}\n{traceback.format_exc()}",
            accel_pts=0,
            skidpad_pts=0,
            autoX_pts=0,
            endurance_pts=0,
            efficiency_pts=0,
            accel_t=0,
            skidpad_t=0,
            autoX_t=0,
            endurance_t=0,
            warnings="",
        )
class Sweeper1D:
    """
    One-dimensional parameter sweeper.

    Parameters
    ----------
    comp_data : CompetitionData
        Competition data model; it is passed unchanged to every worker.
    var_1 : SweepParamConfig
        Swept parameter. Its ``name``, ``min``, ``max`` and ``steps`` are
        used to build the list of values to simulate.
    dependencies : dict of str to callable, optional
        Maps a dependent parameter name to a function of the swept value.
        Each function is evaluated once per sweep value at construction
        time. Defaults to no dependencies.
    """

    def __init__(
        self,
        comp_data: CompetitionData,
        var_1: SweepParamConfig,
        dependencies: Optional[Dict[str, Callable]] = None,
    ):
        self.comp_data = comp_data
        self.comp = from_data(comp_data)
        self.var_1_name = var_1.name
        self.var_1_list = create_steps(var_1.min, var_1.max, var_1.steps)

        # name/function pairs. A fresh dict per instance avoids sharing one
        # mutable default object between every Sweeper1D.
        self.dependencies = {} if dependencies is None else dependencies

        # name/list of values, where entry (idx) is the dependent param's
        # value that goes with var_1_list[idx]
        self.dep_values: Dict[str, List[float]] = {
            name: [func(value) for value in self.var_1_list]
            for name, func in self.dependencies.items()
        }

        # Initialize 1D arrays for each event to store results.
        self.sweep_data = SweepData1D.create(self.var_1_name, self.var_1_list)

    def sweep(self, verbose=False, num_processes=None):
        """
        Run a one-dimensional parameter sweep using multiprocessing.

        Parameters
        ----------
        verbose : bool, optional
            Whether to show detailed progress (default is False)
        num_processes : int, optional
            Number of processes to use. Defaults to CPU count.

        Returns
        -------
        SweepResults1Var
            Results of the 1D sweep operation

        Notes
        -----
        A sweep point whose worker reported an error is stored with the
        values the worker returned, and a warning is printed for it once all
        points have finished. Exceptions raised by the pool itself are
        written to the progress bar and re-raised.
        """
        if num_processes is None:
            num_processes = mp.cpu_count()

        total_iterations = len(self.var_1_list)
        print(
            f"Running 1D Sweep for {total_iterations} competition simulations using {num_processes} processes."
        )

        # Prepare arguments for multiprocessing: one input per sweep value,
        # carrying the dependent param values that belong to that index.
        flattened_sweep_inputs = [
            SweepProcessInput1D(
                comp_data=self.comp_data,
                var_1_name=self.var_1_name,
                var_1_value=value,
                dep_vals={
                    name: values[idx] for name, values in self.dep_values.items()
                },
                idx=idx,
            )
            for idx, value in enumerate(self.var_1_list)
        ]
        errors = [None] * total_iterations

        # Created immediately before the try so the finally clause always
        # closes it, even when something fails early.
        progress_bar = tqdm(
            total=total_iterations,
            desc="Running simulations",
            unit="sim",
            dynamic_ncols=True,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
        )
        try:
            # Create process pool and run simulations
            with Pool(processes=num_processes) as pool:
                for result in pool.imap_unordered(
                    _process_sweep_item, flattened_sweep_inputs
                ):
                    # Results arrive in completion order; write them at the
                    # index they carry to maintain order.
                    idx = result.idx
                    self.sweep_data.accel_pts[idx] = result.accel_pts
                    self.sweep_data.skidpad_pts[idx] = result.skidpad_pts
                    self.sweep_data.autoX_pts[idx] = result.autoX_pts
                    self.sweep_data.endurance_pts[idx] = result.endurance_pts
                    self.sweep_data.efficiency_pts[idx] = result.efficiency_pts
                    self.sweep_data.accel_t[idx] = result.accel_t
                    self.sweep_data.skidpad_t[idx] = result.skidpad_t
                    self.sweep_data.autoX_t[idx] = result.autoX_t
                    self.sweep_data.endurance_t[idx] = result.endurance_t
                    errors[idx] = result.error

                    if verbose:
                        if result.warnings:
                            progress_bar.write(result.warnings.strip())
                        progress_bar.set_postfix_str(
                            f"{self.var_1_name}: {self.var_1_list[idx]:.4f}"
                        )
                    progress_bar.update(1)

            for value, error in zip(self.var_1_list, errors):
                if error is not None:
                    print(
                        f"Warning: Sim errored out at {self.var_1_name}={value:.4f}\n"
                        f"Data is incorrect and graphing may produce unpredictable results.\n"
                        f"{error}"
                    )

            return SweepResults1Var(
                self.sweep_data,
                self.dependencies,
            )
        except Exception as e:
            progress_bar.write(f"Sweep failed: {str(e)}")
            raise
        finally:
            progress_bar.close()