Source code for paref.moo_algorithms.minimizer.gpr_minimizer

from time import sleep
from typing import Callable

import numpy as np
from scipy.optimize import differential_evolution
from warnings import warn

from paref.blackbox_functions.design_space.bounds import Bounds
from paref.interfaces.moo_algorithms.blackbox_function import BlackboxFunction
from paref.interfaces.moo_algorithms.paref_moo import ParefMOO, CompositionWithParetoReflection
from paref.moo_algorithms.minimizer.surrogates.gpr import GPR
from paref.pareto_reflections.minimize_g import MinGParetoReflection
from paref.pareto_reflections.operations.compose_reflections import ComposeReflections


[docs] class DifferentialEvolution: def __init__(self, display=False): self.display = display self._number_evaluations_last_call = None
[docs] def __call__( self, function: Callable, upper_bounds: np.ndarray, lower_bounds: np.ndarray, max_iter: int = 300, ) -> np.ndarray: t_initial = (upper_bounds + lower_bounds) / 2 res = differential_evolution( func=function, x0=t_initial, disp=self.display, tol=1e-5, bounds=[ (lower_bounds[i], upper_bounds[i]) for i in range(len(lower_bounds)) ], maxiter=max_iter, ) self.result = res self._number_evaluations_last_call = res.nfev return res.x
@property def number_evaluations_last_call(self): return self._number_evaluations_last_call
[docs] def calculate_optimal_scaling_x(fun, blackbox_function, max_iter_minimizer: int = 500): # scale each component to [0,1] dim_f = len(fun(blackbox_function.design_space.upper_bounds)) minimizer = DifferentialEvolution() x_min = np.zeros(dim_f) x_max = np.zeros(dim_f) for i in range(len(x_min)): res_i_min = minimizer( function=lambda x: fun(x)[i], max_iter=max_iter_minimizer, upper_bounds=blackbox_function.design_space.upper_bounds, lower_bounds=blackbox_function.design_space.lower_bounds, ) res_i_max = minimizer( function=lambda x: -fun(x)[i], max_iter=max_iter_minimizer, upper_bounds=blackbox_function.design_space.upper_bounds, lower_bounds=blackbox_function.design_space.lower_bounds, ) x_min[i] = fun(res_i_min)[i] x_max[i] = fun(res_i_max)[i] return lambda x: (x - x_min) / (x_max - x_min)
[docs] def calculate_optimal_scaling_g(fun, g, blackbox_function, max_iter_minimizer: int = 500): # Scale g and each component to [0,1] minimizer = DifferentialEvolution() res_g = minimizer( function=lambda x: g(fun(x)), max_iter=max_iter_minimizer, upper_bounds=blackbox_function.design_space.upper_bounds, lower_bounds=blackbox_function.design_space.lower_bounds, ) res_g_max = minimizer( function=lambda x: -g(fun(x)), max_iter=max_iter_minimizer, upper_bounds=blackbox_function.design_space.upper_bounds, lower_bounds=blackbox_function.design_space.lower_bounds, ) xg_min = fun(res_g) gxg_min = g(xg_min) xg_max = fun(res_g_max) gxg_max = g(xg_max) return lambda x: (x - gxg_min) / (gxg_max - gxg_min)
[docs] class GPRMinimizer(ParefMOO): """Minimize any function by approximating it with a GPR and minimize the (computationally cheap) GPR .. note:: This minimizer should be used in setups where the blackbox function is computationally expensive and only a few initial samples are available. How it works ------------ A `GPR <https://en.wikipedia.org/wiki/Kriging>`_ is trained on the evaluations of the blackbox function. Then, the trained GPR is minimized by a `differential evolution algorithm <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.differential_evolution.html>`_. If a multi-dimensional blackbox is composed with a (scalar valued) Pareto reflection, then, for each component a GPR is trained and the composition of the trained GPRs with the Pareto reflection is minimized. .. warning:: This minimizer requires a number of initial evaluations in order to perform well. If the number of evaluations is below some threshold (default: 20), a `latin hypercube sampling <https://en.wikipedia.org/wiki/Latin_hypercube_samplingL>`_ is performed before the optimizer starts. Per construction a minimum number of 20 initial evaluations is required. In addition, this minimizer requires the design space to be a cube, i.e. characterized by bounds. """ def __init__(self, max_iter_minimizer: int = 500, training_iter: int = 2000, learning_rate: float = 0.05, min_distance_to_evaluated_points: float = 2e-2, ): """Initialize the algorithms hyperparameters Parameters ---------- max_iter_minimizer : int default 100 maximum number of iterations of the differential evolution algorithm training_iter : int default 2000 maximum training iterations of the GPR(s) learning_rate : float default 0.05 learning rate of the training of the GPR(s) min_required_evaluations : int default 20 minimum number of evaluations required for the training (must be greater or equal than 20) min_distance_to_evaluated_points : float default 2e-2 required minimum distance to already evaluated points """ self._minimizer = DifferentialEvolution() self._max_iter_minimizer = max_iter_minimizer self._training_iter = training_iter self._learning_rate = learning_rate self._min_distance_to_evaluated_points = min_distance_to_evaluated_points self._gpr = None
[docs] def apply_moo_operation(self, blackbox_function: BlackboxFunction, ) -> None: """Apply moo operation constructed as above Parameters ---------- blackbox_function : BlackboxFunction blackbox function to which algorithm is applied """ # TBA: when found points are too close stop! # TBA: control mechanism: when algo doesn't work give message about what went wrong # TBA: monitoring: stop time, evaluations found, if training process of gpr converged, all with hints if len(blackbox_function.y) < 20: raise ValueError('Blackbox function must have at least 20 evaluations! Apply the latin hypercube sampling ' '(blackbox_function.perform_lhc(n=20)) first!') gpr = GPR(training_iter=self._training_iter, learning_rate=self._learning_rate, ) base_blackbox_function = blackbox_function pareto_reflections = [] while isinstance(base_blackbox_function, CompositionWithParetoReflection): pareto_reflections.append(base_blackbox_function._pareto_reflection) base_blackbox_function = base_blackbox_function._blackbox_function train_x = base_blackbox_function.x train_y = base_blackbox_function.y print('\n==========================================================' '\n==========================================================') print( 'Training...\n' ) sleep(0.1) # ensure that the print statement is displayed before the training starts gpr.train(train_x=train_x, train_y=train_y) if np.any(gpr.model_convergence > 0.1): warn( 'GPRs may have not converged! \n' 'Try more training iterations (training_iter parameter).' 'You can check the convergence of the training by self._gpr.plot_loss().', RuntimeWarning) sleep(1) self._gpr = gpr if len(pareto_reflections) != 0: pareto_reflection = pareto_reflections[0] for reflection in pareto_reflections[1:]: pareto_reflection = ComposeReflections(reflection, pareto_reflection) pareto_reflection = pareto_reflections[-1] # calculate optimal scaling for first pareto reflection if isinstance(pareto_reflections[-1], MinGParetoReflection): print('\nCalculating optimal scaling...') pareto_reflections[-1].scaling_x = calculate_optimal_scaling_x(lambda x: gpr(x), blackbox_function) pareto_reflections[-1].scaling_g = calculate_optimal_scaling_g(lambda x: gpr(x), pareto_reflections[-1].g, blackbox_function) pareto_reflections[-1]._epsilon = 2e-2 if len(pareto_reflections) > 1: for i in range(1, len(pareto_reflections) + 1): # calculate optimal scaling if pareto reflection is a MinGParetoReflection if isinstance(pareto_reflections[-i], MinGParetoReflection): print('\nCalculating optimal scaling...') pareto_reflections[-i].scaling_x = calculate_optimal_scaling_x( lambda x: pareto_reflection(gpr(x)), blackbox_function) pareto_reflections[-i].scaling_g = calculate_optimal_scaling_g( lambda x: pareto_reflection(gpr(x)), pareto_reflections[-i].g, blackbox_function) pareto_reflections[-i]._epsilon = 2e-2 pareto_reflection = ComposeReflections(pareto_reflection, pareto_reflections[-i]) fun = lambda x: pareto_reflection(gpr(x)) else: fun = lambda x: gpr(x) print('\nOptimization...') if isinstance(blackbox_function.design_space, Bounds): res = self._minimizer( function=fun, max_iter=self._max_iter_minimizer, upper_bounds=blackbox_function.design_space.upper_bounds, lower_bounds=blackbox_function.design_space.lower_bounds, ) else: raise ValueError('Design space property of blackbox function must be an instance of Bounds!') print( f'\n Found Pareto point: \n x={res} ' f'\n prediction={gpr(res)} ' f'\n standard deviation={gpr.std(res)}') if np.any([np.all(dominated) for dominated in (gpr(res) >= gpr(blackbox_function.x))]): warn( 'Optimizer did not find a Pareto point! \n' 'Try more minimizer iterations (max_iter_minimizer).', RuntimeWarning) sleep(1) # if np.all(pareto_reflection(gpr(res)) >= pareto_reflection(blackbox_function.y[0])): # print('\nNo Pareto point was found. Algorithmic search stopped.') # if np.any(np.linalg.norm(gpr(res) - np.array( # [gpr(x) for x in blackbox_function.x]), axis=1) <= self._min_distance_to_evaluated_points): # print('\nFound Pareto point is too close to some already evaluated point.') print('\nEvaluating blackbox function...') blackbox_function(res) print('Value of blackbox function: ', base_blackbox_function.y[-1]) print('Difference to estimation: ', gpr(res) - base_blackbox_function.y[-1], '\n') if base_blackbox_function.y[-1] not in base_blackbox_function.pareto_front: warn( 'Found Point is not Pareto optimal! \n' 'Either the optimization converged or the optimization failed. Check the convergence by looking at ' 'the difference between the last evaluations (blackbox_function.y).' 'You can check the convergence of the training by self._gpr.plot_loss().', RuntimeWarning) sleep(1)
@property def supported_codomain_dimensions(self) -> int: # TBA: dimensionality check and list of int return 1