diff --git a/tests/descriptor/test_elemental.py b/tests/descriptor/test_compositional.py similarity index 74% rename from tests/descriptor/test_elemental.py rename to tests/descriptor/test_compositional.py index 0c52731..e6e8729 100644 --- a/tests/descriptor/test_elemental.py +++ b/tests/descriptor/test_compositional.py @@ -6,11 +6,11 @@ import pandas as pd import pytest -from xenonpy.descriptor import Compositions, Counting +from xenonpy.descriptor import Compositions, Counting, KernelMean, RBFKernel from xenonpy.descriptor.base import BaseCompositionFeaturizer -def test_compositional_feature_1(): +def test_base_composition_1(): class FakeFeaturizer(BaseCompositionFeaturizer): @@ -71,5 +71,23 @@ def test_comp_descriptor_1(): assert np.all(tmp1.values == tmp2.values) +def test_kernel_mean_1(): + comps = [{'H': 2}, {'Al': 3, 'Pd': 4}, {'C': 1, 'O': 5, 'H': 20}] + + n_bins = 2 + delta = 1 / (n_bins - 1) + kernel_mean = KernelMean(RBFKernel(sigma=delta * 0.4), grid=n_bins, n_jobs=1) + desc = kernel_mean.transform(comps) + assert desc.shape == (3, 116) + assert isinstance(desc, np.ndarray) + + n_bins = 3 + delta = 1 / (n_bins - 1) + kernel_mean = KernelMean(RBFKernel(sigma=delta * 0.4), grid=n_bins, n_jobs=1) + desc = kernel_mean.transform(pd.Series(comps)) + assert desc.shape == (3, 174) + assert isinstance(desc, pd.DataFrame) + + if __name__ == "__main__": pytest.main() diff --git a/xenonpy/descriptor/__init__.py b/xenonpy/descriptor/__init__.py index de156fe..28d7fb4 100644 --- a/xenonpy/descriptor/__init__.py +++ b/xenonpy/descriptor/__init__.py @@ -7,3 +7,4 @@ from .fingerprint import * from .frozen_featurizer import * from .structure import * +from .kernel import * diff --git a/xenonpy/descriptor/compositions.py b/xenonpy/descriptor/compositions.py index 43c61c1..709d5ea 100644 --- a/xenonpy/descriptor/compositions.py +++ b/xenonpy/descriptor/compositions.py @@ -2,28 +2,105 @@ # Use of this source code is governed by a BSD-style # license that can be found in the 
LICENSE file. -from typing import Union, List +from itertools import count +from typing import Callable, List, Sequence, Union, Tuple import numpy as np import pandas as pd - -from xenonpy.descriptor.base import BaseDescriptor, BaseCompositionFeaturizer +from joblib import Parallel, delayed +from pymatgen.core import Composition as PMGComp +from sklearn.preprocessing import MinMaxScaler +from xenonpy.datatools import preset +from xenonpy.descriptor import calculate_rbf_kernel_matrix +from xenonpy.descriptor.base import (BaseCompositionFeaturizer, BaseDescriptor, BaseFeaturizer) __all__ = [ - 'Compositions', 'Counting', 'WeightedAverage', 'WeightedSum', 'WeightedVariance', - 'HarmonicMean', 'GeometricMean', 'MaxPooling', 'MinPooling' + 'Compositions', + 'Counting', + 'WeightedAverage', + 'WeightedSum', + 'WeightedVariance', + 'HarmonicMean', + 'GeometricMean', + 'MaxPooling', + 'MinPooling', + 'KernelMean', ] -class Counting(BaseCompositionFeaturizer): +class KernelMean(BaseFeaturizer): + """Add kernel mean descriptor." 
+ + """ def __init__(self, *, - one_hot_vec=False, - n_jobs=-1, - on_errors='raise', - return_type='any', - target_col=None): + kernel_matrix: Union[None, pd.DataFrame, Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, + str]]] = None, + feature_matrix: Union[None, pd.DataFrame] = None, + on_errors: str = 'raise', + return_type: str = 'any', + target_col: Union[Sequence[str], str, None] = 'composition', + n_jobs: int = 1): + super().__init__(n_jobs=0, on_errors=on_errors, return_type=return_type, target_col=target_col) + + # prepare kernel matrix + if kernel_matrix is None: + if feature_matrix is None: # use elemental info + feature_matrix = preset.elements_completed + + # calculate kernel matrix + all_dists, labels = calculate_rbf_kernel_matrix(element_info=feature_matrix) + all_dists = [MinMaxScaler().fit_transform(np.sum(m, axis=0).T).T for m in all_dists + ] # MinMaxScale for each element + count_mapper = {k: count() for k in labels.index.unique()} + kernel_matrix, labels = np.concatenate(all_dists, + axis=1), [f'{l}_{count_mapper[l].__next__()}' for l in labels.index] + elif callable(kernel_matrix): + # calculate kernel matrix + kernel_matrix, labels = kernel_matrix(feature_matrix) + else: + kernel_matrix, labels = kernel_matrix.values, kernel_matrix.columns + + self._kernel_matrix = pd.DataFrame(kernel_matrix, index=feature_matrix.index, columns=labels) + self.__n_jobs = n_jobs # this param should not overwrite the property of parent class + self.__authors__ = ['TsumiNa'] + + def featurize(self, comps): + # Unified to python list + if isinstance(comps, (pd.Series, np.ndarray)): + comps = comps.tolist() + + kernel_matrix = self._kernel_matrix + + def inner(comp): + # unified to python dict + if isinstance(comp, PMGComp): + comp = comp.as_dict() + + # calculate proportion vector for the given composition + t = sum(comp.values()) + proportion_vec = np.zeros(kernel_matrix.shape[0]) + for (k, v) in comp.items(): + elem_i = kernel_matrix.index.get_loc(k) + 
proportion_vec[elem_i] = v / t + + return proportion_vec + + proportion_matrix = Parallel(n_jobs=self.__n_jobs)(delayed(inner)(comp) for comp in comps) + proportion_matrix = np.stack(proportion_matrix) + + # fast way using dot calculation + return (proportion_matrix.T[:, :, np.newaxis] @ (kernel_matrix.values)[:, np.newaxis, :]).sum(axis=0) + + @property + def feature_labels(self): + return self._kernel_matrix.columns + + +class Counting(BaseCompositionFeaturizer): + + def __init__(self, *, one_hot_vec=False, n_jobs=-1, on_errors='raise', return_type='any', target_col=None): """ Parameters @@ -53,10 +130,7 @@ def __init__(self, Default is None. """ - super().__init__(n_jobs=n_jobs, - on_errors=on_errors, - return_type=return_type, - target_col=target_col) + super().__init__(n_jobs=n_jobs, on_errors=on_errors, return_type=return_type, target_col=target_col) self.one_hot_vec = one_hot_vec self._elems = self.elements.index.tolist() self.__authors__ = ['TsumiNa'] @@ -410,24 +484,10 @@ def __init__(self, super().__init__(featurizers=featurizers) self.composition = Counting(n_jobs=n_jobs, on_errors=on_errors) - self.composition = WeightedAverage(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = WeightedSum(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = WeightedVariance(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = GeometricMean(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = HarmonicMean(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = MaxPooling(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) - self.composition = MinPooling(n_jobs=n_jobs, - on_errors=on_errors, - elemental_info=elemental_info) + self.composition = WeightedAverage(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = 
WeightedSum(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = WeightedVariance(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = GeometricMean(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = HarmonicMean(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = MaxPooling(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) + self.composition = MinPooling(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info) diff --git a/xenonpy/descriptor/kernel.py b/xenonpy/descriptor/kernel.py new file mode 100644 index 0000000..3a96740 --- /dev/null +++ b/xenonpy/descriptor/kernel.py @@ -0,0 +1,100 @@ +# Copyright 2021 TsumiNa +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pandas as pd +from typing import Union, Sequence +from sklearn.preprocessing import MinMaxScaler +from xenonpy.datatools import preset + +__all__ = ['rbf_kernel', 'calculate_rbf_kernel_matrix'] + + +def rbf_kernel(x_i: np.ndarray, x_j: Union[np.ndarray, int, float], sigmas: Union[float, int, np.ndarray, + Sequence]) -> np.ndarray: + """ + Radial Basis Function (RBF) kernel function. + https://en.wikipedia.org/wiki/Radial_basis_function_kernel + + Parameters + ---------- + sigmas: + The standard deviations (SD). + Can be a single number or a 1d array-like object. + x_i: + Should be a 1d array. 
+ x_j : np.ndarray + Should be a 1d array. + + Returns + ------- + np.ndarray + Distribution under RBF kernel. + + Raises + ------ + ValueError + Raise error if sigmas has wrong dimension. + """ + sigmas = np.asarray(sigmas) + if sigmas.ndim == 0: + sigmas = sigmas[np.newaxis] + if sigmas.ndim != 1: + raise ValueError('parameter `sigmas` must be a array-like object which has dimension 1') + + # K(x_i, x_j) = exp(-||x_i - x_j||^2 / (2 * sigma^2)) + p1 = np.power(np.expand_dims(x_i, axis=x_i.ndim) - x_j, 2) + p2 = np.power(sigmas, 2) * 2 + dists = np.exp(-np.expand_dims(p1, axis=p1.ndim) / p2).transpose([2, 0, 1]) + + if dists.shape[0] == 1: + return dists[0] + return dists + + +def calculate_rbf_kernel_matrix( + *, + element_info: Union[None, pd.DataFrame] = None, + scaled_element_info: bool = False, + quartiles: Sequence[int] = (25, 50, 75), + half_interval_by_sigma: float = 2, + sort_centers: bool = True, +): + if element_info is None: + element_info = preset.elements_completed + + if scaled_element_info: + element_info = pd.DataFrame(MinMaxScaler().fit_transform(element_info), + columns=element_info.columns, + index=element_info.index) + + all_dists = [] + center_labels = [] + for feature, data in element_info.items(): + if sort_centers: + data = data.values + centers = np.unique(data) + else: + centers = data.unique() + data = data.values + intervals = np.unique([abs(i - j) for i, j in zip(data[:-1], data[1:])]) # get all intervals + qs = np.percentile(intervals / 2, list(quartiles)) # get the requested quantiles of intervals / 2 + sigmas = qs / half_interval_by_sigma # use unique quantiles as sigma of RBF kernel + + # RBF kernel + dists = rbf_kernel(data, centers, sigmas) + all_dists.append(dists) + center_labels.append(pd.Series(centers, index=[feature] * centers.size)) + + return all_dists, pd.concat(center_labels)