diff --git a/README.md b/README.md index 5f7811d..934d6be 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ ![build](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/deploy.yml?logo=github) ![docs](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/docs.yml?logo=materialformkdocs&logoColor=%2523cccccc&label=docs) ![tests](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/tests.yml?logo=github&logoColor=%2523cccccc&label=tests) -![Code Coverage](https://img.shields.io/badge/coverage-89%25-yellowgreen?logo=codecov) +![Code Coverage](https://img.shields.io/badge/coverage-86%25-yellowgreen?logo=codecov) [![Algorithm description](https://img.shields.io/badge/DOI-10.1002/nme.6958-blue)](https://doi.org/10.1002/nme.6958) Efficient framework for building surrogates of multidisciplinary systems using the adaptive multi-index stochastic collocation ([AMISC](https://onlinelibrary.wiley.com/doi/full/10.1002/nme.6958)) technique. diff --git a/src/amisc/component.py b/src/amisc/component.py index d0e7f74..d8c1992 100644 --- a/src/amisc/component.py +++ b/src/amisc/component.py @@ -40,9 +40,9 @@ from pydantic import BaseModel, ConfigDict, ValidationInfo, field_validator from typing_extensions import TypedDict -from amisc.interpolator import Interpolator, InterpolatorState, Lagrange +from amisc.interpolator import GPR, Interpolator, InterpolatorState, Lagrange from amisc.serialize import PickleSerializable, Serializable, StringSerializable, YamlSerializable -from amisc.training import SparseGrid, TrainingData +from amisc.training import SparseGrid, TrainingData, UncertaintySampling from amisc.typing import COORDS_STR_ID, LATENT_STR_ID, Dataset, MultiIndex from amisc.utils import ( _get_yaml_path, @@ -1213,8 +1213,17 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P # Training data is the same for all surrogate fidelity indices, given constant data fidelity design_list.append([]) 
continue - - design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], + if isinstance(self.interpolator, GPR) and isinstance(self.training_data, UncertaintySampling): + states = list(self.misc_states) + match_state = [tup for tup in states if tup[0] == a] + if match_state: + best_state = max(match_state, key=lambda x: sum(x[1]))[2].regressor.named_steps['gpr'] + else: + best_state = None + design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], + domains,best_state, weight_fcns) + else: + design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], domains, weight_fcns) design_pts, fc = to_model_dataset(design_pts, self.inputs, del_latent=True, **field_coords) @@ -1301,7 +1310,6 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P # Only for initial index which didn't come from the candidate set self.update_misc_coeff(IndexSet(s), index_set='test') self.active_set.update(s) - self.update_misc_coeff(neighbors, index_set='test') # neighbors will only ever pass through here once self.candidate_set.update(neighbors) diff --git a/src/amisc/training.py b/src/amisc/training.py index ddb1fc5..409f2f1 100644 --- a/src/amisc/training.py +++ b/src/amisc/training.py @@ -16,7 +16,8 @@ import numpy as np from numpy.typing import ArrayLike -from scipy.optimize import direct +from scipy.optimize import direct, minimize +from sklearn.gaussian_process.kernels import WhiteKernel from amisc.serialize import PickleSerializable, Serializable from amisc.typing import LATENT_STR_ID, Dataset, MultiIndex @@ -121,6 +122,8 @@ def from_dict(cls, config: dict) -> TrainingData: match method: case 'sparse-grid': return SparseGrid(**config) + case 'uncertainty-sampling': + return UncertaintySampling(**config) case other: raise NotImplementedError(f"Unknown training data method: {other}") @@ -516,3 +519,601 @@ def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, raise 
NotImplementedError(f"Unknown collocation method: {other}") return z_pts + +@dataclass +class UncertaintySampling(TrainingData, PickleSerializable): + """A class for storing training data in a sparse grid format. The `UncertaintySampling` class stores training points + by their coordinate location in a larger tensor-product grid, and obtains new training data by refining + a single 1d grid at a time. Refinement is done by Leja sampling for initial iterations, then using GP + uncertainty for subsequent iterations. + + !!! Note "MISC and sparse grids" + MISC itself can be thought of as an extension to the well-known sparse grid technique, so this class + readily integrates with the MISC implementation in `Component`. Sparse grids limit the curse + of dimensionality up to about `dim = 10-15` for the input space (which would otherwise be infeasible with a + normal full tensor-product grid of the same size). + + !!! Info "About points in a sparse grid" + A sparse grid approximates a full tensor-product grid $(N_1, N_2, ..., N_d)$, where $N_i$ is the number of grid + points along dimension $i$, for a $d$-dimensional space. Each point is uniquely identified in the sparse grid + by a list of indices $(j_1, j_2, ..., j_d)$, where $j_i = 0 ... N_i$. We refer to this unique identifier as a + "grid coordinate". In the `SparseGrid` data structure, these coordinates are used along with the `alpha` + fidelity index to uniquely locate the training data for a given multi-index pair. + + !!! Info "About Uncertainty Sampling" + Uncertainty Sampling is an active learning strategy that selects new training points based on the model's + uncertainty. This approach aims to improve the surrogate model's accuracy by focusing on areas of the + input space where the model is less certain about its predictions. The uncertainty is typically quantified + using sklearn's GaussianProcessRegressor, which provides both mean predictions and uncertainty estimates. 
+ + :ivar collocation_rule: the collocation rule to use for generating new grid points (only 'leja' is supported) + :ivar knots_per_level: the number of grid knots/points per level in the `beta` fidelity multi-index + :ivar expand_latent_method: method for expanding latent grids, either 'round-robin' or 'tensor-product' + :ivar opt_args: extra arguments for the global 1d `direct` optimizer + :ivar betas: a set of all `beta` multi-indices that have been seen so far + :ivar x_grids: a `dict` of grid points for each 1d input dimension + :ivar yi_map: a `dict` of model outputs for each grid coordinate + :ivar yi_nan_map: a `dict` of imputed model outputs for each grid coordinate where the model failed (or gave nan) + :ivar error_map: a `dict` of error information for each grid coordinate where the model failed + :ivar latent_size: the number of latent coefficients for each variable (0 if scalar) + :ivar d_thresh: minimum distance threshold for new points (specified as fraction of input domain size) + :ivar thresh_relax: relaxation factor for minimum distance threshold in case no new points are found (threshold is + successively relaxed until number of points criteria is satisfied) + :ivar variance_criterion: the uncertainty sampling criterion, either 'local' or 'global' + :ivar grid_size: number of Monte Carlo samples used to estimate the global GP posterior variance + """ + MAX_IMPUTE_SIZE: ClassVar[int] = 10 # don't try to impute large arrays + + collocation_rule: str = 'leja' + knots_per_level: int = 2 + expand_latent_method: str = 'round-robin' # or 'tensor-product', for converting beta to latent grid sizes + opt_args: dict = field(default_factory=lambda: {'locally_biased': False, 'maxfun': 300}) # for leja optimizer + + betas: set[MultiIndex] = field(default_factory=set) + x_grids: dict[str, ArrayLike] = field(default_factory=dict) + yi_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, ArrayLike]]] = field(default_factory=dict) + yi_nan_map: dict[MultiIndex, dict[tuple[int, ...], dict[str,
ArrayLike]]] = field(default_factory=dict) + error_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, Any]]] = field(default_factory=dict) + latent_size: dict[str, int] = field(default_factory=dict) # keep track of latent grid sizes for each variable + d_thresh: float = 0.20 # minimum distance threshold for new points (as fraction of input domain) + thresh_relax: float = 0.5 # relaxation factor for minimum distance threshold if no new points are found + variance_criterion: str = 'local' # or 'global', for uncertainty sampling + grid_size: int = 100 # Number of monte carlo samples to evaluate global GP uncertainty + + def clear(self): + """Clear all training data.""" + self.betas.clear() + self.x_grids.clear() + self.yi_map.clear() + self.yi_nan_map.clear() + self.error_map.clear() + self.latent_size.clear() + + def get_by_coord(self, alpha: MultiIndex, coords: list, y_vars: list = None, skip_nan: bool = False): + """Get training data from the sparse grid for a given `alpha` and list of grid coordinates. Try to replace + `nan` values with imputed values. Skip any data points with remaining `nan` values if `skip_nan=True`. 
+ + :param alpha: the model fidelity indices + :param coords: a list of grid coordinates to locate the `yi` values in the sparse grid data structure + :param y_vars: the keys of the outputs to return (if `None`, return all outputs) + :param skip_nan: skip any data points with remaining `nan` values if `skip_nan=True` (only for numeric outputs) + :returns: `dicts` of model inputs `xi_dict` and outputs `yi_dict` + """ + N = len(coords) + is_numeric = {} + is_singleton = {} + xi_dict = self._extract_grid_points(coords) + yi_dict = {} + + first_yi = next(iter(self.yi_map[alpha].values())) + if y_vars is None: + y_vars = first_yi.keys() + + for var in y_vars: + yi = np.atleast_1d(first_yi[var]) + is_numeric[var] = self._is_numeric(yi) + is_singleton[var] = self._is_singleton(yi) + yi_dict[var] = np.empty(N, dtype=np.float64 if is_numeric[var] and is_singleton[var] else object) + + for i, coord in enumerate(coords): + try: + yi_curr = self.yi_map[alpha][coord] + for var in y_vars: + yi = arr if (arr := self.yi_nan_map[alpha].get(coord, {}).get(var)) is not None else yi_curr[var] + yi_dict[var][i] = yi if is_singleton[var] else np.atleast_1d(yi) + + except KeyError as e: + raise KeyError(f"Can't access sparse grid data for alpha={alpha}, coord={coord}. " + f"Make sure the data has been set first.") from e + + # Delete nans if requested (only for numeric singleton outputs) + if skip_nan: + nan_idx = np.full(N, False) + for var in y_vars: + if is_numeric[var] and is_singleton[var]: + nan_idx |= np.isnan(yi_dict[var]) + + xi_dict = {k: v[~nan_idx] for k, v in xi_dict.items()} + yi_dict = {k: v[~nan_idx] for k, v in yi_dict.items()} + + return xi_dict, yi_dict # Both with elements of shape (N, ...) 
for N grid points + + def get(self, alpha: MultiIndex, beta: MultiIndex, y_vars: list[str] = None, skip_nan: bool = False): + """Get the training data from the sparse grid for a given `alpha` and `beta` pair.""" + return self.get_by_coord(alpha, list(self._expand_grid_coords(beta)), y_vars=y_vars, skip_nan=skip_nan) + + def set_errors(self, alpha: MultiIndex, beta: MultiIndex, coords: list, errors: list[dict]): + """Store error information in the sparse-grid for a given multi-index pair.""" + for coord, error in zip(coords, errors): + self.error_map[alpha][coord] = copy.deepcopy(error) + + def set(self, alpha: MultiIndex, beta: MultiIndex, coords: list, yi_dict: dict[str, ArrayLike]): + """Store model output `yi_dict` values. + + :param alpha: the model fidelity indices + :param beta: the surrogate fidelity indices + :param coords: a list of grid coordinates to locate the `yi` values in the sparse grid data structure + :param yi_dict: a `dict` of model output `yi` values + """ + for i, coord in enumerate(coords): # First dim of yi is loop dim aligning with coords + new_yi = {} + for var, yi in yi_dict.items(): + yi = np.atleast_1d(yi[i]) + new_yi[var] = (float(yi[0]) if self._is_numeric(yi) else yi[0]) if self._is_singleton(yi) else yi.tolist() # noqa: E501 + self.yi_map[alpha][coord] = copy.deepcopy(new_yi) + + def impute_missing_data(self, alpha: MultiIndex, beta: MultiIndex): + """Impute missing values in the sparse grid for a given multi-index pair by linear regression imputation.""" + imputer, xi_all, yi_all = None, None, None + + # only impute (small-length) numeric quantities + yi_dict = next(iter(self.yi_map[alpha].values())) + output_vars = [var for var in self._numeric_outputs(yi_dict) + if len(np.ravel(yi_dict[var])) <= self.MAX_IMPUTE_SIZE] + + for coord, yi_dict in self.yi_map[alpha].items(): + if any([np.any(np.isnan(yi_dict[var])) for var in output_vars]): + if imputer is None: + # Grab all 'good' interpolation points and train a simple linear 
regression fit + xi_all, yi_all = self.get(alpha, beta, y_vars=output_vars, skip_nan=True) + if len(xi_all) == 0 or len(next(iter(xi_all.values()))) == 0: + continue # possible if no good data has been set yet + + N = next(iter(xi_all.values())).shape[0] # number of grid points + xi_mat = np.concatenate([xi_all[var][:, np.newaxis] if len(xi_all[var].shape) == 1 else + xi_all[var] for var in xi_all.keys()], axis=-1) + yi_mat = np.concatenate([yi_all[var][:, np.newaxis] if len(yi_all[var].shape) == 1 else + yi_all[var].reshape((N, -1)) for var in output_vars], axis=-1) + + imputer = _RidgeRegression(alpha=1.0) + imputer.fit(xi_mat, yi_mat) + + # Run the imputer for this coordinate + x_interp = self._extract_grid_points(coord) + x_interp = np.concatenate([x_interp[var][:, np.newaxis] if len(x_interp[var].shape) == 1 else + x_interp[var] for var in x_interp.keys()], axis=-1) + y_interp = imputer.predict(x_interp) + + # Unpack the imputed value + y_impute = {} + start_idx = 0 + for var in output_vars: + var_shape = yi_all[var].shape[1:] or (1,) + end_idx = start_idx + int(np.prod(var_shape)) + yi = np.atleast_1d(y_interp[0, start_idx:end_idx]).reshape(var_shape) + nan_idx = np.isnan(np.atleast_1d(yi_dict[var])) + yi[~nan_idx] = np.atleast_1d(yi_dict[var])[~nan_idx] # Only keep imputed values where yi is nan + y_impute[var] = float(yi[0]) if self._is_singleton(yi) else yi.tolist() + start_idx = end_idx + + self.yi_nan_map[alpha][coord] = copy.deepcopy(y_impute) + + def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, gp_model, weight_fcns: dict = None): + """Refine the sparse grid for a given `alpha` and `beta` pair and given collocation rules. Return any new + grid points that do not have model evaluations saved yet. + + !!! Note + The `beta` multi-index is used to determine the number of collocation points in each input dimension. The + length of `beta` should therefore match the number of variables in `x_vars`. 
+ """ + weight_fcns = weight_fcns or {} + + if gp_model is None: # Fallback to Leja sampling + # Initialize a sparse grid for beta=(0, 0, ..., 0) + if np.sum(beta) == 0: + if len(self.x_grids) == 0: + num_latent = {} + for var in input_domains: + if LATENT_STR_ID in var: + base_id = var.split(LATENT_STR_ID)[0] + num_latent[base_id] = 1 if base_id not in num_latent else num_latent[base_id] + 1 + else: + num_latent[var] = 0 + self.latent_size = num_latent + + new_pt = {} + domains = iter(input_domains.items()) + for grid_size in self.beta_to_knots(beta): + if isinstance(grid_size, int): # scalars + var, domain = next(domains) + new_pt[var] = self.collocation_1d(grid_size, domain, method=self.collocation_rule, + wt_fcn=weight_fcns.get(var, None), + opt_args=self.opt_args).tolist() + else: # latent coeffs + for s in grid_size: + var, domain = next(domains) + new_pt[var] = self.collocation_1d(s, domain, method=self.collocation_rule, + wt_fcn=weight_fcns.get(var, None), + opt_args=self.opt_args).tolist() + self.x_grids = new_pt + self.betas.add(beta) + self.yi_map.setdefault(alpha, dict()) + self.yi_nan_map.setdefault(alpha, dict()) + self.error_map.setdefault(alpha, dict()) + new_coords = list(self._expand_grid_coords(beta)) + return new_coords, self._extract_grid_points(new_coords) + + # Otherwise, refine the sparse grid + for beta_old in self.betas: + # Get the first lower neighbor in the sparse grid and refine the 1d grid if necessary + if self.is_one_level_refinement(beta_old, beta): + new_grid_size = self.beta_to_knots(beta) + inputs = zip(self.x_grids.keys(), self.x_grids.values(), input_domains.values()) + + for new_size in new_grid_size: + if isinstance(new_size, int): # scalar grid + var, grid, domain = next(inputs) + if len(grid) < new_size: + num_new_pts = new_size - len(grid) + self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid, + opt_args=self.opt_args, + wt_fcn=weight_fcns.get(var, None), + method=self.collocation_rule).tolist() + else: # 
latent grid + for s_new in new_size: + var, grid, domain = next(inputs) + if len(grid) < s_new: + num_new_pts = s_new - len(grid) + self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid, + opt_args=self.opt_args, + wt_fcn=weight_fcns.get(var, None), + method=self.collocation_rule).tolist() + break + + new_coords = [] + for coord in self._expand_grid_coords(beta): + if coord not in self.yi_map[alpha]: + # If we have not computed this grid coordinate yet + new_coords.append(coord) + + new_pts = self._extract_grid_points(new_coords) + + self.betas.add(beta) + return new_coords, new_pts + + else: + # Initialize a sparse grid for beta=(0, 0, ..., 0) + if np.sum(beta) == 0: + if len(self.x_grids) == 0: + num_latent = {} + for var in input_domains: + if LATENT_STR_ID in var: + base_id = var.split(LATENT_STR_ID)[0] + num_latent[base_id] = 1 if base_id not in num_latent else num_latent[base_id] + 1 + else: + num_latent[var] = 0 + self.latent_size = num_latent + + new_pt = {} + domains = iter(input_domains.items()) + for grid_size in self.beta_to_knots(beta): + if isinstance(grid_size, int): # scalars + var, domain = next(domains) + new_pt[var] = self.sampling_1d(var, grid_size, input_domains, gp_model).tolist() + else: # latent coeffs + for s in grid_size: + var, domain = next(domains) + new_pt[var] = self.sampling_1d(var, s, input_domains, gp_model).tolist() + self.x_grids = new_pt + self.betas.add(beta) + self.yi_map.setdefault(alpha, dict()) + self.yi_nan_map.setdefault(alpha, dict()) + self.error_map.setdefault(alpha, dict()) + new_coords = list(self._expand_grid_coords(beta)) + return new_coords, self._extract_grid_points(new_coords) + + # Otherwise, refine the sparse grid + for beta_old in self.betas: + # Get the first lower neighbor in the sparse grid and refine the 1d grid if necessary + if self.is_one_level_refinement(beta_old, beta): + new_grid_size = self.beta_to_knots(beta) + inputs = zip(self.x_grids.keys(), self.x_grids.values(), 
input_domains.values()) + + for new_size in new_grid_size: + if isinstance(new_size, int): # scalar grid + var, grid, domain = next(inputs) + if len(grid) < new_size: + num_new_pts = new_size - len(grid) + self.x_grids[var] = self.sampling_1d(var, num_new_pts, input_domains, + gp_model, grid).tolist() + else: # latent grid + for s_new in new_size: + var, grid, domain = next(inputs) + if len(grid) < s_new: + num_new_pts = s_new - len(grid) + self.x_grids[var] = self.sampling_1d(var, num_new_pts, input_domains, + gp_model, grid).tolist() + break + + new_coords = [] + for coord in self._expand_grid_coords(beta): + if coord not in self.yi_map[alpha]: + # If we have not computed this grid coordinate yet + new_coords.append(coord) + + new_pts = self._extract_grid_points(new_coords) + + self.betas.add(beta) + return new_coords, new_pts + + + def _extract_grid_points(self, coords: list[tuple] | tuple): + """Extract the `x` grid points located at `coords` from `x_grids` and return as the `pts` dictionary.""" + if not isinstance(coords, list): + coords = [coords] + pts = {var: np.empty(len(coords)) for var in self.x_grids} + + for k, coord in enumerate(coords): + grids = iter(self.x_grids.items()) + for idx in coord: + if isinstance(idx, int): # scalar grid point + var, grid = next(grids) + pts[var][k] = grid[idx] + else: # latent coefficients + for i in idx: + var, grid = next(grids) + pts[var][k] = grid[i] + + return pts + + def _expand_grid_coords(self, beta: MultiIndex): + """Iterable over all grid coordinates for a given `beta`, accounting for scalars and latent coefficients.""" + grid_size = self.beta_to_knots(beta) + grid_coords = [] + for s in grid_size: + if isinstance(s, int): # scalars + grid_coords.append(range(s)) + else: # latent coefficients + grid_coords.append(itertools.product(*[range(latent_size) for latent_size in s])) + + yield from itertools.product(*grid_coords) + + @staticmethod + def _is_singleton(arr: np.ndarray): + return len(arr.shape) == 1 and 
arr.shape[0] == 1 + + @staticmethod + def _is_numeric(arr: np.ndarray): + return np.issubdtype(arr.dtype, np.number) + + @classmethod + def _numeric_outputs(cls, yi_dict: dict[str, ArrayLike]) -> list[str]: + """Return a list of the output variables that have numeric data.""" + output_vars = [] + for var in yi_dict.keys(): + try: + if cls._is_numeric(np.atleast_1d(yi_dict[var])): + output_vars.append(var) + except Exception: + continue + return output_vars + + @staticmethod + def is_one_level_refinement(beta_old: tuple, beta_new: tuple) -> bool: + """Check if a new `beta` multi-index is a one-level refinement from a previous `beta`. + + !!! Example + Refining from `(0, 1, 2)` to the new multi-index `(1, 1, 2)` is a one-level refinement. But refining to + either `(2, 1, 2)` or `(1, 2, 2)` are not, since more than one refinement occurs at the same time. + + :param beta_old: the starting multi-index + :param beta_new: the new refined multi-index + :returns: whether `beta_new` is a one-level refinement from `beta_old` + """ + level_diff = np.array(beta_new, dtype=int) - np.array(beta_old, dtype=int) + ind = np.nonzero(level_diff)[0] + return ind.shape[0] == 1 and level_diff[ind] == 1 + + def beta_to_knots(self, beta: MultiIndex, knots_per_level: int = None, latent_size: dict = None, + expand_latent_method: str = None) -> tuple: + """Convert a `beta` multi-index to the number of knots per dimension in the sparse grid. + + :param beta: refinement level indices + :param knots_per_level: level-to-grid-size multiplier, i.e. 
number of new points (or knots) for each beta level + :param latent_size: the number of latent coefficients for each variable (0 if scalar); number of variables and + order should match the `beta` multi-index + :param expand_latent_method: method for expanding latent grids, either 'round-robin' or 'tensor-product' + :returns: the number of knots/points per dimension for the sparse grid + """ + knots_per_level = knots_per_level or self.knots_per_level + latent_size = latent_size or self.latent_size + expand_latent_method = expand_latent_method or self.expand_latent_method + + grid_size = [] + for i, (var, num_latent) in enumerate(latent_size.items()): + if num_latent > 0: + match expand_latent_method: + case 'round-robin': + if beta[i] == 0: + grid_size.append((1,) * num_latent) # initializes all latent grids to 1 + else: + latent_refine_idx = (beta[i] - 1) % num_latent + latent_refine_num = ((beta[i] - 1) // num_latent) + 1 + latent_beta = tuple([latent_refine_num] * (latent_refine_idx + 1) + + [latent_refine_num - 1] * (num_latent - latent_refine_idx - 1)) + latent_grid = [knots_per_level * latent_beta[j] + 1 for j in range(num_latent)] + grid_size.append(tuple(latent_grid)) + case 'tensor-product': + grid_size.append((knots_per_level * beta[i] + 1,) * num_latent) + case other: + raise NotImplementedError(f"Unknown method for expanding latent grids: {other}") + else: + grid_size.append(knots_per_level * beta[i] + 1) + + return tuple(grid_size) + + + def sampling_1d(self, var:str, N: int, input_domains:dict, gp_model, z_pts: np.ndarray = None) -> np.ndarray: + """Find the next `N` points in the 1d sequence of `z_pts` using GP uncertainty sampling. 
+ + :param var: the variable to sample points for + :param N: number of new points to add to the sequence + :param input_domains: a `dict` of input variable domains + :param gp_model: a trained GaussianProcessRegressor model for estimating uncertainty + :param z_pts: current univariate sequence `(Nz,)`, start at middle of input_domains[var] if `None` + :returns: the univariate sequence `z_pts` augmented by `N` new points + """ + + if z_pts is None: + z_pts = np.array([0.5*(input_domains[var][0] + input_domains[var][1])]) + N = N - 1 + z_pts = np.atleast_1d(z_pts) + else: + if self.variance_criterion == 'local': + d_min = self.d_thresh * (input_domains[var][1] - input_domains[var][0]) + def J1(x): + # Local cost function + '''Optimize for highest variance at local subdomain''' + var_order = list(self.x_grids.keys()) + grids = [] + for v in var_order: + if v == var: + grids.append(np.atleast_1d(x)) + else: + grids.append(np.atleast_1d(self.x_grids[v])) + mesh = np.meshgrid(*grids, indexing='ij') + cand_grid = np.stack([m.reshape(-1) for m in mesh], axis=-1) + _, std = gp_model.predict(cand_grid, return_std=True) + variance = std**2 + return -np.sum(variance) + x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], self.knots_per_level) + constraints = [] + # Enforce minimum distance constraint + for xi in z_pts: + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[0] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[1] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x: np.abs(x[0] - x[1]) - d_min}) + res = minimize(J1, x0, bounds = [(input_domains[var][0], + input_domains[var][1])] * self.knots_per_level, + constraints=constraints) + z_new = res.x + if res.x is None: + d_min *= self.thresh_relax + constraints = [] + for xi in z_pts: + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[0] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x, 
xp=xi: np.abs(x[1] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x: np.abs(x[0] - x[1]) - d_min}) + res = minimize(J1, x0, bounds = [(input_domains[var][0], + input_domains[var][1])] * self.knots_per_level, + constraints=constraints) + z_new = res.x + z_pts = np.concatenate((z_pts, z_new)) + + elif self.variance_criterion == 'global': + kernel = gp_model.kernel_ + if isinstance(kernel, WhiteKernel): + noise_var = kernel.noise_level + elif hasattr(kernel, "k1") and isinstance(kernel.k1, WhiteKernel): + noise_var = kernel.k1.noise_level + elif hasattr(kernel, "k2") and isinstance(kernel.k2, WhiteKernel): + noise_var = kernel.k2.noise_level + def J2(x): + # Global cost function + '''Optimize for lowest posterior variance over a monte carlo test grid''' + var_order = list(self.x_grids.keys()) + N = self.grid_size + T = np.zeros((N, len(var_order))) + for i, var in enumerate(var_order): + low, high = input_domains[var] + T[:,i] = np.random.uniform(low, high, N) + grids = [] + for v in var_order: + if v == var: + grids.append(np.atleast_1d(x)) + else: + grids.append(np.atleast_1d(self.x_grids[v])) + mesh = np.meshgrid(*grids, indexing='ij') + cand_grid = np.stack([m.reshape(-1) for m in mesh], axis=-1) + Q = np.vstack((T, cand_grid)) + _, cov = gp_model.predict(Q, return_cov=True) + if cov.ndim == 2: + cov_TT = cov[:N, :N] + cov_Tx = cov[:N, N:] + cov_xT = cov[N:, :N] + cov_xx = cov[N:, N:] + sigma_bar_T = ( + np.diag(cov_TT) + - cov_Tx @ ( + np.linalg.pinv(cov_xx + noise_var * np.eye(cov_xx.shape[0])) + @ cov_xT + ) + ) + return np.sum(sigma_bar_T) + else: + cost = 0 + cov_shape = cov.shape + for i in range(cov_shape[2]): + cov_yi = cov[:,:,i] + cov_TT = cov_yi[:N, :N] + cov_Tx = cov_yi[:N, N:] + cov_xT = cov_yi[N:, :N] + cov_xx = cov_yi[N:, N:] + sigma_bar_T = np.diag(cov_TT) - cov_Tx @ (np.linalg.pinv(cov_xx) @ cov_xT) + cost += np.sum(sigma_bar_T) + return cost + + x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], 
self.knots_per_level) + res = minimize(J2, x0, bounds = [(input_domains[var][0], input_domains[var][1])] * self.knots_per_level) + z_new = res.x + z_pts = np.concatenate((z_pts, z_new)) + + return z_pts + + + @staticmethod + def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, + wt_fcn: callable = None, method='leja', opt_args=None) -> np.ndarray: + """Find the next `N` points in the 1d sequence of `z_pts` using the provided collocation method. + + :param N: number of new points to add to the sequence + :param z_bds: bounds on the 1d domain + :param z_pts: current univariate sequence `(Nz,)`, start at middle of `z_bds` if `None` + :param wt_fcn: weighting function, uses a constant weight if `None`, callable as `wt_fcn(z)` + :param method: collocation method to use, currently only 'leja' is supported + :param opt_args: extra arguments for the global 1d `direct` optimizer + :returns: the univariate sequence `z_pts` augmented by `N` new points + """ + opt_args = opt_args or {} + if wt_fcn is None: + wt_fcn = lambda z: 1 + if z_pts is None: + z_pts = (z_bds[1] + z_bds[0]) / 2 + N = N - 1 + z_pts = np.atleast_1d(z_pts) + + match method: + case 'leja': + # Construct Leja sequence by maximizing the Leja objective sequentially + for i in range(N): + obj_fun = lambda z: -wt_fcn(np.array(z)) * np.prod(np.abs(z - z_pts)) + res = direct(obj_fun, [z_bds], **opt_args) # Use global DIRECT optimization over 1d domain + z_star = res.x + z_pts = np.concatenate((z_pts, z_star)) + case other: + raise NotImplementedError(f"Unknown collocation method: {other}") + + return z_pts + + + diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index 450cb4e..6b5a95a 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -1,10 +1,8 @@ """Test interpolation classes. 
Currently, only Lagrange interpolation and Linear regression are supported.""" import itertools -import warnings import matplotlib.pyplot as plt import numpy as np -from sklearn.exceptions import ConvergenceWarning from sklearn.preprocessing import PolynomialFeatures from uqtils import approx_hess, approx_jac, ax_default @@ -394,7 +392,6 @@ def model(inputs): def test_GPR_2d(plots=False): """Test GPR interpolator with 2D Branin Function (https://www.sfu.ca/~ssurjano/branin.html)""" - rng = np.random.default_rng(41) num_train = 150 num_test = 30 noise_std = 8 @@ -414,7 +411,7 @@ def model(inputs, noise_std): noise_std * np.random.randn(*input_shape)) return {'y': y} - xtrain = {'x1': rng.uniform(-5, 10, num_train), 'x2': rng.uniform(0, 15, num_train)} + xtrain = {'x1': np.random.uniform(-5, 10, num_train), 'x2': np.random.uniform(0, 15, num_train)} ytrain = model(xtrain, noise_std) interp = GPR(kernel=['Sum', ['Product', @@ -423,10 +420,8 @@ def model(inputs, noise_std): ['WhiteKernel', {'noise_level': noise_std, 'noise_level_bounds': (0.1 * noise_std, 10 * noise_std)}]]) - with warnings.catch_warnings(action='ignore', category=ConvergenceWarning): - state = interp.refine((), (xtrain, ytrain), None, {}) - - xtest = {'x1': rng.uniform(-5, 10, num_test), 'x2': rng.uniform(0, 15, num_test)} + state = interp.refine((), (xtrain, ytrain), None, {}) + xtest = {'x1': np.random.uniform(-5, 10, num_test), 'x2': np.random.uniform(0, 15, num_test)} ytest = model(xtest, noise_std=0) ypred = interp.predict(xtest, state, ()) diff --git a/tests/test_training_data.py b/tests/test_training_data.py index a8c4a3b..01997aa 100644 --- a/tests/test_training_data.py +++ b/tests/test_training_data.py @@ -2,9 +2,12 @@ import itertools import numpy as np +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import RBF +from sklearn.gaussian_process.kernels import ConstantKernel as C from amisc.compression import SVD -from amisc.training import 
SparseGrid +from amisc.training import SparseGrid, UncertaintySampling  from amisc.utils import to_model_dataset from amisc.variable import Variable, VariableList @@ -46,6 +49,54 @@ def simple_model(inputs): assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars]) assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars]) +def test_uncertainty_sampling(): + """Test the `UncertaintySampling` data structure `get`, `set`, and `refine` methods.""" + def simple_model(inputs): + y1 = inputs['x1'] + y2 = 10 ** (inputs['x2']) + y3 = inputs['x3'] - 5 + return {'y1': y1, 'y2': y2, 'y3': y3} + + x_vars = VariableList([Variable('x1', distribution='U(0, 1)', norm='linear(2, 2)'), + Variable('x2', distribution='LU(1e-3, 1e-1)', norm='log10'), + Variable('x3', distribution='LN(0, 1)', norm='log10')]) + y_vars = VariableList(['y1', 'y2', 'y3']) + domains = x_vars.get_domains(norm=True) + weight_fcns = x_vars.get_pdfs(norm=True) + variance_criterions = ['global', 'local'] + for variance_criterion in variance_criterions: + grid = UncertaintySampling(collocation_rule='leja', knots_per_level=2, variance_criterion=variance_criterion) + + beta_list = list(itertools.product(*[range(3) for _ in range(len(x_vars))])) + alpha_list = [()] * len(beta_list) + design_list = [] + design_pts = [] + y_list = [] + + # Initialize GP model for uncertainty sampling + kernel = C(1.0, (1e-3, 1e3)) * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e2)) + gp_model = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5, random_state=42) + x_train = np.array([0.5, 0.01, 0.5]).reshape(1, -1) + y_train = np.concatenate( + [simple_model({'x1': x_train[:, 0], 'x2': x_train[:, 1], 'x3': x_train[:, 2]})[var] for var in y_vars] + ).reshape(1, -1) + gp_model.fit(x_train, y_train) + + # Refine the sparse grid + for alpha, beta in zip(alpha_list, beta_list): + new_idx, new_pts = grid.refine(alpha, beta, domains, gp_model, weight_fcns) + new_y = simple_model(new_pts) + 
design_list.append(new_idx) + design_pts.append(new_pts) + y_list.append(new_y) + grid.set(alpha, beta, new_idx, new_y) + + # Extract data from sparse grid and check values + for i, (alpha, beta) in enumerate(zip(alpha_list, beta_list)): + x_pts, y_pts = grid.get_by_coord(alpha, design_list[i]) + assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars]) + assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars]) + def test_imputer(): """Test that imputation works for fixing missing data."""