From 1e51b283060d3fe18d30db7c75fea8034d26e138 Mon Sep 17 00:00:00 2001 From: till-m Date: Tue, 18 Nov 2025 13:20:31 +0100 Subject: [PATCH] Add `.predict` function to optimizer --- bayes_opt/bayesian_optimization.py | 73 ++++++- tests/test_bayesian_optimization.py | 306 ++++++++++++++++++++++++---- 2 files changed, 332 insertions(+), 47 deletions(-) diff --git a/bayes_opt/bayesian_optimization.py b/bayes_opt/bayesian_optimization.py index 8afaedaf..6816eddd 100644 --- a/bayes_opt/bayesian_optimization.py +++ b/bayes_opt/bayesian_optimization.py @@ -175,6 +175,75 @@ def res(self) -> list[dict[str, Any]]: """ return self._space.res() + def predict( + self, params: dict[str, Any] | list[dict[str, Any]], return_std=False, return_cov=False, fit_gp=True + ) -> tuple[float | NDArray[Float], float | NDArray[Float]]: + """Predict the target function value at given parameters. + + Parameters + --------- + params: dict or list + The parameters where the prediction is made. + + return_std: bool, optional(default=True) + If True, the standard deviation of the prediction is returned. + + return_cov: bool, optional(default=False) + If True, the covariance of the prediction is returned. + + fit_gp: bool, optional(default=True) + If True, the internal Gaussian Process model is fitted before + making the prediction. + + Returns + ------- + mean: float or np.ndarray + The predicted mean of the target function at the given parameters. + + std_or_cov: float or np.ndarray + The predicted standard deviation or covariance of the target function + at the given parameters. + """ + if isinstance(params, list): + # convert list of dicts to 2D array + params_array = np.array([self._space.params_to_array(p) for p in params]) + single_param = False + elif isinstance(params, dict): + params_array = self._space.params_to_array(params).reshape(1, -1) + single_param = True + + if fit_gp: + if len(self._space) == 0: + msg = ( + "The Gaussian Process model cannot be fitted with zero observations. To use predict(), " + "without fitting the GP, set fit_gp=False. The predictions will then be made using the " + "GP prior." + ) + raise RuntimeError(msg) + self.acquisition_function._fit_gp(self._gp, self._space) + + res = self._gp.predict(params_array, return_std=return_std, return_cov=return_cov) + + if return_std or return_cov: + mean, std_or_cov = res + else: + mean = res + + if not single_param and mean.ndim == 0: + mean = np.atleast_1d(mean) + # ruff complains when nesting conditionals, so this three-way split is necessary + if not single_param and (return_std or return_cov) and std_or_cov.ndim == 0: + std_or_cov = np.atleast_1d(std_or_cov) + + if single_param and mean.ndim > 0: + mean = mean[0] + if single_param and (return_std or return_cov) and std_or_cov.ndim > 0: + std_or_cov = std_or_cov[0] + + if return_std or return_cov: + return mean, std_or_cov + return mean + def register( self, params: ParamsType, target: float, constraint_value: float | NDArray[Float] | None = None ) -> None: @@ -303,8 +372,8 @@ def maximize(self, init_points: int = 5, n_iter: int = 25) -> None: probe based on the acquisition function. This means that the GP may not be fitted on all points registered to the target space when the method completes. If you intend to use the GP model after the - optimization routine, make sure to fit it manually, e.g. by calling - ``optimizer._gp.fit(optimizer.space.params, optimizer.space.target)``. + optimization routine, make sure to call predict() with fit_gp=True. + """ # Log optimization start self.logger.log_optimization_start(self._space.keys) diff --git a/tests/test_bayesian_optimization.py b/tests/test_bayesian_optimization.py index e1d39b31..1aef09c6 100644 --- a/tests/test_bayesian_optimization.py +++ b/tests/test_bayesian_optimization.py @@ -16,6 +16,53 @@ from bayes_opt.util import ensure_rng +class FixedPerimeterTriangleParameter(BayesParameter): + def __init__(self, name: str, bounds, perimeter) -> None: + super().__init__(name, bounds) + self.perimeter = perimeter + + @property + def is_continuous(self): + return True + + def random_sample(self, n_samples: int, random_state): + random_state = ensure_rng(random_state) + samples = [] + while len(samples) < n_samples: + samples_ = random_state.dirichlet(np.ones(3), n_samples) + samples_ = samples_ * self.perimeter # scale samples by perimeter + + samples_ = samples_[ + np.all((self.bounds[:, 0] <= samples_) & (samples_ <= self.bounds[:, 1]), axis=-1) + ] + samples.extend(np.atleast_2d(samples_)) + return np.array(samples[:n_samples]) + + def to_float(self, value): + return value + + def to_param(self, value): + return value * self.perimeter / sum(value) + + def kernel_transform(self, value): + return value * self.perimeter / np.sum(value, axis=-1, keepdims=True) + + def to_string(self, value, str_len: int) -> str: + len_each = (str_len - 2) // 3 + str_ = "|".join([f"{float(np.round(value[i], 4))}"[:len_each] for i in range(3)]) + return str_.ljust(str_len) + + @property + def dim(self): + return 3 # as we have three float values, each representing the length of one side. + + +def area_of_triangle(sides): + a, b, c = sides + s = np.sum(sides, axis=-1) # perimeter + return np.sqrt(s * (s - a) * (s - b) * (s - c)) + + def target_func(**kwargs): # arbitrary target func return sum(kwargs.values()) @@ -490,51 +537,6 @@ def test_save_load_w_domain_reduction(tmp_path): def test_save_load_w_custom_parameter(tmp_path): """Test saving and loading optimizer state with custom parameter types.""" - class FixedPerimeterTriangleParameter(BayesParameter): - def __init__(self, name: str, bounds, perimeter) -> None: - super().__init__(name, bounds) - self.perimeter = perimeter - - @property - def is_continuous(self): - return True - - def random_sample(self, n_samples: int, random_state): - random_state = ensure_rng(random_state) - samples = [] - while len(samples) < n_samples: - samples_ = random_state.dirichlet(np.ones(3), n_samples) - samples_ = samples_ * self.perimeter # scale samples by perimeter - - samples_ = samples_[ - np.all((self.bounds[:, 0] <= samples_) & (samples_ <= self.bounds[:, 1]), axis=-1) - ] - samples.extend(np.atleast_2d(samples_)) - return np.array(samples[:n_samples]) - - def to_float(self, value): - return value - - def to_param(self, value): - return value * self.perimeter / sum(value) - - def kernel_transform(self, value): - return value * self.perimeter / np.sum(value, axis=-1, keepdims=True) - - def to_string(self, value, str_len: int) -> str: - len_each = (str_len - 2) // 3 - str_ = "|".join([f"{float(np.round(value[i], 4))}"[:len_each] for i in range(3)]) - return str_.ljust(str_len) - - @property - def dim(self): - return 3 # as we have three float values, each representing the length of one side. - - def area_of_triangle(sides): - a, b, c = sides - s = np.sum(sides, axis=-1) # perimeter - return np.sqrt(s * (s - a) * (s - b) * (s - c)) - # Create parameter and bounds param = FixedPerimeterTriangleParameter( name="sides", bounds=np.array([[0.0, 1.0], [0.0, 1.0], [0.0, 1.0]]), perimeter=1.0 @@ -585,3 +587,217 @@ def area_of_triangle(sides): suggestion1 = optimizer.suggest() suggestion2 = new_optimizer.suggest() np.testing.assert_array_almost_equal(suggestion1["sides"], suggestion2["sides"], decimal=7) + + +def test_predict(): + """Test the predict method of the optimizer.""" + optimizer = BayesianOptimization(f=target_func, pbounds=PBOUNDS, random_state=1, verbose=0) + + # Register some points + optimizer.register(params={"p1": 1, "p2": 2}, target=3) + optimizer.register(params={"p1": 4, "p2": 5}, target=9) + optimizer.register(params={"p1": 7, "p2": 8}, target=15) + + # Points to predict + test_points = [{"p1": 2, "p2": 3}, {"p1": 5, "p2": 6}, {"p1": 8, "p2": 9}] + + # Perform predictions + means = optimizer.predict(test_points) + + # Check that means have correct length + assert len(means) == len(test_points) + + # Also test with return_std=True to get std + means, stds = optimizer.predict(test_points, return_std=True) + assert len(means) == len(test_points) + assert len(stds) == len(test_points) + + +def test_predict_example(): + """Test the predict method with known outputs.""" + optimizer = BayesianOptimization(f=target_func, pbounds=PBOUNDS, random_state=1, verbose=0) + + # Register some points + optimizer.register(params={"p1": 0, "p2": 0}, target=0) + optimizer.register(params={"p1": 10, "p2": 10}, target=20) + + # Point to predict + test_point = {"p1": 0, "p2": 0} + + # Perform prediction + means = optimizer.predict([test_point]) + assert np.isclose(means, 0, atol=1e-3) + + # Test with return_std=True + means, stds = optimizer.predict([test_point], return_std=True) + assert np.isclose(means, 0, atol=1e-3) + assert stds < 0.02 # std should be small but not tiny due to GP uncertainty + + test_point = {"p1": 10, "p2": 10} + means = optimizer.predict([test_point]) + assert np.isclose(means, 20, atol=1e-3) + + means, stds = optimizer.predict([test_point], return_std=True) + assert np.isclose(means, 20, atol=1e-3) + assert stds < 0.02 + + +def test_predict_no_fit(): + """Test the predict method when GP has not been fitted.""" + optimizer = BayesianOptimization(f=target_func, pbounds=PBOUNDS, random_state=1, verbose=0) + + # Perform prediction with fit_gp=True should raise error when no data + with pytest.raises(RuntimeError): + optimizer.predict({"p1": 5, "p2": 5}, fit_gp=True) + + # Predict without fitting GP using single dict - get scalar mean by default + mean = optimizer.predict({"p1": 5, "p2": 5}, fit_gp=False) + # Since GP is not fitted, mean should be close to 0 + assert np.isclose(mean, 0, atol=1e-4) + + # Get std when not fitting GP + mean, std = optimizer.predict({"p1": 5, "p2": 5}, fit_gp=False, return_std=True) + # Since GP is not fitted, std should be large + assert std > 1e-2 + + # Test with list - returns array + means = optimizer.predict([{"p1": 5, "p2": 5}], fit_gp=False) + # With a list, even single point returns array + assert len(means) == 1 + assert np.isclose(means[0], 0, atol=1e-4) + + +def test_predict_return_cov(): + """Test the predict method with return_cov=True.""" + optimizer = BayesianOptimization(f=target_func, pbounds=PBOUNDS, random_state=1, verbose=0) + + optimizer.register(params={"p1": 1, "p2": 2}, target=3) + optimizer.register(params={"p1": 4, "p2": 5}, target=9) + + test_points = [{"p1": 2, "p2": 3}, {"p1": 5, "p2": 6}] + + means, cov = optimizer.predict(test_points, return_cov=True) + + assert len(means) == len(test_points) + assert cov.shape == (len(test_points), len(test_points)) + + +def test_predict_integer_params(): + """Test the predict method with integer parameters.""" + int_pbounds = {"p1": (0, 10, int), "p2": (0, 10, int)} + optimizer = BayesianOptimization(f=target_func, pbounds=int_pbounds, random_state=1, verbose=0) + optimizer.register(params={"p1": 1, "p2": 2}, target=3) + optimizer.register(params={"p1": 4, "p2": 5}, target=9) + test_points = [{"p1": 2, "p2": 3}, {"p1": 5, "p2": 6}] + means = optimizer.predict(test_points) + assert len(means) == len(test_points) + + # Test with return_std + means, stds = optimizer.predict(test_points, return_std=True) + assert len(means) == len(test_points) + assert len(stds) == len(test_points) + + float_points = [{"p1": 2.7, "p2": 3.3}, {"p1": 5.9, "p2": 6.1}] + means_float = optimizer.predict(float_points) + assert len(means_float) == len(float_points) + + means_float, stds_float = optimizer.predict(float_points, return_std=True) + assert len(means_float) == len(float_points) + assert len(stds_float) == len(float_points) + # Check that rounding float inputs gives similar predictions as integer inputs + + for i in range(len(test_points)): + rounded_point = {k: round(v) for k, v in float_points[i].items()} + mean_rounded = optimizer.predict([rounded_point]) + assert np.isclose(means_float[i], mean_rounded, atol=1e-1) + + # Also check with std + for i in range(len(test_points)): + rounded_point = {k: round(v) for k, v in float_points[i].items()} + mean_rounded, std_rounded = optimizer.predict([rounded_point], return_std=True) + assert np.isclose(means_float[i], mean_rounded, atol=1e-1) + assert np.isclose(stds_float[i], std_rounded, atol=1e-1) + + +def test_predict_categorical_params(): + """Test the predict method with categorical parameters.""" + + def cat_target_func(param1: str, param2: str) -> float: + value_map = {"low": 1.0, "medium": 2.0, "high": 3.0} + return value_map[param1] + value_map[param2] + + cat_pbounds = {"param1": ["low", "medium", "high"], "param2": ["low", "medium", "high"]} + + optimizer = BayesianOptimization(f=cat_target_func, pbounds=cat_pbounds, random_state=1, verbose=0) + + optimizer.register(params={"param1": "low", "param2": "low"}, target=2.0) + optimizer.register(params={"param1": "high", "param2": "high"}, target=6.0) + + test_points = [{"param1": "medium", "param2": "medium"}, {"param1": "low", "param2": "high"}] + + means = optimizer.predict(test_points) + + assert len(means) == len(test_points) + assert np.isclose(means[0], 4.0, atol=1.0) + assert np.isclose(means[1], 4.0, atol=1.0) + + # Test with return_std + means, stds = optimizer.predict(test_points, return_std=True) + assert len(means) == len(test_points) + assert len(stds) == len(test_points) + assert stds[0] > 0.0 + assert stds[1] > 0.0 + + +def test_predict_no_points_registered(): + """Test the predict method when no points have been registered.""" + optimizer = BayesianOptimization(f=target_func, pbounds=PBOUNDS, random_state=1, verbose=0) + + test_points = [{"p1": 2, "p2": 3}, {"p1": 5, "p2": 6}] + + means = optimizer.predict(test_points, fit_gp=False) + + assert len(means) == len(test_points) + for mean in means: + assert np.isclose(mean, 0, atol=1e-4) + + # Test with return_std to get uncertainty + means, stds = optimizer.predict(test_points, fit_gp=False, return_std=True) + assert len(means) == len(test_points) + assert len(stds) == len(test_points) + for std in stds: + assert std > 1e-2 + + +def test_predict_custom_parameter(): + """Test the predict method with a custom parameter type.""" + + param = FixedPerimeterTriangleParameter( + name="sides", bounds=np.array([[0.0, 1.0], [0.0, 1.0], [0.0, 1.0]]), perimeter=1.0 + ) + pbounds = {"sides": param} + + optimizer = BayesianOptimization(f=area_of_triangle, pbounds=pbounds, random_state=1, verbose=0) + + optimizer.register( + params={"sides": np.array([0.3, 0.4, 0.3])}, target=area_of_triangle(np.array([0.3, 0.4, 0.3])) + ) + optimizer.register( + params={"sides": np.array([0.2, 0.5, 0.3])}, target=area_of_triangle(np.array([0.2, 0.5, 0.3])) + ) + + test_points = [{"sides": np.array([0.25, 0.5, 0.25])}, {"sides": np.array([0.4, 0.4, 0.2])}] + + means = optimizer.predict(test_points) + + assert len(means) == len(test_points) + for i, point in enumerate(test_points): + expected_area = area_of_triangle(point["sides"]) + assert np.isclose(means[i], expected_area, atol=0.1) + + # Test with return_std + means, stds = optimizer.predict(test_points, return_std=True) + assert len(means) == len(test_points) + assert len(stds) == len(test_points) + for i in range(len(test_points)): + assert stds[i] > 0.0