Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
![build](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/deploy.yml?logo=github)
![docs](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/docs.yml?logo=materialformkdocs&logoColor=%2523cccccc&label=docs)
![tests](https://img.shields.io/github/actions/workflow/status/eckelsjd/amisc/tests.yml?logo=github&logoColor=%2523cccccc&label=tests)
![Code Coverage](https://img.shields.io/badge/coverage-89%25-yellowgreen?logo=codecov)
![Code Coverage](https://img.shields.io/badge/coverage-88%25-yellowgreen?logo=codecov)
[![Algorithm description](https://img.shields.io/badge/DOI-10.1002/nme.6958-blue)](https://doi.org/10.1002/nme.6958)

Efficient framework for building surrogates of multidisciplinary systems using the adaptive multi-index stochastic collocation ([AMISC](https://onlinelibrary.wiley.com/doi/full/10.1002/nme.6958)) technique.
Expand Down
11 changes: 7 additions & 4 deletions src/amisc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ class Component(BaseModel, Serializable):
vectorized: bool = False
call_unpacked: Optional[bool] = None # If the model expects inputs/outputs like `func(x1, x2, ...)->(y1, y2, ...)
ret_unpacked: Optional[bool] = None
groups: Optional[list[list[Variable]]] = None # Optional grouping of inputs for global optimization

# Data storage/states for a MISC component
active_set: list | set | IndexSet = IndexSet() # set of active (alpha, beta) multi-indices
Expand Down Expand Up @@ -799,7 +800,8 @@ def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best
if cached and (data := self._cache.get("training", {}).get(alpha, {}).get(beta)) is not None:
return data
else:
return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True)
return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True,
groups = self.groups)
except Exception as e:
self.logger.error(f"Error getting training data for alpha={alpha}, beta={beta}.")
raise e
Expand Down Expand Up @@ -1117,7 +1119,7 @@ def predict(self, inputs: dict | Dataset,
args = (self.misc_states.get((alpha, beta)),
self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))

results.append(self.interpolator.predict(inputs, *args) if executor is None else
results.append(self.interpolator.predict(inputs, *args, groups = self.groups) if executor is None else
executor.submit(self.interpolator.predict, inputs, *args))

if executor is not None:
Expand Down Expand Up @@ -1217,7 +1219,7 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P
continue

design_coords, design_pts = self.training_data.refine(a, b[:len(self.data_fidelity)],
domains, weight_fcns)
domains,weight_fcns, groups = self.groups)
design_pts, fc = to_model_dataset(design_pts, self.inputs, del_latent=True, **field_coords)

# Remove duplicate (alpha, coords) pairs -- so you don't evaluate the model twice for the same input
Expand Down Expand Up @@ -1288,7 +1290,8 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P
self.misc_costs[a, b] = num_train_pts
self.misc_states[a, b] = self.interpolator.refine(b[len(self.data_fidelity):],
self.training_data.get(a, b[:len(self.data_fidelity)],
y_vars=y_vars, skip_nan=True),
y_vars=y_vars, skip_nan=True,
groups = self.groups),
self.misc_states.get((alpha, beta)),
domains)
start_idx = end_idx
Expand Down
78 changes: 56 additions & 22 deletions src/amisc/interpolator.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

return LagrangeState(weights=weights, x_grids=x_grids)

def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset, Dataset]):
def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset, Dataset],
groups: list[list[str]] = None):
"""Predict the output of the model at points `x` with barycentric Lagrange interpolation."""
# Convert `x` and `yi` to 2d arrays: (N, xdim) and (N, ydim)
# Inputs `x` may come in unordered, but they should get realigned with the internal `x_grids` state
Expand Down Expand Up @@ -306,19 +307,41 @@ def predict(self, x: Dataset, state: LagrangeState, training_data: tuple[Dataset
y = np.zeros(x_arr.shape[:-1] + (ydim,)) # (..., ydim)

# Loop over multi-indices and compute tensor-product lagrange polynomials
indices = [range(s) for s in grid_sizes.values()]
for i, j in enumerate(itertools.product(*indices)):
L_j = quotient[..., dims, j] / qsum # (..., xdim)

# Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
if check_interp_pts:
other_pts = np.copy(div_zero_idx)
other_pts[div_zero_idx[..., dims, j]] = False
L_j[div_zero_idx[..., dims, j]] = 1
L_j[np.any(other_pts, axis=-1)] = 0
if groups:
# sizes per group (assumes each member of a group shares same 1D grid length)
group_sizes = [int(state.x_grids[grp[0]].shape[0]) for grp in groups]
for i_group, group_idx_tuple in enumerate(itertools.product(*[range(s) for s in group_sizes])):
# expand group-level indices into per-variable multi-index j (repeat per group member)
j_list = []
for grp_idx, grp in zip(group_idx_tuple, groups):
j_list.extend([grp_idx] * len(grp))
j = tuple(j_list) # length == xdim

L_j = quotient[..., dims, j] / qsum # (..., xdim)

if check_interp_pts:
other_pts = np.copy(div_zero_idx)
other_pts[div_zero_idx[..., dims, j]] = False
L_j[div_zero_idx[..., dims, j]] = 1
L_j[np.any(other_pts, axis=-1)] = 0

# yi_arr expected to have one row per group-node
y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i_group, :]
else:
indices = [range(s) for s in grid_sizes.values()]
for i, j in enumerate(itertools.product(*indices)):
L_j = quotient[..., dims, j] / qsum # (..., xdim)

# Set L_j(x==x_j)=1 for the current j and set L_j(x==x_j)=0 for x_j = x_i, i != j
if check_interp_pts:
other_pts = np.copy(div_zero_idx)
other_pts[div_zero_idx[..., dims, j]] = False
L_j[div_zero_idx[..., dims, j]] = 1
L_j[np.any(other_pts, axis=-1)] = 0

# Add multivariate basis polynomial contribution to interpolation output
y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i, :]
# Add multivariate basis polynomial contribution to interpolation output
y += np.prod(L_j, axis=-1, keepdims=True) * yi_arr[i, :]

