diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..ab45feac Binary files /dev/null and b/.DS_Store differ diff --git a/deep_sort/detection.py b/deep_sort/detection.py index 97cd39d0..473e0c2d 100644 --- a/deep_sort/detection.py +++ b/deep_sort/detection.py @@ -14,6 +14,8 @@ class Detection(object): Detector confidence score. feature : array_like A feature vector that describes the object contained in this image. + class_id : int + Class ID of the detected object. Attributes ---------- @@ -23,13 +25,16 @@ class Detection(object): Detector confidence score. feature : ndarray | NoneType A feature vector that describes the object contained in this image. + class_id : int + Class ID of the detected object. """ - def __init__(self, tlwh, confidence, feature): + def __init__(self, tlwh, confidence, feature, class_id=0): self.tlwh = np.asarray(tlwh, dtype=np.float64) self.confidence = float(confidence) self.feature = np.asarray(feature, dtype=np.float32) + self.class_id = class_id def to_tlbr(self): """Convert bounding box to format `(min x, min y, max x, max y)`, i.e., diff --git a/deep_sort/track.py b/deep_sort/track.py index f638e9b3..45f212e5 100644 --- a/deep_sort/track.py +++ b/deep_sort/track.py @@ -40,6 +40,8 @@ class Track: feature : Optional[ndarray] Feature vector of the detection this track originates from. If not None, this feature is added to the `features` cache. + class_id : int + Class ID of the tracked object. Attributes ---------- @@ -60,11 +62,15 @@ class Track: features : List[ndarray] A cache of features. On each measurement update, the associated feature vector is added to this list. + class_id : int + Class ID of the tracked object. + ema_feature : ndarray + Exponential moving average of the feature vector. """ def __init__(self, mean, covariance, track_id, n_init, max_age, - feature=None): + feature=None, class_id=0): self.mean = mean self.covariance = covariance self.track_id = track_id @@ -74,11 +80,15 @@ def __init__(self, mean, covariance, track_id, n_init, max_age, self.state = TrackState.Tentative self.features = [] + self.class_id = class_id + self.ema_feature = feature.copy() if feature is not None else None + if feature is not None: self.features.append(feature) self._n_init = n_init self._max_age = max_age + self._ema_alpha = 0.9 # EMA decay factor def to_tlwh(self): """Get current position in bounding box format `(top left x, top left y, @@ -137,7 +147,14 @@ def update(self, kf, detection): """ self.mean, self.covariance = kf.update( self.mean, self.covariance, detection.to_xyah()) - self.features.append(detection.feature) + + # Update EMA feature + if detection.feature is not None: + if self.ema_feature is None: + self.ema_feature = detection.feature.copy() + else: + self.ema_feature = self._ema_alpha * self.ema_feature + (1 - self._ema_alpha) * detection.feature + self.features.append(detection.feature) self.hits += 1 self.time_since_update = 0 diff --git a/deep_sort/track_interpolator.py b/deep_sort/track_interpolator.py new file mode 100644 index 00000000..be76c1b0 --- /dev/null +++ b/deep_sort/track_interpolator.py @@ -0,0 +1,145 @@ +import numpy as np +from scipy.interpolate import interp1d +from deep_sort.track import Track + +class TrackInterpolator: + """ + 轨迹插值与断点平滑处理类 + """ + + def __init__(self, max_gap=30): + """ + 初始化轨迹插值器 + + 参数 + ---------- + max_gap : int + 最大允许的轨迹中断帧数 + """ + self.max_gap = max_gap + self.track_history = {} + + def add_track(self, track_id, frame_id, bbox, class_id): + """ + 添加轨迹点 + + 参数 + ---------- + track_id : int + 轨迹ID + frame_id : int + 帧ID + bbox : ndarray + 边界框 (tlwh格式) + class_id : int + 类别ID + """ + if track_id not in self.track_history: + self.track_history[track_id] = [] + + self.track_history[track_id].append((frame_id, bbox, class_id)) + + def offline_interpolate_tracks(self): + """ + 离线对所有轨迹进行插值处理,使用scipy进行平滑插值 + + 返回 + ---------- + dict + 插值后的轨迹历史 + """ + interpolated_history = {} + + for track_id, track_points in self.track_history.items(): + if len(track_points) < 2: + interpolated_history[track_id] = track_points + continue + + # 按帧ID排序 + sorted_points = sorted(track_points, key=lambda x: x[0]) + + # 提取帧ID和边界框坐标 + frame_ids = [p[0] for p in sorted_points] + bboxes = [p[1] for p in sorted_points] + class_ids = [p[2] for p in sorted_points] + + # 将边界框转换为numpy数组 + bboxes = np.array(bboxes) + + # 计算帧ID之间的间隙 + gaps = np.diff(frame_ids) + + # 构建插值后的轨迹 + interpolated = [sorted_points[0]] + + for i in range(1, len(sorted_points)): + prev_frame = frame_ids[i-1] + curr_frame = frame_ids[i] + gap = gaps[i-1] + + # 如果间隙过大,不进行插值 + if gap > self.max_gap: + interpolated.append(sorted_points[i]) + continue + + # 对每个边界框坐标进行单独插值 + prev_bbox = bboxes[i-1] + curr_bbox = bboxes[i] + + # 创建插值函数(使用线性插值) + interp_functions = [] + for j in range(4): # tlwh四个坐标 + interp_func = interp1d([prev_frame, curr_frame], [prev_bbox[j], curr_bbox[j]], kind='linear') + interp_functions.append(interp_func) + + # 生成中间帧的插值结果 + for j in range(1, gap): + interp_frame = prev_frame + j + interp_bbox = np.array([f(interp_frame) for f in interp_functions]) + interpolated.append((interp_frame, interp_bbox, class_ids[i-1])) + + interpolated.append(sorted_points[i]) + + interpolated_history[track_id] = interpolated + + return interpolated_history + + def save_interpolated_tracks(self, output_file): + """ + 保存插值后的轨迹到文件 + + 参数 + ---------- + output_file : str + 输出文件路径 + """ + interpolated_history = self.offline_interpolate_tracks() + np.save(output_file, interpolated_history) + + def load_interpolated_tracks(self, input_file): + """ + 从文件加载插值后的轨迹 + + 参数 + ---------- + input_file : str + 输入文件路径 + """ + self.track_history = np.load(input_file, allow_pickle=True).item() + + def get_interpolated_track(self, track_id): + """ + 获取指定轨迹的插值结果 + + 参数 + ---------- + track_id : int + 轨迹ID + + 返回 + ---------- + list + 插值后的轨迹点列表 + """ + interpolated_history = self.offline_interpolate_tracks() + return interpolated_history.get(track_id, []) \ No newline at end of file diff --git a/deep_sort/tracker.py b/deep_sort/tracker.py index de99de44..97e1ad3c 100644 --- a/deep_sort/tracker.py +++ b/deep_sort/tracker.py @@ -78,61 +78,88 @@ def update(self, detections): self._initiate_track(detections[detection_idx]) self.tracks = [t for t in self.tracks if not t.is_deleted()] - # Update distance metric. + # Update distance metric using EMA features. active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] features, targets = [], [] for track in self.tracks: - if not track.is_confirmed(): + if not track.is_confirmed() or track.ema_feature is None: continue - features += track.features - targets += [track.track_id for _ in track.features] - track.features = [] - self.metric.partial_fit( - np.asarray(features), np.asarray(targets), active_targets) + features.append(track.ema_feature) + targets.append(track.track_id) + track.features = [] # Clear the features list + if features: + self.metric.partial_fit( + np.asarray(features), np.asarray(targets), active_targets) def _match(self, detections): - - def gated_metric(tracks, dets, track_indices, detection_indices): - features = np.array([dets[i].feature for i in detection_indices]) - targets = np.array([tracks[i].track_id for i in track_indices]) - cost_matrix = self.metric.distance(features, targets) - cost_matrix = linear_assignment.gate_cost_matrix( - self.kf, cost_matrix, tracks, dets, track_indices, - detection_indices) - - return cost_matrix - - # Split track set into confirmed and unconfirmed tracks. - confirmed_tracks = [ - i for i, t in enumerate(self.tracks) if t.is_confirmed()] - unconfirmed_tracks = [ - i for i, t in enumerate(self.tracks) if not t.is_confirmed()] - - # Associate confirmed tracks using appearance features. - matches_a, unmatched_tracks_a, unmatched_detections = \ - linear_assignment.matching_cascade( - gated_metric, self.metric.matching_threshold, self.max_age, - self.tracks, detections, confirmed_tracks) - - # Associate remaining tracks together with unconfirmed tracks using IOU. - iou_track_candidates = unconfirmed_tracks + [ - k for k in unmatched_tracks_a if - self.tracks[k].time_since_update == 1] - unmatched_tracks_a = [ - k for k in unmatched_tracks_a if - self.tracks[k].time_since_update != 1] - matches_b, unmatched_tracks_b, unmatched_detections = \ - linear_assignment.min_cost_matching( - iou_matching.iou_cost, self.max_iou_distance, self.tracks, - detections, iou_track_candidates, unmatched_detections) - - matches = matches_a + matches_b - unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b)) - return matches, unmatched_tracks, unmatched_detections + # Group detections by class ID + detections_by_class = {} + for i, det in enumerate(detections): + if det.class_id not in detections_by_class: + detections_by_class[det.class_id] = [] + detections_by_class[det.class_id].append(i) + + # Group tracks by class ID + tracks_by_class = {} + for i, track in enumerate(self.tracks): + if track.class_id not in tracks_by_class: + tracks_by_class[track.class_id] = [] + tracks_by_class[track.class_id].append(i) + + all_matches = [] + all_unmatched_tracks = [] + all_unmatched_detections = [] + + # Process each class separately + for class_id in set(detections_by_class.keys()).union(tracks_by_class.keys()): + class_detections = detections_by_class.get(class_id, []) + class_tracks = tracks_by_class.get(class_id, []) + + if not class_detections and not class_tracks: + continue + + def gated_metric(tracks, dets, track_indices, detection_indices): + features = np.array([dets[i].feature for i in detection_indices]) + targets = np.array([tracks[i].track_id for i in track_indices]) + cost_matrix = self.metric.distance(features, targets) + cost_matrix = linear_assignment.gate_cost_matrix( + self.kf, cost_matrix, tracks, dets, track_indices, + detection_indices) + return cost_matrix + + # Split track set into confirmed and unconfirmed tracks. + confirmed_tracks = [i for i in class_tracks if self.tracks[i].is_confirmed()] + unconfirmed_tracks = [i for i in class_tracks if not self.tracks[i].is_confirmed()] + + # Associate confirmed tracks using appearance features. + # First, filter detections to only include those of the current class + class_detections_indices = class_detections + matches_a, unmatched_tracks_a, unmatched_detections_class = \ + linear_assignment.matching_cascade( + gated_metric, self.metric.matching_threshold, self.max_age, + self.tracks, detections, confirmed_tracks, class_detections_indices) + + # Associate remaining tracks together with unconfirmed tracks using IOU. + iou_track_candidates = unconfirmed_tracks + [ + k for k in unmatched_tracks_a if + self.tracks[k].time_since_update == 1] + unmatched_tracks_a = [ + k for k in unmatched_tracks_a if + self.tracks[k].time_since_update != 1] + matches_b, unmatched_tracks_b, unmatched_detections_class = \ + linear_assignment.min_cost_matching( + iou_matching.iou_cost, self.max_iou_distance, self.tracks, + detections, iou_track_candidates, unmatched_detections_class) + + all_matches.extend(matches_a + matches_b) + all_unmatched_tracks.extend(unmatched_tracks_a + unmatched_tracks_b) + all_unmatched_detections.extend(unmatched_detections_class) + + return all_matches, all_unmatched_tracks, all_unmatched_detections def _initiate_track(self, detection): mean, covariance = self.kf.initiate(detection.to_xyah()) self.tracks.append(Track( mean, covariance, self._next_id, self.n_init, self.max_age, - detection.feature)) + detection.feature, detection.class_id)) self._next_id += 1 diff --git a/pytorch_deep_sort_app.py b/pytorch_deep_sort_app.py new file mode 100644 index 00000000..62e5f37d --- /dev/null +++ b/pytorch_deep_sort_app.py @@ -0,0 +1,174 @@ +import cv2 +import numpy as np +import argparse +from deep_sort.tracker import Tracker +from deep_sort.nn_matching import NearestNeighborDistanceMetric +from deep_sort.detection import Detection +from deep_sort.track_interpolator import TrackInterpolator +from tools.pytorch_feature_extractor import create_pytorch_box_encoder +from ultralytics import YOLO + +class DeepSORTApp: + def __init__(self, model_name='yolov8n', feature_dim=128, max_cosine_distance=0.2, max_gap=30): + # 加载YOLOv8模型 + self.model = YOLO(model_name + '.pt') + + # 创建特征提取器 + self.encoder = create_pytorch_box_encoder(feature_dim=feature_dim) + + # 创建距离度量 + self.metric = NearestNeighborDistanceMetric('cosine', max_cosine_distance, feature_dim) + + # 创建跟踪器 + self.tracker = Tracker(self.metric) + + # 创建轨迹插值器 + self.interpolator = TrackInterpolator(max_gap=max_gap) + + # 类别名称映射 + self.class_names = self.model.names + + # 帧计数器 + self.frame_id = 0 + + def process_frame(self, frame): + # 使用YOLOv8进行目标检测 + results = self.model(frame, verbose=False) + + # 提取检测结果 + detections = [] + boxes = [] + confidences = [] + class_ids = [] + + for result in results: + for box in result.boxes: + x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() + confidence = box.conf[0].cpu().numpy() + class_id = int(box.cls[0].cpu().numpy()) + + # 转换为tlwh格式 + tlwh = [x1, y1, x2 - x1, y2 - y1] + boxes.append(tlwh) + confidences.append(confidence) + class_ids.append(class_id) + + # 提取特征 + if boxes: + features = self.encoder(frame, boxes) + else: + features = [] + + # 创建Detection对象 + detections = [] + for i in range(len(boxes)): + detection = Detection(boxes[i], confidences[i], features[i], class_ids[i]) + detections.append(detection) + + # 预测跟踪器状态 + self.tracker.predict() + + # 更新跟踪器 + self.tracker.update(detections) + + # 记录轨迹信息到插值器 + for track in self.tracker.tracks: + if track.is_confirmed(): + tlwh = track.to_tlwh() + self.interpolator.add_track(track.track_id, self.frame_id, tlwh, track.class_id) + + # 绘制当前跟踪结果(不使用插值) + for track in self.tracker.tracks: + if not track.is_confirmed() or track.time_since_update > 1: + continue + + # 获取边界框 + tlwh = track.to_tlwh() + x1, y1, w, h = tlwh + x2, y2 = x1 + w, y1 + h + + # 获取类别名称 + class_name = self.class_names.get(track.class_id, 'Unknown') + + # 绘制边界框和ID + cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) + cv2.putText(frame, f'{class_name} {track.track_id}', + (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 2) + + # 增加帧计数器 + self.frame_id += 1 + + return frame + + def run(self, input_source): + # 打开视频流或摄像头 + if input_source.isdigit(): + cap = cv2.VideoCapture(int(input_source)) + else: + cap = cv2.VideoCapture(input_source) + + if not cap.isOpened(): + print(f"Error: Could not open input source {input_source}") + return + + # 处理视频流 + while True: + ret, frame = cap.read() + if not ret: + break + + # 处理帧 + frame = self.process_frame(frame) + + # 显示结果 + cv2.imshow('DeepSORT Tracking', frame) + + # 按'q'退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + # 执行离线轨迹插值 + print("Performing offline track interpolation...") + interpolated_history = self.interpolator.offline_interpolate_tracks() + + # 保存插值结果 + output_file = "interpolated_tracks.npy" + self.interpolator.save_interpolated_tracks(output_file) + print(f"Interpolated tracks saved to {output_file}") + + # 释放资源 + cap.release() + cv2.destroyAllWindows() + + # 打印插值统计信息 + total_tracks = len(interpolated_history) + total_points = sum(len(points) for points in interpolated_history.values()) + original_points = sum(len(points) for points in self.interpolator.track_history.values()) + interpolated_points = total_points - original_points + print(f"Offline interpolation completed:") + print(f"Total tracks: {total_tracks}") + print(f"Original points: {original_points}") + print(f"Interpolated points: {interpolated_points}") + print(f"Total points after interpolation: {total_points}") + +def parse_args(): + parser = argparse.ArgumentParser(description="DeepSORT with YOLOv8") + parser.add_argument('--input', default='0', help='Input source (camera index or video file)') + parser.add_argument('--model', default='yolov8n', help='YOLOv8 model name') + parser.add_argument('--feature-dim', type=int, default=128, help='Feature dimension') + parser.add_argument('--max-cosine-distance', type=float, default=0.2, help='Maximum cosine distance for matching') + parser.add_argument('--max-gap', type=int, default=30, help='Maximum gap for track interpolation') + return parser.parse_args() + +def main(): + args = parse_args() + app = DeepSORTApp( + model_name=args.model, + feature_dim=args.feature_dim, + max_cosine_distance=args.max_cosine_distance, + max_gap=args.max_gap + ) + app.run(args.input) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e7590640..a861baa1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ numpy opencv-python scipy +torch +torchvision +ultralytics diff --git a/tools/pytorch_feature_extractor.py b/tools/pytorch_feature_extractor.py new file mode 100644 index 00000000..803e7cbe --- /dev/null +++ b/tools/pytorch_feature_extractor.py @@ -0,0 +1,70 @@ +import torch +import torchvision.transforms as transforms +from torchvision.models import resnet50 +import cv2 +import numpy as np + +class PyTorchFeatureExtractor: + def __init__(self, model_name='resnet50', feature_dim=128): + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + self.model = resnet50(pretrained=True) + # 替换最后一层以输出指定维度的特征 + self.model.fc = torch.nn.Linear(self.model.fc.in_features, feature_dim) + self.model.to(self.device) + self.model.eval() + + # 图像预处理 + self.transform = transforms.Compose([ + transforms.ToPILImage(), + transforms.Resize((128, 64)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + ]) + + def extract_image_patch(self, image, bbox, patch_shape): + bbox = np.array(bbox) + if patch_shape is not None: + target_aspect = float(patch_shape[1]) / patch_shape[0] + new_width = target_aspect * bbox[3] + bbox[0] -= (new_width - bbox[2]) / 2 + bbox[2] = new_width + + bbox[2:] += bbox[:2] + bbox = bbox.astype(np.int64) + + bbox[:2] = np.maximum(0, bbox[:2]) + bbox[2:] = np.minimum(np.asarray(image.shape[:2][::-1]) - 1, bbox[2:]) + if np.any(bbox[:2] >= bbox[2:]): + return None + sx, sy, ex, ey = bbox + image = image[sy:ey, sx:ex] + image = cv2.resize(image, tuple(patch_shape[::-1])) + return image + + def __call__(self, image, boxes): + image_patches = [] + for box in boxes: + patch = self.extract_image_patch(image, box, (128, 64)) + if patch is None: + patch = np.random.uniform(0., 255., (128, 64, 3)).astype(np.uint8) + image_patches.append(patch) + + if not image_patches: + return np.array([]) + + # 预处理图像 + batch = torch.stack([self.transform(patch) for patch in image_patches]).to(self.device) + + # 提取特征 + with torch.no_grad(): + features = self.model(batch).cpu().numpy() + + return features + +def create_pytorch_box_encoder(model_name='resnet50', feature_dim=128): + encoder = PyTorchFeatureExtractor(model_name, feature_dim) + + def encode(image, boxes): + return encoder(image, boxes) + + return encode \ No newline at end of file