diff --git a/imutils/__init__.py b/imutils/__init__.py index e5643ca..19f3917 100755 --- a/imutils/__init__.py +++ b/imutils/__init__.py @@ -2,7 +2,7 @@ # website: http://www.pyimagesearch.com # set the version number -__version__ = "0.5.4" +__version__ = "0.6.0" # import the necessary packages from .convenience import translate @@ -17,7 +17,36 @@ from .convenience import is_cv2 from .convenience import is_cv3 from .convenience import is_cv4 +from .convenience import is_cv5 from .convenience import check_opencv_version from .convenience import build_montages from .convenience import adjust_brightness_contrast from .meta import find_function +from .bbox import xywh_to_xyxy +from .bbox import xyxy_to_xywh +from .bbox import compute_iou +from .bbox import compute_iou_matrix +from .bbox import draw_bbox +from .bbox import draw_bboxes +from .bbox import scale_bbox +from .bbox import clip_bbox +from .bbox import bbox_area +from .bbox import bbox_center +from .bbox import normalize_bbox +from .bbox import denormalize_bbox +from .augmentation import add_gaussian_noise +from .augmentation import add_salt_pepper_noise +from .augmentation import random_brightness +from .augmentation import random_contrast +from .augmentation import random_brightness_contrast +from .augmentation import cutout +from .augmentation import random_hue +from .augmentation import random_saturation +from .augmentation import random_hsv +from .augmentation import random_rotation +from .augmentation import random_flip +from .augmentation import random_scale +from .augmentation import random_translation +from .augmentation import random_crop +from .augmentation import compose_augmentations +from .augmentation import random_augment diff --git a/imutils/augmentation.py b/imutils/augmentation.py new file mode 100644 index 0000000..6441334 --- /dev/null +++ b/imutils/augmentation.py @@ -0,0 +1,445 @@ +import numpy as np +import cv2 +import random +from .convenience import adjust_brightness_contrast + +def 
def add_gaussian_noise(image, mean=0, sigma=25):
    """
    Add Gaussian noise to an image.

    Args:
        image: Input image (treated as 8-bit)
        mean: Mean of the noise
        sigma: Standard deviation of the noise

    Returns:
        uint8 image with noise added, clipped to [0, 255]
    """
    noise = np.random.normal(mean, sigma, image.shape).astype(np.float32)
    noisy = image.astype(np.float32) + noise
    return np.clip(noisy, 0, 255).astype(np.uint8)


def add_salt_pepper_noise(image, prob=0.01):
    """
    Add salt-and-pepper noise to an image.

    Args:
        image: Input image
        prob: Probability of each pixel being affected (split evenly between
            salt and pepper; a pixel hit by both masks ends up pepper)

    Returns:
        Copy of the image with noise applied
    """
    output = image.copy()

    # independent masks; pepper is applied second, so overlaps become pepper
    salt = np.random.random(image.shape[:2]) < prob / 2
    output[salt] = 255

    pepper = np.random.random(image.shape[:2]) < prob / 2
    output[pepper] = 0

    return output


def random_brightness(image, delta=50):
    """
    Randomly adjust brightness of an image.

    Args:
        image: Input image
        delta: Brightness offset is drawn uniformly from [-delta, delta]

    Returns:
        Brightness-adjusted image
    """
    brightness = random.uniform(-delta, delta)
    return adjust_brightness_contrast(image, brightness=brightness, contrast=0)


def random_contrast(image, lower=0.5, upper=1.5):
    """
    Randomly adjust contrast of an image.

    Args:
        image: Input image
        lower: Lower bound of the contrast multiplier
        upper: Upper bound of the contrast multiplier

    Returns:
        Contrast-adjusted image
    """
    # adjust_brightness_contrast expects a percentage-style contrast value
    contrast = random.uniform(lower, upper)
    return adjust_brightness_contrast(image, brightness=0,
                                      contrast=(contrast - 1) * 100)


def random_brightness_contrast(image, brightness_delta=50,
                               contrast_lower=0.5, contrast_upper=1.5):
    """
    Randomly adjust both brightness and contrast of an image.

    Args:
        image: Input image
        brightness_delta: Brightness change range
        contrast_lower: Lower bound of the contrast multiplier
        contrast_upper: Upper bound of the contrast multiplier

    Returns:
        Adjusted image
    """
    brightness = random.uniform(-brightness_delta, brightness_delta)
    contrast = (random.uniform(contrast_lower, contrast_upper) - 1) * 100
    return adjust_brightness_contrast(image, brightness=brightness,
                                      contrast=contrast)


