188 changes: 188 additions & 0 deletions pysatl_cpd/core/algorithms/sdar/sdar.py
@@ -0,0 +1,188 @@
from collections import deque
from typing import Optional

import numpy as np
import numpy.typing as npt


class SDARmodel:
"""
    Implements a multivariate Sequentially Discounting AutoRegressive (SDAR) model
    for the online calculation of anomaly scores in time series.
"""

def __init__(self, order: int = 5, forgetting_factor: float = 0.99, smoothing_window_size: int = 5) -> None:
"""
Initializes the SDAR model.

:param order: The order of the autoregressive model (p). Must be a positive integer.
:param forgetting_factor: The forgetting factor (lambda) in the range (0, 1).
Values close to 1 imply slow forgetting.
:param smoothing_window_size: The size of the window for averaging (smoothing) the anomaly scores.
Must be a positive integer.
:return:
"""
assert order > 0, "Order must be a positive integer."
assert 0 < forgetting_factor < 1, "Forgetting factor must be between 0 and 1."
assert smoothing_window_size > 0, "Smoothing window size must be a positive integer."

self.__order = order
self.__lambda = forgetting_factor
self.__smoothing_window_size: int = smoothing_window_size

self.__dimension: Optional[int] = None
self.__mu: Optional[npt.NDArray[np.float64]] = None
self.__covariance_matrices: deque[npt.NDArray[np.float64]] = deque(maxlen=self.__order + 1)
self.__ar_matrices: list[npt.NDArray[np.float64]] = []
self.__sigma: Optional[npt.NDArray[np.float64]] = None

self.__history: deque[npt.NDArray[np.float64]] = deque(maxlen=self.__order)
self.__scores_buffer: deque[np.float64] = deque(maxlen=self.__smoothing_window_size)

def __initialize_state(self, x: npt.NDArray[np.float64]) -> None:
"""
Initializes the model's internal state based on the first observation.
:param x: the first observation.
:return:
"""
self.__dimension = x.shape[0]
self.__mu = np.zeros(self.__dimension)

for _ in range(self.__order + 1):
self.__covariance_matrices.append(np.zeros((self.__dimension, self.__dimension)))

self.__ar_matrices = [np.zeros((self.__dimension, self.__dimension)) for _ in range(self.__order)]
self.__sigma = np.eye(self.__dimension)

def clear(self) -> None:
"""
Resets the internal state of the model to its initial values.
:return:
"""
if self.__dimension is None:
return

assert self.__mu is not None, "clear() should not be called on an uninitialized model with mu=None"

self.__mu.fill(0)
for i in range(len(self.__covariance_matrices)):
self.__covariance_matrices[i].fill(0)
self.__ar_matrices = [np.zeros((self.__dimension, self.__dimension)) for _ in range(self.__order)]
self.__sigma = np.eye(self.__dimension)
self.__history.clear()
self.__scores_buffer.clear()

    def __calculate_error(self, x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
        """
        Calculates the prediction error for the current observation.
        :param x: a new observation.
        :return: the error vector between the observation and the predicted point.
        """
        assert self.__mu is not None, "Model must be initialized before calculating the error."

        # AR(p) prediction around the discounted mean:
        # x_hat = mu + sum_i A_i @ (history[i] - mu), with history ordered oldest-first.
        centered_history = np.array(list(self.__history)) - self.__mu
        prediction_offset = np.sum([self.__ar_matrices[i] @ centered_history[i] for i in range(self.__order)], axis=0)
        x_hat = self.__mu + prediction_offset
        error = x - x_hat
        return error

    def __update_matrices(self, x: npt.NDArray[np.float64]) -> None:
        """
        Recursively updates the covariance matrices using the forgetting factor.
        :param x: a new observation.
        :return:
        """
        assert self.__mu is not None, "Model must be initialized before updating the matrices."

        centered_x = x - self.__mu
        # Exponentially discounted estimates: C_0 tracks the variance of x,
        # C_i (below) tracks the lag-i cross-covariance with the history.
        self.__covariance_matrices[0] = self.__lambda * self.__covariance_matrices[0] + (1 - self.__lambda) * np.outer(
            centered_x, centered_x
        )
for i in range(1, self.__order + 1):
centered_hist = self.__history[-i] - self.__mu
self.__covariance_matrices[i] = self.__lambda * self.__covariance_matrices[i] + (
1 - self.__lambda
) * np.outer(centered_x, centered_hist)

    def __update_model(self, x: npt.NDArray[np.float64]) -> None:
        """
        Performs a single model update step based on a new observation.
        :param x: a new observation.
        :return:
        """
        if self.__dimension is None:
            self.__initialize_state(x)

        assert self.__mu is not None
        self.__mu = self.__lambda * self.__mu + (1 - self.__lambda) * x

        if len(self.__history) < self.__order:
            # Warm-up: only the discounted mean is tracked until enough history exists.
            self.__history.append(x)
            return

        self.__update_matrices(x)
        self.__solve_equations()

        error = self.__calculate_error(x)

        assert self.__sigma is not None
        self.__sigma = self.__lambda * self.__sigma + (1 - self.__lambda) * np.outer(error, error)

        score = self.__calculate_log_likelihood(error)
        # Floor the score so downstream consumers always see a positive value.
        score = np.maximum(score, 1e-6)
        self.__scores_buffer.append(score)
        self.__history.append(x)

def __calculate_log_likelihood(self, error: npt.NDArray[np.float64]) -> np.float64:
"""
Calculates the anomaly score as the negative log-likelihood of the prediction error.
:param error: the prediction error.
:return: the calculated anomaly score.
"""
assert self.__dimension is not None and self.__sigma is not None, "Model must be initialized."
        _, log_det = np.linalg.slogdet(self.__sigma)

        inv_sigma = np.linalg.inv(self.__sigma)
        mahalanobis_dist = error.T @ inv_sigma @ error

        # Negative Gaussian log-likelihood of the prediction error:
        # 0.5 * (d * log(2 * pi) + log|Sigma| + e^T Sigma^{-1} e).
        log_likelihood = -0.5 * (self.__dimension * np.log(2 * np.pi) + log_det + mahalanobis_dist)
        return np.float64(-log_likelihood)

    def __solve_equations(self) -> None:
        """
        Finds the AR coefficient matrices by solving the block Yule-Walker equations
        for the multivariate case. Falls back to the pseudo-inverse if the system is singular.
        :return:
        """
        assert self.__dimension is not None, "Model must be initialized before solving the equations."

        d, p = self.__dimension, self.__order
        # Assemble the block Yule-Walker system: block (i, j) holds the
        # lag-|i - j| covariance matrix (transposed above the diagonal).
        transition_matrix = np.zeros((p * d, p * d))
        for i in range(p):
            for j in range(p):
                lag = abs(i - j)
                cov_mat = self.__covariance_matrices[lag]
                transition_matrix[i * d : (i + 1) * d, j * d : (j + 1) * d] = cov_mat if i >= j else cov_mat.T

        transition_matrix_stacked = np.vstack([self.__covariance_matrices[i + 1] for i in range(p)])

        try:
            weights: npt.NDArray[np.float64] = np.linalg.solve(transition_matrix, transition_matrix_stacked).astype(
                np.float64
            )
        except np.linalg.LinAlgError:
            # Fall back to the least-squares solution when the system is singular.
            weights = (np.linalg.pinv(transition_matrix) @ transition_matrix_stacked).astype(np.float64)
        self.__ar_matrices = [weights[i * d : (i + 1) * d, :] for i in range(p)]

def get_smoothed_score(self, x: npt.NDArray[np.float64]) -> Optional[np.float64]:
"""
Processes a new observation and returns the smoothed anomaly score.
:param x: new observation.
:return: the smoothed anomaly score, or None if the smoothing buffer is not yet full.
"""
self.__update_model(x)
if len(self.__scores_buffer) < self.__smoothing_window_size:
return None

score = np.mean(list(self.__scores_buffer))
score = np.maximum(score, 1e-9)
return np.float64(score)
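
A minimal usage sketch of SDARmodel, assuming the module from this PR is importable; the synthetic stream and parameter values are illustrative assumptions, not values taken from the diff.

import numpy as np

from pysatl_cpd.core.algorithms.sdar.sdar import SDARmodel

model = SDARmodel(order=2, forgetting_factor=0.97, smoothing_window_size=5)

rng = np.random.default_rng(0)
stream = np.concatenate([rng.normal(0.0, 1.0, 200), rng.normal(5.0, 1.0, 200)])

scores = []
for value in stream:
    # The model expects array-valued observations, so scalars are wrapped in a 1-D array.
    score = model.get_smoothed_score(np.array([value]))
    if score is not None:  # None while the smoothing buffer is still filling
        scores.append(float(score))

# The collected scores should spike near index 200, where the distribution shifts.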
112 changes: 112 additions & 0 deletions pysatl_cpd/core/algorithms/sdar_online_algorithm.py
@@ -0,0 +1,112 @@
from typing import Optional

import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm
from pysatl_cpd.core.algorithms.sdar.sdar import SDARmodel


class SDARAlgorithm(OnlineAlgorithm):
"""
    An online change point detection algorithm based on a two-stage cascade of SDAR models:
    the second model scores the smoothed anomaly scores produced by the first.
"""

def __init__(
self, order: int = 2, smoothing_window_size: int = 3, forgetting_factor: float = 0.99, threshold: float = 5
) -> None:
"""
Initializes the change point detection algorithm.
:param order: the AR model order for both SDAR models.
:param smoothing_window_size: the smoothing window size for both SDAR models.
:param forgetting_factor: the forgetting factor (lambda) for both SDAR models.
:param threshold: the threshold for the second-level anomaly score, above which a change point is detected.
:return:
"""
assert threshold >= 0, "Threshold must be non-negative."

self.__threshold: np.float64 = np.float64(threshold)

self.__first_model: SDARmodel = SDARmodel(order, forgetting_factor, smoothing_window_size)
self.__second_model: SDARmodel = SDARmodel(order, forgetting_factor, smoothing_window_size)

self.__first_scores: list[np.float64 | None] = []
self.__second_scores: list[np.float64 | None] = []

self.__current_time: int = 0
self.__was_change_point: bool = False
self.__change_point: Optional[int] = None

def clear(self) -> None:
self.__first_model.clear()
self.__second_model.clear()

self.__first_scores = []
self.__second_scores = []

self.__current_time = 0
self.__was_change_point = False
self.__change_point = None

def __process_point(self, observation: npt.NDArray[np.float64]) -> None:
"""
Processes a single observation, updating both models and checking for a change point.
:param observation: new observation.
:return:
"""
self.__current_time += 1

        # Stage 1 scores the raw observation; its smoothed score is then fed as a
        # one-dimensional observation into stage 2 (below), whose smoothed score is
        # compared against the threshold.
        first_smooth_score = self.__first_model.get_smoothed_score(observation)
        self.__first_scores.append(first_smooth_score)
if first_smooth_score is None:
return

score_as_input = np.array([first_smooth_score])
second_smooth_score = self.__second_model.get_smoothed_score(score_as_input)
self.__second_scores.append(second_smooth_score)
if second_smooth_score is None:
return

if second_smooth_score >= self.__threshold:
self.__handle_change_point()

def __handle_change_point(self) -> None:
"""
        Handles a detected change point: registers its location and resets both models' state.
:return:
"""
self.__was_change_point = True
self.__change_point = self.__current_time

self.__first_model.clear()
self.__second_model.clear()

def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool:
"""
        Performs one step of change point detection.
        :param observation: new observation of a time series.
        :return: whether a change point was detected after processing the new observation.
"""
if not isinstance(observation, np.ndarray):
observation = np.array([observation])

self.__process_point(observation)
result = self.__was_change_point
self.__was_change_point = False
return result

def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]:
"""
        Performs one step of change point localization.
        :param observation: new observation of a time series.
        :return: absolute location of a change point, acquired after processing the new observation,
            or None if there wasn't any.
"""
if not isinstance(observation, np.ndarray):
observation = np.array([observation])

self.__process_point(observation)
result = self.__change_point
self.__was_change_point = False
self.__change_point = None
return result
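
A minimal end-to-end sketch of the detector, assuming the module from this PR is importable; the stream with a simulated mean shift and the parameter values are illustrative only.

import numpy as np

from pysatl_cpd.core.algorithms.sdar_online_algorithm import SDARAlgorithm

algorithm = SDARAlgorithm(order=2, smoothing_window_size=3, forgetting_factor=0.99, threshold=5)

rng = np.random.default_rng(1)
stream = np.concatenate([rng.normal(0.0, 1.0, 300), rng.normal(4.0, 1.0, 300)])

for t, value in enumerate(stream):
    # localize() returns the absolute index of a detected change point, or None.
    change_point = algorithm.localize(np.float64(value))
    if change_point is not None:
        print(f"change point reported at t={change_point} (processed {t + 1} observations)")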
96 changes: 96 additions & 0 deletions tests/test_core/test_algorithms/test_sdar_online_algorithm.py
@@ -0,0 +1,96 @@
import numpy as np
import pytest

import pysatl_cpd.generator.distributions as dstr
from pysatl_cpd.core.algorithms.sdar_online_algorithm import SDARAlgorithm


@pytest.fixture(scope="module")
def set_seed():
np.random.seed(1)


@pytest.fixture
def algorithm_factory():
def _factory():
return SDARAlgorithm(
order=2,
forgetting_factor=0.97,
smoothing_window_size=5,
threshold=2,
)

return _factory


@pytest.fixture(scope="function")
def data_params():
base_params = {
"num_of_tests": 10,
"size": 500,
"change_point": 250,
"tolerable_deviation": 25,
}
return base_params


@pytest.fixture()
def generate_data(data_params):
np.random.seed(1)
cp = data_params["change_point"]
size = data_params["size"]

left_distr = dstr.Distribution.from_str(str(dstr.Distributions.BETA), {"alpha": "0.5", "beta": "0.5"})
right_distr = dstr.Distribution.from_str(str(dstr.Distributions.NORMAL), {"mean": "1.0", "variance": "1.0"})
return np.concatenate([left_distr.scipy_sample(cp), right_distr.scipy_sample(size - cp)])


class TestSDARAlgorithm:
@pytest.fixture(autouse=True)
def setup(self, algorithm_factory):
self.algorithm_factory = algorithm_factory

def test_consecutive_detection(self, generate_data, data_params):
for _ in range(data_params["num_of_tests"]):
algorithm = self.algorithm_factory()
was_change_point = False
for value in generate_data:
result = algorithm.detect(value)
if result:
was_change_point = True

            assert was_change_point, "There was an undetected change point in the data"

def test_correctness_of_consecutive_detection(self, generate_data, data_params):
outer_algorithm = self.algorithm_factory()
for _ in range(data_params["num_of_tests"]):
inner_algorithm = self.algorithm_factory()
outer_result = []
inner_result = []

for value in generate_data:
outer_result.append(outer_algorithm.detect(value))
inner_result.append(inner_algorithm.detect(value))

outer_algorithm.clear()
assert outer_result == inner_result, "Consecutive and independent detection should give same results"

def test_correctness_of_consecutive_localization(self, generate_data, data_params):
outer_algorithm = self.algorithm_factory()
for _ in range(data_params["num_of_tests"]):
inner_algorithm = self.algorithm_factory()

for value in generate_data:
outer_result = outer_algorithm.localize(value)
inner_result = inner_algorithm.localize(value)
assert outer_result == inner_result, "Consecutive and independent localization should give same results"

outer_algorithm.clear()

def test_online_localization_correctness(self, generate_data, data_params):
for _ in range(data_params["num_of_tests"]):
algorithm = self.algorithm_factory()
for time, value in np.ndenumerate(generate_data):
result = algorithm.localize(value)
if result:
                    assert result <= time[0] + 1, "Change point cannot be in the future"
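
To run just this test module, a standard pytest invocation should suffice (assuming the package and its test dependencies are installed):

pytest tests/test_core/test_algorithms/test_sdar_online_algorithm.py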