Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import pandas as pd
import pytest

from xenonpy.descriptor import Compositions, Counting
from xenonpy.descriptor import Compositions, Counting, KernelMean, RBFKernel
from xenonpy.descriptor.base import BaseCompositionFeaturizer


def test_compositional_feature_1():
def test_base_composition_1():

class FakeFeaturizer(BaseCompositionFeaturizer):

Expand Down Expand Up @@ -71,5 +71,23 @@ def test_comp_descriptor_1():
assert np.all(tmp1.values == tmp2.values)


def test_kernel_mean_1():
    """KernelMean should size its output by the grid and mirror the input container type."""
    comps = [{'H': 2}, {'Al': 3, 'Pd': 4}, {'C': 1, 'O': 5, 'H': 20}]

    # (n_bins, expected feature count, input wrapper, expected output type)
    cases = [
        (2, 116, lambda c: c, np.ndarray),
        (3, 174, pd.Series, pd.DataFrame),
    ]
    for n_bins, n_feats, wrap, out_type in cases:
        # sigma is 40% of the grid spacing delta = 1 / (n_bins - 1)
        sigma = 0.4 / (n_bins - 1)
        kernel_mean = KernelMean(RBFKernel(sigma=sigma), grid=n_bins, n_jobs=1)
        desc = kernel_mean.transform(wrap(comps))
        assert desc.shape == (3, n_feats)
        assert isinstance(desc, out_type)


# Allow running this test module directly (outside a pytest invocation).
if __name__ == "__main__":
    pytest.main()
1 change: 1 addition & 0 deletions xenonpy/descriptor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
from .fingerprint import *
from .frozen_featurizer import *
from .structure import *
from .kernel import *
132 changes: 96 additions & 36 deletions xenonpy/descriptor/compositions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,105 @@
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from typing import Union, List
from itertools import count
from typing import Callable, List, Sequence, Union, Tuple

import numpy as np
import pandas as pd

from xenonpy.descriptor.base import BaseDescriptor, BaseCompositionFeaturizer
from joblib import Parallel, delayed
from pymatgen.core import Composition as PMGComp
from sklearn.preprocessing import MinMaxScaler
from xenonpy.datatools import preset
from xenonpy.descriptor import calculate_rbf_kernel_matrix
from xenonpy.descriptor.base import (BaseCompositionFeaturizer, BaseDescriptor, BaseFeaturizer)

__all__ = [
'Compositions', 'Counting', 'WeightedAverage', 'WeightedSum', 'WeightedVariance',
'HarmonicMean', 'GeometricMean', 'MaxPooling', 'MinPooling'
'Compositions',
'Counting',
'WeightedAverage',
'WeightedSum',
'WeightedVariance',
'HarmonicMean',
'GeometricMean',
'MaxPooling',
'MinPooling',
'KernelMean',
]


class Counting(BaseCompositionFeaturizer):
class KernelMean(BaseFeaturizer):
"""Add kernel mean descriptor."

"""

def __init__(self,
*,
one_hot_vec=False,
n_jobs=-1,
on_errors='raise',
return_type='any',
target_col=None):
kernel_matrix: Union[None, pd.DataFrame, Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray,
str]]] = None,
feature_matrix: Union[None, pd.DataFrame] = None,
on_errors: str = 'raise',
return_type: str = 'any',
target_col: Union[Sequence[str], str, None] = 'composition',
n_jobs: int = 1):
super().__init__(n_jobs=0, on_errors=on_errors, return_type=return_type, target_col=target_col)

# prepare kernel matrix
if kernel_matrix is None:
if feature_matrix is None: # use elemental info
feature_matrix = preset.elements_completed

# calculate kernel matrix
all_dists, labels = calculate_rbf_kernel_matrix(element_info=feature_matrix)
all_dists = [MinMaxScaler().fit_transform(np.sum(m, axis=0).T).T for m in all_dists
] # MinMaxScale for each element
count_mapper = {k: count() for k in labels.index.unique()}
kernel_matrix, labels = np.concatenate(all_dists,
axis=1), [f'{l}_{count_mapper[l].__next__()}' for l in labels.index]
elif callable(kernel_matrix):
# calculate kernel matrix
kernel_matrix, labels = kernel_matrix(feature_matrix)
else:
kernel_matrix, labels = kernel_matrix.values, kernel_matrix.columns

