diff --git a/README.md b/README.md
index 5f7811d..b402bcd 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 ![build](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/deploy.yml?logo=github)
 ![docs](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/docs.yml?logo=materialformkdocs&logoColor=%2523cccccc&label=docs)
 ![tests](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/tests.yml?logo=github&logoColor=%2523cccccc&label=tests)
-![Code Coverage](https://img.shields.io/badge/coverage-89%25-yellowgreen?logo=codecov)
+![Code Coverage](https://img.shields.io/badge/coverage-88%25-yellowgreen?logo=codecov)
 [![Algorithm description](https://img.shields.io/badge/DOI-10.1002/nme.6958-blue)](https://doi.org/10.1002/nme.6958)

 Efficient framework for building surrogates of multidisciplinary systems using the adaptive multi-index stochastic collocation ([AMISC](https://onlinelibrary.wiley.com/doi/full/10.1002/nme.6958)) technique.
diff --git a/src/amisc/component.py b/src/amisc/component.py
index 5d5a342..f221083 100644
--- a/src/amisc/component.py
+++ b/src/amisc/component.py
@@ -399,6 +399,7 @@ class Component(BaseModel, Serializable):
     vectorized: bool = False
     call_unpacked: Optional[bool] = None  # If the model expects inputs/outputs like `func(x1, x2, ...)->(y1, y2, ...)`
     ret_unpacked: Optional[bool] = None
+    groups: Optional[list[list[Variable]]] = None  # Optional grouping of inputs that are refined jointly (shared collocation grid)

     # Data storage/states for a MISC component
     active_set: list | set | IndexSet = IndexSet()  # set of active (alpha, beta) multi-indices
@@ -799,7 +800,8 @@ def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best',
             if cached and (data := self._cache.get("training", {}).get(alpha, {}).get(beta)) is not None:
                 return data
             else:
-                return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True)
+                return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True,
+                                              groups=self.groups)
         except Exception as e:
             self.logger.error(f"Error getting training data for alpha={alpha}, beta={beta}.")
             raise e
@@ -1117,7 +1119,7 @@ def predict(self, inputs: dict | Dataset,
                     args = (self.misc_states.get((alpha, beta)),
                             self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))
-                    results.append(self.interpolator.predict(inputs, *args) if executor is None else
-                                   executor.submit(self.interpolator.predict, inputs, *args))
+                    results.append(self.interpolator.predict(inputs, *args, groups=self.groups) if executor is None else
+                                   executor.submit(self.interpolator.predict, inputs, *args, groups=self.groups))

         if executor is not None:
@@ -1217,7 +1219,7 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | Path
                     continue
                 design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)],
-                                                                      domains, weight_fcns)
+                                                                      domains, weight_fcns, groups=self.groups)
                 design_pts, fc = to_model_dataset(design_pts, self.inputs, del_latent=True, **field_coords)

                 # Remove duplicate (alpha, coords) pairs -- so you don't evaluate the model twice for the same input
@@ -1288,7 +1290,8 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | Path
             self.misc_costs[a, b] = num_train_pts
             self.misc_states[a, b] = self.interpolator.refine(b[len(self.data_fidelity):],
                                                               self.training_data.get(a, b[:len(self.data_fidelity)],
-                                                                                     y_vars=y_vars, skip_nan=True),
+                                                                                     y_vars=y_vars, skip_nan=True,
+                                                                                     groups=self.groups),
                                                               self.misc_states.get((alpha, beta)),
                                                               domains)
             start_idx = end_idx
diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py
index bba9abd..ae87bc5 100644
--- a/src/amisc/interpolator.py
+++ b/src/amisc/interpolator.py
@@ -276,7 +276,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

         return LagrangeState(weights=weights, x_grids=x_grids)

-    def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset, Dataset]):
+    def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset, Dataset],
+                groups: list[list[str]] = None):
         """Predict the output of the model at points `x` with barycentric Lagrange interpolation."""
         # Convert `x` and `yi` to 2d arrays: (N, xdim) and (N, ydim)
         # Inputs `x` may come in unordered, but they should get realigned with the internal `x_grids` state
@@ -306,19 +307,41 @@ def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset, Dataset],
         y = np.zeros(x_arr.shape[:-1] + (ydim,))  # (..., ydim)

         # Loop over multi-indices and compute tensor-product lagrange polynomials
-        indices = [range(s) for s in grid_sizes.values()]
-        for i, j in enumerate(itertools.product(*indices)):
-            L_j = quotient[..., dims, j] / qsum  # (..., xdim)
-
-            # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
-            if check_interp_pts:
-                other_pts = np.copy(div_zero_idx)
-                other_pts[div_zero_idx[..., dims, j]] = False
-                L_j[div_zero_idx[..., dims, j]] = 1
-                L_j[np.any(other_pts, axis=-1)] = 0
-
-            # Add multivariate basis polynomial contribution to interpolation output
-            y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i, :]
+        if groups:
+            # Sizes per group (assumes each member of a group shares the same 1d grid length)
+            group_sizes = [int(state.x_grids[grp[0]].shape[0]) for grp in groups]
+            for i_group, group_idx_tuple in enumerate(itertools.product(*[range(s) for s in group_sizes])):
+                # Expand group-level indices into a per-variable multi-index j (repeat per group member)
+                j_list = []
+                for grp_idx, grp in zip(group_idx_tuple, groups):
+                    j_list.extend([grp_idx] * len(grp))
+                j = tuple(j_list)  # length == xdim
+
+                L_j = quotient[..., dims, j] / qsum  # (..., xdim)
+
+                # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
+                if check_interp_pts:
+                    other_pts = np.copy(div_zero_idx)
+                    other_pts[div_zero_idx[..., dims, j]] = False
+                    L_j[div_zero_idx[..., dims, j]] = 1
+                    L_j[np.any(other_pts, axis=-1)] = 0
+
+                # `yi_arr` is expected to have one row per group-node
+                y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i_group, :]
+        else:
+            indices = [range(s) for s in grid_sizes.values()]
+            for i, j in enumerate(itertools.product(*indices)):
+                L_j = quotient[..., dims, j] / qsum  # (..., xdim)
+
+                # Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
+                if check_interp_pts:
+                    other_pts = np.copy(div_zero_idx)
+                    other_pts[div_zero_idx[..., dims, j]] = False
+                    L_j[div_zero_idx[..., dims, j]] = 1
+                    L_j[np.any(other_pts, axis=-1)] = 0
+
+                # Add multivariate basis polynomial contribution to interpolation output
+                y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i, :]

         # Unpack the outputs back into a Dataset
         y_ret = {}
@@ -609,7 +632,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

         return LinearState(regressor=regressor, x_vars=x_vars, y_vars=y_vars)

-    def predict(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset]):
+    def predict(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset],
+                groups: list[list[str]] = None):
         """Predict the output of the model at points `x` using the linear regressor provided in `state`.
         :param x: the input Dataset `dict` mapping input variables to prediction locations
@@ -716,14 +740,23 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],
         :param input_domains: (not used for `GPR`)
         :returns: the new GPR state
         """
-        gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list)
-                                           else [self.kernel, self.kernel_opts])
-        gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
-        pipe = []
-        if self.scaler is not None:
-            pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
-        pipe.append(('gpr', gp))
-        regressor = Pipeline(pipe)
+        if old_state is None:
+            gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list)
+                                               else [self.kernel, self.kernel_opts])
+        else:
+            # Warm-start from the previously fitted kernel so tuned hyperparameters carry over
+            gp_kernel = old_state.regressor.named_steps['gpr'].kernel_
+        gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
+        pipe = []
+        if self.scaler is not None:
+            pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
+        pipe.append(('gpr', gp))
+        regressor = Pipeline(pipe)

         xtrain, ytrain = training_data
@@ -743,7 +776,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

         return GPRState(regressor=regressor, x_vars=x_vars, y_vars=y_vars)

-    def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]):
+    def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset],
+                groups: list[list[str]] = None):
         """Predict the output of the model at points `x` using the Gaussian Process Regressor provided in `state`.
         :param x: the input Dataset `dict` mapping input variables to prediction locations
diff --git a/src/amisc/training.py b/src/amisc/training.py
index ddb1fc5..ce9ae23 100644
--- a/src/amisc/training.py
+++ b/src/amisc/training.py
@@ -10,18 +10,28 @@
 import copy
 import itertools
+import warnings
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
 from typing import Any, ClassVar

 import numpy as np
 from numpy.typing import ArrayLike
-from scipy.optimize import direct
+from scipy.optimize import direct, minimize
+from sklearn.exceptions import ConvergenceWarning
+from sklearn.gaussian_process import GaussianProcessRegressor
+from sklearn.gaussian_process.kernels import RBF, WhiteKernel
+from sklearn.gaussian_process.kernels import ConstantKernel as C
+from sklearn.pipeline import Pipeline

 from amisc.serialize import PickleSerializable, Serializable
 from amisc.typing import LATENT_STR_ID, Dataset, MultiIndex
 from amisc.utils import _RidgeRegression

+# Silence sklearn GPR warnings (ConvergenceWarning, UserWarning) coming from gaussian_process internals
+warnings.filterwarnings("ignore", category=ConvergenceWarning, module=r"sklearn\.gaussian_process.*")
+warnings.filterwarnings("ignore", category=UserWarning, module=r"sklearn\.gaussian_process.*")
+
 __all__ = ['TrainingData', 'SparseGrid']
@@ -154,14 +164,19 @@ class SparseGrid(TrainingData, PickleSerializable):
     :ivar yi_nan_map: a `dict` of imputed model outputs for each grid coordinate where the model failed (or gave nan)
     :ivar error_map: a `dict` of error information for each grid coordinate where the model failed
     :ivar latent_size: the number of latent coefficients for each variable (0 if scalar)
+    :ivar d_thresh: minimum distance threshold for new points (as a fraction of the input domain) for
+                    uncertainty sampling
+    :ivar thresh_relax: relaxation factor for the minimum distance threshold if no new points are found during
+                        uncertainty sampling
+    :ivar variance_criterion: criterion for selecting new points based on GP uncertainty, either 'local' or 'global'
+    :ivar grid_size: number of Monte Carlo samples used to evaluate global GP uncertainty for uncertainty sampling
     """
     MAX_IMPUTE_SIZE: ClassVar[int] = 10  # don't try to impute large arrays

-    collocation_rule: str = 'leja'
+    collocation_rule: str = 'leja'  # or 'uncertainty'
     knots_per_level: int = 2
     expand_latent_method: str = 'round-robin'  # or 'tensor-product', for converting beta to latent grid sizes
     opt_args: dict = field(default_factory=lambda: {'locally_biased': False, 'maxfun': 300})  # for leja optimizer

     betas: set[MultiIndex] = field(default_factory=set)
     x_grids: dict[str, ArrayLike] = field(default_factory=dict)
     yi_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, ArrayLike]]] = field(default_factory=dict)
@@ -169,6 +184,12 @@ class SparseGrid(TrainingData, PickleSerializable):
     error_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, Any]]] = field(default_factory=dict)
     latent_size: dict[str, int] = field(default_factory=dict)  # keep track of latent grid sizes for each variable

+    # Additional parameters for uncertainty sampling
+    d_thresh: float = 0.20  # minimum distance threshold for new points (as a fraction of the input domain)
+    thresh_relax: float = 0.5  # relaxation factor for the minimum distance threshold if no new points are found
+    variance_criterion: str = 'global'  # or 'local', for uncertainty sampling
+    grid_size: int = 100  # number of Monte Carlo samples used to evaluate global GP uncertainty
+
     def clear(self):
         """Clear all training data."""
         self.betas.clear()
@@ -227,9 +248,9 @@ def get_by_coord(self, alpha: MultiIndex, coords: list, y_vars: list = None, skip_nan: bool = False):

         return xi_dict, yi_dict  # Both with elements of shape (N, ...) for N grid points

-    def get(self, alpha: MultiIndex, beta: MultiIndex, y_vars: list[str] = None, skip_nan: bool = False):
+    def get(self, alpha: MultiIndex, beta: MultiIndex, y_vars: list[str] = None, skip_nan: bool = False,
+            groups: list[list[str]] = None):
         """Get the training data from the sparse grid for a given `alpha` and `beta` pair."""
-        return self.get_by_coord(alpha, list(self._expand_grid_coords(beta)), y_vars=y_vars, skip_nan=skip_nan)
+        return self.get_by_coord(alpha, list(self._expand_grid_coords(beta, groups)), y_vars=y_vars, skip_nan=skip_nan)

     def set_errors(self, alpha: MultiIndex, beta: MultiIndex, coords: list, errors: list[dict]):
         """Store error information in the sparse-grid for a given multi-index pair."""
@@ -297,7 +318,8 @@ def impute_missing_data(self, alpha: MultiIndex, beta: MultiIndex):

             self.yi_nan_map[alpha][coord] = copy.deepcopy(y_impute)

-    def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weight_fcns: dict = None):
+    def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict,
+               weight_fcns: dict = None, groups: list[list[str]] = None):
         """Refine the sparse grid for a given `alpha` and `beta` pair and given collocation rules. Return any
         new grid points that do not have model evaluations saved yet.

@@ -305,8 +327,27 @@ def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weight_fcns: dict = None):
         The `beta` multi-index is used to determine the number of collocation points in each input dimension.
         The length of `beta` should therefore match the number of variables in `x_vars`.
         """
+        # Train a Gaussian process regressor on the fly for uncertainty-based sampling (requires
+        # existing data for this model fidelity `alpha`; otherwise fall back to the Leja rule)
+        regressor = None
+        if self.collocation_rule == 'uncertainty' and alpha in self.yi_map and self.yi_map[alpha]:
+            # Retrieve all existing data for `alpha` and train a GP regressor
+            coords = list(self.yi_map[alpha].keys())
+            xi_dict, yi_dict = self.get_by_coord(alpha, coords)
+            x_vars = list(xi_dict.keys())
+            y_vars = list(yi_dict.keys())
+            x_arr = np.concatenate([xi_dict[var][..., np.newaxis] for var in x_vars], axis=-1)
+            y_arr = np.concatenate([yi_dict[var][..., np.newaxis] for var in y_vars], axis=-1)
+            gp_kernel = C(1.0, (1e-2, 1e2)) * RBF(1.0, (1e-3, 1.0))
+            regressor = Pipeline([
+                ('gpr', GaussianProcessRegressor(kernel=gp_kernel, n_restarts_optimizer=5))
+            ])
+            regressor.fit(x_arr, y_arr)
+
         weight_fcns = weight_fcns or {}

         # Initialize a sparse grid for beta=(0, 0, ..., 0)
         if np.sum(beta) == 0:
             if len(self.x_grids) == 0:
@@ -320,17 +361,26 @@ def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weight_fcns: dict = None):
                 self.latent_size = num_latent

             new_pt = {}
-            domains = iter(input_domains.items())
-            for grid_size in self.beta_to_knots(beta):
+            var_iter = iter(input_domains.keys()) if groups is None else iter(groups)
+            for grid_size in self.beta_to_knots(beta, groups=groups):
                 if isinstance(grid_size, int):  # scalars
-                    var, domain = next(domains)
-                    new_pt[var] = self.collocation_1d(grid_size, domain, method=self.collocation_rule,
+                    var = next(var_iter)
+                    if not isinstance(var, str):  # a group of variables: join the names with '__'
+                        var = var[0] if len(var) == 1 else "__".join(map(str, var))
+                    new_pt[var] = self.collocation_1d(var, grid_size, input_domains, regressor,
+                                                      method=self.collocation_rule,
                                                      wt_fcn=weight_fcns.get(var, None),
                                                      opt_args=self.opt_args).tolist()
                 else:  # latent coeffs
                     for s in grid_size:
-                        var, domain = next(domains)
-                        new_pt[var] = self.collocation_1d(s, domain, method=self.collocation_rule,
+                        var = next(var_iter)
+                        new_pt[var] = self.collocation_1d(var, s, input_domains, regressor,
+                                                          method=self.collocation_rule,
                                                           wt_fcn=weight_fcns.get(var, None),
                                                           opt_args=self.opt_args).tolist()
             self.x_grids = new_pt
@@ -338,37 +388,48 @@ def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weight_fcns: dict = None):
             self.yi_map.setdefault(alpha, dict())
             self.yi_nan_map.setdefault(alpha, dict())
             self.error_map.setdefault(alpha, dict())
-            new_coords = list(self._expand_grid_coords(beta))
+            new_coords = list(self._expand_grid_coords(beta, groups=groups))

             return new_coords, self._extract_grid_points(new_coords)

         # Otherwise, refine the sparse grid
         for beta_old in self.betas:
             # Get the first lower neighbor in the sparse grid and refine the 1d grid if necessary
             if self.is_one_level_refinement(beta_old, beta):
-                new_grid_size = self.beta_to_knots(beta)
-                inputs = zip(self.x_grids.keys(), self.x_grids.values(), input_domains.values())
+                new_grid_size = self.beta_to_knots(beta, groups=groups)
+                inputs = iter(self.x_grids.items())  # (input_domains may not align with grouped grid keys)
                 for new_size in new_grid_size:
                     if isinstance(new_size, int):  # scalar grid
-                        var, grid, domain = next(inputs)
-                        if len(grid) < new_size:
-                            num_new_pts = new_size - len(grid)
-                            self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid, opt_args=self.opt_args,
-                                                                    wt_fcn=weight_fcns.get(var, None),
-                                                                    method=self.collocation_rule).tolist()
+                        var, grid = next(inputs)
+                        if "__" in var:  # grouped variables share a (dim, num_pts) grid
+                            grid = np.asarray(grid)
+                            if grid.shape[1] < new_size:
+                                num_new_pts = new_size - grid.shape[1]
+                                self.x_grids[var] = self.collocation_1d(var, num_new_pts, input_domains, regressor,
+                                                                        grid, opt_args=self.opt_args,
+                                                                        wt_fcn=weight_fcns.get(var, None),
+                                                                        method=self.collocation_rule).tolist()
+                        else:
+                            if len(grid) < new_size:
+                                num_new_pts = new_size - len(grid)
+                                self.x_grids[var] = self.collocation_1d(var, num_new_pts, input_domains, regressor,
+                                                                        grid, opt_args=self.opt_args,
+                                                                        wt_fcn=weight_fcns.get(var, None),
+                                                                        method=self.collocation_rule).tolist()
                     else:  # latent grid
                         for s_new in new_size:
-                            var, grid, domain = next(inputs)
+                            var, grid = next(inputs)
                             if len(grid) < s_new:
                                 num_new_pts = s_new - len(grid)
-                                self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid,
+                                self.x_grids[var] = self.collocation_1d(var, num_new_pts, input_domains, regressor,
+                                                                        grid,
                                                                         opt_args=self.opt_args,
                                                                         wt_fcn=weight_fcns.get(var, None),
                                                                         method=self.collocation_rule).tolist()
                 break

         new_coords = []
-        for coord in self._expand_grid_coords(beta):
+        for coord in self._expand_grid_coords(beta, groups=groups):
             if coord not in self.yi_map[alpha]:
                 # If we have not computed this grid coordinate yet
                 new_coords.append(coord)
@@ -382,14 +443,25 @@ def _extract_grid_points(self, coords: list[tuple] | tuple):
         """Extract the `x` grid points located at `coords` from `x_grids` and return as the `pts` dictionary."""
         if not isinstance(coords, list):
             coords = [coords]
-        pts = {var: np.empty(len(coords)) for var in self.x_grids}
+        # Split grouped variables into their individual components
+        const_vars = []
+        for var in self.x_grids.keys():
+            if "__" in var:
+                const_vars.extend(var.split("__"))
+            else:
+                const_vars.append(var)
+        pts = {var: np.empty(len(coords)) for var in const_vars}
         for k, coord in enumerate(coords):
             grids = iter(self.x_grids.items())
             for idx in coord:
                 if isinstance(idx, int):  # scalar grid point
                     var, grid = next(grids)
-                    pts[var][k] = grid[idx]
+                    if "__" in var:  # grouped variables
+                        for i, v in enumerate(var.split("__")):
+                            pts[v][k] = grid[i][idx]
+                    else:
+                        pts[var][k] = grid[idx]
                 else:  # latent coefficients
                     for i in idx:
                         var, grid = next(grids)
@@ -397,9 +469,9 @@ def _extract_grid_points(self, coords: list[tuple] | tuple):

         return pts

-    def _expand_grid_coords(self, beta: MultiIndex):
+    def _expand_grid_coords(self, beta: MultiIndex, groups: list[list[str]] = None):
         """Iterable over all grid coordinates for a given `beta`, accounting for scalars and latent coefficients."""
-        grid_size = self.beta_to_knots(beta)
+        grid_size = self.beta_to_knots(beta, groups=groups)
         grid_coords = []
         for s in grid_size:
             if isinstance(s, int):  # scalars
@@ -446,7 +518,7 @@ def is_one_level_refinement(beta_old: tuple, beta_new: tuple) -> bool:

         return ind.shape[0] == 1 and level_diff[ind] == 1

     def beta_to_knots(self, beta: MultiIndex, knots_per_level: int = None, latent_size: dict = None,
-                      expand_latent_method: str = None) -> tuple:
+                      expand_latent_method: str = None, groups: list[list[str]] = None) -> tuple:
         """Convert a `beta` multi-index to the number of knots per dimension in the sparse grid.

         :param beta: refinement level indices
@@ -454,6 +526,7 @@ def beta_to_knots(self, beta: MultiIndex, knots_per_level: int = None, latent_size: dict = None,
         :param latent_size: the number of latent coefficients for each variable (0 if scalar); number of variables
                             and order should match the `beta` multi-index
         :param expand_latent_method: method for expanding latent grids, either 'round-robin' or 'tensor-product'
+        :param groups: list of variable groups; each group shares a single refinement level (scalar variables only)
         :returns: the number of knots/points per dimension for the sparse grid
         """
         knots_per_level = knots_per_level or self.knots_per_level
@@ -461,36 +534,42 @@ def beta_to_knots(self, beta: MultiIndex, knots_per_level: int = None, latent_size: dict = None,
         expand_latent_method = expand_latent_method or self.expand_latent_method

         grid_size = []
-        for i, (var, num_latent) in enumerate(latent_size.items()):
-            if num_latent > 0:
-                match expand_latent_method:
-                    case 'round-robin':
-                        if beta[i] == 0:
-                            grid_size.append((1,) * num_latent)  # initializes all latent grids to 1
-                        else:
-                            latent_refine_idx = (beta[i] - 1) % num_latent
-                            latent_refine_num = ((beta[i] - 1) // num_latent) + 1
-                            latent_beta = tuple([latent_refine_num] * (latent_refine_idx + 1) +
-                                                [latent_refine_num - 1] * (num_latent - latent_refine_idx - 1))
-                            latent_grid = [knots_per_level * latent_beta[j] + 1 for j in range(num_latent)]
-                            grid_size.append(tuple(latent_grid))
-                    case 'tensor-product':
-                        grid_size.append((knots_per_level * beta[i] + 1,) * num_latent)
-                    case other:
-                        raise NotImplementedError(f"Unknown method for expanding latent grids: {other}")
-            else:
-                grid_size.append(knots_per_level * beta[i] + 1)
+        if groups is None:
+            for i, (var, num_latent) in enumerate(latent_size.items()):
+                if num_latent > 0:
+                    match expand_latent_method:
+                        case 'round-robin':
+                            if beta[i] == 0:
+                                grid_size.append((1,) * num_latent)  # initializes all latent grids to 1
+                            else:
+                                latent_refine_idx = (beta[i] - 1) % num_latent
+                                latent_refine_num = ((beta[i] - 1) // num_latent) + 1
+                                latent_beta = tuple([latent_refine_num] * (latent_refine_idx + 1) +
+                                                    [latent_refine_num - 1] * (num_latent - latent_refine_idx - 1))
+                                latent_grid = [knots_per_level * latent_beta[j] + 1 for j in range(num_latent)]
+                                grid_size.append(tuple(latent_grid))
+                        case 'tensor-product':
+                            grid_size.append((knots_per_level * beta[i] + 1,) * num_latent)
+                        case other:
+                            raise NotImplementedError(f"Unknown method for expanding latent grids: {other}")
+                else:
+                    grid_size.append(knots_per_level * beta[i] + 1)
+        else:
+            for i in range(len(groups)):
+                # Groups only support scalar variables for now (no latent/field quantities)
+                grid_size.append(knots_per_level * beta[i] + 1)

         return tuple(grid_size)

-    @staticmethod
-    def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None,
-                       wt_fcn: callable = None, method='leja', opt_args=None) -> np.ndarray:
+    def collocation_1d(self, var: str, N: int, input_domains: dict, regressor, z_pts: np.ndarray = None,
+                       wt_fcn: callable = None, method='leja', opt_args=None) -> np.ndarray:
         """Find the next `N` points in the 1d sequence of `z_pts` using the provided collocation method.

+        :param var: the variable (or '__'-joined group of variables) to find new points for
         :param N: number of new points to add to the sequence
-        :param z_bds: bounds on the 1d domain
-        :param z_pts: current univariate sequence `(Nz,)`, start at middle of `z_bds` if `None`
+        :param input_domains: bounds for each input variable, used for the optimization domain
+        :param regressor: a trained regressor for uncertainty-based sampling (only used if `method='uncertainty'`)
+        :param z_pts: current univariate sequence `(Nz,)`, start at middle of `input_domains[var]` if `None`
         :param wt_fcn: weighting function, uses a constant weight if `None`, callable as `wt_fcn(z)`
-        :param method: collocation method to use, currently only 'leja' is supported
+        :param method: collocation method to use, either 'leja' or 'uncertainty'
         :param opt_args: extra arguments for the global 1d `direct` optimizer
@@ -500,19 +579,178 @@ def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None,
         if wt_fcn is None:
             wt_fcn = lambda z: 1
         if z_pts is None:
-            z_pts = (z_bds[1] + z_bds[0]) / 2
+            if isinstance(var, str) and "__" in var:  # grouped vars: start at the center of each 1d domain
+                z_pts = np.array([(input_domains[v][0] +
+                                   input_domains[v][1]) / 2.0 for v in var.split("__")]).reshape(-1, 1)
+            else:
+                z_pts = (input_domains[var][1] + input_domains[var][0]) / 2
             N = N - 1
         z_pts = np.atleast_1d(z_pts)

         match method:
             case 'leja':
                 for i in range(N):
-                    # Construct Leja sequence by maximizing the Leja objective sequentially
-                    obj_fun = lambda z: -wt_fcn(np.array(z)) * np.prod(np.abs(z - z_pts))
-                    res = direct(obj_fun, [z_bds], **opt_args)  # Use global DIRECT optimization over 1d domain
-                    z_star = res.x
-                    z_pts = np.concatenate((z_pts, z_star))
+                    # Maximize the Leja objective sequentially
+                    if isinstance(var, str) and "__" in var:  # grouped vars: optimize jointly over the group
+                        obj_fun = lambda z: -wt_fcn(np.atleast_1d(z)) * np.prod(
+                            np.linalg.norm(z_pts - np.atleast_1d(z).reshape(-1, 1), axis=0))
+                        bounds = [input_domains[v] for v in var.split("__")]
+                        res = direct(obj_fun, bounds, **opt_args)
+                        z_star = np.atleast_1d(res.x).reshape(-1, 1)
+                        z_pts = np.concatenate((z_pts, z_star), axis=1)
+                    else:
+                        obj_fun = lambda z: -wt_fcn(np.array(z)) * np.prod(np.abs(z - z_pts))
+                        # Use global DIRECT optimization over the 1d domain
+                        res = direct(obj_fun, [input_domains[var]], **opt_args)
+                        z_star = res.x
+                        z_pts = np.concatenate((z_pts, z_star))
+            case 'uncertainty':
+                if regressor is None:
+                    # No model data for this alpha yet -- fall back to the Leja rule
+                    z_pts = self.collocation_1d(var, N, input_domains, None, z_pts, wt_fcn=wt_fcn,
+                                                method='leja', opt_args=opt_args)
+                else:
+                    if self.variance_criterion == 'local':
+                        if isinstance(var, str) and "__" in var:  # grouped vars
+                            ranges = [input_domains[v][1] - input_domains[v][0] for v in var.split("__")]
+                            d_min = self.d_thresh * np.linalg.norm(ranges)
+                        else:
+                            d_min = self.d_thresh * (input_domains[var][1] - input_domains[var][0])

+                        def J1(x):  # Objective for local uncertainty sampling: negative total GP variance
+                            if isinstance(var, str) and "__" in var:  # grouped vars
+                                x = np.asarray(x).reshape(N, len(var.split("__")))
+                            else:
+                                x = np.asarray(x).reshape(-1, 1)
+                            # Build the full tensor-product candidate grid with `x` substituted for `var`
+                            var_order = list(self.x_grids.keys())
+                            arrays = []
+                            for v in var_order:
+                                if v == var:
+                                    arrays.append(x)
+                                else:
+                                    arr = np.asarray(self.x_grids[v])
+                                    if arr.ndim == 1:
+                                        arrays.append(arr[:, None])
+                                    else:
+                                        arrays.append(arr.T)
+                            size = [arr.shape[0] for arr in arrays]
+                            idxs = [np.arange(n) for n in size]
+                            grids = np.meshgrid(*idxs, indexing='ij')
+                            flat_idxs = [g.ravel() for g in grids]
+                            cand_grid = [A[idx] for A, idx in zip(arrays, flat_idxs)]
+                            cand_grid = np.hstack(cand_grid)
+                            _, std = regressor.predict(cand_grid, return_std=True)
+                            variance = std ** 2
+                            return -np.sum(variance)
+
+                        if isinstance(var, str) and "__" in var:  # grouped vars
+                            x0 = np.random.uniform([input_domains[v][0] for v in var.split("__")],
+                                                   [input_domains[v][1] for v in var.split("__")],
+                                                   size=(N, len(var.split("__")))).flatten()
+                            bounds = [(input_domains[v][0], input_domains[v][1]) for v in var.split("__")] * N
+                            constraints = []  # enforce a minimum distance from every existing point
+                            for xi in z_pts.T:
+                                for i in range(N):
+                                    constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi, i=i: np.linalg.norm(
+                                        x.reshape(N, len(var.split("__")))[i, :] - xp) - d_min})
+                            res = minimize(J1, x0, bounds=bounds, constraints=constraints)
+                            z_star = res.x.reshape(N, len(var.split("__"))).T
+                            z_pts = np.concatenate((z_pts, z_star), axis=1)
+                        else:
+                            x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], N)
+                            bounds = [(input_domains[var][0], input_domains[var][1])] * N
+                            constraints = []  # enforce |x_i - xp| >= d_min for each existing point xp
+                            for xp in z_pts:
+                                constraints.append({
+                                    'type': 'ineq',
+                                    'fun': lambda x, xp=xp: np.min(np.abs(np.asarray(x) - xp)) - d_min
+                                })
+                            res = minimize(J1, x0, bounds=bounds, constraints=constraints)
+                            z_star = res.x
+                            z_pts = np.concatenate((z_pts, z_star))
+
+                    elif self.variance_criterion == 'global':
+                        # Extract the fitted noise level, if any (the default kernel above has none, giving 0)
+                        kernel = regressor.named_steps['gpr'].kernel_
+                        if isinstance(kernel, WhiteKernel):
+                            noise_var = kernel.noise_level
+                        elif hasattr(kernel, "k1") and isinstance(kernel.k1, WhiteKernel):
+                            noise_var = kernel.k1.noise_level
+                        elif hasattr(kernel, "k2") and isinstance(kernel.k2, WhiteKernel):
+                            noise_var = kernel.k2.noise_level
+                        else:
+                            noise_var = 0
+
+                        # Sample the Monte Carlo reference grid once so the objective is deterministic
+                        T = np.zeros((self.grid_size, len(input_domains)))
+                        for i, domain in enumerate(input_domains.values()):
+                            low, high = domain
+                            T[:, i] = np.random.uniform(low, high, self.grid_size)
+
+                        def J2(x):  # Objective for global uncertainty sampling: total posterior variance over T
+                            if isinstance(var, str) and "__" in var:  # grouped vars
+                                x = np.asarray(x).reshape(N, len(var.split("__")))
+                            else:
+                                x = np.asarray(x).reshape(-1, 1)
+                            var_order = list(self.x_grids.keys())
+                            arrays = []
+                            for v in var_order:
+                                if v == var:
+                                    arrays.append(x)
+                                else:
+                                    arr = np.asarray(self.x_grids[v])
+                                    if arr.ndim == 1:
+                                        arrays.append(arr[:, None])
+                                    else:
+                                        arrays.append(arr.T)
+                            size = [arr.shape[0] for arr in arrays]
+                            idxs = [np.arange(n) for n in size]
+                            grids = np.meshgrid(*idxs, indexing='ij')
+                            flat_idxs = [g.ravel() for g in grids]
+                            cand_grid = [A[idx] for A, idx in zip(arrays, flat_idxs)]
+                            cand_grid = np.hstack(cand_grid)
+                            Q = np.vstack((T, cand_grid))
+                            _, cov = regressor.predict(Q, return_cov=True)
+                            if cov.ndim == 2:
+                                cov_TT = cov[:self.grid_size, :self.grid_size]
+                                cov_Tx = cov[:self.grid_size, self.grid_size:]
+                                cov_xT = cov[self.grid_size:, :self.grid_size]
+                                cov_xx = cov[self.grid_size:, self.grid_size:]
+                                sigma_bar_T = (np.diag(cov_TT) - np.diag(cov_Tx
+                                               @ (np.linalg.pinv(cov_xx + noise_var * np.eye(cov_xx.shape[0]))
+                                               @ cov_xT)))
+                                return np.sum(sigma_bar_T)
+                            else:  # one covariance block per output variable
+                                cost = 0
+                                for i in range(cov.shape[2]):
+                                    cov_yi = cov[:, :, i]
+                                    cov_TT = cov_yi[:self.grid_size, :self.grid_size]
+                                    cov_Tx = cov_yi[:self.grid_size, self.grid_size:]
+                                    cov_xT = cov_yi[self.grid_size:, :self.grid_size]
+                                    cov_xx = cov_yi[self.grid_size:, self.grid_size:]
+                                    sigma_bar_T = np.diag(cov_TT) - np.diag(
+                                        cov_Tx @ (np.linalg.pinv(cov_xx + noise_var * np.eye(cov_xx.shape[0]))
+                                                  @ cov_xT))
+                                    cost += np.sum(sigma_bar_T)
+                                return cost
+
+                        if isinstance(var, str) and "__" in var:  # grouped vars
+                            x0 = np.random.uniform([input_domains[v][0] for v in var.split("__")],
+                                                   [input_domains[v][1] for v in var.split("__")],
+                                                   size=(N, len(var.split("__")))).flatten()
+                            bounds = [(input_domains[v][0], input_domains[v][1]) for v in var.split("__")] * N
+                            res = minimize(J2, x0, bounds=bounds)
+                            z_star = res.x.reshape(N, len(var.split("__"))).T
+                            z_pts = np.concatenate((z_pts, z_star), axis=1)
+                        else:
+                            x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], N)
+                            res = minimize(J2, x0,
+                                           bounds=[(input_domains[var][0], input_domains[var][1])] * N)
+                            z_star = res.x
+                            z_pts = np.concatenate((z_pts, z_star))
             case other:
-                raise NotImplementedError(f"Unknown collocation method: {other}")
-
+                raise NotImplementedError(f"Unknown collocation method: {other}")
+
         return z_pts
diff --git a/tests/test_training_data.py b/tests/test_training_data.py
index a8c4a3b..c59c83c 100644
--- a/tests/test_training_data.py
+++ b/tests/test_training_data.py
@@ -23,28 +23,56 @@ def simple_model(inputs):
     y_vars = VariableList(['y1', 'y2', 'y3'])
     domains = x_vars.get_domains(norm=True)
     weight_fcns = x_vars.get_pdfs(norm=True)
-    grid = SparseGrid(collocation_rule='leja', knots_per_level=2)
-
-    beta_list = list(itertools.product(*[range(3) for _ in range(len(x_vars))]))
-    alpha_list = [()] * len(beta_list)
-    design_list = []
-    design_pts = []
-    y_list = []
-
-    # Refine the sparse grid
-    for alpha, beta in zip(alpha_list, beta_list):
-        new_idx, new_pts = grid.refine(alpha, beta, domains, weight_fcns)
-        new_y = simple_model(new_pts)
-        design_list.append(new_idx)
-        design_pts.append(new_pts)
-        y_list.append(new_y)
-        grid.set(alpha, beta, new_idx, new_y)
-
-    # Extract data from sparse grid and check values
-    for i, (alpha, beta) in enumerate(zip(alpha_list, beta_list)):
-        x_pts, y_pts = grid.get_by_coord(alpha, design_list[i])
-        assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars])
-        assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars])
+    # Run the same refine/extract/check cycle for each collocation configuration
+    grids = [SparseGrid(collocation_rule='leja', knots_per_level=2),
+             SparseGrid(collocation_rule='uncertainty', knots_per_level=2, variance_criterion='local'),
+             SparseGrid(collocation_rule='uncertainty', knots_per_level=2, variance_criterion='global')]
+    for grid in grids:
+        beta_list = list(itertools.product(*[range(3) for _ in range(len(x_vars))]))
+        alpha_list = [()] * len(beta_list)
+        design_list = []
+        design_pts = []
+        y_list = []
+
+        # Refine the sparse grid
+        for alpha, beta in zip(alpha_list, beta_list):
+            new_idx, new_pts = grid.refine(alpha, beta, domains, weight_fcns)
+            new_y = simple_model(new_pts)
+            design_list.append(new_idx)
+            design_pts.append(new_pts)
+            y_list.append(new_y)
+            grid.set(alpha, beta, new_idx, new_y)
+
+        # Extract data from sparse grid and check values
+        for i, (alpha, beta) in enumerate(zip(alpha_list, beta_list)):
+            x_pts, y_pts = grid.get_by_coord(alpha, design_list[i])
+            assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars])
+            assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars])

 def test_imputer():
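
Usage sketch (reviewer note, not part of the patch): the snippet below shows how the new `groups` and `collocation_rule='uncertainty'` options are meant to be driven, based only on the signatures added above. The two-input model, variable names, and domains are all made up for illustration, and `weight_fcns` is omitted since `refine` defaults it to `{}`.

    import numpy as np
    from amisc.training import SparseGrid

    def toy_model(pts):
        # Hypothetical model: one output from two grouped inputs
        return {'y': np.sin(np.asarray(pts['x1'])) + np.asarray(pts['x2']) ** 2}

    domains = {'x1': (0.0, 1.0), 'x2': (-1.0, 1.0)}
    groups = [['x1', 'x2']]  # refine x1 and x2 jointly on a shared collocation grid
    grid = SparseGrid(collocation_rule='uncertainty', variance_criterion='global', knots_per_level=2)

    # With groups, beta has one entry per group. The first refinement has no data for
    # alpha=() yet, so collocation_1d falls back to the Leja rule; once grid.set() stores
    # model outputs, later refinements fit a GP on the fly and sample by uncertainty.
    for beta in [(0,), (1,), (2,)]:
        coords, pts = grid.refine((), beta, domains, groups=groups)
        grid.set((), beta, coords, toy_model(pts))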
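Design note on the 'global' criterion: J2 implements the standard GP posterior-variance reduction. For a Monte Carlo grid T and candidate points x, with prior covariance blocks K_TT, K_Tx, K_xx and noise variance s^2, the variance remaining at T after observing x is diag(K_TT) - diag(K_Tx (K_xx + s^2 I)^{-1} K_xT), and J2 minimizes its sum over T (hence the np.diag in the patched objective). A tiny self-contained check of that identity against sklearn's own posterior (point counts and length scale here are illustrative):

    import numpy as np
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import RBF

    rng = np.random.default_rng(0)
    X = rng.uniform(0, 1, (5, 1))        # candidate/observed points x
    T = np.linspace(0, 1, 20)[:, None]   # Monte Carlo reference grid T
    kernel = RBF(length_scale=0.2)

    # Block form of the prior covariance over [T; x], then the variance-reduction identity
    K = kernel(np.vstack((T, X)))
    K_TT, K_Tx, K_xx = K[:20, :20], K[:20, 20:], K[20:, 20:]
    reduced = np.diag(K_TT) - np.diag(K_Tx @ np.linalg.pinv(K_xx) @ K_Tx.T)

    # Matches the posterior variance reported by a GP conditioned on x (zero targets, fixed kernel)
    gp = GaussianProcessRegressor(kernel=kernel, optimizer=None).fit(X, np.zeros(len(X)))
    _, std = gp.predict(T, return_std=True)
    assert np.allclose(reduced, std ** 2, atol=1e-6)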