Compare commits
208 Commits
14c78e2023
...
main
Author | SHA1 | Date | |
---|---|---|---|
90a1c03a5c | |||
30213107de | |||
78e76da161 | |||
1c79989840 | |||
90b5ff824d | |||
144551df39 | |||
e18767fcb6 | |||
919b7b2c4d | |||
b30833dcae | |||
12bcfe412f | |||
374c321c4c | |||
e974a0bb27 | |||
f7d459482a | |||
073897c485 | |||
76a459fc9f | |||
b0bd525726 | |||
c70da77fc9 | |||
60dbadabe1 | |||
bcf1ae2a5a | |||
82e4c2eb43 | |||
02c2c04ca4 | |||
5fdcf1c0dc | |||
71b7c67fff | |||
fbd30c388c | |||
ffe364dd97 | |||
21236301e0 | |||
9d8e95d711 | |||
22ef920735 | |||
0558fcd9e4 | |||
9890800079 | |||
d68039f09f | |||
323185d08d | |||
baed8628be | |||
850ccb8179 | |||
879adf7b91 | |||
123c8fe350 | |||
f93a046270 | |||
9cc3b41f87 | |||
81862f8ee3 | |||
3219028980 | |||
cf21934931 | |||
a7616ad41c | |||
963509fd8a | |||
2c65a75f28 | |||
0621d02fff | |||
51b29110bc | |||
8984fcf6af | |||
6c27ad4805 | |||
437ccde0f1 | |||
e329d2b87f | |||
679d6b78c5 | |||
302095a929 | |||
8be9656f5d | |||
15fab9c09c | |||
04c8dcc96e | |||
5ddcd8499d | |||
9145085739 | |||
ebf356445b | |||
ef5f297a47 | |||
0e46b0dbe2 | |||
ca4929de5c | |||
e0cea5cb78 | |||
a626ad7c14 | |||
be1183c611 | |||
94bbb57d65 | |||
05f654ba94 | |||
2369aad004 | |||
7edb905bd2 | |||
cb33cc0027 | |||
417dd962ae | |||
b636440409 | |||
616d902ee2 | |||
c78c596db2 | |||
ad57381bc0 | |||
a7ffe3f581 | |||
23b99b56d5 | |||
5e2766b01e | |||
dc23e96fba | |||
ef9cabcaa5 | |||
d3db855dad | |||
d8647d4466 | |||
4ac0ebbd22 | |||
1f763a512f | |||
edec956494 | |||
4e41d872ce | |||
f822019bd1 | |||
3b1ce060df | |||
492f7b90cd | |||
4c89c36a78 | |||
afd5be5eb3 | |||
93df18a433 | |||
ab5974d46a | |||
85289bc5c9 | |||
90d13f8a03 | |||
03245ecd2a | |||
1a4587d60c | |||
44434488a6 | |||
1edc312c8a | |||
5714651a8d | |||
2372c85148 | |||
2c10cc9346 | |||
64d548ddfa | |||
a97b998754 | |||
783f3291c8 | |||
baf83d8fee | |||
27f039059f | |||
1c5a0edc0b | |||
8884013910 | |||
af3064ad4c | |||
85319f7631 | |||
5966c1188b | |||
1f9cb9069e | |||
f8de362c43 | |||
f60b56587a | |||
3e32a38ad2 | |||
d4a4d1c443 | |||
ac5e76d977 | |||
86b621fa9d | |||
a0eeeba787 | |||
b3aaf40487 | |||
0860ccd58b | |||
ac6d94ee39 | |||
dc8f23bd5e | |||
5e7509d2b6 | |||
8a2156a5d9 | |||
642591ecc4 | |||
b7ccb84c19 | |||
0bad669ba5 | |||
f2e3a7f56a | |||
b0e4bcf85c | |||
e3916ab6a0 | |||
3a3903176a | |||
f77b2d32bd | |||
f547e8e0c3 | |||
e1e229efc7 | |||
8c2a835e01 | |||
640db167fe | |||
f598801950 | |||
2009e1c972 | |||
a025ed0098 | |||
341de6027a | |||
1b0c989800 | |||
81815b17a6 | |||
6b3c84fff2 | |||
2490d93a3c | |||
7fb14f9345 | |||
95c011e4e7 | |||
f546e3f8f5 | |||
13bde48720 | |||
0cb11b5264 | |||
eeafcc7f8b | |||
81d9ae690a | |||
8cf1d8dcbd | |||
8201952b4b | |||
193e15fc2a | |||
517948c543 | |||
f49cab65f2 | |||
35b4cdbdfe | |||
35971fbcee | |||
a675ecfc88 | |||
0c3b5a7684 | |||
fc6f2ff67a | |||
c705d8bc7c | |||
7908028313 | |||
3f55a78127 | |||
29d567ff16 | |||
a90ef9d8f9 | |||
76f7e922cc | |||
94c7bf1d35 | |||
ac48afd01a | |||
6f93fd0857 | |||
f6ce6456b8 | |||
01ab491b61 | |||
1888422cdf | |||
96992c878e | |||
c1a6ddeda4 | |||
f28b1f6e1b | |||
9972ad7e86 | |||
07817b3356 | |||
8008f8fdae | |||
774fa0c44d | |||
0abeff7273 | |||
5916f9dbb3 | |||
08d5ed2f0a | |||
6c05d877f5 | |||
77bc6489e0 | |||
d6ed233b8c | |||
f9c28d107e | |||
8d2923af00 | |||
5fc8948852 | |||
9b684bb9b7 | |||
21df2467e6 | |||
fa51978f98 | |||
d466eb172e | |||
d088fe6a28 | |||
b603737bf8 | |||
5c5859cc45 | |||
e53dbab406 | |||
0c13f8cb2e | |||
280d9037c0 | |||
df1a4de690 | |||
b12106cd57 | |||
bef95b8526 | |||
cd640c403a | |||
7267e04ba7 | |||
55704cf590 | |||
2fd874c55d | |||
b8abaee518 |
BIN
Examples/.DS_Store
vendored
Normal file
BIN
Examples/.DS_Store
vendored
Normal file
Binary file not shown.
96
drive_between_arucos.py
Normal file
96
drive_between_arucos.py
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
from time import sleep
|
||||||
|
from math import cos, sin
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from robot import Robot
|
||||||
|
|
||||||
|
POWER = 70
|
||||||
|
|
||||||
|
TURN_T = 7.9 # 1 degree
|
||||||
|
DRIVE_T = 22 # 1 centimeter
|
||||||
|
|
||||||
|
RIGHT_WHEEL_OFFSET = 4
|
||||||
|
|
||||||
|
CLOCKWISE_OFFSET = 0.92
|
||||||
|
|
||||||
|
FOCAL_LENGTH = 1691
|
||||||
|
|
||||||
|
CAMERA_MATRIX = np.array(
|
||||||
|
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||||
|
dtype=np.float32
|
||||||
|
)
|
||||||
|
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||||
|
|
||||||
|
def find_aruco(image):
|
||||||
|
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
||||||
|
aruco_params = cv2.aruco.DetectorParameters_create()
|
||||||
|
corners, ids, _ = cv2.aruco.detectMarkers(
|
||||||
|
image,
|
||||||
|
aruco_dict,
|
||||||
|
parameters=aruco_params
|
||||||
|
)
|
||||||
|
|
||||||
|
if corners is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [(box[0], ids[i][0]) for i, box in enumerate(corners)]
|
||||||
|
|
||||||
|
def find_arucos(arlo):
|
||||||
|
aruco_dict = {}
|
||||||
|
theta = np.deg2rad(-20)
|
||||||
|
rot = np.array([[cos(theta), -sin(theta)], [sin(theta), cos(theta)]])
|
||||||
|
|
||||||
|
while True:
|
||||||
|
arucos = find_aruco(arlo.take_photo())
|
||||||
|
if arucos != []:
|
||||||
|
for aruco, id_ in arucos:
|
||||||
|
if id_ in [1,6] and id_ not in aruco_dict:
|
||||||
|
print(f"found box {id_}")
|
||||||
|
position = cv2.aruco.estimatePoseSingleMarkers(
|
||||||
|
np.array([aruco]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||||
|
)[1][0][0]
|
||||||
|
position = np.array([position[2], position[0]])
|
||||||
|
position[0] += 22.5
|
||||||
|
aruco_dict[id_] = position
|
||||||
|
|
||||||
|
if len(aruco_dict) >= 2:
|
||||||
|
break
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
sleep((20 * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
for key, value in aruco_dict.items():
|
||||||
|
aruco_dict[key] = np.dot(rot, value)
|
||||||
|
|
||||||
|
return np.array(list(aruco_dict.values())[:2])
|
||||||
|
|
||||||
|
def main():
|
||||||
|
arlo = Robot()
|
||||||
|
aruco_positions = find_arucos(arlo)
|
||||||
|
print(aruco_positions)
|
||||||
|
|
||||||
|
position = [
|
||||||
|
np.average(aruco_positions[:,0]),
|
||||||
|
np.average(aruco_positions[:,1])
|
||||||
|
]
|
||||||
|
print(position)
|
||||||
|
|
||||||
|
angle = np.rad2deg(np.arctan(position[1]/position[0]))
|
||||||
|
drive_distance = np.sqrt(position[0]**2 + position[1]**2)
|
||||||
|
if angle < 0:
|
||||||
|
arlo.go_diff(POWER, POWER, 0, 1)
|
||||||
|
sleep((abs(angle) * TURN_T)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
else:
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
sleep((drive_distance * DRIVE_T)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
71
drive_to_aruco.py
Normal file
71
drive_to_aruco.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
from time import sleep
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from robot import Robot
|
||||||
|
|
||||||
|
POWER = 70
|
||||||
|
|
||||||
|
TURN_T = 7.9 # 1 degree
|
||||||
|
DRIVE_T = 22 # 1 centimeter
|
||||||
|
|
||||||
|
RIGHT_WHEEL_OFFSET = 4
|
||||||
|
|
||||||
|
CLOCKWISE_OFFSET = 0.96
|
||||||
|
|
||||||
|
FOCAL_LENGTH = 1691
|
||||||
|
|
||||||
|
CAMERA_MATRIX = np.array(
|
||||||
|
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||||
|
dtype=np.float32
|
||||||
|
)
|
||||||
|
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||||
|
|
||||||
|
def find_aruco(image):
|
||||||
|
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
||||||
|
aruco_params = cv2.aruco.DetectorParameters_create()
|
||||||
|
corners, ids, _ = cv2.aruco.detectMarkers(
|
||||||
|
image,
|
||||||
|
aruco_dict,
|
||||||
|
parameters=aruco_params
|
||||||
|
)
|
||||||
|
|
||||||
|
if corners is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [(box[0], ids[i]) for i, box in enumerate(corners)]
|
||||||
|
|
||||||
|
def main():
|
||||||
|
arlo = Robot()
|
||||||
|
while True:
|
||||||
|
arucos = find_aruco(arlo.take_photo())
|
||||||
|
if arucos != []:
|
||||||
|
break
|
||||||
|
arlo.go_diff(40, 40, 1, 0)
|
||||||
|
sleep(0.3)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
position = cv2.aruco.estimatePoseSingleMarkers(
|
||||||
|
np.array([arucos[0][0]]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||||
|
)[1][0][0]
|
||||||
|
|
||||||
|
angle = np.rad2deg(np.arctan(position[0]/position[2]))
|
||||||
|
drive_distance = np.sqrt(position[0]**2 + position[2]**2)
|
||||||
|
print(drive_distance)
|
||||||
|
if angle < 0:
|
||||||
|
arlo.go_diff(POWER, POWER, 0, 1)
|
||||||
|
sleep((abs(angle) * TURN_T)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
else:
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
# arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
# sleep((drive_distance * DRIVE_T)/1000)
|
||||||
|
# arlo.stop()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
11
estimate_distance.py
Normal file
11
estimate_distance.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
from robot import Arlo
|
||||||
|
|
||||||
|
def main():
|
||||||
|
arlo = Arlo(False)
|
||||||
|
for _ in range(5):
|
||||||
|
image = arlo.robot.take_photo()
|
||||||
|
bounding_boxes = arlo.find_aruco(image)
|
||||||
|
print(arlo.estimate_distance(bounding_boxes[0][0]))
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
20
occupancy_grids.py
Normal file
20
occupancy_grids.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from matplotlib.backend_bases import NonGuiException
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
from numpy.random import uniform
|
||||||
|
|
||||||
|
|
||||||
|
def Binary_Baye_Filter (I, zt, x):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def invserse_sensor_model(m, x, z):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def update(particles, weights, z, R, landmarks):
|
||||||
|
for i, landmark in enumerate(landmarks):
|
||||||
|
distance = np.linalg.norm(particles[:, 0:2] - landmark, axis=1)
|
||||||
|
weights *= np.scipy.stats.norm(distance, R).pdf(z[i])
|
||||||
|
|
||||||
|
weights += 1.e-300 # avoid round-off to zero
|
||||||
|
weights /= sum(weights) # normalize
|
154
rally.py
Normal file
154
rally.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
import robot
|
||||||
|
|
||||||
|
LANDMARKS = [1,2,3,4] # SET ARUCO NUMBERS
|
||||||
|
|
||||||
|
POWER = 70
|
||||||
|
|
||||||
|
TURN_T = 8.3 # 1 degree
|
||||||
|
DRIVE_T = 22 # 1 centimeter
|
||||||
|
|
||||||
|
RIGHT_WHEEL_OFFSET = 5
|
||||||
|
|
||||||
|
CLOCKWISE_OFFSET = 0.8
|
||||||
|
|
||||||
|
FOCAL_LENGTH = 1691
|
||||||
|
|
||||||
|
CAMERA_MATRIX = np.array(
|
||||||
|
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||||
|
dtype=np.float32
|
||||||
|
)
|
||||||
|
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||||
|
|
||||||
|
def find_aruco(image):
|
||||||
|
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
||||||
|
aruco_params = cv2.aruco.DetectorParameters_create()
|
||||||
|
corners, ids, _ = cv2.aruco.detectMarkers(
|
||||||
|
image,
|
||||||
|
aruco_dict,
|
||||||
|
parameters=aruco_params
|
||||||
|
)
|
||||||
|
|
||||||
|
if corners is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return {ids[i][0]: box[0] for i, box in enumerate(corners)}
|
||||||
|
|
||||||
|
|
||||||
|
def drunk_drive(drive_time, arlo, careful_range = 600):
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
end = start + drive_time
|
||||||
|
turning = None
|
||||||
|
while time.time() < end:
|
||||||
|
forward_dist = arlo.read_front_ping_sensor()
|
||||||
|
right_dist = arlo.read_right_ping_sensor()
|
||||||
|
left_dist = arlo.read_left_ping_sensor()
|
||||||
|
if forward_dist > careful_range and all(x > 250 for x in [right_dist, left_dist]):
|
||||||
|
turning = None
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
else:
|
||||||
|
if turning == "R" or (forward_dist > careful_range and right_dist > 250 and turning is None):
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
turning = "R"
|
||||||
|
else:
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
|
||||||
|
turning = "L"
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
def careful_forward(drive_time, arlo, careful_range = 600):
|
||||||
|
start = time.time()
|
||||||
|
if drive_time > 0.8:
|
||||||
|
drive_time = 0.8
|
||||||
|
hit_something = True
|
||||||
|
else:
|
||||||
|
hit_something = False
|
||||||
|
|
||||||
|
end = start + drive_time
|
||||||
|
turning = None
|
||||||
|
while time.time() < end:
|
||||||
|
forward_dist = arlo.read_front_ping_sensor()
|
||||||
|
right_dist = arlo.read_right_ping_sensor()
|
||||||
|
left_dist = arlo.read_left_ping_sensor()
|
||||||
|
if forward_dist > careful_range and all(x > 250 for x in [right_dist, left_dist]):
|
||||||
|
turning = None
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
else:
|
||||||
|
hit_something = True
|
||||||
|
if turning == "R" or (forward_dist > careful_range and right_dist > 250 and turning is None):
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
turning = "R"
|
||||||
|
else:
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
|
||||||
|
turning = "L"
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
return hit_something
|
||||||
|
|
||||||
|
def main():
|
||||||
|
landmark_order = LANDMARKS + [
|
||||||
|
LANDMARKS[0]
|
||||||
|
]
|
||||||
|
|
||||||
|
noah = robot.Robot()
|
||||||
|
|
||||||
|
while landmark_order != []:
|
||||||
|
landmark = landmark_order.pop(0)
|
||||||
|
print(f"looking for landmark {landmark}")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
while True:
|
||||||
|
for _ in range(24):
|
||||||
|
arucos = find_aruco(noah.take_photo())
|
||||||
|
if landmark in arucos:
|
||||||
|
break
|
||||||
|
noah.go_diff(POWER, POWER, 0, 1)
|
||||||
|
time.sleep((20 * TURN_T)/1000)
|
||||||
|
noah.stop()
|
||||||
|
|
||||||
|
if landmark in arucos:
|
||||||
|
break
|
||||||
|
|
||||||
|
drunk_drive(3, noah)
|
||||||
|
|
||||||
|
|
||||||
|
position = cv2.aruco.estimatePoseSingleMarkers(
|
||||||
|
np.array([arucos[landmark]]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||||
|
)[1][0][0]
|
||||||
|
|
||||||
|
angle = np.rad2deg(np.arctan(position[0]/position[2])) - 5
|
||||||
|
drive_distance = np.sqrt(position[0]**2 + position[2]**2) - 100
|
||||||
|
|
||||||
|
if angle < 0:
|
||||||
|
noah.go_diff(POWER, POWER, 0, 1)
|
||||||
|
time.sleep((abs(angle) * TURN_T)/1000)
|
||||||
|
noah.stop()
|
||||||
|
else:
|
||||||
|
noah.go_diff(POWER, POWER, 1, 0)
|
||||||
|
time.sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
noah.stop()
|
||||||
|
|
||||||
|
if not careful_forward((drive_distance * DRIVE_T)/1000, noah, 400):
|
||||||
|
arucos = find_aruco(noah.take_photo())
|
||||||
|
if landmark in arucos:
|
||||||
|
position = cv2.aruco.estimatePoseSingleMarkers(
|
||||||
|
np.array([arucos[landmark]]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||||
|
)[1][0][0]
|
||||||
|
drive_distance = np.sqrt(position[0]**2 + position[2]**2) - 25
|
||||||
|
noah.go_diff(POWER, POWER, 1, 1)
|
||||||
|
time.sleep((drive_distance * DRIVE_T)/1000)
|
||||||
|
noah.stop()
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
start = time.time()
|
||||||
|
main()
|
||||||
|
print(f"Drove for {time.time() - start} seconds")
|
314
rally2.py
Normal file
314
rally2.py
Normal file
@ -0,0 +1,314 @@
|
|||||||
|
import time
|
||||||
|
from turtle import right
|
||||||
|
import numpy as np
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
from selflocalization import particle, camera
|
||||||
|
|
||||||
|
import robot
|
||||||
|
|
||||||
|
LANDMARKS = [6,8,9,7]
|
||||||
|
|
||||||
|
LANDMARK_POSITIONS = {
|
||||||
|
6: [0, 0],
|
||||||
|
8: [300, 0],
|
||||||
|
9: [0, 400],
|
||||||
|
7: [300, 400]
|
||||||
|
}
|
||||||
|
|
||||||
|
POWER = 70
|
||||||
|
|
||||||
|
TURN_T = 9.6 # 1 degree
|
||||||
|
DRIVE_T = 22 # 1 centimeter
|
||||||
|
|
||||||
|
RIGHT_WHEEL_OFFSET = 4
|
||||||
|
|
||||||
|
CLOCKWISE_OFFSET = 0.96
|
||||||
|
|
||||||
|
FOCAL_LENGTH = 1691
|
||||||
|
|
||||||
|
CAMERA_MATRIX = np.array(
|
||||||
|
[[FOCAL_LENGTH, 0, 512], [0, FOCAL_LENGTH, 360], [0, 0, 1]],
|
||||||
|
dtype=np.float32
|
||||||
|
)
|
||||||
|
DIST_COEF = np.array([0, 0, 0, 0, 0], dtype=np.float32)
|
||||||
|
|
||||||
|
SIGMA = 3
|
||||||
|
SIGMA_THETA = 0.3
|
||||||
|
|
||||||
|
NUM_PARTICLES = 1000
|
||||||
|
|
||||||
|
|
||||||
|
def look_around(noah, particles, cam, est_pose):
|
||||||
|
for _ in range(24):
|
||||||
|
time.sleep(0.2)
|
||||||
|
# Fetch next frame
|
||||||
|
for _ in range(10):
|
||||||
|
colour = cam.get_next_frame()
|
||||||
|
landmark_values = []
|
||||||
|
|
||||||
|
# Detect objects
|
||||||
|
objectIDs, dists, angles = cam.detect_aruco_objects(colour)
|
||||||
|
if not isinstance(objectIDs, type(None)):
|
||||||
|
|
||||||
|
# List detected objects
|
||||||
|
for i in range(len(objectIDs)):
|
||||||
|
print(
|
||||||
|
"Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
|
||||||
|
if objectIDs[i] in LANDMARKS:
|
||||||
|
landmark_values.append((objectIDs[i], dists[i] + 22.5, angles[i]))
|
||||||
|
|
||||||
|
# Compute particle weights
|
||||||
|
for p in particles:
|
||||||
|
calc_weight(p, landmark_values)
|
||||||
|
|
||||||
|
# Resampling
|
||||||
|
particles = particle.resample(particles)
|
||||||
|
|
||||||
|
# Draw detected objects
|
||||||
|
cam.draw_aruco_objects(colour)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# No observation - reset weights to uniform distribution
|
||||||
|
for p in particles:
|
||||||
|
p.setWeight(1.0/NUM_PARTICLES)
|
||||||
|
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
# The estimate of the robots current pose
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
calc_weight(est_pose, landmark_values)
|
||||||
|
noah.go_diff(POWER, POWER, 0, 1)
|
||||||
|
time.sleep((15 * TURN_T)/1000)
|
||||||
|
noah.stop()
|
||||||
|
for p in particles:
|
||||||
|
p.setTheta(p.theta - np.deg2rad(15))
|
||||||
|
|
||||||
|
# The estimate of the robots current pose
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
return particles, est_pose
|
||||||
|
|
||||||
|
|
||||||
|
def turn_towards_landmark(noah, particles, est_pose, landmark):
|
||||||
|
current_position = np.array([est_pose.x, est_pose.y])
|
||||||
|
current_theta = est_pose.theta
|
||||||
|
landmark_position = np.array(LANDMARK_POSITIONS[landmark])
|
||||||
|
|
||||||
|
relative_pos = landmark_position - current_position
|
||||||
|
angle = np.rad2deg(np.arctan(relative_pos[1]/relative_pos[0]))
|
||||||
|
turn_angle = np.mod(angle - (np.rad2deg(current_theta) - 180), 360)
|
||||||
|
for p in particles:
|
||||||
|
p.setTheta(p.theta - np.deg2rad(turn_angle))
|
||||||
|
if turn_angle < 180:
|
||||||
|
noah.go_diff(POWER, POWER, 0, 1)
|
||||||
|
time.sleep((abs(turn_angle) * TURN_T)/1000)
|
||||||
|
noah.stop()
|
||||||
|
else:
|
||||||
|
noah.go_diff(POWER, POWER, 1, 0)
|
||||||
|
time.sleep((abs(360 - turn_angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
noah.stop()
|
||||||
|
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
# The estimate of the robots current pose
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
return particles, est_pose
|
||||||
|
|
||||||
|
|
||||||
|
def time_to_landmark(est_pose, landmark):
|
||||||
|
"""Kør indenfor 1 meter"""
|
||||||
|
current_position = np.array([est_pose.x, est_pose.y])
|
||||||
|
landmark_position = np.array(LANDMARK_POSITIONS[landmark])
|
||||||
|
|
||||||
|
relative_pos = landmark_position - current_position
|
||||||
|
drive_distance = np.sqrt(relative_pos[0]**2 + relative_pos[1]**2) - 100
|
||||||
|
return (DRIVE_T * drive_distance)/1000
|
||||||
|
|
||||||
|
|
||||||
|
def drive_until_stopped(noah, particles):
|
||||||
|
noah.go_diff(POWER, POWER+RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
start = time.time()
|
||||||
|
while True:
|
||||||
|
forward_dist = noah.read_front_ping_sensor()
|
||||||
|
if forward_dist < 1000:
|
||||||
|
noah.stop()
|
||||||
|
break
|
||||||
|
left_dist = noah.read_left_ping_sensor()
|
||||||
|
if left_dist < 400:
|
||||||
|
noah.stop()
|
||||||
|
break
|
||||||
|
right_dist = noah.read_right_ping_sensor()
|
||||||
|
if right_dist < 400:
|
||||||
|
noah.stop()
|
||||||
|
break
|
||||||
|
if time.time() - start > 5:
|
||||||
|
noah.stop()
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
end = time.time()
|
||||||
|
time_driven = end - start
|
||||||
|
distance_driven = (time_driven*1000)/DRIVE_T
|
||||||
|
for p in particles:
|
||||||
|
x = np.cos(p.theta) * distance_driven
|
||||||
|
y = np.sin(p.theta) * distance_driven
|
||||||
|
p.x = p.x + x
|
||||||
|
p.y = p.y + y
|
||||||
|
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
|
||||||
|
return time_driven, est_pose, particles
|
||||||
|
|
||||||
|
|
||||||
|
def drunk_drive(noah):
|
||||||
|
start = time.time()
|
||||||
|
end = start + 2
|
||||||
|
turning = None
|
||||||
|
while time.time() < end:
|
||||||
|
forward_dist = noah.read_front_ping_sensor()
|
||||||
|
right_dist = noah.read_right_ping_sensor()
|
||||||
|
left_dist = noah.read_left_ping_sensor()
|
||||||
|
if forward_dist > 600 and all(x > 400 for x in [right_dist, left_dist]):
|
||||||
|
turning = None
|
||||||
|
noah.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
else:
|
||||||
|
if turning == "R" or (forward_dist > 600 and right_dist > 400 and turning is None):
|
||||||
|
noah.go_diff(POWER, POWER, 1, 0)
|
||||||
|
turning = "R"
|
||||||
|
else:
|
||||||
|
noah.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
|
||||||
|
turning = "L"
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
def inch_closer(noah, particles):
|
||||||
|
start = time.time()
|
||||||
|
noah.go_diff(POWER, POWER+RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
while True:
|
||||||
|
forward_dist = noah.read_front_ping_sensor()
|
||||||
|
if forward_dist < 300:
|
||||||
|
noah.stop()
|
||||||
|
break
|
||||||
|
|
||||||
|
end = time.time()
|
||||||
|
time_driven = end - start
|
||||||
|
distance_driven = (time_driven*1000)/DRIVE_T
|
||||||
|
for p in particles:
|
||||||
|
x = np.cos(p.theta) * distance_driven
|
||||||
|
y = np.sin(p.theta) * distance_driven
|
||||||
|
p.x = p.x + x
|
||||||
|
p.y = p.y + y
|
||||||
|
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
return est_pose, particles
|
||||||
|
|
||||||
|
|
||||||
|
def initialize_particles(num_particles):
|
||||||
|
particles = []
|
||||||
|
for _ in range(num_particles):
|
||||||
|
# Random starting points.
|
||||||
|
p = particle.Particle(600.0*np.random.ranf() - 100.0, 600.0*np.random.ranf(
|
||||||
|
) - 250.0, np.mod(2.0*np.pi*np.random.ranf(), 2.0*np.pi), 1.0/num_particles)
|
||||||
|
particles.append(p)
|
||||||
|
|
||||||
|
return particles
|
||||||
|
|
||||||
|
|
||||||
|
def dist(particle, landmark):
|
||||||
|
return np.sqrt((landmark[0]-particle.x)**2+(landmark[1]-particle.y)**2)
|
||||||
|
|
||||||
|
|
||||||
|
def calc_angle(particle, landmark, dist):
|
||||||
|
e_theta = np.array([np.cos(particle.theta), np.sin(particle.theta)]).T
|
||||||
|
e_landmark = (
|
||||||
|
np.array([landmark[0]-particle.x, landmark[1]-particle.y]).T)/dist
|
||||||
|
e_hat_theta = np.array([-np.sin(particle.theta), np.cos(particle.theta)]).T
|
||||||
|
return np.sign(np.dot(e_landmark, e_hat_theta)) * np.arccos(np.dot(e_landmark, e_theta))
|
||||||
|
|
||||||
|
|
||||||
|
def normal(x, mean, std):
|
||||||
|
return (
|
||||||
|
(np.exp(-(((x - mean)**2)/(2 * std**2))))/(np.sqrt(2 * np.pi) * std)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def calc_weight(particle, landmark_values):
|
||||||
|
if landmark_values == []:
|
||||||
|
return
|
||||||
|
weights = []
|
||||||
|
for values in landmark_values:
|
||||||
|
dist_to_landmark = dist(particle, LANDMARK_POSITIONS[values[0]])
|
||||||
|
dist_weight = normal(values[1], dist_to_landmark, SIGMA)
|
||||||
|
|
||||||
|
angle_to_landmark = calc_angle(
|
||||||
|
particle, LANDMARK_POSITIONS[values[0]], dist_to_landmark)
|
||||||
|
angle_weight = normal(values[2], angle_to_landmark, SIGMA_THETA)
|
||||||
|
weights.append(dist_weight * angle_weight)
|
||||||
|
|
||||||
|
particle.setWeight(np.product(weights))
|
||||||
|
|
||||||
|
def turn_90_degrees(noah, est_pose, particles):
|
||||||
|
x_values = [i[0] for i in LANDMARK_POSITIONS.values()]
|
||||||
|
y_values = [i[1] for i in LANDMARK_POSITIONS.values()]
|
||||||
|
middle = np.array([max(x_values)/2, max(y_values)/2])
|
||||||
|
|
||||||
|
current_position = np.array([est_pose.x, est_pose.y])
|
||||||
|
relative_pos = middle - current_position
|
||||||
|
angle = np.arctan(relative_pos[1]/relative_pos[0])
|
||||||
|
relative_angle = np.mod(angle - est_pose.theta, 2.0*np.pi)
|
||||||
|
|
||||||
|
clockwise = relative_angle > 180
|
||||||
|
|
||||||
|
if clockwise:
|
||||||
|
noah.go_diff(POWER, POWER, 1, 0)
|
||||||
|
time.sleep((90 * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
else:
|
||||||
|
noah.go_diff(POWER, POWER, 0, 1)
|
||||||
|
time.sleep((90 * TURN_T)/1000)
|
||||||
|
|
||||||
|
noah.stop()
|
||||||
|
|
||||||
|
for p in particles:
|
||||||
|
if clockwise:
|
||||||
|
p.setTheta(p.theta - np.deg2rad(90))
|
||||||
|
else:
|
||||||
|
p.setTheta(p.theta + np.deg2rad(90))
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
_, est_pose, particles = drive_until_stopped(noah, particles)
|
||||||
|
return est_pose, particles
|
||||||
|
|
||||||
|
|
||||||
|
def main(noah):
|
||||||
|
landmark_order = LANDMARKS + [
|
||||||
|
LANDMARKS[0]
|
||||||
|
]
|
||||||
|
cam = camera.Camera(0, 'arlo', useCaptureThread=True)
|
||||||
|
particles = initialize_particles(NUM_PARTICLES)
|
||||||
|
|
||||||
|
# The estimate of the robots current pose
|
||||||
|
est_pose = particle.estimate_pose(particles)
|
||||||
|
|
||||||
|
for landmark in landmark_order:
|
||||||
|
print(f"Going to landmark {landmark}")
|
||||||
|
while True:
|
||||||
|
particles, est_pose = look_around(noah, particles, cam, est_pose)
|
||||||
|
print(est_pose)
|
||||||
|
particles, est_pose = turn_towards_landmark(noah, particles, est_pose, landmark)
|
||||||
|
drive_time = time_to_landmark(est_pose, landmark)
|
||||||
|
time_driven, est_pose, particles = drive_until_stopped(noah, particles)
|
||||||
|
if not abs(time_driven - drive_time) < 0.5:
|
||||||
|
# drunk_drive(noah)
|
||||||
|
est_pose, particles = turn_90_degrees(noah, est_pose, particles)
|
||||||
|
continue
|
||||||
|
|
||||||
|
est_pose, particles = inch_closer(noah, particles)
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
noah = robot.Robot() # Noah er vores robots navn
|
||||||
|
try:
|
||||||
|
main(noah)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
noah.stop()
|
@ -1,2 +1 @@
|
|||||||
from .robot import Robot
|
from .robot import Robot
|
||||||
from .arlo import Arlo
|
|
||||||
|
@ -1,19 +1,27 @@
|
|||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
from .robot import Robot
|
from .robot import Robot
|
||||||
|
|
||||||
START_VALUES = [60, 60]
|
START_VALUES = [60, 60]
|
||||||
THRESHOLD = 1.05
|
THRESHOLD = 1.05
|
||||||
SLEEP_TIME = 2
|
TESTING_SLEEP_TIME = 2
|
||||||
|
DEFAULT_CALIBRATION_CODE = "40.40-40.40-40.40_ff-ff-ff-ff"
|
||||||
|
|
||||||
FOCAL_LENGTH = 0
|
FOCAL_LENGTH = 1691
|
||||||
|
|
||||||
|
CAMERA_MATRIX = np.array(
|
||||||
|
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||||
|
dtype=np.float32
|
||||||
|
)
|
||||||
|
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||||
|
|
||||||
def test_forward(arlo, l_power, r_power):
|
def test_forward(arlo, l_power, r_power):
|
||||||
arlo.reset_encoder_counts()
|
arlo.reset_encoder_counts()
|
||||||
arlo.go_diff(l_power, r_power, 1, 1)
|
arlo.go_diff(l_power, r_power, 1, 1)
|
||||||
sleep(SLEEP_TIME)
|
sleep(TESTING_SLEEP_TIME)
|
||||||
arlo.stop()
|
arlo.stop()
|
||||||
return (
|
return (
|
||||||
abs(int(arlo.read_left_wheel_encoder())),
|
abs(int(arlo.read_left_wheel_encoder())),
|
||||||
@ -23,7 +31,7 @@ def test_forward(arlo, l_power, r_power):
|
|||||||
def test_back(arlo, l_power, r_power):
|
def test_back(arlo, l_power, r_power):
|
||||||
arlo.reset_encoder_counts()
|
arlo.reset_encoder_counts()
|
||||||
arlo.go_diff(l_power, r_power, 0, 0)
|
arlo.go_diff(l_power, r_power, 0, 0)
|
||||||
sleep(SLEEP_TIME)
|
sleep(TESTING_SLEEP_TIME)
|
||||||
arlo.stop()
|
arlo.stop()
|
||||||
return (
|
return (
|
||||||
abs(int(arlo.read_left_wheel_encoder())),
|
abs(int(arlo.read_left_wheel_encoder())),
|
||||||
@ -33,7 +41,7 @@ def test_back(arlo, l_power, r_power):
|
|||||||
def test_clockwise(arlo, l_power, r_power):
|
def test_clockwise(arlo, l_power, r_power):
|
||||||
arlo.reset_encoder_counts()
|
arlo.reset_encoder_counts()
|
||||||
arlo.go_diff(l_power, r_power, 1, 0)
|
arlo.go_diff(l_power, r_power, 1, 0)
|
||||||
sleep(SLEEP_TIME)
|
sleep(TESTING_SLEEP_TIME)
|
||||||
arlo.stop()
|
arlo.stop()
|
||||||
return (
|
return (
|
||||||
abs(int(arlo.read_left_wheel_encoder())),
|
abs(int(arlo.read_left_wheel_encoder())),
|
||||||
@ -43,7 +51,7 @@ def test_clockwise(arlo, l_power, r_power):
|
|||||||
def test_anticlockwise(arlo, l_power, r_power):
|
def test_anticlockwise(arlo, l_power, r_power):
|
||||||
arlo.reset_encoder_counts()
|
arlo.reset_encoder_counts()
|
||||||
arlo.go_diff(l_power, r_power, 0, 1)
|
arlo.go_diff(l_power, r_power, 0, 1)
|
||||||
sleep(SLEEP_TIME)
|
sleep(TESTING_SLEEP_TIME)
|
||||||
arlo.stop()
|
arlo.stop()
|
||||||
return (
|
return (
|
||||||
abs(int(arlo.read_left_wheel_encoder())),
|
abs(int(arlo.read_left_wheel_encoder())),
|
||||||
@ -55,6 +63,9 @@ class Arlo():
|
|||||||
self.robot = Robot()
|
self.robot = Robot()
|
||||||
if calibration_code is None:
|
if calibration_code is None:
|
||||||
self._calibrate()
|
self._calibrate()
|
||||||
|
elif calibration_code == False:
|
||||||
|
self.calibration_code = DEFAULT_CALIBRATION_CODE
|
||||||
|
self._decode_calibration_code()
|
||||||
else:
|
else:
|
||||||
self.calibration_code = calibration_code
|
self.calibration_code = calibration_code
|
||||||
self._decode_calibration_code()
|
self._decode_calibration_code()
|
||||||
@ -84,7 +95,21 @@ class Arlo():
|
|||||||
self.robot.stop()
|
self.robot.stop()
|
||||||
|
|
||||||
def estimate_distance(self, bounding_box):
|
def estimate_distance(self, bounding_box):
|
||||||
average_left = bounding_box[0][0]
|
# avg_left = (bounding_box[0][0] + bounding_box[3][0])/2
|
||||||
|
# avg_right = (bounding_box[1][0] + bounding_box[2][0])/2
|
||||||
|
# avg_up = (bounding_box[0][1] + bounding_box[1][1])/2
|
||||||
|
# avg_down = (bounding_box[2][1] + bounding_box[3][1])/2
|
||||||
|
|
||||||
|
# avg_width = avg_right - avg_left
|
||||||
|
# avg_height = avg_down - avg_up
|
||||||
|
|
||||||
|
# avg_size = (avg_width + avg_height)/2
|
||||||
|
|
||||||
|
# return (FOCAL_LENGTH * 14.5)/avg_size
|
||||||
|
|
||||||
|
return cv2.aruco.estimatePoseSingleMarkers(
|
||||||
|
np.array([bounding_box]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||||
|
)[1][0][0][2]
|
||||||
|
|
||||||
def find_aruco(self, image):
|
def find_aruco(self, image):
|
||||||
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
||||||
@ -95,7 +120,10 @@ class Arlo():
|
|||||||
parameters=aruco_params
|
parameters=aruco_params
|
||||||
)
|
)
|
||||||
|
|
||||||
return zip(corners, ids)
|
if corners is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [(box[0], ids[i]) for i, box in enumerate(corners)]
|
||||||
|
|
||||||
def draw_arucos(self, image, bounding_boxes):
|
def draw_arucos(self, image, bounding_boxes):
|
||||||
for bounding, n in bounding_boxes:
|
for bounding, n in bounding_boxes:
|
||||||
@ -153,15 +181,15 @@ class Arlo():
|
|||||||
else:
|
else:
|
||||||
values[i][1] += 1
|
values[i][1] += 1
|
||||||
|
|
||||||
cps[i] = wheels[0]/SLEEP_TIME
|
cps[i] = wheels[0]/TESTING_SLEEP_TIME
|
||||||
|
|
||||||
time = [0 for _ in range(4)]
|
time = [0 for _ in range(4)]
|
||||||
|
|
||||||
cpc = 144/(3.14159*15) # wheel counts per cm
|
cpc = 144/(3.14159*15) # wheel counts per cm
|
||||||
|
|
||||||
# milliseconds per 10cm forward
|
# milliseconds per 1cm forward
|
||||||
time[0] = int((1000 * cpc)/cps[0])
|
time[0] = int((1000 * cpc)/cps[0])
|
||||||
# milliseconds per 10cm backwards
|
# milliseconds per 1cm backwards
|
||||||
time[1] = int((1000 * cpc)/cps[1])
|
time[1] = int((1000 * cpc)/cps[1])
|
||||||
|
|
||||||
cpr = 3.1415*38 # 1 rotation in cm
|
cpr = 3.1415*38 # 1 rotation in cm
|
||||||
|
522
selflocalization/camera.py
Normal file
522
selflocalization/camera.py
Normal file
@ -0,0 +1,522 @@
|
|||||||
|
import cv2 # Import the OpenCV library
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
from selflocalization import framebuffer
|
||||||
|
from pkg_resources import parse_version
|
||||||
|
|
||||||
|
|
||||||
|
gstreamerCameraFound = False
|
||||||
|
piCameraFound = False
|
||||||
|
try:
|
||||||
|
import picamera
|
||||||
|
from picamera.array import PiRGBArray
|
||||||
|
piCameraFound = True
|
||||||
|
except ImportError:
|
||||||
|
print("Camera.py: picamera module not available - using OpenCV interface instead")
|
||||||
|
|
||||||
|
def isRunningOnArlo():
|
||||||
|
"""Return True if we are running on Arlo, otherwise False."""
|
||||||
|
# TODO: Problematic that gstreamerCameraFound is first set after
|
||||||
|
# instantiation of a Camera object
|
||||||
|
return piCameraFound or gstreamerCameraFound
|
||||||
|
|
||||||
|
# Black magic in order to handle differences in namespace names in OpenCV 2.4 and 3.0
|
||||||
|
OPCV3 = parse_version(cv2.__version__) >= parse_version('3')
|
||||||
|
|
||||||
|
def capPropId(prop):
|
||||||
|
"""returns OpenCV VideoCapture property id given, e.g., "FPS
|
||||||
|
This is needed because of differences in the Python interface in OpenCV 2.4 and 3.0
|
||||||
|
"""
|
||||||
|
return getattr(cv2 if OPCV3 else cv2.cv, ("" if OPCV3 else "CV_") + "CAP_PROP_" + prop)
|
||||||
|
|
||||||
|
|
||||||
|
def gstreamer_pipeline(capture_width=1280, capture_height=720, framerate=30):
|
||||||
|
"""Utility function for setting parameters for the gstreamer camera pipeline"""
|
||||||
|
return (
|
||||||
|
"libcamerasrc !"
|
||||||
|
"videobox autocrop=true !"
|
||||||
|
"video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
|
||||||
|
"videoconvert ! "
|
||||||
|
"appsink"
|
||||||
|
% (
|
||||||
|
capture_width,
|
||||||
|
capture_height,
|
||||||
|
framerate,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CaptureThread(threading.Thread):
|
||||||
|
"""Internal worker thread that captures frames from the camera"""
|
||||||
|
|
||||||
|
def __init__(self, cam, framebuffer):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.cam = cam
|
||||||
|
self.framebuffer = framebuffer
|
||||||
|
self.terminateThreadEvent = threading.Event()
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while not self.terminateThreadEvent.is_set():
|
||||||
|
if piCameraFound:
|
||||||
|
# Use piCamera
|
||||||
|
|
||||||
|
#self.cam.capture(self.rawCapture, format="bgr", use_video_port=True)
|
||||||
|
#image = self.rawCapture.array
|
||||||
|
|
||||||
|
# clear the stream in preparation for the next frame
|
||||||
|
#self.rawCapture.truncate(0)
|
||||||
|
|
||||||
|
if sys.version_info[0] > 2:
|
||||||
|
# Python 3.x
|
||||||
|
image = np.empty((self.cam.resolution[1], self.cam.resolution[0], 3), dtype=np.uint8)
|
||||||
|
else:
|
||||||
|
# Python 2.x
|
||||||
|
image = np.empty((self.cam.resolution[1] * self.cam.resolution[0] * 3,), dtype=np.uint8)
|
||||||
|
|
||||||
|
self.cam.capture(image, format="bgr", use_video_port=True)
|
||||||
|
|
||||||
|
if sys.version_info[0] < 3:
|
||||||
|
# Python 2.x
|
||||||
|
image = image.reshape((self.cam.resolution[1], self.cam.resolution[0], 3))
|
||||||
|
|
||||||
|
else: # Use OpenCV
|
||||||
|
retval, image = self.cam.read() # Read frame
|
||||||
|
|
||||||
|
if not retval: # Error
|
||||||
|
print("CaptureThread: Could not read next frame")
|
||||||
|
exit(-1)
|
||||||
|
|
||||||
|
# Update framebuffer
|
||||||
|
self.framebuffer.new_frame(image)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Terminate the worker thread"""
|
||||||
|
self.terminateThreadEvent.set()
|
||||||
|
|
||||||
|
|
||||||
|
class Camera(object):
|
||||||
|
"""This class is responsible for doing the image processing. It detects known landmarks and
|
||||||
|
measures distances and orientations to these."""
|
||||||
|
|
||||||
|
def __init__(self, camidx, robottype='arlo', useCaptureThread=False):
|
||||||
|
"""Constructor:
|
||||||
|
camidx - index of camera
|
||||||
|
robottype - specify which robot you are using in order to use the correct camera calibration.
|
||||||
|
Supported types: arlo, frindo, scribbler, macbookpro"""
|
||||||
|
|
||||||
|
print("robottype =", robottype)
|
||||||
|
self.useCaptureThread = useCaptureThread
|
||||||
|
|
||||||
|
# Set camera calibration info
|
||||||
|
if robottype == 'arlo':
|
||||||
|
focal_length = 1050
|
||||||
|
self.imageSize = (1024, 720)
|
||||||
|
#self.intrinsic_matrix = np.asarray([ 7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
|
||||||
|
# 7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1. ], dtype = np.float64)
|
||||||
|
#self.intrinsic_matrix = np.asarray([ 6.0727040957659040e+02, 0., 3.0757300398967601e+02, 0.,
|
||||||
|
# 6.0768864690145904e+02, 2.8935674612358201e+02, 0., 0., 1. ], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix = np.asarray([focal_length, 0., self.imageSize[0] / 2.0, 0.,
|
||||||
|
focal_length, self.imageSize[1] / 2.0, 0., 0., 1.], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix.shape = (3, 3)
|
||||||
|
#self.distortion_coeffs = np.asarray([ 1.1911006165076067e-01, -1.0003366233413549e+00,
|
||||||
|
# 1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01 ], dtype = np.float64)
|
||||||
|
self.distortion_coeffs = np.asarray([0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.], dtype = np.float64)
|
||||||
|
elif robottype == 'frindo':
|
||||||
|
self.imageSize = (640, 480)
|
||||||
|
#self.intrinsic_matrix = np.asarray([ 7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
|
||||||
|
# 7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1. ], dtype = np.float64)
|
||||||
|
#self.intrinsic_matrix = np.asarray([ 6.0727040957659040e+02, 0., 3.0757300398967601e+02, 0.,
|
||||||
|
# 6.0768864690145904e+02, 2.8935674612358201e+02, 0., 0., 1. ], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix = np.asarray([500, 0., 3.0757300398967601e+02, 0.,
|
||||||
|
500, 2.8935674612358201e+02, 0., 0., 1.], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix.shape = (3, 3)
|
||||||
|
#self.distortion_coeffs = np.asarray([ 1.1911006165076067e-01, -1.0003366233413549e+00,
|
||||||
|
# 1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01 ], dtype = np.float64)
|
||||||
|
self.distortion_coeffs = np.asarray([0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.], dtype = np.float64)
|
||||||
|
elif robottype == 'scribbler':
|
||||||
|
# Unknown calibration - just using the Frindo calibration
|
||||||
|
self.imageSize = (640, 480)
|
||||||
|
self.intrinsic_matrix = np.asarray([7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
|
||||||
|
7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1.], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix.shape = (3, 3)
|
||||||
|
self.distortion_coeffs = np.asarray([1.1911006165076067e-01, -1.0003366233413549e+00,
|
||||||
|
1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01], dtype = np.float64)
|
||||||
|
elif robottype == 'macbookpro':
|
||||||
|
self.imageSize = (1280, 720)
|
||||||
|
#self.imageSize = (1080, 720)
|
||||||
|
#self.intrinsic_matrix = np.asarray([ 8.6955302212233869e+02, 0., 5.2076864848745902e+02, 0.,
|
||||||
|
# 8.7317664932843684e+02, 4.0331768178896669e+02, 0., 0., 1. ], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix = np.asarray([9.4328095162920715e+02, 0., self.imageSize[0] / 2.0, 0.,
|
||||||
|
9.4946668595979077e+02, self.imageSize[1] / 2.0, 0., 0., 1.], dtype = np.float64)
|
||||||
|
self.intrinsic_matrix.shape = (3, 3)
|
||||||
|
#self.distortion_coeffs = np.asarray([ -1.2844325433988565e-01, -1.3646926538980573e+00,
|
||||||
|
# -5.7263071423202944e-03, 5.7422957803983802e-03, 5.9722099836744738e+00 ], dtype = np.float64)
|
||||||
|
self.distortion_coeffs = np.asarray([0., 0., -1.6169374082976234e-02, 8.7657653170062459e-03, 0.], dtype = np.float64)
|
||||||
|
else:
|
||||||
|
print("Camera.__init__: Unknown robot type")
|
||||||
|
exit(-1)
|
||||||
|
|
||||||
|
|
||||||
|
# Open a camera device for capturing
|
||||||
|
if piCameraFound:
|
||||||
|
# piCamera is available so we use this
|
||||||
|
#self.cam = picamera.PiCamera(camidx)
|
||||||
|
self.cam = picamera.PiCamera(camera_num=camidx, resolution=self.imageSize, framerate=30)
|
||||||
|
|
||||||
|
if not self.useCaptureThread:
|
||||||
|
self.rawCapture = PiRGBArray(self.cam, size=self.cam.resolution)
|
||||||
|
|
||||||
|
time.sleep(2) # wait for the camera
|
||||||
|
|
||||||
|
# Too slow code - instead just use cam.capture
|
||||||
|
#self.capture_generator = self.cam.capture_continuous(self.rawCapture, format="bgr", use_video_port=True)
|
||||||
|
|
||||||
|
gain = self.cam.awb_gains
|
||||||
|
self.cam.awb_mode='off'
|
||||||
|
self.cam.awb_gains = gain
|
||||||
|
|
||||||
|
self.cam.shutter_speed = self.cam.exposure_speed
|
||||||
|
self.cam.exposure_mode = 'off'
|
||||||
|
|
||||||
|
|
||||||
|
print("shutter_speed = ", self.cam.shutter_speed)
|
||||||
|
print("awb_gains = ", self.cam.awb_gains)
|
||||||
|
|
||||||
|
|
||||||
|
print("Camera width = ", self.cam.resolution[0])
|
||||||
|
print("Camera height = ", self.cam.resolution[1])
|
||||||
|
print("Camera FPS = ", self.cam.framerate)
|
||||||
|
|
||||||
|
|
||||||
|
else: # Use OpenCV interface
|
||||||
|
|
||||||
|
# We next try the gstreamer interface
|
||||||
|
self.cam = cv2.VideoCapture(gstreamer_pipeline(
|
||||||
|
capture_width=self.imageSize[0], capture_height=self.imageSize[1]),
|
||||||
|
apiPreference=cv2.CAP_GSTREAMER)
|
||||||
|
if not self.cam.isOpened(): # Did not work
|
||||||
|
|
||||||
|
# We try first the generic auto-detect interface
|
||||||
|
self.cam = cv2.VideoCapture(camidx)
|
||||||
|
if not self.cam.isOpened(): # Error
|
||||||
|
print("Camera.__init__: Could not open camera")
|
||||||
|
exit(-1)
|
||||||
|
else:
|
||||||
|
print("Camera.__init__: Using OpenCV with auto-detect interface")
|
||||||
|
else:
|
||||||
|
gstreamerCameraFound = True
|
||||||
|
print("Camera.__init__: Using OpenCV with gstreamer")
|
||||||
|
|
||||||
|
time.sleep(1) # wait for camera
|
||||||
|
|
||||||
|
# Set camera properties
|
||||||
|
self.cam.set(capPropId("FRAME_WIDTH"), self.imageSize[0])
|
||||||
|
self.cam.set(capPropId("FRAME_HEIGHT"), self.imageSize[1])
|
||||||
|
#self.cam.set(capPropId("BUFFERSIZE"), 1) # Does not work
|
||||||
|
#self.cam.set(capPropId("FPS"), 15)
|
||||||
|
self.cam.set(capPropId("FPS"), 30)
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Get camera properties
|
||||||
|
print("Camera width = ", int(self.cam.get(capPropId("FRAME_WIDTH"))))
|
||||||
|
print("Camera height = ", int(self.cam.get(capPropId("FRAME_HEIGHT"))))
|
||||||
|
print("Camera FPS = ", int(self.cam.get(capPropId("FPS"))))
|
||||||
|
|
||||||
|
# Initializing the camera distortion maps
|
||||||
|
#self.mapx, self.mapy = cv2.initUndistortRectifyMap(self.intrinsic_matrix, self.distortion_coeffs, np.eye(3,3), np.eye(3,3), self.imageSize, cv2.CV_32FC1)
|
||||||
|
# Not needed we use the cv2.undistort function instead
|
||||||
|
|
||||||
|
# Initialize chessboard pattern parameters
|
||||||
|
self.patternFound = False
|
||||||
|
self.patternSize = (3,4)
|
||||||
|
self.patternUnit = 50.0 # mm (size of one checker square)
|
||||||
|
self.corners = []
|
||||||
|
|
||||||
|
# Initialize aruco detector
|
||||||
|
self.arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
|
||||||
|
# Set the correct physical marker size here
|
||||||
|
self.arucoMarkerLength = 0.15 # [m] actual size of aruco markers (in object coordinate system)
|
||||||
|
|
||||||
|
# Initialize worker thread and framebuffer
|
||||||
|
if self.useCaptureThread:
|
||||||
|
print("Using capture thread")
|
||||||
|
self.framebuffer = framebuffer.FrameBuffer()
|
||||||
|
self.capturethread = CaptureThread(self.cam, self.framebuffer)
|
||||||
|
self.capturethread.start()
|
||||||
|
time.sleep(0.75)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if piCameraFound:
|
||||||
|
self.cam.close()
|
||||||
|
|
||||||
|
def terminateCaptureThread(self):
|
||||||
|
if self.useCaptureThread:
|
||||||
|
self.capturethread.stop()
|
||||||
|
self.capturethread.join()
|
||||||
|
|
||||||
|
def get_capture(self):
|
||||||
|
"""Access to the internal camera object for advanced control of the camera."""
|
||||||
|
return self.cam
|
||||||
|
|
||||||
|
def get_colour(self):
|
||||||
|
"""OBSOLETE - use instead get_next_frame"""
|
||||||
|
print("OBSOLETE get_colour - use instead get_next_frame")
|
||||||
|
return self.get_next_frame()
|
||||||
|
|
||||||
|
def get_next_frame(self):
|
||||||
|
"""Gets the next available image frame from the camera."""
|
||||||
|
if self.useCaptureThread:
|
||||||
|
img = self.framebuffer.get_frame()
|
||||||
|
|
||||||
|
if img is None:
|
||||||
|
img = np.array((self.imageSize[0], self.imageSize[1], 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
else:
|
||||||
|
if piCameraFound:
|
||||||
|
# Use piCamera
|
||||||
|
|
||||||
|
self.cam.capture(self.rawCapture, format="bgr", use_video_port=True)
|
||||||
|
img = self.rawCapture.array
|
||||||
|
|
||||||
|
# clear the stream in preparation for the next frame
|
||||||
|
self.rawCapture.truncate(0)
|
||||||
|
|
||||||
|
else: # Use OpenCV
|
||||||
|
retval, img = self.cam.read() # Read frame
|
||||||
|
|
||||||
|
if not retval: # Error
|
||||||
|
print("Camera.get_colour: Could not read next frame")
|
||||||
|
exit(-1)
|
||||||
|
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
# ArUco object detector
|
||||||
|
def detect_aruco_objects(self, img):
|
||||||
|
"""Detect objects in the form of a binary ArUco code and return object IDs, distances (in cm) and
|
||||||
|
angles (in radians) to detected ArUco codes. The angle is computed as the signed angle between
|
||||||
|
translation vector to detected object projected onto the x-z plabe and the z-axis (pointing out
|
||||||
|
of the camera). This corresponds to that the angle is measuring location along the horizontal x-axis.
|
||||||
|
|
||||||
|
If no object is detected, the returned variables are set to None."""
|
||||||
|
self.aruco_corners, self.ids, rejectedImgPoints = cv2.aruco.detectMarkers(img, self.arucoDict)
|
||||||
|
self.rvecs, self.tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(self.aruco_corners, self.arucoMarkerLength, self.intrinsic_matrix, self.distortion_coeffs)
|
||||||
|
|
||||||
|
|
||||||
|
if not isinstance(self.ids, type(None)):
|
||||||
|
dists = np.linalg.norm(self.tvecs, axis=len(self.tvecs.shape) - 1) * 100
|
||||||
|
# Make sure we always return properly shaped arrays
|
||||||
|
dists = dists.reshape((dists.shape[0],))
|
||||||
|
ids = self.ids.reshape((self.ids.shape[0],))
|
||||||
|
|
||||||
|
# Compute angles
|
||||||
|
angles = np.zeros(dists.shape, dtype=dists.dtype)
|
||||||
|
for i in range(dists.shape[0]):
|
||||||
|
tobj = self.tvecs[i] * 100 / dists[i]
|
||||||
|
zaxis = np.zeros(tobj.shape, dtype=tobj.dtype)
|
||||||
|
zaxis[0,-1] = 1
|
||||||
|
xaxis = np.zeros(tobj.shape, dtype=tobj.dtype)
|
||||||
|
xaxis[0,0] = 1
|
||||||
|
|
||||||
|
# We want the horizontal angle so project tobjt onto the x-z plane
|
||||||
|
tobj_xz = tobj
|
||||||
|
tobj_xz[0,1] = 0
|
||||||
|
# Should the sign be clockwise or counter-clockwise (left or right)?
|
||||||
|
# In this version it is positive to the left as seen from the camera.
|
||||||
|
direction = -1*np.sign(tobj_xz[0,0]) # The same as np.sign(np.dot(tobj, xaxis.T))
|
||||||
|
angles[i] = direction * np.arccos(np.dot(tobj_xz, zaxis.T))
|
||||||
|
else:
|
||||||
|
dists = None
|
||||||
|
ids = None
|
||||||
|
angles = None
|
||||||
|
return ids, dists, angles
|
||||||
|
|
||||||
|
|
||||||
|
def draw_aruco_objects(self, img):
|
||||||
|
"""Draws detected objects and their orientations on the image given in img."""
|
||||||
|
if not isinstance(self.ids, type(None)):
|
||||||
|
outimg = cv2.aruco.drawDetectedMarkers(img, self.aruco_corners, self.ids)
|
||||||
|
for i in range(self.ids.shape[0]):
|
||||||
|
outimg = cv2.drawFrameAxes(outimg, self.intrinsic_matrix, self.distortion_coeffs,
|
||||||
|
self.rvecs[i], self.tvecs[i], self.arucoMarkerLength)
|
||||||
|
else:
|
||||||
|
outimg = img
|
||||||
|
|
||||||
|
return outimg
|
||||||
|
|
||||||
|
# Chessboard object detector
|
||||||
|
def get_object(self, img):
|
||||||
|
"""Detect object and return object type, distance (in cm), angle (in radians) and
|
||||||
|
colour probability table in the order (R,G,B)"""
|
||||||
|
objectType = 'none'
|
||||||
|
colourProb = np.ones((3,)) / 3.0
|
||||||
|
distance = 0.0
|
||||||
|
angle = 0.0
|
||||||
|
self.patternFound = False
|
||||||
|
|
||||||
|
#patternFound, corners = self.get_corners(img)
|
||||||
|
self.get_corners(img)
|
||||||
|
|
||||||
|
if (self.patternFound):
|
||||||
|
|
||||||
|
# Determine if the object is horizontal or vertical
|
||||||
|
delta_x = abs(self.corners[0, 0, 0] - self.corners[2, 0, 0])
|
||||||
|
delta_y = abs(self.corners[0, 0, 1] - self.corners[2, 0, 1])
|
||||||
|
horizontal = (delta_y > delta_x)
|
||||||
|
if (horizontal):
|
||||||
|
objectType = 'horizontal'
|
||||||
|
else:
|
||||||
|
objectType = 'vertical'
|
||||||
|
|
||||||
|
# Compute distances and angles
|
||||||
|
if (horizontal):
|
||||||
|
height = ((abs (self.corners[0, 0, 1] - self.corners[2, 0, 1]) +
|
||||||
|
abs (self.corners[9, 0, 1] - self.corners[11, 0, 1])) / 2.0)
|
||||||
|
|
||||||
|
patternHeight = (self.patternSize[0]-1.0) * self.patternUnit
|
||||||
|
|
||||||
|
else:
|
||||||
|
height = (abs (self.corners[0, 0, 1] - self.corners[9, 0, 1]) +
|
||||||
|
abs (self.corners[2, 0, 1] - self.corners[11, 0, 1])) / 2.0
|
||||||
|
|
||||||
|
patternHeight = (self.patternSize[1]-1.0) * self.patternUnit
|
||||||
|
|
||||||
|
#distance = 1.0 / (0.0001 * height);
|
||||||
|
distance = self.intrinsic_matrix[1, 1] * patternHeight / (height * 10.0)
|
||||||
|
|
||||||
|
center = (self.corners[0, 0, 0] + self.corners[2, 0, 0] +
|
||||||
|
self.corners[9, 0, 0] + self.corners[11, 0, 0]) / 4.0
|
||||||
|
|
||||||
|
#angle = 0.0018 * center - 0.6425;
|
||||||
|
#angle *= -1.0;
|
||||||
|
angle = -np.arctan2(center - self.intrinsic_matrix[0, 2], self.intrinsic_matrix[0, 0])
|
||||||
|
|
||||||
|
|
||||||
|
#### Classify object by colour
|
||||||
|
|
||||||
|
# Extract rectangle corners
|
||||||
|
points = np.array(
|
||||||
|
[
|
||||||
|
self.corners[0],
|
||||||
|
self.corners[2],
|
||||||
|
self.corners[9],
|
||||||
|
self.corners[11]
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
points.shape = (4, 2)
|
||||||
|
points = np.int32(points)
|
||||||
|
|
||||||
|
# Compute region of interest
|
||||||
|
mask = np.zeros((self.imageSize[1], self.imageSize[0]), dtype = np.uint8)
|
||||||
|
#cv2.fillConvexPoly (mask, points, (255, 255, 255))
|
||||||
|
cv2.fillConvexPoly (mask, points, 255)
|
||||||
|
|
||||||
|
# Compute mean colour inside region of interest
|
||||||
|
mean_colour = cv2.mean(img, mask) # There is a bug here in Python 3 - it produces a segfault
|
||||||
|
|
||||||
|
red = mean_colour[2]
|
||||||
|
green = mean_colour[1]
|
||||||
|
blue = mean_colour[0]
|
||||||
|
sum = red + green + blue
|
||||||
|
|
||||||
|
colourProb[0] = red / sum
|
||||||
|
colourProb[1] = green / sum
|
||||||
|
colourProb[2] = blue / sum
|
||||||
|
|
||||||
|
|
||||||
|
return objectType, distance, angle, colourProb
|
||||||
|
|
||||||
|
|
||||||
|
def get_corners(self, img):
|
||||||
|
"""Detect corners - this is an auxillary method and should not be used directly"""
|
||||||
|
|
||||||
|
# Convert to gray scale
|
||||||
|
gray = cv2.cvtColor( img, cv2.COLOR_BGR2GRAY )
|
||||||
|
loggray = cv2.log(gray + 1.0)
|
||||||
|
cv2.normalize(loggray,loggray,0,255,cv2.NORM_MINMAX)
|
||||||
|
gray = cv2.convertScaleAbs(loggray)
|
||||||
|
|
||||||
|
#retval, self.corners = cv2.findChessboardCorners(gray, self.patternSize, cv2.CALIB_CB_NORMALIZE_IMAGE | cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_FAST_CHECK)
|
||||||
|
retval, self.corners = cv2.findChessboardCorners(gray, self.patternSize, cv2.CALIB_CB_FAST_CHECK)
|
||||||
|
if (retval > 0):
|
||||||
|
self.patternFound = True
|
||||||
|
#cv2.cornerSubPix(gray, self.corners, (5,5), (-1,-1), (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 3, 0.0))
|
||||||
|
|
||||||
|
return self.patternFound, self.corners
|
||||||
|
|
||||||
|
|
||||||
|
def draw_object(self, img):
|
||||||
|
"""Draw the object if found into img"""
|
||||||
|
cv2.drawChessboardCorners(img, self.patternSize, self.corners, self.patternFound)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if (__name__=='__main__'):
|
||||||
|
print("Opening and initializing camera")
|
||||||
|
|
||||||
|
#cam = Camera(0, 'macbookpro', useCaptureThread=True)
|
||||||
|
#cam = Camera(0, 'macbookpro', useCaptureThread = False)
|
||||||
|
cam = Camera(0, robottype='arlo', useCaptureThread=True)
|
||||||
|
|
||||||
|
# Open a window
|
||||||
|
WIN_RF1 = "Camera view"
|
||||||
|
cv2.namedWindow(WIN_RF1)
|
||||||
|
cv2.moveWindow(WIN_RF1, 50, 50)
|
||||||
|
|
||||||
|
#WIN_RF3 = "Camera view - gray"
|
||||||
|
#cv2.namedWindow(WIN_RF3)
|
||||||
|
#cv2.moveWindow(WIN_RF3, 550, 50)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
action = cv2.waitKey(10)
|
||||||
|
|
||||||
|
if action == ord('q'): # Quit
|
||||||
|
break
|
||||||
|
|
||||||
|
# Fetch next frame
|
||||||
|
#colour = cam.get_colour()
|
||||||
|
colour = cam.get_next_frame()
|
||||||
|
|
||||||
|
# Convert to gray scale
|
||||||
|
#gray = cv2.cvtColor(colour, cv2.COLOR_BGR2GRAY )
|
||||||
|
#loggray = cv2.log(gray + 1.0)
|
||||||
|
#cv2.normalize(loggray, loggray, 0, 255, cv2.NORM_MINMAX)
|
||||||
|
#gray = cv2.convertScaleAbs(loggray)
|
||||||
|
|
||||||
|
# Detect objects
|
||||||
|
#objectType, distance, angle, colourProb = cam.get_object(colour)
|
||||||
|
#if objectType != 'none':
|
||||||
|
# print("Object type = ", objectType, ", distance = ", distance, ", angle = ", angle, ", colourProb = ", colourProb)
|
||||||
|
|
||||||
|
# Draw detected pattern
|
||||||
|
#cam.draw_object(colour)
|
||||||
|
|
||||||
|
IDs, dists, angles = cam.detect_aruco_objects(colour)
|
||||||
|
if not isinstance(IDs, type(None)):
|
||||||
|
for i in range(len(IDs)):
|
||||||
|
print("Object ID = ", IDs[i], ", Distance = ", dists[i], ", angles = ", angles[i])
|
||||||
|
|
||||||
|
# Draw detected objects
|
||||||
|
cam.draw_aruco_objects(colour)
|
||||||
|
|
||||||
|
|
||||||
|
# Show frames
|
||||||
|
cv2.imshow(WIN_RF1, colour)
|
||||||
|
|
||||||
|
# Show frames
|
||||||
|
#cv2.imshow(WIN_RF3, gray)
|
||||||
|
|
||||||
|
|
||||||
|
# Close all windows
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
# Clean-up capture thread
|
||||||
|
cam.terminateCaptureThread()
|
33
selflocalization/framebuffer.py
Normal file
33
selflocalization/framebuffer.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
# import copy
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
class FrameBuffer(object):
|
||||||
|
"""This class represents a framebuffer with a front and back buffer storing frames.
|
||||||
|
Access to the class is thread safe (controlled via an internal lock)."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.frameBuffer = [None, None] # Initialize the framebuffer with None references
|
||||||
|
self.currentBufferIndex = 0
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
def get_frame(self):
|
||||||
|
"""Return latest frame from the framebuffer"""
|
||||||
|
|
||||||
|
# Obtain lock and release it when done
|
||||||
|
with self.lock:
|
||||||
|
if self.frameBuffer[self.currentBufferIndex] is not None:
|
||||||
|
# return copy.deepcopy(self.frameBuffer[self.currentBufferIndex])
|
||||||
|
return self.frameBuffer[self.currentBufferIndex]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def new_frame(self, frame):
|
||||||
|
"""Add a new frame to the frame buffer"""
|
||||||
|
# self.frameBuffer[int(not self.currentBufferIndex)] = copy.deepcopy(frame)
|
||||||
|
self.frameBuffer[int(not self.currentBufferIndex)] = frame
|
||||||
|
|
||||||
|
# Obtain lock and release it when done
|
||||||
|
with self.lock:
|
||||||
|
# Switch buffer index
|
||||||
|
self.currentBufferIndex = int(not self.currentBufferIndex)
|
45
selflocalization/makelandmark.py
Normal file
45
selflocalization/makelandmark.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import cv2 # Import the OpenCV library
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
# TODO make this configurable from the command line using parser
|
||||||
|
markerID = 10 # Try 1 - 4
|
||||||
|
|
||||||
|
# Define some relevant constants
|
||||||
|
dpi = 72 # dots (pixels) per inch [inch^(-1)]
|
||||||
|
inch2mm = 25.4 # [mm / inch]
|
||||||
|
dpmm = dpi / inch2mm # Dots per mm [mm^(-1)]
|
||||||
|
|
||||||
|
# Define size of output image to fit A4 paper
|
||||||
|
a4width = 210.0 # [mm]
|
||||||
|
a4height = 297.0 # [mm]
|
||||||
|
|
||||||
|
# Size of output image
|
||||||
|
width = np.uint(np.round(a4width * dpmm)) # [pixels]
|
||||||
|
height = np.uint(np.round(a4height * dpmm)) # [pixels]
|
||||||
|
|
||||||
|
# Size of ArUco marker
|
||||||
|
markerPhysicalSize = 150 # [mm]
|
||||||
|
markerSize = np.uint(np.round(markerPhysicalSize * dpmm)) # [pixels]
|
||||||
|
|
||||||
|
# Create landmark image (all white gray scale image)
|
||||||
|
#landmarkImage = np.ones((width, height), dtype=np.uint8) * np.uint8(255)
|
||||||
|
landmarkImage = np.ones((height, width), dtype=np.uint8) * np.uint8(255)
|
||||||
|
|
||||||
|
# Initialize the ArUco dictionary
|
||||||
|
arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
|
||||||
|
|
||||||
|
# Draw marker
|
||||||
|
startWidth = int(np.round((width-markerSize)/2))
|
||||||
|
startHeight = int(np.round((height-markerSize)/2))
|
||||||
|
landmarkImage[startHeight:int(startHeight+markerSize), startWidth:int(startWidth+markerSize)] = cv2.aruco.drawMarker(arucoDict, markerID, markerSize, 1)
|
||||||
|
cv2.putText(landmarkImage, str(markerID), (startWidth, startHeight - 60), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (0,0,0), 2)
|
||||||
|
|
||||||
|
|
||||||
|
# Save image
|
||||||
|
cv2.imwrite("../../../data/landmark" + str(markerID) + ".png", landmarkImage)
|
||||||
|
|
||||||
|
# Show image
|
||||||
|
cv2.namedWindow("Landmark")
|
||||||
|
cv2.imshow("Landmark", landmarkImage)
|
||||||
|
cv2.waitKey()
|
99
selflocalization/particle.py
Normal file
99
selflocalization/particle.py
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
import random
|
||||||
|
import numpy as np
|
||||||
|
from selflocalization import random_numbers as rn
|
||||||
|
|
||||||
|
class Particle(object):
|
||||||
|
"""Data structure for storing particle information (state and weight)"""
|
||||||
|
def __init__(self, x=0.0, y=0.0, theta=0.0, weight=0.0):
|
||||||
|
self.x = x
|
||||||
|
self.y = y
|
||||||
|
self.theta = np.mod(theta, 2.0*np.pi)
|
||||||
|
self.weight = weight
|
||||||
|
|
||||||
|
def getX(self):
|
||||||
|
return self.x
|
||||||
|
|
||||||
|
def getY(self):
|
||||||
|
return self.y
|
||||||
|
|
||||||
|
def getTheta(self):
|
||||||
|
return self.theta
|
||||||
|
|
||||||
|
def getWeight(self):
|
||||||
|
return self.weight
|
||||||
|
|
||||||
|
def setX(self, val):
|
||||||
|
self.x = val
|
||||||
|
|
||||||
|
def setY(self, val):
|
||||||
|
self.y = val
|
||||||
|
|
||||||
|
def setTheta(self, val):
|
||||||
|
self.theta = np.mod(val, 2.0*np.pi)
|
||||||
|
|
||||||
|
def setWeight(self, val):
|
||||||
|
self.weight = val
|
||||||
|
|
||||||
|
def copy(self):
|
||||||
|
return Particle(self.x, self.y, self.theta, self.weight)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f"({self.x},{self.y},{self.theta})"
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_pose(particles_list):
|
||||||
|
"""Estimate the pose from particles by computing the average position and orientation over all particles.
|
||||||
|
This is not done using the particle weights, but just the sample distribution."""
|
||||||
|
x_values = [i.getX() for i in particles_list]
|
||||||
|
y_values = [i.getY() for i in particles_list]
|
||||||
|
cos_values = [np.cos(i.getTheta()) for i in particles_list]
|
||||||
|
sin_values = [np.sin(i.getTheta()) for i in particles_list]
|
||||||
|
|
||||||
|
x_sum = sum(x_values)
|
||||||
|
y_sum = sum(y_values)
|
||||||
|
cos_sum = sum(cos_values)
|
||||||
|
sin_sum = sum(sin_values)
|
||||||
|
|
||||||
|
flen = len(particles_list)
|
||||||
|
if flen != 0:
|
||||||
|
x = x_sum / flen
|
||||||
|
y = y_sum / flen
|
||||||
|
theta = np.arctan2(sin_sum/flen, cos_sum/flen)
|
||||||
|
else:
|
||||||
|
x = x_sum
|
||||||
|
y = y_sum
|
||||||
|
theta = 0.0
|
||||||
|
return Particle(x, y, theta)
|
||||||
|
|
||||||
|
|
||||||
|
def move_particle(particle, delta_x, delta_y, delta_theta):
|
||||||
|
"""Move the particle by (delta_x, delta_y, delta_theta)"""
|
||||||
|
particle.setX(particle.getX() + delta_x)
|
||||||
|
particle.setY(particle.getY() + delta_y)
|
||||||
|
particle.setTheta(particle.getTheta() + delta_theta)
|
||||||
|
|
||||||
|
|
||||||
|
def add_uncertainty(particles_list, sigma, sigma_theta):
|
||||||
|
"""Add some noise to each particle in the list. Sigma and sigma_theta is the noise
|
||||||
|
variances for position and angle noise."""
|
||||||
|
for particle in particles_list:
|
||||||
|
particle.x += rn.randn(0.0, sigma)
|
||||||
|
particle.y += rn.randn(0.0, sigma)
|
||||||
|
particle.theta = np.mod(particle.theta + rn.randn(0.0, sigma_theta), 2.0 * np.pi)
|
||||||
|
|
||||||
|
|
||||||
|
def add_uncertainty_von_mises(particles_list, sigma, theta_kappa):
|
||||||
|
"""Add some noise to each particle in the list. Sigma and theta_kappa is the noise
|
||||||
|
variances for position and angle noise."""
|
||||||
|
for particle in particles_list:
|
||||||
|
particle.x += rn.randn(0.0, sigma)
|
||||||
|
particle.y += rn.randn(0.0, sigma)
|
||||||
|
particle.theta = np.mod(rn.rand_von_mises(particle.theta, theta_kappa), 2.0 * np.pi) - np.pi
|
||||||
|
|
||||||
|
def resample(particle_list):
|
||||||
|
weights = [p.weight for p in particle_list]
|
||||||
|
if sum(weights) == 0:
|
||||||
|
weights = [1/len(particle_list) for _ in particle_list]
|
||||||
|
new_particles = random.choices(particle_list, weights, k=len(particle_list))
|
||||||
|
particle_list = [p.copy() for p in new_particles]
|
||||||
|
return particle_list
|
54
selflocalization/random_numbers.py
Normal file
54
selflocalization/random_numbers.py
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def randn(mu, sigma):
|
||||||
|
"""Normal random number generator
|
||||||
|
mean mu, standard deviation sigma"""
|
||||||
|
return sigma * np.random.randn() + mu
|
||||||
|
|
||||||
|
|
||||||
|
def rand_von_mises(mu, kappa):
|
||||||
|
"""Generate random samples from the Von Mises distribution"""
|
||||||
|
if kappa < 1e-6:
|
||||||
|
# kappa is small: sample uniformly on circle
|
||||||
|
theta = 2.0 * np.pi * np.random.ranf() - np.pi
|
||||||
|
else:
|
||||||
|
a = 1.0 + np.sqrt(1.0 + 4.0 * kappa * kappa)
|
||||||
|
b = (a - np.sqrt(2.0 * a)) / (2.0 * kappa)
|
||||||
|
r = (1.0 + b*b) / (2.0 * b)
|
||||||
|
|
||||||
|
not_done = True
|
||||||
|
while not_done:
|
||||||
|
u1 = np.random.ranf()
|
||||||
|
u2 = np.random.ranf()
|
||||||
|
|
||||||
|
z = np.cos(np.pi * u1)
|
||||||
|
f = (1.0 + r * z) / (r + z)
|
||||||
|
c = kappa * (r - f)
|
||||||
|
|
||||||
|
not_done = (u2 >= c * (2.0 - c)) and (np.log(c) - np.log(u2) + 1.0 - c < 0.0)
|
||||||
|
|
||||||
|
u3 = np.random.ranf()
|
||||||
|
theta = mu + np.sign(u3 - 0.5) * np.arccos(f)
|
||||||
|
|
||||||
|
return theta
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# Tests
|
||||||
|
|
||||||
|
print("Gaussian distribution:")
|
||||||
|
r = np.zeros(1000)
|
||||||
|
for i in range(1000):
|
||||||
|
r[i] = randn(1.0, 2.0)
|
||||||
|
|
||||||
|
print("True mean 1.0 == Estimated mean ", np.mean(r))
|
||||||
|
print("True std 2.0 == Estimated std ", np.std(r))
|
||||||
|
|
||||||
|
print("Von Mises distribution:")
|
||||||
|
|
||||||
|
t = np.zeros(1000)
|
||||||
|
for i in range(1000):
|
||||||
|
t[i] = rand_von_mises(1.0, 8)
|
||||||
|
|
||||||
|
print("True mean 1.0 == Estimated mean ", np.mean(t))
|
292
selflocalization/selflocalize.py
Normal file
292
selflocalization/selflocalize.py
Normal file
@ -0,0 +1,292 @@
|
|||||||
|
import cv2
|
||||||
|
import particle as particle
|
||||||
|
import camera as camera
|
||||||
|
import numpy as np
|
||||||
|
from time import sleep
|
||||||
|
from timeit import default_timer as timer
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
# Flags
|
||||||
|
showGUI = False # Whether or not to open GUI windows
|
||||||
|
onRobot = True # Whether or not we are running on the Arlo robot
|
||||||
|
|
||||||
|
|
||||||
|
def isRunningOnArlo():
|
||||||
|
"""Return True if we are running on Arlo, otherwise False.
|
||||||
|
You can use this flag to switch the code from running on you laptop to Arlo - you need to do the programming here!
|
||||||
|
"""
|
||||||
|
return onRobot
|
||||||
|
|
||||||
|
|
||||||
|
if isRunningOnArlo():
|
||||||
|
sys.path.append("..")
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
import robot
|
||||||
|
onRobot = True
|
||||||
|
except ImportError:
|
||||||
|
print("selflocalize.py: robot module not present - forcing not running on Arlo!")
|
||||||
|
onRobot = False
|
||||||
|
|
||||||
|
|
||||||
|
POWER = 70
|
||||||
|
|
||||||
|
TURN_T = 7.9 # 1 degree
|
||||||
|
DRIVE_T = 22 # 1 centimeter
|
||||||
|
|
||||||
|
RIGHT_WHEEL_OFFSET = 4
|
||||||
|
|
||||||
|
CLOCKWISE_OFFSET = 0.96
|
||||||
|
|
||||||
|
|
||||||
|
# Some color constants in BGR format
|
||||||
|
CRED = (0, 0, 255)
|
||||||
|
CGREEN = (0, 255, 0)
|
||||||
|
CBLUE = (255, 0, 0)
|
||||||
|
CCYAN = (255, 255, 0)
|
||||||
|
CYELLOW = (0, 255, 255)
|
||||||
|
CMAGENTA = (255, 0, 255)
|
||||||
|
CWHITE = (255, 255, 255)
|
||||||
|
CBLACK = (0, 0, 0)
|
||||||
|
|
||||||
|
# Landmarks.
|
||||||
|
# The robot knows the position of 2 landmarks. Their coordinates are in the unit centimeters [cm].
|
||||||
|
landmarkIDs = [1, 5]
|
||||||
|
landmarks = {
|
||||||
|
1: (0.0, 0.0), # Coordinates for landmark 1
|
||||||
|
5: (300.0, 0.0) # Coordinates for landmark 2
|
||||||
|
}
|
||||||
|
landmark_colors = [CRED, CGREEN] # Colors used when drawing the landmarks
|
||||||
|
|
||||||
|
|
||||||
|
SIGMA = 3
|
||||||
|
SIGMA_THETA = 0.3
|
||||||
|
|
||||||
|
|
||||||
|
def normal(x, mean, std):
|
||||||
|
return (
|
||||||
|
(np.exp(-(((x - mean)**2)/(2 * std**2))))/(np.sqrt(2 * np.pi) * std)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def jet(x):
|
||||||
|
"""Colour map for drawing particles. This function determines the colour of
|
||||||
|
a particle from its weight."""
|
||||||
|
r = (x >= 3.0/8.0 and x < 5.0/8.0) * (4.0 * x - 3.0/2.0) + (x >= 5.0/8.0 and x < 7.0/8.0) + (x >= 7.0/8.0) * (-4.0 * x + 9.0/2.0)
|
||||||
|
g = (x >= 1.0/8.0 and x < 3.0/8.0) * (4.0 * x - 1.0/2.0) + (x >= 3.0/8.0 and x < 5.0/8.0) + (x >= 5.0/8.0 and x < 7.0/8.0) * (-4.0 * x + 7.0/2.0)
|
||||||
|
b = (x < 1.0/8.0) * (4.0 * x + 1.0/2.0) + (x >= 1.0/8.0 and x < 3.0/8.0) + (x >= 3.0/8.0 and x < 5.0/8.0) * (-4.0 * x + 5.0/2.0)
|
||||||
|
|
||||||
|
return (255.0*r, 255.0*g, 255.0*b)
|
||||||
|
|
||||||
|
def draw_world(est_pose, particles, world):
|
||||||
|
"""Visualization.
|
||||||
|
This functions draws robots position in the world coordinate system."""
|
||||||
|
|
||||||
|
# Fix the origin of the coordinate system
|
||||||
|
offsetX = 100
|
||||||
|
offsetY = 250
|
||||||
|
|
||||||
|
# Constant needed for transforming from world coordinates to screen coordinates (flip the y-axis)
|
||||||
|
ymax = world.shape[0]
|
||||||
|
|
||||||
|
world[:] = CWHITE # Clear background to white
|
||||||
|
|
||||||
|
# Find largest weight
|
||||||
|
max_weight = 0
|
||||||
|
for particle in particles:
|
||||||
|
max_weight = max(max_weight, particle.getWeight())
|
||||||
|
|
||||||
|
# Draw particles
|
||||||
|
for particle in particles:
|
||||||
|
x = int(particle.getX() + offsetX)
|
||||||
|
y = ymax - (int(particle.getY() + offsetY))
|
||||||
|
colour = jet(particle.getWeight() / max_weight)
|
||||||
|
cv2.circle(world, (x,y), 2, colour, 2)
|
||||||
|
b = (int(particle.getX() + 15.0*np.cos(particle.getTheta()))+offsetX,
|
||||||
|
ymax - (int(particle.getY() + 15.0*np.sin(particle.getTheta()))+offsetY))
|
||||||
|
cv2.line(world, (x,y), b, colour, 2)
|
||||||
|
|
||||||
|
# Draw landmarks
|
||||||
|
for i in range(len(landmarkIDs)):
|
||||||
|
ID = landmarkIDs[i]
|
||||||
|
lm = (int(landmarks[ID][0] + offsetX), int(ymax - (landmarks[ID][1] + offsetY)))
|
||||||
|
cv2.circle(world, lm, 5, landmark_colors[i], 2)
|
||||||
|
|
||||||
|
# Draw estimated robot pose
|
||||||
|
a = (int(est_pose.getX())+offsetX, ymax-(int(est_pose.getY())+offsetY))
|
||||||
|
b = (int(est_pose.getX() + 15.0*np.cos(est_pose.getTheta()))+offsetX,
|
||||||
|
ymax-(int(est_pose.getY() + 15.0*np.sin(est_pose.getTheta()))+offsetY))
|
||||||
|
cv2.circle(world, a, 5, CMAGENTA, 2)
|
||||||
|
cv2.line(world, a, b, CMAGENTA, 2)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def initialize_particles(num_particles):
|
||||||
|
particles = []
|
||||||
|
for i in range(num_particles):
|
||||||
|
# Random starting points.
|
||||||
|
p = particle.Particle(600.0*np.random.ranf() - 100.0, 600.0*np.random.ranf() - 250.0, np.mod(2.0*np.pi*np.random.ranf(), 2.0*np.pi), 1.0/num_particles)
|
||||||
|
particles.append(p)
|
||||||
|
|
||||||
|
return particles
|
||||||
|
|
||||||
|
def dist(particle, landmark):
|
||||||
|
return np.sqrt((landmark[0]-particle.x)**2+(landmark[1]-particle.y)**2)
|
||||||
|
|
||||||
|
def calc_angle(particle, landmark, dist):
|
||||||
|
e_theta = np.array([np.cos(particle.theta), np.sin(particle.theta)]).T
|
||||||
|
e_landmark = (np.array([landmark[0]-particle.x, landmark[1]-particle.y]).T)/dist
|
||||||
|
e_hat_theta = np.array([-np.sin(particle.theta), np.cos(particle.theta)]).T
|
||||||
|
return np.sign(np.dot(e_landmark, e_hat_theta)) * np.arccos(np.dot(e_landmark, e_theta))
|
||||||
|
|
||||||
|
def calc_weight(particle, landmark_values):
|
||||||
|
if landmark_values == []:
|
||||||
|
return
|
||||||
|
weights = []
|
||||||
|
for values in landmark_values:
|
||||||
|
dist_to_landmark = dist(particle, landmarks[values[0]])
|
||||||
|
dist_weight = normal(values[1], dist_to_landmark, SIGMA)
|
||||||
|
|
||||||
|
angle_to_landmark = calc_angle(particle, landmarks[values[0]], dist_to_landmark)
|
||||||
|
angle_weight = normal(values[2], angle_to_landmark, SIGMA_THETA)
|
||||||
|
weights.append(dist_weight * angle_weight)
|
||||||
|
|
||||||
|
particle.setWeight(np.product(weights))
|
||||||
|
|
||||||
|
def drive_towards_middle(est_pose, arlo):
|
||||||
|
middle = np.array([150, 0])
|
||||||
|
|
||||||
|
position = np.array([est_pose.x, est_pose.y])
|
||||||
|
theta = est_pose.theta
|
||||||
|
print(np.rad2deg(theta))
|
||||||
|
|
||||||
|
relative_pos = middle - position
|
||||||
|
print("relative_pos", relative_pos)
|
||||||
|
angle = np.rad2deg(np.arctan(relative_pos[1]/relative_pos[0]))
|
||||||
|
print(angle)
|
||||||
|
turn_angle = np.mod(angle - (np.rad2deg(theta) - 180), 360)
|
||||||
|
print(turn_angle)
|
||||||
|
drive_distance = np.sqrt(position[0]**2 + position[1]**2)
|
||||||
|
if turn_angle < 180:
|
||||||
|
arlo.go_diff(POWER, POWER, 0, 1)
|
||||||
|
sleep((abs(turn_angle) * TURN_T)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
else:
|
||||||
|
arlo.go_diff(POWER, POWER, 1, 0)
|
||||||
|
sleep((abs(360 - turn_angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
||||||
|
sleep((drive_distance * DRIVE_T)/1000)
|
||||||
|
arlo.stop()
|
||||||
|
|
||||||
|
# Main program #
|
||||||
|
try:
|
||||||
|
if showGUI:
|
||||||
|
# Open windows
|
||||||
|
WIN_RF1 = "Robot view"
|
||||||
|
cv2.namedWindow(WIN_RF1)
|
||||||
|
cv2.moveWindow(WIN_RF1, 50, 50)
|
||||||
|
|
||||||
|
WIN_World = "World view"
|
||||||
|
cv2.namedWindow(WIN_World)
|
||||||
|
cv2.moveWindow(WIN_World, 500, 50)
|
||||||
|
|
||||||
|
|
||||||
|
# Initialize particles
|
||||||
|
num_particles = 10000
|
||||||
|
particles = initialize_particles(num_particles)
|
||||||
|
|
||||||
|
est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose
|
||||||
|
|
||||||
|
# Driving parameters
|
||||||
|
velocity = 0.0 # cm/sec
|
||||||
|
angular_velocity = 0.0 # radians/sec
|
||||||
|
|
||||||
|
arlo = robot.Robot()
|
||||||
|
|
||||||
|
# Allocate space for world map
|
||||||
|
world = np.zeros((500,500,3), dtype=np.uint8)
|
||||||
|
|
||||||
|
# Draw map
|
||||||
|
draw_world(est_pose, particles, world)
|
||||||
|
|
||||||
|
print("Opening and initializing camera")
|
||||||
|
if camera.isRunningOnArlo():
|
||||||
|
cam = camera.Camera(0, 'arlo', useCaptureThread = True)
|
||||||
|
else:
|
||||||
|
cam = camera.Camera(0, 'macbookpro', useCaptureThread = True)
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
# Fetch next frame
|
||||||
|
colour = cam.get_next_frame()
|
||||||
|
landmark_values = []
|
||||||
|
|
||||||
|
# Detect objects
|
||||||
|
objectIDs, dists, angles = cam.detect_aruco_objects(colour)
|
||||||
|
if not isinstance(objectIDs, type(None)):
|
||||||
|
|
||||||
|
# List detected objects
|
||||||
|
for i in range(len(objectIDs)):
|
||||||
|
print("Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
|
||||||
|
if objectIDs[i] in landmarkIDs:
|
||||||
|
landmark_values.append((objectIDs[i], dists[i], angles[i]))
|
||||||
|
|
||||||
|
# Compute particle weights
|
||||||
|
for p in particles:
|
||||||
|
calc_weight(p, landmark_values)
|
||||||
|
|
||||||
|
# Resampling
|
||||||
|
particles = particle.resample(particles)
|
||||||
|
|
||||||
|
# Draw detected objects
|
||||||
|
cam.draw_aruco_objects(colour)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# No observation - reset weights to uniform distribution
|
||||||
|
for p in particles:
|
||||||
|
p.setWeight(1.0/num_particles)
|
||||||
|
|
||||||
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
||||||
|
est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose
|
||||||
|
calc_weight(est_pose, landmark_values)
|
||||||
|
est_weight = est_pose.weight * 10**5
|
||||||
|
print("est_weight:", est_weight)
|
||||||
|
if est_weight > 1:
|
||||||
|
draw_world(est_pose, particles, world)
|
||||||
|
cv2.imwrite("test.png", world)
|
||||||
|
break
|
||||||
|
# else:
|
||||||
|
# arlo.go_diff(POWER, POWER, 0, 1)
|
||||||
|
# sleep((15 * TURN_T)/1000)
|
||||||
|
# arlo.stop()
|
||||||
|
# for p in particles:
|
||||||
|
# p.setTheta(p.theta + np.deg2rad(15))
|
||||||
|
|
||||||
|
if showGUI:
|
||||||
|
# Draw map
|
||||||
|
draw_world(est_pose, particles, world)
|
||||||
|
|
||||||
|
# Show frame
|
||||||
|
cv2.imshow(WIN_RF1, colour)
|
||||||
|
|
||||||
|
# Show world
|
||||||
|
cv2.imshow(WIN_World, world)
|
||||||
|
|
||||||
|
print(est_pose.x, est_pose.y, est_pose.theta)
|
||||||
|
drive_towards_middle(est_pose, arlo)
|
||||||
|
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Make sure to clean up even if an exception occurred
|
||||||
|
|
||||||
|
# Close all windows
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
# Clean-up capture thread
|
||||||
|
cam.terminateCaptureThread()
|
||||||
|
|
Reference in New Issue
Block a user