Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
7 changes: 6 additions & 1 deletion deep_sort/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class Detection(object):
Detector confidence score.
feature : array_like
A feature vector that describes the object contained in this image.
class_id : int
Class ID of the detected object.

Attributes
----------
Expand All @@ -23,13 +25,16 @@ class Detection(object):
Detector confidence score.
feature : ndarray | NoneType
A feature vector that describes the object contained in this image.
class_id : int
Class ID of the detected object.

"""

def __init__(self, tlwh, confidence, feature):
def __init__(self, tlwh, confidence, feature, class_id=0):
self.tlwh = np.asarray(tlwh, dtype=np.float64)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
self.class_id = class_id

def to_tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
Expand Down
21 changes: 19 additions & 2 deletions deep_sort/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Track:
feature : Optional[ndarray]
Feature vector of the detection this track originates from. If not None,
this feature is added to the `features` cache.
class_id : int
Class ID of the tracked object.

Attributes
----------
Expand All @@ -60,11 +62,15 @@ class Track:
features : List[ndarray]
A cache of features. On each measurement update, the associated feature
vector is added to this list.
class_id : int
Class ID of the tracked object.
ema_feature : ndarray
Exponential moving average of the feature vector.

"""

def __init__(self, mean, covariance, track_id, n_init, max_age,
feature=None):
feature=None, class_id=0):
self.mean = mean
self.covariance = covariance
self.track_id = track_id
Expand All @@ -74,11 +80,15 @@ def __init__(self, mean, covariance, track_id, n_init, max_age,

self.state = TrackState.Tentative
self.features = []
self.class_id = class_id
self.ema_feature = feature.copy() if feature is not None else None

if feature is not None:
self.features.append(feature)

self._n_init = n_init
self._max_age = max_age
self._ema_alpha = 0.9 # EMA decay factor

def to_tlwh(self):
"""Get current position in bounding box format `(top left x, top left y,
Expand Down Expand Up @@ -137,7 +147,14 @@ def update(self, kf, detection):
"""
self.mean, self.covariance = kf.update(
self.mean, self.covariance, detection.to_xyah())
self.features.append(detection.feature)

# Update EMA feature
if detection.feature is not None:
if self.ema_feature is None:
self.ema_feature = detection.feature.copy()
else:
self.ema_feature = self._ema_alpha * self.ema_feature + (1 - self._ema_alpha) * detection.feature
self.features.append(detection.feature)

self.hits += 1
self.time_since_update = 0
Expand Down
145 changes: 145 additions & 0 deletions deep_sort/track_interpolator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import numpy as np
from scipy.interpolate import interp1d
from deep_sort.track import Track

class TrackInterpolator:
    """Offline trajectory interpolation and gap smoothing.

    Collects per-track observations of the form ``(frame_id, bbox, class_id)``
    and, in a post-processing pass, fills short temporal gaps between
    consecutive observations with linearly interpolated bounding boxes.
    """

    def __init__(self, max_gap=30):
        """Initialize the track interpolator.

        Parameters
        ----------
        max_gap : int
            Maximum number of frames a track may be interrupted for the gap
            to still be bridged by interpolation. Larger gaps are left as-is.
        """
        self.max_gap = max_gap
        # Maps track_id -> list of (frame_id, bbox_tlwh, class_id) tuples,
        # in insertion order (sorted lazily at interpolation time).
        self.track_history = {}

    def add_track(self, track_id, frame_id, bbox, class_id):
        """Record one observation of a track.

        Parameters
        ----------
        track_id : int
            Track ID.
        frame_id : int
            Frame ID of the observation.
        bbox : ndarray
            Bounding box in tlwh format.
        class_id : int
            Class ID of the tracked object.
        """
        self.track_history.setdefault(track_id, []).append(
            (frame_id, bbox, class_id))

    def offline_interpolate_tracks(self):
        """Interpolate all stored trajectories offline.

        Returns
        -------
        dict
            Mapping from track_id to a frame-sorted list of
            ``(frame_id, bbox, class_id)`` tuples, where every gap of at
            most `max_gap` frames between consecutive observations is
            filled with linearly interpolated boxes. Interpolated points
            inherit the class_id of the earlier endpoint.
        """
        interpolated_history = {}

        for track_id, track_points in self.track_history.items():
            # A single observation (or none) cannot be interpolated.
            if len(track_points) < 2:
                interpolated_history[track_id] = track_points
                continue

            # Sort observations chronologically by frame ID.
            sorted_points = sorted(track_points, key=lambda p: p[0])
            interpolated = [sorted_points[0]]

            for prev, curr in zip(sorted_points, sorted_points[1:]):
                prev_frame, prev_bbox, prev_class = prev
                curr_frame, curr_bbox = curr[0], curr[1]
                gap = curr_frame - prev_frame

                # Bridge only gaps in (1, max_gap]. gap <= 0 covers
                # duplicate or out-of-order frame IDs, which a naive
                # two-point interpolant would choke on; such points are
                # passed through unchanged.
                if 1 < gap <= self.max_gap:
                    start = np.asarray(prev_bbox, dtype=np.float64)
                    end = np.asarray(curr_bbox, dtype=np.float64)
                    # Vectorized linear interpolation of all four tlwh
                    # coordinates at once — equivalent to building a
                    # scipy interp1d per coordinate, but in one numpy
                    # expression per intermediate frame.
                    for step in range(1, gap):
                        t = step / gap
                        interp_bbox = start + (end - start) * t
                        interpolated.append(
                            (prev_frame + step, interp_bbox, prev_class))

                interpolated.append(curr)

            interpolated_history[track_id] = interpolated

        return interpolated_history

    def save_interpolated_tracks(self, output_file):
        """Interpolate all tracks and save the result to a .npy file.

        Parameters
        ----------
        output_file : str
            Output file path (numpy pickle of the history dict).
        """
        interpolated_history = self.offline_interpolate_tracks()
        np.save(output_file, interpolated_history)

    def load_interpolated_tracks(self, input_file):
        """Load a previously saved track history from a .npy file.

        Parameters
        ----------
        input_file : str
            Input file path. The file is a pickled dict, hence
            ``allow_pickle=True`` is required.
        """
        self.track_history = np.load(input_file, allow_pickle=True).item()

    def get_interpolated_track(self, track_id):
        """Return the interpolated trajectory of a single track.

        Parameters
        ----------
        track_id : int
            Track ID.

        Returns
        -------
        list
            Interpolated list of ``(frame_id, bbox, class_id)`` tuples,
            or an empty list if the track is unknown.
        """
        # NOTE: recomputes interpolation for all tracks; acceptable for
        # offline use, matching the original behavior.
        interpolated_history = self.offline_interpolate_tracks()
        return interpolated_history.get(track_id, [])
119 changes: 73 additions & 46 deletions deep_sort/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,61 +78,88 @@ def update(self, detections):
self._initiate_track(detections[detection_idx])
self.tracks = [t for t in self.tracks if not t.is_deleted()]

# Update distance metric.
# Update distance metric using EMA features.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
if not track.is_confirmed() or track.ema_feature is None:
continue
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
self.metric.partial_fit(
np.asarray(features), np.asarray(targets), active_targets)
features.append(track.ema_feature)
targets.append(track.track_id)
track.features = [] # Clear the features list
if features:
self.metric.partial_fit(
np.asarray(features), np.asarray(targets), active_targets)

def _match(self, detections):

def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)

