diff --git a/pysatl_cpd/core/algorithms/sdar/__init__.py b/pysatl_cpd/core/algorithms/sdar/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/pysatl_cpd/core/algorithms/sdar/sdar.py b/pysatl_cpd/core/algorithms/sdar/sdar.py
new file mode 100644
index 00000000..3ff1a0b6
--- /dev/null
+++ b/pysatl_cpd/core/algorithms/sdar/sdar.py
@@ -0,0 +1,188 @@
+from collections import deque
+from typing import Optional
+
+import numpy as np
+import numpy.typing as npt
+
+
+class SDARmodel:
+    """
+    Implements a Sequentially Discounting AutoRegressive (SDAR) model for online
+    calculation of anomaly scores in multivariate time series.
+    """
+
+    def __init__(self, order: int = 5, forgetting_factor: float = 0.99, smoothing_window_size: int = 5) -> None:
+        """
+        Initializes the SDAR model.
+
+        :param order: The order of the autoregressive model (p). Must be a positive integer.
+        :param forgetting_factor: The forgetting factor (lambda) in the range (0, 1).
+            Values close to 1 imply slow forgetting.
+        :param smoothing_window_size: The size of the window for averaging (smoothing) the anomaly scores.
+            Must be a positive integer.
+        :return:
+        """
+        assert order > 0, "Order must be a positive integer."
+        assert 0 < forgetting_factor < 1, "Forgetting factor must be between 0 and 1."
+        assert smoothing_window_size > 0, "Smoothing window size must be a positive integer."
+
+        self.__order = order
+        self.__lambda = forgetting_factor
+        self.__smoothing_window_size: int = smoothing_window_size
+
+        self.__dimension: Optional[int] = None
+        self.__mu: Optional[npt.NDArray[np.float64]] = None
+        self.__covariance_matrices: deque[npt.NDArray[np.float64]] = deque(maxlen=self.__order + 1)
+        self.__ar_matrices: list[npt.NDArray[np.float64]] = []
+        self.__sigma: Optional[npt.NDArray[np.float64]] = None
+
+        self.__history: deque[npt.NDArray[np.float64]] = deque(maxlen=self.__order)
+        self.__scores_buffer: deque[np.float64] = deque(maxlen=self.__smoothing_window_size)
+
+    def __initialize_state(self, x: npt.NDArray[np.float64]) -> None:
+        """
+        Initializes the model's internal state based on the first observation.
+        :param x: the first observation.
+        :return:
+        """
+        self.__dimension = x.shape[0]
+        self.__mu = np.zeros(self.__dimension)
+
+        for _ in range(self.__order + 1):
+            self.__covariance_matrices.append(np.zeros((self.__dimension, self.__dimension)))
+
+        self.__ar_matrices = [np.zeros((self.__dimension, self.__dimension)) for _ in range(self.__order)]
+        self.__sigma = np.eye(self.__dimension)
+
+    def clear(self) -> None:
+        """
+        Resets the internal state of the model to its initial values.
+        :return:
+        """
+        if self.__dimension is None:
+            return
+
+        assert self.__mu is not None, "clear() should not be called on an uninitialized model with mu=None"
+
+        self.__mu.fill(0)
+        for i in range(len(self.__covariance_matrices)):
+            self.__covariance_matrices[i].fill(0)
+        self.__ar_matrices = [np.zeros((self.__dimension, self.__dimension)) for _ in range(self.__order)]
+        self.__sigma = np.eye(self.__dimension)
+        self.__history.clear()
+        self.__scores_buffer.clear()
+
+    def __calculate_error(self, x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
+        """
+        Calculates the prediction error for the current observation.
+        :param x: a new observation.
+        :return: the error vector between the observation and the predicted point.
+        """
+        assert self.__mu is not None, "Model must be initialized before calculating the error."
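+
+        # One-step prediction in vector AR(p) form around the running mean:
+        #   x_hat = mu + sum_{i=1}^{p} A_i @ (x_{t-i} - mu)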
+        # The history deque is oldest-first; reverse it so that index i corresponds
+        # to lag i + 1 and pairs with the coefficient matrix A_{i+1}.
+        centered_history = np.array(list(self.__history))[::-1] - self.__mu
+        prediction_offset = np.sum([self.__ar_matrices[i] @ centered_history[i] for i in range(self.__order)], axis=0)
+        x_hat = self.__mu + prediction_offset
+        error = x - x_hat
+        return error
+
+    def __update_matrices(self, x: npt.NDArray[np.float64]) -> None:
+        """
+        Recursively updates the covariance matrices using the forgetting factor.
+        :param x: a new observation.
+        :return:
+        """
+        assert self.__mu is not None, "Model must be initialized before updating the matrices."
+
+        centered_x = x - self.__mu
+        self.__covariance_matrices[0] = self.__lambda * self.__covariance_matrices[0] + (1 - self.__lambda) * np.outer(
+            centered_x, centered_x
+        )
+        for i in range(1, self.__order + 1):
+            # self.__history[-i] is the lag-i observation, so C_i estimates the
+            # discounted lag-i cross-covariance.
+            centered_hist = self.__history[-i] - self.__mu
+            self.__covariance_matrices[i] = self.__lambda * self.__covariance_matrices[i] + (
+                1 - self.__lambda
+            ) * np.outer(centered_x, centered_hist)
+
+    def __update_model(self, x: npt.NDArray[np.float64]) -> None:
+        """
+        Performs a single model update step based on a new observation.
+        :param x: a new observation.
+        :return:
+        """
+        if self.__dimension is None:
+            self.__initialize_state(x)
+
+        assert self.__mu is not None
+        self.__mu = self.__lambda * self.__mu + (1 - self.__lambda) * x
+
+        # Until the history holds `order` observations, only the mean is tracked.
+        if len(self.__history) < self.__order:
+            self.__history.append(x)
+            return
+
+        self.__update_matrices(x)
+        self.__solve_equations()
+
+        error = self.__calculate_error(x)
+
+        assert self.__sigma is not None
+        self.__sigma = self.__lambda * self.__sigma + (1 - self.__lambda) * np.outer(error, error)
+
+        score = self.__calculate_log_likelihood(error)
+        score = np.maximum(score, 1e-6)
+        self.__scores_buffer.append(score)
+        self.__history.append(x)
+
+    def __calculate_log_likelihood(self, error: npt.NDArray[np.float64]) -> np.float64:
+        """
+        Calculates the anomaly score as the negative log-likelihood of the prediction error
+        under a Gaussian model with covariance sigma.
+        :param error: the prediction error.
+        :return: the calculated anomaly score.
+        """
+        assert self.__dimension is not None and self.__sigma is not None, "Model must be initialized."
+        _, log_det = np.linalg.slogdet(self.__sigma)
+
+        inv_sigma = np.linalg.inv(self.__sigma)
+        mahalanobis_dist = error.T @ inv_sigma @ error
+
+        log_likelihood = -0.5 * (self.__dimension * np.log(2 * np.pi) + log_det + mahalanobis_dist)
+        return np.float64(-log_likelihood)
+
+    def __solve_equations(self) -> None:
+        """
+        Finds the AR model coefficients by solving the block Yule-Walker equations for the
+        multivariate case. Falls back to a least-squares solution if the system is singular.
+        :return:
+        """
+        assert self.__dimension is not None, "Model must be initialized before solving the equations."
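+
+        # Block Yule-Walker system: the coefficient matrix is block-Toeplitz, with
+        # block (i, j) equal to C_{|i-j|} (transposed above the diagonal), and the
+        # right-hand side stacks the lagged cross-covariances C_1, ..., C_p.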
+        d, p = self.__dimension, self.__order
+        transition_matrix = np.zeros((p * d, p * d))
+        for i in range(p):
+            for j in range(p):
+                lag = abs(i - j)
+                cov_mat = self.__covariance_matrices[lag]
+                transition_matrix[i * d : (i + 1) * d, j * d : (j + 1) * d] = cov_mat if i >= j else cov_mat.T
+
+        cross_cov_stacked = np.vstack([self.__covariance_matrices[i + 1] for i in range(p)])
+
+        try:
+            weights: npt.NDArray[np.float64] = np.linalg.solve(transition_matrix, cross_cov_stacked).astype(
+                np.float64
+            )
+        except np.linalg.LinAlgError:
+            # Early on, the discounted covariances may form a singular system.
+            weights = np.linalg.lstsq(transition_matrix, cross_cov_stacked, rcond=None)[0].astype(np.float64)
+        self.__ar_matrices = [weights[i * d : (i + 1) * d, :] for i in range(p)]
+
+    def get_smoothed_score(self, x: npt.NDArray[np.float64]) -> Optional[np.float64]:
+        """
+        Processes a new observation and returns the smoothed anomaly score.
+        :param x: a new observation.
+        :return: the smoothed anomaly score, or None if the smoothing buffer is not yet full.
+        """
+        self.__update_model(x)
+        if len(self.__scores_buffer) < self.__smoothing_window_size:
+            return None
+
+        score = np.mean(list(self.__scores_buffer))
+        score = np.maximum(score, 1e-9)
+        return np.float64(score)
diff --git a/pysatl_cpd/core/algorithms/sdar_online_algorithm.py b/pysatl_cpd/core/algorithms/sdar_online_algorithm.py
new file mode 100644
index 00000000..e727c5a1
--- /dev/null
+++ b/pysatl_cpd/core/algorithms/sdar_online_algorithm.py
@@ -0,0 +1,112 @@
+from typing import Optional
+
+import numpy as np
+import numpy.typing as npt
+
+from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm
+from pysatl_cpd.core.algorithms.sdar.sdar import SDARmodel
+
+
+class SDARAlgorithm(OnlineAlgorithm):
+    """
+    An online change point detection algorithm based on a two-level SDAR model.
+    """
+
+    def __init__(
+        self, order: int = 2, smoothing_window_size: int = 3, forgetting_factor: float = 0.99, threshold: float = 5
+    ) -> None:
+        """
+        Initializes the change point detection algorithm.
+        :param order: the AR model order for both SDAR models.
+        :param smoothing_window_size: the smoothing window size for both SDAR models.
+        :param forgetting_factor: the forgetting factor (lambda) for both SDAR models.
+        :param threshold: the threshold for the second-level anomaly score, above which a change point is detected.
+        :return:
+        """
+        assert threshold >= 0, "Threshold must be non-negative."
+
+        self.__threshold: np.float64 = np.float64(threshold)
+
+        self.__first_model: SDARmodel = SDARmodel(order, forgetting_factor, smoothing_window_size)
+        self.__second_model: SDARmodel = SDARmodel(order, forgetting_factor, smoothing_window_size)
+
+        self.__first_scores: list[np.float64 | None] = []
+        self.__second_scores: list[np.float64 | None] = []
+
+        self.__current_time: int = 0
+        self.__was_change_point: bool = False
+        self.__change_point: Optional[int] = None
+
+    def clear(self) -> None:
+        self.__first_model.clear()
+        self.__second_model.clear()
+
+        self.__first_scores = []
+        self.__second_scores = []
+
+        self.__current_time = 0
+        self.__was_change_point = False
+        self.__change_point = None
+
+    def __process_point(self, observation: npt.NDArray[np.float64]) -> None:
+        """
+        Processes a single observation, updating both models and checking for a change point.
+        :param observation: a new observation.
+        :return:
+        """
+        self.__current_time += 1
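+
+        # Two-level scheme: the first SDAR model scores the raw observations, its
+        # smoothed score is fed as a one-dimensional series into the second SDAR
+        # model, and the second model's smoothed score is compared to the threshold.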
+ """ + if not isinstance(observation, np.ndarray): + observation = np.array([observation]) + + self.__process_point(observation) + result = self.__was_change_point + self.__was_change_point = False + return result + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Method for a step of localization of a change point. + :param observation: new observation of a time series + :return: absolute location of a change point, acquired after processing the new observation, + or None if there wasn't any. + """ + if not isinstance(observation, np.ndarray): + observation = np.array([observation]) + + self.__process_point(observation) + result = self.__change_point + self.__was_change_point = False + self.__change_point = None + return result diff --git a/tests/test_core/test_algorithms/test_sdar_online_algorithm.py b/tests/test_core/test_algorithms/test_sdar_online_algorithm.py new file mode 100644 index 00000000..467f8eff --- /dev/null +++ b/tests/test_core/test_algorithms/test_sdar_online_algorithm.py @@ -0,0 +1,96 @@ +import numpy as np +import pytest + +import pysatl_cpd.generator.distributions as dstr +from pysatl_cpd.core.algorithms.sdar_online_algorithm import SDARAlgorithm + + +@pytest.fixture(scope="module") +def set_seed(): + np.random.seed(1) + + +@pytest.fixture +def algorithm_factory(): + def _factory(): + return SDARAlgorithm( + order=2, + forgetting_factor=0.97, + smoothing_window_size=5, + threshold=2, + ) + + return _factory + + +@pytest.fixture(scope="function") +def data_params(): + base_params = { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + "tolerable_deviation": 25, + } + return base_params + + +@pytest.fixture() +def generate_data(data_params): + np.random.seed(1) + cp = data_params["change_point"] + size = data_params["size"] + + left_distr = dstr.Distribution.from_str(str(dstr.Distributions.BETA), {"alpha": "0.5", "beta": "0.5"}) + right_distr = dstr.Distribution.from_str(str(dstr.Distributions.NORMAL), {"mean": "1.0", "variance": "1.0"}) + return np.concatenate([left_distr.scipy_sample(cp), right_distr.scipy_sample(size - cp)]) + + +class TestSDARAlgorithm: + @pytest.fixture(autouse=True) + def setup(self, algorithm_factory): + self.algorithm_factory = algorithm_factory + + def test_consecutive_detection(self, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + algorithm = self.algorithm_factory() + was_change_point = False + for value in generate_data: + result = algorithm.detect(value) + if result: + was_change_point = True + + assert was_change_point, "There was undetected change point in data" + + def test_correctness_of_consecutive_detection(self, generate_data, data_params): + outer_algorithm = self.algorithm_factory() + for _ in range(data_params["num_of_tests"]): + inner_algorithm = self.algorithm_factory() + outer_result = [] + inner_result = [] + + for value in generate_data: + outer_result.append(outer_algorithm.detect(value)) + inner_result.append(inner_algorithm.detect(value)) + + outer_algorithm.clear() + assert outer_result == inner_result, "Consecutive and independent detection should give same results" + + def test_correctness_of_consecutive_localization(self, generate_data, data_params): + outer_algorithm = self.algorithm_factory() + for _ in range(data_params["num_of_tests"]): + inner_algorithm = self.algorithm_factory() + + for value in generate_data: + outer_result = outer_algorithm.localize(value) + inner_result = inner_algorithm.localize(value) + assert 
+        result = self.__change_point
+        self.__was_change_point = False
+        self.__change_point = None
+        return result
diff --git a/tests/test_core/test_algorithms/test_sdar_online_algorithm.py b/tests/test_core/test_algorithms/test_sdar_online_algorithm.py
new file mode 100644
index 00000000..467f8eff
--- /dev/null
+++ b/tests/test_core/test_algorithms/test_sdar_online_algorithm.py
@@ -0,0 +1,96 @@
+import numpy as np
+import pytest
+
+import pysatl_cpd.generator.distributions as dstr
+from pysatl_cpd.core.algorithms.sdar_online_algorithm import SDARAlgorithm
+
+
+@pytest.fixture(scope="module")
+def set_seed():
+    np.random.seed(1)
+
+
+@pytest.fixture
+def algorithm_factory():
+    def _factory():
+        return SDARAlgorithm(
+            order=2,
+            forgetting_factor=0.97,
+            smoothing_window_size=5,
+            threshold=2,
+        )
+
+    return _factory
+
+
+@pytest.fixture(scope="function")
+def data_params():
+    base_params = {
+        "num_of_tests": 10,
+        "size": 500,
+        "change_point": 250,
+        "tolerable_deviation": 25,
+    }
+    return base_params
+
+
+@pytest.fixture()
+def generate_data(data_params):
+    np.random.seed(1)
+    cp = data_params["change_point"]
+    size = data_params["size"]
+
+    left_distr = dstr.Distribution.from_str(str(dstr.Distributions.BETA), {"alpha": "0.5", "beta": "0.5"})
+    right_distr = dstr.Distribution.from_str(str(dstr.Distributions.NORMAL), {"mean": "1.0", "variance": "1.0"})
+    return np.concatenate([left_distr.scipy_sample(cp), right_distr.scipy_sample(size - cp)])
+
+
+class TestSDARAlgorithm:
+    @pytest.fixture(autouse=True)
+    def setup(self, algorithm_factory):
+        self.algorithm_factory = algorithm_factory
+
+    def test_consecutive_detection(self, generate_data, data_params):
+        for _ in range(data_params["num_of_tests"]):
+            algorithm = self.algorithm_factory()
+            was_change_point = False
+            for value in generate_data:
+                result = algorithm.detect(value)
+                if result:
+                    was_change_point = True
+
+            assert was_change_point, "There was an undetected change point in the data"
+
+    def test_correctness_of_consecutive_detection(self, generate_data, data_params):
+        outer_algorithm = self.algorithm_factory()
+        for _ in range(data_params["num_of_tests"]):
+            inner_algorithm = self.algorithm_factory()
+            outer_result = []
+            inner_result = []
+
+            for value in generate_data:
+                outer_result.append(outer_algorithm.detect(value))
+                inner_result.append(inner_algorithm.detect(value))
+
+            outer_algorithm.clear()
+            assert outer_result == inner_result, "Consecutive and independent detection should give same results"
+
+    def test_correctness_of_consecutive_localization(self, generate_data, data_params):
+        outer_algorithm = self.algorithm_factory()
+        for _ in range(data_params["num_of_tests"]):
+            inner_algorithm = self.algorithm_factory()
+
+            for value in generate_data:
+                outer_result = outer_algorithm.localize(value)
+                inner_result = inner_algorithm.localize(value)
+                assert outer_result == inner_result, "Consecutive and independent localization should give same results"
+
+            outer_algorithm.clear()
+
+    def test_online_localization_correctness(self, generate_data, data_params):
+        for _ in range(data_params["num_of_tests"]):
+            algorithm = self.algorithm_factory()
+            for time, value in np.ndenumerate(generate_data):
+                result = algorithm.localize(value)
+                if result:
+                    assert result <= time[0] + 1, "Change point cannot be in the future"