Merged
2 changes: 1 addition & 1 deletion docs/guides/extend.md
@@ -150,7 +150,7 @@ class CustomInterpolatorState(InterpolatorState):
pass
```

Currently, the available methods are `Lagrange` polynomial interpolation and `Linear` regression. The state of a `Lagrange` polynomial includes the 1d grids and barycentric weights for each input dimension. The state of a `Linear` regression includes the underlying [scikit-learn](https://scikit-learn.org/stable/) linear model. See [Lagrange][amisc.interpolator.Lagrange] and [Linear][amisc.interpolator.Linear] for more details. Note that linear regression also includes options for polynomial features.
Currently, the available methods are `Lagrange` polynomial interpolation, `Linear` regression, and `GPR` (Gaussian process regression). The state of a `Lagrange` polynomial includes the 1d grids and barycentric weights for each input dimension. The state of a `Linear` regression or `GPR` includes the underlying [scikit-learn](https://scikit-learn.org/stable/) regression model. See [Lagrange][amisc.interpolator.Lagrange], [Linear][amisc.interpolator.Linear], and [GPR][amisc.interpolator.GPR] for more details. Note that linear regression also includes options for polynomial features, and GPR includes options for the kernel function.
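The kernel options for `GPR` accept a nested list of kernel names, positional arguments, and keyword dicts. A self-contained sketch of how such a spec can be resolved recursively — mock kernel classes stand in for `sklearn.gaussian_process.kernels`, so this is illustrative only:

```python
# Mock kernels standing in for sklearn.gaussian_process.kernels (illustration only)
class RBF:
    def __init__(self, length_scale=1.0):
        self.length_scale = length_scale

class Matern(RBF):
    pass

class Sum:
    def __init__(self, k1, k2):
        self.k1, self.k2 = k1, k2

KERNELS = {'RBF': RBF, 'Matern': Matern, 'Sum': Sum}

def construct_kernel(spec):
    """Resolve ['Name', arg, ...] lists into kernel objects; a single dict is **kwargs."""
    if not isinstance(spec, list):
        return spec  # base case: plain argument (e.g. a kwargs dict)
    name, *rest = spec
    args = [construct_kernel(ele) for ele in rest]
    if len(args) == 1 and isinstance(args[0], dict):
        return KERNELS[name](**args[0])  # e.g. ['RBF', {'length_scale': 1.0}]
    return KERNELS[name](*args)          # e.g. ['Sum', kernel1, kernel2]

k = construct_kernel(['Sum', ['RBF', {'length_scale': 1.0}], ['Matern', {'length_scale': 0.5}]])
assert isinstance(k, Sum) and k.k2.length_scale == 0.5
```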

## Model keyword arguments
The `ModelKwarg` interface provides a dataclass for passing extra options to the underlying component models. The default is a simple `dict` that gets passed as a set of `key=value` pairs. The primary reason for overriding this class is if you have complicated arguments that require custom serialization. See the [serialization](#serialization) section below.
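The default behavior described here — a plain `dict` forwarded as `key=value` pairs — can be sketched as follows (the `DictKwargs` name and `run_model` function are hypothetical, for illustration only):

```python
from dataclasses import dataclass, field

@dataclass
class DictKwargs:
    """Hypothetical default-style container: options forwarded as key=value pairs."""
    options: dict = field(default_factory=dict)

def run_model(inputs, tol=1e-6, max_iter=10):
    """Hypothetical component model that accepts extra keyword options."""
    return {'y': inputs['x'] ** 2, 'tol': tol, 'max_iter': max_iter}

kw = DictKwargs({'tol': 1e-3})
out = run_model({'x': 2.0}, **kw.options)  # tol is overridden, max_iter keeps its default
assert out == {'y': 4.0, 'tol': 1e-3, 'max_iter': 10}
```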
2 changes: 1 addition & 1 deletion docs/guides/interface_models.md
@@ -228,7 +228,7 @@ As one last note, if your model returns field quantity data, this will be stored
## Specifying a surrogate method
The `Component.interpolator` attribute provides the underlying surrogate "interpolation" method, i.e. the specific mathematical relationship that approximates the model's outputs as a function of its inputs. In this sense, we use the terms "interpolator" and "surrogate" interchangeably to mean the underlying approximation method -- the `Component.interpolator` does not necessarily have to "interpolate" the output by passing through all the training data directly. The naming convention mostly arises from the usage of polynomial interpolation in sparse grids.

Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation, which is set by default, and [Linear][amisc.interpolator.Linear] regression. Multivariate Lagrange polynomials are formed by a tensor-product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression is implemented through the [scikit-learn](https://scikit-learn.org/stable/) library, and may optionally include polynomial features.
Currently, the available methods are [Lagrange][amisc.interpolator.Lagrange] polynomial interpolation (the default), [Linear][amisc.interpolator.Linear] regression, and [GPR][amisc.interpolator.GPR] (Gaussian process regression). Multivariate Lagrange polynomials are formed by a tensor product of univariate Lagrange polynomials in each input dimension, and integrate well with the `SparseGrid` data structure. Lagrange polynomials work well up to an input dimension of around 12-15 for sufficiently smooth functions. More details on how they work can be found in the [theory](../theory/polynomials.md) section. Linear regression and Gaussian process regression are implemented through the [scikit-learn](https://scikit-learn.org/stable/) library; linear regression may optionally include polynomial features, while Gaussian process regression may optionally include preprocessing scalers.
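For intuition on what the new `GPR` method computes, the surrogate's prediction is the posterior mean of a Gaussian process conditioned on the training data. A minimal NumPy sketch of that computation (illustrative only — amisc delegates this to scikit-learn's `GaussianProcessRegressor`, and the `length_scale` and `noise` values here are arbitrary):

```python
import numpy as np

def rbf(a, b, length_scale=0.2):
    """Squared-exponential kernel matrix between 1d point sets a and b."""
    d2 = (a[:, None] - b[None, :]) ** 2
    return np.exp(-0.5 * d2 / length_scale ** 2)

def gp_posterior_mean(x_train, y_train, x_test, noise=1e-6):
    """Posterior mean of a zero-mean GP conditioned on (x_train, y_train)."""
    K = rbf(x_train, x_train) + noise * np.eye(len(x_train))  # jitter for stability
    return rbf(x_test, x_train) @ np.linalg.solve(K, y_train)

x = np.linspace(0, 1, 8)
y = np.sin(2 * np.pi * x)
mean = gp_posterior_mean(x, y, x)  # evaluate back at the training points
assert np.allclose(mean, y, atol=1e-2)  # posterior mean ~reproduces the data
```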

You may configure the interpolation method via:
```python
```
2 changes: 1 addition & 1 deletion src/amisc/component.py
@@ -1514,7 +1514,7 @@ def serialize(self, keep_yaml_objects: bool = False, serialize_args: dict[str, t
if len(value) > 0:
d[key] = {str(k): int(v) for k, v in value.items()}
elif key in ComponentSerializers.__annotations__.keys():
if key in ['training_data', 'interpolator'] and not self.has_surrogate:
if key in ['training_data'] and not self.has_surrogate:
continue
else:
d[key] = value.serialize(*serialize_args.get(key, ()), **serialize_kwargs.get(key, {}))
188 changes: 180 additions & 8 deletions src/amisc/interpolator.py
@@ -7,9 +7,11 @@
- `Interpolator`: Abstract class providing basic structure of an interpolator
- `Lagrange`: Concrete implementation for tensor-product barycentric Lagrange interpolation
- `Linear`: Concrete implementation for linear regression using `sklearn`
- `GPR`: Concrete implementation for Gaussian process regression using `sklearn`
- `InterpolatorState`: Interface for a dataclass that stores the internal state of an interpolator
- `LagrangeState`: The internal state for a barycentric Lagrange polynomial interpolator
- `LinearState`: The internal state for a linear interpolator (using sklearn)
- `GPRState`: The internal state for a Gaussian process regression interpolator (using sklearn)
"""
from __future__ import annotations

@@ -20,13 +20,14 @@

import numpy as np
from sklearn import linear_model, preprocessing
from sklearn.gaussian_process import GaussianProcessRegressor, kernels
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures

from amisc.serialize import Base64Serializable, Serializable, StringSerializable
from amisc.typing import Dataset, MultiIndex

__all__ = ["InterpolatorState", "LagrangeState", "LinearState", "Interpolator", "Lagrange", "Linear"]
__all__ = ["InterpolatorState", "LagrangeState", "LinearState", "GPRState", "Interpolator", "Lagrange", "Linear", "GPR"]


class InterpolatorState(Serializable, ABC):
@@ -78,6 +81,29 @@ def __eq__(self, other):
return False


@dataclass
class GPRState(InterpolatorState, Base64Serializable):
"""The internal state for a Gaussian Process Regressor interpolator (using sklearn).

:ivar x_vars: the input variables in order
:ivar y_vars: the output variables in order
:ivar regressor: the sklearn regressor object, a pipeline that consists of a preprocessing scaler and a
`GaussianProcessRegressor`.
"""
x_vars: list[str] = field(default_factory=list)
y_vars: list[str] = field(default_factory=list)
regressor: Pipeline = None

def __eq__(self, other):
if isinstance(other, GPRState):
return (self.x_vars == other.x_vars and self.y_vars == other.y_vars and
len(self.regressor.steps) == len(other.regressor.steps) and
self.regressor['gpr'].alpha == other.regressor['gpr'].alpha and
self.regressor['gpr'].kernel_ == other.regressor['gpr'].kernel_)
else:
return False


class Interpolator(Serializable, ABC):
"""Interface for an interpolator object that approximates a model. An interpolator should:

@@ -86,8 +112,8 @@ class Interpolator(Serializable, ABC):
- `gradient` - compute the gradient/Jacobian at new points (if you want)
- `hessian` - compute the 2nd derivative/Hessian at new points (if you want)

Currently, `Lagrange` and `Linear` interpolators are supported and can be constructed from a configuration `dict`
via `Interpolator.from_dict()`.
Currently, `Lagrange`, `Linear`, and `GPR` interpolators are supported and can be constructed from a configuration
`dict` via `Interpolator.from_dict()`.
"""

@abstractmethod
@@ -142,15 +168,27 @@ def hessian(self, x: Dataset, state: InterpolatorState, training_data: tuple[Dat

@classmethod
def from_dict(cls, config: dict) -> Interpolator:
"""Create an `Interpolator` object from a `dict` config. Only `method='lagrange'` is supported for now."""
method = config.pop('method', 'lagrange').lower()
match method:
"""Create an `Interpolator` object from a `dict` config. Available methods are `lagrange`, `linear`, and `gpr`.
If the method is not one of these, an attempt is made to look up a class of that name in `amisc.interpolator`.

:param config: a `dict` containing the configuration for the interpolator, with the `method` key specifying the
name of the interpolator method to use; the remaining keys are passed as options to the method.
"""
method = config.pop('method', 'lagrange')
match method.lower():
case 'lagrange':
return Lagrange(**config)
case 'linear':
return Linear(**config)
case other:
raise NotImplementedError(f"Unknown interpolator method: {other}")
case 'gpr':
return GPR(**config)
case _:
import amisc.interpolator

if hasattr(amisc.interpolator, method):
return getattr(amisc.interpolator, method)(**config)

raise NotImplementedError(f"Unknown interpolator method: {method}")


@dataclass
Expand Down Expand Up @@ -594,3 +632,137 @@ def gradient(self, x: Dataset, state: LinearState, training_data: tuple[Dataset,

def hessian(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset]):
raise NotImplementedError


@dataclass
class GPR(Interpolator, StringSerializable):
"""Implementation of Gaussian Process Regression using `sklearn`. The `GPR` uses a pipeline
of a scaler and a `GaussianProcessRegressor` to approximate the input-output mapping.

:ivar scaler: the scikit-learn preprocessing scaler to use (e.g. 'MinMaxScaler', 'StandardScaler', etc.). If None,
no scaling is applied (default).
:ivar kernel: the kernel to use for building the covariance matrix (e.g. 'RBF', 'Matern', 'PairwiseKernel', etc.).
If a string is provided, then the specified kernel is used with the given `kernel_opts`.
If a list is provided, then kernel operators ('Sum', 'Product', 'Exponentiation') can be used to
combine multiple kernels. The first element of the list should be the kernel or operator name, and
the remaining elements should be the arguments. Dicts are accepted as **kwargs. For example:
`['Sum', ['RBF', {'length_scale': 1.0}], ['Matern', {'length_scale': 1.0}]]` will create a sum of
an RBF and a Matern kernel with the specified length scales.
:ivar scaler_opts: options to pass to the scaler constructor
:ivar kernel_opts: options to pass to the kernel constructor (ignored if kernel is a list, where opts are already
specified for combinations of kernels).
:ivar regressor_opts: options to pass to the `GaussianProcessRegressor` constructor
(see [scikit-learn](https://scikit-learn.org/stable/) documentation).
"""
scaler: str = None
kernel: str | list = 'RBF'
scaler_opts: dict = field(default_factory=dict)
kernel_opts: dict = field(default_factory=dict)
regressor_opts: dict = field(default_factory=lambda: {'n_restarts_optimizer': 5})

def _construct_kernel(self, kernel_list):
"""Build a scikit-learn kernel from a list of kernels (e.g. RBF, Matern, etc.) and kernel operators
(Sum, Product, Exponentiation).

!!! Example
`['Sum', ['RBF'], ['Matern', {'length_scale': 1.0}]]` will become `RBF() + Matern(length_scale=1.0)`

:param kernel_list: list of kernel/operator names and arguments. Kwarg options can be passed as dicts.
:returns: the scikit-learn kernel object
"""
# Base case for single items (just return as is)
if not isinstance(kernel_list, list):
return kernel_list

name = kernel_list[0]
args = [self._construct_kernel(ele) for ele in kernel_list[1:]]

# Base case for passing a single dict of kwargs
if len(args) == 1 and isinstance(args[0], dict):
return getattr(kernels, name)(**args[0])

# Base case for passing a list of args
return getattr(kernels, name)(*args)

def _validate_kernel(self, kernel_list):
"""Make sure all requested kernels are available in scikit-learn."""
if not isinstance(kernel_list, list):
return

name = kernel_list[0]

if not hasattr(kernels, name):
raise ImportError(f"Kernel '{name}' not found in sklearn.gaussian_process.kernels")

for ele in kernel_list[1:]:
self._validate_kernel(ele)

def __post_init__(self):
self._validate_kernel(self.kernel if isinstance(self.kernel, list) else [self.kernel, self.kernel_opts])

if self.scaler is not None:
try:
getattr(preprocessing, self.scaler)
except AttributeError:
raise ImportError(f"Scaler '{self.scaler}' not found in sklearn.preprocessing")

def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],
old_state: GPRState, input_domains: dict[str, tuple]) -> InterpolatorState:
"""Train a new Gaussian process regression model.

:param beta: refinement level indices (not used for `GPR`)
:param training_data: a tuple of dictionaries containing the new training data (`xtrain`, `ytrain`)
:param old_state: the old regressor state to refine (only used to get the order of input/output variables)
:param input_domains: (not used for `GPR`)
:returns: the new GPR state
"""
gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list)
else [self.kernel, self.kernel_opts])
gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
pipe = []
if self.scaler is not None:
pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
pipe.append(('gpr', gp))
regressor = Pipeline(pipe)

xtrain, ytrain = training_data

# Get order of variables for inputs and outputs
if old_state is not None:
x_vars = old_state.x_vars
y_vars = old_state.y_vars
else:
x_vars = list(xtrain.keys())
y_vars = list(ytrain.keys())

# Convert to (N, xdim) and (N, ydim) arrays
x_arr = np.concatenate([xtrain[var][..., np.newaxis] for var in x_vars], axis=-1)
y_arr = np.concatenate([ytrain[var][..., np.newaxis] for var in y_vars], axis=-1)

regressor.fit(x_arr, y_arr)

return GPRState(regressor=regressor, x_vars=x_vars, y_vars=y_vars)

def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]):
"""Predict the output of the model at points `x` using the Gaussian Process Regressor provided in `state`.

:param x: the input Dataset `dict` mapping input variables to prediction locations
:param state: the state containing the Gaussian Process Regressor to use
:param training_data: not used for `GPR` (since the regressor is already trained in `state`)
"""
# Convert to (N, xdim) array for sklearn
x_arr = np.concatenate([x[var][..., np.newaxis] for var in state.x_vars], axis=-1)
loop_shape = x_arr.shape[:-1]
x_arr = x_arr.reshape((-1, x_arr.shape[-1]))

y_arr = state.regressor.predict(x_arr)
y_arr = y_arr.reshape(loop_shape + (len(state.y_vars),)) # (..., ydim)

# Unpack the outputs back into a Dataset
return {var: y_arr[..., i] for i, var in enumerate(state.y_vars)}

def gradient(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]):
raise NotImplementedError

def hessian(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]):
raise NotImplementedError
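The name-resolution fallback added to `Interpolator.from_dict` above can be sketched in isolation — mock classes here, with `globals()` standing in for the `getattr(amisc.interpolator, method)` lookup in the real code:

```python
class Lagrange:
    pass

class Linear:
    pass

class GPR:
    pass

_KNOWN = {'lagrange': Lagrange, 'linear': Linear, 'gpr': GPR}

def from_dict(config):
    """Pop 'method' and build the matching interpolator, else fall back to a name lookup."""
    config = dict(config)  # avoid mutating the caller's dict
    method = config.pop('method', 'lagrange')
    cls = _KNOWN.get(method.lower())
    if cls is None:
        cls = globals().get(method)  # stand-in for getattr(amisc.interpolator, method)
    if cls is None:
        raise NotImplementedError(f"Unknown interpolator method: {method}")
    return cls(**config)

assert isinstance(from_dict({'method': 'gpr'}), GPR)  # known lowercase name
assert isinstance(from_dict({'method': 'GPR'}), GPR)  # case-insensitive match
assert isinstance(from_dict({}), Lagrange)            # default method
```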
4 changes: 3 additions & 1 deletion tests/test_component.py
@@ -4,6 +4,7 @@
import itertools
import time
import uuid
import warnings
from dataclasses import dataclass, field
from pathlib import Path

@@ -408,7 +409,8 @@ def model(inputs):
Ik = [((), (0, 0)), ((), (0, 1)), ((), (1, 0)), ((), (2, 0)), ((), (1, 1)), ((), (0, 2)), ((), (1, 2)),
((), (2, 1)), ((), (2, 2))]
for alpha, beta in Ik:
comp.activate_index(alpha, beta)
with warnings.catch_warnings(action='ignore', category=RuntimeWarning):
comp.activate_index(alpha, beta)
xg = np.linspace(0, 1, 100)
yt = comp.predict({'x': xg}, use_model='best')['y']
ysurr = comp.predict({'x': xg})['y']
4 changes: 3 additions & 1 deletion tests/test_convergence.py
@@ -1,4 +1,5 @@
"""Test convergence of `System.fit` to a few smooth analytical models."""
import warnings
from pathlib import Path
from typing import Literal

@@ -60,7 +61,8 @@ def curved_func(inputs, noise_std=0.0):
xt = system.sample_inputs(1000)
yt = curved_func(xt)

system.fit(max_iter=50, max_tol=1e-4)
with warnings.catch_warnings(action='ignore', category=RuntimeWarning):
system.fit(max_iter=50, max_tol=1e-4)
ysurr = system.predict(xt)
l2_error = relative_error(ysurr['y'], yt['y'])
rel_noise = 2 * noise_std / np.mean(yt['y'])