From 0cd1186518f0b226d2dedb8cff1aa9b872da74f2 Mon Sep 17 00:00:00 2001
From: Manda Kausthubh
Date: Sat, 31 May 2025 00:31:00 +0530
Subject: [PATCH] Added basic SAASBO

---
 bayes_opt/bayesian_optimization.py | 234 +++++++++++++++++++++++++++++
 1 file changed, 234 insertions(+)

diff --git a/bayes_opt/bayesian_optimization.py b/bayes_opt/bayesian_optimization.py
index 2cad5b8f..aab55ecf 100644
--- a/bayes_opt/bayesian_optimization.py
+++ b/bayes_opt/bayesian_optimization.py
@@ -13,10 +13,14 @@
 from typing import TYPE_CHECKING, Any
 from warnings import warn
 
 import numpy as np
+import pyro
+import pyro.distributions as dist
+import torch
+from pyro.infer.mcmc import MCMC, NUTS
 from scipy.optimize import NonlinearConstraint
 from sklearn.gaussian_process import GaussianProcessRegressor
 from sklearn.gaussian_process.kernels import Matern
 
 from bayes_opt import acquisition
 from bayes_opt.constraint import ConstraintModel
@@ -442,3 +446,233 @@ def load_state(self, path: str | PathLike[str]) -> None:
             state["random_state"]["cached_gaussian"],
         )
         self._random_state.set_state(random_state_tuple)
+
+
+class SAASBO(BayesianOptimization):
+    """Sparse Axis-Aligned Subspace Bayesian Optimization (SAASBO).
+
+    This class extends BayesianOptimization to implement SAASBO, which places a
+    sparsity-inducing horseshoe prior on the kernel length scales of a Gaussian
+    Process, so that only a few input dimensions end up with short length
+    scales. This makes it well suited to high-dimensional optimization
+    problems. Inference over the GP hyperparameters is fully Bayesian, via
+    MCMC (NUTS). The approach follows Eriksson & Jankowiak, "High-Dimensional
+    Bayesian Optimization with Sparse Axis-Aligned Subspaces" (UAI 2021).
+
+    Additional Parameters
+    ---------------------
+    num_samples: int, optional (default=500)
+        Number of MCMC samples to draw from the GP posterior.
+
+    warmup_steps: int, optional (default=500)
+        Number of warmup steps for MCMC sampling.
+
+    thinning: int, optional (default=16)
+        Thinning factor applied to the MCMC samples to reduce autocorrelation.
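+
+    Examples
+    --------
+    A minimal, illustrative sketch (the objective and bounds are made up for
+    the example):
+
+    >>> def black_box(x, y):
+    ...     return -x ** 2 - (y - 1) ** 2 + 1
+    >>> optimizer = SAASBO(
+    ...     f=black_box,
+    ...     pbounds={"x": (-2.0, 2.0), "y": (-3.0, 3.0)},
+    ...     num_samples=256,
+    ...     warmup_steps=128,
+    ...     thinning=16,
+    ...     random_state=1,
+    ... )
+    >>> optimizer.maximize(init_points=2, n_iter=5)  # doctest: +SKIP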
+ """ + + def __init__( + self, + f: Callable[..., float] | None, + pbounds: Mapping[str, tuple[float, float]], + acquisition_function: AcquisitionFunction | None = None, + constraint: Optional[NonlinearConstraint] = None, + random_state: int | RandomState | None = None, + verbose: int = 2, + bounds_transformer: Optional[DomainTransformer] = None, + allow_duplicate_points: bool = False, + num_samples: int = 500, + warmup_steps: int = 500, + thinning: int = 16, + ): + # Initialize the parent class + super().__init__( + f=f, + pbounds=pbounds, + acquisition_function=acquisition_function, + constraint=constraint, + random_state=random_state, + verbose=verbose, + bounds_transformer=bounds_transformer, + allow_duplicate_points=allow_duplicate_points, + ) + + # SAASBO-specific parameters + self.num_samples = num_samples + self.warmup_steps = warmup_steps + self.thinning = thinning + self._random_state = ensure_rng(random_state) + + # Override the default acquisition function to Expected Improvement if not specified + if acquisition_function is None: + self._acquisition_function = acquisition.ExpectedImprovement( + xi=0.01, random_state=self._random_state + ) + + # Remove the default GP regressor, as we'll use a Pyro-based GP + self._gp = None + self._mcmc_samples = None + + def _define_gp_model(self, X: torch.Tensor, y: torch.Tensor) -> Callable: + """Define the Pyro GP model with a horseshoe prior on length scales.""" + def gp_model(X: torch.Tensor, y: torch.Tensor): + # Kernel hyperparameters + outputscale = pyro.sample("outputscale", dist.LogNormal(0.0, 1.0)) + noise = pyro.sample("noise", dist.LogNormal(-2.0, 1.0)) + + # Horseshoe prior on length scales for each dimension + dim = X.shape[1] + tau = pyro.sample("tau", dist.HalfCauchy(0.1)) + beta = pyro.sample("beta", dist.HalfCauchy(torch.ones(dim))) + lengthscale = tau * beta + + # Matern 5/2 kernel with horseshoe length scales + kernel = pyro.contrib.gp.kernels.Matern52( + input_dim=dim, + lengthscale=lengthscale, + variance=outputscale, + ) + + # Define the GP + gpr = pyro.contrib.gp.models.GPRegression( + X=X, + y=y, + kernel=kernel, + noise=noise, + ) + + # Sample the mean + mean = pyro.sample("mean", dist.Normal(0.0, 1.0)) + gpr.mean = mean + return gpr + + return gp_model + + def _fit_gp(self) -> None: + """Fit the GP model using MCMC to sample from the posterior.""" + if len(self._space) == 0: + return + + # Convert data to PyTorch tensors + X = torch.tensor(self._space.params, dtype=torch.float64) + y = torch.tensor(self._space.target, dtype=torch.float64) + + # Define the GP model + gp_model = self._define_gp_model(X, y) + + # Set up MCMC with NUTS + nuts_kernel = NUTS(gp_model) + mcmc = MCMC( + kernel=nuts_kernel, + num_samples=self.num_samples, + warmup_steps=self.warmup_steps, + num_chains=1, + ) + + # Run MCMC + mcmc.run(X, y) + + # Get samples + self._mcmc_samples = mcmc.get_samples() + + def suggest(self) -> dict[str, float | np.ndarray]: + """Suggest a promising point to probe next using SAASBO. + + This method averages the acquisition function over MCMC samples of the GP. 
+ """ + if len(self._space) == 0: + return self._space.array_to_params(self._space.random_sample(random_state=self._random_state)) + + # Fit the GP model with MCMC if not already done + if self._mcmc_samples is None: + self._fit_gp() + + # Generate candidate points (e.g., using random sampling or a grid) + n_candidates = 1000 + candidates = self._space.random_sample(n_candidates, random_state=self._random_state) + candidates_tensor = torch.tensor(candidates, dtype=torch.float64) + + # Initialize acquisition values + acq_values = torch.zeros(n_candidates, dtype=torch.float64) + + # Average acquisition function over MCMC samples + for i in range(0, self.num_samples, self.thinning): + # Extract hyperparameters for this sample + outputscale = self._mcmc_samples["outputscale"][i] + noise = self._mcmc_samples["noise"][i] + lengthscale = self._mcmc_samples["lengthscale"][i] + mean = self._mcmc_samples["mean"][i] + + # Define the GP model for this sample + kernel = pyro.contrib.gp.kernels.Matern52( + input_dim=candidates_tensor.shape[1], + lengthscale=lengthscale, + variance=outputscale, + ) + gp = pyro.contrib.gp.models.GPRegression( + X=torch.tensor(self._space.params, dtype=torch.float64), + y=torch.tensor(self._space.target, dtype=torch.float64), + kernel=kernel, + noise=noise, + mean=mean, + ) + + # Compute acquisition function for candidates + acq = self._acquisition_function.evaluate( + candidates=candidates_tensor, + gp=gp, + target_space=self._space, + ) + acq_values += acq / (self.num_samples // self.thinning) + + # Select the candidate with the highest acquisition value + best_idx = torch.argmax(acq_values) + suggestion = candidates[best_idx] + + return self._space.array_to_params(suggestion) + + def maximize(self, init_points: int = 5, n_iter: int = 25) -> None: + """Maximize the target function using SAASBO. + + Parameters + ---------- + init_points: int, optional (default=5) + Number of random points to probe before starting the optimization. + n_iter: int, optional (default=25) + Number of iterations to perform. + """ + self.logger.log_optimization_start(self._space.keys) + self._prime_queue(init_points) + + iteration = 0 + while self._queue or iteration < n_iter: + try: + x_probe = self._queue.popleft() + except IndexError: + x_probe = self.suggest() + iteration += 1 + self.probe(x_probe, lazy=False) + + # Refit the GP after each new observation + self._fit_gp() + + if self._bounds_transformer and iteration > 0: + self.set_bounds(self._bounds_transformer.transform(self._space)) + + self.logger.log_optimization_end() + + def set_gp_params(self, **params: Any) -> None: + """Set parameters for the SAASBO GP model. + + Parameters + ---------- + num_samples: int, optional + Number of MCMC samples. + warmup_steps: int, optional + Number of warmup steps for MCMC. + thinning: int, optional + Thinning factor for MCMC samples. + """ + if "num_samples" in params: + self.num_samples = params.pop("num_samples") + if "warmup_steps" in params: + self.warmup_steps = params.pop("warmup_steps") + if "thinning" in params: + self.thinning = params.pop("thinning") + if params: + self.logger.warning(f"Ignored unknown parameters: {params}") \ No newline at end of file