From efb229b31e1da70a6d75f191dfa8c91ab6adc43d Mon Sep 17 00:00:00 2001 From: lwk <3098293798@qq.com> Date: Wed, 11 Mar 2026 23:03:05 +0800 Subject: [PATCH] feat(glm): DeepSORT reconstruction with torchreid & strong classes separation --- .DS_Store | Bin 0 -> 6148 bytes deep_sort/__init__.py | 28 +++- deep_sort/detection.py | 42 ++--- deep_sort/nn_matching.py | 137 ++-------------- deep_sort/track.py | 137 ++++------------ deep_sort/track_interpolation.py | 154 ++++++++++++++++++ deep_sort/tracker.py | 156 +++++++++++------- deep_sort/yolo_detector.py | 270 +++++++++++++++++++++++++++++++ requirements.txt | 5 + run_tracking.py | 80 +++++++++ 10 files changed, 683 insertions(+), 326 deletions(-) create mode 100644 .DS_Store create mode 100644 deep_sort/track_interpolation.py create mode 100644 deep_sort/yolo_detector.py create mode 100644 run_tracking.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..ab45feac3f3aa2c0f1621bcaa2df3ffda29ef823 GIT binary patch literal 6148 zcmeHKJx;?g6n>^bDzJ2_m@1zDsS^`YsKSVZ)B^x*P(eybMB6<(XMi(s0cJ*ELJW)? zfE(~V+buQ?1r}7H_aggq-cR217sZZ=NOdM@o2WrVbu`9k9bJX6o!gwOI1L9YenvKE zcl-IUnDZ2cFdz&pBLn8cr!5-L8I6PQS3U2b;Amw`vPj2)I1&C-F+907n4)J8Do zUkZ-V4r7OdwCAYWEO^)|kjdp~_!hWSi6N1id$7aA= c@g|xP*fbA-vBT0LG!Xd_Ff>Rf4E!kr?|!zVv;Y7A literal 0 HcmV?d00001 diff --git a/deep_sort/__init__.py b/deep_sort/__init__.py index 43e08fb8a..a7860613f 100644 --- a/deep_sort/__init__.py +++ b/deep_sort/__init__.py @@ -1 +1,27 @@ -# vim: expandtab:ts=4:sw=4 +from .detection import Detection +from .kalman_filter import KalmanFilter +from .linear_assignment import min_cost_matching, matching_cascade, gate_cost_matrix +from .iou_matching import iou, iou_cost +from .nn_matching import NearestNeighborDistanceMetric +from .track import Track, TrackState +from .tracker import Tracker +from .yolo_detector import YOLOv8Detector, ReIDExtractor, YOLOv8DeepSORT +from .track_interpolation import TrackInterpolator + +__all__ = [ + 'Detection', + 'KalmanFilter', + 'min_cost_matching', + 'matching_cascade', + 'gate_cost_matrix', + 'iou', + 'iou_cost', + 'NearestNeighborDistanceMetric', + 'Track', + 'TrackState', + 'Tracker', + 'YOLOv8Detector', + 'ReIDExtractor', + 'YOLOv8DeepSORT', + 'TrackInterpolator' +] diff --git a/deep_sort/detection.py b/deep_sort/detection.py index 97cd39d07..777098c5c 100644 --- a/deep_sort/detection.py +++ b/deep_sort/detection.py @@ -1,49 +1,25 @@ -# vim: expandtab:ts=4:sw=4 import numpy as np -class Detection(object): - """ - This class represents a bounding box detection in a single image. - - Parameters - ---------- - tlwh : array_like - Bounding box in format `(x, y, w, h)`. - confidence : float - Detector confidence score. - feature : array_like - A feature vector that describes the object contained in this image. - - Attributes - ---------- - tlwh : ndarray - Bounding box in format `(top left x, top left y, width, height)`. - confidence : ndarray - Detector confidence score. - feature : ndarray | NoneType - A feature vector that describes the object contained in this image. - - """ - - def __init__(self, tlwh, confidence, feature): +class Detection: + def __init__(self, tlwh, confidence, feature, class_id=0, class_name=None): self.tlwh = np.asarray(tlwh, dtype=np.float64) self.confidence = float(confidence) - self.feature = np.asarray(feature, dtype=np.float32) + self.feature = np.asarray(feature, dtype=np.float32) if feature is not None else None + self.class_id = int(class_id) + self.class_name = class_name def to_tlbr(self): - """Convert bounding box to format `(min x, min y, max x, max y)`, i.e., - `(top left, bottom right)`. - """ ret = self.tlwh.copy() ret[2:] += ret[:2] return ret def to_xyah(self): - """Convert bounding box to format `(center x, center y, aspect ratio, - height)`, where the aspect ratio is `width / height`. - """ ret = self.tlwh.copy() ret[:2] += ret[2:] / 2 ret[2] /= ret[3] return ret + + def to_xyxy(self): + ret = self.to_tlbr() + return ret diff --git a/deep_sort/nn_matching.py b/deep_sort/nn_matching.py index 2e7bfea4b..d17279a0a 100644 --- a/deep_sort/nn_matching.py +++ b/deep_sort/nn_matching.py @@ -1,24 +1,7 @@ -# vim: expandtab:ts=4:sw=4 import numpy as np def _pdist(a, b): - """Compute pair-wise squared distance between points in `a` and `b`. - - Parameters - ---------- - a : array_like - An NxM matrix of N samples of dimensionality M. - b : array_like - An LxM matrix of L samples of dimensionality M. - - Returns - ------- - ndarray - Returns a matrix of size len(a), len(b) such that eleement (i, j) - contains the squared distance between `a[i]` and `b[j]`. - - """ a, b = np.asarray(a), np.asarray(b) if len(a) == 0 or len(b) == 0: return np.zeros((len(a), len(b))) @@ -29,100 +12,28 @@ def _pdist(a, b): def _cosine_distance(a, b, data_is_normalized=False): - """Compute pair-wise cosine distance between points in `a` and `b`. - - Parameters - ---------- - a : array_like - An NxM matrix of N samples of dimensionality M. - b : array_like - An LxM matrix of L samples of dimensionality M. - data_is_normalized : Optional[bool] - If True, assumes rows in a and b are unit length vectors. - Otherwise, a and b are explicitly normalized to lenght 1. - - Returns - ------- - ndarray - Returns a matrix of size len(a), len(b) such that eleement (i, j) - contains the squared distance between `a[i]` and `b[j]`. - - """ if not data_is_normalized: - a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True) - b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) + a = np.asarray(a) + b = np.asarray(b) + if len(a) > 0: + a = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-8) + if len(b) > 0: + b = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-8) return 1. - np.dot(a, b.T) def _nn_euclidean_distance(x, y): - """ Helper function for nearest neighbor distance metric (Euclidean). - - Parameters - ---------- - x : ndarray - A matrix of N row-vectors (sample points). - y : ndarray - A matrix of M row-vectors (query points). - - Returns - ------- - ndarray - A vector of length M that contains for each entry in `y` the - smallest Euclidean distance to a sample in `x`. - - """ distances = _pdist(x, y) return np.maximum(0.0, distances.min(axis=0)) def _nn_cosine_distance(x, y): - """ Helper function for nearest neighbor distance metric (cosine). - - Parameters - ---------- - x : ndarray - A matrix of N row-vectors (sample points). - y : ndarray - A matrix of M row-vectors (query points). - - Returns - ------- - ndarray - A vector of length M that contains for each entry in `y` the - smallest cosine distance to a sample in `x`. - - """ distances = _cosine_distance(x, y) return distances.min(axis=0) -class NearestNeighborDistanceMetric(object): - """ - A nearest neighbor distance metric that, for each target, returns - the closest distance to any sample that has been observed so far. - - Parameters - ---------- - metric : str - Either "euclidean" or "cosine". - matching_threshold: float - The matching threshold. Samples with larger distance are considered an - invalid match. - budget : Optional[int] - If not None, fix samples per class to at most this number. Removes - the oldest samples when the budget is reached. - - Attributes - ---------- - samples : Dict[int -> List[ndarray]] - A dictionary that maps from target identities to the list of samples - that have been observed so far. - - """ - +class NearestNeighborDistanceMetric: def __init__(self, metric, matching_threshold, budget=None): - - if metric == "euclidean": self._metric = _nn_euclidean_distance elif metric == "cosine": @@ -135,18 +46,6 @@ def __init__(self, metric, matching_threshold, budget=None): self.samples = {} def partial_fit(self, features, targets, active_targets): - """Update the distance metric with new data. - - Parameters - ---------- - features : ndarray - An NxM matrix of N features of dimensionality M. - targets : ndarray - An integer array of associated target identities. - active_targets : List[int] - A list of targets that are currently present in the scene. - - """ for feature, target in zip(features, targets): self.samples.setdefault(target, []).append(feature) if self.budget is not None: @@ -154,24 +53,10 @@ def partial_fit(self, features, targets, active_targets): self.samples = {k: self.samples[k] for k in active_targets} def distance(self, features, targets): - """Compute distance between features and targets. - - Parameters - ---------- - features : ndarray - An NxM matrix of N features of dimensionality M. - targets : List[int] - A list of targets to match the given `features` against. - - Returns - ------- - ndarray - Returns a cost matrix of shape len(targets), len(features), where - element (i, j) contains the closest squared distance between - `targets[i]` and `features[j]`. - - """ cost_matrix = np.zeros((len(targets), len(features))) for i, target in enumerate(targets): - cost_matrix[i, :] = self._metric(self.samples[target], features) + if target in self.samples and len(self.samples[target]) > 0: + cost_matrix[i, :] = self._metric(self.samples[target], features) + else: + cost_matrix[i, :] = self.matching_threshold + 1 return cost_matrix diff --git a/deep_sort/track.py b/deep_sort/track.py index f638e9b3e..b5ad0e91f 100644 --- a/deep_sort/track.py +++ b/deep_sort/track.py @@ -1,70 +1,15 @@ -# vim: expandtab:ts=4:sw=4 +import numpy as np class TrackState: - """ - Enumeration type for the single target track state. Newly created tracks are - classified as `tentative` until enough evidence has been collected. Then, - the track state is changed to `confirmed`. Tracks that are no longer alive - are classified as `deleted` to mark them for removal from the set of active - tracks. - - """ - Tentative = 1 Confirmed = 2 Deleted = 3 class Track: - """ - A single target track with state space `(x, y, a, h)` and associated - velocities, where `(x, y)` is the center of the bounding box, `a` is the - aspect ratio and `h` is the height. - - Parameters - ---------- - mean : ndarray - Mean vector of the initial state distribution. - covariance : ndarray - Covariance matrix of the initial state distribution. - track_id : int - A unique track identifier. - n_init : int - Number of consecutive detections before the track is confirmed. The - track state is set to `Deleted` if a miss occurs within the first - `n_init` frames. - max_age : int - The maximum number of consecutive misses before the track state is - set to `Deleted`. - feature : Optional[ndarray] - Feature vector of the detection this track originates from. If not None, - this feature is added to the `features` cache. - - Attributes - ---------- - mean : ndarray - Mean vector of the initial state distribution. - covariance : ndarray - Covariance matrix of the initial state distribution. - track_id : int - A unique track identifier. - hits : int - Total number of measurement updates. - age : int - Total number of frames since first occurance. - time_since_update : int - Total number of frames since last measurement update. - state : TrackState - The current track state. - features : List[ndarray] - A cache of features. On each measurement update, the associated feature - vector is added to this list. - - """ - def __init__(self, mean, covariance, track_id, n_init, max_age, - feature=None): + feature=None, class_id=0, class_name=None, ema_alpha=0.9): self.mean = mean self.covariance = covariance self.track_id = track_id @@ -73,94 +18,78 @@ def __init__(self, mean, covariance, track_id, n_init, max_age, self.time_since_update = 0 self.state = TrackState.Tentative + self.class_id = class_id + self.class_name = class_name + + self._n_init = n_init + self._max_age = max_age + + self.ema_alpha = ema_alpha + self.ema_feature = None self.features = [] if feature is not None: self.features.append(feature) + self.ema_feature = feature.copy() - self._n_init = n_init - self._max_age = max_age + self.history = [] + self.last_detection = None def to_tlwh(self): - """Get current position in bounding box format `(top left x, top left y, - width, height)`. - - Returns - ------- - ndarray - The bounding box. - - """ ret = self.mean[:4].copy() ret[2] *= ret[3] ret[:2] -= ret[2:] / 2 return ret def to_tlbr(self): - """Get current position in bounding box format `(min x, miny, max x, - max y)`. - - Returns - ------- - ndarray - The bounding box. - - """ ret = self.to_tlwh() ret[2:] = ret[:2] + ret[2:] return ret def predict(self, kf): - """Propagate the state distribution to the current time step using a - Kalman filter prediction step. - - Parameters - ---------- - kf : kalman_filter.KalmanFilter - The Kalman filter. - - """ self.mean, self.covariance = kf.predict(self.mean, self.covariance) self.age += 1 self.time_since_update += 1 def update(self, kf, detection): - """Perform Kalman filter measurement update step and update the feature - cache. - - Parameters - ---------- - kf : kalman_filter.KalmanFilter - The Kalman filter. - detection : Detection - The associated detection. - - """ self.mean, self.covariance = kf.update( self.mean, self.covariance, detection.to_xyah()) - self.features.append(detection.feature) + + if detection.feature is not None: + self.features.append(detection.feature) + if self.ema_feature is None: + self.ema_feature = detection.feature.copy() + else: + self.ema_feature = self.ema_alpha * self.ema_feature + \ + (1 - self.ema_alpha) * detection.feature + norm = np.linalg.norm(self.ema_feature) + if norm > 0: + self.ema_feature /= norm self.hits += 1 self.time_since_update = 0 + self.last_detection = detection + if self.state == TrackState.Tentative and self.hits >= self._n_init: self.state = TrackState.Confirmed def mark_missed(self): - """Mark this track as missed (no association at the current time step). - """ if self.state == TrackState.Tentative: self.state = TrackState.Deleted elif self.time_since_update > self._max_age: self.state = TrackState.Deleted def is_tentative(self): - """Returns True if this track is tentative (unconfirmed). - """ return self.state == TrackState.Tentative def is_confirmed(self): - """Returns True if this track is confirmed.""" return self.state == TrackState.Confirmed def is_deleted(self): - """Returns True if this track is dead and should be deleted.""" return self.state == TrackState.Deleted + + def get_feature(self): + if self.ema_feature is not None: + return self.ema_feature + elif len(self.features) > 0: + return self.features[-1] + return None diff --git a/deep_sort/track_interpolation.py b/deep_sort/track_interpolation.py new file mode 100644 index 000000000..1731aea79 --- /dev/null +++ b/deep_sort/track_interpolation.py @@ -0,0 +1,154 @@ +import numpy as np +from collections import defaultdict + + +class TrackInterpolator: + def __init__(self, max_gap=30, min_track_length=3): + self.max_gap = max_gap + self.min_track_length = min_track_length + + def interpolate_tracks(self, tracks_data): + tracks_by_id = defaultdict(list) + + for detection in tracks_data: + frame_id, track_id, x, y, w, h = detection[:6] + class_id = detection[6] if len(detection) > 6 else 0 + confidence = detection[7] if len(detection) > 7 else 1.0 + tracks_by_id[track_id].append({ + 'frame_id': int(frame_id), + 'bbox': [x, y, w, h], + 'class_id': class_id, + 'confidence': confidence + }) + + for track_id in tracks_by_id: + tracks_by_id[track_id].sort(key=lambda x: x['frame_id']) + + interpolated_tracks = [] + + for track_id, track_data in tracks_by_id.items(): + if len(track_data) < self.min_track_length: + interpolated_tracks.extend([ + [d['frame_id'], track_id] + d['bbox'] + + [d['class_id'], d['confidence']] + for d in track_data + ]) + continue + + interpolated = self._interpolate_single_track(track_data, track_id) + interpolated_tracks.extend(interpolated) + + interpolated_tracks.sort(key=lambda x: (x[0], x[1])) + + return interpolated_tracks + + def _interpolate_single_track(self, track_data, track_id): + result = [] + n = len(track_data) + + i = 0 + while i < n: + result.append([ + track_data[i]['frame_id'], + track_id + ] + track_data[i]['bbox'] + [ + track_data[i]['class_id'], + track_data[i]['confidence'] + ]) + + if i < n - 1: + gap = track_data[i+1]['frame_id'] - track_data[i]['frame_id'] + + if 1 < gap <= self.max_gap: + interpolated = self._linear_interpolate( + track_data[i], track_data[i+1], track_id + ) + result.extend(interpolated) + + i += 1 + + return result + + def _linear_interpolate(self, start, end, track_id): + interpolated = [] + + start_frame = start['frame_id'] + end_frame = end['frame_id'] + gap = end_frame - start_frame + + start_bbox = np.array(start['bbox']) + end_bbox = np.array(end['bbox']) + + for frame_offset in range(1, gap): + alpha = frame_offset / gap + interp_bbox = start_bbox * (1 - alpha) + end_bbox * alpha + + confidence = start['confidence'] * (1 - alpha) + end['confidence'] * alpha + + interpolated.append([ + start_frame + frame_offset, + track_id, + interp_bbox[0], interp_bbox[1], interp_bbox[2], interp_bbox[3], + start['class_id'], + confidence + ]) + + return interpolated + + def smooth_tracks(self, tracks_data, window_size=5): + tracks_by_id = defaultdict(list) + + for detection in tracks_data: + frame_id, track_id = detection[0], detection[1] + tracks_by_id[track_id].append(detection) + + for track_id in tracks_by_id: + tracks_by_id[track_id].sort(key=lambda x: x[0]) + + smoothed_tracks = [] + + for track_id, track_data in tracks_by_id.items(): + if len(track_data) < window_size: + smoothed_tracks.extend(track_data) + continue + + smoothed = self._smooth_single_track(track_data, window_size) + smoothed_tracks.extend(smoothed) + + smoothed_tracks.sort(key=lambda x: (x[0], x[1])) + + return smoothed_tracks + + def _smooth_single_track(self, track_data, window_size): + smoothed = [] + n = len(track_data) + half_window = window_size // 2 + + for i in range(n): + start_idx = max(0, i - half_window) + end_idx = min(n, i + half_window + 1) + + window = track_data[start_idx:end_idx] + + avg_bbox = np.mean([d[2:6] for d in window], axis=0) + + smoothed.append([ + track_data[i][0], + track_data[i][1], + avg_bbox[0], avg_bbox[1], avg_bbox[2], avg_bbox[3], + track_data[i][6] if len(track_data[i]) > 6 else 0, + track_data[i][7] if len(track_data[i]) > 7 else 1.0 + ]) + + return smoothed + + def post_process(self, tracks_data, interpolate=True, smooth=True): + result = tracks_data + + if interpolate: + result = self.interpolate_tracks(result) + + if smooth: + result = self.smooth_tracks(result) + + return result diff --git a/deep_sort/tracker.py b/deep_sort/tracker.py index de99de44e..f6e88b861 100644 --- a/deep_sort/tracker.py +++ b/deep_sort/tracker.py @@ -1,5 +1,3 @@ -# vim: expandtab:ts=4:sw=4 -from __future__ import absolute_import import numpy as np from . import kalman_filter from . import linear_assignment @@ -8,67 +6,27 @@ class Tracker: - """ - This is the multi-target tracker. - - Parameters - ---------- - metric : nn_matching.NearestNeighborDistanceMetric - A distance metric for measurement-to-track association. - max_age : int - Maximum number of missed misses before a track is deleted. - n_init : int - Number of consecutive detections before the track is confirmed. The - track state is set to `Deleted` if a miss occurs within the first - `n_init` frames. - - Attributes - ---------- - metric : nn_matching.NearestNeighborDistanceMetric - The distance metric used for measurement to track association. - max_age : int - Maximum number of missed misses before a track is deleted. - n_init : int - Number of frames that a track remains in initialization phase. - kf : kalman_filter.KalmanFilter - A Kalman filter to filter target trajectories in image space. - tracks : List[Track] - The list of active tracks at the current time step. - - """ - - def __init__(self, metric, max_iou_distance=0.7, max_age=30, n_init=3): + def __init__(self, metric, max_iou_distance=0.7, max_age=30, n_init=3, + ema_alpha=0.9, separate_classes=True): self.metric = metric self.max_iou_distance = max_iou_distance self.max_age = max_age self.n_init = n_init + self.ema_alpha = ema_alpha + self.separate_classes = separate_classes self.kf = kalman_filter.KalmanFilter() self.tracks = [] self._next_id = 1 def predict(self): - """Propagate track state distributions one time step forward. - - This function should be called once every time step, before `update`. - """ for track in self.tracks: track.predict(self.kf) def update(self, detections): - """Perform measurement update and track management. - - Parameters - ---------- - detections : List[deep_sort.detection.Detection] - A list of detections at the current time step. - - """ - # Run matching cascade. matches, unmatched_tracks, unmatched_detections = \ self._match(detections) - # Update track set. for track_idx, detection_idx in matches: self.tracks[track_idx].update( self.kf, detections[detection_idx]) @@ -78,49 +36,122 @@ def update(self, detections): self._initiate_track(detections[detection_idx]) self.tracks = [t for t in self.tracks if not t.is_deleted()] - # Update distance metric. active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] features, targets = [], [] for track in self.tracks: if not track.is_confirmed(): continue - features += track.features - targets += [track.track_id for _ in track.features] + feat = track.get_feature() + if feat is not None: + features.append(feat) + targets.append(track.track_id) track.features = [] - self.metric.partial_fit( - np.asarray(features), np.asarray(targets), active_targets) + + if len(features) > 0: + self.metric.partial_fit( + np.asarray(features), np.asarray(targets), active_targets) def _match(self, detections): - def gated_metric(tracks, dets, track_indices, detection_indices): - features = np.array([dets[i].feature for i in detection_indices]) + features = [] + valid_detection_indices = [] + for i in detection_indices: + if dets[i].feature is not None: + features.append(dets[i].feature) + valid_detection_indices.append(i) + + if len(features) == 0: + return np.full((len(track_indices), len(detection_indices)), + self.metric.matching_threshold + 1) + + features = np.array(features) targets = np.array([tracks[i].track_id for i in track_indices]) + cost_matrix = self.metric.distance(features, targets) + + full_cost_matrix = np.full((len(track_indices), len(detection_indices)), + self.metric.matching_threshold + 1) + for row_idx, row in enumerate(cost_matrix): + for col_idx, val in enumerate(row): + full_cost_matrix[row_idx, valid_detection_indices[col_idx]] = val + cost_matrix = linear_assignment.gate_cost_matrix( - self.kf, cost_matrix, tracks, dets, track_indices, + self.kf, full_cost_matrix, tracks, dets, track_indices, detection_indices) return cost_matrix - # Split track set into confirmed and unconfirmed tracks. + if self.separate_classes: + return self._match_by_class(detections, gated_metric) + else: + return self._match_all(detections, gated_metric) + + def _match_by_class(self, detections, gated_metric): + all_matches = [] + all_unmatched_tracks = set(range(len(self.tracks))) + all_unmatched_detections = set(range(len(detections))) + + unique_classes = set(d.class_id for d in detections) + unique_classes.update(t.class_id for t in self.tracks) + + for class_id in unique_classes: + class_track_indices = [i for i, t in enumerate(self.tracks) + if t.class_id == class_id] + class_detection_indices = [i for i, d in enumerate(detections) + if d.class_id == class_id] + + if len(class_track_indices) == 0 or len(class_detection_indices) == 0: + continue + + confirmed_tracks = [i for i in class_track_indices + if self.tracks[i].is_confirmed()] + unconfirmed_tracks = [i for i in class_track_indices + if not self.tracks[i].is_confirmed()] + + matches_a, unmatched_tracks_a, unmatched_detections = \ + linear_assignment.matching_cascade( + gated_metric, self.metric.matching_threshold, self.max_age, + self.tracks, detections, confirmed_tracks, + class_detection_indices) + + iou_track_candidates = unconfirmed_tracks + [ + k for k in unmatched_tracks_a + if self.tracks[k].time_since_update == 1] + unmatched_tracks_a = [ + k for k in unmatched_tracks_a + if self.tracks[k].time_since_update != 1] + + matches_b, unmatched_tracks_b, unmatched_detections = \ + linear_assignment.min_cost_matching( + iou_matching.iou_cost, self.max_iou_distance, self.tracks, + detections, iou_track_candidates, unmatched_detections) + + class_matches = matches_a + matches_b + class_unmatched = list(set(unmatched_tracks_a + unmatched_tracks_b)) + + all_matches.extend(class_matches) + all_unmatched_tracks -= set(m[0] for m in class_matches) + all_unmatched_detections -= set(m[1] for m in class_matches) + + return all_matches, list(all_unmatched_tracks), list(all_unmatched_detections) + + def _match_all(self, detections, gated_metric): confirmed_tracks = [ i for i, t in enumerate(self.tracks) if t.is_confirmed()] unconfirmed_tracks = [ i for i, t in enumerate(self.tracks) if not t.is_confirmed()] - # Associate confirmed tracks using appearance features. matches_a, unmatched_tracks_a, unmatched_detections = \ linear_assignment.matching_cascade( gated_metric, self.metric.matching_threshold, self.max_age, self.tracks, detections, confirmed_tracks) - # Associate remaining tracks together with unconfirmed tracks using IOU. iou_track_candidates = unconfirmed_tracks + [ - k for k in unmatched_tracks_a if - self.tracks[k].time_since_update == 1] + k for k in unmatched_tracks_a + if self.tracks[k].time_since_update == 1] unmatched_tracks_a = [ - k for k in unmatched_tracks_a if - self.tracks[k].time_since_update != 1] + k for k in unmatched_tracks_a + if self.tracks[k].time_since_update != 1] matches_b, unmatched_tracks_b, unmatched_detections = \ linear_assignment.min_cost_matching( iou_matching.iou_cost, self.max_iou_distance, self.tracks, @@ -134,5 +165,6 @@ def _initiate_track(self, detection): mean, covariance = self.kf.initiate(detection.to_xyah()) self.tracks.append(Track( mean, covariance, self._next_id, self.n_init, self.max_age, - detection.feature)) + detection.feature, detection.class_id, detection.class_name, + self.ema_alpha)) self._next_id += 1 diff --git a/deep_sort/yolo_detector.py b/deep_sort/yolo_detector.py new file mode 100644 index 000000000..ed458e932 --- /dev/null +++ b/deep_sort/yolo_detector.py @@ -0,0 +1,270 @@ +import numpy as np +import cv2 +import torch +import torch.nn as nn +from ultralytics import YOLO + + +class ReIDExtractor: + def __init__(self, model_name='osnet_x0_25', device=None): + self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu') + self.model = self._load_model(model_name) + self.model.eval() + self.transform = self._get_transform() + + def _load_model(self, model_name): + try: + from torchreid.utils import load_pretrained_weights + from torchreid.models import build_model + model = build_model(name='osnet_x0_25', num_classes=1000) + self.feature_dim = 512 + except ImportError: + model = self._build_simple_reid() + self.feature_dim = 256 + return model.to(self.device) + + def _build_simple_reid(self): + return SimpleReIDNet() + + def _get_transform(self): + from torchvision import transforms + return transforms.Compose([ + transforms.ToPILImage(), + transforms.Resize((256, 128)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + ]) + + def extract(self, image, bbox): + x1, y1, w, h = bbox + x2, y2 = int(x1 + w), int(y1 + h) + x1, y1 = int(x1), int(y1) + + x1 = max(0, x1) + y1 = max(0, y1) + x2 = min(image.shape[1], x2) + y2 = min(image.shape[0], y2) + + if x2 <= x1 or y2 <= y1: + return np.zeros(self.feature_dim, dtype=np.float32) + + crop = image[y1:y2, x1:x2] + try: + crop_tensor = self.transform(crop).unsqueeze(0).to(self.device) + with torch.no_grad(): + feature = self.model(crop_tensor) + feature = feature.cpu().numpy().flatten() + norm = np.linalg.norm(feature) + if norm > 0: + feature /= norm + return feature + except Exception: + return np.zeros(self.feature_dim, dtype=np.float32) + + def extract_batch(self, image, bboxes): + features = [] + for bbox in bboxes: + feat = self.extract(image, bbox) + features.append(feat) + return np.array(features) + + +class SimpleReIDNet(nn.Module): + def __init__(self, feature_dim=256): + super().__init__() + self.features = nn.Sequential( + nn.Conv2d(3, 32, 3, stride=2, padding=1), + nn.BatchNorm2d(32), + nn.ReLU(inplace=True), + nn.Conv2d(32, 64, 3, stride=2, padding=1), + nn.BatchNorm2d(64), + nn.ReLU(inplace=True), + nn.Conv2d(64, 128, 3, stride=2, padding=1), + nn.BatchNorm2d(128), + nn.ReLU(inplace=True), + nn.AdaptiveAvgPool2d((1, 1)) + ) + self.fc = nn.Linear(128, feature_dim) + self.feature_dim = feature_dim + + def forward(self, x): + x = self.features(x) + x = x.view(x.size(0), -1) + x = self.fc(x) + return x + + +class YOLOv8Detector: + def __init__(self, model_path='yolov8n.pt', conf_threshold=0.5, + iou_threshold=0.5, classes=None, device=None): + self.model = YOLO(model_path) + self.conf_threshold = conf_threshold + self.iou_threshold = iou_threshold + self.classes = classes + self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu') + self.class_names = self.model.names + + def detect(self, image): + results = self.model(image, conf=self.conf_threshold, + iou=self.iou_threshold, classes=self.classes, + device=self.device, verbose=False) + + detections = [] + for result in results: + boxes = result.boxes + for i in range(len(boxes)): + xyxy = boxes.xyxy[i].cpu().numpy() + conf = boxes.conf[i].cpu().numpy() + cls = int(boxes.cls[i].cpu().numpy()) + + tlwh = np.array([xyxy[0], xyxy[1], + xyxy[2] - xyxy[0], + xyxy[3] - xyxy[1]]) + + detections.append({ + 'bbox': tlwh, + 'confidence': float(conf), + 'class_id': cls, + 'class_name': self.class_names[cls] + }) + + return detections + + +class YOLOv8DeepSORT: + def __init__(self, yolo_model='yolov8n.pt', reid_model='osnet_x0_25', + conf_threshold=0.5, max_cosine_distance=0.3, + nn_budget=100, max_age=30, n_init=3, ema_alpha=0.9, + separate_classes=True, classes=None, device=None): + from .nn_matching import NearestNeighborDistanceMetric + from .tracker import Tracker + + self.detector = YOLOv8Detector( + model_path=yolo_model, + conf_threshold=conf_threshold, + classes=classes, + device=device + ) + + self.reid_extractor = ReIDExtractor( + model_name=reid_model, + device=device + ) + + metric = NearestNeighborDistanceMetric( + "cosine", max_cosine_distance, nn_budget + ) + + self.tracker = Tracker( + metric, max_age=max_age, n_init=n_init, + ema_alpha=ema_alpha, separate_classes=separate_classes + ) + + self.device = device + + def process_frame(self, image): + detections_raw = self.detector.detect(image) + + from .detection import Detection + detections = [] + + for det in detections_raw: + feature = self.reid_extractor.extract(image, det['bbox']) + detection = Detection( + tlwh=det['bbox'], + confidence=det['confidence'], + feature=feature, + class_id=det['class_id'], + class_name=det['class_name'] + ) + detections.append(detection) + + self.tracker.predict() + self.tracker.update(detections) + + results = [] + for track in self.tracker.tracks: + if not track.is_confirmed() or track.time_since_update > 1: + continue + bbox = track.to_tlwh() + results.append({ + 'track_id': track.track_id, + 'bbox': bbox, + 'class_id': track.class_id, + 'class_name': track.class_name, + 'confidence': track.last_detection.confidence if track.last_detection else 0.0 + }) + + return results + + def process_video(self, video_path, output_path=None, show=False, + save_results=False, results_path=None): + cap = cv2.VideoCapture(video_path) + + if not cap.isOpened(): + raise ValueError(f"无法打开视频: {video_path}") + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + + writer = None + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + all_results = [] + frame_idx = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + results = self.process_frame(frame) + + for r in results: + x, y, w, h = r['bbox'] + track_id = r['track_id'] + class_name = r['class_name'] + + color = self._get_color(track_id) + cv2.rectangle(frame, (int(x), int(y)), + (int(x+w), int(y+h)), color, 2) + cv2.putText(frame, f"{class_name}-{track_id}", + (int(x), int(y)-10), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) + + if save_results: + all_results.append([ + frame_idx, track_id, x, y, w, h, + r['class_id'], r['confidence'] + ]) + + if writer: + writer.write(frame) + + if show: + cv2.imshow('DeepSORT', frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_idx += 1 + + cap.release() + if writer: + writer.release() + if show: + cv2.destroyAllWindows() + + if save_results and results_path: + np.savetxt(results_path, np.array(all_results), + delimiter=',', fmt='%f') + + return all_results + + def _get_color(self, idx): + idx = idx * 3 + color = ((37 * idx) % 255, (17 * idx) % 255, (29 * idx) % 255) + return color diff --git a/requirements.txt b/requirements.txt index e7590640d..f7a0d1243 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,8 @@ numpy opencv-python scipy +torch>=1.9.0 +torchvision>=0.10.0 +ultralytics>=8.0.0 +filterpy>=1.4.5 +lap>=0.4.0 diff --git a/run_tracking.py b/run_tracking.py new file mode 100644 index 000000000..b51c67eaf --- /dev/null +++ b/run_tracking.py @@ -0,0 +1,80 @@ +import argparse +import numpy as np +import cv2 +from deep_sort import YOLOv8DeepSORT, TrackInterpolator + + +def main(): + parser = argparse.ArgumentParser(description='DeepSORT with YOLOv8') + parser.add_argument('--source', type=str, required=True, + help='视频路径或摄像头ID (0, 1, ...)') + parser.add_argument('--output', type=str, default=None, + help='输出视频路径') + parser.add_argument('--yolo-model', type=str, default='yolov8n.pt', + help='YOLOv8模型路径') + parser.add_argument('--conf', type=float, default=0.5, + help='检测置信度阈值') + parser.add_argument('--max-age', type=int, default=30, + help='轨迹最大丢失帧数') + parser.add_argument('--max-cosine-dist', type=float, default=0.3, + help='余弦距离阈值') + parser.add_argument('--ema-alpha', type=float, default=0.9, + help='EMA特征更新系数') + parser.add_argument('--classes', type=int, nargs='+', default=None, + help='要检测的类别ID') + parser.add_argument('--show', action='store_true', + help='显示实时跟踪结果') + parser.add_argument('--save-results', action='store_true', + help='保存跟踪结果到文件') + parser.add_argument('--results-path', type=str, default='results.txt', + help='跟踪结果保存路径') + parser.add_argument('--interpolate', action='store_true', + help='启用轨迹插值') + parser.add_argument('--smooth', action='store_true', + help='启用轨迹平滑') + parser.add_argument('--max-gap', type=int, default=30, + help='最大插值间隔帧数') + + args = parser.parse_args() + + tracker = YOLOv8DeepSORT( + yolo_model=args.yolo_model, + conf_threshold=args.conf, + max_age=args.max_age, + max_cosine_distance=args.max_cosine_dist, + ema_alpha=args.ema_alpha, + classes=args.classes + ) + + if args.source.isdigit(): + source = int(args.source) + else: + source = args.source + + results = tracker.process_video( + video_path=source, + output_path=args.output, + show=args.show, + save_results=True, + results_path=args.results_path if args.save_results else None + ) + + if args.interpolate or args.smooth: + interpolator = TrackInterpolator(max_gap=args.max_gap) + processed_results = interpolator.post_process( + results, + interpolate=args.interpolate, + smooth=args.smooth + ) + + if args.save_results: + output_path = args.results_path.replace('.txt', '_processed.txt') + np.savetxt(output_path, np.array(processed_results), + delimiter=',', fmt='%f') + print(f"处理后的结果已保存到: {output_path}") + + print(f"处理完成,共处理 {len(results)} 个检测") + + +if __name__ == '__main__': + main()