import multiprocessing as mp
import sys
import traceback
from io import StringIO
from multiprocessing import Pool
import numpy as np
from tqdm import tqdm
from ..compsim import Competition
from ..compsim.competition_factory import from_data
from .array_sweep_results import *
from .constants import *
from .models import *
def _process_sweep_item(input_model: ArraySweepProcessInput) -> ArraySweepProcessOutput:
"""
Process a single sweep item in an isolated separate process.
Parameters
----------
input_model : ArraySweepProcessInput
ArraySweepProcessInput containing all input parameters
Returns
-------
ArraySweepProcessOutput
Results and metadata from the sweep item
"""
try:
# Capture warnings in this process
string_buffer = StringIO()
old_stdout = sys.stdout
sys.stdout = string_buffer
# Create a new Competition instance for this process
comp = from_data(input_model.comp_data)
try:
comp.mycar.modify_params(input_model.var_name, input_model.step)
# Plausibility checks now handled by Pydantic validators during model creation
plausible = True
warnings = string_buffer.getvalue()
comp_res = comp.run()
return ArraySweepProcessOutput(
var_idx=input_model.var_idx,
step_idx=input_model.step_idx,
plausible=plausible,
accel_pts=round(comp_res.accel.points, ROUNDING_PRECISION),
skidpad_pts=round(comp_res.skidpad.points, ROUNDING_PRECISION),
autoX_pts=round(comp_res.autoX.points, ROUNDING_PRECISION),
endurance_pts=round(comp_res.endurance.points, ROUNDING_PRECISION),
efficiency_pts=round(comp_res.efficiency_points, ROUNDING_PRECISION),
accel_t=round(comp_res.accel.tyour, ROUNDING_PRECISION),
skidpad_t=round(comp_res.skidpad.tyour, ROUNDING_PRECISION),
autoX_t=round(comp_res.autoX.tyour, ROUNDING_PRECISION),
endurance_t=round(comp_res.endurance.tyour, ROUNDING_PRECISION),
warnings=warnings,
error=None,
)
finally:
sys.stdout = old_stdout
except Exception as e:
return ArraySweepProcessOutput(
var_idx=input_model.var_idx,
step_idx=input_model.step_idx,
error=f"Process error: {str(e)}\n{traceback.format_exc()}",
plausible=False,
accel_pts=0,
skidpad_pts=0,
autoX_pts=0,
endurance_pts=0,
efficiency_pts=0,
accel_t=0,
skidpad_t=0,
autoX_t=0,
endurance_t=0,
warnings="",
)
[docs]
class ArraySweeper:
"""
Sweeps a list of variables at a list of percent steps.
Notes
-----
Each variable is swept independently. The percent steps are relative to the
default value of the variable. The 1D sweep calls this class with a single
variable and a list of percent steps.
"""
def __init__(
self,
comp_data: CompetitionData,
var_list: list,
var_percent_step_list: np.ndarray = SWEEP_VAR_PERCENT_STEP_LIST,
):
self.comp_data = comp_data
self.comp = from_data(comp_data)
self.var_percent_step_list = var_percent_step_list
self.sweep_inputs = []
sweep_outputs = []
for var in var_list:
base_value = self.comp.mycar.get_current_params(var)
sweep_values = np.round(base_value * (1 + var_percent_step_list), 4)
sweep_input_data = {
"name": var,
"sweep_values": sweep_values,
}
# Create strongly-typed SweepData1D
sweep_output = SweepData1D.create(var, sweep_values)
self.sweep_inputs.append(sweep_input_data)
sweep_outputs.append(sweep_output)
# Store percent steps (shared across all variables)
self.sweep_outputs = ArraySweepData(
percent_steps=var_percent_step_list, sweep_outputs=sweep_outputs
)
[docs]
def sweep(self, verbose=False, num_processes=None):
"""
Run the sweep using multiprocessing.
Parameters
----------
verbose : bool, optional
Whether to show detailed progress (default is False)
num_processes : int, optional
Number of processes to use. Defaults to CPU count.
Returns
-------
ArraySweepResults
Results of the sweep operation
"""
if num_processes is None:
num_processes = mp.cpu_count()
total_iterations = len(self.sweep_inputs) * len(self.var_percent_step_list)
print(
f"Running Array Sweeper for {total_iterations} competition simulations using {num_processes} processes."
)
# Create progress bar
progress_bar = tqdm(
total=total_iterations,
desc="Running simulations",
unit="sim",
dynamic_ncols=True,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
)
# Prepare arguments for multiprocessing - only pass necessary data
flattened_sweep_inputs = (
[]
) # This will become a 1D-array with length num_processes
for var_idx, var in enumerate(self.sweep_inputs):
for step_idx, step in enumerate(var["sweep_values"]):
flattened_sweep_inputs.append(
ArraySweepProcessInput(
comp_data=self.comp_data,
var_name=var["name"],
step=step,
var_idx=var_idx,
step_idx=step_idx,
)
)
try:
# Create process pool and run simulations
errors_log = [] # Track errors separately
with Pool(processes=num_processes) as pool:
for result in pool.imap_unordered(
_process_sweep_item, flattened_sweep_inputs
):
# Access the SweepOutputData for this variable
var_idx = result.var_idx
step_idx = result.step_idx
sweep_output = self.sweep_outputs.sweep_outputs[var_idx]
# Update results at specific indices to maintain order
sweep_output.plausible[step_idx] = result.plausible
sweep_output.accel_pts[step_idx] = result.accel_pts
sweep_output.skidpad_pts[step_idx] = result.skidpad_pts
sweep_output.autoX_pts[step_idx] = result.autoX_pts
sweep_output.endurance_pts[step_idx] = result.endurance_pts
sweep_output.efficiency_pts[step_idx] = result.efficiency_pts
sweep_output.accel_t[step_idx] = result.accel_t
sweep_output.skidpad_t[step_idx] = result.skidpad_t
sweep_output.autoX_t[step_idx] = result.autoX_t
sweep_output.endurance_t[step_idx] = result.endurance_t
# Track errors separately
if result.error is not None:
errors_log.append((var_idx, step_idx, result.error))
if verbose:
if result.warnings:
progress_bar.write(result.warnings.strip())
progress_bar.set_postfix_str(
f"Variable: {self.sweep_inputs[var_idx]['name']}, Step: {self.sweep_inputs[var_idx]['sweep_values'][step_idx]:.4f}"
)
progress_bar.update(1)
# Report any errors that occurred
for var_idx, step_idx, error in errors_log:
print(
f"Warning: Sim errored out for {self.sweep_inputs[var_idx]['name']} "
f"at value {self.sweep_inputs[var_idx]['sweep_values'][step_idx]:.4f}.\n"
f"Data is incorrect and graphing may produce unpredictable results."
)
print(error)
except Exception as e:
progress_bar.write(f"Sweep failed: {str(e)}")
raise
finally:
progress_bar.close()
return ArraySweepResults(self.sweep_outputs)