Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ Starting from version 2.6.1, releases are automatically created when changes are

**Note**: If a tag for the current version already exists, the workflow will skip tag and release creation to avoid duplicates.

### [2.8.1]

#### Updated

- Anomalib-orobix to v0.7.0.dev151 in order to make optimal threshold selection more robust with respect to floating point operations.

#### Fixed

- `normalize_anomaly_score` now accepts an optional `eval_threshold` (`EvalThreshold`) parameter. When provided, consistency enforcement uses the actual evaluation boundary instead of always using the training threshold at 100.0, preventing misclassification of samples whose raw score falls close to the evaluation thresholds.
- Consistency enforcement in anomaly score normalization now uses `np.nextafter`/`torch.nextafter` (dtype-aware) instead of hardcoded epsilon values, eliminating ULP-gap misclassifications especially at low-precision (fp16) boundaries.
- `AnomalibEvaluation` now builds an `EvalThreshold` from the optimal evaluation threshold and passes it to `normalize_anomaly_score`, ensuring consistent predictions between raw and normalized anomaly scores and anomaly maps.

### [2.8.0]

#### Added
Expand Down
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "quadra"
version = "2.8.0"
version = "2.8.1"
description = "Deep Learning experiment orchestration library"
authors = [
"Federico Belotti <federico.belotti@orobix.com>",
Expand Down Expand Up @@ -73,7 +73,7 @@ h5py = "~3.8"
timm = "1.0.24"
segmentation_models_pytorch = "0.5.0"

anomalib-orobix = "0.7.0.dev150"
anomalib-orobix = "0.7.0.dev151"
xxhash = "~3.2"
torchinfo = "~1.8"
typing_extensions = { version = "4.11.0", python = "<3.10" }
Expand Down
2 changes: 1 addition & 1 deletion quadra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.8.0"
__version__ = "2.8.1"


def get_version():
Expand Down
13 changes: 10 additions & 3 deletions quadra/tasks/anomaly.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from quadra.modules.base import ModelSignatureWrapper
from quadra.tasks.base import Evaluation, LightningTask
from quadra.utils import utils
from quadra.utils.anomaly import MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score
from quadra.utils.anomaly import EvalThreshold, MapOrValue, ThresholdNormalizationCallback, normalize_anomaly_score
from quadra.utils.classification import get_results
from quadra.utils.evaluation import automatic_datamodule_batch_size
from quadra.utils.export import export_model
Expand Down Expand Up @@ -504,7 +504,12 @@ def generate_report(self) -> None:
),
).item()

anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold)
# Build an EvalThreshold so that consistency enforcement in normalize_anomaly_score uses the
# actual evaluation boundary when checking consistency after normalization. This prevents
# potentially inconsistent classification when switching between raw and normalized scores.
eval_threshold = EvalThreshold(raw=float(optimal_threshold), normalized=normalized_optimal_threshold)

anomaly_scores = normalize_anomaly_score(anomaly_scores, training_threshold, eval_threshold=eval_threshold)

if not isinstance(anomaly_scores, np.ndarray):
raise ValueError("Anomaly scores must be a numpy array")
Expand Down Expand Up @@ -543,7 +548,9 @@ def generate_report(self) -> None:
if hasattr(self.datamodule, "crop_area") and self.datamodule.crop_area is not None:
crop_area = self.datamodule.crop_area

anomaly_maps = normalize_anomaly_score(self.metadata["anomaly_maps"], training_threshold)
anomaly_maps = normalize_anomaly_score(
self.metadata["anomaly_maps"], training_threshold, eval_threshold=eval_threshold
)

if not isinstance(anomaly_maps, torch.Tensor):
raise ValueError("Anomaly maps must be a tensor")
Expand Down
134 changes: 107 additions & 27 deletions quadra/utils/anomaly.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,144 @@
import pytorch_lightning as pl
import torch # pylint: disable=unused-import
from anomalib.models.components import AnomalyModule
from pydantic import BaseModel
from pytorch_lightning import Callback
from pytorch_lightning.utilities.types import STEP_OUTPUT

# https://github.com/python/cpython/issues/90015#issuecomment-1172996118
MapOrValue: TypeAlias = "float | torch.Tensor | np.ndarray"


