From 904a121e6f7ff11f2bf98fd88656e305da5e5f18 Mon Sep 17 00:00:00 2001 From: Michele Milesi Date: Thu, 19 Feb 2026 12:17:10 +0100 Subject: [PATCH 1/4] fix: anomaly scores normalization by ensuring classification with raw and normalized scores --- CHANGELOG.md | 8 + pyproject.toml | 2 +- quadra/__init__.py | 2 +- quadra/tasks/anomaly.py | 13 +- quadra/utils/anomaly.py | 132 +++++++++++--- tests/utilities/test_anomaly_utils.py | 249 +++++++++++++++++++++++++- 6 files changed, 372 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 09bdc9eb4..468c43a17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,14 @@ Starting from version 2.6.1, releases are automatically created when changes are **Note**: If a tag for the current version already exists, the workflow will skip tag and release creation to avoid duplicates. +### [2.8.1] + +#### Fixed + +- `normalize_anomaly_score` now accepts an optional `eval_threshold` (`EvalThreshold`) parameter. When provided, consistency enforcement uses the actual evaluation boundary instead of always using the training threshold at 100.0, preventing misclassification of samples whose raw score falls close to the evaluation thresholds. +- Consistency enforcement in anomaly score normalization now uses `np.nextafter`/`torch.nextafter` (dtype-aware) instead of hardcoded epsilon values (e.g. `99.99`), eliminating ULP-gap misclassifications especially at low-precision (fp16) boundaries. +- `AnomalibEvaluation` now builds an `EvalThreshold` from the optimal evaluation threshold and passes it to `normalize_anomaly_score`, ensuring consistent predictions between raw and normalized anomaly scores and anomaly maps. 
+ ### [2.8.0] #### Added diff --git a/pyproject.toml b/pyproject.toml index 76453dc95..1bfbdd016 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "quadra" -version = "2.8.0" +version = "2.8.1" description = "Deep Learning experiment orchestration library" authors = [ "Federico Belotti ", diff --git a/quadra/__init__.py b/quadra/__init__.py index 0c308dfa4..9ab1e0c37 100644 --- a/quadra/__init__.py +++ b/quadra/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.8.0" +__version__ = "2.8.1" def get_version(): diff --git a/quadra/tasks/anomaly.py b/quadra/tasks/anomaly.py index 2b50a4157..1beafe4c2 100644 --- a/quadra/tasks/anomaly.py +++ b/quadra/tasks/anomaly.py @@ -26,7 +26,7 @@ from quadra.modules.base import ModelSignatureWrapper from quadra.tasks.base import Evaluation, LightningTask from quadra.utils import utils -from quadra.utils.anomaly import MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score +from quadra.utils.anomaly import EvalThreshold, MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score from quadra.utils.classification import get_results from quadra.utils.evaluation import automatic_datamodule_batch_size from quadra.utils.export import export_model @@ -504,7 +504,12 @@ def generate_report(self) -> None: ), ).item() - anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold) + # Build an EvalThreshold so that consistency enforcement in normalize_anomaly_score uses the + # actual evaluation boundary for checking the consistencies after normalization. This prevents + # potential inconsistent classification when switching between raw and normalized scores. 
+ eval_threshold = EvalThreshold(raw=float(optimal_threshold), normalized=normalized_optimal_threshold) + + anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold, eval_threshold=eval_threshold) if not isinstance(anomaly_scores, np.ndarray): raise ValueError("Anomaly scores must be a numpy array") @@ -543,7 +548,9 @@ def generate_report(self) -> None: if hasattr(self.datamodule, "crop_area") and self.datamodule.crop_area is not None: crop_area = self.datamodule.crop_area - anomaly_maps = normalize_anomaly_score(self.metadata["anomaly_maps"], training_threshold) + anomaly_maps = normalize_anomaly_score( + self.metadata["anomaly_maps"], training_threshold, eval_threshold=eval_threshold + ) if not isinstance(anomaly_maps, torch.Tensor): raise ValueError("Anomaly maps must be a tensor") diff --git a/quadra/utils/anomaly.py b/quadra/utils/anomaly.py index 577ebdb5d..0d9641c10 100644 --- a/quadra/utils/anomaly.py +++ b/quadra/utils/anomaly.py @@ -20,6 +20,7 @@ import pytorch_lightning as pl import torch # pylint: disable=unused-import from anomalib.models.components import AnomalyModule +from pydantic import BaseModel, model_validator from pytorch_lightning import Callback from pytorch_lightning.utilities.types import STEP_OUTPUT @@ -27,57 +28,134 @@ MapOrValue: TypeAlias = "float | torch.Tensor | np.ndarray" -def normalize_anomaly_score(raw_score: MapOrValue, threshold: float) -> MapOrValue: - """Normalize anomaly score value or map based on threshold. +class EvalThreshold(BaseModel): + """Pair of raw and normalized threshold values used for consistency enforcement. + + Attributes: + raw: The threshold in the original (unnormalized) anomaly score space. + normalized: The corresponding threshold in the normalized score space + (i.e. `(raw / training_threshold) * 100`). 
+ """ + + raw: float + normalized: float + + @model_validator(mode="after") + def check_positive(self) -> EvalThreshold: + """Validate that both threshold values are positive.""" + if self.raw <= 0: + raise ValueError("raw threshold must be positive") + if self.normalized <= 0: + raise ValueError("normalized threshold must be positive") + return self + + +def ensure_scores_consistency( + normalized_score: MapOrValue, + raw_score: MapOrValue, + eval_threshold: EvalThreshold, +) -> MapOrValue: + """Enforce that the classification based on normalized scores matches the raw classification. + + For every sample, if `raw_score >= eval_threshold.raw` (anomaly), the normalized score is + clipped to be at least `eval_threshold.normalized`. If `raw_score < eval_threshold.raw` + (normal), the normalized score is clipped to be strictly below `eval_threshold.normalized` + using `np.nextafter` so that no hard-coded epsilon is required. Args: - raw_score: Raw anomaly score valure or map - threshold: Threshold for anomaly detection + normalized_score: Normalized anomaly score value or map to adjust. + raw_score: Original (unnormalized) anomaly score used to determine the ground-truth + classification for each sample. + eval_threshold: Threshold pair defining the decision boundary in both spaces. Returns: - Normalized anomaly score value or map clipped between 0 and 1000 + Normalized score with consistent predictions. """ - if threshold > 0: - normalized_score = (raw_score / threshold) * 100.0 - elif threshold == 0: - # TODO: Is this the best way to handle this case? 
- normalized_score = (raw_score + 1) * 100.0 - else: - normalized_score = 200.0 - ((raw_score / threshold) * 100.0) - - # Ensures that the normalized scores are consistent with the raw scores - # For all the items whose prediction changes after normalization, force the normalized score to be - # consistent with the prediction made on the raw score by clipping the score: - # - to 100.0 if the prediction was "anomaly" on the raw score and "good" on the normalized score - # - to 99.99 if the prediction was "good" on the raw score and "anomaly" on the normalized score score = raw_score if isinstance(score, torch.Tensor): score = score.cpu().numpy() - # Anomalib classify as anomaly if anomaly_score gte threshold - is_anomaly_mask = score >= threshold + + boundary = eval_threshold.normalized + is_anomaly_mask = score >= eval_threshold.raw is_not_anomaly_mask = np.bitwise_not(is_anomaly_mask) + + below_boundary: torch.Tensor | np.ndarray + anomaly_boundary: torch.Tensor | np.ndarray if isinstance(normalized_score, torch.Tensor): + # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect + _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype) + anomaly_boundary = torch.tensor(boundary, dtype=normalized_score.dtype) + if float(anomaly_boundary) < boundary: + anomaly_boundary = torch.nextafter(anomaly_boundary, _inf) + below_boundary = torch.nextafter(torch.tensor(boundary, dtype=normalized_score.dtype), -_inf) + if normalized_score.dim() == 0: normalized_score = ( - normalized_score.clamp(min=100.0) if is_anomaly_mask else normalized_score.clamp(max=99.99) + normalized_score.clamp(min=anomaly_boundary) + if is_anomaly_mask + else normalized_score.clamp(max=below_boundary) ) else: - normalized_score[is_anomaly_mask] = normalized_score[is_anomaly_mask].clamp(min=100.0) - normalized_score[is_not_anomaly_mask] = normalized_score[is_not_anomaly_mask].clamp(max=99.99) + normalized_score[is_anomaly_mask] = 
normalized_score[is_anomaly_mask].clamp(min=anomaly_boundary) + normalized_score[is_not_anomaly_mask] = normalized_score[is_not_anomaly_mask].clamp(max=below_boundary) elif isinstance(normalized_score, np.ndarray) or np.isscalar(normalized_score): + # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect + dtype = normalized_score.dtype if isinstance(normalized_score, np.ndarray) else np.float64 + anomaly_boundary = np.array(boundary, dtype=dtype) + if float(anomaly_boundary) < boundary: + anomaly_boundary = np.nextafter(anomaly_boundary, np.array(np.inf, dtype=dtype)) + below_boundary = np.nextafter(np.array(boundary, dtype=dtype), np.array(-np.inf, dtype=dtype)) + if np.isscalar(normalized_score) or normalized_score.ndim == 0: # type: ignore[union-attr] normalized_score = ( - np.clip(normalized_score, a_min=100.0, a_max=None) + np.clip(normalized_score, a_min=anomaly_boundary, a_max=None) if is_anomaly_mask - else np.clip(normalized_score, a_min=None, a_max=99.99) + else np.clip(normalized_score, a_min=None, a_max=below_boundary) ) else: normalized_score = cast(np.ndarray, normalized_score) - normalized_score[is_anomaly_mask] = np.clip(normalized_score[is_anomaly_mask], a_min=100.0, a_max=None) + normalized_score[is_anomaly_mask] = np.clip( + normalized_score[is_anomaly_mask], a_min=anomaly_boundary, a_max=None + ) normalized_score[is_not_anomaly_mask] = np.clip( - normalized_score[is_not_anomaly_mask], a_min=None, a_max=99.99 + normalized_score[is_not_anomaly_mask], a_min=None, a_max=below_boundary ) + return normalized_score + + +def normalize_anomaly_score( + raw_score: MapOrValue, + threshold: float, + eval_threshold: EvalThreshold | None = None, +) -> MapOrValue: + """Normalize anomaly score value or map based on threshold. + + The training threshold maps to 100.0 in normalized space. 
After the linear scaling, + `ensure_scores_consistency` is called to guarantee that every sample's normalized + classification matches its raw classification. + + Args: + raw_score: Raw anomaly score value or map. + threshold: Threshold for anomaly detection, usually it is the training threshold. + eval_threshold: Threshold used during evaluation. It is used for ensure consistency of raw scores + and normalized scores. When `None`, an `EvalThreshold` with `raw=threshold` and `normalized=100.0` is used, + which reproduces the original behaviour for the training-threshold case. + + Returns: + Normalized anomaly score value or map clipped between 0 and 1000 + """ + if threshold > 0: + normalized_score = (raw_score / threshold) * 100.0 + elif threshold == 0: + # TODO: Is this the best way to handle this case? + normalized_score = (raw_score + 1) * 100.0 + else: + normalized_score = 200.0 - ((raw_score / threshold) * 100.0) + + _eval_threshold = eval_threshold if eval_threshold is not None else EvalThreshold(raw=threshold, normalized=100.0) + normalized_score = ensure_scores_consistency(normalized_score, raw_score, _eval_threshold) + if isinstance(normalized_score, torch.Tensor): return torch.clamp(normalized_score, 0.0, 1000.0) diff --git a/tests/utilities/test_anomaly_utils.py b/tests/utilities/test_anomaly_utils.py index 71fad31bb..4e503511b 100644 --- a/tests/utilities/test_anomaly_utils.py +++ b/tests/utilities/test_anomaly_utils.py @@ -1,7 +1,174 @@ +import numpy as np import pytest -from quadra.utils.anomaly import normalize_anomaly_score import torch -import numpy as np + +from quadra.utils.anomaly import EvalThreshold, ensure_scores_consistency, normalize_anomaly_score + + +class TestEvalThreshold: + def test_valid(self): + et = EvalThreshold(raw=10.0, normalized=100.0) + assert et.raw == 10.0 + assert et.normalized == 100.0 + + @pytest.mark.parametrize("raw", [0.0, -1.0]) + def test_invalid_raw(self, raw: float): + with pytest.raises(ValueError, match="raw 
threshold"): + EvalThreshold(raw=raw, normalized=100.0) + + @pytest.mark.parametrize("normalized", [0.0, -1.0]) + def test_invalid_normalized(self, normalized: float): + with pytest.raises(ValueError, match="normalized threshold"): + EvalThreshold(raw=10.0, normalized=normalized) + + +class TestEnsureScoresConsistency: + """The invariant: (result >= eval_threshold.normalized) == (raw_score >= eval_threshold.raw). + + All inputs are deliberately inconsistent (normalized on the *wrong* side + of the boundary) so that the function is forced to correct them. + """ + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + # IS anomaly, normalized placed one step BELOW boundary + (9.0, float(np.nextafter(np.float32(80.0), np.float32(-np.inf))), 8.0, 80.0, 1), + # IS anomaly, raw score exactly AT eval_raw (>= is inclusive) + (8.0, 79.9, 8.0, 80.0, 1), + # NOT anomaly, normalized placed exactly AT boundary (not strictly below) + (7.0, 80.0, 8.0, 80.0, 0), + # NOT anomaly, normalized placed well above boundary + (7.0, 95.0, 8.0, 80.0, 0), + ], + ) + def test_scalar_np_fp32(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + np.array(wrong_normalized, dtype=np.float32), + np.array(raw_score, dtype=np.float32), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + (9.0, 79.0, 8.0, 80.0, 1), + (7.0, 85.0, 8.0, 80.0, 0), + ], + ) + def test_scalar_np_fp16(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + np.array(wrong_normalized, dtype=np.float16), + np.array(raw_score, dtype=np.float16), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize("dtype", 
[np.float32, np.float16]) + def test_array_np_wrong_side(self, dtype): + """Every score is on the wrong side of the boundary so the function + must correct all of them.""" + eval_raw, eval_norm = 8.0, 80.0 + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + + raw_scores = np.array([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype) + # anomaly scores (8,9,12) placed BELOW boundary; non-anomaly (4,7) placed ABOVE + wrong_normalized = np.array([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype) + + result = ensure_scores_consistency(wrong_normalized.copy(), raw_scores, et) + + raw_preds = (raw_scores >= eval_raw).astype(int) + norm_preds = (result >= eval_norm).astype(int) + np.testing.assert_array_equal(norm_preds, raw_preds) + + @pytest.mark.parametrize( + "raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred", + [ + (9.0, float(np.nextafter(np.float32(80.0), np.float32(-np.inf))), 8.0, 80.0, 1), + (8.0, 79.9, 8.0, 80.0, 1), + (7.0, 80.0, 8.0, 80.0, 0), + (7.0, 95.0, 8.0, 80.0, 0), + ], + ) + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_scalar_torch(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred, dtype): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result = ensure_scores_consistency( + torch.tensor(wrong_normalized, dtype=dtype), + torch.tensor(raw_score, dtype=dtype), + et, + ) + assert int(result >= eval_norm) == expected_pred + + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_array_torch_wrong_side(self, dtype): + eval_raw, eval_norm = 8.0, 80.0 + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + + raw_scores = torch.tensor([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype) + wrong_normalized = torch.tensor([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype) + + result = ensure_scores_consistency(wrong_normalized.clone(), raw_scores, et) + + raw_preds = (raw_scores >= eval_raw).int() + norm_preds = (result >= eval_norm).int() + assert torch.equal(norm_preds, raw_preds) + + 
@pytest.mark.parametrize( + "boundary", + [ + 80.0, # exactly representable in fp16 + 80.03, # rounds DOWN in fp16 (fp16(80.03) = 80.0 < 80.03) → ceiling needed + 99.995, # rounds DOWN in fp16 → ceiling needed + ], + ) + def test_fp16_np_anomaly_clipped_to_ceiling(self, boundary): + """IS anomaly score placed at fp16(boundary)-10 must be clipped to a value + that is still >= boundary (float64) after the ceiling rounding.""" + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = np.array(2.0, dtype=np.float16) + # Place normalized score well below boundary so clipping must fire + wrong_norm = np.array(float(np.float16(boundary)) - 10.0, dtype=np.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result >= boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_np_non_anomaly_clipped_below_boundary(self, boundary): + """NOT anomaly score placed well above boundary must be clipped to a value + that is strictly < boundary (float64).""" + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = np.array(0.5, dtype=np.float16) + wrong_norm = np.array(float(np.float16(boundary)) + 10.0, dtype=np.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result < boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_torch_anomaly_clipped_to_ceiling(self, boundary): + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = torch.tensor(2.0, dtype=torch.float16) + wrong_norm = torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) - 10.0, dtype=torch.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result >= boundary + + @pytest.mark.parametrize( + "boundary", + [80.0, 80.03, 99.995], + ) + def test_fp16_torch_non_anomaly_clipped_below_boundary(self, boundary): + et = EvalThreshold(raw=2.0, normalized=boundary) + raw = torch.tensor(0.5, dtype=torch.float16) + wrong_norm = 
torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) + 10.0, dtype=torch.float16) + result = ensure_scores_consistency(wrong_norm, raw, et) + assert result < boundary @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) @@ -64,3 +231,81 @@ def test_anomaly_score_normalization_np_fp16_with_dim(raw_score: float, threshol def test_anomaly_score_normalization_float(raw_score: float, threshold: float): normalized_score = normalize_anomaly_score(raw_score, threshold) np.testing.assert_allclose(normalized_score, raw_score / threshold * 100.0) + + +class TestNormalizeAnomalyScoreWithEvalThreshold: + """Verify (normalized >= eval_norm) == (raw >= eval_raw) for every sample.""" + + @pytest.mark.parametrize( + "scores, training, eval_raw, eval_norm", + [ + # eval < training: scores between eval and training boundaries are IS anomaly + # in raw space but cross the training boundary → would be misclassified without + # eval_threshold enforcing consistency at the right boundary + ([4.0, 7.0, 8.0, 9.5, 12.0], 10.0, 8.0, 80.0), + # eval > training: scores between training and eval boundaries cross the training + # boundary but are still NOT anomaly relative to the eval threshold + ([8.0, 9.0, 10.0, 11.0, 13.0], 10.0, 12.0, 120.0), + # eval == training: default path, kept for non-regression + ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + ], + ) + @pytest.mark.parametrize("dtype", [np.float32, np.float16]) + def test_consistency_np(self, scores, training, eval_raw, eval_norm, dtype): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + raw = np.array(scores, dtype=dtype) + result = normalize_anomaly_score(raw.copy(), training, eval_threshold=et) + + raw_preds = (raw >= eval_raw).astype(int) + norm_preds = (result >= eval_norm).astype(int) + np.testing.assert_array_equal(norm_preds, raw_preds) + + @pytest.mark.parametrize( + "scores, training, eval_raw, eval_norm", + [ + ([4.0, 7.0, 8.0, 9.5, 12.0], 10.0, 8.0, 80.0), + ([8.0, 9.0, 
10.0, 11.0, 13.0], 10.0, 12.0, 120.0), + ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + ], + ) + @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) + def test_consistency_torch(self, scores, training, eval_raw, eval_norm, dtype): + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + raw = torch.tensor(scores, dtype=dtype) + result = normalize_anomaly_score(raw.clone(), training, eval_threshold=et) + + raw_preds = (raw >= eval_raw).long() + norm_preds = (result >= eval_norm).long() + assert torch.equal(norm_preds, raw_preds), ( + f"dtype={dtype}: raw_preds={raw_preds.tolist()}, norm_preds={norm_preds.tolist()}" + ) + + def test_regression_fp32_score_at_training_boundary(self): + """Regression for the ULP-gap bug. + + For a fp32 score at nextafter(training_threshold, -inf), fp32 arithmetic + normalises it to a value below the float64 eval_norm. Without eval_threshold + the prediction is therefore False (NOT anomaly) even though the raw score + IS anomaly relative to eval_raw. + + The assertion on `result_default` verifies the input actually creates an + inconsistency; if it does not, the test should be revised. 
+ """ + training = 10.0 + # Largest fp32 value strictly below training threshold + eval_raw = float(np.nextafter(np.float32(training), np.float32(-np.inf))) + # eval_norm in float64 lands in the ULP gap above nextafter(fp32(100), -inf) + eval_norm = eval_raw / training * 100.0 + + score = np.array([eval_raw], dtype=np.float32) + + result_default = normalize_anomaly_score(score.copy(), training) + # Precondition: without eval_threshold the prediction IS wrong + assert result_default[0] < eval_norm, ( + "Test precondition failed: expected the default path to produce an inconsistent " + f"result ({result_default[0]:.10f} should be < eval_norm={eval_norm:.10f})" + ) + + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) + result_fix = normalize_anomaly_score(score.copy(), training, eval_threshold=et) + assert result_fix[0] >= eval_norm, f"Fix failed: {result_fix[0]:.10f} < eval_norm={eval_norm:.10f}" From 6bd600ed9ad7f3636b6e69a96892ee3b3aaa4a2e Mon Sep 17 00:00:00 2001 From: Michele Milesi Date: Thu, 19 Feb 2026 13:26:52 +0100 Subject: [PATCH 2/4] fix: device in ensuring predictions after normalization --- quadra/utils/anomaly.py | 23 +++----- tests/utilities/test_anomaly_utils.py | 76 +++++++++++++++++---------- 2 files changed, 56 insertions(+), 43 deletions(-) diff --git a/quadra/utils/anomaly.py b/quadra/utils/anomaly.py index 0d9641c10..e30f2e30f 100644 --- a/quadra/utils/anomaly.py +++ b/quadra/utils/anomaly.py @@ -20,7 +20,7 @@ import pytorch_lightning as pl import torch # pylint: disable=unused-import from anomalib.models.components import AnomalyModule -from pydantic import BaseModel, model_validator +from pydantic import BaseModel from pytorch_lightning import Callback from pytorch_lightning.utilities.types import STEP_OUTPUT @@ -32,23 +32,13 @@ class EvalThreshold(BaseModel): """Pair of raw and normalized threshold values used for consistency enforcement. Attributes: - raw: The threshold in the original (unnormalized) anomaly score space. 
- normalized: The corresponding threshold in the normalized score space - (i.e. `(raw / training_threshold) * 100`). + raw: The unnormalized threshold. + normalized: The corresponding normalized threshold. """ raw: float normalized: float - @model_validator(mode="after") - def check_positive(self) -> EvalThreshold: - """Validate that both threshold values are positive.""" - if self.raw <= 0: - raise ValueError("raw threshold must be positive") - if self.normalized <= 0: - raise ValueError("normalized threshold must be positive") - return self - def ensure_scores_consistency( normalized_score: MapOrValue, @@ -82,12 +72,13 @@ def ensure_scores_consistency( below_boundary: torch.Tensor | np.ndarray anomaly_boundary: torch.Tensor | np.ndarray if isinstance(normalized_score, torch.Tensor): + device = normalized_score.device # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect - _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype) - anomaly_boundary = torch.tensor(boundary, dtype=normalized_score.dtype) + _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype, device=device) + anomaly_boundary = torch.tensor(boundary, dtype=normalized_score.dtype, device=device) if float(anomaly_boundary) < boundary: anomaly_boundary = torch.nextafter(anomaly_boundary, _inf) - below_boundary = torch.nextafter(torch.tensor(boundary, dtype=normalized_score.dtype), -_inf) + below_boundary = torch.nextafter(torch.tensor(boundary, dtype=normalized_score.dtype, device=device), -_inf) if normalized_score.dim() == 0: normalized_score = ( diff --git a/tests/utilities/test_anomaly_utils.py b/tests/utilities/test_anomaly_utils.py index 4e503511b..315db1a71 100644 --- a/tests/utilities/test_anomaly_utils.py +++ b/tests/utilities/test_anomaly_utils.py @@ -3,6 +3,7 @@ import torch from quadra.utils.anomaly import EvalThreshold, ensure_scores_consistency, normalize_anomaly_score +from quadra.utils.tests.helpers import get_quadra_test_device class 
TestEvalThreshold: @@ -11,16 +12,6 @@ def test_valid(self): assert et.raw == 10.0 assert et.normalized == 100.0 - @pytest.mark.parametrize("raw", [0.0, -1.0]) - def test_invalid_raw(self, raw: float): - with pytest.raises(ValueError, match="raw threshold"): - EvalThreshold(raw=raw, normalized=100.0) - - @pytest.mark.parametrize("normalized", [0.0, -1.0]) - def test_invalid_normalized(self, normalized: float): - with pytest.raises(ValueError, match="normalized threshold"): - EvalThreshold(raw=10.0, normalized=normalized) - class TestEnsureScoresConsistency: """The invariant: (result >= eval_threshold.normalized) == (raw_score >= eval_threshold.raw). @@ -95,21 +86,25 @@ def test_array_np_wrong_side(self, dtype): ) @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) def test_scalar_torch(self, raw_score, wrong_normalized, eval_raw, eval_norm, expected_pred, dtype): + device = get_quadra_test_device() + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) result = ensure_scores_consistency( - torch.tensor(wrong_normalized, dtype=dtype), - torch.tensor(raw_score, dtype=dtype), + torch.tensor(wrong_normalized, dtype=dtype, device=device), + torch.tensor(raw_score, dtype=dtype, device=device), et, ) assert int(result >= eval_norm) == expected_pred @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) def test_array_torch_wrong_side(self, dtype): + device = get_quadra_test_device() + eval_raw, eval_norm = 8.0, 80.0 et = EvalThreshold(raw=eval_raw, normalized=eval_norm) - raw_scores = torch.tensor([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype) - wrong_normalized = torch.tensor([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype) + raw_scores = torch.tensor([4.0, 7.0, 8.0, 9.0, 12.0], dtype=dtype, device=device) + wrong_normalized = torch.tensor([85.0, 85.0, 75.0, 75.0, 75.0], dtype=dtype, device=device) result = ensure_scores_consistency(wrong_normalized.clone(), raw_scores, et) @@ -153,9 +148,12 @@ def test_fp16_np_non_anomaly_clipped_below_boundary(self, 
boundary): [80.0, 80.03, 99.995], ) def test_fp16_torch_anomaly_clipped_to_ceiling(self, boundary): + device = get_quadra_test_device() + et = EvalThreshold(raw=2.0, normalized=boundary) - raw = torch.tensor(2.0, dtype=torch.float16) + raw = torch.tensor(2.0, dtype=torch.float16, device=device) wrong_norm = torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) - 10.0, dtype=torch.float16) + wrong_norm = wrong_norm.to(device) result = ensure_scores_consistency(wrong_norm, raw, et) assert result >= boundary @@ -164,39 +162,49 @@ def test_fp16_torch_anomaly_clipped_to_ceiling(self, boundary): [80.0, 80.03, 99.995], ) def test_fp16_torch_non_anomaly_clipped_below_boundary(self, boundary): + device = get_quadra_test_device() + et = EvalThreshold(raw=2.0, normalized=boundary) - raw = torch.tensor(0.5, dtype=torch.float16) + raw = torch.tensor(0.5, dtype=torch.float16, device=device) wrong_norm = torch.tensor(float(torch.tensor(boundary, dtype=torch.float16)) + 10.0, dtype=torch.float16) + wrong_norm = wrong_norm.to(device) result = ensure_scores_consistency(wrong_norm, raw, et) assert result < boundary @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch(raw_score: float, threshold: float): - score = torch.tensor(raw_score, dtype=torch.float32) + device = get_quadra_test_device() + + score = torch.tensor(raw_score, dtype=torch.float32, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_with_dim(raw_score: float, threshold: float): - score = torch.tensor([raw_score], dtype=torch.float32) + device = get_quadra_test_device() + score = torch.tensor([raw_score], 
dtype=torch.float32, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), np.array([raw_score], dtype=np.float32) / threshold * 100.0) + np.testing.assert_allclose( + normalized_score.cpu().numpy(), np.array([raw_score], dtype=np.float32) / threshold * 100.0 + ) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_fp16(raw_score: float, threshold: float): - score = torch.tensor(raw_score, dtype=torch.float16) + device = get_quadra_test_device() + score = torch.tensor(raw_score, dtype=torch.float16, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0, rtol=1e-3) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0, rtol=1e-3) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) def test_anomaly_score_normalization_torch_fp16_with_dim(raw_score: float, threshold: float): - score = torch.tensor([raw_score], dtype=torch.float16) + device = get_quadra_test_device() + score = torch.tensor([raw_score], dtype=torch.float16, device=device) normalized_score = normalize_anomaly_score(score, threshold) - np.testing.assert_allclose(normalized_score.numpy(), raw_score / threshold * 100.0, rtol=1e-3) + np.testing.assert_allclose(normalized_score.cpu().numpy(), raw_score / threshold * 100.0, rtol=1e-3) @pytest.mark.parametrize("raw_score, threshold", [(1.345, 1.24), (1.24, 1.345)]) @@ -248,6 +256,12 @@ class TestNormalizeAnomalyScoreWithEvalThreshold: ([8.0, 9.0, 10.0, 11.0, 13.0], 10.0, 12.0, 120.0), # eval == training: default path, kept for non-regression ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + # training threshold == 0, eval == training: normalized = (raw + 1) * 100 + ([-2.0, -0.5, 0.0, 1.0, 3.0], 0.0, 0.0, 100.0), + # training threshold == 0, 
eval > training: normalized = (raw + 1) * 100 + ([0.0, 0.2, 0.5, 1.0, 2.0], 0.0, 0.5, 150.0), + # training threshold == 0, eval < training: eval_raw is negative + ([-1.0, -0.5, 0.0, 0.5, 1.0], 0.0, -0.5, 50.0), ], ) @pytest.mark.parametrize("dtype", [np.float32, np.float16]) @@ -266,16 +280,24 @@ def test_consistency_np(self, scores, training, eval_raw, eval_norm, dtype): ([4.0, 7.0, 8.0, 9.5, 12.0], 10.0, 8.0, 80.0), ([8.0, 9.0, 10.0, 11.0, 13.0], 10.0, 12.0, 120.0), ([8.0, 9.0, 10.0, 11.0, 12.0], 10.0, 10.0, 100.0), + # training threshold == 0, eval == training: normalized = (raw + 1) * 100 + ([-2.0, -0.5, 0.0, 1.0, 3.0], 0.0, 0.0, 100.0), + # training threshold == 0, eval > training: normalized = (raw + 1) * 100 + ([0.0, 0.2, 0.5, 1.0, 2.0], 0.0, 0.5, 150.0), + # training threshold == 0, eval < training: eval_raw is negative + ([-1.0, -0.5, 0.0, 0.5, 1.0], 0.0, -0.5, 50.0), ], ) @pytest.mark.parametrize("dtype", [torch.float32, torch.float16]) def test_consistency_torch(self, scores, training, eval_raw, eval_norm, dtype): + device = get_quadra_test_device() + et = EvalThreshold(raw=eval_raw, normalized=eval_norm) - raw = torch.tensor(scores, dtype=dtype) + raw = torch.tensor(scores, dtype=dtype, device=device) result = normalize_anomaly_score(raw.clone(), training, eval_threshold=et) - raw_preds = (raw >= eval_raw).long() - norm_preds = (result >= eval_norm).long() + raw_preds = (raw >= eval_raw).int() + norm_preds = (result >= eval_norm).int() assert torch.equal(norm_preds, raw_preds), ( f"dtype={dtype}: raw_preds={raw_preds.tolist()}, norm_preds={norm_preds.tolist()}" ) From fcdbb44e5658cdf00dfa1d4fbe2b79d1e7c8b1d9 Mon Sep 17 00:00:00 2001 From: Michele Milesi Date: Fri, 20 Feb 2026 17:52:18 +0100 Subject: [PATCH 3/4] fix: upgrade anomalib --- poetry.lock | 8 ++++---- pyproject.toml | 2 +- quadra/utils/anomaly.py | 4 ++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 80370acfb..57e310289 100644 --- a/poetry.lock 
+++ b/poetry.lock @@ -209,14 +209,14 @@ files = [ [[package]] name = "anomalib-orobix" -version = "0.7.0.dev150" +version = "0.7.0.dev151" description = "Orobix anomalib fork" optional = false python-versions = "<3.11,>=3.10" groups = ["main"] files = [ - {file = "anomalib_orobix-0.7.0.dev150-py3-none-any.whl", hash = "sha256:bbf018c6cede939e8b48aea66c5d93a7eb0f21ac19de0d086fdcc9e55b35eda2"}, - {file = "anomalib_orobix-0.7.0.dev150.tar.gz", hash = "sha256:835f9930c5807d469083bf363f17dfb784d65bdc40feeca6029f034007e010ea"}, + {file = "anomalib_orobix-0.7.0.dev151-py3-none-any.whl", hash = "sha256:3b5cd037a153f5fbf7bcf8368724d15746a6e02f09b41b4682467338d51ae05c"}, + {file = "anomalib_orobix-0.7.0.dev151.tar.gz", hash = "sha256:d40d720dbccffbd9cef0b6d68c794fa4a8003a21684c2a78d0f0590d663b24de"}, ] [package.dependencies] @@ -7484,4 +7484,4 @@ onnx = ["onnx", "onnxconverter-common", "onnxruntime_gpu", "onnxsim"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.11" -content-hash = "c03f5c770d5bbb907356601d1d14154c9260acfbe24a0996acf4c018c79c8467" +content-hash = "4e23f2a0f94c1b789b568b9256186a1ccb1eadfcdd2afa4d61c4a2ea6877da9a" diff --git a/pyproject.toml b/pyproject.toml index 1bfbdd016..aba2a83a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,7 @@ h5py = "~3.8" timm = "1.0.24" segmentation_models_pytorch = "0.5.0" -anomalib-orobix = "0.7.0.dev150" +anomalib-orobix = "0.7.0.dev151" xxhash = "~3.2" torchinfo = "~1.8" typing_extensions = { version = "4.11.0", python = "<3.10" } diff --git a/quadra/utils/anomaly.py b/quadra/utils/anomaly.py index e30f2e30f..216cf92bd 100644 --- a/quadra/utils/anomaly.py +++ b/quadra/utils/anomaly.py @@ -76,6 +76,8 @@ def ensure_scores_consistency( # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype, device=device) anomaly_boundary = torch.tensor(boundary, dtype=normalized_score.dtype, device=device) + # If dtype 
cast causes anomaly_boundary to be smaller than normalized boundary (float), + # increase it up to the next representable value if float(anomaly_boundary) < boundary: anomaly_boundary = torch.nextafter(anomaly_boundary, _inf) below_boundary = torch.nextafter(torch.tensor(boundary, dtype=normalized_score.dtype, device=device), -_inf) @@ -93,6 +95,8 @@ def ensure_scores_consistency( # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect dtype = normalized_score.dtype if isinstance(normalized_score, np.ndarray) else np.float64 anomaly_boundary = np.array(boundary, dtype=dtype) + # If dtype cast causes anomaly_boundary to be smaller than normalized boundary (float), + # increase it up to the next representable value if float(anomaly_boundary) < boundary: anomaly_boundary = np.nextafter(anomaly_boundary, np.array(np.inf, dtype=dtype)) below_boundary = np.nextafter(np.array(boundary, dtype=dtype), np.array(-np.inf, dtype=dtype)) From 58b166fba306125f937f6e89153e3cab3fc4534b Mon Sep 17 00:00:00 2001 From: Michele Milesi Date: Mon, 23 Feb 2026 09:32:57 +0100 Subject: [PATCH 4/4] fix: ensure_scores_consistency now select minimum between nextafter and 1e-3 for avoiding inconsistencies after rounding --- CHANGELOG.md | 6 +++++- quadra/utils/anomaly.py | 17 ++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 468c43a17..ef4e1e8fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,14 @@ Starting from version 2.6.1, releases are automatically created when changes are ### [2.8.1] +#### Updated + +- Anomalib-orobix to v0.7.0.dev151 in order to make optimal threshold selection more robust with respect to floating point operations. + #### Fixed - `normalize_anomaly_score` now accepts an optional `eval_threshold` (`EvalThreshold`) parameter. 
When provided, consistency enforcement uses the actual evaluation boundary instead of always using the training threshold at 100.0, preventing misclassification of samples whose raw score falls close to the evaluation thresholds. -- Consistency enforcement in anomaly score normalization now uses `np.nextafter`/`torch.nextafter` (dtype-aware) instead of hardcoded epsilon values (e.g. `99.99`), eliminating ULP-gap misclassifications especially at low-precision (fp16) boundaries. +- Consistency enforcement in anomaly score normalization now uses `np.nextafter`/`torch.nextafter` (dtype-aware) instead of hardcoded epsilon values, eliminating ULP-gap misclassifications especially at low-precision (fp16) boundaries. - `AnomalibEvaluation` now builds an `EvalThreshold` from the optimal evaluation threshold and passes it to `normalize_anomaly_score`, ensuring consistent predictions between raw and normalized anomaly scores and anomaly maps. ### [2.8.0] diff --git a/quadra/utils/anomaly.py b/quadra/utils/anomaly.py index 216cf92bd..6f4bdabb9 100644 --- a/quadra/utils/anomaly.py +++ b/quadra/utils/anomaly.py @@ -69,18 +69,22 @@ def ensure_scores_consistency( is_anomaly_mask = score >= eval_threshold.raw is_not_anomaly_mask = np.bitwise_not(is_anomaly_mask) + _inf: torch.Tensor | np.ndarray below_boundary: torch.Tensor | np.ndarray anomaly_boundary: torch.Tensor | np.ndarray + epsilon = 1e-3 if isinstance(normalized_score, torch.Tensor): device = normalized_score.device # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect _inf = torch.tensor(float("inf"), dtype=normalized_score.dtype, device=device) - anomaly_boundary = torch.tensor(boundary, dtype=normalized_score.dtype, device=device) + boundary_tensor = torch.tensor(boundary, dtype=normalized_score.dtype, device=device) + anomaly_boundary = boundary_tensor.clone() # If dtype cast causes anomaly_boundary to be smaller than normalized boundary (float), # increase it up to the next 
representable value if float(anomaly_boundary) < boundary: anomaly_boundary = torch.nextafter(anomaly_boundary, _inf) - below_boundary = torch.nextafter(torch.tensor(boundary, dtype=normalized_score.dtype, device=device), -_inf) + # Ensure consistency after rounding to 3 decimal places + below_boundary = torch.min(torch.nextafter(boundary_tensor, -_inf), boundary_tensor - epsilon) if normalized_score.dim() == 0: normalized_score = ( @@ -94,12 +98,15 @@ elif isinstance(normalized_score, np.ndarray) or np.isscalar(normalized_score): # Work in scores dtype, cast boundaries to the same dype to ensure that casts take effect dtype = normalized_score.dtype if isinstance(normalized_score, np.ndarray) else np.float64 - anomaly_boundary = np.array(boundary, dtype=dtype) + _inf = np.array(np.inf, dtype=dtype) + boundary_array = np.array(boundary, dtype=dtype) + anomaly_boundary = boundary_array.copy() # If dtype cast causes anomaly_boundary to be smaller than normalized boundary (float), # increase it up to the next representable value if float(anomaly_boundary) < boundary: - anomaly_boundary = np.nextafter(anomaly_boundary, np.array(np.inf, dtype=dtype)) - below_boundary = np.nextafter(np.array(boundary, dtype=dtype), np.array(-np.inf, dtype=dtype)) + anomaly_boundary = np.nextafter(anomaly_boundary, _inf) + # Ensure consistency after rounding to 3 decimal places + below_boundary = np.minimum(np.nextafter(boundary_array, -_inf), boundary_array - epsilon) if np.isscalar(normalized_score) or normalized_score.ndim == 0: # type: ignore[union-attr] normalized_score = (