diff --git a/python-package/insightface/app/__init__.py b/python-package/insightface/app/__init__.py index 2a0e49202..7e6bf97a9 100644 --- a/python-package/insightface/app/__init__.py +++ b/python-package/insightface/app/__init__.py @@ -1,2 +1,6 @@ from .face_analysis import * from .mask_renderer import * +from .common import Face +from .visualizer import FaceVisualizer, draw_faces + +__all__ = ['FaceAnalysis', 'Face', 'FaceVisualizer', 'draw_faces'] diff --git a/python-package/insightface/app/common.py b/python-package/insightface/app/common.py index 82ca987ae..276690fed 100644 --- a/python-package/insightface/app/common.py +++ b/python-package/insightface/app/common.py @@ -1,49 +1,108 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np from numpy.linalg import norm as l2norm -#from easydict import EasyDict -class Face(dict): - def __init__(self, d=None, **kwargs): +class Face: + _ATTR_DEFS: Dict[str, Tuple[type, Any]] = { + 'bbox': (np.ndarray, None), + 'kps': (np.ndarray, None), + 'det_score': (float, None), + 'embedding': (np.ndarray, None), + 'gender': (int, None), + 'age': (int, None), + 'pose': (np.ndarray, None), + } + + def __init__(self, d: Optional[Dict[str, Any]] = None, **kwargs: Any) -> None: + self._data: Dict[str, Any] = {} if d is None: d = {} if kwargs: d.update(**kwargs) for k, v in d.items(): - setattr(self, k, v) - # Class attributes - #for k in self.__class__.__dict__.keys(): - # if not (k.startswith('__') and k.endswith('__')) and not k in ('update', 'pop'): - # setattr(self, k, getattr(self, k)) - - def __setattr__(self, name, value): - if isinstance(value, (list, tuple)): - value = [self.__class__(x) - if isinstance(x, dict) else x for x in value] - elif isinstance(value, dict) and not isinstance(value, self.__class__): - value = self.__class__(value) - super(Face, self).__setattr__(name, value) - super(Face, self).__setitem__(name, value) - - __setitem__ = __setattr__ - - 
def __getattr__(self, name): - return None + self._data[k] = v + + def __setattr__(self, name: str, value: Any) -> None: + if name == '_data': + super().__setattr__(name, value) + else: + self._data[name] = value + + def __getattr__(self, name: str) -> Any: + if name == '_data': + return super().__getattribute__(name) + return self._data.get(name) + + def __setitem__(self, name: str, value: Any) -> None: + self._data[name] = value + + def __getitem__(self, name: str) -> Any: + return self._data.get(name) + + def __contains__(self, name: str) -> bool: + return name in self._data + + def keys(self) -> List[str]: + return list(self._data.keys()) + + def items(self) -> List[Tuple[str, Any]]: + return list(self._data.items()) + + def get(self, name: str, default: Any = None) -> Any: + return self._data.get(name, default) + + def to_dict(self) -> Dict[str, Any]: + return dict(self._data) @property - def embedding_norm(self): + def embedding_norm(self) -> Optional[float]: if self.embedding is None: return None - return l2norm(self.embedding) + return float(l2norm(self.embedding)) - @property - def normed_embedding(self): + @property + def normed_embedding(self) -> Optional[np.ndarray]: if self.embedding is None: return None - return self.embedding / self.embedding_norm + norm_val = self.embedding_norm + if norm_val is None or norm_val == 0: + return None + return self.embedding / norm_val - @property - def sex(self): + @property + def sex(self) -> Optional[str]: if self.gender is None: return None - return 'M' if self.gender==1 else 'F' + return 'M' if self.gender == 1 else 'F' + + def get_bbox(self) -> Optional[np.ndarray]: + bbox = self._data.get('bbox') + if bbox is not None and len(bbox) >= 4: + return bbox[:4] + return None + + def get_kps(self) -> Optional[np.ndarray]: + return self._data.get('kps') + + def get_det_score(self) -> Optional[float]: + return self._data.get('det_score') + + def get_embedding(self) -> Optional[np.ndarray]: + return 
self._data.get('embedding') + + def get_gender_age(self) -> Tuple[Optional[int], Optional[int]]: + return self._data.get('gender'), self._data.get('age') + + def __repr__(self) -> str: + attrs = [] + for k in ['bbox', 'det_score', 'kps', 'embedding', 'gender', 'age', 'pose']: + v = self._data.get(k) + if v is not None: + if isinstance(v, np.ndarray): + attrs.append(f'{k}=array{v.shape}') + else: + attrs.append(f'{k}={v}') + return f"Face({', '.join(attrs)})" diff --git a/python-package/insightface/app/face_analysis.py b/python-package/insightface/app/face_analysis.py index a9112b14a..77bb2613b 100644 --- a/python-package/insightface/app/face_analysis.py +++ b/python-package/insightface/app/face_analysis.py @@ -4,106 +4,434 @@ # @Time : 2021-05-04 # @Function : - -from __future__ import division +from __future__ import annotations import glob import os.path as osp +from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np +import onnx import onnxruntime -from numpy.linalg import norm from ..model_zoo import model_zoo from ..utils import DEFAULT_MP_NAME, ensure_available from .common import Face +from .visualizer import draw_faces __all__ = ['FaceAnalysis'] + +def _safe_slice_bbox(bboxes: np.ndarray, index: int) -> Optional[np.ndarray]: + """Safely extract bounding box from detection result. + + Args: + bboxes: Detection result array with shape (N, M) where M >= 5. + index: Index of the detection to extract. + + Returns: + Bounding box array [x1, y1, x2, y2] or None if extraction fails. + """ + if bboxes is None or bboxes.ndim != 2: + return None + if index < 0 or index >= bboxes.shape[0]: + return None + if bboxes.shape[1] < 4: + return None + return bboxes[index, :4].copy() + + +def _safe_slice_score(bboxes: np.ndarray, index: int) -> float: + """Safely extract detection score from detection result. + + Args: + bboxes: Detection result array with shape (N, M) where M >= 5. + index: Index of the detection to extract. 
+ + Returns: + Detection score or 0.0 if extraction fails. + """ + if bboxes is None or bboxes.ndim != 2: + return 0.0 + if index < 0 or index >= bboxes.shape[0]: + return 0.0 + if bboxes.shape[1] < 5: + return 0.0 + return float(bboxes[index, 4]) + + +def _safe_slice_kps(kpss: Optional[np.ndarray], index: int) -> Optional[np.ndarray]: + """Safely extract keypoints from detection result. + + Args: + kpss: Keypoints array with shape (N, K, 2) or None. + index: Index of the detection to extract. + + Returns: + Keypoints array with shape (K, 2) or None if extraction fails. + """ + if kpss is None: + return None + if kpss.ndim != 3: + return None + if index < 0 or index >= kpss.shape[0]: + return None + return kpss[index].copy() + + class FaceAnalysis: - def __init__(self, name=DEFAULT_MP_NAME, root='~/.insightface', allowed_modules=None, **kwargs): + """Face Analysis Pipeline. + + This class provides a unified interface for face detection, alignment, + and feature extraction using multiple ONNX models. + + Attributes: + models: Dictionary mapping task names to model instances. + det_model: The detection model instance. + model_dir: Directory containing the ONNX model files. + + Example: + >>> app = FaceAnalysis(name='buffalo_l') + >>> app.prepare(ctx_id=0, det_size=(640, 640)) + >>> faces = app.get(img) + >>> for face in faces: + ... 
print(face.bbox, face.det_score) + """ + + _MODEL_SIGNATURES = { + 'detection': lambda inputs, outputs: len(outputs) >= 5, + 'landmark_2d_106': lambda inputs, outputs: ( + len(inputs) > 0 and + len(inputs[0].shape) >= 4 and + inputs[0].shape[2] == 192 and + inputs[0].shape[3] == 192 + ), + 'genderage': lambda inputs, outputs: ( + len(inputs) > 0 and + len(inputs[0].shape) >= 4 and + inputs[0].shape[2] == 96 and + inputs[0].shape[3] == 96 + ), + 'inswapper': lambda inputs, outputs: ( + len(inputs) == 2 and + len(inputs[0].shape) >= 4 and + inputs[0].shape[2] == 128 and + inputs[0].shape[3] == 128 + ), + 'recognition': lambda inputs, outputs: ( + len(inputs) > 0 and + len(inputs[0].shape) >= 4 and + inputs[0].shape[2] == inputs[0].shape[3] and + inputs[0].shape[2] >= 112 and + inputs[0].shape[2] % 16 == 0 + ), + } + + def __init__( + self, + name: str = DEFAULT_MP_NAME, + root: str = '~/.insightface', + allowed_modules: Optional[List[str]] = None, + **kwargs: Any, + ) -> None: + """Initialize FaceAnalysis. + + Args: + name: Name of the model pack to load. Defaults to 'buffalo_l'. + root: Root directory for model storage. Defaults to '~/.insightface'. + allowed_modules: List of module names to load. If None, loads all + available modules. Common values: ['detection', 'recognition', + 'landmark_2d_106', 'genderage']. + **kwargs: Additional arguments passed to model initialization. 
+ """ onnxruntime.set_default_logger_severity(3) - self.models = {} + self.models: Dict[str, Any] = {} + self._model_paths: Dict[str, str] = {} + self._loaded_models: Set[str] = set() + self._allowed_modules: Optional[Set[str]] = None + self._init_kwargs = kwargs + + if allowed_modules is not None: + self._allowed_modules = set(allowed_modules) + self.model_dir = ensure_available('models', name, root=root) + self._scan_model_files() + + assert 'detection' in self._model_paths, \ + "Detection model is required but not found" + self.det_model = None + + def _scan_model_files(self) -> None: + """Scan and register model files without loading them. + + Uses ONNX metadata to identify model types, avoiding full model loading. + """ onnx_files = glob.glob(osp.join(self.model_dir, '*.onnx')) onnx_files = sorted(onnx_files) + for onnx_file in onnx_files: - model = model_zoo.get_model(onnx_file, **kwargs) - if model is None: + task_name = self._identify_model_task_fast(onnx_file) + if task_name is None: print('model not recognized:', onnx_file) - elif allowed_modules is not None and model.taskname not in allowed_modules: - print('model ignore:', onnx_file, model.taskname) - del model - elif model.taskname not in self.models and (allowed_modules is None or model.taskname in allowed_modules): - print('find model:', onnx_file, model.taskname, model.input_shape, model.input_mean, model.input_std) - self.models[model.taskname] = model - else: - print('duplicated model task type, ignore:', onnx_file, model.taskname) - del model - assert 'detection' in self.models - self.det_model = self.models['detection'] - - - def prepare(self, ctx_id, det_thresh=0.5, det_size=(640, 640)): + continue + + if self._allowed_modules is not None and task_name not in self._allowed_modules: + print('model ignore:', onnx_file, task_name) + continue + + if task_name in self._model_paths: + print('duplicated model task type, ignore:', onnx_file, task_name) + continue + + print('find model:', onnx_file, 
task_name) + self._model_paths[task_name] = onnx_file + + def _identify_model_task_fast(self, onnx_file: str) -> Optional[str]: + """Identify model type by reading ONNX metadata only (no weight loading). + + This method reads only the model graph structure without loading weights, + making it much faster than creating an InferenceSession. + + Args: + onnx_file: Path to the ONNX model file. + + Returns: + Task name string or None if unrecognized. + """ + try: + model = onnx.load(onnx_file, load_external_data=False) + graph = model.graph + + inputs = list(graph.input) + outputs = list(graph.output) + + input_shapes = [] + for inp in inputs: + shape = [] + for dim in inp.type.tensor_type.shape.dim: + if dim.dim_value > 0: + shape.append(dim.dim_value) + else: + shape.append(-1) + input_shapes.append(shape) + + output_count = len(outputs) + + if self._MODEL_SIGNATURES['detection'](input_shapes, [None] * output_count): + return 'detection' + + if len(input_shapes) > 0 and len(input_shapes[0]) >= 4: + h, w = input_shapes[0][2], input_shapes[0][3] + + if h == 192 and w == 192: + return 'landmark_2d_106' + + if h == 96 and w == 96: + return 'genderage' + + if len(input_shapes) == 2 and h == 128 and w == 128: + return 'inswapper' + + if h == w and h >= 112 and h % 16 == 0: + return 'recognition' + + return None + + except Exception as e: + print(f'Error identifying model {onnx_file}: {e}') + return None + + def _load_model(self, task_name: str) -> Any: + """Lazily load a model by task name. + + Args: + task_name: Name of the task to load. + + Returns: + Model instance or None if not found. 
+ """ + if task_name in self._loaded_models: + return self.models.get(task_name) + + if task_name not in self._model_paths: + return None + + onnx_file = self._model_paths[task_name] + model = model_zoo.get_model(onnx_file, **self._init_kwargs) + + if model is not None: + self.models[task_name] = model + self._loaded_models.add(task_name) + print(f'loaded model: {onnx_file}, task: {task_name}') + + return model + + def _ensure_det_model(self) -> Any: + """Ensure detection model is loaded.""" + if self.det_model is None: + self.det_model = self._load_model('detection') + return self.det_model + + def prepare( + self, + ctx_id: int, + det_thresh: float = 0.5, + det_size: Tuple[int, int] = (640, 640), + ) -> None: + """Prepare models for inference. + + This method initializes the detection model with specified parameters. + Other models are loaded lazily when needed. + + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + det_thresh: Detection threshold for face detection. Defaults to 0.5. + det_size: Input size for detection model. Defaults to (640, 640). + """ self.det_thresh = det_thresh assert det_size is not None print('set det-size:', det_size) self.det_size = det_size - for taskname, model in self.models.items(): - if taskname=='detection': - model.prepare(ctx_id, input_size=det_size, det_thresh=det_thresh) - else: - model.prepare(ctx_id) - - def get(self, img, max_num=0, det_metric='default'): - bboxes, kpss = self.det_model.detect(img, - max_num=max_num, - metric=det_metric) - if bboxes.shape[0] == 0: + + self._ensure_det_model() + if self.det_model is not None: + self.det_model.prepare(ctx_id, input_size=det_size, det_thresh=det_thresh) + + def get( + self, + img: np.ndarray, + max_num: int = 0, + det_metric: str = 'default', + ) -> List[Face]: + """Detect and analyze faces in an image. + + Args: + img: Input image as numpy array. 
+ - Format: BGR (OpenCV default) or RGB + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + max_num: Maximum number of faces to detect. 0 means no limit. + det_metric: Detection metric for face selection. + - 'default': Prefer faces closer to image center + - 'max': Select largest faces + + Returns: + List of Face objects containing detection results and attributes. + Each Face object may contain: + - bbox: np.ndarray, shape (4,), bounding box [x1, y1, x2, y2] + - kps: np.ndarray, shape (5, 2), 5 facial keypoints + - det_score: float, detection confidence score + - embedding: np.ndarray, face embedding vector (if recognition model loaded) + - gender: int, 0 for female, 1 for male (if genderage model loaded) + - age: int, estimated age (if genderage model loaded) + """ + det_model = self._ensure_det_model() + if det_model is None: + return [] + + bboxes, kpss = det_model.detect(img, max_num=max_num, metric=det_metric) + + if bboxes is None or bboxes.shape[0] == 0: return [] - ret = [] - for i in range(bboxes.shape[0]): - bbox = bboxes[i, 0:4] - det_score = bboxes[i, 4] - kps = None - if kpss is not None: - kps = kpss[i] + + num_faces = bboxes.shape[0] + ret: List[Face] = [] + + for i in range(num_faces): + bbox = _safe_slice_bbox(bboxes, i) + det_score = _safe_slice_score(bboxes, i) + kps = _safe_slice_kps(kpss, i) + + if bbox is None: + continue + face = Face(bbox=bbox, kps=kps, det_score=det_score) - for taskname, model in self.models.items(): - if taskname=='detection': + + for task_name in self._model_paths: + if task_name == 'detection': continue - model.get(img, face) + if self._allowed_modules is not None and task_name not in self._allowed_modules: + continue + + model = self._load_model(task_name) + if model is not None: + try: + model.get(img, face) + except Exception as e: + print(f'Error running {task_name} model: {e}') + ret.append(face) + return ret - - def draw_on(self, img, faces): - import cv2 - dimg = img.copy() - for i in range(len(faces)): - 
face = faces[i] - box = face.bbox.astype(int) - color = (0, 0, 255) - cv2.rectangle(dimg, (box[0], box[1]), (box[2], box[3]), color, 2) - if face.kps is not None: - kps = face.kps.astype(int) - #print(landmark.shape) - for l in range(kps.shape[0]): - color = (0, 0, 255) - if l == 0 or l == 3: - color = (0, 255, 0) - cv2.circle(dimg, (kps[l][0], kps[l][1]), 1, color, - 2) - if face.gender is not None and face.age is not None: - cv2.putText(dimg,'%s,%d'%(face.sex,face.age), (box[0]-1, box[1]-4),cv2.FONT_HERSHEY_COMPLEX,0.7,(0,255,0),1) - - #for key, value in face.items(): - # if key.startswith('landmark_3d'): - # print(key, value.shape) - # print(value[0:10,:]) - # lmk = np.round(value).astype(int) - # for l in range(lmk.shape[0]): - # color = (255, 0, 0) - # cv2.circle(dimg, (lmk[l][0], lmk[l][1]), 1, color, - # 2) - return dimg - + + def draw_on( + self, + img: np.ndarray, + faces: List[Face], + ) -> np.ndarray: + """Draw face analysis results on image. + + Args: + img: Input image as numpy array (BGR format, uint8, 0-255). + faces: List of Face objects to visualize. + + Returns: + Image with drawn bounding boxes, keypoints, and attributes. + """ + return draw_faces(img, faces) + + def get_model(self, task_name: str) -> Optional[Any]: + """Get a loaded model by task name. + + Args: + task_name: Name of the task (e.g., 'detection', 'recognition'). + + Returns: + Model instance if loaded, None otherwise. + """ + return self.models.get(task_name) + + def load_model(self, task_name: str) -> Optional[Any]: + """Explicitly load a model by task name. + + Args: + task_name: Name of the task to load. + + Returns: + Loaded model instance. + """ + return self._load_model(task_name) + + def unload_model(self, task_name: str) -> bool: + """Unload a model to free memory. + + Args: + task_name: Name of the task to unload. + + Returns: + True if model was unloaded, False if not found. 
+ """ + if task_name in self.models: + del self.models[task_name] + self._loaded_models.discard(task_name) + if task_name == 'detection': + self.det_model = None + return True + return False + + def list_available_models(self) -> List[str]: + """List all available model task names. + + Returns: + List of task names for available models. + """ + return list(self._model_paths.keys()) + + def list_loaded_models(self) -> List[str]: + """List currently loaded model task names. + + Returns: + List of task names for loaded models. + """ + return list(self._loaded_models) diff --git a/python-package/insightface/app/visualizer.py b/python-package/insightface/app/visualizer.py new file mode 100644 index 000000000..91bb675de --- /dev/null +++ b/python-package/insightface/app/visualizer.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +"""Face visualization utilities. + +This module provides visualization functions for face analysis results, +decoupled from the core business logic. +""" + +from __future__ import annotations + +from typing import List, Optional, Tuple, Union +import numpy as np + +from .common import Face + + +class FaceVisualizer: + """Visualizer for face analysis results. + + This class provides methods to draw bounding boxes, keypoints, + and other face attributes on images. 
+ """ + + COLOR_RED: Tuple[int, int, int] = (0, 0, 255) + COLOR_GREEN: Tuple[int, int, int] = (0, 255, 0) + COLOR_BLUE: Tuple[int, int, int] = (255, 0, 0) + COLOR_WHITE: Tuple[int, int, int] = (255, 255, 255) + + def __init__( + self, + box_color: Tuple[int, int, int] = COLOR_RED, + kps_color: Tuple[int, int, int] = COLOR_RED, + kps_highlight_color: Tuple[int, int, int] = COLOR_GREEN, + text_color: Tuple[int, int, int] = COLOR_GREEN, + box_thickness: int = 2, + kps_radius: int = 2, + text_scale: float = 0.7, + text_thickness: int = 1, + ) -> None: + self.box_color = box_color + self.kps_color = kps_color + self.kps_highlight_color = kps_highlight_color + self.text_color = text_color + self.box_thickness = box_thickness + self.kps_radius = kps_radius + self.text_scale = text_scale + self.text_thickness = text_thickness + self._cv2 = None + + def _get_cv2(self): + if self._cv2 is None: + import cv2 + self._cv2 = cv2 + return self._cv2 + + def draw_faces( + self, + img: np.ndarray, + faces: List[Face], + draw_box: bool = True, + draw_kps: bool = True, + draw_gender_age: bool = True, + ) -> np.ndarray: + cv2 = self._get_cv2() + result = img.copy() + + for face in faces: + if draw_box: + self._draw_bbox(result, face) + if draw_kps: + self._draw_kps(result, face) + if draw_gender_age: + self._draw_gender_age(result, face) + + return result + + def _draw_bbox(self, img: np.ndarray, face: Face) -> None: + cv2 = self._get_cv2() + bbox = face.get_bbox() + if bbox is None: + return + box = bbox.astype(int) + if len(box) >= 4: + cv2.rectangle( + img, + (box[0], box[1]), + (box[2], box[3]), + self.box_color, + self.box_thickness, + ) + + def _draw_kps(self, img: np.ndarray, face: Face) -> None: + cv2 = self._get_cv2() + kps = face.get_kps() + if kps is None: + return + kps_int = kps.astype(int) + for idx, kp in enumerate(kps_int): + color = self.kps_color + if idx == 0 or idx == 3: + color = self.kps_highlight_color + cv2.circle( + img, + (kp[0], kp[1]), + self.kps_radius, + 
color, + self.kps_radius, + ) + + def _draw_gender_age(self, img: np.ndarray, face: Face) -> None: + cv2 = self._get_cv2() + gender, age = face.get_gender_age() + if gender is None or age is None: + return + bbox = face.get_bbox() + if bbox is None: + return + box = bbox.astype(int) + sex = 'M' if gender == 1 else 'F' + text = f'{sex},{age}' + cv2.putText( + img, + text, + (box[0] - 1, box[1] - 4), + cv2.FONT_HERSHEY_COMPLEX, + self.text_scale, + self.text_color, + self.text_thickness, + ) + + +def draw_faces( + img: np.ndarray, + faces: List[Face], + box_color: Tuple[int, int, int] = (0, 0, 255), + kps_color: Tuple[int, int, int] = (0, 0, 255), + kps_highlight_color: Tuple[int, int, int] = (0, 255, 0), + text_color: Tuple[int, int, int] = (0, 255, 0), + draw_box: bool = True, + draw_kps: bool = True, + draw_gender_age: bool = True, +) -> np.ndarray: + visualizer = FaceVisualizer( + box_color=box_color, + kps_color=kps_color, + kps_highlight_color=kps_highlight_color, + text_color=text_color, + ) + return visualizer.draw_faces( + img, + faces, + draw_box=draw_box, + draw_kps=draw_kps, + draw_gender_age=draw_gender_age, + ) diff --git a/python-package/insightface/model_zoo/__init__.py b/python-package/insightface/model_zoo/__init__.py index 225623d61..551e151e9 100644 --- a/python-package/insightface/model_zoo/__init__.py +++ b/python-package/insightface/model_zoo/__init__.py @@ -1,6 +1,21 @@ -from .model_zoo import get_model +from __future__ import annotations + +from .model_zoo import get_model, PickableInferenceSession, ModelRouter from .arcface_onnx import ArcFaceONNX from .retinaface import RetinaFace from .scrfd import SCRFD from .landmark import Landmark from .attribute import Attribute +from .inswapper import INSwapper + +__all__ = [ + 'get_model', + 'PickableInferenceSession', + 'ModelRouter', + 'ArcFaceONNX', + 'RetinaFace', + 'SCRFD', + 'Landmark', + 'Attribute', + 'INSwapper', +] diff --git a/python-package/insightface/model_zoo/arcface_onnx.py 
b/python-package/insightface/model_zoo/arcface_onnx.py index b537ce2ee..ad86bab8e 100644 --- a/python-package/insightface/model_zoo/arcface_onnx.py +++ b/python-package/insightface/model_zoo/arcface_onnx.py @@ -4,89 +4,176 @@ # @Time : 2021-05-04 # @Function : -from __future__ import division +from __future__ import annotations + +from typing import Any, List, Optional, Tuple, Union + import numpy as np import cv2 import onnx import onnxruntime + from ..utils import face_align -__all__ = [ - 'ArcFaceONNX', -] +__all__ = ['ArcFaceONNX'] class ArcFaceONNX: - def __init__(self, model_file=None, session=None): + """ONNX-based ArcFace face recognition model. + + This class provides face embedding extraction using ArcFace models. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + taskname: Task identifier ('recognition'). + input_size: Expected input size (width, height). + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the ArcFace model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). 
+ """ assert model_file is not None self.model_file = model_file self.session = session self.taskname = 'recognition' + find_sub = False find_mul = False model = onnx.load(self.model_file) graph = model.graph for nid, node in enumerate(graph.node[:8]): - #print(nid, node.name) if node.name.startswith('Sub') or node.name.startswith('_minus'): find_sub = True if node.name.startswith('Mul') or node.name.startswith('_mul'): find_mul = True + if find_sub and find_mul: - #mxnet arcface model input_mean = 0.0 input_std = 1.0 else: input_mean = 127.5 input_std = 127.5 + self.input_mean = input_mean self.input_std = input_std - #print('input mean and std:', self.input_mean, self.input_std) + if self.session is None: self.session = onnxruntime.InferenceSession(self.model_file, None) + input_cfg = self.session.get_inputs()[0] input_shape = input_cfg.shape input_name = input_cfg.name self.input_size = tuple(input_shape[2:4][::-1]) self.input_shape = input_shape + outputs = self.session.get_outputs() - output_names = [] - for out in outputs: - output_names.append(out.name) + output_names = [out.name for out in outputs] self.input_name = input_name self.output_names = output_names - assert len(self.output_names)==1 + assert len(self.output_names) == 1 self.output_shape = outputs[0].shape - def prepare(self, ctx_id, **kwargs): - if ctx_id<0: + def prepare(self, ctx_id: int, **kwargs: Any) -> None: + """Prepare the model for inference. + + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + **kwargs: Additional arguments (unused). + """ + if ctx_id < 0: self.session.set_providers(['CPUExecutionProvider']) - def get(self, img, face): + def get( + self, + img: np.ndarray, + face: Any, + ) -> np.ndarray: + """Extract face embedding from image. + + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + face: Face object containing keypoints for alignment. + + Returns: + Flattened embedding vector. 
+ """ aimg = face_align.norm_crop(img, landmark=face.kps, image_size=self.input_size[0]) face.embedding = self.get_feat(aimg).flatten() return face.embedding - def compute_sim(self, feat1, feat2): + def compute_sim( + self, + feat1: np.ndarray, + feat2: np.ndarray, + ) -> float: + """Compute cosine similarity between two feature vectors. + + Args: + feat1: First feature vector. + feat2: Second feature vector. + + Returns: + Cosine similarity score in range [-1, 1]. + """ from numpy.linalg import norm feat1 = feat1.ravel() feat2 = feat2.ravel() sim = np.dot(feat1, feat2) / (norm(feat1) * norm(feat2)) - return sim + return float(sim) - def get_feat(self, imgs): + def get_feat( + self, + imgs: Union[np.ndarray, List[np.ndarray]], + ) -> np.ndarray: + """Extract features from preprocessed images. + + Args: + imgs: Single image or list of images. + - Format: BGR + - Dtype: uint8 + - Range: 0-255 + + Returns: + Feature vectors with shape (N, embedding_dim). + """ if not isinstance(imgs, list): imgs = [imgs] input_size = self.input_size - blob = cv2.dnn.blobFromImages(imgs, 1.0 / self.input_std, input_size, - (self.input_mean, self.input_mean, self.input_mean), swapRB=True) + blob = cv2.dnn.blobFromImages( + imgs, + 1.0 / self.input_std, + input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) net_out = self.session.run(self.output_names, {self.input_name: blob})[0] return net_out - def forward(self, batch_data): + def forward(self, batch_data: np.ndarray) -> np.ndarray: + """Run forward pass on pre-normalized batch data. + + Args: + batch_data: Pre-normalized input tensor with shape (N, C, H, W). + + Returns: + Feature vectors. 
+ """ blob = (batch_data - self.input_mean) / self.input_std net_out = self.session.run(self.output_names, {self.input_name: blob})[0] return net_out - - diff --git a/python-package/insightface/model_zoo/attribute.py b/python-package/insightface/model_zoo/attribute.py index 40c34de3f..ecb206685 100644 --- a/python-package/insightface/model_zoo/attribute.py +++ b/python-package/insightface/model_zoo/attribute.py @@ -4,91 +4,148 @@ # @Time : 2021-06-19 # @Function : -from __future__ import division +from __future__ import annotations + +from typing import Any, List, Optional, Tuple, Union + import numpy as np import cv2 import onnx import onnxruntime + from ..utils import face_align -__all__ = [ - 'Attribute', -] +__all__ = ['Attribute'] class Attribute: - def __init__(self, model_file=None, session=None): + """ONNX-based face attribute detection model. + + This class provides gender and age prediction for detected faces. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + taskname: Task identifier ('genderage' or 'attribute_N'). + input_size: Expected input size (width, height). + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the Attribute model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). 
+ """ assert model_file is not None self.model_file = model_file self.session = session + find_sub = False find_mul = False model = onnx.load(self.model_file) graph = model.graph for nid, node in enumerate(graph.node[:8]): - #print(nid, node.name) if node.name.startswith('Sub') or node.name.startswith('_minus'): find_sub = True if node.name.startswith('Mul') or node.name.startswith('_mul'): find_mul = True - if nid<3 and node.name=='bn_data': + if nid < 3 and node.name == 'bn_data': find_sub = True find_mul = True + if find_sub and find_mul: - #mxnet arcface model input_mean = 0.0 input_std = 1.0 else: input_mean = 127.5 input_std = 128.0 + self.input_mean = input_mean self.input_std = input_std - #print('input mean and std:', model_file, self.input_mean, self.input_std) + if self.session is None: self.session = onnxruntime.InferenceSession(self.model_file, None) + input_cfg = self.session.get_inputs()[0] input_shape = input_cfg.shape input_name = input_cfg.name self.input_size = tuple(input_shape[2:4][::-1]) self.input_shape = input_shape + outputs = self.session.get_outputs() - output_names = [] - for out in outputs: - output_names.append(out.name) + output_names = [out.name for out in outputs] self.input_name = input_name self.output_names = output_names - assert len(self.output_names)==1 + assert len(self.output_names) == 1 + output_shape = outputs[0].shape - #print('init output_shape:', output_shape) - if output_shape[1]==3: + if output_shape[1] == 3: self.taskname = 'genderage' else: - self.taskname = 'attribute_%d'%output_shape[1] + self.taskname = 'attribute_%d' % output_shape[1] - def prepare(self, ctx_id, **kwargs): - if ctx_id<0: + def prepare(self, ctx_id: int, **kwargs: Any) -> None: + """Prepare the model for inference. + + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + **kwargs: Additional arguments (unused). 
+ """ + if ctx_id < 0: self.session.set_providers(['CPUExecutionProvider']) - def get(self, img, face): + def get( + self, + img: np.ndarray, + face: Any, + ) -> Union[Tuple[int, int], np.ndarray]: + """Predict gender and age for a face. + + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + face: Face object containing bounding box. + + Returns: + For genderage task: Tuple of (gender, age) where + - gender: 0 for female, 1 for male + - age: estimated age in years + For other tasks: Raw prediction array. + """ bbox = face.bbox w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1]) center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2 rotate = 0 - _scale = self.input_size[0] / (max(w, h)*1.5) - #print('param:', img.shape, bbox, center, self.input_size, _scale, rotate) + _scale = self.input_size[0] / (max(w, h) * 1.5) + aimg, M = face_align.transform(img, center, self.input_size[0], _scale, rotate) input_size = tuple(aimg.shape[0:2][::-1]) - #assert input_size==self.input_size - blob = cv2.dnn.blobFromImage(aimg, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True) - pred = self.session.run(self.output_names, {self.input_name : blob})[0][0] - if self.taskname=='genderage': - assert len(pred)==3 - gender = np.argmax(pred[:2]) - age = int(np.round(pred[2]*100)) + + blob = cv2.dnn.blobFromImage( + aimg, + 1.0 / self.input_std, + input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) + pred = self.session.run(self.output_names, {self.input_name: blob})[0][0] + + if self.taskname == 'genderage': + assert len(pred) == 3 + gender = int(np.argmax(pred[:2])) + age = int(np.round(pred[2] * 100)) face['gender'] = gender face['age'] = age return gender, age else: return pred - - diff --git a/python-package/insightface/model_zoo/inswapper.py b/python-package/insightface/model_zoo/inswapper.py index dcaceb1f2..1013f0e0b 100644 --- 
a/python-package/insightface/model_zoo/inswapper.py +++ b/python-package/insightface/model_zoo/inswapper.py @@ -1,16 +1,47 @@ -import time +# -*- coding: utf-8 -*- +"""ONNX-based InsightFace face swapping model.""" + +from __future__ import annotations + +from typing import Any, List, Optional, Tuple, Union + import numpy as np import onnxruntime import cv2 import onnx from onnx import numpy_helper -from ..utils import face_align +from ..utils import face_align +__all__ = ['INSwapper'] -class INSwapper(): - def __init__(self, model_file=None, session=None): +class INSwapper: + """ONNX-based face swapping model. + + This class provides face swapping functionality using InsightFace models. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + emap: Embedding mapping matrix. + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + input_size: Expected input size (width, height). + input_shape: Full input shape. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the INSwapper model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). 
+ """ self.model_file = model_file self.session = session model = onnx.load(self.model_file) @@ -18,19 +49,17 @@ def __init__(self, model_file=None, session=None): self.emap = numpy_helper.to_array(graph.initializer[-1]) self.input_mean = 0.0 self.input_std = 255.0 - #print('input mean and std:', model_file, self.input_mean, self.input_std) + if self.session is None: self.session = onnxruntime.InferenceSession(self.model_file, None) + inputs = self.session.get_inputs() - self.input_names = [] - for inp in inputs: - self.input_names.append(inp.name) + self.input_names = [inp.name for inp in inputs] + outputs = self.session.get_outputs() - output_names = [] - for out in outputs: - output_names.append(out.name) - self.output_names = output_names - assert len(self.output_names)==1 + self.output_names = [out.name for out in outputs] + assert len(self.output_names) == 1 + output_shape = outputs[0].shape input_cfg = inputs[0] input_shape = input_cfg.shape @@ -38,68 +67,123 @@ def __init__(self, model_file=None, session=None): print('inswapper-shape:', self.input_shape) self.input_size = tuple(input_shape[2:4][::-1]) - def forward(self, img, latent): + def forward( + self, + img: np.ndarray, + latent: np.ndarray, + ) -> np.ndarray: + """Run forward pass on image with latent vector. + + Args: + img: Input image tensor (normalized). + latent: Latent embedding vector. + + Returns: + Model prediction output. + """ img = (img - self.input_mean) / self.input_std - pred = self.session.run(self.output_names, {self.input_names[0]: img, self.input_names[1]: latent})[0] + pred = self.session.run( + self.output_names, + {self.input_names[0]: img, self.input_names[1]: latent} + )[0] return pred - def get(self, img, target_face, source_face, paste_back=True): + def get( + self, + img: np.ndarray, + target_face: Any, + source_face: Any, + paste_back: bool = True, + ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: + """Perform face swap from source face to target face. 
+ + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + target_face: Face object for target face (where to swap). + source_face: Face object for source face (what to swap). + paste_back: Whether to paste result back to original image. + + Returns: + If paste_back is True: Swapped image with face blended. + If paste_back is False: Tuple of (swapped_face_crop, transform_matrix). + """ aimg, M = face_align.norm_crop2(img, target_face.kps, self.input_size[0]) - blob = cv2.dnn.blobFromImage(aimg, 1.0 / self.input_std, self.input_size, - (self.input_mean, self.input_mean, self.input_mean), swapRB=True) - latent = source_face.normed_embedding.reshape((1,-1)) + blob = cv2.dnn.blobFromImage( + aimg, + 1.0 / self.input_std, + self.input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) + latent = source_face.normed_embedding.reshape((1, -1)) latent = np.dot(latent, self.emap) latent /= np.linalg.norm(latent) - pred = self.session.run(self.output_names, {self.input_names[0]: blob, self.input_names[1]: latent})[0] - #print(latent.shape, latent.dtype, pred.shape) - img_fake = pred.transpose((0,2,3,1))[0] - bgr_fake = np.clip(255 * img_fake, 0, 255).astype(np.uint8)[:,:,::-1] + pred = self.session.run( + self.output_names, + {self.input_names[0]: blob, self.input_names[1]: latent} + )[0] + + img_fake = pred.transpose((0, 2, 3, 1))[0] + bgr_fake = np.clip(255 * img_fake, 0, 255).astype(np.uint8)[:, :, ::-1] + if not paste_back: return bgr_fake, M - else: - target_img = img - fake_diff = bgr_fake.astype(np.float32) - aimg.astype(np.float32) - fake_diff = np.abs(fake_diff).mean(axis=2) - fake_diff[:2,:] = 0 - fake_diff[-2:,:] = 0 - fake_diff[:,:2] = 0 - fake_diff[:,-2:] = 0 - IM = cv2.invertAffineTransform(M) - img_white = np.full((aimg.shape[0],aimg.shape[1]), 255, dtype=np.float32) - bgr_fake = cv2.warpAffine(bgr_fake, IM, (target_img.shape[1], target_img.shape[0]), 
borderValue=0.0) - img_white = cv2.warpAffine(img_white, IM, (target_img.shape[1], target_img.shape[0]), borderValue=0.0) - fake_diff = cv2.warpAffine(fake_diff, IM, (target_img.shape[1], target_img.shape[0]), borderValue=0.0) - img_white[img_white>20] = 255 - fthresh = 10 - fake_diff[fake_diff<fthresh] = 0 - fake_diff[fake_diff>=fthresh] = 255 - img_mask = img_white - mask_h_inds, mask_w_inds = np.where(img_mask==255) - mask_h = np.max(mask_h_inds) - np.min(mask_h_inds) - mask_w = np.max(mask_w_inds) - np.min(mask_w_inds) - mask_size = int(np.sqrt(mask_h*mask_w)) - k = max(mask_size//10, 10) - #k = max(mask_size//20, 6) - #k = 6 - kernel = np.ones((k,k),np.uint8) - img_mask = cv2.erode(img_mask,kernel,iterations = 1) - kernel = np.ones((2,2),np.uint8) - fake_diff = cv2.dilate(fake_diff,kernel,iterations = 1) - k = max(mask_size//20, 5) - #k = 3 - #k = 3 - kernel_size = (k, k) - blur_size = tuple(2*i+1 for i in kernel_size) - img_mask = cv2.GaussianBlur(img_mask, blur_size, 0) - k = 5 - kernel_size = (k, k) - blur_size = tuple(2*i+1 for i in kernel_size) - fake_diff = cv2.GaussianBlur(fake_diff, blur_size, 0) - img_mask /= 255 - fake_diff /= 255 - #img_mask = fake_diff - img_mask = np.reshape(img_mask, [img_mask.shape[0],img_mask.shape[1],1]) - fake_merged = img_mask * bgr_fake + (1-img_mask) * target_img.astype(np.float32) - fake_merged = fake_merged.astype(np.uint8) - return fake_merged - + + target_img = img + fake_diff = bgr_fake.astype(np.float32) - aimg.astype(np.float32) + fake_diff = np.abs(fake_diff).mean(axis=2) + fake_diff[:2, :] = 0 + fake_diff[-2:, :] = 0 + fake_diff[:, :2] = 0 + fake_diff[:, -2:] = 0 + + IM = cv2.invertAffineTransform(M) + img_white = np.full((aimg.shape[0], aimg.shape[1]), 255, dtype=np.float32) + bgr_fake = cv2.warpAffine( + bgr_fake, IM, (target_img.shape[1], target_img.shape[0]), borderValue=0.0 + ) + img_white = cv2.warpAffine( + img_white, IM, (target_img.shape[1], target_img.shape[0]), borderValue=0.0 + ) + fake_diff = cv2.warpAffine( + fake_diff, IM, 
(target_img.shape[1], target_img.shape[0]), borderValue=0.0 + ) + + img_white[img_white > 20] = 255 + fthresh = 10 + fake_diff[fake_diff < fthresh] = 0 + fake_diff[fake_diff >= fthresh] = 255 + img_mask = img_white + + mask_h_inds, mask_w_inds = np.where(img_mask == 255) + mask_h = np.max(mask_h_inds) - np.min(mask_h_inds) + mask_w = np.max(mask_w_inds) - np.min(mask_w_inds) + mask_size = int(np.sqrt(mask_h * mask_w)) + k = max(mask_size // 10, 10) + kernel = np.ones((k, k), np.uint8) + img_mask = cv2.erode(img_mask, kernel, iterations=1) + + kernel = np.ones((2, 2), np.uint8) + fake_diff = cv2.dilate(fake_diff, kernel, iterations=1) + + k = max(mask_size // 20, 5) + kernel_size = (k, k) + blur_size = tuple(2 * i + 1 for i in kernel_size) + img_mask = cv2.GaussianBlur(img_mask, blur_size, 0) + + k = 5 + kernel_size = (k, k) + blur_size = tuple(2 * i + 1 for i in kernel_size) + fake_diff = cv2.GaussianBlur(fake_diff, blur_size, 0) + + img_mask /= 255 + fake_diff /= 255 + img_mask = np.reshape(img_mask, [img_mask.shape[0], img_mask.shape[1], 1]) + fake_merged = img_mask * bgr_fake + (1 - img_mask) * target_img.astype(np.float32) + fake_merged = fake_merged.astype(np.uint8) + + return fake_merged diff --git a/python-package/insightface/model_zoo/landmark.py b/python-package/insightface/model_zoo/landmark.py index 598b4b29a..1e6ffe9d5 100644 --- a/python-package/insightface/model_zoo/landmark.py +++ b/python-package/insightface/model_zoo/landmark.py @@ -4,97 +4,159 @@ # @Time : 2021-05-04 # @Function : -from __future__ import division +from __future__ import annotations + +from typing import Any, List, Optional, Tuple, Union + import numpy as np import cv2 import onnx import onnxruntime + from ..utils import face_align from ..utils import transform from ..data import get_object -__all__ = [ - 'Landmark', -] +__all__ = ['Landmark'] class Landmark: - def __init__(self, model_file=None, session=None): + """ONNX-based facial landmark detection model. 
+ + This class provides facial landmark detection for 2D or 3D keypoints. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + taskname: Task identifier (e.g., 'landmark_2d_106'). + input_size: Expected input size (width, height). + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + lmk_dim: Dimension of landmarks (2 or 3). + lmk_num: Number of landmark points. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the Landmark model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). + """ assert model_file is not None self.model_file = model_file self.session = session + find_sub = False find_mul = False model = onnx.load(self.model_file) graph = model.graph for nid, node in enumerate(graph.node[:8]): - #print(nid, node.name) if node.name.startswith('Sub') or node.name.startswith('_minus'): find_sub = True if node.name.startswith('Mul') or node.name.startswith('_mul'): find_mul = True - if nid<3 and node.name=='bn_data': + if nid < 3 and node.name == 'bn_data': find_sub = True find_mul = True + if find_sub and find_mul: - #mxnet arcface model input_mean = 0.0 input_std = 1.0 else: input_mean = 127.5 input_std = 128.0 + self.input_mean = input_mean self.input_std = input_std - #print('input mean and std:', model_file, self.input_mean, self.input_std) + if self.session is None: self.session = onnxruntime.InferenceSession(self.model_file, None) + input_cfg = self.session.get_inputs()[0] input_shape = input_cfg.shape input_name = input_cfg.name self.input_size = tuple(input_shape[2:4][::-1]) self.input_shape = input_shape + outputs = self.session.get_outputs() - output_names = [] - for out in outputs: - output_names.append(out.name) + output_names = [out.name for out in outputs] self.input_name = input_name 
self.output_names = output_names - assert len(self.output_names)==1 + assert len(self.output_names) == 1 + output_shape = outputs[0].shape self.require_pose = False - #print('init output_shape:', output_shape) - if output_shape[1]==3309: + + if output_shape[1] == 3309: self.lmk_dim = 3 self.lmk_num = 68 self.mean_lmk = get_object('meanshape_68.pkl') self.require_pose = True else: self.lmk_dim = 2 - self.lmk_num = output_shape[1]//self.lmk_dim - self.taskname = 'landmark_%dd_%d'%(self.lmk_dim, self.lmk_num) + self.lmk_num = output_shape[1] // self.lmk_dim + + self.taskname = 'landmark_%dd_%d' % (self.lmk_dim, self.lmk_num) - def prepare(self, ctx_id, **kwargs): - if ctx_id<0: + def prepare(self, ctx_id: int, **kwargs: Any) -> None: + """Prepare the model for inference. + + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + **kwargs: Additional arguments (unused). + """ + if ctx_id < 0: self.session.set_providers(['CPUExecutionProvider']) - def get(self, img, face): + def get( + self, + img: np.ndarray, + face: Any, + ) -> np.ndarray: + """Detect facial landmarks for a face. + + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + face: Face object containing bounding box. + + Returns: + Landmark coordinates with shape (lmk_num, lmk_dim). 
+ """ bbox = face.bbox w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1]) center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2 rotate = 0 - _scale = self.input_size[0] / (max(w, h)*1.5) - #print('param:', img.shape, bbox, center, self.input_size, _scale, rotate) + _scale = self.input_size[0] / (max(w, h) * 1.5) + aimg, M = face_align.transform(img, center, self.input_size[0], _scale, rotate) input_size = tuple(aimg.shape[0:2][::-1]) - #assert input_size==self.input_size - blob = cv2.dnn.blobFromImage(aimg, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True) - pred = self.session.run(self.output_names, {self.input_name : blob})[0][0] + + blob = cv2.dnn.blobFromImage( + aimg, + 1.0 / self.input_std, + input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) + pred = self.session.run(self.output_names, {self.input_name: blob})[0][0] + if pred.shape[0] >= 3000: pred = pred.reshape((-1, 3)) else: pred = pred.reshape((-1, 2)) + if self.lmk_num < pred.shape[0]: - pred = pred[self.lmk_num*-1:,:] + pred = pred[self.lmk_num * -1:, :] + pred[:, 0:2] += 1 pred[:, 0:2] *= (self.input_size[0] // 2) if pred.shape[1] == 3: @@ -103,12 +165,12 @@ def get(self, img, face): IM = cv2.invertAffineTransform(M) pred = face_align.trans_points(pred, IM) face[self.taskname] = pred + if self.require_pose: P = transform.estimate_affine_matrix_3d23d(self.mean_lmk, pred) s, R, t = transform.P2sRt(P) rx, ry, rz = transform.matrix2angle(R) - pose = np.array( [rx, ry, rz], dtype=np.float32 ) - face['pose'] = pose #pitch, yaw, roll + pose = np.array([rx, ry, rz], dtype=np.float32) + face['pose'] = pose + return pred - - diff --git a/python-package/insightface/model_zoo/model_zoo.py b/python-package/insightface/model_zoo/model_zoo.py index fc6283114..36583b17e 100644 --- a/python-package/insightface/model_zoo/model_zoo.py +++ b/python-package/insightface/model_zoo/model_zoo.py @@ -4,39 +4,86 @@ # @Time : 2021-05-04 # 
@Function : +from __future__ import annotations + import os import os.path as osp import glob +from typing import Any, Dict, List, Optional, Tuple, Union + import onnxruntime -from .arcface_onnx import * -from .retinaface import * -#from .scrfd import * -from .landmark import * + +from .arcface_onnx import ArcFaceONNX +from .retinaface import RetinaFace +from .landmark import Landmark from .attribute import Attribute from .inswapper import INSwapper from ..utils import download_onnx -__all__ = ['get_model'] +__all__ = ['get_model', 'PickableInferenceSession', 'ModelRouter'] -class PickableInferenceSession(onnxruntime.InferenceSession): - # This is a wrapper to make the current InferenceSession class pickable. - def __init__(self, model_path, **kwargs): +class PickableInferenceSession(onnxruntime.InferenceSession): + """A wrapper to make InferenceSession pickable for multiprocessing. + + This class extends onnxruntime.InferenceSession to support serialization + via pickle, enabling use in multiprocessing scenarios. + """ + + def __init__(self, model_path: str, **kwargs: Any) -> None: + """Initialize the pickable inference session. + + Args: + model_path: Path to the ONNX model file. + **kwargs: Additional arguments passed to InferenceSession. + """ super().__init__(model_path, **kwargs) self.model_path = model_path - def __getstate__(self): + def __getstate__(self) -> Dict[str, str]: + """Return state for pickling.""" return {'model_path': self.model_path} - def __setstate__(self, values): + def __setstate__(self, values: Dict[str, str]) -> None: + """Restore state from pickle.""" model_path = values['model_path'] self.__init__(model_path) + class ModelRouter: - def __init__(self, onnx_file): + """Router for identifying and instantiating appropriate model class. + + This class examines ONNX model structure to determine the appropriate + model class (detection, recognition, landmark, etc.) and creates + the corresponding model instance. 
+ """ + + def __init__(self, onnx_file: str) -> None: + """Initialize the model router. + + Args: + onnx_file: Path to the ONNX model file. + """ self.onnx_file = onnx_file - def get_model(self, **kwargs): + def get_model(self, **kwargs: Any) -> Optional[Union[ArcFaceONNX, RetinaFace, Landmark, Attribute, INSwapper]]: + """Create and return the appropriate model instance. + + The model type is determined by examining the model's input/output shapes: + - Detection models: >= 5 outputs + - Landmark models: 192x192 input + - Attribute models: 96x96 input + - INSwapper: 2 inputs, 128x128 input + - Recognition models: square input >= 112, divisible by 16 + + Args: + **kwargs: Arguments passed to model initialization, including: + - providers: List of execution providers + - provider_options: Provider-specific options + + Returns: + Model instance of appropriate type, or None if unrecognized. + """ session = PickableInferenceSession(self.onnx_file, **kwargs) print(f'Applied providers: {session._providers}, with options: {session._provider_options}') inputs = session.get_inputs() @@ -44,21 +91,29 @@ def get_model(self, **kwargs): input_shape = input_cfg.shape outputs = session.get_outputs() - if len(outputs)>=5: + if len(outputs) >= 5: return RetinaFace(model_file=self.onnx_file, session=session) - elif input_shape[2]==192 and input_shape[3]==192: + elif input_shape[2] == 192 and input_shape[3] == 192: return Landmark(model_file=self.onnx_file, session=session) - elif input_shape[2]==96 and input_shape[3]==96: + elif input_shape[2] == 96 and input_shape[3] == 96: return Attribute(model_file=self.onnx_file, session=session) - elif len(inputs)==2 and input_shape[2]==128 and input_shape[3]==128: + elif len(inputs) == 2 and input_shape[2] == 128 and input_shape[3] == 128: return INSwapper(model_file=self.onnx_file, session=session) - elif input_shape[2]==input_shape[3] and input_shape[2]>=112 and input_shape[2]%16==0: + elif input_shape[2] == input_shape[3] and input_shape[2] 
>= 112 and input_shape[2] % 16 == 0: return ArcFaceONNX(model_file=self.onnx_file, session=session) else: - #raise RuntimeError('error on model routing') return None -def find_onnx_file(dir_path): + +def find_onnx_file(dir_path: str) -> Optional[str]: + """Find the most recent ONNX file in a directory. + + Args: + dir_path: Path to directory to search. + + Returns: + Path to the most recently modified ONNX file, or None if not found. + """ if not os.path.exists(dir_path): return None paths = glob.glob("%s/*.onnx" % dir_path) @@ -67,18 +122,55 @@ def find_onnx_file(dir_path): paths = sorted(paths) return paths[-1] -def get_default_providers(): + +def get_default_providers() -> List[str]: + """Get default ONNX Runtime execution providers. + + Returns: + List of provider names in priority order. + """ return ['CUDAExecutionProvider', 'CPUExecutionProvider'] -def get_default_provider_options(): + +def get_default_provider_options() -> Optional[Dict[str, Any]]: + """Get default provider options. + + Returns: + Provider options dictionary or None for defaults. + """ return None -def get_model(name, **kwargs): + +def get_model( + name: str, + **kwargs: Any, +) -> Optional[Union[ArcFaceONNX, RetinaFace, Landmark, Attribute, INSwapper]]: + """Load an ONNX model by name or path. + + Args: + name: Model name or path to ONNX file. + - If not ending with '.onnx', treated as model pack name + - If ending with '.onnx', treated as direct file path + **kwargs: Additional arguments including: + - root: Root directory for model storage (default: '~/.insightface') + - download: Whether to download if not found (default: False) + - download_zip: Whether to download as zip (default: False) + - providers: List of execution providers + - provider_options: Provider-specific options + + Returns: + Model instance of appropriate type, or None if not found. 
+ + Example: + >>> model = get_model('buffalo_l', download=True) + >>> model = get_model('/path/to/model.onnx') + """ root = kwargs.get('root', '~/.insightface') root = os.path.expanduser(root) model_root = osp.join(root, 'models') allow_download = kwargs.get('download', False) download_zip = kwargs.get('download_zip', False) + if not name.endswith('.onnx'): model_dir = os.path.join(model_root, name) model_file = find_onnx_file(model_dir) @@ -86,13 +178,15 @@ def get_model(name, **kwargs): return None else: model_file = name + if not osp.exists(model_file) and allow_download: model_file = download_onnx('models', model_file, root=root, download_zip=download_zip) - assert osp.exists(model_file), 'model_file %s should exist'%model_file - assert osp.isfile(model_file), 'model_file %s should be a file'%model_file + + assert osp.exists(model_file), 'model_file %s should exist' % model_file + assert osp.isfile(model_file), 'model_file %s should be a file' % model_file + router = ModelRouter(model_file) providers = kwargs.get('providers', get_default_providers()) provider_options = kwargs.get('provider_options', get_default_provider_options()) model = router.get_model(providers=providers, provider_options=provider_options) return model - diff --git a/python-package/insightface/model_zoo/retinaface.py b/python-package/insightface/model_zoo/retinaface.py index fc4ad91ed..2f7c77af2 100644 --- a/python-package/insightface/model_zoo/retinaface.py +++ b/python-package/insightface/model_zoo/retinaface.py @@ -4,36 +4,46 @@ # @Time : 2021-09-18 # @Function : -from __future__ import division +from __future__ import annotations + import datetime +from typing import Any, Dict, List, Optional, Tuple, Union + import numpy as np import onnx import onnxruntime import os import os.path as osp import cv2 -import sys -def softmax(z): +__all__ = ['RetinaFace'] + + +def softmax(z: np.ndarray) -> np.ndarray: + """Apply softmax activation along axis 1.""" assert len(z.shape) == 2 s = np.max(z, 
axis=1) - s = s[:, np.newaxis] # necessary step to do broadcasting + s = s[:, np.newaxis] e_x = np.exp(z - s) div = np.sum(e_x, axis=1) - div = div[:, np.newaxis] # dito + div = div[:, np.newaxis] return e_x / div -def distance2bbox(points, distance, max_shape=None): - """Decode distance prediction to bounding box. +def distance2bbox( + points: np.ndarray, + distance: np.ndarray, + max_shape: Optional[Tuple[int, int]] = None, +) -> np.ndarray: + """Decode distance prediction to bounding box. + Args: - points (Tensor): Shape (n, 2), [x, y]. - distance (Tensor): Distance from the given point to 4 - boundaries (left, top, right, bottom). - max_shape (tuple): Shape of the image. - + points: Anchor points with shape (n, 2), [x, y]. + distance: Distance from anchor to 4 boundaries (left, top, right, bottom). + max_shape: Optional image shape for clipping. + Returns: - Tensor: Decoded bboxes. + Decoded bboxes with shape (n, 4). """ x1 = points[:, 0] - distance[:, 0] y1 = points[:, 1] - distance[:, 1] @@ -46,22 +56,26 @@ def distance2bbox(points, distance, max_shape=None): y2 = y2.clamp(min=0, max=max_shape[0]) return np.stack([x1, y1, x2, y2], axis=-1) -def distance2kps(points, distance, max_shape=None): - """Decode distance prediction to bounding box. +def distance2kps( + points: np.ndarray, + distance: np.ndarray, + max_shape: Optional[Tuple[int, int]] = None, +) -> np.ndarray: + """Decode distance prediction to keypoints. + Args: - points (Tensor): Shape (n, 2), [x, y]. - distance (Tensor): Distance from the given point to 4 - boundaries (left, top, right, bottom). - max_shape (tuple): Shape of the image. - + points: Anchor points with shape (n, 2). + distance: Distance predictions for keypoints. + max_shape: Optional image shape for clipping. + Returns: - Tensor: Decoded bboxes. + Decoded keypoints. 
""" preds = [] for i in range(0, distance.shape[1], 2): - px = points[:, i%2] + distance[:, i] - py = points[:, i%2+1] + distance[:, i+1] + px = points[:, i % 2] + distance[:, i] + py = points[:, i % 2 + 1] + distance[:, i + 1] if max_shape is not None: px = px.clamp(min=0, max=max_shape[1]) py = py.clamp(min=0, max=max_shape[0]) @@ -69,73 +83,111 @@ def distance2kps(points, distance, max_shape=None): preds.append(py) return np.stack(preds, axis=-1) + class RetinaFace: - def __init__(self, model_file=None, session=None): + """ONNX-based RetinaFace face detection model. + + This class provides face detection with optional keypoint detection. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + taskname: Task identifier ('detection'). + input_size: Expected input size (width, height) or None for dynamic. + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + nms_thresh: NMS IoU threshold. + det_thresh: Detection confidence threshold. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the RetinaFace model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). 
+ """ import onnxruntime self.model_file = model_file self.session = session self.taskname = 'detection' + if self.session is None: assert self.model_file is not None assert osp.exists(self.model_file) self.session = onnxruntime.InferenceSession(self.model_file, None) - self.center_cache = {} + + self.center_cache: Dict[Tuple[int, int, int], np.ndarray] = {} self.nms_thresh = 0.4 self.det_thresh = 0.5 self._init_vars() - def _init_vars(self): + def _init_vars(self) -> None: + """Initialize model variables from session.""" input_cfg = self.session.get_inputs()[0] input_shape = input_cfg.shape - #print(input_shape) + if isinstance(input_shape[2], str): - self.input_size = None + self.input_size: Optional[Tuple[int, int]] = None else: self.input_size = tuple(input_shape[2:4][::-1]) - #print('image_size:', self.image_size) + input_name = input_cfg.name self.input_shape = input_shape outputs = self.session.get_outputs() - output_names = [] - for o in outputs: - output_names.append(o.name) + output_names = [o.name for o in outputs] self.input_name = input_name self.output_names = output_names self.input_mean = 127.5 self.input_std = 128.0 - #print(self.output_names) - #assert len(outputs)==10 or len(outputs)==15 self.use_kps = False self._anchor_ratio = 1.0 self._num_anchors = 1 - if len(outputs)==6: + + if len(outputs) == 6: self.fmc = 3 self._feat_stride_fpn = [8, 16, 32] self._num_anchors = 2 - elif len(outputs)==9: + elif len(outputs) == 9: self.fmc = 3 self._feat_stride_fpn = [8, 16, 32] self._num_anchors = 2 self.use_kps = True - elif len(outputs)==10: + elif len(outputs) == 10: self.fmc = 5 self._feat_stride_fpn = [8, 16, 32, 64, 128] self._num_anchors = 1 - elif len(outputs)==15: + elif len(outputs) == 15: self.fmc = 5 self._feat_stride_fpn = [8, 16, 32, 64, 128] self._num_anchors = 1 self.use_kps = True - def prepare(self, ctx_id, **kwargs): - if ctx_id<0: + def prepare(self, ctx_id: int, **kwargs: Any) -> None: + """Prepare the model for inference. 
+ + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + **kwargs: Additional arguments: + - nms_thresh: NMS IoU threshold + - det_thresh: Detection confidence threshold + - input_size: Input size override + """ + if ctx_id < 0: self.session.set_providers(['CPUExecutionProvider']) + nms_thresh = kwargs.get('nms_thresh', None) if nms_thresh is not None: self.nms_thresh = nms_thresh + det_thresh = kwargs.get('det_thresh', None) if det_thresh is not None: self.det_thresh = det_thresh + input_size = kwargs.get('input_size', None) if input_size is not None: if self.input_size is not None: @@ -143,82 +195,121 @@ def prepare(self, ctx_id, **kwargs): else: self.input_size = input_size - def forward(self, img, threshold): + def forward( + self, + img: np.ndarray, + threshold: float, + ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: + """Run forward pass on image. + + Args: + img: Input image (BGR, uint8, 0-255). + threshold: Detection threshold. + + Returns: + Tuple of (scores_list, bboxes_list, kpss_list). 
+ """ scores_list = [] bboxes_list = [] kpss_list = [] input_size = tuple(img.shape[0:2][::-1]) - blob = cv2.dnn.blobFromImage(img, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True) - net_outs = self.session.run(self.output_names, {self.input_name : blob}) + + blob = cv2.dnn.blobFromImage( + img, + 1.0 / self.input_std, + input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) + net_outs = self.session.run(self.output_names, {self.input_name: blob}) input_height = blob.shape[2] input_width = blob.shape[3] fmc = self.fmc + for idx, stride in enumerate(self._feat_stride_fpn): scores = net_outs[idx] - bbox_preds = net_outs[idx+fmc] + bbox_preds = net_outs[idx + fmc] bbox_preds = bbox_preds * stride if self.use_kps: - kps_preds = net_outs[idx+fmc*2] * stride + kps_preds = net_outs[idx + fmc * 2] * stride + height = input_height // stride width = input_width // stride K = height * width key = (height, width, stride) + if key in self.center_cache: anchor_centers = self.center_cache[key] else: - #solution-1, c style: - #anchor_centers = np.zeros( (height, width, 2), dtype=np.float32 ) - #for i in range(height): - # anchor_centers[i, :, 1] = i - #for i in range(width): - # anchor_centers[:, i, 0] = i - - #solution-2: - #ax = np.arange(width, dtype=np.float32) - #ay = np.arange(height, dtype=np.float32) - #xv, yv = np.meshgrid(np.arange(width), np.arange(height)) - #anchor_centers = np.stack([xv, yv], axis=-1).astype(np.float32) - - #solution-3: - anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32) - #print(anchor_centers.shape) - - anchor_centers = (anchor_centers * stride).reshape( (-1, 2) ) - if self._num_anchors>1: - anchor_centers = np.stack([anchor_centers]*self._num_anchors, axis=1).reshape( (-1,2) ) - if len(self.center_cache)<100: + anchor_centers = np.stack( + np.mgrid[:height, :width][::-1], axis=-1 + ).astype(np.float32) + anchor_centers = 
(anchor_centers * stride).reshape((-1, 2)) + if self._num_anchors > 1: + anchor_centers = np.stack( + [anchor_centers] * self._num_anchors, axis=1 + ).reshape((-1, 2)) + if len(self.center_cache) < 100: self.center_cache[key] = anchor_centers - pos_inds = np.where(scores>=threshold)[0] + pos_inds = np.where(scores >= threshold)[0] bboxes = distance2bbox(anchor_centers, bbox_preds) pos_scores = scores[pos_inds] pos_bboxes = bboxes[pos_inds] scores_list.append(pos_scores) bboxes_list.append(pos_bboxes) + if self.use_kps: kpss = distance2kps(anchor_centers, kps_preds) - #kpss = kps_preds - kpss = kpss.reshape( (kpss.shape[0], -1, 2) ) + kpss = kpss.reshape((kpss.shape[0], -1, 2)) pos_kpss = kpss[pos_inds] kpss_list.append(pos_kpss) + return scores_list, bboxes_list, kpss_list - def detect(self, img, input_size = None, max_num=0, metric='default'): + def detect( + self, + img: np.ndarray, + input_size: Optional[Tuple[int, int]] = None, + max_num: int = 0, + metric: str = 'default', + ) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """Detect faces in an image. + + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + input_size: Override input size for detection. + max_num: Maximum number of faces to return. 0 means no limit. + metric: Selection metric when max_num > 0. 
+ - 'default': Prefer faces closer to image center + - 'max': Select largest faces + + Returns: + Tuple of (bboxes, keypoints): + - bboxes: np.ndarray with shape (N, 5), each row is [x1, y1, x2, y2, score] + - keypoints: np.ndarray with shape (N, 5, 2) or None + """ assert input_size is not None or self.input_size is not None input_size = self.input_size if input_size is None else input_size - + im_ratio = float(img.shape[0]) / img.shape[1] model_ratio = float(input_size[1]) / input_size[0] - if im_ratio>model_ratio: + + if im_ratio > model_ratio: new_height = input_size[1] new_width = int(new_height / im_ratio) else: new_width = input_size[0] new_height = int(new_width * im_ratio) + det_scale = float(new_height) / img.shape[0] resized_img = cv2.resize(img, (new_width, new_height)) - det_img = np.zeros( (input_size[1], input_size[0], 3), dtype=np.uint8 ) + det_img = np.zeros((input_size[1], input_size[0], 3), dtype=np.uint8) det_img[:new_height, :new_width, :] = resized_img scores_list, bboxes_list, kpss_list = self.forward(det_img, self.det_thresh) @@ -227,39 +318,52 @@ def detect(self, img, input_size = None, max_num=0, metric='default'): scores_ravel = scores.ravel() order = scores_ravel.argsort()[::-1] bboxes = np.vstack(bboxes_list) / det_scale + if self.use_kps: kpss = np.vstack(kpss_list) / det_scale + else: + kpss = None + pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False) pre_det = pre_det[order, :] keep = self.nms(pre_det) det = pre_det[keep, :] + if self.use_kps: - kpss = kpss[order,:,:] - kpss = kpss[keep,:,:] - else: - kpss = None + kpss = kpss[order, :, :] + kpss = kpss[keep, :, :] + if max_num > 0 and det.shape[0] > max_num: - area = (det[:, 2] - det[:, 0]) * (det[:, 3] - - det[:, 1]) + area = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1]) img_center = img.shape[0] // 2, img.shape[1] // 2 offsets = np.vstack([ (det[:, 0] + det[:, 2]) / 2 - img_center[1], (det[:, 1] + det[:, 3]) / 2 - img_center[0] ]) offset_dist_squared = 
np.sum(np.power(offsets, 2.0), 0) - if metric=='max': + + if metric == 'max': values = area else: - values = area - offset_dist_squared * 2.0 # some extra weight on the centering - bindex = np.argsort( - values)[::-1] # some extra weight on the centering + values = area - offset_dist_squared * 2.0 + + bindex = np.argsort(values)[::-1] bindex = bindex[0:max_num] det = det[bindex, :] if kpss is not None: kpss = kpss[bindex, :] + return det, kpss - def nms(self, dets): + def nms(self, dets: np.ndarray) -> List[int]: + """Non-maximum suppression. + + Args: + dets: Detection boxes with shape (N, 5), each row is [x1, y1, x2, y2, score]. + + Returns: + List of indices to keep. + """ thresh = self.nms_thresh x1 = dets[:, 0] y1 = dets[:, 1] @@ -289,13 +393,28 @@ def nms(self, dets): return keep -def get_retinaface(name, download=False, root='~/.insightface/models', **kwargs): + +def get_retinaface( + name: str, + download: bool = False, + root: str = '~/.insightface/models', + **kwargs: Any, +) -> RetinaFace: + """Get RetinaFace model by name. + + Args: + name: Model name or path. + download: Whether to download if not found. + root: Root directory for model storage. + **kwargs: Additional arguments. + + Returns: + RetinaFace model instance. 
+ """ if not download: assert os.path.exists(name) return RetinaFace(name) else: from .model_store import get_model_file _file = get_model_file("retinaface_%s" % name, root=root) - return retinaface(_file) - - + return RetinaFace(_file) diff --git a/python-package/insightface/model_zoo/scrfd.py b/python-package/insightface/model_zoo/scrfd.py index 674db4bba..e1a0496a1 100644 --- a/python-package/insightface/model_zoo/scrfd.py +++ b/python-package/insightface/model_zoo/scrfd.py @@ -4,36 +4,46 @@ # @Time : 2021-05-04 # @Function : -from __future__ import division +from __future__ import annotations + import datetime +from typing import Any, Dict, List, Optional, Tuple, Union + import numpy as np import onnx import onnxruntime import os import os.path as osp import cv2 -import sys -def softmax(z): +__all__ = ['SCRFD'] + + +def softmax(z: np.ndarray) -> np.ndarray: + """Apply softmax activation along axis 1.""" assert len(z.shape) == 2 s = np.max(z, axis=1) - s = s[:, np.newaxis] # necessary step to do broadcasting + s = s[:, np.newaxis] e_x = np.exp(z - s) div = np.sum(e_x, axis=1) - div = div[:, np.newaxis] # dito + div = div[:, np.newaxis] return e_x / div -def distance2bbox(points, distance, max_shape=None): - """Decode distance prediction to bounding box. +def distance2bbox( + points: np.ndarray, + distance: np.ndarray, + max_shape: Optional[Tuple[int, int]] = None, +) -> np.ndarray: + """Decode distance prediction to bounding box. + Args: - points (Tensor): Shape (n, 2), [x, y]. - distance (Tensor): Distance from the given point to 4 - boundaries (left, top, right, bottom). - max_shape (tuple): Shape of the image. - + points: Anchor points with shape (n, 2), [x, y]. + distance: Distance from anchor to 4 boundaries (left, top, right, bottom). + max_shape: Optional image shape for clipping. + Returns: - Tensor: Decoded bboxes. + Decoded bboxes with shape (n, 4). 
""" x1 = points[:, 0] - distance[:, 0] y1 = points[:, 1] - distance[:, 1] @@ -46,22 +56,26 @@ def distance2bbox(points, distance, max_shape=None): y2 = y2.clamp(min=0, max=max_shape[0]) return np.stack([x1, y1, x2, y2], axis=-1) -def distance2kps(points, distance, max_shape=None): - """Decode distance prediction to bounding box. +def distance2kps( + points: np.ndarray, + distance: np.ndarray, + max_shape: Optional[Tuple[int, int]] = None, +) -> np.ndarray: + """Decode distance prediction to keypoints. + Args: - points (Tensor): Shape (n, 2), [x, y]. - distance (Tensor): Distance from the given point to 4 - boundaries (left, top, right, bottom). - max_shape (tuple): Shape of the image. - + points: Anchor points with shape (n, 2). + distance: Distance predictions for keypoints. + max_shape: Optional image shape for clipping. + Returns: - Tensor: Decoded bboxes. + Decoded keypoints. """ preds = [] for i in range(0, distance.shape[1], 2): - px = points[:, i%2] + distance[:, i] - py = points[:, i%2+1] + distance[:, i+1] + px = points[:, i % 2] + distance[:, i] + py = points[:, i % 2 + 1] + distance[:, i + 1] if max_shape is not None: px = px.clamp(min=0, max=max_shape[1]) py = py.clamp(min=0, max=max_shape[0]) @@ -69,76 +83,118 @@ def distance2kps(points, distance, max_shape=None): preds.append(py) return np.stack(preds, axis=-1) + class SCRFD: - def __init__(self, model_file=None, session=None): + """ONNX-based SCRFD face detection model. + + This class provides face detection with optional keypoint detection. + SCRFD supports batched inference for improved performance. + + Attributes: + model_file: Path to the ONNX model file. + session: ONNX Runtime inference session. + taskname: Task identifier ('detection'). + input_size: Expected input size (width, height) or None for dynamic. + input_mean: Mean value for input normalization. + input_std: Std value for input normalization. + nms_thresh: NMS IoU threshold. + det_thresh: Detection confidence threshold. 
+ batched: Whether model supports batched inference. + """ + + def __init__( + self, + model_file: Optional[str] = None, + session: Optional[onnxruntime.InferenceSession] = None, + ) -> None: + """Initialize the SCRFD model. + + Args: + model_file: Path to the ONNX model file. + session: Pre-existing ONNX Runtime session (optional). + """ import onnxruntime self.model_file = model_file self.session = session self.taskname = 'detection' self.batched = False + if self.session is None: assert self.model_file is not None assert osp.exists(self.model_file) self.session = onnxruntime.InferenceSession(self.model_file, None) - self.center_cache = {} + + self.center_cache: Dict[Tuple[int, int, int], np.ndarray] = {} self.nms_thresh = 0.4 self.det_thresh = 0.5 self._init_vars() - def _init_vars(self): + def _init_vars(self) -> None: + """Initialize model variables from session.""" input_cfg = self.session.get_inputs()[0] input_shape = input_cfg.shape - #print(input_shape) + if isinstance(input_shape[2], str): - self.input_size = None + self.input_size: Optional[Tuple[int, int]] = None else: self.input_size = tuple(input_shape[2:4][::-1]) - #print('image_size:', self.image_size) + input_name = input_cfg.name self.input_shape = input_shape outputs = self.session.get_outputs() + if len(outputs[0].shape) == 3: self.batched = True - output_names = [] - for o in outputs: - output_names.append(o.name) + + output_names = [o.name for o in outputs] self.input_name = input_name self.output_names = output_names self.input_mean = 127.5 self.input_std = 128.0 - #print(self.output_names) - #assert len(outputs)==10 or len(outputs)==15 self.use_kps = False self._anchor_ratio = 1.0 self._num_anchors = 1 - if len(outputs)==6: + + if len(outputs) == 6: self.fmc = 3 self._feat_stride_fpn = [8, 16, 32] self._num_anchors = 2 - elif len(outputs)==9: + elif len(outputs) == 9: self.fmc = 3 self._feat_stride_fpn = [8, 16, 32] self._num_anchors = 2 self.use_kps = True - elif len(outputs)==10: + elif 
len(outputs) == 10: self.fmc = 5 self._feat_stride_fpn = [8, 16, 32, 64, 128] self._num_anchors = 1 - elif len(outputs)==15: + elif len(outputs) == 15: self.fmc = 5 self._feat_stride_fpn = [8, 16, 32, 64, 128] self._num_anchors = 1 self.use_kps = True - def prepare(self, ctx_id, **kwargs): - if ctx_id<0: + def prepare(self, ctx_id: int, **kwargs: Any) -> None: + """Prepare the model for inference. + + Args: + ctx_id: Context ID for GPU device. Use -1 for CPU. + **kwargs: Additional arguments: + - nms_thresh: NMS IoU threshold + - det_thresh: Detection confidence threshold + - input_size: Input size override + """ + if ctx_id < 0: self.session.set_providers(['CPUExecutionProvider']) + nms_thresh = kwargs.get('nms_thresh', None) if nms_thresh is not None: self.nms_thresh = nms_thresh + det_thresh = kwargs.get('det_thresh', None) if det_thresh is not None: self.det_thresh = det_thresh + input_size = kwargs.get('input_size', None) if input_size is not None: if self.input_size is not None: @@ -146,26 +202,45 @@ def prepare(self, ctx_id, **kwargs): else: self.input_size = input_size - def forward(self, img, threshold): + def forward( + self, + img: np.ndarray, + threshold: float, + ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: + """Run forward pass on image. + + Args: + img: Input image (BGR, uint8, 0-255). + threshold: Detection threshold. + + Returns: + Tuple of (scores_list, bboxes_list, kpss_list). 
+ """ scores_list = [] bboxes_list = [] kpss_list = [] input_size = tuple(img.shape[0:2][::-1]) - blob = cv2.dnn.blobFromImage(img, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True) - net_outs = self.session.run(self.output_names, {self.input_name : blob}) + + blob = cv2.dnn.blobFromImage( + img, + 1.0 / self.input_std, + input_size, + (self.input_mean, self.input_mean, self.input_mean), + swapRB=True, + ) + net_outs = self.session.run(self.output_names, {self.input_name: blob}) input_height = blob.shape[2] input_width = blob.shape[3] fmc = self.fmc + for idx, stride in enumerate(self._feat_stride_fpn): - # If model support batch dim, take first output if self.batched: scores = net_outs[idx][0] bbox_preds = net_outs[idx + fmc][0] bbox_preds = bbox_preds * stride if self.use_kps: kps_preds = net_outs[idx + fmc * 2][0] * stride - # If model doesn't support batching take output as is else: scores = net_outs[idx] bbox_preds = net_outs[idx + fmc] @@ -177,61 +252,78 @@ def forward(self, img, threshold): width = input_width // stride K = height * width key = (height, width, stride) + if key in self.center_cache: anchor_centers = self.center_cache[key] else: - #solution-1, c style: - #anchor_centers = np.zeros( (height, width, 2), dtype=np.float32 ) - #for i in range(height): - # anchor_centers[i, :, 1] = i - #for i in range(width): - # anchor_centers[:, i, 0] = i - - #solution-2: - #ax = np.arange(width, dtype=np.float32) - #ay = np.arange(height, dtype=np.float32) - #xv, yv = np.meshgrid(np.arange(width), np.arange(height)) - #anchor_centers = np.stack([xv, yv], axis=-1).astype(np.float32) - - #solution-3: - anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32) - #print(anchor_centers.shape) - - anchor_centers = (anchor_centers * stride).reshape( (-1, 2) ) - if self._num_anchors>1: - anchor_centers = np.stack([anchor_centers]*self._num_anchors, axis=1).reshape( (-1,2) ) - if 
len(self.center_cache)<100: + anchor_centers = np.stack( + np.mgrid[:height, :width][::-1], axis=-1 + ).astype(np.float32) + anchor_centers = (anchor_centers * stride).reshape((-1, 2)) + if self._num_anchors > 1: + anchor_centers = np.stack( + [anchor_centers] * self._num_anchors, axis=1 + ).reshape((-1, 2)) + if len(self.center_cache) < 100: self.center_cache[key] = anchor_centers - pos_inds = np.where(scores>=threshold)[0] + pos_inds = np.where(scores >= threshold)[0] bboxes = distance2bbox(anchor_centers, bbox_preds) pos_scores = scores[pos_inds] pos_bboxes = bboxes[pos_inds] scores_list.append(pos_scores) bboxes_list.append(pos_bboxes) + if self.use_kps: kpss = distance2kps(anchor_centers, kps_preds) - #kpss = kps_preds - kpss = kpss.reshape( (kpss.shape[0], -1, 2) ) + kpss = kpss.reshape((kpss.shape[0], -1, 2)) pos_kpss = kpss[pos_inds] kpss_list.append(pos_kpss) + return scores_list, bboxes_list, kpss_list - def detect(self, img, input_size = None, max_num=0, metric='default'): + def detect( + self, + img: np.ndarray, + input_size: Optional[Tuple[int, int]] = None, + max_num: int = 0, + metric: str = 'default', + ) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """Detect faces in an image. + + Args: + img: Input image as numpy array. + - Format: BGR (OpenCV default) + - Dtype: uint8 + - Range: 0-255 + - Shape: (H, W, 3) + input_size: Override input size for detection. + max_num: Maximum number of faces to return. 0 means no limit. + metric: Selection metric when max_num > 0. 
+ - 'default': Prefer faces closer to image center + - 'max': Select largest faces + + Returns: + Tuple of (bboxes, keypoints): + - bboxes: np.ndarray with shape (N, 5), each row is [x1, y1, x2, y2, score] + - keypoints: np.ndarray with shape (N, 5, 2) or None + """ assert input_size is not None or self.input_size is not None input_size = self.input_size if input_size is None else input_size - + im_ratio = float(img.shape[0]) / img.shape[1] model_ratio = float(input_size[1]) / input_size[0] - if im_ratio>model_ratio: + + if im_ratio > model_ratio: new_height = input_size[1] new_width = int(new_height / im_ratio) else: new_width = input_size[0] new_height = int(new_width * im_ratio) + det_scale = float(new_height) / img.shape[0] resized_img = cv2.resize(img, (new_width, new_height)) - det_img = np.zeros( (input_size[1], input_size[0], 3), dtype=np.uint8 ) + det_img = np.zeros((input_size[1], input_size[0], 3), dtype=np.uint8) det_img[:new_height, :new_width, :] = resized_img scores_list, bboxes_list, kpss_list = self.forward(det_img, self.det_thresh) @@ -240,39 +332,52 @@ def detect(self, img, input_size = None, max_num=0, metric='default'): scores_ravel = scores.ravel() order = scores_ravel.argsort()[::-1] bboxes = np.vstack(bboxes_list) / det_scale + if self.use_kps: kpss = np.vstack(kpss_list) / det_scale + else: + kpss = None + pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False) pre_det = pre_det[order, :] keep = self.nms(pre_det) det = pre_det[keep, :] + if self.use_kps: - kpss = kpss[order,:,:] - kpss = kpss[keep,:,:] - else: - kpss = None + kpss = kpss[order, :, :] + kpss = kpss[keep, :, :] + if max_num > 0 and det.shape[0] > max_num: - area = (det[:, 2] - det[:, 0]) * (det[:, 3] - - det[:, 1]) + area = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1]) img_center = img.shape[0] // 2, img.shape[1] // 2 offsets = np.vstack([ (det[:, 0] + det[:, 2]) / 2 - img_center[1], (det[:, 1] + det[:, 3]) / 2 - img_center[0] ]) offset_dist_squared = 
np.sum(np.power(offsets, 2.0), 0) - if metric=='max': + + if metric == 'max': values = area else: - values = area - offset_dist_squared * 2.0 # some extra weight on the centering - bindex = np.argsort( - values)[::-1] # some extra weight on the centering + values = area - offset_dist_squared * 2.0 + + bindex = np.argsort(values)[::-1] bindex = bindex[0:max_num] det = det[bindex, :] if kpss is not None: kpss = kpss[bindex, :] + return det, kpss - def nms(self, dets): + def nms(self, dets: np.ndarray) -> List[int]: + """Non-maximum suppression. + + Args: + dets: Detection boxes with shape (N, 5), each row is [x1, y1, x2, y2, score]. + + Returns: + List of indices to keep. + """ thresh = self.nms_thresh x1 = dets[:, 0] y1 = dets[:, 1] @@ -302,7 +407,24 @@ def nms(self, dets): return keep -def get_scrfd(name, download=False, root='~/.insightface/models', **kwargs): + +def get_scrfd( + name: str, + download: bool = False, + root: str = '~/.insightface/models', + **kwargs: Any, +) -> SCRFD: + """Get SCRFD model by name. + + Args: + name: Model name or path. + download: Whether to download if not found. + root: Root directory for model storage. + **kwargs: Additional arguments. + + Returns: + SCRFD model instance. 
+ """ if not download: assert os.path.exists(name) return SCRFD(name) @@ -312,37 +434,6 @@ def get_scrfd(name, download=False, root='~/.insightface/models', **kwargs): return SCRFD(_file) -def scrfd_2p5gkps(**kwargs): +def scrfd_2p5gkps(**kwargs: Any) -> SCRFD: + """Get SCRFD 2.5G keypoints model.""" return get_scrfd("2p5gkps", download=True, **kwargs) - - -if __name__ == '__main__': - import glob - detector = SCRFD(model_file='./det.onnx') - detector.prepare(-1) - img_paths = ['tests/data/t1.jpg'] - for img_path in img_paths: - img = cv2.imread(img_path) - - for _ in range(1): - ta = datetime.datetime.now() - #bboxes, kpss = detector.detect(img, 0.5, input_size = (640, 640)) - bboxes, kpss = detector.detect(img, 0.5) - tb = datetime.datetime.now() - print('all cost:', (tb-ta).total_seconds()*1000) - print(img_path, bboxes.shape) - if kpss is not None: - print(kpss.shape) - for i in range(bboxes.shape[0]): - bbox = bboxes[i] - x1,y1,x2,y2,score = bbox.astype(np.int) - cv2.rectangle(img, (x1,y1) , (x2,y2) , (255,0,0) , 2) - if kpss is not None: - kps = kpss[i] - for kp in kps: - kp = kp.astype(np.int) - cv2.circle(img, tuple(kp) , 1, (0,0,255) , 2) - filename = img_path.split('/')[-1] - print('output:', filename) - cv2.imwrite('./outputs/%s'%filename, img) -