diff --git a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py index 0cea21bb..f2412d2e 100644 --- a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py +++ b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py @@ -1,112 +1,155 @@ from abc import abstractmethod -from collections.abc import Callable -from typing import TypeAlias +from typing import Any, Callable, TypeAlias import numpy as np import numpy.typing as npt -from scipy.optimize import minimize from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm _TObjFunc: TypeAlias = Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float] _TMetrics: TypeAlias = dict[str, int | float] - class DensityBasedAlgorithm(Algorithm): - @staticmethod - def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth: float) -> npt.NDArray[np.float64]: - """Perform kernel density estimation on the given observations without fitting a model. + """Abstract base class for density-based change point detection algorithms. - :param observation: the data points for which to estimate the density. - :param bandwidth: the bandwidth parameter for the kernel density estimation. + Provides common infrastructure for methods that detect change points by + analyzing probability density changes in data segments. + """ + def __init__( + self, + min_window_size: int = 10, + threshold: float = 1.1, + bandwidth: float = 1.0 + ) -> None: + """ + Initializes density-based change point detector. - :return: estimated density values for the observations. + :param min_window_size: minimum data points required in each segment + :param threshold: detection sensitivity (higher = fewer detections) + :param bandwidth: kernel bandwidth for density estimation """ - n = len(observation) - x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000) - kde_values = np.zeros_like(x_grid) - for x in observation: - kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2) + self.min_window_size = min_window_size + self.threshold = threshold + self.bandwidth = bandwidth - kde_values /= n * bandwidth * np.sqrt(2 * np.pi) - return kde_values + def detect(self, window: npt.NDArray[np.float64]) -> int: + """Counts change points in the given data window. - def _calculate_weights( - self, - test_value: npt.NDArray[np.float64], - reference_value: npt.NDArray[np.float64], - bandwidth: float, - objective_function: _TObjFunc, - ) -> npt.NDArray[np.float64]: - """Calculate the weights based on the density ratio between test and reference values. + :param window: input data array (1D or 2D) + :return: number of detected change points + """ + return len(self.localize(window)) - :param test_value: the test data points. - :param reference_value: the reference data points. - :param bandwidth: the bandwidth parameter for the kernel density estimation. - :param objective_function: the objective function to minimize. + def localize(self, window: npt.NDArray[np.float64]) -> list[int]: + """Identifies positions of change points in the data window. - :return: the calculated density ratios normalized to their mean.
+ :param window: input data array (1D or 2D) + :return: list of change point indices """ - test_density = self._kernel_density_estimation(test_value, bandwidth) - reference_density = self._kernel_density_estimation(reference_value, bandwidth) + window = self._validate_window(window) + if not self._is_window_valid(window): + return [] + scores = self._compute_scores(window) + return self._find_change_points(scores) - def objective_function_wrapper(alpha: npt.NDArray[np.float64], /) -> float: - """Wrapper for the objective function to calculate the density ratio. + def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]: + """Ensures input window meets processing requirements. - :param alpha: relative parameter that controls the weighting between the numerator distribution - and the denominator distribution in the density ratio estimation. + :param window: raw input data + :return: validated 2D float64 array + """ + window_arr = np.asarray(window, dtype=np.float64) + if window_arr.ndim == 1: + window_arr = window_arr.reshape(-1, 1).astype(np.float64) + return np.array(window_arr, dtype=np.float64) - :return: the value of the objective function to minimize. - """ - objective_density_ratio = np.exp(test_density - reference_density - alpha) - return objective_function(objective_density_ratio, alpha) + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: + """Filters candidate points using threshold and minimum separation. - res = minimize(objective_function_wrapper, np.zeros(len(test_value)), method="L-BFGS-B") - optimized_alpha: npt.NDArray[np.float64] = res.x - density_ratio: npt.NDArray[np.float64] = np.exp(test_density - reference_density - optimized_alpha) - return density_ratio / np.mean(density_ratio) + :param scores: change point scores for each position + :return: filtered list of change point indices + """ + candidates = np.where(scores > self.threshold)[0] + if not candidates.size: + return [] + change_points = [int(candidates[0])] + for point in candidates[1:]: + if point - change_points[-1] > self.min_window_size: + change_points.append(int(point)) + return change_points + + def _is_window_valid(self, window: npt.NDArray[np.float64]) -> bool: + """Verifies window meets minimum size requirements. + + :param window: input data window + :return: True if window can be processed, else False + """ + return len(window) >= 2 * self.min_window_size - @abstractmethod - def detect(self, window: npt.NDArray[np.float64]) -> int: - # maybe rtype tuple[int] - """Function for finding change points in window + @staticmethod + def _kernel_density_estimation( + observation: npt.NDArray[np.float64], + bandwidth: float + ) -> npt.NDArray[np.float64]: + """Computes kernel density estimate using Gaussian kernels. 
- :param window: part of global data for finding change points - :return: list of right borders of window change points + :param observation: data points for density estimation + :param bandwidth: smoothing parameter for KDE + :return: density values at evaluation points """ - raise NotImplementedError + n = observation.shape[0] + x_grid: npt.NDArray[np.float64] = np.linspace( + np.min(observation) - 3*bandwidth, + np.max(observation) + 3*bandwidth, + 1000, + dtype=np.float64 + ) + + diff = x_grid[:, np.newaxis] - observation.ravel() + kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2) + kde_vals = kernel_vals.sum(axis=1) + + return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2*np.pi)), dtype=np.float64) @abstractmethod - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Function for finding coordinates of change points in window + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + """Computes change point scores. - :param window: part of global data for finding change points - :return: list of window change points + :param window: validated input data + :return: array of change point scores """ raise NotImplementedError @staticmethod - def evaluate_detection_accuracy(true_change_points: list[int], detected_change_points: list[int]) -> _TMetrics: - """Evaluate the accuracy of change point detection. - - :param true_change_points: list of true change point indices. - :param detected_change_points: list of detected change point indices. - - :return: a dictionary with evaluation metrics (precision, recall, F1 score). + def evaluate_detection_accuracy( + true_change_points: list[int], + detected_change_points: list[int] + ) -> _TMetrics: + """Computes detection performance metrics. + + :param true_change_points: ground truth change points + :param detected_change_points: algorithm-detected change points + :return: dictionary containing precision, recall, F1, and error counts """ - true_positive = len(set(true_change_points) & set(detected_change_points)) - false_positive = len(set(detected_change_points) - set(true_change_points)) - false_negative = len(set(true_change_points) - set(detected_change_points)) - - precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0.0 - recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0.0 - f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0 + true_positives = len(set(true_change_points) & set(detected_change_points)) + false_positives = len(set(detected_change_points) - set(true_change_points)) + false_negatives = len(set(true_change_points) - set(detected_change_points)) + + precision = ( + true_positives / (true_positives + false_positives) + if (true_positives + false_positives) > 0 else 0.0 + ) + recall = ( + true_positives / (true_positives + false_negatives) + if (true_positives + false_negatives) > 0 else 0.0 + ) + f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0 return { "precision": precision, "recall": recall, - "f1_score": f1_score, - "true_positive": true_positive, - "false_positive": false_positive, - "false_negative": false_negative, + "f1_score": f1, + "true_positive": true_positives, + "false_positive": false_positives, + "false_negative": false_negatives, } diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 39eb255c..1cae33db 100644 ---
a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -1,82 +1,106 @@ -from typing import cast +""" +Module for implementation of CPD algorithm using KLIEP-based divergence estimation. +""" + +__author__ = "Aleksandra Listkova" +__copyright__ = "Copyright (c) 2025 Aleksandra Listkova" +__license__ = "SPDX-License-Identifier: MIT" import numpy as np import numpy.typing as npt -from numpy import dtype, float64, ndarray +from scipy.optimize import minimize from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm class KliepAlgorithm(DensityBasedAlgorithm): - """Kullback-Leibler Importance Estimation Procedure (KLIEP) algorithm - for change point detection. - - KLIEP estimates the density ratio between two distributions and uses - the importance weights for detecting changes in the data distribution. - """ + def __init__( + self, + bandwidth: float = 1.0, + regularization: float = 0.1, + threshold: float = 1.1, + max_iter: int = 100, + min_window_size: int = 10, + ) -> None: + """ + Initializes a new instance of KLIEP-based change point detection algorithm. - def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1): - """Initialize the KLIEP algorithm. + :param bandwidth: kernel bandwidth for density estimation. + :param regularization: regularization coefficient for alpha optimization. + :param threshold: threshold for change point detection. + :param max_iter: maximum iterations for optimization solver. + :param min_window_size: minimum segment size for reliable estimation. + """ + super().__init__(min_window_size, threshold, bandwidth) + self.regularization = regularization + self.max_iter = max_iter - Args: - bandwidth (float): bandwidth parameter for density estimation. - regularization_coef (float): regularization parameter. - threshold (float, optional): threshold for detecting change points. - Defaults to 1.1. + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ - self.bandwidth = bandwidth - self.regularization_coef = regularization_coef - self.threshold = np.float64(threshold) + Computes KLIEP-based change point scores for each position in the window. - def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float: - """Loss function for KLIEP. + :param window: input data window (1D array). + :return: array of change point scores at each index. + """ + n_points = window.shape[0] + scores: npt.NDArray[np.float64] = np.zeros(n_points, dtype=np.float64) + common_grid = self._build_common_grid(window) - Args: - density_ratio (np.ndarray): estimated density ratio. - alpha (np.ndarray): coefficients for the density ratio. + for i in range(self.min_window_size, n_points - self.min_window_size): + before = window[:i] + after = window[i:] - Returns: - float: the computed loss value. - """ - return -np.mean(density_ratio) + self.regularization_coef * np.sum(alpha**2) + before_density = self._kde_on_grid(before, self.bandwidth, common_grid) + after_density = self._kde_on_grid(after, self.bandwidth, common_grid) - def detect(self, window: npt.NDArray[np.float64]) -> int: - """Detect the number of change points in the given data window - using KLIEP. 
+ alpha = self._optimize_alpha(after_density, before_density) + scores[i] = np.mean(np.log(after_density + 1e-10)) - np.mean(np.log(before_density + 1e-10)) - alpha + return scores - Args: - window (Iterable[float]): the data window to detect change points. + def _optimize_alpha(self, test_density: npt.NDArray[np.float64], ref_density: npt.NDArray[np.float64]) -> float: + """ + Optimizes alpha parameter for density ratio estimation. - Returns: - int: the number of detected change points. + :param test_density: KDE values for test segment (after potential CP). + :param ref_density: KDE values for reference segment (before potential CP). + :return: optimal alpha value for density ratio adjustment. """ - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) + def loss(alpha_array: npt.NDArray[np.float64], /) -> float: + alpha = alpha_array[0] + ratio = np.exp(np.log(test_density + 1e-10) - np.log(ref_density + 1e-10) - alpha) + return float(-np.mean(np.log(ratio + 1e-10)) + self.regularization * alpha**2) - return np.count_nonzero(weights > self.threshold) + initial_alpha = np.array([0.0], dtype=np.float64) + bounds = [(0.0, None)] - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Localize the change points in the given data window using KLIEP. + res = minimize(loss, x0=initial_alpha, method="L-BFGS-B", bounds=bounds, options={"maxiter": self.max_iter}) + return float(res.x[0]) - Args: - window (Iterable[float]): the data window to localize - change points. + def _build_common_grid(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + """ + Creates evaluation grid for density estimation. - Returns: - List[int]: the indices of the detected change points. + :param window: input data window. + :return: grid spanning data range with bandwidth-adjusted margins. """ - window_sample = np.array(window) - weights: ndarray[tuple[int, ...], dtype[float64]] = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, + return np.linspace( + np.min(window) - 3 * self.bandwidth, np.max(window) + 3 * self.bandwidth, 1000, dtype=np.float64 ) - return cast(list[int], np.where(weights > self.threshold)[0].tolist()) + def _kde_on_grid( + self, observation: npt.NDArray[np.float64], bandwidth: float, grid: npt.NDArray[np.float64] + ) -> npt.NDArray[np.float64]: + """ + Computes kernel density estimate on specified grid. + + :param observation: data points for KDE. + :param bandwidth: kernel bandwidth parameter. + :param grid: evaluation grid points. + :return: density values at grid points. + """ + n = observation.shape[0] + diff = grid[:, np.newaxis] - observation.ravel() + kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2) + kde_vals = kernel_vals.sum(axis=1) + return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2 * np.pi)), dtype=np.float64) diff --git a/pysatl_cpd/core/algorithms/rulsif_algorithm.py b/pysatl_cpd/core/algorithms/rulsif_algorithm.py index f0f04da8..1a808b2e 100644 --- a/pysatl_cpd/core/algorithms/rulsif_algorithm.py +++ b/pysatl_cpd/core/algorithms/rulsif_algorithm.py @@ -1,79 +1,64 @@ -from typing import cast +""" +Module for implementation of CPD algorithm using RULSIF-based divergence estimation.
+""" + +__author__ = "Aleksandra Listkova" +__copyright__ = "Copyright (c) 2025 Aleksandra Listkova" +__license__ = "SPDX-License-Identifier: MIT" import numpy as np import numpy.typing as npt +from scipy.linalg import solve from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm class RulsifAlgorithm(DensityBasedAlgorithm): - """Relative Unconstrained Least-Squares Importance Fitting (RULSIF) - algorithm for change point detection. - - RULSIF estimates the density ratio between two distributions and uses - the importance weights for detecting changes in the data distribution. - """ - - def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1): - """Initialize the RULSIF algorithm. - - Args: - bandwidth (float): bandwidth parameter for density estimation. - regularization_coef (float): regularization parameter. - threshold (float, optional): threshold for detecting change points. - Defaults to 1.1. + def __init__( + self, + alpha: float = 0.1, + bandwidth: float = 1.0, + lambda_reg: float = 0.1, + threshold: float = 1.1, + min_window_size: int = 10, + ) -> None: """ - self.bandwidth = bandwidth - self.regularization_coef = regularization_coef - self.threshold = threshold - - def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float: - """Loss function for RULSIF. - - Args: - density_ratio (np.ndarray): estimated density ratio. - alpha (np.ndarray): coefficients for the density ratio. - - Returns: - float: the computed loss value. + Initializes RULSIF-based change point detector. + + :param alpha: mixture coefficient (0-1) for reference/test densities + :param bandwidth: kernel bandwidth for density estimation + :param lambda_reg: L2 regularization strength + :param threshold: detection sensitivity threshold + :param min_window_size: minimum segment size requirement + :raises ValueError: if alpha is not in (0,1) """ - return np.mean((density_ratio - 1) ** 2) + self.regularization_coef * np.sum(alpha**2) + super().__init__(min_window_size, threshold, bandwidth) + if not 0 < alpha < 1: + raise ValueError("Alpha must be between 0 and 1") + self.alpha = alpha + self.lambda_reg = lambda_reg - def detect(self, window: npt.NDArray[np.float64]) -> int: - """Detect the number of change points in the given data window - using RULSIF. - - Args: - window (Iterable[float]): the data window to detect change points. - - Returns: - int: the number of detected change points. + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) - - return np.count_nonzero(weights > self.threshold) + Computes RULSIF-based change point scores for each position. - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Localize the change points in the given data window using RULSIF. - - Args: - window (Iterable[float]): the data window to localize change points. - - Returns: - List[int]: the indices of the detected change points. 
+ :param window: input data window (1D array) + :return: array of divergence scores at each index """ - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) - - return cast(list[int], np.where(weights > self.threshold)[0].tolist()) + n_points = window.shape[0] + scores: npt.NDArray[np.float64] = np.zeros(n_points, dtype=np.float64) + for i in range(self.min_window_size, n_points - self.min_window_size): + ref = window[:i].ravel() + test = window[i:].ravel() + # Gaussian kernel design matrices with centers at the test points (rows: samples, columns: centers). + K_ref = np.exp(-0.5 * ((ref[:, np.newaxis] - test) / self.bandwidth) ** 2) + K_test = np.exp(-0.5 * ((test[:, np.newaxis] - test) / self.bandwidth) ** 2) + H = ( + (1 - self.alpha) * (K_ref.T @ K_ref) / i + + self.alpha * (K_test.T @ K_test) / (n_points - i) + + self.lambda_reg * np.eye(test.shape[0], dtype=np.float64) + ) + h = K_test.mean(axis=0) + theta = solve(H, h, assume_a='pos') + density_ratio = K_test @ theta + scores[i] = np.mean((density_ratio - 1) ** 2) + return scores
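
For reference, a minimal usage sketch of the two detectors introduced above; it is not part of the changeset. It relies only on the public API shown in the diff (the KliepAlgorithm and RulsifAlgorithm constructors, detect/localize, and the inherited evaluate_detection_accuracy helper). The synthetic signal, the parameter values, and the single true change point at index 100 are illustrative assumptions, and the exact detections will vary with bandwidth and threshold tuning.

import numpy as np

from pysatl_cpd.core.algorithms.kliep_algorithm import KliepAlgorithm
from pysatl_cpd.core.algorithms.rulsif_algorithm import RulsifAlgorithm

# Synthetic univariate signal with a mean shift halfway through (true change point at index 100).
rng = np.random.default_rng(0)
window = np.concatenate([rng.normal(0.0, 1.0, 100), rng.normal(3.0, 1.0, 100)])

detectors = {
    "KLIEP": KliepAlgorithm(bandwidth=1.0, regularization=0.1, threshold=1.1, min_window_size=10),
    "RULSIF": RulsifAlgorithm(alpha=0.1, bandwidth=1.0, lambda_reg=0.1, threshold=1.1, min_window_size=10),
}

for name, detector in detectors.items():
    change_points = detector.localize(window)  # indices where the score exceeds the threshold
    count = detector.detect(window)            # same detections, reported as a count
    # Exact-match metrics from the shared base class helper.
    metrics = detector.evaluate_detection_accuracy([100], change_points)
    print(name, count, change_points, metrics)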