self._kernel_matrix = pd.DataFrame(kernel_matrix, index=feature_matrix.index, columns=labels)
self.__n_jobs = n_jobs # this param should not overwrite the property of parent class
self.__authors__ = ['TsumiNa']

def featurize(self, comps):
    """Compute kernel-mean descriptors for a batch of compositions.

    Parameters
    ----------
    comps
        Sequence (list, ``pd.Series``, or ``np.ndarray``) of compositions;
        each item is a ``pymatgen`` Composition or a dict mapping element
        symbol -> amount.

    Returns
    -------
    np.ndarray
        ``(n_compositions, n_kernel_columns)`` matrix; each row is the
        proportion-weighted mean of the element rows of the kernel matrix.
    """
    # Unified to python list
    if isinstance(comps, (pd.Series, np.ndarray)):
        comps = comps.tolist()

    kernel_matrix = self._kernel_matrix

    def inner(comp):
        # unified to python dict
        if isinstance(comp, PMGComp):
            comp = comp.as_dict()

        # calculate proportion vector for the given composition:
        # molar fraction of each element, aligned to the kernel-matrix index
        t = sum(comp.values())
        proportion_vec = np.zeros(kernel_matrix.shape[0])
        for (k, v) in comp.items():
            elem_i = kernel_matrix.index.get_loc(k)
            proportion_vec[elem_i] = v / t

        return proportion_vec

    # NOTE(review): this private n_jobs drives the joblib workers here;
    # the parent-class n_jobs appears deliberately pinned elsewhere — confirm.
    proportion_matrix = Parallel(n_jobs=self.__n_jobs)(delayed(inner)(comp) for comp in comps)
    proportion_matrix = np.stack(proportion_matrix)

    # fast way using dot calculation
    # (algebraically equivalent to `proportion_matrix @ kernel_matrix.values`)
    return (proportion_matrix.T[:, :, np.newaxis] @ (kernel_matrix.values)[:, np.newaxis, :]).sum(axis=0)

@property
def feature_labels(self):
    # One label per kernel-matrix column (i.e. per kernel center/feature).
    return self._kernel_matrix.columns


class Counting(BaseCompositionFeaturizer):

def __init__(self, *, one_hot_vec=False, n_jobs=-1, on_errors='raise', return_type='any', target_col=None):
"""

Parameters
Expand Down Expand Up @@ -53,10 +130,7 @@ def __init__(self,
Default is None.
"""

