diff --git a/CHANGELOG.md b/CHANGELOG.md index 09bdc9eb4..ef4e1e8fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,18 @@ Starting from version 2.6.1, releases are automatically created when changes are **Note**: If a tag for the current version already exists, the workflow will skip tag and release creation to avoid duplicates. +### [2.8.1] + +#### Updated + +- Anomalib-orobix to v0.7.0.dev151 in order to make optimal threshold selection more robust with respect to floating point operations. + +#### Fixed + +- `normalize_anomaly_score` now accepts an optional `eval_threshold` (`EvalThreshold`) parameter. When provided, consistency enforcement uses the actual evaluation boundary instead of always using the training threshold at 100.0, preventing misclassification of samples whose raw score falls close to the evaluation thresholds. +- Consistency enforcement in anomaly score normalization now uses `np.nextafter`/`torch.nextafter` (dtype-aware) instead of hardcoded epsilon values, eliminating ULP-gap misclassifications especially at low-precision (fp16) boundaries. +- `AnomalibEvaluation` now builds an `EvalThreshold` from the optimal evaluation threshold and passes it to `normalize_anomaly_score`, ensuring consistent predictions between raw and normalized anomaly scores and anomaly maps. + ### [2.8.0] #### Added diff --git a/poetry.lock b/poetry.lock index 80370acfb..57e310289 100644 --- a/poetry.lock +++ b/poetry.lock @@ -209,14 +209,14 @@ files = [ [[package]] name = "anomalib-orobix" -version = "0.7.0.dev150" +version = "0.7.0.dev151" description = "Orobix anomalib fork" optional = false python-versions = "<3.11,>=3.10" groups = ["main"] files = [ - {file = "anomalib_orobix-0.7.0.dev150-py3-none-any.whl", hash = "sha256:bbf018c6cede939e8b48aea66c5d93a7eb0f21ac19de0d086fdcc9e55b35eda2"}, - {file = "anomalib_orobix-0.7.0.dev150.tar.gz", hash = "sha256:835f9930c5807d469083bf363f17dfb784d65bdc40feeca6029f034007e010ea"}, + {file = "anomalib_orobix-0.7.0.dev151-py3-none-any.whl", hash = "sha256:3b5cd037a153f5fbf7bcf8368724d15746a6e02f09b41b4682467338d51ae05c"}, + {file = "anomalib_orobix-0.7.0.dev151.tar.gz", hash = "sha256:d40d720dbccffbd9cef0b6d68c794fa4a8003a21684c2a78d0f0590d663b24de"}, ] [package.dependencies] @@ -7484,4 +7484,4 @@ onnx = ["onnx", "onnxconverter-common", "onnxruntime_gpu", "onnxsim"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.11" -content-hash = "c03f5c770d5bbb907356601d1d14154c9260acfbe24a0996acf4c018c79c8467" +content-hash = "4e23f2a0f94c1b789b568b9256186a1ccb1eadfcdd2afa4d61c4a2ea6877da9a" diff --git a/pyproject.toml b/pyproject.toml index 76453dc95..aba2a83a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "quadra" -version = "2.8.0" +version = "2.8.1" description = "Deep Learning experiment orchestration library" authors = [ "Federico Belotti ", @@ -73,7 +73,7 @@ h5py = "~3.8" timm = "1.0.24" segmentation_models_pytorch = "0.5.0" -anomalib-orobix = "0.7.0.dev150" +anomalib-orobix = "0.7.0.dev151" xxhash = "~3.2" torchinfo = "~1.8" typing_extensions = { version = "4.11.0", python = "<3.10" } diff --git a/quadra/__init__.py b/quadra/__init__.py index 0c308dfa4..9ab1e0c37 100644 --- a/quadra/__init__.py +++ b/quadra/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.8.0" +__version__ = "2.8.1" def get_version(): diff --git a/quadra/tasks/anomaly.py b/quadra/tasks/anomaly.py index 2b50a4157..1beafe4c2 100644 --- a/quadra/tasks/anomaly.py +++ b/quadra/tasks/anomaly.py @@ -26,7 +26,7 @@ from quadra.modules.base import ModelSignatureWrapper from quadra.tasks.base import Evaluation, LightningTask from quadra.utils import utils -from quadra.utils.anomaly import MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score +from quadra.utils.anomaly import EvalThreshold, MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score from quadra.utils.classification import get_results from quadra.utils.evaluation import automatic_datamodule_batch_size from quadra.utils.export import export_model @@ -504,7 +504,12 @@ def generate_report(self) -> None: ), ).item() - anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold) + # Build an EvalThreshold so that consistency enforcement in normalize_anomaly_score uses the + # actual evaluation boundary for checking the consistencies after normalization. This prevents + # potential inconsistent classification when switching between raw and normalized scores. + eval_threshold = EvalThreshold(raw=float(optimal_threshold), normalized=normalized_optimal_threshold) + + anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold, eval_threshold=eval_threshold) if not isinstance(anomaly_scores, np.ndarray): raise ValueError("Anomaly scores must be a numpy array") @@ -543,7 +548,9 @@ def generate_report(self) -> None: if hasattr(self.datamodule, "crop_area") and self.datamodule.crop_area is not None: crop_area = self.datamodule.crop_area - anomaly_maps = normalize_anomaly_score(self.metadata["anomaly_maps"], training_threshold) + anomaly_maps = normalize_anomaly_score( + self.metadata["anomaly_maps"], training_threshold, eval_threshold=eval_threshold + ) if not isinstance(anomaly_maps, torch.Tensor): raise ValueError("Anomaly maps must be a tensor") diff --git a/quadra/utils/anomaly.py b/quadra/utils/anomaly.py index 577ebdb5d..6f4bdabb9 100644 --- a/quadra/utils/anomaly.py +++ b/quadra/utils/anomaly.py @@ -20,6 +20,7 @@ import pytorch_lightning as pl import torch # pylint: disable=unused-import from anomalib.models.components import AnomalyModule +from pydantic import BaseModel from pytorch_lightning import Callback from pytorch_lightning.utilities.types import STEP_OUTPUT @@ -27,57 +28,136 @@ MapOrValue: TypeAlias = "float | torch.Tensor | np.ndarray" -def normalize_anomaly_score(raw_score: MapOrValue, threshold: float) -> MapOrValue: - """Normalize anomaly score value or map based on threshold. +class EvalThreshold(BaseModel): + """Pair of raw and normalized threshold values used for consistency enforcement. + + Attributes: + raw: The unnormalized threshold. + normalized: The corresponding normalized threshold. + """ + + raw: float + normalized: float + + +def ensure_scores_consistency( + normalized_score: MapOrValue, + raw_score: MapOrValue, + eval_threshold: EvalThreshold, +) -> MapOrValue: + """Enforce that the classification based on normalized scores matches the raw classification. + + For every sample, if `raw_score >= eval_threshold.raw` (anomaly), the normalized score is + clipped to be at least `eval_threshold.normalized`. If `raw_score < eval_threshold.raw` + (normal), the normalized score is clipped to be strictly below `eval_threshold.normalized` + using `np.nextafter` so that no hard-coded epsilon is required. Args: - raw_score: Raw anomaly score valure or map - threshold: Threshold for anomaly detection + normalized_score: Normalized anomaly score value or map to adjust. + raw_score: Original (unnormalized) anomaly score used to determine the ground-truth + classification for each sample. + eval_threshold: Threshold pair defining the decision boundary in both spaces. Returns: - Normalized anomaly score value or map clipped between 0 and 1000 + Normalized score with consistent predictions. """ - if threshold > 0: - normalized_score = (raw_score / threshold) * 100.0 - elif threshold == 0: - # TODO: Is this the best way to handle this case? - normalized_score = (raw_score + 1) * 100.0 - else: - normalized_score = 200.0 - ((raw_score / threshold) * 100.0) - - # Ensures that the normalized scores are consistent with the raw scores - # For all the items whose prediction changes after normalization, force the normalized score to be - # consistent with the prediction made on the raw score by clipping the score: - # - to 100.0 if the prediction was "anomaly" on the raw score and "good" on the normalized score - # - to 99.99 if the prediction was "good" on the raw score and "anomaly" on the normalized score score = raw_score if isinstance(score, torch.Tensor): score = score.cpu().numpy() - # Anomalib classify as anomaly if anomaly_score gte threshold - is_anomaly_mask = score >= threshold + + boundary = eval_threshold.normalized + is_anomaly_mask = score >= eval_threshold.raw is_not_anomaly_mask = np.bitwise_not(is_anomaly_mask) + + _inf: torch.Tensor | np.ndarray + below_boundary: torch.Tensor | np.ndarray + anomaly_boundary: torch.Tensor | np.ndarray + epsilon = 1e-3 if isinstance(normalized_score, torch.Tensor): + device = normalized_score.device + # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect + _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype, device=device) + boundary_tensor = torch.tensor(boundary, dtype=normalized_score.dtype, device=device) + anomaly_boundary = boundary_tensor.clone() + # If dtype cast causes anomaly_boundary to be smaller than normalized boundary (float), + # increase it up to the next representable value + if float(anomaly_boundary) < boundary: + anomaly_boundary = torch.nextafter(anomaly_boundary, _inf) + # Ensure consistency after rouding to 3 decimal places + below_boundary = torch.min(torch.nextafter(boundary_tensor, -_inf), boundary_tensor - epsilon) + if normalized_score.dim() == 0: normalized_score = ( - normalized_score.clamp(min=100.0) if is_anomaly_mask else normalized_score.clamp(max=99.99) + normalized_score.clamp(min=anomaly_boundary) + if is_anomaly_mask + else normalized_score.clamp(max=below_boundary) ) else: - normalized_score[is_anomaly_mask] = normalized_score[is_anomaly_mask].clamp(min=100.0) - normalized_score[is_not_anomaly_mask] = normalized_score[is_not_anomaly_mask].clamp(max=99.99) + normalized_score[is_anomaly_mask] = normalized_score[is_anomaly_mask].clamp(min=anomaly_boundary) + normalized_score[is_not_anomaly_mask] = normalized_score[is_not_anomaly_mask].clamp(max=below_boundary) elif isinstance(normalized_score, np.ndarray) or np.isscalar(normalized_score): + # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect + dtype = normalized_score.dtype if isinstance(normalized_score, np.ndarray) else np.float64 + _inf = np.array(np.inf, dtype=dtype) + boundary_array = np.array(boundary, dtype=dtype) + anomaly_boundary = boundary_array.copy() + # If dtype cast causes anomaly_boundary to be smaller than normalized boundary (float), + # increase it up to the next representable value + if float(anomaly_boundary) < boundary: + anomaly_boundary = np.nextafter(anomaly_boundary, _inf) + # Ensure consistency after rouding to 3 decimal places + below_boundary = np.minimum(np.nextafter(boundary_array, -_inf), boundary_array - epsilon) + if np.isscalar(normalized_score) or normalized_score.ndim == 0: # type: ignore[union-attr] normalized_score = ( - np.clip(normalized_score, a_min=100.0, a_max=None) + np.clip(normalized_score, a_min=anomaly_boundary, a_max=None) if is_anomaly_mask - else np.clip(normalized_score, a_min=None, a_max=99.99) + else np.clip(normalized_score, a_min=None, a_max=below_boundary) ) else: normalized_score = cast(np.ndarray, normalized_score) - normalized_score[is_anomaly_mask] = np.clip(normalized_score[is_anomaly_mask], a_min=100.0, a_max=None) + normalized_score[is_anomaly_mask] = np.clip( + normalized_score[is_anomaly_mask], a_min=anomaly_boundary, a_max=None + ) normalized_score[is_not_anomaly_mask] = np.clip( - normalized_score[is_not_anomaly_mask], a_min=None, a_max=99.99 + normalized_score[is_not_anomaly_mask], a_min=None, a_max=below_boundary ) + return normalized_score + + +def normalize_anomaly_score( + raw_score: MapOrValue, + threshold: float, + eval_threshold: EvalThreshold | None = None, +) -> MapOrValue: + """Normalize anomaly score value or map based on threshold. + + The training threshold maps to 100.0 in normalized space. After the linear scaling, + `ensure_scores_consistency` is called to guarantee that every sample's normalized + classification matches its raw classification. + + Args: + raw_score: Raw anomaly score value or map. + threshold: Threshold for anomaly detection, usually it is the training threshold. + eval_threshold: Threshold used during evaluation. It is used for ensure consistency of raw scores + and normalized scores. When `None`, an `EvalThreshold` with `raw=threshold` and `normalized=100.0` is used, + which reproduces the original behaviour for the training-threshold case. + + Returns: + Normalized anomaly score value or map clipped between 0 and 1000 + """ + if threshold > 0: + normalized_score = (raw_score / threshold) * 100.0 + elif threshold == 0: + # TODO: Is this the best way to handle this case? + normalized_score = (raw_score + 1) * 100.0 + else: + normalized_score = 200.0 - ((raw_score / threshold) * 100.0) + + _eval_threshold = eval_threshold if eval_threshold is not None else EvalThreshold(raw=threshold, normalized=100.0) + normalized_score = ensure_scores_consistency(normalized_score, raw_score, _eval_threshold) + if isinstance(normalized_score, torch.Tensor): return torch.clamp(normalized_score, 0.0, 1000.0) diff --git a/tests/utilities/test_anomaly_utils.py b/tests/utilities/test_anomaly_utils.py index 71fad31bb..315db1a71 100644 --- a/tests/utilities/test_anomaly_utils.py +++ b/tests/utilities/test_anomaly_utils.py @@ -1,35 +1,210 @@ +import numpy as np import pytest -from quadra.utils.anomaly import normalize_anomaly_score import torch -import numpy as np + +from quadra.utils.anomaly import EvalThreshold, ensure_scores_consistency, normalize_anomaly_score +from quadra.utils.tests.helpers import get_quadra_test_device + + +class TestEvalThreshold: + def test_valid(self): + et = EvalThreshold(raw=10.0, normalized=100.0) + assert et.raw == 10.0 + assert et.normalized == 100.0 + + +class TestEnsureScoresConsistency: + """The invariant: (result >= eval_threshold.normalized) == (raw_score >= eval_threshold.raw). + + All inputs are deliberately inconsistent (normalized on the *wrong* side + of the boundary) so that the function is forced to correct them. + """ + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + # IS anomaly, normalized placed one step BELOW boundary + (9.0, float(np.nextafter(np.float32(80.0), np.float32(-np.inf))), 8.0, 80.0, 1), + # IS anomaly, raw score exactly AT eval_raw (>= is inclusive) + (8.0, 79.9, 8.0, 80.0, 1), + # NOT anomaly, normalized placed exactly AT boundary (not strictly below) + (7.0, 80.0, 8.0, 80.0, 0), + # NOT anomaly, normalized placed well above boundary + (7.0, 95.0, 8.0, 80.0, 0), + ], + ) + def test_scalar_np_fp32(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + np.array(wrong_normalized, dtype=np.float32), + np.array(raw_score, dtype=np.float32), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + (9.0, 79.0, 8.0, 80.0, 1), + (7.0, 85.0, 8.0, 80.0, 0), + ], + ) + def test_scalar_np_fp16(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + np.array(wrong_normalized, dtype=np.float16), + np.array(raw_score, dtype=np.float16), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize("dtype", [np.float32, np.float16]) + def test_array_np_wrong_side(self, dtype): + """Every score is on the wrong side of the boundary so the function + must correct all of them.""" + eval_raw, eval_norm = 8.0, 80.0 + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + + raw_scores = np.array([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype) + # anomaly scores (8,9,12) placed BELOW boundary; non-anomaly (4,7) placed ABOVE + wrong_normalized = np.array([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype) + + result = ensure_scores_consistency(wrong_normalized.copy(), raw_scores, et) + + raw_preds = (raw_scores >= eval_raw).astype(int) + norm_preds = (result >= eval_norm).astype(int) + np.testing.assert_array_equal(norm_preds, raw_preds) + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + (9.0, float(np.nextafter(np.float32(80.0), np.float32(-np.inf))), 8.0, 80.0, 1), + (8.0, 79.9, 8.0, 80.0, 1), + (7.0, 80.0, 8.0, 80.0, 0), + (7.0, 95.0, 8.0, 80.0, 0), + ], + ) + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_scalar_torch(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred, dtype): + device = get_quadra_test_device() + + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + torch.tensor(wrong_normalized, dtype=dtype, device=device), + torch.tensor(raw_score, dtype=dtype, device=device), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_array_torch_wrong_side(self, dtype): + device = get_quadra_test_device() + + eval_raw, eval_norm = 8.0, 80.0 + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + + raw_scores = torch.tensor([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype, device=device) + wrong_normalized = torch.tensor([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype, device=device) + + result = ensure_scores_consistency(wrong_normalized.clone(), raw_scores, et) + + raw_preds = (raw_scores >= eval_raw).int() + norm_preds = (result >= eval_norm).int() + assert torch.equal(norm_preds, raw_preds) + + @pytest.mark.parametrize( + "boundary", + [ + 80.0, # exactly representable in fp16 + 80.03, # rounds DOWN in fp16 (fp16(80.03) = 80.0 < 80.03) → ceiling needed + 99.995, # rounds DOWN in fp16 → ceiling needed + ], + ) + def test_fp16_np_anomaly_clipped_to_ceiling(self, boundary): + """IS anomaly score placed at fp16(boundary)-10 must be clipped to a value + that is still >= boundary (float64) after the ceiling rounding.""" + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = np.array(2.0, dtype=np.float16) + # Place normalized score well below boundary so clipping must fire + wrong_norm = np.array(float(np.float16(boundary)) - 10.0, dtype=np.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result >= boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_np_non_anomaly_clipped_below_boundary(self, boundary): + """NOT anomaly score placed well above boundary must be clipped to a value + that is strictly < boundary (float64).""" + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = np.array(0.5, dtype=np.float16) + wrong_norm = np.array(float(np.float16(boundary)) + 10.0, dtype=np.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result < boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_torch_anomaly_clipped_to_ceiling(self, boundary): + device = get_quadra_test_device() + + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = torch.tensor(2.0, dtype=torch.float16, device=device) + wrong_norm = torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) - 10.0, dtype=torch.float16) + wrong_norm = wrong_norm.to(device) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result >= boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_torch_non_anomaly_clipped_below_boundary(self, boundary): + device = get_quadra_test_device() + + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = torch.tensor(0.5, dtype=torch.float16, device=device) + wrong_norm = torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) + 10.0, dtype=torch.float16) + wrong_norm = wrong_norm.to(device) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result < boundary @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch(raw_score: float, threshold: float): - score = torch.tensor(raw_score, dtype=torch.float32) + device = get_quadra_test_device() + + score = torch.tensor(raw_score, dtype=torch.float32, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_with_dim(raw_score: float, threshold: float): - score = torch.tensor([raw_score], dtype=torch.float32) + device = get_quadra_test_device() + score = torch.tensor([raw_score], dtype=torch.float32, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), np.array([raw_score], dtype=np.float32) / threshold * 100.0) + np.testing.assert_allclose( + normalized_score.cpu().numpy(), np.array([raw_score], dtype=np.float32) / threshold * 100.0 + ) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_fp16(raw_score: float, threshold: float): - score = torch.tensor(raw_score, dtype=torch.float16) + device = get_quadra_test_device() + score = torch.tensor(raw_score, dtype=torch.float16, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0, rtol=1e-3) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0, rtol=1e-3) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_fp16_with_dim(raw_score: float, threshold: float): - score = torch.tensor([raw_score], dtype=torch.float16) + device = get_quadra_test_device() + score = torch.tensor([raw_score], dtype=torch.float16, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0, rtol=1e-3) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0, rtol=1e-3) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) @@ -64,3 +239,95 @@ def test_anomaly_score_normalization_np_fp16_with_dim(raw_score: float, threshol def test_anomaly_score_normalization_float(raw_score: float, threshold: float): normalized_score = normalize_anomaly_score(raw_score, threshold) np.testing.assert_allclose(normalized_score, raw_score / threshold * 100.0) + + +class TestNormalizeAnomalyScoreWithEvalThreshold: + """Verify (normalized >= eval_norm) == (raw >= eval_raw) for every sample.""" + + @pytest.mark.parametrize( + "scores, training, eval_raw, eval_norm", + [ + # eval < training: scores between eval and training boundaries are IS anomaly + # in raw space but cross the training boundary → would be misclassified without + # eval_threshold enforcing consistency at the right boundary + ([4.0, 7.0, 8.0, 9.5, 12.0], 10.0, 8.0, 80.0), + # eval > training: scores between training and eval boundaries cross the training + # boundary but are still NOT anomaly relative to the eval threshold + ([8.0, 9.0, 10.0, 11.0, 13.0], 10.0, 12.0, 120.0), + # eval == training: default path, kept for non-regression + ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + # training threshold == 0, eval == training: normalized = (raw + 1) * 100 + ([-2.0, -0.5, 0.0, 1.0, 3.0], 0.0, 0.0, 100.0), + # training threshold == 0, eval > training: normalized = (raw + 1) * 100 + ([0.0, 0.2, 0.5, 1.0, 2.0], 0.0, 0.5, 150.0), + # training threshold == 0, eval < training: eval_raw is negative + ([-1.0, -0.5, 0.0, 0.5, 1.0], 0.0, -0.5, 50.0), + ], + ) + @pytest.mark.parametrize("dtype", [np.float32, np.float16]) + def test_consistency_np(self, scores, training, eval_raw, eval_norm, dtype): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + raw = np.array(scores, dtype=dtype) + result = normalize_anomaly_score(raw.copy(), training, eval_threshold=et) + + raw_preds = (raw >= eval_raw).astype(int) + norm_preds = (result >= eval_norm).astype(int) + np.testing.assert_array_equal(norm_preds, raw_preds) + + @pytest.mark.parametrize( + "scores, training, eval_raw, eval_norm", + [ + ([4.0, 7.0, 8.0, 9.5, 12.0], 10.0, 8.0, 80.0), + ([8.0, 9.0, 10.0, 11.0, 13.0], 10.0, 12.0, 120.0), + ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + # training threshold == 0, eval == training: normalized = (raw + 1) * 100 + ([-2.0, -0.5, 0.0, 1.0, 3.0], 0.0, 0.0, 100.0), + # training threshold == 0, eval > training: normalized = (raw + 1) * 100 + ([0.0, 0.2, 0.5, 1.0, 2.0], 0.0, 0.5, 150.0), + # training threshold == 0, eval < training: eval_raw is negative + ([-1.0, -0.5, 0.0, 0.5, 1.0], 0.0, -0.5, 50.0), + ], + ) + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_consistency_torch(self, scores, training, eval_raw, eval_norm, dtype): + device = get_quadra_test_device() + + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + raw = torch.tensor(scores, dtype=dtype, device=device) + result = normalize_anomaly_score(raw.clone(), training, eval_threshold=et) + + raw_preds = (raw >= eval_raw).int() + norm_preds = (result >= eval_norm).int() + assert torch.equal(norm_preds, raw_preds), ( + f"dtype={dtype}: raw_preds={raw_preds.tolist()}, norm_preds={norm_preds.tolist()}" + ) + + def test_regression_fp32_score_at_training_boundary(self): + """Regression for the ULP-gap bug. + + For a fp32 score at nextafter(training_threshold, -inf), fp32 arithmetic + normalises it to a value below the float64 eval_norm. Without eval_threshold + the prediction is therefore False (NOT anomaly) even though the raw score + IS anomaly relative to eval_raw. + + The assertion on `result_default` verifies the input actually creates an + inconsistency; if it does not, the test should be revised. + """ + training = 10.0 + # Largest fp32 value strictly below training threshold + eval_raw = float(np.nextafter(np.float32(training), np.float32(-np.inf))) + # eval_norm in float64 lands in the ULP gap above nextafter(fp32(100), -inf) + eval_norm = eval_raw / training * 100.0 + + score = np.array([eval_raw], dtype=np.float32) + + result_default = normalize_anomaly_score(score.copy(), training) + # Precondition: without eval_threshold the prediction IS wrong + assert result_default[0] < eval_norm, ( + "Test precondition failed: expected the default path to produce an inconsistent " + f"result ({result_default[0]:.10f} should be < eval_norm={eval_norm:.10f})" + ) + + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result_fix = normalize_anomaly_score(score.copy(), training, eval_threshold=et) + assert result_fix[0] >= eval_norm, f"Fix failed: {result_fix[0]:.10f} < eval_norm={eval_norm:.10f}"