-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblobs.py
More file actions
208 lines (170 loc) · 5.91 KB
/
blobs.py
File metadata and controls
208 lines (170 loc) · 5.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import numpy as np
import munkres
import pt_config
# Configuration constants. Change these in pt_config
BLOB_LIFE = pt_config.BLOB_LIFE # life of blob in frames, if not seen
EDGE_THRESHOLD = pt_config.EDGE_THRESHOLD # border of image, in pixels, which is regarded as out-of-frame
DISTANCE_THRESHOLD = pt_config.DISTANCE_THRESHOLD # distance threshold, in pixels. If blob is further than this from previous position, update is ignored
MOVE_LIMIT = pt_config.MOVE_LIMIT # maximum velocity of the blob. If outside this limit, velocity is disabled
MATCH_DISTANCE = pt_config.MATCH_DISTANCE # maximum distance between blobs in the Hungarian algorithm matching step
blob_id = 0
class VirtualBlob:
"""
Represents a single pedestrian blob.
"""
def __init__(self, x,y):
"""
Create a new blob at the given (x,y) co-ordinate (in pixels). Each blob has a unique
ID number, and a random color (for visulisation)
"""
global blob_id
self.x = x
self.y = y
self.dx = 0
self.dy = 0
self.life = BLOB_LIFE
self.got_updated = False
self.color = (np.random.randint(0,255),np.random.randint(0,255),np.random.randint(0,255))
self.id = blob_id
blob_id = blob_id + 1
def update_location(self, x, y):
"""Update the current state of the blob to the new given position, if it is
not too far away (<DISTANCE_THRESHOLD away) from the previous position"""
if abs(x-self.x)<DISTANCE_THRESHOLD and abs(y-self.y)<DISTANCE_THRESHOLD:
self.dx = 0.65*self.dx + 0.35*(x - self.x)
self.dy = 0.65*self.dy + 0.35*(y - self.y)
self.x = 0.6*self.x + 0.4*x
self.y = 0.6*self.y + 0.4*y
self.life = BLOB_LIFE
self.got_updated = True
def set_location(self, x, y):
"""Change the position of the blob _without_ any distance filtering or velocity calculation."""
self.x = x
self.y = y
def move(self):
"""Apply the current estimated velocity to the blob; used when the blob is not observed in the scene"""
if abs(self.dx) < MOVE_LIMIT and abs(self.dy) < MOVE_LIMIT:
self.x += self.dx
self.y += self.dy
def decay(self):
"""Age the blob by one unit. When life<=0, return True, else return False"""
# update location using velocity
# die a bit
self.life = self.life - 1
return self.life<=0
def __repr__(self):
return "(%d, %d, %d, %d)" % (self.x, self.y, self.dx, self.dy)
class BlobTracker:
"""The tracker object, which keeps track of a collection of pedestrian blobs"""
def __init__(self):
"""Initialise a new, empty tracker"""
self.virtual_blobs = []
self.traces = {}
self.frame = 0
self.is_inited=False
def init_blobs(self, blobs, fnum):
"""Initialise a set of blobs, from a list of initial (x,y) co-ordinates, in the format
[(x,y), (x,y), ... ] """
# initialise virtual blobs to be blobs
self.virtual_blobs = []
for blob in blobs:
v = VirtualBlob(blob[0], blob[1])
self.virtual_blobs.append(v)
self.traces[v.id] = [(v.x, v.y, fnum)]
self.is_inited = True
#returns true is this blob is within the frame
def check_frame(self, blob, frame):
"""Given an (x,y) co-ordinated, check if that position is inside the central frame (i.e. is
not inside the border region"""
# Check Frame
in_frame = False
# left
if blob[0]< frame[0]+EDGE_THRESHOLD:
in_frame = True
# right
if blob[0]> frame[2]-EDGE_THRESHOLD:
in_frame = True
# top
if blob[1]< frame[1]+EDGE_THRESHOLD:
in_frame = True
# bottom
if blob[1]> frame[3]-EDGE_THRESHOLD:
in_frame = True
return in_frame
def track_blobs(self, blobs, frame, fnum):
"""Main update call. Takes a list of new, observed blob co-ordinates, a rectangular frame specifier of the form
[left, bottom, right, top] and a frame number, and updates the positions of the virtual blobs."""
# initialise if not already done so
if not self.is_inited:
self.init_blobs(blobs, fnum)
return
# get max length of blob lists
max_size = max(len(blobs), len(self.virtual_blobs))
distance_matrix = np.zeros((max_size, max_size))
for v in self.virtual_blobs:
v.move()
# compute distance matrix
for i in range(max_size):
if i>=len(blobs):
distance_matrix[i,:] = 0
# no matching blob/virtual blob
else:
for j in range(max_size):
if j>=len(self.virtual_blobs):
distance_matrix[i,j] = 0
else:
dx = blobs[i][0]-self.virtual_blobs[j].x
dy = blobs[i][1]-self.virtual_blobs[j].y
distance_matrix[i,j] = np.sqrt(dx**2 + dy**2)
copy_distances = np.array(distance_matrix)
m = munkres.Munkres()
ot = m.compute(distance_matrix)
rows = [t[1] for t in ot]
# clear the update flag
for v in self.virtual_blobs:
v.got_updated = False
# blobs on rows
for i,matching_virtual in enumerate(rows):
if i<len(blobs):
blob = blobs[i]
if matching_virtual<len(self.virtual_blobs):
if copy_distances[i][matching_virtual]< MATCH_DISTANCE:
self.virtual_blobs[matching_virtual].update_location(blob[0], blob[1])
elif self.check_frame(blob, frame):
v = VirtualBlob(blob[0], blob[1])
self.virtual_blobs.append(v)
self.traces[v.id] = [(v.x, v.y, fnum)]
else:
# new baby blobs!
spawn = False
# left
if blob[0]<frame[0]+EDGE_THRESHOLD:
spawn = True
# right
if blob[0]>frame[2]-EDGE_THRESHOLD:
spawn = True
# top
if blob[1]<frame[1]+EDGE_THRESHOLD:
spawn = True
# bottom
if blob[1]>frame[3]-EDGE_THRESHOLD:
spawn = True
if spawn:
v = VirtualBlob(blob[0], blob[1])
self.virtual_blobs.append(v)
self.traces[v.id] = [(v.x, v.y, fnum)]
else:
pass
# deal with un-updated blobs
graveyard = []
for v in self.virtual_blobs:
if not v.got_updated:
# move, and reduce life counter
if v.decay():
#print "Virtual blob %s finally died." % v
graveyard.append(v)
# append trace of blob movement
self.traces[v.id].append((v.x, v.y, fnum))
# clean up the bodies
for v in graveyard:
self.virtual_blobs.remove(v)