super().__init__(n_jobs=n_jobs,
on_errors=on_errors,
return_type=return_type,
target_col=target_col)
super().__init__(n_jobs=n_jobs, on_errors=on_errors, return_type=return_type, target_col=target_col)
self.one_hot_vec = one_hot_vec
self._elems = self.elements.index.tolist()
self.__authors__ = ['TsumiNa']
Expand Down Expand Up @@ -410,24 +484,10 @@ def __init__(self,
super().__init__(featurizers=featurizers)

self.composition = Counting(n_jobs=n_jobs, on_errors=on_errors)
self.composition = WeightedAverage(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = WeightedSum(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = WeightedVariance(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = GeometricMean(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = HarmonicMean(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = MaxPooling(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = MinPooling(n_jobs=n_jobs,
on_errors=on_errors,
elemental_info=elemental_info)
self.composition = WeightedAverage(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = WeightedSum(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = WeightedVariance(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = GeometricMean(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = HarmonicMean(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = MaxPooling(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
self.composition = MinPooling(n_jobs=n_jobs, on_errors=on_errors, elemental_info=elemental_info)
100 changes: 100 additions & 0 deletions xenonpy/descriptor/kernel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2021 TsumiNa
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pandas as pd
from typing import Union, Sequence
from sklearn.preprocessing import MinMaxScaler
from xenonpy.datatools import preset

# Public API of this module. A missing comma previously concatenated the two
# strings into the single bogus name 'rbf_kernelcalculate_rbf_kernel_matrix',
# which broke `from .kernel import *` re-exports.
__all__ = ['rbf_kernel', 'calculate_rbf_kernel_matrix']


def rbf_kernel(x_i: np.ndarray, x_j: Union[np.ndarray, int, float], sigmas: Union[float, int, np.ndarray,
                                                                                  Sequence]) -> np.ndarray:
    """
    Evaluate the Radial Basis Function (RBF) kernel of *x_i* against centers *x_j*.
    https://en.wikipedia.org/wiki/Radial_basis_function_kernel

    Parameters
    ----------
    x_i:
        Sample values; should be a 1d array.
    x_j : np.ndarray
        Kernel centers; should be a 1d array (a scalar also works).
    sigmas:
        The standard deviations (SD).
        Can be a single number or a 1d array-like object.

    Returns
    -------
    np.ndarray
        Distribution under RBF kernel: ``(n_samples, n_centers)`` for a single
        sigma, otherwise ``(n_sigmas, n_samples, n_centers)``.

    Raises
    ------
    ValueError
        Raise error if sigmas has wrong dimension.
    """
    sigma_arr = np.atleast_1d(np.asarray(sigmas))
    if sigma_arr.ndim != 1:
        raise ValueError('parameter `sigmas` must be a array-like object which has dimension 1')

    # K(x_i, x_j) = exp(-||x_i - x_j||^2 / (2 * sigma^2))
    sq_diff = np.power(np.expand_dims(x_i, axis=x_i.ndim) - x_j, 2)
    denom = np.power(sigma_arr, 2) * 2
    kernel = np.exp(-np.expand_dims(sq_diff, axis=sq_diff.ndim) / denom)
    kernel = kernel.transpose([2, 0, 1])

    # Collapse the sigma axis when only one sigma was supplied.
    return kernel[0] if kernel.shape[0] == 1 else kernel


def calculate_rbf_kernel_matrix(
    *,
    element_info: Union[None, pd.DataFrame] = None,
    scaled_element_info: bool = False,
    quartiles: Sequence[int] = (25, 50, 75),
    half_interval_by_sigma: float = 2,
    sort_centers: bool = True,
):
    """Build per-feature RBF kernel matrices over elemental property values.

    For each column (feature) of ``element_info``, its unique values serve as
    kernel centers and every element's value is expanded into RBF responses at
    those centers, with sigmas derived from the spacing of the data.

    Parameters
    ----------
    element_info:
        Elemental property table (elements x features). When ``None``, the
        XenonPy preset ``elements_completed`` table is used.
    scaled_element_info:
        If ``True``, min-max scale each feature column before processing.
    quartiles:
        Percentiles of the half-interval distribution used to derive the
        kernel sigmas for each feature. Default is the quartiles (25, 50, 75).
    half_interval_by_sigma:
        Divisor converting a half interval into a sigma:
        ``sigma = half_interval / half_interval_by_sigma``.
    sort_centers:
        If ``True``, centers are the sorted unique values (``np.unique``);
        otherwise they keep first-appearance order (``pd.Series.unique``).

    Returns
    -------
    tuple
        ``(all_dists, center_labels)``: ``all_dists`` is a list of arrays
        (one per feature) of RBF responses, and ``center_labels`` is a
        Series of center values indexed by the feature they belong to.
    """
    if element_info is None:
        element_info = preset.elements_completed

    if scaled_element_info:
        element_info = pd.DataFrame(MinMaxScaler().fit_transform(element_info),
                                    columns=element_info.columns,
                                    index=element_info.index)

    all_dists = []
    center_labels = []
    # `.items()` replaces `.iteritems()`, which was removed in pandas 2.0.
    for feature, data in element_info.items():
        if sort_centers:
            data = data.values
            centers = np.unique(data)
        else:
            centers = data.unique()
            data = data.values

        # All distinct absolute gaps between consecutive values.
        intervals = np.unique([abs(i - j) for i, j in zip(data[:-1], data[1:])])
        # Percentiles of the half intervals become the kernel sigmas.
        # NOTE: the original hard-coded [25, 50, 75] here, silently ignoring
        # (and clobbering) the `quartiles` parameter; it is now honored.
        sigma_quantiles = np.percentile(intervals / 2, quartiles)
        sigmas = sigma_quantiles / half_interval_by_sigma

        # RBF kernel responses of each element value against the centers.
        dists = rbf_kernel(data, centers, sigmas)
        all_dists.append(dists)
        center_labels.append(pd.Series(centers, index=[feature] * centers.size))

    return all_dists, pd.concat(center_labels)