From fcef6541006af1ac51d55d06f6793af301589180 Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Wed, 19 Feb 2025 13:51:03 -0500 Subject: [PATCH 1/8] Adding only GPR changes --- src/amisc/interpolator.py | 110 ++++++++++++++++++++++++++++++++++++- tests/test_interpolator.py | 103 +++++++++++++++++++++++++++++++++- 2 files changed, 210 insertions(+), 3 deletions(-) diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py index 2871ad5..603d150 100644 --- a/src/amisc/interpolator.py +++ b/src/amisc/interpolator.py @@ -7,9 +7,11 @@ - `Interpolator`: Abstract class providing basic structure of an interpolator - `Lagrange`: Concrete implementation for tensor-product barycentric Lagrange interpolation - `Linear`: Concrete implementation for linear regression using `sklearn` +- `GPR`: Concrete implementation for Gaussian process regression using `sklearn` - `InterpolatorState`: Interface for a dataclass that stores the internal state of an interpolator - `LagrangeState`: The internal state for a barycentric Lagrange polynomial interpolator - `LinearState`: The internal state for a linear interpolator (using sklearn) +- `GPRState`: The internal state for a Gaussian process regression interpolator (using sklearn) """ from __future__ import annotations @@ -21,12 +23,14 @@ import numpy as np from sklearn import linear_model, preprocessing from sklearn.pipeline import Pipeline -from sklearn.preprocessing import PolynomialFeatures +from sklearn.preprocessing import PolynomialFeatures, MinMaxScaler +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C, WhiteKernel, PairwiseKernel from amisc.serialize import Base64Serializable, Serializable, StringSerializable from amisc.typing import Dataset, MultiIndex -__all__ = ["InterpolatorState", "LagrangeState", "LinearState", "Interpolator", "Lagrange", "Linear"] +__all__ = ["InterpolatorState", "LagrangeState", "LinearState", "GPRState", "Interpolator", "Lagrange", "Linear", "GPR"] class InterpolatorState(Serializable, ABC): @@ -78,6 +82,28 @@ def __eq__(self, other): return False +@dataclass +class GPRState(InterpolatorState, Base64Serializable): + """The internal state for a Gaussian Process Regressor interpolator (using sklearn). + + :ivar x_vars: the input variables in order + :ivar y_vars: the output variables in order + :ivar regressor: the sklearn regressor object, a pipeline that consists of a `MinMaxScaler` and a + `GaussianProcessRegressor`. + """ + x_vars: list[str] = field(default_factory=list) + y_vars: list[str] = field(default_factory=list) + regressor: Pipeline = None + + def __eq__(self, other): + if isinstance(other, GPRState): + return (self.x_vars == other.x_vars and self.y_vars == other.y_vars and + np.allclose(self.regressor['gpr'].kernel_.k1.metric, other.regressor['gpr'].kernel_.k1.metric) and + np.allclose(self.regressor['gpr'].kernel_.k2.noise_level, other.regressor['gpr'].kernel_.k2.noise_level)) + else: + return False + + class Interpolator(Serializable, ABC): """Interface for an interpolator object that approximates a model. An interpolator should: @@ -149,6 +175,8 @@ def from_dict(cls, config: dict) -> Interpolator: return Lagrange(**config) case 'linear': return Linear(**config) + case 'gpr': + return GPR(**config) case other: raise NotImplementedError(f"Unknown interpolator method: {other}") @@ -594,3 +622,81 @@ def gradient(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, def hessian(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset]): raise NotImplementedError + + +@dataclass +class GPR(Interpolator, StringSerializable): + """Implementation of Gaussian Process Regression using `sklearn`. The `GaussianProcessInterpolator` uses a pipeline of + `MinMaxScaler` and a `GaussianProcessRegressor` to approximate the input-output mapping. + MinMaxScaler scales the input dimensions to [0,1] before passing them to the regressor. + + : ivar kernel_type: the kernel type to use for building the covariance matrix. (Defaults to RBF) + """ + + kernel_type: str = 'rbf' # Defaults to RBF Kernel + + def __post_init__(self): + + if self.kernel_type not in ['additive_chi2', 'chi2', 'linear', 'poly', 'polynomial', 'rbf', 'laplacian', + 'sigmoid', 'cosine']: + raise ValueError(f"Metric {self.kernel_type} not found in sklearn.metrics.pairwise") + + def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], + old_state: GPRState, input_domains: dict[str, tuple]) -> InterpolatorState: + """Train a new gaussian process regression model. + + :param beta: refinement level indices (Not used for 'GPR') + :param training_data: a tuple of dictionaries containing the new training data (`xtrain`, `ytrain`) + :param old_state: the old regressor state to refine (only used to get the order of input/output variables) + :param input_domains: (not used for `GPR`) + :returns: the new linear state + """ + xtrain, ytrain = training_data + + # Get order of variables for inputs and outputs + if old_state is not None: + x_vars = old_state.x_vars + y_vars = old_state.y_vars + else: + x_vars = list(xtrain.keys()) + y_vars = list(ytrain.keys()) + + # Convert to (N, xdim) and (N, ydim) arrays + x_arr = np.concatenate([xtrain[var][..., np.newaxis] for var in x_vars], axis=-1) + y_arr = np.concatenate([ytrain[var][..., np.newaxis] for var in y_vars], axis=-1) + + self.kernel = PairwiseKernel(metric=self.kernel_type, gamma=1) + WhiteKernel(noise_level=1e-4, + noise_level_bounds=(1e-10, 1e-6)) + + gp = GaussianProcessRegressor(kernel=self.kernel, n_restarts_optimizer=15, random_state=42) + pipe = [('scaler', MinMaxScaler()), ('gpr', gp)] + regressor = Pipeline(pipe) + + regressor.fit(x_arr, y_arr) + + return GPRState(regressor=regressor, x_vars=x_vars, y_vars=y_vars) + + def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]): + """Predict the output of the model at points `x` using the Gaussian Process Regressor provided in `state`. + + :param x: the input Dataset `dict` mapping input variables to prediction locations + :param state: the state containing the Gaussian Process Regressor to use + :param training_data: not used for `GaussianProcessInterpolator` (since the regressor is already trained in `state`) + """ + + # Convert to (N, xdim) array for sklearn + x_arr = np.concatenate([x[var][..., np.newaxis] for var in state.x_vars], axis=-1) + loop_shape = x_arr.shape[:-1] + x_arr = x_arr.reshape((-1, x_arr.shape[-1])) + + y_arr = state.regressor.predict(x_arr) + y_arr = y_arr.reshape(loop_shape + (len(state.y_vars),)) # (..., ydim) + + # Unpack the outputs back into a Dataset + return {var: y_arr[..., i] for i, var in enumerate(state.y_vars)} + + def gradient(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]): + raise NotImplementedError + + def hessian(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]): + raise NotImplementedError diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index c7ce479..b131899 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -7,7 +7,7 @@ from uqtils import approx_hess, approx_jac, ax_default from amisc.examples.models import nonlinear_wave, tanh_func -from amisc.interpolator import Lagrange, Linear +from amisc.interpolator import Lagrange, Linear, GPR from amisc.training import SparseGrid from amisc.utils import relative_error from amisc.variable import Variable @@ -343,3 +343,104 @@ def polynomial_model(inputs, noise_std=0.0): assert coeff_err < tol, f'Error in polynomial coefficients: {coeff_err} > {tol}' assert intercept_err < tol, f'Error in polynomial intercept: {intercept_err} > {tol}' assert err < tol, f'Error in polynomial prediction: {err} > {tol}' + + +def test_GPR_1d(plots=False): + """Test GPR interpoation for a 1D function.""" + num_train = 50 + num_test = 20 + noise_std = 0.5 + + def model(inputs, noise_std): + x1 = inputs['x1'] + y = (6 * x1 - 2) ** 2 * np.sin(12 * x1 - 4) + noise_std * np.random.randn(*x1.shape) + return {'y': y} + + xtrain = {'x1': np.random.uniform(0, 1, num_train)} + ytrain = model(xtrain, noise_std) + + interp = GPR() + state = interp.refine((), (xtrain, ytrain), None, {}) + + xtest = {'x1': np.linspace(0, 1, num_test)} + ytest = model(xtest, noise_std=0) + ypred = interp.predict(xtest, state, ()) + + l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} + print(f'L2 error: {l2_error}') + if noise_std > 0: + assert all((l2_error[var] < 0.09 for var in ypred)), f'L2 error : {l2_error} > 0.09' + else: + assert all((l2_error[var] < 1e-2 for var in ypred)), f'L2 error : {l2_error} > 0.01' + + if plots: + plt.plot(xtest['x1'], ytest['y'], 'r--', label='Model') + plt.plot(xtest['x1'], ypred['y'], 'k', label='Surrogate') + plt.scatter(xtrain['x1'], ytrain['y'], marker='o', s=25, color='blue', label='Training data') + plt.legend() + plt.show() + + +def test_GPR_2d(plots=False): + """Stress Test GPR interpolator with Brannin Function""" + num_train = 100 + num_test = 20 + noise_std = 10 + + def model(inputs, noise_std): + """Branin function.""" + x1 = inputs['x1'] + x2 = inputs['x2'] + a = 1 + b = 5.1 / (4 * np.pi ** 2) + c = 5 / np.pi + r = 6 + s = 10 + t = 1 / (8 * np.pi) + y = a * (x2 - b * x1 ** 2 + c * x1 - r) ** 2 + s * (1 - t) * np.cos(x1) + s + noise_std * np.random.randn() + return {'y': y} + + xtrain = {'x1': np.random.uniform(-5, 10, num_train), 'x2': np.random.uniform(0, 15, num_train)} + ytrain = model(xtrain, noise_std) + interp = GPR() + + state = interp.refine((), (xtrain, ytrain), None, {}) + xtest = {'x1': np.random.uniform(-5, 10, num_test), 'x2': np.random.uniform(0, 15, num_test)} + ytest = model(xtest, noise_std=0) + + ypred = interp.predict(xtest, state, ()) + + l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} + print(f'L2 error: {l2_error}') + if noise_std > 0: + assert all((l2_error[var] < 0.2 for var in ypred)), f'L2 error : {l2_error} > 0.2' + else: + assert all((l2_error[var] < 0.05 for var in ypred)), f'L2 error : {l2_error} > 0.05' + + if plots: + test_x1 = np.linspace(-5, 10, num_test) + test_x2 = np.linspace(0, 15, num_test) + X1, X2 = np.meshgrid(test_x1, test_x2) + test_X = np.array([X1.flatten(), X2.flatten()]).T + true_Y = model({'x1': X1.flatten(), 'x2': X2.flatten()}, noise_std=0)['y'] + pred_Y = interp.predict({'x1': X1.flatten(), 'x2': X2.flatten()}, state, ())['y'] + + import matplotlib.colors as mcolors + norm = mcolors.Normalize(vmin=min(true_Y.min(), pred_Y.min()), vmax=max(true_Y.max(), pred_Y.max())) + fig, axs = plt.subplots(1, 2) + contour1 = axs[0].contourf(X1, X2, true_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) + axs[0].set_title('True Function') + axs[0].set_xlabel('x1') + axs[0].set_ylabel('x2', rotation=0) + axs[0].set_xlim(-5.1, 10.1) + axs[0].set_ylim(-0.1, 15.1) + contour2 = axs[1].contourf(X1, X2, pred_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) + # axs[1].scatter(xtrain['x1'], xtrain['x2'], c=ytrain['y'], cmap='viridis', norm=norm, edgecolors='white', label='Training data') + axs[1].set_title('Predicted Function') + axs[1].set_xlabel('x1') + axs[1].set_ylabel('x2', rotation=0) + axs[1].set_xlim(-5.1, 10.1) + axs[1].set_ylim(-0.1, 15.1) + cbar = fig.colorbar(contour2, ax=axs, orientation='vertical') + cbar.set_label('y', rotation=0) + plt.show() From 73bb649d6ef147444b55642208a6b000b576733d Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Sun, 4 May 2025 15:22:12 -0400 Subject: [PATCH 2/8] minr fixes and documentation --- docs/guides/interface_models.md | 11 ++++++++++- src/amisc/interpolator.py | 26 +++++++++++++++++--------- tests/test_interpolator.py | 11 ++++------- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/docs/guides/interface_models.md b/docs/guides/interface_models.md index 618710d..d9fad93 100644 --- a/docs/guides/interface_models.md +++ b/docs/guides/interface_models.md @@ -228,7 +228,16 @@ As one last note, if your model returns field quantity data, this will be stored ## Specifying a surrogate method The `Component.interpolator` attribute provides the underlying surrogate "interpolation" method, i.e. the specific mathematical relationship that approximates the model's outputs as a function of its inputs. In this sense, we use the terms "interpolator" and "surrogate" interchangeably to mean the underlying approximation method -- the `Component.interpolator` does not necessarily have to "interpolate" the output by passing through all the training data directly. The naming convention mostly arises from the usage of polynomial interpolation in sparse grids. -Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation, which is set by default, and [Linear][amisc.interpolator.Linear] regression. Multivariate Lagrange polynomials are formed by a tensor-product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression is implemented through the [scikit-learn](https://scikit-learn.org/stable/) library, and may optionally include polynomial features. +Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation, which is set by default, [Linear][amisc.interpolator.Linear] regression, and [GPR][amisc.interpolator.GPR]. Multivariate Lagrange polynomials are formed by a tensor-product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression is implemented through the [scikit-learn](https://scikit-learn.org/stable/) library, and may optionally include polynomial features. Gaussian Process Regression is also implemented through the [scikit-learn](https://scikit-learn.org/stable/). Currently, the kernels supported for [GPR][amisc.interpolator.GPR] are: + +- Chi Squared +- Additive Chi Squared +- Linear +- Polynomial +- RBF +- Laplacian +- Sigmoid +- Cosine You may configure the interpolation method via: ```python diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py index 603d150..0e9a2e6 100644 --- a/src/amisc/interpolator.py +++ b/src/amisc/interpolator.py @@ -17,19 +17,25 @@ import copy import itertools +import warnings from abc import ABC, abstractmethod from dataclasses import dataclass, field import numpy as np from sklearn import linear_model, preprocessing -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import PolynomialFeatures, MinMaxScaler from sklearn.gaussian_process import GaussianProcessRegressor -from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C, WhiteKernel, PairwiseKernel +from sklearn.gaussian_process.kernels import PairwiseKernel, WhiteKernel +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures from amisc.serialize import Base64Serializable, Serializable, StringSerializable from amisc.typing import Dataset, MultiIndex +warnings.filterwarnings("ignore", module="sklearn") + + + + __all__ = ["InterpolatorState", "LagrangeState", "LinearState", "GPRState", "Interpolator", "Lagrange", "Linear", "GPR"] @@ -98,8 +104,10 @@ class GPRState(InterpolatorState, Base64Serializable): def __eq__(self, other): if isinstance(other, GPRState): return (self.x_vars == other.x_vars and self.y_vars == other.y_vars and - np.allclose(self.regressor['gpr'].kernel_.k1.metric, other.regressor['gpr'].kernel_.k1.metric) and - np.allclose(self.regressor['gpr'].kernel_.k2.noise_level, other.regressor['gpr'].kernel_.k2.noise_level)) + np.allclose(self.regressor['gpr'].kernel_.k1.metric, + other.regressor['gpr'].kernel_.k1.metric) and + np.allclose(self.regressor['gpr'].kernel_.k2.noise_level, + other.regressor['gpr'].kernel_.k2.noise_level)) else: return False @@ -626,8 +634,8 @@ def hessian(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, @dataclass class GPR(Interpolator, StringSerializable): - """Implementation of Gaussian Process Regression using `sklearn`. The `GaussianProcessInterpolator` uses a pipeline of - `MinMaxScaler` and a `GaussianProcessRegressor` to approximate the input-output mapping. + """Implementation of Gaussian Process Regression using `sklearn`. The `GaussianProcessInterpolator` uses a pipeline + of `MinMaxScaler` and a `GaussianProcessRegressor` to approximate the input-output mapping. MinMaxScaler scales the input dimensions to [0,1] before passing them to the regressor. : ivar kernel_type: the kernel type to use for building the covariance matrix. (Defaults to RBF) @@ -649,7 +657,7 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], :param training_data: a tuple of dictionaries containing the new training data (`xtrain`, `ytrain`) :param old_state: the old regressor state to refine (only used to get the order of input/output variables) :param input_domains: (not used for `GPR`) - :returns: the new linear state + :returns: the new GPR state """ xtrain, ytrain = training_data @@ -681,7 +689,7 @@ def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dat :param x: the input Dataset `dict` mapping input variables to prediction locations :param state: the state containing the Gaussian Process Regressor to use - :param training_data: not used for `GaussianProcessInterpolator` (since the regressor is already trained in `state`) + :param training_data: not used for `GPR` (since the regressor is already trained in `state`) """ # Convert to (N, xdim) array for sklearn diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index b131899..0f5fd1f 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -7,7 +7,7 @@ from uqtils import approx_hess, approx_jac, ax_default from amisc.examples.models import nonlinear_wave, tanh_func -from amisc.interpolator import Lagrange, Linear, GPR +from amisc.interpolator import GPR, Lagrange, Linear from amisc.training import SparseGrid from amisc.utils import relative_error from amisc.variable import Variable @@ -346,7 +346,7 @@ def polynomial_model(inputs, noise_std=0.0): def test_GPR_1d(plots=False): - """Test GPR interpoation for a 1D function.""" + """Test GPR interpolation for a 1D function.""" num_train = 50 num_test = 20 noise_std = 0.5 @@ -421,22 +421,19 @@ def model(inputs, noise_std): test_x1 = np.linspace(-5, 10, num_test) test_x2 = np.linspace(0, 15, num_test) X1, X2 = np.meshgrid(test_x1, test_x2) - test_X = np.array([X1.flatten(), X2.flatten()]).T true_Y = model({'x1': X1.flatten(), 'x2': X2.flatten()}, noise_std=0)['y'] pred_Y = interp.predict({'x1': X1.flatten(), 'x2': X2.flatten()}, state, ())['y'] import matplotlib.colors as mcolors norm = mcolors.Normalize(vmin=min(true_Y.min(), pred_Y.min()), vmax=max(true_Y.max(), pred_Y.max())) fig, axs = plt.subplots(1, 2) - contour1 = axs[0].contourf(X1, X2, true_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) + axs[0].contourf(X1, X2, true_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) axs[0].set_title('True Function') axs[0].set_xlabel('x1') axs[0].set_ylabel('x2', rotation=0) axs[0].set_xlim(-5.1, 10.1) axs[0].set_ylim(-0.1, 15.1) - contour2 = axs[1].contourf(X1, X2, pred_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) - # axs[1].scatter(xtrain['x1'], xtrain['x2'], c=ytrain['y'], cmap='viridis', norm=norm, edgecolors='white', label='Training data') - axs[1].set_title('Predicted Function') + contour2 = axs[1].contourf(X1, X2, pred_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) axs[1].set_xlabel('x1') axs[1].set_ylabel('x2', rotation=0) axs[1].set_xlim(-5.1, 10.1) From 87072a938afcdd3d891513dbdb431dd8a57edc5f Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Sun, 4 May 2025 15:31:06 -0400 Subject: [PATCH 3/8] minor change in test functions --- tests/test_interpolator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index 0f5fd1f..2f8f67b 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -349,7 +349,7 @@ def test_GPR_1d(plots=False): """Test GPR interpolation for a 1D function.""" num_train = 50 num_test = 20 - noise_std = 0.5 + noise_std = 0 def model(inputs, noise_std): x1 = inputs['x1'] @@ -440,4 +440,4 @@ def model(inputs, noise_std): axs[1].set_ylim(-0.1, 15.1) cbar = fig.colorbar(contour2, ax=axs, orientation='vertical') cbar.set_label('y', rotation=0) - plt.show() + plt.show() \ No newline at end of file From 477826ee48e0fcb1278dbf9e1b7ed520ee625628 Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Mon, 5 May 2025 20:49:23 -0400 Subject: [PATCH 4/8] additional kernel support, n - dimensional interpolator test --- docs/guides/interface_models.md | 11 +---- src/amisc/interpolator.py | 59 ++++++++++++++--------- tests/test_convergence.py | 2 +- tests/test_interpolator.py | 85 +++++++++++++++++++++++++-------- tests/test_system.py | 5 +- 5 files changed, 106 insertions(+), 56 deletions(-) diff --git a/docs/guides/interface_models.md b/docs/guides/interface_models.md index d9fad93..27e35a6 100644 --- a/docs/guides/interface_models.md +++ b/docs/guides/interface_models.md @@ -228,16 +228,7 @@ As one last note, if your model returns field quantity data, this will be stored ## Specifying a surrogate method The `Component.interpolator` attribute provides the underlying surrogate "interpolation" method, i.e. the specific mathematical relationship that approximates the model's outputs as a function of its inputs. In this sense, we use the terms "interpolator" and "surrogate" interchangeably to mean the underlying approximation method -- the `Component.interpolator` does not necessarily have to "interpolate" the output by passing through all the training data directly. The naming convention mostly arises from the usage of polynomial interpolation in sparse grids. -Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation, which is set by default, [Linear][amisc.interpolator.Linear] regression, and [GPR][amisc.interpolator.GPR]. Multivariate Lagrange polynomials are formed by a tensor-product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression is implemented through the [scikit-learn](https://scikit-learn.org/stable/) library, and may optionally include polynomial features. Gaussian Process Regression is also implemented through the [scikit-learn](https://scikit-learn.org/stable/). Currently, the kernels supported for [GPR][amisc.interpolator.GPR] are: - -- Chi Squared -- Additive Chi Squared -- Linear -- Polynomial -- RBF -- Laplacian -- Sigmoid -- Cosine +Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation, which is set by default, [Linear][amisc.interpolator.Linear] regression, and [GPR][amisc.interpolator.GPR]. Multivariate Lagrange polynomials are formed by a tensor-product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression and Gaussian Process Regressions are implemented through the [scikit-learn](https://scikit-learn.org/stable/) library. Linear Regression may optionally include polynomial features while Gaussian Process Regression may optionally include scalers. You may configure the interpolation method via: ```python diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py index 0e9a2e6..0ae993e 100644 --- a/src/amisc/interpolator.py +++ b/src/amisc/interpolator.py @@ -17,25 +17,19 @@ import copy import itertools -import warnings from abc import ABC, abstractmethod from dataclasses import dataclass, field import numpy as np from sklearn import linear_model, preprocessing -from sklearn.gaussian_process import GaussianProcessRegressor -from sklearn.gaussian_process.kernels import PairwiseKernel, WhiteKernel +from sklearn.gaussian_process import GaussianProcessRegressor, kernels +from sklearn.gaussian_process.kernels import WhiteKernel from sklearn.pipeline import Pipeline -from sklearn.preprocessing import MinMaxScaler, PolynomialFeatures +from sklearn.preprocessing import PolynomialFeatures from amisc.serialize import Base64Serializable, Serializable, StringSerializable from amisc.typing import Dataset, MultiIndex -warnings.filterwarnings("ignore", module="sklearn") - - - - __all__ = ["InterpolatorState", "LagrangeState", "LinearState", "GPRState", "Interpolator", "Lagrange", "Linear", "GPR"] @@ -104,8 +98,8 @@ class GPRState(InterpolatorState, Base64Serializable): def __eq__(self, other): if isinstance(other, GPRState): return (self.x_vars == other.x_vars and self.y_vars == other.y_vars and - np.allclose(self.regressor['gpr'].kernel_.k1.metric, - other.regressor['gpr'].kernel_.k1.metric) and + np.allclose(self.regressor['gpr'].kernel_.k1, + other.regressor['gpr'].kernel_.k1) and np.allclose(self.regressor['gpr'].kernel_.k2.noise_level, other.regressor['gpr'].kernel_.k2.noise_level)) else: @@ -635,19 +629,35 @@ def hessian(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, @dataclass class GPR(Interpolator, StringSerializable): """Implementation of Gaussian Process Regression using `sklearn`. The `GaussianProcessInterpolator` uses a pipeline - of `MinMaxScaler` and a `GaussianProcessRegressor` to approximate the input-output mapping. - MinMaxScaler scales the input dimensions to [0,1] before passing them to the regressor. + of a scaler and a `GaussianProcessRegressor` to approximate the input-output mapping. - : ivar kernel_type: the kernel type to use for building the covariance matrix. (Defaults to RBF) + : ivar kernel: the kernel to use for building the covariance matrix. (eg: 'RBF', 'Matern','PairwiseKernel', etc.) + :ivar kernel_opts: Hyperparameters for the kernel chosen. + :ivar scaler: the scikit-learn preprocessing scaler to use (e.g. 'MinMaxScaler', 'StandardScaler', etc.). If None, + MinMaxScaler is used (default). + :ivar regressor_opts: options to pass to the regressor constructor + (see [scikit-learn](https://scikit-learn.org/stable/) documentation). + :ivar scaler_opts: options to pass to the scaler constructor """ - kernel_type: str = 'rbf' # Defaults to RBF Kernel + kernel: str = 'PairwiseKernel' + kernel_opts: dict = field(default_factory=lambda: {'gamma': 1.0, 'metric': 'poly'}) + scaler : str = 'MinMaxScaler' + regressor_opts: dict = field(default_factory = lambda: {'n_restarts_optimizer': 15, 'random_state': 42}) + scaler_opts: dict = field(default_factory=lambda: {'feature_range': (0, 1), 'copy': True, 'clip': False}) def __post_init__(self): - - if self.kernel_type not in ['additive_chi2', 'chi2', 'linear', 'poly', 'polynomial', 'rbf', 'laplacian', - 'sigmoid', 'cosine']: - raise ValueError(f"Metric {self.kernel_type} not found in sklearn.metrics.pairwise") + try: + getattr(kernels, self.kernel) + except AttributeError: + raise ImportError(f"Kernel '{self.kernel}' not found in sklearn.gaussian_process.kernels") + + if self.scaler is not None: + try: + getattr(preprocessing, self.scaler) + except AttributeError: + raise ImportError(f"Scaler '{self.scaler}' not found in sklearn.preprocessing") + def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], old_state: GPRState, input_domains: dict[str, tuple]) -> InterpolatorState: @@ -673,11 +683,14 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], x_arr = np.concatenate([xtrain[var][..., np.newaxis] for var in x_vars], axis=-1) y_arr = np.concatenate([ytrain[var][..., np.newaxis] for var in y_vars], axis=-1) - self.kernel = PairwiseKernel(metric=self.kernel_type, gamma=1) + WhiteKernel(noise_level=1e-4, - noise_level_bounds=(1e-10, 1e-6)) + gp_kernel = getattr(kernels, self.kernel)(**self.kernel_opts) + WhiteKernel(1e-10, (1e-10,1e-2)) + + gp = GaussianProcessRegressor(kernel = gp_kernel, **self.regressor_opts) + pipe = [] + if self.scaler is not None: + pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts))) + pipe.extend([('gpr', gp)]) - gp = GaussianProcessRegressor(kernel=self.kernel, n_restarts_optimizer=15, random_state=42) - pipe = [('scaler', MinMaxScaler()), ('gpr', gp)] regressor = Pipeline(pipe) regressor.fit(x_arr, y_arr) diff --git a/tests/test_convergence.py b/tests/test_convergence.py index 1d2a5d8..cdcac31 100644 --- a/tests/test_convergence.py +++ b/tests/test_convergence.py @@ -64,4 +64,4 @@ def curved_func(inputs, noise_std=0.0): ysurr = system.predict(xt) l2_error = relative_error(ysurr['y'], yt['y']) rel_noise = 2 * noise_std / np.mean(yt['y']) - assert l2_error < rel_noise, f"L2 error: {l2_error} is greater than relative noise level: {rel_noise}" + assert l2_error < rel_noise, f"L2 error: {l2_error} is greater than relative noise level: {rel_noise}" \ No newline at end of file diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index 2f8f67b..ff732de 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -346,32 +346,28 @@ def polynomial_model(inputs, noise_std=0.0): def test_GPR_1d(plots=False): - """Test GPR interpolation for a 1D function.""" - num_train = 50 + """Test GPR interpolation for no noise 1D function.""" + num_train = 100 num_test = 20 - noise_std = 0 - def model(inputs, noise_std): + def model(inputs): x1 = inputs['x1'] - y = (6 * x1 - 2) ** 2 * np.sin(12 * x1 - 4) + noise_std * np.random.randn(*x1.shape) + y = (6 * x1 - 2) ** 2 * np.sin(12 * x1 - 4) return {'y': y} xtrain = {'x1': np.random.uniform(0, 1, num_train)} - ytrain = model(xtrain, noise_std) + ytrain = model(xtrain) - interp = GPR() + interp = GPR(kernel='RBF', kernel_opts={'length_scale': 1}) state = interp.refine((), (xtrain, ytrain), None, {}) xtest = {'x1': np.linspace(0, 1, num_test)} - ytest = model(xtest, noise_std=0) + ytest = model(xtest) ypred = interp.predict(xtest, state, ()) l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} - print(f'L2 error: {l2_error}') - if noise_std > 0: - assert all((l2_error[var] < 0.09 for var in ypred)), f'L2 error : {l2_error} > 0.09' - else: - assert all((l2_error[var] < 1e-2 for var in ypred)), f'L2 error : {l2_error} > 0.01' + assert all ([l2_error[var] < 0.01 for var in l2_error]), f'L2 error {l2_error} is greater than 0.01' + if plots: plt.plot(xtest['x1'], ytest['y'], 'r--', label='Model') @@ -379,10 +375,22 @@ def model(inputs, noise_std): plt.scatter(xtrain['x1'], ytrain['y'], marker='o', s=25, color='blue', label='Training data') plt.legend() plt.show() + + '''Test for a few other kernels''' + regressors = {'Matern': {'length_scale': 1, 'nu':2.5}, + 'RationalQuadratic': {'length_scale': 1, 'alpha': 1}, + 'ExpSineSquared': {'length_scale': 1, 'periodicity': 1}} + for regressor, opts in regressors.items(): + interp = GPR(kernel=regressor, kernel_opts=opts) + state = interp.refine((), (xtrain, ytrain), None, {}) + + ypred = interp.predict(xtest, state, ()) + err = {var: relative_error(ypred[var], ytest[var]) for var in ypred} + assert all([err[var] < 0.01 for var in err]), f'L2 error {err} is greater than 0.01 for {regressor}' def test_GPR_2d(plots=False): - """Stress Test GPR interpolator with Brannin Function""" + """Stress Test GPR interpolator with 2D Brannin Function""" num_train = 100 num_test = 20 noise_std = 10 @@ -411,11 +419,9 @@ def model(inputs, noise_std): ypred = interp.predict(xtest, state, ()) l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} - print(f'L2 error: {l2_error}') - if noise_std > 0: - assert all((l2_error[var] < 0.2 for var in ypred)), f'L2 error : {l2_error} > 0.2' - else: - assert all((l2_error[var] < 0.05 for var in ypred)), f'L2 error : {l2_error} > 0.05' + + rel_noise = 2 *noise_std / np.mean(ytest['y']) + assert l2_error['y'] < rel_noise, f'L2 error: {l2_error["y"]} is greater than relative noise {rel_noise}' if plots: test_x1 = np.linspace(-5, 10, num_test) @@ -440,4 +446,43 @@ def model(inputs, noise_std): axs[1].set_ylim(-0.1, 15.1) cbar = fig.colorbar(contour2, ax=axs, orientation='vertical') cbar.set_label('y', rotation=0) - plt.show() \ No newline at end of file + plt.show() + +def test_GPR_nd(): + """Multi-dimensionsal GPR test with noise""" + num_train = 150 + num_test = 20 + noise_std = 5 + + def model(inputs, noise_std=0.0): + x1 = inputs['x1'] + x2 = inputs['x2'] + x3 = inputs['x3'] + x4 = inputs['x4'] + + y1 = x1 + 0.01 * x2 + 0.1 * x3 + np.random.randn(*x1.shape) * noise_std + y2 = np.sin(np.pi * x1) + 0.001 * x2 + 0.5 * x4 + np.random.randn(*x1.shape) * noise_std + y3 = x1 * x4 + 0.05 * x3**2 + np.random.randn(*x1.shape) * noise_std + + + return {'y1': y1, 'y2': y2, 'y3': y3} + + xtrain = {'x1': np.random.uniform(0,1, num_train), + 'x2': np.random.uniform(100,500, num_train), + 'x3': np.random.uniform(-10,10, num_train), + 'x4': np.random.uniform(0.01,10, num_train)} + ytrain = model(xtrain, noise_std=noise_std) + + interp = GPR(kernel='RBF', kernel_opts={'length_scale': 1}) + state = interp.refine((), (xtrain, ytrain), None, {}) + xtest = {'x1': np.random.uniform(0,1, num_test), + 'x2': np.random.uniform(100,500, num_test), + 'x3': np.random.uniform(-10,10, num_test), + 'x4': np.random.uniform(0.01,10, num_test)} + ytest = model(xtest, noise_std=0) + ypred = interp.predict(xtest, state, ()) + l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} + rel_noise = {var: np.abs(2 * noise_std / np.mean(ytest[var])) for var in ytest} + assert all([l2_error[var] < rel_noise[var] for var in l2_error]), f'L2 error: {l2_error} is \ + greater than relative noise {rel_noise}' + \ No newline at end of file diff --git a/tests/test_system.py b/tests/test_system.py index 2c97f3b..4d84908 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -422,9 +422,10 @@ def test_fire_sat(tmp_path, plots=False): # Test multiple interpolators on the Power component interpolators = {'lagrange': {}, - 'linear': {'regressor': 'RidgeCV', 'regressor_opts': {'alphas': np.logspace(-5, 4, 10).tolist()}} + 'linear': {'regressor': 'RidgeCV', 'regressor_opts': {'alphas': np.logspace(-5, 4, 10).tolist()}}, + 'GPR' :{} } - surrogate_fidelities = {'lagrange': (), 'linear': (2,)} + surrogate_fidelities = {'lagrange': (), 'linear': (2,), 'GPR': (2,)} for interpolator, config in interpolators.items(): surr.logger.info(f'Running "{interpolator}" interpolator...') From 8cff5919e1523490b14d25f14613f7f9bad4c968 Mon Sep 17 00:00:00 2001 From: Joshua Eckels Date: Wed, 7 May 2025 08:38:57 -0600 Subject: [PATCH 5/8] Allow kernel operators, fix GPR tests for RBF default --- docs/guides/extend.md | 2 +- src/amisc/component.py | 2 +- src/amisc/interpolator.py | 129 +++++++++++++++++++++++++------------ tests/test_interpolator.py | 81 ++++++++++++----------- tests/test_system.py | 4 +- 5 files changed, 135 insertions(+), 83 deletions(-) diff --git a/docs/guides/extend.md b/docs/guides/extend.md index d0e6313..efc9aa4 100644 --- a/docs/guides/extend.md +++ b/docs/guides/extend.md @@ -150,7 +150,7 @@ class CustomInterpolatorState(InterpolatorState): pass ``` -Currently, the available methods are `Lagrange` polynomial interpolation and `Linear` regression. The state of a `Lagrange` polynomial includes the 1d grids and barycentric weights for each input dimension. The state of a `Linear` regression includes the underlying [scikit-learn](https://scikit-learn.org/stable/) linear model. See [Lagrange][amisc.interpolator.Lagrange] and [Linear][amisc.interpolator.Linear] for more details. Note that linear regression also includes options for polynomial features. +Currently, the available methods are `Lagrange` polynomial interpolation, `Linear` regression, and Gaussian process regression `GPR`. The state of a `Lagrange` polynomial includes the 1d grids and barycentric weights for each input dimension. The state of a `Linear` regression or `GPR` includes the underlying [scikit-learn](https://scikit-learn.org/stable/) regression model. See [Lagrange][amisc.interpolator.Lagrange], [Linear][amisc.interpolator.Linear], and [GPR][amisc.interpolator.GPR] for more details. Note that linear regression also includes options for polynomial features, and GPR includes options for the kernel function. ## Model keyword arguments The `ModelKwarg` interface provides a dataclass for passing extra options to the underlying component models. The default is a simple `dict` that gets passed as a set of `key=value` pairs. The primary reason for overriding this class is if you have complicated arguments that require custom serialization. See the [serialization](#serialization) section below. diff --git a/src/amisc/component.py b/src/amisc/component.py index a1cc48d..d0e7f74 100644 --- a/src/amisc/component.py +++ b/src/amisc/component.py @@ -1514,7 +1514,7 @@ def serialize(self, keep_yaml_objects: bool = False, serialize_args: dict[str, t if len(value) > 0: d[key] = {str(k): int(v) for k, v in value.items()} elif key in ComponentSerializers.__annotations__.keys(): - if key in ['training_data', 'interpolator'] and not self.has_surrogate: + if key in ['training_data'] and not self.has_surrogate: continue else: d[key] = value.serialize(*serialize_args.get(key, ()), **serialize_kwargs.get(key, {})) diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py index 0ae993e..bba9abd 100644 --- a/src/amisc/interpolator.py +++ b/src/amisc/interpolator.py @@ -23,7 +23,6 @@ import numpy as np from sklearn import linear_model, preprocessing from sklearn.gaussian_process import GaussianProcessRegressor, kernels -from sklearn.gaussian_process.kernels import WhiteKernel from sklearn.pipeline import Pipeline from sklearn.preprocessing import PolynomialFeatures @@ -88,7 +87,7 @@ class GPRState(InterpolatorState, Base64Serializable): :ivar x_vars: the input variables in order :ivar y_vars: the output variables in order - :ivar regressor: the sklearn regressor object, a pipeline that consists of a `MinMaxScaler` and a + :ivar regressor: the sklearn regressor object, a pipeline that consists of a preprocessing scaler and a `GaussianProcessRegressor`. """ x_vars: list[str] = field(default_factory=list) @@ -98,10 +97,9 @@ class GPRState(InterpolatorState, Base64Serializable): def __eq__(self, other): if isinstance(other, GPRState): return (self.x_vars == other.x_vars and self.y_vars == other.y_vars and - np.allclose(self.regressor['gpr'].kernel_.k1, - other.regressor['gpr'].kernel_.k1) and - np.allclose(self.regressor['gpr'].kernel_.k2.noise_level, - other.regressor['gpr'].kernel_.k2.noise_level)) + len(self.regressor.steps) == len(other.regressor.steps) and + self.regressor['gpr'].alpha == other.regressor['gpr'].alpha and + self.regressor['gpr'].kernel_ == other.regressor['gpr'].kernel_) else: return False @@ -114,8 +112,8 @@ class Interpolator(Serializable, ABC): - `gradient` - compute the grdient/Jacobian at new points (if you want) - `hessian` - compute the 2nd derivative/Hessian at new points (if you want) - Currently, `Lagrange` and `Linear` interpolators are supported and can be constructed from a configuration `dict` - via `Interpolator.from_dict()`. + Currently, `Lagrange`, `Linear`, and `GPR` interpolators are supported and can be constructed from a configuration + `dict` via `Interpolator.from_dict()`. """ @abstractmethod @@ -170,17 +168,27 @@ def hessian(self, x: Dataset, state: InterpolatorState, training_data: tuple[Dat @classmethod def from_dict(cls, config: dict) -> Interpolator: - """Create an `Interpolator` object from a `dict` config. Only `method='lagrange'` is supported for now.""" - method = config.pop('method', 'lagrange').lower() - match method: + """Create an `Interpolator` object from a `dict` config. Available methods are `lagrange`, `linear`, and `gpr`. + Will attempt to find the method if not listed. + + :param config: a `dict` containing the configuration for the interpolator, with the `method` key specifying the + name of the interpolator method to use, and the rest of the keys are options for the method + """ + method = config.pop('method', 'lagrange') + match method.lower(): case 'lagrange': return Lagrange(**config) case 'linear': return Linear(**config) case 'gpr': return GPR(**config) - case other: - raise NotImplementedError(f"Unknown interpolator method: {other}") + case _: + import amisc.interpolator + + if hasattr(amisc.interpolator, method): + return getattr(amisc.interpolator, method)(**config) + + raise NotImplementedError(f"Unknown interpolator method: {method}") @dataclass @@ -628,36 +636,75 @@ def hessian(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, @dataclass class GPR(Interpolator, StringSerializable): - """Implementation of Gaussian Process Regression using `sklearn`. The `GaussianProcessInterpolator` uses a pipeline - of a scaler and a `GaussianProcessRegressor` to approximate the input-output mapping. + """Implementation of Gaussian Process Regression using `sklearn`. The `GPR` uses a pipeline + of a scaler and a `GaussianProcessRegressor` to approximate the input-output mapping. - : ivar kernel: the kernel to use for building the covariance matrix. (eg: 'RBF', 'Matern','PairwiseKernel', etc.) - :ivar kernel_opts: Hyperparameters for the kernel chosen. :ivar scaler: the scikit-learn preprocessing scaler to use (e.g. 'MinMaxScaler', 'StandardScaler', etc.). If None, - MinMaxScaler is used (default). - :ivar regressor_opts: options to pass to the regressor constructor - (see [scikit-learn](https://scikit-learn.org/stable/) documentation). + no scaling is applied (default). + :ivar kernel: the kernel to use for building the covariance matrix (e.g. 'RBF', 'Matern', 'PairwiseKernel', etc.). + If a string is provided, then the specified kernel is used with the given `kernel_opts`. + If a list is provided, then kernel operators ('Sum', 'Product', 'Exponentiation') can be used to + combine multiple kernels. The first element of the list should be the kernel or operator name, and + the remaining elements should be the arguments. Dicts are accepted as **kwargs. For example: + `['Sum', ['RBF', {'length_scale': 1.0}], ['Matern', {'length_scale': 1.0}]]` will create a sum of + an RBF and a Matern kernel with the specified length scales. :ivar scaler_opts: options to pass to the scaler constructor + :ivar kernel_opts: options to pass to the kernel constructor (ignored if kernel is a list, where opts are already + specified for combinations of kernels). + :ivar regressor_opts: options to pass to the `GaussianProcessRegressor` constructor + (see [scikit-learn](https://scikit-learn.org/stable/) documentation). """ + scaler: str = None + kernel: str | list = 'RBF' + scaler_opts: dict = field(default_factory=dict) + kernel_opts: dict = field(default_factory=dict) + regressor_opts: dict = field(default_factory=lambda: {'n_restarts_optimizer': 5}) + + def _construct_kernel(self, kernel_list): + """Build a scikit-learn kernel from a list of kernels (e.g. RBF, Matern, etc.) and kernel operators + (Sum, Product, Exponentiation). + + !!! Example + `['Sum', ['RBF'], ['Matern', {'length_scale': 1.0}]]` will become `RBF() + Matern(length_scale=1.0)` + + :param kernel_list: list of kernel/operator names and arguments. Kwarg options can be passed as dicts. + :returns: the scikit-learn kernel object + """ + # Base case for single items (just return as is) + if not isinstance(kernel_list, list): + return kernel_list + + name = kernel_list[0] + args = [self._construct_kernel(ele) for ele in kernel_list[1:]] + + # Base case for passing a single dict of kwargs + if len(args) == 1 and isinstance(args[0], dict): + return getattr(kernels, name)(**args[0]) + + # Base case for passing a list of args + return getattr(kernels, name)(*args) + + def _validate_kernel(self, kernel_list): + """Make sure all requested kernels are available in scikit-learn.""" + if not isinstance(kernel_list, list): + return - kernel: str = 'PairwiseKernel' - kernel_opts: dict = field(default_factory=lambda: {'gamma': 1.0, 'metric': 'poly'}) - scaler : str = 'MinMaxScaler' - regressor_opts: dict = field(default_factory = lambda: {'n_restarts_optimizer': 15, 'random_state': 42}) - scaler_opts: dict = field(default_factory=lambda: {'feature_range': (0, 1), 'copy': True, 'clip': False}) + name = kernel_list[0] + + if not hasattr(kernels, name): + raise ImportError(f"Kernel '{name}' not found in sklearn.gaussian_process.kernels") + + for ele in kernel_list[1:]: + self._validate_kernel(ele) def __post_init__(self): - try: - getattr(kernels, self.kernel) - except AttributeError: - raise ImportError(f"Kernel '{self.kernel}' not found in sklearn.gaussian_process.kernels") - + self._validate_kernel(self.kernel if isinstance(self.kernel, list) else [self.kernel, self.kernel_opts]) + if self.scaler is not None: try: getattr(preprocessing, self.scaler) except AttributeError: raise ImportError(f"Scaler '{self.scaler}' not found in sklearn.preprocessing") - def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], old_state: GPRState, input_domains: dict[str, tuple]) -> InterpolatorState: @@ -669,6 +716,15 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], :param input_domains: (not used for `GPR`) :returns: the new GPR state """ + gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list) + else [self.kernel, self.kernel_opts]) + gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts) + pipe = [] + if self.scaler is not None: + pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts))) + pipe.append(('gpr', gp)) + regressor = Pipeline(pipe) + xtrain, ytrain = training_data # Get order of variables for inputs and outputs @@ -683,16 +739,6 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], x_arr = np.concatenate([xtrain[var][..., np.newaxis] for var in x_vars], axis=-1) y_arr = np.concatenate([ytrain[var][..., np.newaxis] for var in y_vars], axis=-1) - gp_kernel = getattr(kernels, self.kernel)(**self.kernel_opts) + WhiteKernel(1e-10, (1e-10,1e-2)) - - gp = GaussianProcessRegressor(kernel = gp_kernel, **self.regressor_opts) - pipe = [] - if self.scaler is not None: - pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts))) - pipe.extend([('gpr', gp)]) - - regressor = Pipeline(pipe) - regressor.fit(x_arr, y_arr) return GPRState(regressor=regressor, x_vars=x_vars, y_vars=y_vars) @@ -704,7 +750,6 @@ def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dat :param state: the state containing the Gaussian Process Regressor to use :param training_data: not used for `GPR` (since the regressor is already trained in `state`) """ - # Convert to (N, xdim) array for sklearn x_arr = np.concatenate([x[var][..., np.newaxis] for var in state.x_vars], axis=-1) loop_shape = x_arr.shape[:-1] diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index ff732de..5c2372f 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -347,18 +347,18 @@ def polynomial_model(inputs, noise_std=0.0): def test_GPR_1d(plots=False): """Test GPR interpolation for no noise 1D function.""" - num_train = 100 - num_test = 20 + num_train = 30 + num_test = 40 def model(inputs): x1 = inputs['x1'] y = (6 * x1 - 2) ** 2 * np.sin(12 * x1 - 4) return {'y': y} - xtrain = {'x1': np.random.uniform(0, 1, num_train)} + xtrain = {'x1': np.linspace(0, 1, num_train)} ytrain = model(xtrain) - interp = GPR(kernel='RBF', kernel_opts={'length_scale': 1}) + interp = GPR(kernel='RBF', kernel_opts={'length_scale': 0.1, 'length_scale_bounds': 'fixed'}) state = interp.refine((), (xtrain, ytrain), None, {}) xtest = {'x1': np.linspace(0, 1, num_test)} @@ -366,8 +366,7 @@ def model(inputs): ypred = interp.predict(xtest, state, ()) l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} - assert all ([l2_error[var] < 0.01 for var in l2_error]), f'L2 error {l2_error} is greater than 0.01' - + assert all([l2_error[var] < 0.01 for var in l2_error]), f'L2 error {l2_error} is greater than 0.01' if plots: plt.plot(xtest['x1'], ytest['y'], 'r--', label='Model') @@ -375,9 +374,9 @@ def model(inputs): plt.scatter(xtrain['x1'], ytrain['y'], marker='o', s=25, color='blue', label='Training data') plt.legend() plt.show() - - '''Test for a few other kernels''' - regressors = {'Matern': {'length_scale': 1, 'nu':2.5}, + + # Test for a few other kernels + regressors = {'Matern': {'length_scale': 0.1, 'nu': 2.5}, 'RationalQuadratic': {'length_scale': 1, 'alpha': 1}, 'ExpSineSquared': {'length_scale': 1, 'periodicity': 1}} for regressor, opts in regressors.items(): @@ -390,27 +389,34 @@ def model(inputs): def test_GPR_2d(plots=False): - """Stress Test GPR interpolator with 2D Brannin Function""" - num_train = 100 - num_test = 20 - noise_std = 10 + """Test GPR interpolator with 2D Branin Function (https://www.sfu.ca/~ssurjano/branin.html)""" + num_train = 150 + num_test = 30 + noise_std = 8 def model(inputs, noise_std): """Branin function.""" x1 = inputs['x1'] x2 = inputs['x2'] + input_shape = np.atleast_1d(x1).shape a = 1 b = 5.1 / (4 * np.pi ** 2) c = 5 / np.pi r = 6 s = 10 t = 1 / (8 * np.pi) - y = a * (x2 - b * x1 ** 2 + c * x1 - r) ** 2 + s * (1 - t) * np.cos(x1) + s + noise_std * np.random.randn() + y = (a * (x2 - b * x1 ** 2 + c * x1 - r) ** 2 + s * (1 - t) * np.cos(x1) + s + + noise_std * np.random.randn(*input_shape)) return {'y': y} xtrain = {'x1': np.random.uniform(-5, 10, num_train), 'x2': np.random.uniform(0, 15, num_train)} ytrain = model(xtrain, noise_std) - interp = GPR() + interp = GPR(kernel=['Sum', + ['Product', + ['ConstantKernel', {'constant_value': 100, 'constant_value_bounds': (1000, 100000)}], + ['RBF', {'length_scale': 5, 'length_scale_bounds': (1, 10)}]], + ['WhiteKernel', {'noise_level': noise_std, 'noise_level_bounds': (0.1 * noise_std, + 10 * noise_std)}]]) state = interp.refine((), (xtrain, ytrain), None, {}) xtest = {'x1': np.random.uniform(-5, 10, num_test), 'x2': np.random.uniform(0, 15, num_test)} @@ -419,40 +425,43 @@ def model(inputs, noise_std): ypred = interp.predict(xtest, state, ()) l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} - - rel_noise = 2 *noise_std / np.mean(ytest['y']) + rel_noise = 2 * noise_std / np.percentile(ytrain['y'], 75) # pretty skewed so use percentile assert l2_error['y'] < rel_noise, f'L2 error: {l2_error["y"]} is greater than relative noise {rel_noise}' if plots: test_x1 = np.linspace(-5, 10, num_test) test_x2 = np.linspace(0, 15, num_test) X1, X2 = np.meshgrid(test_x1, test_x2) - true_Y = model({'x1': X1.flatten(), 'x2': X2.flatten()}, noise_std=0)['y'] - pred_Y = interp.predict({'x1': X1.flatten(), 'x2': X2.flatten()}, state, ())['y'] + true_Y = model({'x1': X1, 'x2': X2}, noise_std=0)['y'] + pred_Y = interp.predict({'x1': X1, 'x2': X2}, state, ())['y'] import matplotlib.colors as mcolors norm = mcolors.Normalize(vmin=min(true_Y.min(), pred_Y.min()), vmax=max(true_Y.max(), pred_Y.max())) - fig, axs = plt.subplots(1, 2) - axs[0].contourf(X1, X2, true_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) + fig, axs = plt.subplots(1, 2, figsize=(11, 5), layout='tight') + axs[0].contourf(X1, X2, true_Y, levels=10, cmap='viridis', norm=norm) axs[0].set_title('True Function') axs[0].set_xlabel('x1') axs[0].set_ylabel('x2', rotation=0) axs[0].set_xlim(-5.1, 10.1) axs[0].set_ylim(-0.1, 15.1) - contour2 = axs[1].contourf(X1, X2, pred_Y.reshape(X1.shape), levels=10, cmap='viridis', norm=norm) + contour2 = axs[1].contourf(X1, X2, pred_Y, levels=10, cmap='viridis', norm=norm) + axs[1].scatter(xtrain['x1'], xtrain['x2'], c=ytrain['y'], marker='o', s=25, cmap='viridis', + norm=norm, alpha=1, linewidths=2, edgecolors='black') axs[1].set_xlabel('x1') axs[1].set_ylabel('x2', rotation=0) axs[1].set_xlim(-5.1, 10.1) axs[1].set_ylim(-0.1, 15.1) - cbar = fig.colorbar(contour2, ax=axs, orientation='vertical') + axs[1].set_title('GPR surrogate') + cbar = fig.colorbar(contour2, ax=axs[1], orientation='vertical') cbar.set_label('y', rotation=0) plt.show() + def test_GPR_nd(): """Multi-dimensionsal GPR test with noise""" num_train = 150 - num_test = 20 - noise_std = 5 + num_test = 30 + noise_std = 0.1 def model(inputs, noise_std=0.0): x1 = inputs['x1'] @@ -464,25 +473,23 @@ def model(inputs, noise_std=0.0): y2 = np.sin(np.pi * x1) + 0.001 * x2 + 0.5 * x4 + np.random.randn(*x1.shape) * noise_std y3 = x1 * x4 + 0.05 * x3**2 + np.random.randn(*x1.shape) * noise_std - return {'y1': y1, 'y2': y2, 'y3': y3} - - xtrain = {'x1': np.random.uniform(0,1, num_train), - 'x2': np.random.uniform(100,500, num_train), - 'x3': np.random.uniform(-10,10, num_train), - 'x4': np.random.uniform(0.01,10, num_train)} + + xtrain = {'x1': np.random.uniform(0, 1, num_train), + 'x2': np.random.uniform(100, 500, num_train), + 'x3': np.random.uniform(-10, 10, num_train), + 'x4': np.random.uniform(0.01, 0.1, num_train)} ytrain = model(xtrain, noise_std=noise_std) - interp = GPR(kernel='RBF', kernel_opts={'length_scale': 1}) + interp = GPR(kernel=['Sum', ['RBF', {'length_scale': [0.1, 10, 1, 0.1]}], ['WhiteKernel']]) state = interp.refine((), (xtrain, ytrain), None, {}) - xtest = {'x1': np.random.uniform(0,1, num_test), - 'x2': np.random.uniform(100,500, num_test), - 'x3': np.random.uniform(-10,10, num_test), - 'x4': np.random.uniform(0.01,10, num_test)} + xtest = {'x1': np.random.uniform(0, 1, num_test), + 'x2': np.random.uniform(100, 500, num_test), + 'x3': np.random.uniform(-10, 10, num_test), + 'x4': np.random.uniform(0.01, 0.1, num_test)} ytest = model(xtest, noise_std=0) ypred = interp.predict(xtest, state, ()) l2_error = {var: relative_error(ypred[var], ytest[var]) for var in ypred} rel_noise = {var: np.abs(2 * noise_std / np.mean(ytest[var])) for var in ytest} assert all([l2_error[var] < rel_noise[var] for var in l2_error]), f'L2 error: {l2_error} is \ greater than relative noise {rel_noise}' - \ No newline at end of file diff --git a/tests/test_system.py b/tests/test_system.py index 4d84908..5f9a094 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -423,9 +423,9 @@ def test_fire_sat(tmp_path, plots=False): # Test multiple interpolators on the Power component interpolators = {'lagrange': {}, 'linear': {'regressor': 'RidgeCV', 'regressor_opts': {'alphas': np.logspace(-5, 4, 10).tolist()}}, - 'GPR' :{} + 'GPR': {} } - surrogate_fidelities = {'lagrange': (), 'linear': (2,), 'GPR': (2,)} + surrogate_fidelities = {'lagrange': (), 'linear': (2,), 'GPR': ()} for interpolator, config in interpolators.items(): surr.logger.info(f'Running "{interpolator}" interpolator...') From d83daffa4308151946fb47da53ef6d5a0b6a879a Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Wed, 7 May 2025 13:43:49 -0400 Subject: [PATCH 6/8] integrate GPR into fire_sat test. --- tests/test_system.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_system.py b/tests/test_system.py index 5f9a094..a9b83ac 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -423,7 +423,9 @@ def test_fire_sat(tmp_path, plots=False): # Test multiple interpolators on the Power component interpolators = {'lagrange': {}, 'linear': {'regressor': 'RidgeCV', 'regressor_opts': {'alphas': np.logspace(-5, 4, 10).tolist()}}, - 'GPR': {} + 'GPR': {'scaler': 'MinMaxScaler', 'kernel': ['Product', + ['ConstantKernel'], + ['PairwiseKernel', {'metric': 'poly'}]]} } surrogate_fidelities = {'lagrange': (), 'linear': (2,), 'GPR': ()} @@ -581,4 +583,4 @@ def _object_to_numeric(array): # for field quantities # Check error final_iter = system.train_history[-1] for var, perf in final_iter['test_error'].items(): - assert perf < 0.2 + assert perf < 0.2 \ No newline at end of file From df707dca1f8f92cb19e7a3fa15b10bd0f5e3e359 Mon Sep 17 00:00:00 2001 From: Joshua Eckels Date: Wed, 14 May 2025 18:01:09 -0600 Subject: [PATCH 7/8] Fix sklearn warnings --- tests/test_component.py | 4 +++- tests/test_convergence.py | 6 ++++-- tests/test_interpolator.py | 5 +++-- tests/test_system.py | 13 +++++++------ 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/tests/test_component.py b/tests/test_component.py index 883d76e..8d3bdd4 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -4,6 +4,7 @@ import itertools import time import uuid +import warnings from dataclasses import dataclass, field from pathlib import Path @@ -408,7 +409,8 @@ def model(inputs): Ik = [((), (0, 0)), ((), (0, 1)), ((), (1, 0)), ((), (2, 0)), ((), (1, 1)), ((), (0, 2)), ((), (1, 2)), ((), (2, 1)), ((), (2, 2))] for alpha, beta in Ik: - comp.activate_index(alpha, beta) + with warnings.catch_warnings(action='ignore', category=RuntimeWarning): + comp.activate_index(alpha, beta) xg = np.linspace(0, 1, 100) yt = comp.predict({'x': xg}, use_model='best')['y'] ysurr = comp.predict({'x': xg})['y'] diff --git a/tests/test_convergence.py b/tests/test_convergence.py index cdcac31..707d4d5 100644 --- a/tests/test_convergence.py +++ b/tests/test_convergence.py @@ -1,4 +1,5 @@ """Test convergence of `System.fit` to a few smooth analytical models.""" +import warnings from pathlib import Path from typing import Literal @@ -60,8 +61,9 @@ def curved_func(inputs, noise_std=0.0): xt = system.sample_inputs(1000) yt = curved_func(xt) - system.fit(max_iter=50, max_tol=1e-4) + with warnings.catch_warnings(action='ignore', category=RuntimeWarning): + system.fit(max_iter=50, max_tol=1e-4) ysurr = system.predict(xt) l2_error = relative_error(ysurr['y'], yt['y']) rel_noise = 2 * noise_std / np.mean(yt['y']) - assert l2_error < rel_noise, f"L2 error: {l2_error} is greater than relative noise level: {rel_noise}" \ No newline at end of file + assert l2_error < rel_noise, f"L2 error: {l2_error} is greater than relative noise level: {rel_noise}" diff --git a/tests/test_interpolator.py b/tests/test_interpolator.py index 5c2372f..21c8b2a 100644 --- a/tests/test_interpolator.py +++ b/tests/test_interpolator.py @@ -377,8 +377,9 @@ def model(inputs): # Test for a few other kernels regressors = {'Matern': {'length_scale': 0.1, 'nu': 2.5}, - 'RationalQuadratic': {'length_scale': 1, 'alpha': 1}, - 'ExpSineSquared': {'length_scale': 1, 'periodicity': 1}} + 'RationalQuadratic': {'length_scale': 1, 'alpha': 1, 'alpha_bounds': 'fixed'}, + 'ExpSineSquared': {'length_scale': 1e-3, 'length_scale_bounds': 'fixed', + 'periodicity': 1000, 'periodicity_bounds': 'fixed'}} for regressor, opts in regressors.items(): interp = GPR(kernel=regressor, kernel_opts=opts) state = interp.refine((), (xtrain, ytrain), None, {}) diff --git a/tests/test_system.py b/tests/test_system.py index a9b83ac..aa11079 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -8,6 +8,7 @@ import numpy as np from scipy.optimize import fsolve from scipy.stats import gaussian_kde +from sklearn.exceptions import ConvergenceWarning from uqtils import ax_default from amisc import Component, System, Variable @@ -423,9 +424,7 @@ def test_fire_sat(tmp_path, plots=False): # Test multiple interpolators on the Power component interpolators = {'lagrange': {}, 'linear': {'regressor': 'RidgeCV', 'regressor_opts': {'alphas': np.logspace(-5, 4, 10).tolist()}}, - 'GPR': {'scaler': 'MinMaxScaler', 'kernel': ['Product', - ['ConstantKernel'], - ['PairwiseKernel', {'metric': 'poly'}]]} + 'GPR': {'scaler': 'MinMaxScaler', 'kernel': 'PairwiseKernel', 'kernel_opts': {'metric': 'poly'}} } surrogate_fidelities = {'lagrange': (), 'linear': (2,), 'GPR': ()} @@ -434,8 +433,10 @@ def test_fire_sat(tmp_path, plots=False): surr.clear() surr['Power'].interpolator = Interpolator.from_dict(dict(method=interpolator, **config)) surr['Power'].surrogate_fidelity = surrogate_fidelities[interpolator] - surr.fit(targets=targs, max_iter=10, max_tol=1e-3, runtime_hr=4/12, test_set=(xt, yt), - plot_interval=4, estimate_bounds=True) + + with warnings.catch_warnings(action='ignore', category=(RuntimeWarning, ConvergenceWarning)): + surr.fit(targets=targs, max_iter=10, max_tol=1e-3, runtime_hr=4/12, test_set=(xt, yt), + plot_interval=4, estimate_bounds=True) ysurr = surr.predict(xt, targets=targs) for var in yt: @@ -583,4 +584,4 @@ def _object_to_numeric(array): # for field quantities # Check error final_iter = system.train_history[-1] for var, perf in final_iter['test_error'].items(): - assert perf < 0.2 \ No newline at end of file + assert perf < 0.2 From 147b699e09fb1fce4d97b46ce72c748c7fb3f330 Mon Sep 17 00:00:00 2001 From: Shubham Rath Date: Mon, 13 Oct 2025 16:56:41 -0400 Subject: [PATCH 8/8] Integrate Uncertainty Sampling routine --- README.md | 2 +- src/amisc/component.py | 18 +- src/amisc/interpolator.py | 3 +- src/amisc/training.py | 603 +++++++++++++++++++++++++++++++++++- tests/test_training_data.py | 53 +++- 5 files changed, 670 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5f7811d..934d6be 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ ![build](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/deploy.yml?logo=github) ![docs](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/docs.yml?logo=materialformkdocs&logoColor=%2523cccccc&label=docs) ![tests](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/tests.yml?logo=github&logoColor=%2523cccccc&label=tests) -![Code Coverage](https://img.shields.io/badge/coverage-89%25-yellowgreen?logo=codecov) +![Code Coverage](https://img.shields.io/badge/coverage-86%25-yellowgreen?logo=codecov) [![Algorithm description](https://img.shields.io/badge/DOI-10.1002/nme.6958-blue)](https://doi.org/10.1002/nme.6958) Efficient framework for building surrogates of multidisciplinary systems using the adaptive multi-index stochastic collocation ([AMISC](https://onlinelibrary.wiley.com/doi/full/10.1002/nme.6958)) technique. diff --git a/src/amisc/component.py b/src/amisc/component.py index d0e7f74..d8c1992 100644 --- a/src/amisc/component.py +++ b/src/amisc/component.py @@ -40,9 +40,9 @@ from pydantic import BaseModel, ConfigDict, ValidationInfo, field_validator from typing_extensions import TypedDict -from amisc.interpolator import Interpolator, InterpolatorState, Lagrange +from amisc.interpolator import GPR, Interpolator, InterpolatorState, Lagrange from amisc.serialize import PickleSerializable, Serializable, StringSerializable, YamlSerializable -from amisc.training import SparseGrid, TrainingData +from amisc.training import SparseGrid, TrainingData, UncertaintySampling from amisc.typing import COORDS_STR_ID, LATENT_STR_ID, Dataset, MultiIndex from amisc.utils import ( _get_yaml_path, @@ -1213,8 +1213,17 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P # Training data is the same for all surrogate fidelity indices, given constant data fidelity design_list.append([]) continue - - design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], + if isinstance(self.interpolator, GPR) and isinstance(self.training_data, UncertaintySampling): + states = list(self.misc_states) + match_state = [tup for tup in states if tup[0] == a] + if match_state: + best_state = max(match_state, key=lambda x: sum(x[1]))[2].regressor.named_steps['gpr'] + else: + best_state = None + design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], + domains,best_state, weight_fcns) + else: + design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)], domains, weight_fcns) design_pts, fc = to_model_dataset(design_pts, self.inputs, del_latent=True, **field_coords) @@ -1301,7 +1310,6 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P # Only for initial index which didn't come from the candidate set self.update_misc_coeff(IndexSet(s), index_set='test') self.active_set.update(s) - self.update_misc_coeff(neighbors, index_set='test') # neighbors will only ever pass through here once self.candidate_set.update(neighbors) diff --git a/src/amisc/interpolator.py b/src/amisc/interpolator.py index bba9abd..0b2e475 100644 --- a/src/amisc/interpolator.py +++ b/src/amisc/interpolator.py @@ -717,7 +717,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset], :returns: the new GPR state """ gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list) - else [self.kernel, self.kernel_opts]) + else [self.kernel, self.kernel_opts]) + gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts) pipe = [] if self.scaler is not None: diff --git a/src/amisc/training.py b/src/amisc/training.py index ddb1fc5..409f2f1 100644 --- a/src/amisc/training.py +++ b/src/amisc/training.py @@ -16,7 +16,8 @@ import numpy as np from numpy.typing import ArrayLike -from scipy.optimize import direct +from scipy.optimize import direct, minimize +from sklearn.gaussian_process.kernels import WhiteKernel from amisc.serialize import PickleSerializable, Serializable from amisc.typing import LATENT_STR_ID, Dataset, MultiIndex @@ -121,6 +122,8 @@ def from_dict(cls, config: dict) -> TrainingData: match method: case 'sparse-grid': return SparseGrid(**config) + case 'uncertainty-sampling': + return UncertaintySampling(**config) case other: raise NotImplementedError(f"Unknown training data method: {other}") @@ -516,3 +519,601 @@ def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, raise NotImplementedError(f"Unknown collocation method: {other}") return z_pts + +@dataclass +class UncertaintySampling(TrainingData, PickleSerializable): + """A class for storing training data in a sparse grid format. The `UncertaintySampling` class stores training points + by their coordinate location in a larger tensor-product grid, and obtains new training data by refining + a single 1d grid at a time. Refinement is done by Leja sampling for initial iterations, then using GP + uncertainty for subsequent iterations. + + !!! Note "MISC and sparse grids" + MISC itself can be thought of as an extension to the well-known sparse grid technique, so this class + readily integrates with the MISC implementation in `Component`. Sparse grids limit the curse + of dimensionality up to about `dim = 10-15` for the input space (which would otherwise be infeasible with a + normal full tensor-product grid of the same size). + + !!! Info "About points in a sparse grid" + A sparse grid approximates a full tensor-product grid $(N_1, N_2, ..., N_d)$, where $N_i$ is the number of grid + points along dimension $i$, for a $d$-dimensional space. Each point is uniquely identified in the sparse grid + by a list of indices $(j_1, j_2, ..., j_d)$, where $j_i = 0 ... N_i$. We refer to this unique identifier as a + "grid coordinate". In the `SparseGrid` data structure, these coordinates are used along with the `alpha` + fidelity index to uniquely locate the training data for a given multi-index pair. + + !!! Info "About Uncertainty Sampling" + Uncertainty Sampling is an active learning strategy that selects new training points based on the model's + uncertainty. This approach aims to improve the surrogate model's accuracy by focusing on areas of the + input space where the model is less certain about its predictions. The uncertainty is typically quantified + using sklearn's GaussianProcessRegressor, which provides both mean predictions and uncertainty estimates. + + :ivar collocation_rule: the collocation rule to use for generating new grid points (only 'leja' is supported) + :ivar knots_per_level: the number of grid knots/points per level in the `beta` fidelity multi-index + :ivar expand_latent_method: method for expanding latent grids, either 'round-robin' or 'tensor-product' + :ivar opt_args: extra arguments for the global 1d `direct` optimizer + :ivar betas: a set of all `beta` multi-indices that have been seen so far + :ivar x_grids: a `dict` of grid points for each 1d input dimension + :ivar yi_map: a `dict` of model outputs for each grid coordinate + :ivar yi_nan_map: a `dict` of imputed model outputs for each grid coordinate where the model failed (or gave nan) + :ivar error_map: a `dict` of error information for each grid coordinate where the model failed + :ivar latent_size: the number of latent coefficients for each variable (0 if scalar) + :ivar d_thresh: minimum distance threshold for new points (specified as fraction of input domain size) + :ivar thresh_relax: relaxation factor for minimum distance threshold in case no new points are found (threshold is + successively relaxed until number of points criteria is satisfied) + :ivar grid_resolution: number of sample points per dimension for estimating GP uncertainty + (specified as multiple of number of new points to add) + """ + MAX_IMPUTE_SIZE: ClassVar[int] = 10 # don't try to impute large arrays + + collocation_rule: str = 'leja' + knots_per_level: int = 2 + expand_latent_method: str = 'round-robin' # or 'tensor-product', for converting beta to latent grid sizes + opt_args: dict = field(default_factory=lambda: {'locally_biased': False, 'maxfun': 300}) # for leja optimizer + + betas: set[MultiIndex] = field(default_factory=set) + x_grids: dict[str, ArrayLike] = field(default_factory=dict) + yi_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, ArrayLike]]] = field(default_factory=dict) + yi_nan_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, ArrayLike]]] = field(default_factory=dict) + error_map: dict[MultiIndex, dict[tuple[int, ...], dict[str, Any]]] = field(default_factory=dict) + latent_size: dict[str, int] = field(default_factory=dict) # keep track of latent grid sizes for each variable + d_thresh: float = 0.20 # minimum distance threshold for new points (as fraction of input domain) + thresh_relax: float = 0.5 # relaxation factor for minimum distance threshold if no new points are found + variance_criterion: str = 'local' # or 'global', for uncertainty sampling + grid_size: int = 100 # Number of monte carlo samples to evaluate global GP uncertainty + + def clear(self): + """Clear all training data.""" + self.betas.clear() + self.x_grids.clear() + self.yi_map.clear() + self.yi_nan_map.clear() + self.error_map.clear() + self.latent_size.clear() + + def get_by_coord(self, alpha: MultiIndex, coords: list, y_vars: list = None, skip_nan: bool = False): + """Get training data from the sparse grid for a given `alpha` and list of grid coordinates. Try to replace + `nan` values with imputed values. Skip any data points with remaining `nan` values if `skip_nan=True`. + + :param alpha: the model fidelity indices + :param coords: a list of grid coordinates to locate the `yi` values in the sparse grid data structure + :param y_vars: the keys of the outputs to return (if `None`, return all outputs) + :param skip_nan: skip any data points with remaining `nan` values if `skip_nan=True` (only for numeric outputs) + :returns: `dicts` of model inputs `xi_dict` and outputs `yi_dict` + """ + N = len(coords) + is_numeric = {} + is_singleton = {} + xi_dict = self._extract_grid_points(coords) + yi_dict = {} + + first_yi = next(iter(self.yi_map[alpha].values())) + if y_vars is None: + y_vars = first_yi.keys() + + for var in y_vars: + yi = np.atleast_1d(first_yi[var]) + is_numeric[var] = self._is_numeric(yi) + is_singleton[var] = self._is_singleton(yi) + yi_dict[var] = np.empty(N, dtype=np.float64 if is_numeric[var] and is_singleton[var] else object) + + for i, coord in enumerate(coords): + try: + yi_curr = self.yi_map[alpha][coord] + for var in y_vars: + yi = arr if (arr := self.yi_nan_map[alpha].get(coord, {}).get(var)) is not None else yi_curr[var] + yi_dict[var][i] = yi if is_singleton[var] else np.atleast_1d(yi) + + except KeyError as e: + raise KeyError(f"Can't access sparse grid data for alpha={alpha}, coord={coord}. " + f"Make sure the data has been set first.") from e + + # Delete nans if requested (only for numeric singleton outputs) + if skip_nan: + nan_idx = np.full(N, False) + for var in y_vars: + if is_numeric[var] and is_singleton[var]: + nan_idx |= np.isnan(yi_dict[var]) + + xi_dict = {k: v[~nan_idx] for k, v in xi_dict.items()} + yi_dict = {k: v[~nan_idx] for k, v in yi_dict.items()} + + return xi_dict, yi_dict # Both with elements of shape (N, ...) for N grid points + + def get(self, alpha: MultiIndex, beta: MultiIndex, y_vars: list[str] = None, skip_nan: bool = False): + """Get the training data from the sparse grid for a given `alpha` and `beta` pair.""" + return self.get_by_coord(alpha, list(self._expand_grid_coords(beta)), y_vars=y_vars, skip_nan=skip_nan) + + def set_errors(self, alpha: MultiIndex, beta: MultiIndex, coords: list, errors: list[dict]): + """Store error information in the sparse-grid for a given multi-index pair.""" + for coord, error in zip(coords, errors): + self.error_map[alpha][coord] = copy.deepcopy(error) + + def set(self, alpha: MultiIndex, beta: MultiIndex, coords: list, yi_dict: dict[str, ArrayLike]): + """Store model output `yi_dict` values. + + :param alpha: the model fidelity indices + :param beta: the surrogate fidelity indices + :param coords: a list of grid coordinates to locate the `yi` values in the sparse grid data structure + :param yi_dict: a `dict` of model output `yi` values + """ + for i, coord in enumerate(coords): # First dim of yi is loop dim aligning with coords + new_yi = {} + for var, yi in yi_dict.items(): + yi = np.atleast_1d(yi[i]) + new_yi[var] = (float(yi[0]) if self._is_numeric(yi) else yi[0]) if self._is_singleton(yi) else yi.tolist() # noqa: E501 + self.yi_map[alpha][coord] = copy.deepcopy(new_yi) + + def impute_missing_data(self, alpha: MultiIndex, beta: MultiIndex): + """Impute missing values in the sparse grid for a given multi-index pair by linear regression imputation.""" + imputer, xi_all, yi_all = None, None, None + + # only impute (small-length) numeric quantities + yi_dict = next(iter(self.yi_map[alpha].values())) + output_vars = [var for var in self._numeric_outputs(yi_dict) + if len(np.ravel(yi_dict[var])) <= self.MAX_IMPUTE_SIZE] + + for coord, yi_dict in self.yi_map[alpha].items(): + if any([np.any(np.isnan(yi_dict[var])) for var in output_vars]): + if imputer is None: + # Grab all 'good' interpolation points and train a simple linear regression fit + xi_all, yi_all = self.get(alpha, beta, y_vars=output_vars, skip_nan=True) + if len(xi_all) == 0 or len(next(iter(xi_all.values()))) == 0: + continue # possible if no good data has been set yet + + N = next(iter(xi_all.values())).shape[0] # number of grid points + xi_mat = np.concatenate([xi_all[var][:, np.newaxis] if len(xi_all[var].shape) == 1 else + xi_all[var] for var in xi_all.keys()], axis=-1) + yi_mat = np.concatenate([yi_all[var][:, np.newaxis] if len(yi_all[var].shape) == 1 else + yi_all[var].reshape((N, -1)) for var in output_vars], axis=-1) + + imputer = _RidgeRegression(alpha=1.0) + imputer.fit(xi_mat, yi_mat) + + # Run the imputer for this coordinate + x_interp = self._extract_grid_points(coord) + x_interp = np.concatenate([x_interp[var][:, np.newaxis] if len(x_interp[var].shape) == 1 else + x_interp[var] for var in x_interp.keys()], axis=-1) + y_interp = imputer.predict(x_interp) + + # Unpack the imputed value + y_impute = {} + start_idx = 0 + for var in output_vars: + var_shape = yi_all[var].shape[1:] or (1,) + end_idx = start_idx + int(np.prod(var_shape)) + yi = np.atleast_1d(y_interp[0, start_idx:end_idx]).reshape(var_shape) + nan_idx = np.isnan(np.atleast_1d(yi_dict[var])) + yi[~nan_idx] = np.atleast_1d(yi_dict[var])[~nan_idx] # Only keep imputed values where yi is nan + y_impute[var] = float(yi[0]) if self._is_singleton(yi) else yi.tolist() + start_idx = end_idx + + self.yi_nan_map[alpha][coord] = copy.deepcopy(y_impute) + + def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, gp_model, weight_fcns: dict = None): + """Refine the sparse grid for a given `alpha` and `beta` pair and given collocation rules. Return any new + grid points that do not have model evaluations saved yet. + + !!! Note + The `beta` multi-index is used to determine the number of collocation points in each input dimension. The + length of `beta` should therefore match the number of variables in `x_vars`. + """ + weight_fcns = weight_fcns or {} + + if gp_model is None: # Fallback to Leja sampling + # Initialize a sparse grid for beta=(0, 0, ..., 0) + if np.sum(beta) == 0: + if len(self.x_grids) == 0: + num_latent = {} + for var in input_domains: + if LATENT_STR_ID in var: + base_id = var.split(LATENT_STR_ID)[0] + num_latent[base_id] = 1 if base_id not in num_latent else num_latent[base_id] + 1 + else: + num_latent[var] = 0 + self.latent_size = num_latent + + new_pt = {} + domains = iter(input_domains.items()) + for grid_size in self.beta_to_knots(beta): + if isinstance(grid_size, int): # scalars + var, domain = next(domains) + new_pt[var] = self.collocation_1d(grid_size, domain, method=self.collocation_rule, + wt_fcn=weight_fcns.get(var, None), + opt_args=self.opt_args).tolist() + else: # latent coeffs + for s in grid_size: + var, domain = next(domains) + new_pt[var] = self.collocation_1d(s, domain, method=self.collocation_rule, + wt_fcn=weight_fcns.get(var, None), + opt_args=self.opt_args).tolist() + self.x_grids = new_pt + self.betas.add(beta) + self.yi_map.setdefault(alpha, dict()) + self.yi_nan_map.setdefault(alpha, dict()) + self.error_map.setdefault(alpha, dict()) + new_coords = list(self._expand_grid_coords(beta)) + return new_coords, self._extract_grid_points(new_coords) + + # Otherwise, refine the sparse grid + for beta_old in self.betas: + # Get the first lower neighbor in the sparse grid and refine the 1d grid if necessary + if self.is_one_level_refinement(beta_old, beta): + new_grid_size = self.beta_to_knots(beta) + inputs = zip(self.x_grids.keys(), self.x_grids.values(), input_domains.values()) + + for new_size in new_grid_size: + if isinstance(new_size, int): # scalar grid + var, grid, domain = next(inputs) + if len(grid) < new_size: + num_new_pts = new_size - len(grid) + self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid, + opt_args=self.opt_args, + wt_fcn=weight_fcns.get(var, None), + method=self.collocation_rule).tolist() + else: # latent grid + for s_new in new_size: + var, grid, domain = next(inputs) + if len(grid) < s_new: + num_new_pts = s_new - len(grid) + self.x_grids[var] = self.collocation_1d(num_new_pts, domain, grid, + opt_args=self.opt_args, + wt_fcn=weight_fcns.get(var, None), + method=self.collocation_rule).tolist() + break + + new_coords = [] + for coord in self._expand_grid_coords(beta): + if coord not in self.yi_map[alpha]: + # If we have not computed this grid coordinate yet + new_coords.append(coord) + + new_pts = self._extract_grid_points(new_coords) + + self.betas.add(beta) + return new_coords, new_pts + + else: + # Initialize a sparse grid for beta=(0, 0, ..., 0) + if np.sum(beta) == 0: + if len(self.x_grids) == 0: + num_latent = {} + for var in input_domains: + if LATENT_STR_ID in var: + base_id = var.split(LATENT_STR_ID)[0] + num_latent[base_id] = 1 if base_id not in num_latent else num_latent[base_id] + 1 + else: + num_latent[var] = 0 + self.latent_size = num_latent + + new_pt = {} + domains = iter(input_domains.items()) + for grid_size in self.beta_to_knots(beta): + if isinstance(grid_size, int): # scalars + var, domain = next(domains) + new_pt[var] = self.sampling_1d(var, grid_size, input_domains, gp_model).tolist() + else: # latent coeffs + for s in grid_size: + var, domain = next(domains) + new_pt[var] = self.sampling_1d(var, s, input_domains, gp_model).tolist() + self.x_grids = new_pt + self.betas.add(beta) + self.yi_map.setdefault(alpha, dict()) + self.yi_nan_map.setdefault(alpha, dict()) + self.error_map.setdefault(alpha, dict()) + new_coords = list(self._expand_grid_coords(beta)) + return new_coords, self._extract_grid_points(new_coords) + + # Otherwise, refine the sparse grid + for beta_old in self.betas: + # Get the first lower neighbor in the sparse grid and refine the 1d grid if necessary + if self.is_one_level_refinement(beta_old, beta): + new_grid_size = self.beta_to_knots(beta) + inputs = zip(self.x_grids.keys(), self.x_grids.values(), input_domains.values()) + + for new_size in new_grid_size: + if isinstance(new_size, int): # scalar grid + var, grid, domain = next(inputs) + if len(grid) < new_size: + num_new_pts = new_size - len(grid) + self.x_grids[var] = self.sampling_1d(var, num_new_pts, input_domains, + gp_model, grid).tolist() + else: # latent grid + for s_new in new_size: + var, grid, domain = next(inputs) + if len(grid) < s_new: + num_new_pts = s_new - len(grid) + self.x_grids[var] = self.sampling_1d(var, num_new_pts, input_domains, + gp_model, grid).tolist() + break + + new_coords = [] + for coord in self._expand_grid_coords(beta): + if coord not in self.yi_map[alpha]: + # If we have not computed this grid coordinate yet + new_coords.append(coord) + + new_pts = self._extract_grid_points(new_coords) + + self.betas.add(beta) + return new_coords, new_pts + + + def _extract_grid_points(self, coords: list[tuple] | tuple): + """Extract the `x` grid points located at `coords` from `x_grids` and return as the `pts` dictionary.""" + if not isinstance(coords, list): + coords = [coords] + pts = {var: np.empty(len(coords)) for var in self.x_grids} + + for k, coord in enumerate(coords): + grids = iter(self.x_grids.items()) + for idx in coord: + if isinstance(idx, int): # scalar grid point + var, grid = next(grids) + pts[var][k] = grid[idx] + else: # latent coefficients + for i in idx: + var, grid = next(grids) + pts[var][k] = grid[i] + + return pts + + def _expand_grid_coords(self, beta: MultiIndex): + """Iterable over all grid coordinates for a given `beta`, accounting for scalars and latent coefficients.""" + grid_size = self.beta_to_knots(beta) + grid_coords = [] + for s in grid_size: + if isinstance(s, int): # scalars + grid_coords.append(range(s)) + else: # latent coefficients + grid_coords.append(itertools.product(*[range(latent_size) for latent_size in s])) + + yield from itertools.product(*grid_coords) + + @staticmethod + def _is_singleton(arr: np.ndarray): + return len(arr.shape) == 1 and arr.shape[0] == 1 + + @staticmethod + def _is_numeric(arr: np.ndarray): + return np.issubdtype(arr.dtype, np.number) + + @classmethod + def _numeric_outputs(cls, yi_dict: dict[str, ArrayLike]) -> list[str]: + """Return a list of the output variables that have numeric data.""" + output_vars = [] + for var in yi_dict.keys(): + try: + if cls._is_numeric(np.atleast_1d(yi_dict[var])): + output_vars.append(var) + except Exception: + continue + return output_vars + + @staticmethod + def is_one_level_refinement(beta_old: tuple, beta_new: tuple) -> bool: + """Check if a new `beta` multi-index is a one-level refinement from a previous `beta`. + + !!! Example + Refining from `(0, 1, 2)` to the new multi-index `(1, 1, 2)` is a one-level refinement. But refining to + either `(2, 1, 2)` or `(1, 2, 2)` are not, since more than one refinement occurs at the same time. + + :param beta_old: the starting multi-index + :param beta_new: the new refined multi-index + :returns: whether `beta_new` is a one-level refinement from `beta_old` + """ + level_diff = np.array(beta_new, dtype=int) - np.array(beta_old, dtype=int) + ind = np.nonzero(level_diff)[0] + return ind.shape[0] == 1 and level_diff[ind] == 1 + + def beta_to_knots(self, beta: MultiIndex, knots_per_level: int = None, latent_size: dict = None, + expand_latent_method: str = None) -> tuple: + """Convert a `beta` multi-index to the number of knots per dimension in the sparse grid. + + :param beta: refinement level indices + :param knots_per_level: level-to-grid-size multiplier, i.e. number of new points (or knots) for each beta level + :param latent_size: the number of latent coefficients for each variable (0 if scalar); number of variables and + order should match the `beta` multi-index + :param expand_latent_method: method for expanding latent grids, either 'round-robin' or 'tensor-product' + :returns: the number of knots/points per dimension for the sparse grid + """ + knots_per_level = knots_per_level or self.knots_per_level + latent_size = latent_size or self.latent_size + expand_latent_method = expand_latent_method or self.expand_latent_method + + grid_size = [] + for i, (var, num_latent) in enumerate(latent_size.items()): + if num_latent > 0: + match expand_latent_method: + case 'round-robin': + if beta[i] == 0: + grid_size.append((1,) * num_latent) # initializes all latent grids to 1 + else: + latent_refine_idx = (beta[i] - 1) % num_latent + latent_refine_num = ((beta[i] - 1) // num_latent) + 1 + latent_beta = tuple([latent_refine_num] * (latent_refine_idx + 1) + + [latent_refine_num - 1] * (num_latent - latent_refine_idx - 1)) + latent_grid = [knots_per_level * latent_beta[j] + 1 for j in range(num_latent)] + grid_size.append(tuple(latent_grid)) + case 'tensor-product': + grid_size.append((knots_per_level * beta[i] + 1,) * num_latent) + case other: + raise NotImplementedError(f"Unknown method for expanding latent grids: {other}") + else: + grid_size.append(knots_per_level * beta[i] + 1) + + return tuple(grid_size) + + + def sampling_1d(self, var:str, N: int, input_domains:dict, gp_model, z_pts: np.ndarray = None) -> np.ndarray: + """Find the next `N` points in the 1d sequence of `z_pts` using GP uncertainty sampling. + + :param var: the variable to sample points for + :param N: number of new points to add to the sequence + :param input_domains: a `dict` of input variable domains + :param gp_model: a trained GaussianProcessRegressor model for estimating uncertainty + :param z_pts: current univariate sequence `(Nz,)`, start at middle of input_domains[var] if `None` + :returns: the univariate sequence `z_pts` augmented by `N` new points + """ + + if z_pts is None: + z_pts = np.array([0.5*(input_domains[var][0] + input_domains[var][1])]) + N = N - 1 + z_pts = np.atleast_1d(z_pts) + else: + if self.variance_criterion == 'local': + d_min = self.d_thresh * (input_domains[var][1] - input_domains[var][0]) + def J1(x): + # Local cost function + '''Optimize for highest variance at local subdomain''' + var_order = list(self.x_grids.keys()) + grids = [] + for v in var_order: + if v == var: + grids.append(np.atleast_1d(x)) + else: + grids.append(np.atleast_1d(self.x_grids[v])) + mesh = np.meshgrid(*grids, indexing='ij') + cand_grid = np.stack([m.reshape(-1) for m in mesh], axis=-1) + _, std = gp_model.predict(cand_grid, return_std=True) + variance = std**2 + return -np.sum(variance) + x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], self.knots_per_level) + constraints = [] + # Enforce minimumm distance constraint + for xi in z_pts: + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[0] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[1] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x: np.abs(x[0] - x[1]) - d_min}) + res = minimize(J1, x0, bounds = [(input_domains[var][0], + input_domains[var][1])] * self.knots_per_level, + constraints=constraints) + z_new = res.x + if res.x is None: + d_min *= self.thresh_relax + constraints = [] + for xi in z_pts: + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[0] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x, xp=xi: np.abs(x[1] - xp) - d_min}) + constraints.append({'type': 'ineq', 'fun': lambda x: np.abs(x[0] - x[1]) - d_min}) + res = minimize(J1, x0, bounds = [(input_domains[var][0], + input_domains[var][1])] * self.knots_per_level, + constraints=constraints) + z_new = res.x + z_pts = np.concatenate((z_pts, z_new)) + + elif self.variance_criterion == 'global': + kernel = gp_model.kernel_ + if isinstance(kernel, WhiteKernel): + noise_var = kernel.noise_level + elif hasattr(kernel, "k1") and isinstance(kernel.k1, WhiteKernel): + noise_var = kernel.k1.noise_level + elif hasattr(kernel, "k2") and isinstance(kernel.k2, WhiteKernel): + noise_var = kernel.k2.noise_level + def J2(x): + # Global cost function + '''Optimize for lowest posterior variance over a monte carlo test grid''' + var_order = list(self.x_grids.keys()) + N = self.grid_size + T = np.zeros((N, len(var_order))) + for i, var in enumerate(var_order): + low, high = input_domains[var] + T[:,i] = np.random.uniform(low, high, N) + grids = [] + for v in var_order: + if v == var: + grids.append(np.atleast_1d(x)) + else: + grids.append(np.atleast_1d(self.x_grids[v])) + mesh = np.meshgrid(*grids, indexing='ij') + cand_grid = np.stack([m.reshape(-1) for m in mesh], axis=-1) + Q = np.vstack((T, cand_grid)) + _, cov = gp_model.predict(Q, return_cov=True) + if cov.ndim == 2: + cov_TT = cov[:N, :N] + cov_Tx = cov[:N, N:] + cov_xT = cov[N:, :N] + cov_xx = cov[N:, N:] + sigma_bar_T = ( + np.diag(cov_TT) + - cov_Tx @ ( + np.linalg.pinv(cov_xx + noise_var * np.eye(cov_xx.shape[0])) + @ cov_xT + ) + ) + return np.sum(sigma_bar_T) + else: + cost = 0 + cov_shape = cov.shape + for i in range(cov_shape[2]): + cov_yi = cov[:,:,i] + cov_TT = cov_yi[:N, :N] + cov_Tx = cov_yi[:N, N:] + cov_xT = cov_yi[N:, :N] + cov_xx = cov_yi[N:, N:] + sigma_bar_T = np.diag(cov_TT) - cov_Tx @ (np.linalg.pinv(cov_xx) @ cov_xT) + cost += np.sum(sigma_bar_T) + return cost + + x0 = np.random.uniform(input_domains[var][0], input_domains[var][1], self.knots_per_level) + res = minimize(J2, x0, bounds = [(input_domains[var][0], input_domains[var][1])] * self.knots_per_level) + z_new = res.x + z_pts = np.concatenate((z_pts, z_new)) + + return z_pts + + + @staticmethod + def collocation_1d(N: int, z_bds: tuple, z_pts: np.ndarray = None, + wt_fcn: callable = None, method='leja', opt_args=None) -> np.ndarray: + """Find the next `N` points in the 1d sequence of `z_pts` using the provided collocation method. + + :param N: number of new points to add to the sequence + :param z_bds: bounds on the 1d domain + :param z_pts: current univariate sequence `(Nz,)`, start at middle of `z_bds` if `None` + :param wt_fcn: weighting function, uses a constant weight if `None`, callable as `wt_fcn(z)` + :param method: collocation method to use, currently only 'leja' is supported + :param opt_args: extra arguments for the global 1d `direct` optimizer + :returns: the univariate sequence `z_pts` augmented by `N` new points + """ + opt_args = opt_args or {} + if wt_fcn is None: + wt_fcn = lambda z: 1 + if z_pts is None: + z_pts = (z_bds[1] + z_bds[0]) / 2 + N = N - 1 + z_pts = np.atleast_1d(z_pts) + + match method: + case 'leja': + # Construct Leja sequence by maximizing the Leja objective sequentially + for i in range(N): + obj_fun = lambda z: -wt_fcn(np.array(z)) * np.prod(np.abs(z - z_pts)) + res = direct(obj_fun, [z_bds], **opt_args) # Use global DIRECT optimization over 1d domain + z_star = res.x + z_pts = np.concatenate((z_pts, z_star)) + case other: + raise NotImplementedError(f"Unknown collocation method: {other}") + + return z_pts + + + diff --git a/tests/test_training_data.py b/tests/test_training_data.py index a8c4a3b..01997aa 100644 --- a/tests/test_training_data.py +++ b/tests/test_training_data.py @@ -2,9 +2,12 @@ import itertools import numpy as np +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import RBF +from sklearn.gaussian_process.kernels import ConstantKernel as C from amisc.compression import SVD -from amisc.training import SparseGrid +from amisc.training import SparseGrid, UncertaintySampling from amisc.utils import to_model_dataset from amisc.variable import Variable, VariableList @@ -46,6 +49,54 @@ def simple_model(inputs): assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars]) assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars]) +def test_uncertainty_sampling(): + """Test the `SparseGrid` data structure `get`, `set`, and `refine` methods.""" + def simple_model(inputs): + y1 = inputs['x1'] + y2 = 10 ** (inputs['x2']) + y3 = inputs['x3'] - 5 + return {'y1': y1, 'y2': y2, 'y3': y3} + + x_vars = VariableList([Variable('x1', distribution='U(0, 1)', norm='linear(2, 2)'), + Variable('x2', distribution='LU(1e-3, 1e-1)', norm='log10'), + Variable('x3', distribution='LN(0, 1)', norm='log10')]) + y_vars = VariableList(['y1', 'y2', 'y3']) + domains = x_vars.get_domains(norm=True) + weight_fcns = x_vars.get_pdfs(norm=True) + variance_criterions = ['global', 'local'] + for variance_criterion in variance_criterions: + grid = UncertaintySampling(collocation_rule='leja', knots_per_level=2, variance_criterion=variance_criterion) + + beta_list = list(itertools.product(*[range(3) for _ in range(len(x_vars))])) + alpha_list = [()] * len(beta_list) + design_list = [] + design_pts = [] + y_list = [] + + # Initialize GP model for uncertainty sampling + kernel = C(1.0, (1e-3, 1e3)) * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e2)) + gp_model = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5, random_state=42) + x_train = np.array([0.5, 0.01, 0.5]).reshape(1, -1) + y_train = np.concatenate( + [simple_model({'x1': x_train[:, 0], 'x2': x_train[:, 1], 'x3': x_train[:, 2]})[var] for var in y_vars] + ).reshape(1, -1) + gp_model.fit(x_train, y_train) + + # Refine the sparse grid + for alpha, beta in zip(alpha_list, beta_list): + new_idx, new_pts = grid.refine(alpha, beta, domains, gp_model, weight_fcns) + new_y = simple_model(new_pts) + design_list.append(new_idx) + design_pts.append(new_pts) + y_list.append(new_y) + grid.set(alpha, beta, new_idx, new_y) + + # Extract data from sparse grid and check values + for i, (alpha, beta) in enumerate(zip(alpha_list, beta_list)): + x_pts, y_pts = grid.get_by_coord(alpha, design_list[i]) + assert all([np.allclose(design_pts[i][var], x_pts[var]) for var in x_vars]) + assert all([np.allclose(y_list[i][var], y_pts[var]) for var in y_vars]) + def test_imputer(): """Test that imputation works for fixing missing data."""