def normalize_anomaly_score(raw_score: MapOrValue, threshold: float) -> MapOrValue:
"""Normalize anomaly score value or map based on threshold.
class EvalThreshold(BaseModel):
    """Decision boundary expressed in both raw and normalized score space.

    Consistency enforcement needs the same evaluation threshold before and
    after the linear rescaling, so both representations are carried together.

    Attributes:
        raw: Threshold value in the original (unnormalized) score space.
        normalized: The same threshold after normalization.
    """

    # Field order kept identical to the original model to preserve repr/serialization.
    raw: float
    normalized: float


def ensure_scores_consistency(
    normalized_score: MapOrValue,
    raw_score: MapOrValue,
    eval_threshold: EvalThreshold,
) -> MapOrValue:
    """Enforce that the classification based on normalized scores matches the raw classification.

    For every sample, if `raw_score >= eval_threshold.raw` (anomaly), the normalized score is
    clipped to be at least `eval_threshold.normalized`. If `raw_score < eval_threshold.raw`
    (normal), the normalized score is clipped to be strictly below `eval_threshold.normalized`,
    using dtype-aware `nextafter` combined with a small epsilon margin so that the gap survives
    rounding to 3 decimal places.

    Args:
        normalized_score: Normalized anomaly score value or map to adjust. Arrays/tensors are
            modified in place.
        raw_score: Original (unnormalized) anomaly score used to determine the ground-truth
            classification for each sample.
        eval_threshold: Threshold pair defining the decision boundary in both spaces.

    Returns:
        Normalized score with predictions consistent with the raw ones.
    """
    boundary = eval_threshold.normalized
    # Margin so the "strictly below" boundary stays below even after rounding to 3 decimals.
    epsilon = 1e-3

    if isinstance(normalized_score, torch.Tensor):
        device = normalized_score.device
        dtype = normalized_score.dtype

        # Build the masks as torch bool tensors on the score's device: indexing a (possibly
        # CUDA) tensor with NumPy masks is unreliable and forces host/device transfers.
        raw = raw_score
        if isinstance(raw, np.ndarray):
            raw = torch.from_numpy(raw).to(device=device)
        elif not isinstance(raw, torch.Tensor):
            raw = torch.tensor(raw, device=device)
        else:
            raw = raw.to(device=device)
        # Anomalib classifies as anomaly if anomaly_score >= threshold.
        is_anomaly_mask = raw >= eval_threshold.raw

        # Work in the score's dtype; cast boundaries to the same dtype so the clamps take effect.
        _inf = torch.tensor(float("inf"), dtype=dtype, device=device)
        boundary_tensor = torch.tensor(boundary, dtype=dtype, device=device)
        anomaly_boundary = boundary_tensor.clone()
        # If the dtype cast rounded the boundary down (e.g. fp16), bump it up to the next
        # representable value so anomalies still land at or above the true boundary.
        if float(anomaly_boundary) < boundary:
            anomaly_boundary = torch.nextafter(anomaly_boundary, _inf)
        below_boundary = torch.min(torch.nextafter(boundary_tensor, -_inf), boundary_tensor - epsilon)

        if normalized_score.dim() == 0:
            return (
                normalized_score.clamp(min=anomaly_boundary)
                if bool(is_anomaly_mask)
                else normalized_score.clamp(max=below_boundary)
            )
        is_not_anomaly_mask = ~is_anomaly_mask
        normalized_score[is_anomaly_mask] = normalized_score[is_anomaly_mask].clamp(min=anomaly_boundary)
        normalized_score[is_not_anomaly_mask] = normalized_score[is_not_anomaly_mask].clamp(max=below_boundary)
        return normalized_score

    if isinstance(normalized_score, np.ndarray) or np.isscalar(normalized_score):
        score = raw_score
        if isinstance(score, torch.Tensor):
            score = score.cpu().numpy()
        # Anomalib classifies as anomaly if anomaly_score >= threshold.
        is_anomaly_mask = score >= eval_threshold.raw
        is_not_anomaly_mask = np.bitwise_not(is_anomaly_mask)

        # Work in the score's dtype; cast boundaries to the same dtype so the clips take effect.
        dtype = normalized_score.dtype if isinstance(normalized_score, np.ndarray) else np.float64
        _inf = np.array(np.inf, dtype=dtype)
        boundary_array = np.array(boundary, dtype=dtype)
        anomaly_boundary = boundary_array.copy()
        # If the dtype cast rounded the boundary down, bump it to the next representable value.
        if float(anomaly_boundary) < boundary:
            anomaly_boundary = np.nextafter(anomaly_boundary, _inf)
        below_boundary = np.minimum(np.nextafter(boundary_array, -_inf), boundary_array - epsilon)

        if np.isscalar(normalized_score) or normalized_score.ndim == 0:  # type: ignore[union-attr]
            return (
                np.clip(normalized_score, a_min=anomaly_boundary, a_max=None)
                if is_anomaly_mask
                else np.clip(normalized_score, a_min=None, a_max=below_boundary)
            )
        normalized_score = cast(np.ndarray, normalized_score)
        normalized_score[is_anomaly_mask] = np.clip(
            normalized_score[is_anomaly_mask], a_min=anomaly_boundary, a_max=None
        )
        normalized_score[is_not_anomaly_mask] = np.clip(
            normalized_score[is_not_anomaly_mask], a_min=None, a_max=below_boundary
        )
        return normalized_score

    return normalized_score


def normalize_anomaly_score(
raw_score: MapOrValue,
threshold: float,
eval_threshold: EvalThreshold | None = None,
) -> MapOrValue:
"""Normalize anomaly score value or map based on threshold.

The training threshold maps to 100.0 in normalized space. After the linear scaling,
`ensure_scores_consistency` is called to guarantee that every sample's normalized
classification matches its raw classification.

Args:
raw_score: Raw anomaly score value or map.
threshold: Threshold for anomaly detection, usually it is the training threshold.
eval_threshold: Threshold used during evaluation. It is used to ensure consistency between raw scores
and normalized scores. When `None`, an `EvalThreshold` with `raw=threshold` and `normalized=100.0` is used,
which reproduces the original behaviour for the training-threshold case.

Returns:
Normalized anomaly score value or map clipped between 0 and 1000
"""
if threshold > 0:
normalized_score = (raw_score / threshold) * 100.0
elif threshold == 0:
# TODO: Is this the best way to handle this case?
normalized_score = (raw_score + 1) * 100.0
else:
normalized_score = 200.0 - ((raw_score / threshold) * 100.0)

_eval_threshold = eval_threshold if eval_threshold is not None else EvalThreshold(raw=threshold, normalized=100.0)
normalized_score = ensure_scores_consistency(normalized_score, raw_score, _eval_threshold)

if isinstance(normalized_score, torch.Tensor):
return torch.clamp(normalized_score, 0.0, 1000.0)

Expand Down
Loading