Skip to content
Open
189 changes: 116 additions & 73 deletions pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py
Original file line number Diff line number Diff line change
@@ -1,112 +1,155 @@
from abc import abstractmethod
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add license/copyright metadata here (and header docstring)

from collections.abc import Callable
from typing import TypeAlias
from typing import Any, Callable, TypeAlias

import numpy as np
import numpy.typing as npt
from scipy.optimize import minimize

from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm

_TObjFunc: TypeAlias = Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float]
_TMetrics: TypeAlias = dict[str, int | float]


class DensityBasedAlgorithm(Algorithm):
@staticmethod
def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth: float) -> npt.NDArray[np.float64]:
"""Perform kernel density estimation on the given observations without fitting a model.
"""Abstract base class for density-based change point detection algorithms.
:param observation: the data points for which to estimate the density.
:param bandwidth: the bandwidth parameter for the kernel density estimation.
Provides common infrastructure for methods that detect change points by
analyzing probability density changes in data segments.
"""
def __init__(
self,
min_window_size: int = 10,
threshold: float = 1.1,
bandwidth: float = 1.0
) -> None:
"""
Initializes density-based change point detector.
:return: estimated density values for the observations.
:param min_window_size: minimum data points required in each segment
:param threshold: detection sensitivity (higher = fewer detections)
:param bandwidth:kernel bandwidth for density estimation
"""
n = len(observation)
x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000)
kde_values = np.zeros_like(x_grid)
for x in observation:
kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2)
self.min_window_size = min_window_size
self.threshold = threshold
self.bandwidth = bandwidth

kde_values /= n * bandwidth * np.sqrt(2 * np.pi)
return kde_values
def detect(self, window: npt.NDArray[np.float64]) -> int:
"""Counts change points in the given data window.
def _calculate_weights(
self,
test_value: npt.NDArray[np.float64],
reference_value: npt.NDArray[np.float64],
bandwidth: float,
objective_function: _TObjFunc,
) -> npt.NDArray[np.float64]:
"""Calculate the weights based on the density ratio between test and reference values.
:param window: input data array (1D or 2D)
:return: number of detected change points
"""
return len(self.localize(window))

:param test_value: the test data points.
:param reference_value: the reference data points.
:param bandwidth: the bandwidth parameter for the kernel density estimation.
:param objective_function: the objective function to minimize.
def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
"""Identifies positions of change points in the data window.
:return: the calculated density ratios normalized to their mean.
:param window: input data array (1D or 2D)
:return: list of change point indices
"""
test_density = self._kernel_density_estimation(test_value, bandwidth)
reference_density = self._kernel_density_estimation(reference_value, bandwidth)
window = self._validate_window(window)
if not self._is_window_valid(window):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why previously validated window can be not valid here? Naming looks a little bit confusing.

return []
scores = self._compute_scores(window)
return self._find_change_points(scores)

def objective_function_wrapper(alpha: npt.NDArray[np.float64], /) -> float:
"""Wrapper for the objective function to calculate the density ratio.
def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method '_validate_window' may be 'static'

"""Ensures input window meets processing requirements.
:param alpha: relative parameter that controls the weighting between the numerator distribution
and the denominator distribution in the density ratio estimation.
:param window: raw input data
:return: validated 2D float64 array
"""
window_arr = np.asarray(window, dtype=np.float64)
if window_arr.ndim == 1:
window_arr = window_arr.reshape(-1, 1).astype(np.float64)
return np.array(window_arr, dtype=np.float64)

:return: the value of the objective function to minimize.
"""
objective_density_ratio = np.exp(test_density - reference_density - alpha)
return objective_function(objective_density_ratio, alpha)
def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]:
"""Filters candidate points using threshold and minimum separation.
res = minimize(objective_function_wrapper, np.zeros(len(test_value)), method="L-BFGS-B")
optimized_alpha: npt.NDArray[np.float64] = res.x
density_ratio: npt.NDArray[np.float64] = np.exp(test_density - reference_density - optimized_alpha)
return density_ratio / np.mean(density_ratio)
:param scores: change point scores for each position
:return: filtered list of change point indices
"""
candidates = np.where(scores > self.threshold)[0]
if not candidates.size:
return []
change_points = [int(candidates[0])]
for point in candidates[1:]:
if point - change_points[-1] > self.min_window_size:
change_points.append(int(point))
return change_points

def _is_window_valid(self, window: npt.NDArray[np.float64]) -> bool:
"""Verifies window meets minimum size requirements.
:param window: input data window
:return: True if window can be processed, else False
"""
return len(window) >= 2 * self.min_window_size
Copy link
Collaborator