# Unpack the outputs back into a Dataset
y_ret = {}
Expand Down Expand Up @@ -609,7 +632,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

return LinearState(regressor=regressor, x_vars=x_vars, y_vars=y_vars)

def predict(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset]):
def predict(self, x: Dataset, state: LinearState, training_data: tuple[Dataset, Dataset],
groups: list[list[str]] = None):
"""Predict the output of the model at points `x` using the linear regressor provided in `state`.

:param x: the input Dataset `dict` mapping input variables to prediction locations
Expand Down Expand Up @@ -716,14 +740,23 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],
:param input_domains: (not used for `GPR`)
:returns: the new GPR state
"""
gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list)
else [self.kernel, self.kernel_opts])
gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
pipe = []
if self.scaler is not None:
pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
pipe.append(('gpr', gp))
regressor = Pipeline(pipe)
if old_state is None:
gp_kernel = self._construct_kernel(self.kernel if isinstance(self.kernel, list)
else [self.kernel, self.kernel_opts])
gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
pipe = []
if self.scaler is not None:
pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
pipe.append(('gpr', gp))
regressor = Pipeline(pipe)
else:
gp_kernel = old_state.regressor.named_steps['gpr'].kernel_
gp = GaussianProcessRegressor(kernel=gp_kernel, **self.regressor_opts)
pipe = []
if self.scaler is not None:
pipe.append(('scaler', getattr(preprocessing, self.scaler)(**self.scaler_opts)))
pipe.append(('gpr', gp))
regressor = Pipeline(pipe)

xtrain, ytrain = training_data

Expand All @@ -743,7 +776,8 @@ def refine(self, beta: MultiIndex, training_data: tuple[Dataset, Dataset],

return GPRState(regressor=regressor, x_vars=x_vars, y_vars=y_vars)

def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset]):
def predict(self, x: Dataset, state: GPRState, training_data: tuple[Dataset, Dataset],
groups: list[list[str]] = None):
"""Predict the output of the model at points `x` using the Gaussian Process Regressor provided in `state`.

:param x: the input Dataset `dict` mapping input variables to prediction locations
Expand Down
Loading
Loading