return cost_matrix

# Split track set into confirmed and unconfirmed tracks.
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()]

# Associate confirmed tracks using appearance features.
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)

# Associate remaining tracks together with unconfirmed tracks using IOU.
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1]
unmatched_tracks_a = [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1]
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)

matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
# Group detections by class ID
detections_by_class = {}
for i, det in enumerate(detections):
if det.class_id not in detections_by_class:
detections_by_class[det.class_id] = []
detections_by_class[det.class_id].append(i)

# Group tracks by class ID
tracks_by_class = {}
for i, track in enumerate(self.tracks):
if track.class_id not in tracks_by_class:
tracks_by_class[track.class_id] = []
tracks_by_class[track.class_id].append(i)

all_matches = []
all_unmatched_tracks = []
all_unmatched_detections = []

# Process each class separately
for class_id in set(detections_by_class.keys()).union(tracks_by_class.keys()):
class_detections = detections_by_class.get(class_id, [])
class_tracks = tracks_by_class.get(class_id, [])

if not class_detections and not class_tracks:
continue

def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix

# Split track set into confirmed and unconfirmed tracks.
confirmed_tracks = [i for i in class_tracks if self.tracks[i].is_confirmed()]
unconfirmed_tracks = [i for i in class_tracks if not self.tracks[i].is_confirmed()]

# Associate confirmed tracks using appearance features.
# First, filter detections to only include those of the current class
class_detections_indices = class_detections
matches_a, unmatched_tracks_a, unmatched_detections_class = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks, class_detections_indices)

# Associate remaining tracks together with unconfirmed tracks using IOU.
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1]
unmatched_tracks_a = [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1]
matches_b, unmatched_tracks_b, unmatched_detections_class = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections_class)

all_matches.extend(matches_a + matches_b)
all_unmatched_tracks.extend(unmatched_tracks_a + unmatched_tracks_b)
all_unmatched_detections.extend(unmatched_detections_class)

return all_matches, all_unmatched_tracks, all_unmatched_detections

def _initiate_track(self, detection):
mean, covariance = self.kf.initiate(detection.to_xyah())
self.tracks.append(Track(
mean, covariance, self._next_id, self.n_init, self.max_age,
detection.feature))
detection.feature, detection.class_id))
self._next_id += 1
Loading