@alexdtat alexdtat Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why len must be at least twice as the minimal? It looks a little bit confusing.


@abstractmethod
def detect(self, window: npt.NDArray[np.float64]) -> int:
# maybe rtype tuple[int]
"""Function for finding change points in window
@staticmethod
def _kernel_density_estimation(
observation: npt.NDArray[np.float64],
bandwidth: float
) -> npt.NDArray[np.float64]:
"""Computes kernel density estimate using Gaussian kernels.
:param window: part of global data for finding change points
:return: list of right borders of window change points
:param observation: data points for density estimation
:param bandwidth: smoothing parameter for KDE
:return: density values at evaluation points
"""
raise NotImplementedError
n = observation.shape[0]
x_grid: npt.NDArray[np.float64] = np.linspace(
np.min(observation) - 3*bandwidth,
np.max(observation) + 3*bandwidth,
1000,
dtype=np.float64
)

diff = x_grid[:, np.newaxis] - observation
kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2)
kde_vals = kernel_vals.sum(axis=1)

return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2*np.pi)), dtype=np.float64)

@abstractmethod
def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
"""Function for finding coordinates of change points in window
def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""Computes change point scores.
:param window: part of global data for finding change points
:return: list of window change points
:param window: validated input data
:return: array of change point scores
"""
raise NotImplementedError

@staticmethod
def evaluate_detection_accuracy(true_change_points: list[int], detected_change_points: list[int]) -> _TMetrics:
"""Evaluate the accuracy of change point detection.
:param true_change_points: list of true change point indices.
:param detected_change_points: list of detected change point indices.
:return: a dictionary with evaluation metrics (precision, recall, F1 score).
def evaluate_detection_accuracy(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this here? It seems this logic belongs to benchmarking.

true_change_points: list[int],
detected_change_points: list[int]
) -> _TMetrics:
"""Computes detection performance metrics.
:param true_change_points: ground truth change points
:param detected_change_points: algorithm-detected change points
:return: dictionary containing precision, recall, F1, and error counts
"""
true_positive = len(set(true_change_points) & set(detected_change_points))
false_positive = len(set(detected_change_points) - set(true_change_points))
false_negative = len(set(true_change_points) - set(detected_change_points))

precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0.0
recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0.0
f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0
true_positives = len(set(true_change_points) & set(detected_change_points))
false_positives = len(set(detected_change_points) - set(true_change_points))
false_negatives = len(set(true_change_points) - set(detected_change_points))

precision = (
true_positives / (true_positives + false_positives)
if (true_positives + false_positives) > 0 else 0.0
)
recall = (
true_positives / (true_positives + false_negatives)
if (true_positives + false_negatives) > 0 else 0.0
)
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

return {
"precision": precision,
"recall": recall,
"f1_score": f1_score,
"true_positive": true_positive,
"false_positive": false_positive,
"false_negative": false_negative,
"f1_score": f1,
"true_positive": true_positives,
"false_positive": false_positives,
"false_negative": false_negatives,
}
136 changes: 80 additions & 56 deletions pysatl_cpd/core/algorithms/kliep_algorithm.py
Original file line number Diff line number Diff line change
@@ -1,82 +1,106 @@
from typing import cast
"""
Module for implementation of CPD algorithm using KLIEP-based divergence estimation.
"""

__author__ = "Aleksandra Listkova"
__copyright__ = "Copyright (c) 2025 Aleksandra Listkova"
__license__ = "SPDX-License-Identifier: MIT"

import numpy as np
import numpy.typing as npt
from numpy import dtype, float64, ndarray
from scipy.optimize import minimize

from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm


class KliepAlgorithm(DensityBasedAlgorithm):
"""Kullback-Leibler Importance Estimation Procedure (KLIEP) algorithm
for change point detection.
KLIEP estimates the density ratio between two distributions and uses
the importance weights for detecting changes in the data distribution.
"""
def __init__(
self,
bandwidth: float = 1.0,
regularization: float = 0.1,
threshold: float = 1.1,
max_iter: int = 100,
min_window_size: int = 10,
) -> None:
"""
Initializes a new instance of KLIEP-based change point detection algorithm.
def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1):
"""Initialize the KLIEP algorithm.
:param bandwidth: kernel bandwidth for density estimation.
:param regularization: regularization coefficient for alpha optimization.
:param threshold: threshold for change point detection.
:param max_iter: maximum iterations for optimization solver.
:param min_window_size: minimum segment size for reliable estimation.
"""
super().__init__(min_window_size, threshold, bandwidth)
self.regularization = regularization
self.max_iter = max_iter

Args:
bandwidth (float): bandwidth parameter for density estimation.
regularization_coef (float): regularization parameter.
threshold (float, optional): threshold for detecting change points.
Defaults to 1.1.
def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""
self.bandwidth = bandwidth
self.regularization_coef = regularization_coef
self.threshold = np.float64(threshold)
Computes KLIEP-based change point scores for each position in the window.
def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float:
"""Loss function for KLIEP.
:param window: input data window (1D array).
:return: array of change point scores at each index.
"""
n_points = window.shape[0]
scores: npt.NDArray[np.float64] = np.zeros(n_points, dtype=np.float64)
common_grid = self._build_common_grid(window)

Args:
density_ratio (np.ndarray): estimated density ratio.
alpha (np.ndarray): coefficients for the density ratio.
for i in range(self.min_window_size, n_points - self.min_window_size):
before = window[:i]
after = window[i:]

Returns:
float: the computed loss value.
"""
return -np.mean(density_ratio) + self.regularization_coef * np.sum(alpha**2)
before_density = self._kde_on_grid(before, self.bandwidth, common_grid)
after_density = self._kde_on_grid(after, self.bandwidth, common_grid)

def detect(self, window: npt.NDArray[np.float64]) -> int:
"""Detect the number of change points in the given data window
using KLIEP.
alpha = self._optimize_alpha(after_density, before_density)
scores[i] = np.mean(np.log(after_density + 1e-10)) - np.mean(np.log(before_density + 1e-10)) - alpha
return scores

Args:
window (Iterable[float]): the data window to detect change points.
def _optimize_alpha(self, test_density: npt.NDArray[np.float64], ref_density: npt.NDArray[np.float64]) -> float:
"""
Optimizes alpha parameter for density ratio estimation.
Returns:
int: the number of detected change points.
:param test_density: KDE values for test segment (after potential CP).
:param ref_density: KDE values for reference segment (before potential CP).
:return: optimal alpha value for density ratio adjustment.
"""

window_sample = np.array(window)
weights = self._calculate_weights(
test_value=window_sample,
reference_value=window_sample,
bandwidth=self.bandwidth,
objective_function=self._loss_function,
)
def loss(alpha_array: npt.NDArray[np.float64], /) -> float:
alpha = alpha_array[0]
ratio = np.exp(np.log(test_density) - np.log(ref_density + 1e-10) - alpha)
return float(-np.mean(np.log(ratio + 1e-10)) + self.regularization * alpha**2)
Copy link
Collaborator

@alexdtat alexdtat Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these 1e-10? Is it for numerical stability? Should it be a parameter? Or maybe make it a private field of class?


return np.count_nonzero(weights > self.threshold)
initial_alpha = np.array([0.0], dtype=np.float64)
bounds = [(0.0, None)]

def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
"""Localize the change points in the given data window using KLIEP.
res = minimize(loss, x0=initial_alpha, method="L-BFGS-B", bounds=bounds, options={"maxiter": self.max_iter})
return float(res.x[0])

Args:
window (Iterable[float]): the data window to localize
change points.
def _build_common_grid(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""
Creates evaluation grid for density estimation.
Returns:
List[int]: the indices of the detected change points.
:param window: input data window.
:return: grid spanning data range with bandwidth-adjusted margins.
"""
window_sample = np.array(window)
weights: ndarray[tuple[int, ...], dtype[float64]] = self._calculate_weights(
test_value=window_sample,
reference_value=window_sample,
bandwidth=self.bandwidth,
objective_function=self._loss_function,
return np.linspace(
np.min(window) - 3 * self.bandwidth, np.max(window) + 3 * self.bandwidth, 1000, dtype=np.float64
)

return cast(list[int], np.where(weights > self.threshold)[0].tolist())
def _kde_on_grid(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method '_kde_on_grid' may be 'static'

self, observation: npt.NDArray[np.float64], bandwidth: float, grid: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
"""
Computes kernel density estimate on specified grid.
:param observation: data points for KDE.
:param bandwidth: kernel bandwidth parameter.
:param grid: evaluation grid points.
:return: density values at grid points.
"""
n = observation.shape[0]
diff = grid[:, np.newaxis] - observation
kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2)
kde_vals = kernel_vals.sum(axis=1)
return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2 * np.pi)), dtype=np.float64)
Loading