def cutout(image, num_holes=1, hole_size=16, fill_value=0):
    """
    Apply Cutout augmentation - mask out random rectangular regions.

    Args:
        image: Input image
        num_holes: Number of regions to mask out
        hole_size: Size of each hole (int or (width, height) tuple)
        fill_value: Value the holes are filled with (int or BGR tuple)

    Returns:
        Copy of the image with the holes filled
    """
    output = image.copy()
    h, w = image.shape[:2]

    if isinstance(hole_size, int):
        hole_w = hole_h = hole_size
    else:
        hole_w, hole_h = hole_size

    for _ in range(num_holes):
        # hole is centered on a random pixel and clipped at the borders
        cy = random.randint(0, h - 1)
        cx = random.randint(0, w - 1)

        y1 = max(0, cy - hole_h // 2)
        y2 = min(h, cy + hole_h // 2)
        x1 = max(0, cx - hole_w // 2)
        x2 = min(w, cx + hole_w // 2)

        output[y1:y2, x1:x2] = fill_value

    return output


def random_hue(image, delta=18):
    """
    Randomly shift the hue of a BGR image.

    Hue is circular (OpenCV stores it as 0-179 for uint8 images), so the
    shift wraps with modulo arithmetic.  The previous clip to [0, 180]
    piled red hues up at the boundary and could produce the invalid value
    180 after the uint8 cast.

    Args:
        image: Input image (BGR); grayscale images are returned unchanged
        delta: Hue shift is drawn uniformly from [-delta, delta]

    Returns:
        Hue-adjusted image
    """
    if len(image.shape) == 2:
        return image

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 0] = (hsv[:, :, 0] + random.uniform(-delta, delta)) % 180
    hsv = hsv.astype(np.uint8)
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def random_saturation(image, lower=0.5, upper=1.5):
    """
    Randomly adjust the saturation of a BGR image.

    Args:
        image: Input image (BGR); grayscale images are returned unchanged
        lower: Lower bound of the saturation multiplier
        upper: Upper bound of the saturation multiplier

    Returns:
        Saturation-adjusted image
    """
    if len(image.shape) == 2:
        return image

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV).astype(np.float32)
    hsv[:, :, 1] = np.clip(hsv[:, :, 1] * random.uniform(lower, upper), 0, 255)
    hsv = hsv.astype(np.uint8)
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def random_hsv(image, hue_delta=18, saturation_lower=0.5, saturation_upper=1.5):
    """
    Randomly adjust hue and saturation of a BGR image in one conversion.

    Args:
        image: Input image (BGR); grayscale images are returned unchanged
        hue_delta: Hue shift range (wraps modulo 180, see random_hue)
        saturation_lower: Lower bound of the saturation multiplier
        saturation_upper: Upper bound of the saturation multiplier

    Returns:
        HSV-adjusted image
    """
    if len(image.shape) == 2:
        return image

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV).astype(np.float32)

    hsv[:, :, 0] = (hsv[:, :, 0] + random.uniform(-hue_delta, hue_delta)) % 180
    hsv[:, :, 1] = np.clip(
        hsv[:, :, 1] * random.uniform(saturation_lower, saturation_upper),
        0, 255)

    hsv = hsv.astype(np.uint8)
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def random_rotation(image, angle_range=(-30, 30), bboxes=None):
    """
    Rotate an image by a random angle about its center.

    Args:
        image: Input image
        angle_range: (min_angle, max_angle) in degrees
        bboxes: Optional boxes in [xmin, ymin, xmax, ymax] format; each is
            replaced by the axis-aligned hull of its rotated corners

    Returns:
        Rotated image, or (rotated_image, rotated_bboxes) when bboxes given
    """
    angle = random.uniform(angle_range[0], angle_range[1])
    (h, w) = image.shape[:2]
    center = (w // 2, h // 2)

    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(image, M, (w, h))

    if bboxes is None:
        return rotated

    rotated_bboxes = []
    for bbox in bboxes:
        corners = np.array([
            [bbox[0], bbox[1], 1],
            [bbox[2], bbox[1], 1],
            [bbox[2], bbox[3], 1],
            [bbox[0], bbox[3], 1],
        ])
        transformed = M @ corners.T
        xs, ys = transformed[0, :], transformed[1, :]
        rotated_bboxes.append([min(xs), min(ys), max(xs), max(ys)])
    return rotated, rotated_bboxes
def random_flip(image, bboxes=None, flip_code=None):
    """
    Flip an image (randomly, unless flip_code is given).

    Args:
        image: Input image
        bboxes: Optional boxes in [xmin, ymin, xmax, ymax] format
        flip_code: None=random choice, 0=vertical, 1=horizontal, -1=both

    Returns:
        Flipped image, or (flipped_image, flipped_bboxes) when bboxes given
    """
    if flip_code is None:
        flip_code = random.choice([0, 1, -1])

    flipped = cv2.flip(image, flip_code)

    if bboxes is None:
        return flipped

    h, w = image.shape[:2]
    flipped_bboxes = []
    for xmin, ymin, xmax, ymax in bboxes:
        if flip_code == 1:        # horizontal: mirror x
            flipped_bboxes.append([w - xmax, ymin, w - xmin, ymax])
        elif flip_code == 0:      # vertical: mirror y
            flipped_bboxes.append([xmin, h - ymax, xmax, h - ymin])
        else:                     # both axes
            flipped_bboxes.append([w - xmax, h - ymax, w - xmin, h - ymin])
    return flipped, flipped_bboxes


def random_scale(image, scale_range=(0.8, 1.2), bboxes=None):
    """
    Resize an image by a random uniform factor.

    Args:
        image: Input image
        scale_range: (min_scale, max_scale)
        bboxes: Optional boxes in [xmin, ymin, xmax, ymax] format

    Returns:
        Scaled image, or (scaled_image, scaled_bboxes) when bboxes given
    """
    scale = random.uniform(scale_range[0], scale_range[1])
    h, w = image.shape[:2]
    scaled = cv2.resize(image, (int(w * scale), int(h * scale)))

    if bboxes is None:
        return scaled

    scaled_bboxes = [[x1 * scale, y1 * scale, x2 * scale, y2 * scale]
                     for x1, y1, x2, y2 in bboxes]
    return scaled, scaled_bboxes


def random_translation(image, translate_range=(0.1, 0.1), bboxes=None):
    """
    Shift an image by a random offset (uncovered borders become black).

    Args:
        image: Input image
        translate_range: (x_range, y_range) as fractions of the image size
        bboxes: Optional boxes in [xmin, ymin, xmax, ymax] format

    Returns:
        Translated image, or (translated_image, moved_bboxes) when
        bboxes given (boxes are shifted but not clipped to the frame)
    """
    h, w = image.shape[:2]
    tx = random.uniform(-translate_range[0], translate_range[0]) * w
    ty = random.uniform(-translate_range[1], translate_range[1]) * h

    M = np.float32([[1, 0, tx], [0, 1, ty]])
    translated = cv2.warpAffine(image, M, (w, h))

    if bboxes is None:
        return translated

    moved = [[x1 + tx, y1 + ty, x2 + tx, y2 + ty]
             for x1, y1, x2, y2 in bboxes]
    return translated, moved


def random_crop(image, crop_range=(0.8, 1.0), bboxes=None):
    """
    Take a random crop of an image.

    Args:
        image: Input image
        crop_range: (min_fraction, max_fraction) of the original size
        bboxes: Optional boxes; boxes left empty by the crop are dropped

    Returns:
        Cropped image, or (cropped_image, kept_bboxes) when bboxes given
    """
    h, w = image.shape[:2]
    frac = random.uniform(crop_range[0], crop_range[1])
    new_h, new_w = int(h * frac), int(w * frac)

    y0 = random.randint(0, h - new_h)
    x0 = random.randint(0, w - new_w)
    cropped = image[y0:y0 + new_h, x0:x0 + new_w]

    if bboxes is None:
        return cropped

    kept = []
    for x1, y1, x2, y2 in bboxes:
        shifted = [max(0, x1 - x0), max(0, y1 - y0),
                   min(new_w, x2 - x0), min(new_h, y2 - y0)]
        # discard boxes that fell outside the crop entirely
        if shifted[2] > shifted[0] and shifted[3] > shifted[1]:
            kept.append(shifted)
    return cropped, kept


def compose_augmentations(image, augmentations, bboxes=None, p=0.5):
    """
    Apply a list of augmentation functions, each with probability p.

    Fix: bounding boxes are now passed by keyword (bboxes=...).  The old
    positional call misbound the boxes to the second parameter of functions
    like random_rotation/random_scale/random_crop (whose second positional
    argument is a range, not the boxes); the resulting TypeError was caught
    and the boxes were silently never transformed.

    Args:
        image: Input image
        augmentations: List of callables taking (image) and optionally a
            bboxes keyword argument
        bboxes: Optional bounding boxes
        p: Probability of applying each augmentation

    Returns:
        Augmented image, or (augmented_image, bboxes) when bboxes given
    """
    result = image
    result_bboxes = bboxes

    for aug_func in augmentations:
        if random.random() >= p:
            continue
        if result_bboxes is not None:
            try:
                out = aug_func(result, bboxes=result_bboxes)
            except TypeError:
                # the function does not take bboxes; apply to the image only
                out = aug_func(result)
            if isinstance(out, tuple):
                result, result_bboxes = out
            else:
                result = out
        else:
            result = aug_func(result)

    if result_bboxes is not None:
        return result, result_bboxes
    return result
def random_augment(image, bboxes=None,
                   brightness_delta=50,
                   contrast_range=(0.5, 1.5),
                   hue_delta=18,
                   saturation_range=(0.5, 1.5),
                   angle_range=(-30, 30),
                   flip_p=0.5,
                   scale_range=(0.8, 1.2),
                   translate_range=(0.1, 0.1),
                   crop_range=(0.8, 1.0),
                   noise_p=0.3,
                   cutout_p=0.3):
    """
    Apply a comprehensive set of random augmentations.

    Fixes over the original: flip_p, scale_range, translate_range and
    crop_range were accepted and documented but never used.  Flips now
    honor flip_p, and scale/translation/crop are each applied with the
    same 0.5 probability as the other geometric ops.

    Args:
        image: Input image
        bboxes: Optional bounding boxes in [xmin, ymin, xmax, ymax] format
        brightness_delta: Brightness change range
        contrast_range: Contrast multiplier range
        hue_delta: Hue change range
        saturation_range: Saturation multiplier range
        angle_range: Rotation angle range in degrees
        flip_p: Probability of applying a flip
        scale_range: Scale range
        translate_range: Translation range as fraction of image size
        crop_range: Crop range as fraction of image size
        noise_p: Probability of adding noise
        cutout_p: Probability of applying cutout

    Returns:
        Augmented image, or (augmented_image, bboxes) when bboxes given.
        Geometric ops (scale/crop) may change the output size.
    """
    result = image
    result_bboxes = bboxes

    if random.random() < flip_p:
        if result_bboxes is not None:
            result, result_bboxes = random_flip(result, result_bboxes)
        else:
            result = random_flip(result)

    if random.random() < 0.5:
        if result_bboxes is not None:
            result, result_bboxes = random_rotation(result, angle_range,
                                                    result_bboxes)
        else:
            result = random_rotation(result, angle_range)

    if random.random() < 0.5:
        if result_bboxes is not None:
            result, result_bboxes = random_scale(result, scale_range,
                                                 result_bboxes)
        else:
            result = random_scale(result, scale_range)

    if random.random() < 0.5:
        if result_bboxes is not None:
            result, result_bboxes = random_translation(result, translate_range,
                                                       result_bboxes)
        else:
            result = random_translation(result, translate_range)

    if random.random() < 0.5:
        if result_bboxes is not None:
            result, result_bboxes = random_crop(result, crop_range,
                                                result_bboxes)
        else:
            result = random_crop(result, crop_range)

    if random.random() < 0.5:
        result = random_brightness_contrast(result, brightness_delta,
                                            contrast_range[0],
                                            contrast_range[1])

    if random.random() < 0.5:
        result = random_hsv(result, hue_delta,
                            saturation_range[0], saturation_range[1])

    if random.random() < noise_p:
        if random.random() < 0.5:
            result = add_gaussian_noise(result)
        else:
            result = add_salt_pepper_noise(result)

    if random.random() < cutout_p:
        result = cutout(result)

    if result_bboxes is not None:
        return result, result_bboxes
    return result
def xywh_to_xyxy(box):
    """
    Convert [x, y, w, h] format to [xmin, ymin, xmax, ymax] format.

    Args:
        box: A single box (4,) or a batch of boxes (N, 4)

    Returns:
        Converted box(es) as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    if box.ndim == 1:
        x, y, w, h = box
        return np.array([x, y, x + w, y + h], dtype=np.float32)
    x, y, w, h = box[:, 0], box[:, 1], box[:, 2], box[:, 3]
    return np.column_stack([x, y, x + w, y + h])


def xyxy_to_xywh(box):
    """
    Convert [xmin, ymin, xmax, ymax] format to [x, y, w, h] format.

    Args:
        box: A single box (4,) or a batch of boxes (N, 4)

    Returns:
        Converted box(es) as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    if box.ndim == 1:
        xmin, ymin, xmax, ymax = box
        return np.array([xmin, ymin, xmax - xmin, ymax - ymin],
                        dtype=np.float32)
    xmin, ymin, xmax, ymax = box[:, 0], box[:, 1], box[:, 2], box[:, 3]
    return np.column_stack([xmin, ymin, xmax - xmin, ymax - ymin])


def compute_iou(boxA, boxB):
    """
    Compute IoU (Intersection over Union) between two bounding boxes.

    Args:
        boxA: Box in [xmin, ymin, xmax, ymax] format
        boxB: Box in [xmin, ymin, xmax, ymax] format

    Returns:
        IoU value in [0.0, 1.0] (a small epsilon guards division by zero)
    """
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interArea = max(0, xB - xA) * max(0, yB - yA)

    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    return interArea / float(boxAArea + boxBArea - interArea + 1e-7)


def compute_iou_matrix(boxesA, boxesB):
    """
    Compute the pairwise IoU matrix between two sets of bounding boxes.

    Vectorized with broadcasting instead of the original O(N*M) Python
    double loop; uses the same 1e-7 epsilon as compute_iou, so results
    match the per-pair function.

    Args:
        boxesA: First set of boxes, shape (N, 4), [xmin, ymin, xmax, ymax]
        boxesB: Second set of boxes, shape (M, 4)

    Returns:
        IoU matrix of shape (N, M)
    """
    boxesA = np.asarray(boxesA, dtype=np.float32).reshape(-1, 4)
    boxesB = np.asarray(boxesB, dtype=np.float32).reshape(-1, 4)

    # pairwise intersection rectangle via (N, 1) x (1, M) broadcasting
    xA = np.maximum(boxesA[:, None, 0], boxesB[None, :, 0])
    yA = np.maximum(boxesA[:, None, 1], boxesB[None, :, 1])
    xB = np.minimum(boxesA[:, None, 2], boxesB[None, :, 2])
    yB = np.minimum(boxesA[:, None, 3], boxesB[None, :, 3])

    inter = np.maximum(0, xB - xA) * np.maximum(0, yB - yA)

    areaA = ((boxesA[:, 2] - boxesA[:, 0]) *
             (boxesA[:, 3] - boxesA[:, 1]))[:, None]
    areaB = ((boxesB[:, 2] - boxesB[:, 0]) *
             (boxesB[:, 3] - boxesB[:, 1]))[None, :]

    return inter / (areaA + areaB - inter + 1e-7)
def draw_bbox(image, box, label=None, color=(0, 255, 0), thickness=2,
              font_scale=0.6, font_thickness=2):
    """
    Draw one (optionally labeled) bounding box on an image in place.

    Args:
        image: OpenCV image (modified in place and also returned)
        box: Bounding box in [xmin, ymin, xmax, ymax] format
        label: Optional label text drawn in a filled banner above the box
        color: Box/banner color (BGR)
        thickness: Box line thickness
        font_scale: Font scale for the label
        font_thickness: Font thickness for the label

    Returns:
        The image with the box (and label) drawn
    """
    x1, y1, x2, y2 = (int(v) for v in box)
    cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

    if label:
        font = cv2.FONT_HERSHEY_SIMPLEX
        (tw, th), base = cv2.getTextSize(label, font, font_scale,
                                         font_thickness)
        # filled banner behind the text keeps it readable on any background
        cv2.rectangle(image, (x1, y1 - th - base - 5), (x1 + tw, y1),
                      color, -1)
        cv2.putText(image, label, (x1, y1 - base - 2), font, font_scale,
                    (255, 255, 255), font_thickness)

    return image


def draw_bboxes(image, boxes, labels=None, colors=None, thickness=2,
                font_scale=0.6, font_thickness=2):
    """
    Draw several labeled bounding boxes by delegating to draw_bbox.

    Args:
        image: OpenCV image (modified in place and also returned)
        boxes: List of boxes in [xmin, ymin, xmax, ymax] format
        labels: Optional list of label texts (parallel to boxes)
        colors: Optional list of colors, or a single BGR tuple for all
        thickness: Box line thickness
        font_scale: Font scale for labels
        font_thickness: Font thickness for labels

    Returns:
        The image with all boxes drawn
    """
    n = len(boxes)
    if labels is None:
        labels = [None] * n
    if colors is None:
        colors = [(0, 255, 0)] * n
    elif isinstance(colors, tuple):
        # one color shared by every box
        colors = [colors] * n

    for b, text, c in zip(boxes, labels, colors):
        image = draw_bbox(image, b, text, c, thickness,
                          font_scale, font_thickness)
    return image
def scale_bbox(box, scale, image_shape=None):
    """
    Scale bounding box coordinates.

    Args:
        box: Box(es) in [xmin, ymin, xmax, ymax] format; (4,) or (N, 4)
        scale: Scalar factor, an (scale_x, scale_y) pair, or - when
            image_shape is given - a (new_w, new_h) target size from which
            the per-axis factors are derived
        image_shape: Optional original image shape (h, w) or (h, w, c)

    Returns:
        Scaled box(es) as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    is_pair = isinstance(scale, (tuple, list)) and len(scale) == 2

    if image_shape is not None and is_pair:
        # interpret `scale` as a (new_w, new_h) target size
        orig_h, orig_w = image_shape[:2]
        sx = scale[0] / orig_w
        sy = scale[1] / orig_h
    elif is_pair:
        sx, sy = scale
    else:
        sx = sy = scale

    # one multiply covers both the single-box and the batched case
    return box * np.array([sx, sy, sx, sy], dtype=np.float32)


def clip_bbox(box, image_width, image_height):
    """
    Clip bounding box coordinates to [0, width] x [0, height].

    Fix: the original single-box branch only lower-bounded xmin/ymin and
    upper-bounded xmax/ymax, so a fully out-of-frame box could keep
    negative xmax/ymax while the batched branch clamped them to 0.  Both
    shapes now receive the same full two-sided clip.

    Args:
        box: Box(es) in [xmin, ymin, xmax, ymax] format; (4,) or (N, 4)
        image_width: Image width
        image_height: Image height

    Returns:
        Clipped box(es) as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    upper = np.array([image_width, image_height, image_width, image_height],
                     dtype=np.float32)
    return np.clip(box, 0.0, upper)


def bbox_area(box):
    """
    Compute bounding box area(s); degenerate (inverted) boxes yield 0.

    Args:
        box: Box(es) in [xmin, ymin, xmax, ymax] format; (4,) or (N, 4)

    Returns:
        Scalar area for a single box, or an (N,) array for a batch
    """
    box = np.array(box, dtype=np.float32)
    widths = np.maximum(0, box[..., 2] - box[..., 0])
    heights = np.maximum(0, box[..., 3] - box[..., 1])
    return widths * heights
def bbox_center(box):
    """
    Compute bounding box center(s).

    Args:
        box: Box(es) in [xmin, ymin, xmax, ymax] format; (4,) or (N, 4)

    Returns:
        (cx, cy) as a (2,) array for a single box, or an (N, 2) array
    """
    box = np.array(box, dtype=np.float32)
    if box.ndim == 1:
        return (box[:2] + box[2:]) / 2
    # midpoint of the (xmin, ymin) and (xmax, ymax) corner columns
    return (box[:, :2] + box[:, 2:]) / 2


def normalize_bbox(box, image_width, image_height):
    """
    Normalize [xmin, ymin, xmax, ymax] pixel coordinates to [0, 1].

    Args:
        box: Box(es) in pixel coordinates; (4,) or (N, 4)
        image_width: Image width
        image_height: Image height

    Returns:
        Normalized box(es) as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    size = np.array([image_width, image_height, image_width, image_height],
                    dtype=np.float32)
    # broadcasting handles both single boxes and batches
    return box / size


def denormalize_bbox(box, image_width, image_height):
    """
    Map normalized [0, 1] box coordinates back to pixel coordinates.

    Args:
        box: Normalized box(es) in [xmin, ymin, xmax, ymax]; (4,) or (N, 4)
        image_width: Image width
        image_height: Image height

    Returns:
        Box(es) in pixel coordinates as float32, same rank as the input
    """
    box = np.array(box, dtype=np.float32)
    size = np.array([image_width, image_height, image_width, image_height],
                    dtype=np.float32)
    return box * size
def resize(image, width=None, height=None, inter=cv2.INTER_AREA, bboxes=None):
    """
    Resize an image preserving aspect ratio, optionally scaling boxes too.

    Args:
        image: Input image
        width: Target width (if None, computed from height; if both are
            given, width wins and height is ignored, matching the
            historical imutils behavior)
        height: Target height (if None, computed from width)
        inter: Interpolation method passed to cv2.resize
        bboxes: Optional boxes in [xmin, ymin, xmax, ymax] format; a single
            (4,) box or an (N, 4) batch - both shapes are accepted now

    Returns:
        The resized image, or (resized_image, scaled_bboxes) when bboxes
        is given.  When width and height are both None the inputs are
        returned unchanged.
    """
    import numpy as np

    (h, w) = image.shape[:2]

    # nothing to do - hand back the caller's objects untouched
    if width is None and height is None:
        return image if bboxes is None else (image, bboxes)

    if width is None:
        r = height / float(h)
        dim = (int(w * r), height)
    else:
        r = width / float(w)
        dim = (width, int(h * r))

    resized = cv2.resize(image, dim, interpolation=inter)

    if bboxes is None:
        return resized

    # aspect ratio is preserved, so one factor scales both axes; scaling
    # the whole array also accepts a single 1-D box, which previously
    # crashed on the explicit column indexing
    scaled_bboxes = np.array(bboxes, dtype=np.float32) * r
    return resized, scaled_bboxes
def is_cv5(or_better=False):
    """
    Check whether OpenCV 5 is installed.

    :param or_better: when True, accept any major version >= 5
    :return: True when the installed OpenCV matches
    """
    major = get_opencv_major_version()
    return major >= 5 if or_better else major == 5


def _build_detector_factory():
    """
    Build the keypoint-detector factory for the installed OpenCV build.

    Entries are added only when the corresponding constructor exists, so
    the factory degrades gracefully on builds without contrib modules.
    When no SIFT constructor can be found, "SIFT" is recorded in
    _CONTRIB_FUNCS so the create helpers can raise a descriptive error.
    """
    factory = {
        "BRISK": cv2.BRISK_create,
        "DENSE": DENSE,
        "GFTT": GFTT,
        "HARRIS": HARRIS,
        "ORB": cv2.ORB_create,
    }

    # optional main-module detectors
    for key, attr in (("FAST", "FastFeatureDetector_create"),
                      ("MSER", "MSER_create")):
        if hasattr(cv2, attr):
            factory[key] = getattr(cv2, attr)

    # SIFT moved into the main module in OpenCV 4.4; fall back to contrib
    sift = getattr(cv2, "SIFT_create", None)
    if sift is None:
        sift = getattr(getattr(cv2, "xfeatures2d", None), "SIFT_create", None)
    if sift is not None:
        factory["SIFT"] = sift
    else:
        _CONTRIB_FUNCS.add("SIFT")

    xf = getattr(cv2, "xfeatures2d", None)
    if xf is not None:
        if hasattr(xf, "SURF_create"):
            factory["SURF"] = xf.SURF_create
        if hasattr(xf, "StarDetector_create"):
            factory["STAR"] = xf.StarDetector_create

    return factory
def _build_extractor_factory():
    """
    Build the descriptor-extractor factory for the installed OpenCV build.

    Entries are added only when the corresponding constructor exists; when
    no SIFT constructor is found, "SIFT" is recorded in _CONTRIB_FUNCS so
    the create helpers can raise a descriptive error.
    """
    factory = {
        "ROOTSIFT": RootSIFT,
        "ORB": cv2.ORB_create,
        "BRISK": cv2.BRISK_create,
    }

    # SIFT moved into the main module in OpenCV 4.4; fall back to contrib
    sift = getattr(cv2, "SIFT_create", None)
    if sift is None:
        sift = getattr(getattr(cv2, "xfeatures2d", None), "SIFT_create", None)
    if sift is not None:
        factory["SIFT"] = sift
    else:
        _CONTRIB_FUNCS.add("SIFT")

    xf = getattr(cv2, "xfeatures2d", None)
    if xf is not None:
        for key, attr in (("SURF", "SURF_create"),
                          ("BRIEF", "BriefDescriptorExtractor_create"),
                          ("FREAK", "FREAK_create")):
            if hasattr(xf, attr):
                factory[key] = getattr(xf, attr)

    return factory


def _build_matcher_factory():
    """
    Map matcher names to the cv2.DESCRIPTOR_MATCHER_* constants.

    Fix: the original had two byte-identical branches selected by
    is_cv2(), and the DESCRIPTOR_MATCHER_* attributes do not exist on
    OpenCV 2.x, so importing this module under OpenCV 2 raised
    AttributeError at load time (the cv2 code path never uses this
    factory - DescriptorMatcher_create takes the string directly).
    Guarding each constant with hasattr keeps the import safe on every
    supported OpenCV version while producing the same mapping on 3.x+.
    """
    names = {
        "BruteForce": "DESCRIPTOR_MATCHER_BRUTEFORCE",
        "BruteForce-SL2": "DESCRIPTOR_MATCHER_BRUTEFORCE_SL2",
        "BruteForce-L1": "DESCRIPTOR_MATCHER_BRUTEFORCE_L1",
        "BruteForce-Hamming": "DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING",
        "FlannBased": "DESCRIPTOR_MATCHER_FLANNBASED",
    }
    return {label: getattr(cv2, attr)
            for label, attr in names.items() if hasattr(cv2, attr)}


_DETECTOR_FACTORY = _build_detector_factory()
_EXTRACTOR_FACTORY = _build_extractor_factory()
_MATCHER_FACTORY = _build_matcher_factory()
+ + :param method: string of the type of keypoint detector to return + :return: the key point detector object + """ method = method.upper() if method == "DENSE": return DENSE() @@ -17,64 +104,31 @@ def FeatureDetector_create(method): return cv2.FeatureDetector_create(method) def DescriptorExtractor_create(method): + """ + Create a descriptor extractor (OpenCV 2.x version). + + :param method: string of the type of descriptor extractor to return + :return: the descriptor extractor object + """ method = method.upper() if method == "ROOTSIFT": return RootSIFT() return cv2.DescriptorExtractor_create(method) def DescriptorMatcher_create(method): + """ + Create a descriptor matcher (OpenCV 2.x version). + + :param method: string of the type of descriptor matcher to return + :return: the matcher object + """ return cv2.DescriptorMatcher_create(method) else: - try: - _DETECTOR_FACTORY = {"BRISK": cv2.BRISK_create, - "DENSE": DENSE, - "FAST": cv2.FastFeatureDetector_create, - "GFTT": GFTT, - "HARRIS": HARRIS, - "MSER": cv2.MSER_create, - "ORB": cv2.ORB_create, - "SIFT": cv2.xfeatures2d.SIFT_create, - "SURF": cv2.xfeatures2d.SURF_create, - "STAR": cv2.xfeatures2d.StarDetector_create - } - - _EXTRACTOR_FACTORY = {"SIFT": cv2.xfeatures2d.SIFT_create, - "ROOTSIFT": RootSIFT, - "SURF": cv2.xfeatures2d.SURF_create, - "BRIEF": cv2.xfeatures2d.BriefDescriptorExtractor_create, - "ORB": cv2.ORB_create, - "BRISK": cv2.BRISK_create, - "FREAK": cv2.xfeatures2d.FREAK_create - } - - _MATCHER_FACTORY = {"BruteForce": cv2.DESCRIPTOR_MATCHER_BRUTEFORCE, - "BruteForce-SL2": cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_SL2, - "BruteForce-L1": cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_L1, - "BruteForce-Hamming": cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING, - "FlannBased": cv2.DESCRIPTOR_MATCHER_FLANNBASED - } - - except AttributeError: - _DETECTOR_FACTORY = {"MSER": cv2.MSER_create, - "FAST": cv2.FastFeatureDetector_create, - "BRISK": cv2.BRISK_create, - "ORB": cv2.ORB_create, - "GFTT": GFTT, - "HARRIS": HARRIS, - 
"DENSE": DENSE - } - - _EXTRACTOR_FACTORY = {"ORB": cv2.ORB_create, - "BRISK": cv2.BRISK_create - } - - _CONTRIB_FUNCS = {"SIFT", "ROOTSIFT", "SURF", "STAR", "BRIEF", "FREAK"} - - def FeatureDetector_create(detector, *args, **kw_args): """ - + Create a feature detector (OpenCV 3.x/4.x/5.x version). + :param detector: string of the type of keypoint detector to return :param args: positional arguments for detector :param kw_args: keyword arguments for detector @@ -90,14 +144,14 @@ def FeatureDetector_create(detector, *args, **kw_args): return detr(*args, **kw_args) - def DescriptorExtractor_create(extractor, *args, **kw_args): """ - + Create a descriptor extractor (OpenCV 3.x/4.x/5.x version). + :param extractor: string of the type of descriptor extractor to return :param args: positional arguments for extractor :param kw_args: keyword arguments for extractor - :return: the key extractor object + :return: the descriptor extractor object """ try: extr = _EXTRACTOR_FACTORY[extractor.upper()] @@ -111,9 +165,10 @@ def DescriptorExtractor_create(extractor, *args, **kw_args): def DescriptorMatcher_create(matcher): """ - + Create a descriptor matcher (OpenCV 3.x/4.x/5.x version). 
class NetworkVideoStream:
    """
    Video stream reader for network sources (RTSP/HTTP) with automatic
    reconnection, frame buffering, lag recovery, and basic statistics.

    Fixes over the original implementation:
      * stop() no longer releases the capture while the reader thread may
        be blocked inside stream.read() (use-after-release race); the
        reader thread now owns the release, and stop() joins it first.
      * the probe frame read during _connect() is buffered, not discarded.
      * read() is documented as FIFO - it returns the oldest buffered
        frame, not the newest.
    """

    def __init__(self, src, name="NetworkVideoStream",
                 buffer_size=128, reconnect_attempts=10,
                 reconnect_delay=3.0, read_timeout=5.0,
                 max_lag_threshold=2.0):
        """
        Initialize network video stream.

        Args:
            src: RTSP/HTTP video stream URL
            name: Thread name
            buffer_size: Frame buffer size
            reconnect_attempts: Maximum reconnection attempts
            reconnect_delay: Reconnection delay in seconds
            read_timeout: Read timeout in seconds
            max_lag_threshold: Maximum allowed inter-frame lag in seconds;
                the buffer is drained down to one frame when exceeded
        """
        self.src = src
        self.name = name
        self.buffer_size = buffer_size
        self.reconnect_attempts = reconnect_attempts
        self.reconnect_delay = reconnect_delay
        self.read_timeout = read_timeout
        self.max_lag_threshold = max_lag_threshold

        self.stream = None
        self.stopped = False
        self.frame_buffer = queue.Queue(maxsize=buffer_size)
        self.lock = Lock()  # guards open/release of self.stream
        self.connected = Event()
        # NOTE(review): connection_timeouts actually counts successful
        # reconnections; the key is kept for get_stats() compatibility
        self.connection_timeouts = 0
        self.frames_dropped = 0
        self.total_frames = 0
        self._thread = None  # reader thread handle, set by start()

        self._connect()

    def _connect(self):
        """(Re)establish the capture connection; return True on success."""
        with self.lock:
            if self.stream is not None:
                self.stream.release()

            logger.info(f"Connecting to network stream: {self.src}")
            self.stream = cv2.VideoCapture(self.src)

            if not self.stream.isOpened():
                logger.error(f"Failed to open network stream: {self.src}")
                self.connected.clear()
                return False

            ret, frame = self.stream.read()
            if not ret or frame is None:
                logger.error("Failed to read initial frame from network stream")
                self.connected.clear()
                return False

        # keep the probe frame instead of silently discarding it
        try:
            self.frame_buffer.put_nowait(frame)
        except queue.Full:
            pass

        self.connected.set()
        logger.info("Network stream connected successfully")
        return True

    def _reconnect_loop(self):
        """Retry _connect() until it succeeds or attempts are exhausted."""
        attempt = 0
        while attempt < self.reconnect_attempts and not self.stopped:
            attempt += 1
            logger.warning(f"Reconnection attempt {attempt}/{self.reconnect_attempts}")

            if self._connect():
                # see NOTE in __init__: this counts successful reconnects
                self.connection_timeouts += 1
                return True

            time.sleep(self.reconnect_delay)

        logger.error("All reconnection attempts failed")
        return False

    def start(self):
        """Start the reader thread and wait briefly for the connection."""
        t = Thread(target=self._update, name=self.name, args=())
        t.daemon = True
        self._thread = t
        t.start()

        self.connected.wait(timeout=self.read_timeout)
        return self

    def _update(self):
        """Reader loop: pull frames, drop backlog on lag, handle errors."""
        last_read_time = time.time()

        while not self.stopped:
            if not self.connected.is_set():
                if not self._reconnect_loop():
                    time.sleep(1.0)
                continue

            try:
                ret, frame = self.stream.read()

                if not ret or frame is None:
                    logger.warning("Frame read failed, connection may be lost")
                    self.connected.clear()
                    continue

                now = time.time()
                lag = now - last_read_time
                last_read_time = now
                self.total_frames += 1

                # frames arriving slower than the threshold mean we are
                # falling behind the live stream: drain the backlog so the
                # consumer sees recent frames again
                if lag > self.max_lag_threshold:
                    self.frames_dropped += 1
                    while self.frame_buffer.qsize() > 1:
                        try:
                            self.frame_buffer.get_nowait()
                        except queue.Empty:
                            break

                # make room rather than blocking the reader on a full queue
                if self.frame_buffer.full():
                    try:
                        self.frame_buffer.get_nowait()
                        self.frames_dropped += 1
                    except queue.Empty:
                        pass

                self.frame_buffer.put(frame)

            except Exception as e:
                logger.error(f"Error in frame reading: {e}")
                self.connected.clear()
                time.sleep(0.1)

        # the reader thread owns the capture: release it here so stop()
        # cannot free it underneath a blocked stream.read()
        with self.lock:
            if self.stream is not None:
                self.stream.release()
                self.stream = None

    def read(self):
        """Return the oldest buffered frame (FIFO), or None on timeout."""
        if not self.connected.is_set():
            return None

        try:
            return self.frame_buffer.get(timeout=self.read_timeout)
        except queue.Empty:
            logger.warning("Frame buffer empty, possible connection issue")
            return None

    def get_stats(self):
        """Get stream statistics (same keys as the original API)."""
        return {
            "connected": self.connected.is_set(),
            "buffer_size": self.frame_buffer.qsize(),
            "total_frames": self.total_frames,
            "frames_dropped": self.frames_dropped,
            "connection_timeouts": self.connection_timeouts,
        }

    def stop(self):
        """Signal the reader to stop, join it, then release if still open."""
        self.stopped = True
        self.connected.clear()
        if self._thread is not None:
            self._thread.join(timeout=self.read_timeout)
        # covers the case where start() was never called (no reader thread)
        # and, best-effort, a reader stuck past the join timeout
        with self.lock:
            if self.stream is not None:
                self.stream.release()
                self.stream = None

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
picamera stream and allow the camera - # sensor to warmup - self.stream = PiVideoStream(resolution=resolution, - framerate=framerate, **kwargs) - - # otherwise, we are using OpenCV so initialize the webcam - # stream - else: - self.stream = WebcamVideoStream(src=src) - - def start(self): - # start the threaded video stream - return self.stream.start() - - def update(self): - # grab the next frame from the stream - self.stream.update() - - def read(self): - # return the current frame - return self.stream.read() - - def stop(self): - # stop the thread and release any resources - self.stream.stop() + def __init__(self, src=0, usePiCamera=False, resolution=(320, 240), + framerate=32, useNetworkStream=False, **kwargs): + """ + Initialize video stream with automatic source detection + + Args: + src: Video source (camera index, RTSP/HTTP URL, etc.) + usePiCamera: Whether to use Raspberry Pi camera + resolution: Resolution for Pi camera + framerate: Framerate for Pi camera + useNetworkStream: Force use of enhanced network stream mode + **kwargs: Additional arguments passed to underlying stream class + """ + if usePiCamera: + from .pivideostream import PiVideoStream + self.stream = PiVideoStream(resolution=resolution, + framerate=framerate, **kwargs) + elif useNetworkStream or (isinstance(src, str) and + (src.startswith('rtsp://') or + src.startswith('http://') or + src.startswith('https://'))): + from .networkvideostream import NetworkVideoStream + self.stream = NetworkVideoStream(src=src, **kwargs) + else: + self.stream = WebcamVideoStream(src=src, **kwargs) + + def start(self): + """Start the threaded video stream""" + return self.stream.start() + + def update(self): + """Grab the next frame from the stream""" + self.stream.update() + + def read(self): + """Return the current frame""" + return self.stream.read() + + def stop(self): + """Stop the thread and release any resources""" + self.stream.stop() + + def is_connected(self): + """Check connection status""" + if 
hasattr(self.stream, 'is_connected'): + return self.stream.is_connected() + return True + + def get_stats(self): + """Get stream statistics""" + if hasattr(self.stream, 'get_stats'): + return self.stream.get_stats() + return None + + def __enter__(self): + return self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() diff --git a/imutils/video/webcamvideostream.py b/imutils/video/webcamvideostream.py index dbe8751..6e7a858 100644 --- a/imutils/video/webcamvideostream.py +++ b/imutils/video/webcamvideostream.py @@ -1,42 +1,159 @@ -# import the necessary packages -from threading import Thread +from threading import Thread, Lock import cv2 +import time +import logging + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) class WebcamVideoStream: - def __init__(self, src=0, name="WebcamVideoStream"): - # initialize the video camera stream and read the first frame - # from the stream - self.stream = cv2.VideoCapture(src) - (self.grabbed, self.frame) = self.stream.read() - - # initialize the thread name - self.name = name - - # initialize the variable used to indicate if the thread should - # be stopped - self.stopped = False - - def start(self): - # start the thread to read frames from the video stream - t = Thread(target=self.update, name=self.name, args=()) - t.daemon = True - t.start() - return self - - def update(self): - # keep looping infinitely until the thread is stopped - while True: - # if the thread indicator variable is set, stop the thread - if self.stopped: - return - - # otherwise, read the next frame from the stream - (self.grabbed, self.frame) = self.stream.read() - - def read(self): - # return the frame most recently read - return self.frame - - def stop(self): - # indicate that the thread should be stopped - self.stopped = True + def __init__(self, src=0, name="WebcamVideoStream", + reconnect_attempts=5, reconnect_delay=2.0, + timeout=None, skip_frames_on_lag=True): + """ + Initialize video stream with 
enhanced fault tolerance + + Args: + src: Video source (camera index, RTSP/HTTP URL, etc.) + name: Thread name + reconnect_attempts: Number of reconnection attempts on disconnect + reconnect_delay: Delay between reconnection attempts in seconds + timeout: Read timeout in seconds + skip_frames_on_lag: Whether to skip frames when lagging behind + """ + self.src = src + self.name = name + self.reconnect_attempts = reconnect_attempts + self.reconnect_delay = reconnect_delay + self.timeout = timeout + self.skip_frames_on_lag = skip_frames_on_lag + + self.stream = None + self.grabbed = False + self.frame = None + self.stopped = False + self.lock = Lock() + self.last_frame_time = time.time() + self.connection_lost = False + self.reconnect_count = 0 + + self._initialize_stream() + + def _initialize_stream(self): + """Initialize or reinitialize video stream connection""" + if self.stream is not None: + self.stream.release() + + self.stream = cv2.VideoCapture(self.src) + + if self.timeout is not None: + self.stream.set(cv2.CAP_PROP_FPS, 30) + + (self.grabbed, self.frame) = self.stream.read() + + if not self.grabbed: + logger.warning(f"Failed to read initial frame from source: {self.src}") + return False + + self.connection_lost = False + self.last_frame_time = time.time() + logger.info(f"Successfully connected to video source: {self.src}") + return True + + def _attempt_reconnect(self): + """Attempt to reconnect to video source""" + self.connection_lost = True + self.reconnect_count = 0 + + while self.reconnect_count < self.reconnect_attempts and not self.stopped: + self.reconnect_count += 1 + logger.warning(f"Attempting reconnect {self.reconnect_count}/{self.reconnect_attempts}") + + if self._initialize_stream(): + logger.info("Reconnection successful!") + return True + + time.sleep(self.reconnect_delay) + + logger.error(f"Failed to reconnect after {self.reconnect_attempts} attempts") + return False + + def start(self): + """Start the video stream thread""" + t = 
Thread(target=self.update, name=self.name, args=()) + t.daemon = True + t.start() + return self + + def update(self): + """Main loop for continuously reading frames""" + consecutive_failures = 0 + max_consecutive_failures = 10 + + while True: + if self.stopped: + return + + if self.connection_lost: + if not self._attempt_reconnect(): + time.sleep(1.0) + continue + + try: + (grabbed, frame) = self.stream.read() + + if not grabbed: + consecutive_failures += 1 + + if consecutive_failures >= max_consecutive_failures: + logger.warning("Multiple consecutive frame read failures, attempting reconnect") + self.connection_lost = True + consecutive_failures = 0 + + time.sleep(0.01) + continue + + consecutive_failures = 0 + + with self.lock: + self.grabbed = grabbed + self.frame = frame + self.last_frame_time = time.time() + + if self.skip_frames_on_lag: + lag = time.time() - self.last_frame_time + if lag > 0.1: + for _ in range(2): + self.stream.grab() + + except Exception as e: + logger.error(f"Error reading frame: {e}") + self.connection_lost = True + time.sleep(0.1) + + def read(self): + """Read the latest frame""" + with self.lock: + if self.frame is None: + return None + return self.frame.copy() if self.frame is not None else None + + def is_connected(self): + """Check connection status""" + return not self.connection_lost and self.grabbed + + def get_reconnect_count(self): + """Get number of reconnection attempts""" + return self.reconnect_count + + def stop(self): + """Stop the video stream""" + self.stopped = True + if self.stream is not None: + self.stream.release() + + def __enter__(self): + return self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop()