-
Notifications
You must be signed in to change notification settings - Fork 0
/
vision.py
290 lines (248 loc) · 9.87 KB
/
vision.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
from camera_manager import CameraManager
from connection import NTConnection
import magic_numbers
from typing import Tuple, Optional, List
from math import tan
import cv2
import numpy as np
import time
class Vision:
    """Coordinates frame capture, target processing, and publishing results to the RIO."""

    def __init__(self, camera_manager: CameraManager, connection: NTConnection) -> None:
        self.camera_manager = camera_manager
        self.connection = connection

    def run(self) -> None:
        """Main process function.

        Captures an image, processes the image, and sends results to the RIO.
        """
        self.connection.pong()
        frame_time, frame = self.camera_manager.get_frame()
        # A frame time of 0 signals a capture error
        if frame_time == 0:
            self.camera_manager.notify_error(self.camera_manager.get_error())
            return
        # Flip the image because it's originally upside down.
        # frame = cv2.rotate(frame, cv2.ROTATE_180)
        results, display = process_image(frame)
        if results is not None:
            norm_x, norm_y, confidence = results
            angle, distance = get_bearing_distance_to_target((norm_x, norm_y))
            # x and y in NDC, positive axes right and down; timestamp
            self.connection.send_results(
                (angle, distance, confidence, time.monotonic())
            )
        # send image to display on driverstation
        self.camera_manager.send_frame(display)
        self.connection.set_fps()
def process_image(
    frame: np.ndarray,
) -> Tuple[Optional[Tuple[float, float, float]], np.ndarray]:
    """Takes a frame, returns the target position & confidence plus an annotated display.

    Returns (None, display) if there is no target.
    """
    frame_height, frame_width = frame.shape[0], frame.shape[1]

    mask = preprocess(frame)
    contours = filter_contours(find_contours(mask))
    areas = [cv2.contourArea(c) for c in contours]
    best_group = rank_groups(group_contours(contours, areas), contours, areas)

    if best_group is None:
        # No plausible target: annotate contours only and mark the frame centre
        display = annotate_image(frame, contours, [], (-1, -1))
        cv2.circle(display, (frame_width // 2, frame_height // 2), 5, (0, 255, 255), -1)
        return (None, display)

    pos = group_com(contours, best_group, areas)
    display = annotate_image(frame, contours, best_group, pos)
    # Normalise pixel position to NDC in [-1, 1], positive right/down
    norm_x = pos[0] * 2.0 / frame_width - 1.0
    norm_y = pos[1] * 2.0 / frame_height - 1.0
    # Confidence decays as the target moves away from the horizontal centre
    conf = group_confidence(best_group, contours, areas) * (1.0 - abs(norm_x))
    return (norm_x, norm_y, conf), display
def get_bearing_distance_to_target(
    normalised_location: Tuple[float, float]
) -> Tuple[float, float]:
    """Convert a normalised (NDC) screen position into (bearing, distance) to the target."""
    norm_x, norm_y = normalised_location
    # Bearing is linear in normalised x across the horizontal field of view
    angle = norm_x * magic_numbers.MAX_FOV_WIDTH / 2
    # Trigonometrically estimated from the group's COM height on the screen
    vert_angle = magic_numbers.GROUND_ANGLE - norm_y * magic_numbers.MAX_FOV_HEIGHT / 2
    distance = (
        magic_numbers.REL_TARGET_HEIGHT / tan(vert_angle)
        + magic_numbers.TARGET_EDGE_TO_CENTRE
    )
    return (angle, distance)
def preprocess(frame: np.ndarray) -> np.ndarray:
    """Creates a binary mask of the expected target green colour."""
    # Threshold in HSV, then open (erode + dilate) to drop speckle noise
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, magic_numbers.TARGET_HSV_LOW, magic_numbers.TARGET_HSV_HIGH)
    eroded = cv2.erode(mask, magic_numbers.ERODE_DILATE_KERNEL)
    return cv2.dilate(eroded, magic_numbers.ERODE_DILATE_KERNEL)
def find_contours(mask: np.ndarray) -> List[np.ndarray]:
    """Finds external contours on a grayscale mask.

    Returns the list of contours only (the hierarchy is discarded).
    NOTE: the starred unpacking tolerates both OpenCV 3, where
    findContours returns (image, contours, hierarchy), and OpenCV 4,
    where it returns (contours, hierarchy).
    """
    # Fix: the return annotation previously claimed a (ndarray, ndarray)
    # tuple, but this has always returned just the contour list.
    *_, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return contours
def filter_contours(contours: List[np.ndarray]) -> List[np.ndarray]:
    """Filters contours based on their aspect ratio, discarding tall ones."""
    kept = []
    for contour in contours:
        _, _, width, height = cv2.boundingRect(contour)
        # Targets are wide and short; reject anything too tall for its width
        if width / height > magic_numbers.MIN_CONTOUR_ASPECT_RATIO:
            kept.append(contour)
    return kept
def group_contours(contours: np.ndarray, contour_areas: List[int]) -> List[List[int]]:
    """Returns a nested list of contour indices grouped by relative distance.

    Two contours are connected when the squared distance between their
    centres of mass is below a threshold proportional to the larger of
    their areas; groups are the connected components of that graph.
    """
    # Hoisted: each contour's centre of mass is computed once instead of
    # being recomputed for every pair in the O(n^2) inner loop.
    centres = [c.mean(axis=0)[0] for c in contours]

    # Build an adjacency list based on distance between contours relative to their area
    connections: List[List[int]] = [[] for _ in contours]
    for i in range(len(contours)):
        for j in range(i + 1, len(contours)):
            max_metric = (
                max(contour_areas[i], contour_areas[j])
                * magic_numbers.METRIC_SCALE_FACTOR
            )
            d = centres[i] - centres[j]
            metric = d[0] ** 2 + d[1] ** 2
            if metric < max_metric:
                connections[i].append(j)
                connections[j].append(i)

    # Breadth first search from each contour that wasn't yet assigned to a group
    # to collect its connected component.
    assigned = [False for _ in contours]
    groups: List[List[int]] = []
    for i, neighbours in enumerate(connections):
        if assigned[i]:
            continue
        group = {i}
        assigned[i] = True
        horizon = set(neighbours)
        # Fix: the frontier loop previously reused `i`, shadowing the
        # outer enumerate index; renamed for clarity.
        while horizon:
            for member in horizon:
                assigned[member] = True
            group.update(horizon)
            new_horizon = set()
            for old in horizon:
                new_horizon.update(connections[old])
            new_horizon.difference_update(group)
            horizon = new_horizon
        groups.append(list(group))
    return groups
def rank_groups(
    groups: List[List[int]], contours: np.ndarray, contour_areas: np.ndarray
) -> Optional[List[int]]:
    """Returns the group that is most likely to be the target.

    Picks the highest-fitness group among those with more than one contour;
    single-contour groups are discarded as likely false positives.
    Returns None when no group qualifies.
    """
    best: Optional[List[int]] = None
    best_score = None
    for group in groups:
        # throw away groups with only 1 target as they are likely a false positive
        if len(group) <= 1:
            continue
        score = group_fitness(group, contours, contour_areas)
        if best_score is None or score > best_score:
            best, best_score = group, score
    return best
def lerp(
    value: float,
    input_lower: float,
    input_upper: float,
    output_lower: float,
    output_upper: float,
) -> float:
    """Linearly map *value* from the input range onto the output range.

    For example, to scale a joystick throttle (1 to -1) to 0-1, we would:
    scale_value(joystick.getThrottle(), 1, -1, 0, 1)
    """
    ratio = (value - input_lower) / (input_upper - input_lower)
    return output_lower + ratio * (output_upper - output_lower)
def group_fitness(
    group: List[int],
    contours: np.ndarray,
    contour_areas: np.ndarray,
) -> float:
    """Fitness function for ranking the groups, unitless.

    Rewards large combined contour area and a tall combined bounding span.
    """
    rects = [cv2.boundingRect(contours[i]) for i in group]
    top = min(rect[1] for rect in rects)
    bottom = max(rect[1] + rect[3] for rect in rects)
    height = bottom - top
    total_area = sum(contour_areas[i] for i in group)
    return total_area * magic_numbers.TOTAL_AREA_K + height**2 * magic_numbers.HEIGHT_K
def group_confidence(
    group: List[int], contours: np.ndarray, contour_areas: np.ndarray
) -> float:
    """Confidence for a group, 0-1.

    Blends three weighted cues: how close the group's overall aspect ratio
    is to the expected one, how rectangular each contour is, and how many
    contours the group contains.
    """
    rects = [cv2.boundingRect(contours[i]) for i in group]
    # Overall bounding box of the whole group
    left = min(rect[0] for rect in rects)
    right = max(rect[0] + rect[2] for rect in rects)
    top = min(rect[1] for rect in rects)
    bottom = max(rect[1] + rect[3] for rect in rects)
    aspect_ratio = (right - left) / (bottom - top)
    aspect_ratio_error = abs(1 - magic_numbers.GROUP_ASPECT_RATIO / aspect_ratio)
    # How close to being a rectangle each contour is:
    # ratio of actual contour area to bounding-rect area
    rects_area = sum(rect[2] * rect[3] for rect in rects)
    real_area = sum(contour_areas[i] for i in group)
    rectangulares = real_area / rects_area
    # More contours in the group -> higher confidence, saturating at 5
    length = lerp(len(group), 2, 5, 0.5, 1)
    return (
        lerp(aspect_ratio_error, 0, magic_numbers.SATURATING_ASPECT_RATIO_ERROR, 1, 0)
        * magic_numbers.CONF_ASPECT_RATIO_K
        + rectangulares * magic_numbers.CONF_RECTANGULARES_K
        + length * magic_numbers.CONF_LENGTH_K
    ) / magic_numbers.CONF_TOTAL
def group_com(
    contours: np.ndarray,
    group: List[int],
    contour_areas: List[int],
) -> Tuple[int, int]:
    """Return the representative position of a contour group.

    x is the mean of contour centres weighted by their areas;
    y is the topmost point of any contour in the group.
    """
    weighted_sum = np.zeros(2)
    total_area = 0
    for idx in group:
        area = contour_areas[idx]
        weighted_sum += contours[idx].mean(axis=0)[0] * area
        total_area += area
    com = weighted_sum / total_area  # xy position
    top_y = min(min(point[0][1] for point in contours[idx]) for idx in group)
    return (int(com[0]), top_y)
def annotate_image(
    display: np.ndarray, contours: np.ndarray, group: List[int], pos: Tuple[int, int]
) -> np.ndarray:
    """Draw all contours, links between grouped contours, and the target position."""
    cv2.drawContours(display, contours, -1, (255, 0, 0), thickness=2)
    for first, second in zip(group, group[1:]):
        # takes the first point in each contour to be fast
        start = contours[first][0][0]  # each point is [[x, y]]
        end = contours[second][0][0]
        cv2.line(display, tuple(map(int, start)), tuple(map(int, end)), (0, 255, 0), 1)
    cv2.circle(display, pos, 5, (0, 0, 255), -1)
    return display
if __name__ == "__main__":
    # this is to run on the robot
    # to run vision code on your laptop use sim.py
    camera = CameraManager(
        "Power Port Camera",
        "/dev/video0",
        magic_numbers.FRAME_HEIGHT,
        magic_numbers.FRAME_WIDTH,
        30,
        "kYUYV",
    )
    vision = Vision(camera, NTConnection())
    while True:
        vision.run()