Compare commits
206 Commits
2fd874c55d
...
main
Author | SHA1 | Date | |
---|---|---|---|
90a1c03a5c | |||
30213107de | |||
78e76da161 | |||
1c79989840 | |||
90b5ff824d | |||
144551df39 | |||
e18767fcb6 | |||
919b7b2c4d | |||
b30833dcae | |||
12bcfe412f | |||
374c321c4c | |||
e974a0bb27 | |||
f7d459482a | |||
073897c485 | |||
76a459fc9f | |||
b0bd525726 | |||
c70da77fc9 | |||
60dbadabe1 | |||
bcf1ae2a5a | |||
82e4c2eb43 | |||
02c2c04ca4 | |||
5fdcf1c0dc | |||
71b7c67fff | |||
fbd30c388c | |||
ffe364dd97 | |||
21236301e0 | |||
9d8e95d711 | |||
22ef920735 | |||
0558fcd9e4 | |||
9890800079 | |||
d68039f09f | |||
323185d08d | |||
baed8628be | |||
850ccb8179 | |||
879adf7b91 | |||
123c8fe350 | |||
f93a046270 | |||
9cc3b41f87 | |||
81862f8ee3 | |||
3219028980 | |||
cf21934931 | |||
a7616ad41c | |||
963509fd8a | |||
2c65a75f28 | |||
0621d02fff | |||
51b29110bc | |||
8984fcf6af | |||
6c27ad4805 | |||
437ccde0f1 | |||
e329d2b87f | |||
679d6b78c5 | |||
302095a929 | |||
8be9656f5d | |||
15fab9c09c | |||
04c8dcc96e | |||
5ddcd8499d | |||
9145085739 | |||
ebf356445b | |||
ef5f297a47 | |||
0e46b0dbe2 | |||
ca4929de5c | |||
e0cea5cb78 | |||
a626ad7c14 | |||
be1183c611 | |||
94bbb57d65 | |||
05f654ba94 | |||
2369aad004 | |||
7edb905bd2 | |||
cb33cc0027 | |||
417dd962ae | |||
b636440409 | |||
616d902ee2 | |||
c78c596db2 | |||
ad57381bc0 | |||
a7ffe3f581 | |||
23b99b56d5 | |||
5e2766b01e | |||
dc23e96fba | |||
ef9cabcaa5 | |||
d3db855dad | |||
d8647d4466 | |||
4ac0ebbd22 | |||
1f763a512f | |||
edec956494 | |||
4e41d872ce | |||
f822019bd1 | |||
3b1ce060df | |||
492f7b90cd | |||
4c89c36a78 | |||
afd5be5eb3 | |||
93df18a433 | |||
ab5974d46a | |||
85289bc5c9 | |||
90d13f8a03 | |||
03245ecd2a | |||
1a4587d60c | |||
44434488a6 | |||
1edc312c8a | |||
5714651a8d | |||
2372c85148 | |||
2c10cc9346 | |||
64d548ddfa | |||
a97b998754 | |||
783f3291c8 | |||
baf83d8fee | |||
27f039059f | |||
1c5a0edc0b | |||
8884013910 | |||
af3064ad4c | |||
85319f7631 | |||
5966c1188b | |||
1f9cb9069e | |||
f8de362c43 | |||
f60b56587a | |||
3e32a38ad2 | |||
d4a4d1c443 | |||
ac5e76d977 | |||
86b621fa9d | |||
a0eeeba787 | |||
b3aaf40487 | |||
0860ccd58b | |||
ac6d94ee39 | |||
dc8f23bd5e | |||
5e7509d2b6 | |||
8a2156a5d9 | |||
642591ecc4 | |||
b7ccb84c19 | |||
0bad669ba5 | |||
f2e3a7f56a | |||
b0e4bcf85c | |||
e3916ab6a0 | |||
3a3903176a | |||
f77b2d32bd | |||
f547e8e0c3 | |||
e1e229efc7 | |||
8c2a835e01 | |||
640db167fe | |||
f598801950 | |||
2009e1c972 | |||
a025ed0098 | |||
341de6027a | |||
1b0c989800 | |||
81815b17a6 | |||
6b3c84fff2 | |||
2490d93a3c | |||
7fb14f9345 | |||
95c011e4e7 | |||
f546e3f8f5 | |||
13bde48720 | |||
0cb11b5264 | |||
eeafcc7f8b | |||
81d9ae690a | |||
8cf1d8dcbd | |||
8201952b4b | |||
193e15fc2a | |||
517948c543 | |||
f49cab65f2 | |||
35b4cdbdfe | |||
35971fbcee | |||
a675ecfc88 | |||
0c3b5a7684 | |||
fc6f2ff67a | |||
c705d8bc7c | |||
7908028313 | |||
3f55a78127 | |||
29d567ff16 | |||
a90ef9d8f9 | |||
76f7e922cc | |||
94c7bf1d35 | |||
ac48afd01a | |||
6f93fd0857 | |||
f6ce6456b8 | |||
01ab491b61 | |||
1888422cdf | |||
96992c878e | |||
c1a6ddeda4 | |||
f28b1f6e1b | |||
9972ad7e86 | |||
07817b3356 | |||
8008f8fdae | |||
774fa0c44d | |||
0abeff7273 | |||
5916f9dbb3 | |||
08d5ed2f0a | |||
6c05d877f5 | |||
77bc6489e0 | |||
d6ed233b8c | |||
f9c28d107e | |||
8d2923af00 | |||
5fc8948852 | |||
9b684bb9b7 | |||
21df2467e6 | |||
fa51978f98 | |||
d466eb172e | |||
d088fe6a28 | |||
b603737bf8 | |||
5c5859cc45 | |||
e53dbab406 | |||
0c13f8cb2e | |||
280d9037c0 | |||
df1a4de690 | |||
b12106cd57 | |||
bef95b8526 | |||
cd640c403a | |||
7267e04ba7 | |||
55704cf590 |
BIN
Examples/.DS_Store
vendored
Normal file
BIN
Examples/.DS_Store
vendored
Normal file
Binary file not shown.
96
drive_between_arucos.py
Normal file
96
drive_between_arucos.py
Normal file
@ -0,0 +1,96 @@
|
||||
from time import sleep
|
||||
from math import cos, sin
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from robot import Robot
|
||||
|
||||
POWER = 70
|
||||
|
||||
TURN_T = 7.9 # 1 degree
|
||||
DRIVE_T = 22 # 1 centimeter
|
||||
|
||||
RIGHT_WHEEL_OFFSET = 4
|
||||
|
||||
CLOCKWISE_OFFSET = 0.92
|
||||
|
||||
FOCAL_LENGTH = 1691
|
||||
|
||||
CAMERA_MATRIX = np.array(
|
||||
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||
dtype=np.float32
|
||||
)
|
||||
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||
|
||||
def find_aruco(image):
    """Detect ArUco markers (DICT_6X6_250) in ``image``.

    Returns a list of ``(corners, id)`` tuples, where ``corners`` is the
    4x2 array of marker corner pixels and ``id`` is the integer marker id.
    Returns an empty list when no marker is detected.
    """
    aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
    aruco_params = cv2.aruco.DetectorParameters_create()
    corners, ids, _ = cv2.aruco.detectMarkers(
        image,
        aruco_dict,
        parameters=aruco_params
    )

    # Bug fix: detectMarkers signals "nothing found" with ids=None while
    # corners is an empty tuple (never None), so the old ``corners is None``
    # guard was dead code. Guard on ids instead.
    if ids is None:
        return []

    return [(box[0], ids[i][0]) for i, box in enumerate(corners)]
|
||||
|
||||
def find_arucos(arlo):
    """Spin in place until both target markers (ids 1 and 6) are seen,
    then return their positions in the robot frame.

    Returns a (2, 2) array of [forward, sideways] coordinates (cm), one
    row per marker, corrected for the camera's -20 degree mounting angle.
    """
    found = {}
    cam_tilt = np.deg2rad(-20)
    # 2D rotation matrix that maps camera-frame positions back into the
    # robot frame.
    rot = np.array([[cos(cam_tilt), -sin(cam_tilt)],
                    [sin(cam_tilt), cos(cam_tilt)]])

    while True:
        detections = find_aruco(arlo.take_photo())
        if detections != []:
            for box, marker_id in detections:
                if marker_id in [1, 6] and marker_id not in found:
                    print(f"found box {marker_id}")
                    pose = cv2.aruco.estimatePoseSingleMarkers(
                        np.array([box]), 14.5, CAMERA_MATRIX, DIST_COEF
                    )[1][0][0]
                    # Keep (depth, lateral); the +22.5 cm depth offset is
                    # presumably marker face vs. box centre -- TODO confirm.
                    pose = np.array([pose[2], pose[0]])
                    pose[0] += 22.5
                    found[marker_id] = pose

        if len(found) >= 2:
            break
        # Not both markers seen yet: rotate ~20 degrees clockwise and retry.
        arlo.go_diff(POWER, POWER, 1, 0)
        sleep((20 * TURN_T * CLOCKWISE_OFFSET) / 1000)
        arlo.stop()

    # Rotate every measured position into the robot frame.
    for marker_id, pose in found.items():
        found[marker_id] = np.dot(rot, pose)

    return np.array(list(found.values())[:2])
|
||||
|
||||
def main():
    """Locate the two boxes, then turn and drive to their midpoint."""
    arlo = Robot()

    aruco_positions = find_arucos(arlo)
    print(aruco_positions)

    # Midpoint of the two marker positions, robot-relative (cm).
    position = [
        np.average(aruco_positions[:, 0]),
        np.average(aruco_positions[:, 1]),
    ]
    print(position)

    # NOTE(review): arctan (not arctan2) assumes position[0] > 0, i.e. the
    # midpoint lies in front of the robot -- confirm for other layouts.
    angle = np.rad2deg(np.arctan(position[1] / position[0]))
    drive_distance = np.sqrt(position[0] ** 2 + position[1] ** 2)

    if angle < 0:
        # Negative angle: turn anticlockwise.
        arlo.go_diff(POWER, POWER, 0, 1)
        sleep((abs(angle) * TURN_T) / 1000)
    else:
        # Positive angle: turn clockwise; clockwise turns need the
        # calibration factor.
        arlo.go_diff(POWER, POWER, 1, 0)
        sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET) / 1000)
    arlo.stop()

    # Drive straight to the midpoint; the right wheel runs slightly harder
    # to keep the robot tracking straight.
    arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
    sleep((drive_distance * DRIVE_T) / 1000)
    arlo.stop()


if __name__ == "__main__":
    main()
|
71
drive_to_aruco.py
Normal file
71
drive_to_aruco.py
Normal file
@ -0,0 +1,71 @@
|
||||
from time import sleep
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from robot import Robot
|
||||
|
||||
POWER = 70
|
||||
|
||||
TURN_T = 7.9 # 1 degree
|
||||
DRIVE_T = 22 # 1 centimeter
|
||||
|
||||
RIGHT_WHEEL_OFFSET = 4
|
||||
|
||||
CLOCKWISE_OFFSET = 0.96
|
||||
|
||||
FOCAL_LENGTH = 1691
|
||||
|
||||
CAMERA_MATRIX = np.array(
|
||||
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||
dtype=np.float32
|
||||
)
|
||||
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||
|
||||
def find_aruco(image):
    """Detect ArUco markers (DICT_6X6_250) in ``image``.

    Returns a list of ``(corners, id_array)`` tuples. Note this keeps
    ``ids[i]`` (a length-1 array), not the bare integer id. Returns an
    empty list when no marker is detected.
    """
    aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
    aruco_params = cv2.aruco.DetectorParameters_create()
    corners, ids, _ = cv2.aruco.detectMarkers(
        image,
        aruco_dict,
        parameters=aruco_params
    )

    # Bug fix: detectMarkers returns ids=None (and corners=()) when nothing
    # is found; corners is never None, so guard on ids instead.
    if ids is None:
        return []

    return [(box[0], ids[i]) for i, box in enumerate(corners)]
|
||||
|
||||
def main():
    """Rotate until a marker is visible, then turn to face it.

    The final drive towards the marker is currently disabled (commented
    out below).
    """
    arlo = Robot()

    # Spin slowly until at least one marker shows up in the camera frame.
    while True:
        arucos = find_aruco(arlo.take_photo())
        if arucos != []:
            break
        arlo.go_diff(40, 40, 1, 0)
        sleep(0.3)
        arlo.stop()

    # Pose of the first detected marker: translation vector [x, y, z] (cm).
    position = cv2.aruco.estimatePoseSingleMarkers(
        np.array([arucos[0][0]]), 14.5, CAMERA_MATRIX, DIST_COEF
    )[1][0][0]

    angle = np.rad2deg(np.arctan(position[0] / position[2]))
    drive_distance = np.sqrt(position[0] ** 2 + position[2] ** 2)
    print(drive_distance)

    if angle < 0:
        # Negative angle: marker is to the left, turn anticlockwise.
        arlo.go_diff(POWER, POWER, 0, 1)
        sleep((abs(angle) * TURN_T) / 1000)
    else:
        # Marker to the right: turn clockwise with calibration factor.
        arlo.go_diff(POWER, POWER, 1, 0)
        sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET) / 1000)
    arlo.stop()

    # Disabled drive step, kept for reference:
    # arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
    # sleep((drive_distance * DRIVE_T)/1000)
    # arlo.stop()


if __name__ == "__main__":
    main()
|
@ -2,9 +2,10 @@ from robot import Arlo
|
||||
|
||||
def main():
|
||||
arlo = Arlo(False)
|
||||
image = arlo.robot.take_photo()
|
||||
bounding_boxes = arlo.find_aruco(image)
|
||||
arlo.estimate_distance(bounding_boxes[0])
|
||||
for _ in range(5):
|
||||
image = arlo.robot.take_photo()
|
||||
bounding_boxes = arlo.find_aruco(image)
|
||||
print(arlo.estimate_distance(bounding_boxes[0][0]))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(9)
|
||||
main()
|
20
occupancy_grids.py
Normal file
20
occupancy_grids.py
Normal file
@ -0,0 +1,20 @@
|
||||
from matplotlib.backend_bases import NonGuiException
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from numpy.random import uniform
|
||||
|
||||
|
||||
def Binary_Baye_Filter(I, zt, x):
    """Binary Bayes filter update -- not implemented yet (returns None)."""
    pass
|
||||
|
||||
def invserse_sensor_model(m, x, z):
    """Inverse sensor model -- not implemented yet (returns None).

    NOTE(review): the name contains a typo ("invserse"); kept as-is so
    existing callers don't break.
    """
    pass
|
||||
|
||||
|
||||
def update(particles, weights, z, R, landmarks):
    """Particle-filter measurement update (modifies ``weights`` in place).

    For every landmark, each particle's weight is multiplied by the
    likelihood of the measured distance ``z[i]`` under a Gaussian centred
    on that particle's distance to the landmark with std ``R``.

    Args:
        particles: (N, >=2) array; columns 0:2 hold particle x/y positions.
        weights: (N,) float array of particle weights, updated in place.
        z: sequence of measured distances, one per landmark.
        R: measurement noise standard deviation.
        landmarks: sequence of landmark [x, y] positions.
    """
    # Bug fix: the original called ``np.scipy.stats.norm`` -- numpy has no
    # ``scipy`` attribute, so this always raised AttributeError.
    from scipy.stats import norm

    for i, landmark in enumerate(landmarks):
        distance = np.linalg.norm(particles[:, 0:2] - landmark, axis=1)
        weights *= norm(distance, R).pdf(z[i])

    weights += 1.e-300  # avoid round-off to zero
    weights /= sum(weights)  # normalize
|
154
rally.py
Normal file
154
rally.py
Normal file
@ -0,0 +1,154 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
import robot
|
||||
|
||||
LANDMARKS = [1,2,3,4] # SET ARUCO NUMBERS
|
||||
|
||||
POWER = 70
|
||||
|
||||
TURN_T = 8.3 # 1 degree
|
||||
DRIVE_T = 22 # 1 centimeter
|
||||
|
||||
RIGHT_WHEEL_OFFSET = 5
|
||||
|
||||
CLOCKWISE_OFFSET = 0.8
|
||||
|
||||
FOCAL_LENGTH = 1691
|
||||
|
||||
CAMERA_MATRIX = np.array(
|
||||
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||
dtype=np.float32
|
||||
)
|
||||
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||
|
||||
def find_aruco(image):
    """Detect ArUco markers (DICT_6X6_250) in ``image``.

    Returns a dict mapping marker id -> 4x2 corner array; empty dict when
    nothing is detected.
    """
    aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
    aruco_params = cv2.aruco.DetectorParameters_create()
    corners, ids, _ = cv2.aruco.detectMarkers(
        image,
        aruco_dict,
        parameters=aruco_params
    )

    # Bug fix: detectMarkers returns ids=None (corners=()) when nothing is
    # found, so the old ``corners is None`` guard was dead and the function
    # always returned a dict in practice. Guard on ids and return an empty
    # dict so callers' ``landmark in arucos`` checks stay type-consistent.
    if ids is None:
        return {}

    return {ids[i][0]: box[0] for i, box in enumerate(corners)}
|
||||
|
||||
|
||||
def drunk_drive(drive_time, arlo, careful_range = 600):
    """Wander forward for ``drive_time`` seconds, dodging obstacles.

    Drives straight while the front sonar reads beyond ``careful_range``
    and both side sonars read beyond 250; otherwise turns, keeping the
    current turn direction until the path clears.
    """
    deadline = time.time() + drive_time
    turning = None
    while time.time() < deadline:
        front = arlo.read_front_ping_sensor()
        right = arlo.read_right_ping_sensor()
        left = arlo.read_left_ping_sensor()

        if front > careful_range and all(d > 250 for d in [right, left]):
            # Path is clear: drive straight (right wheel boosted to track
            # straight) and forget any turn in progress.
            turning = None
            arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
        elif turning == "R" or (front > careful_range and right > 250 and turning is None):
            arlo.go_diff(POWER, POWER, 1, 0)
            turning = "R"
        else:
            arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
            turning = "L"

        time.sleep(0.01)
|
||||
|
||||
def careful_forward(drive_time, arlo, careful_range = 600):
    """Drive forward for up to ``drive_time`` seconds while avoiding
    obstacles.

    Returns True when the requested drive was truncated (requests over
    0.8 s are capped) or an obstacle forced a turn -- either way the
    caller should re-measure instead of trusting dead reckoning.
    """
    start = time.time()
    if drive_time > 0.8:
        # Cap long drives and report them so the caller re-plans.
        drive_time = 0.8
        hit_something = True
    else:
        hit_something = False

    deadline = start + drive_time
    turning = None
    while time.time() < deadline:
        front = arlo.read_front_ping_sensor()
        right = arlo.read_right_ping_sensor()
        left = arlo.read_left_ping_sensor()

        if front > careful_range and all(d > 250 for d in [right, left]):
            turning = None
            arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
        else:
            hit_something = True
            if turning == "R" or (front > careful_range and right > 250 and turning is None):
                arlo.go_diff(POWER, POWER, 1, 0)
                turning = "R"
            else:
                arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
                turning = "L"

        time.sleep(0.01)

    return hit_something
|
||||
|
||||
def main():
    """Visit each landmark in LANDMARKS order, returning to the first.

    For every landmark: spin until the marker is seen (wandering between
    full spins), turn to face it, then carefully drive up to it.
    """
    # Visit every landmark, then return to the first one.
    landmark_order = LANDMARKS + [
        LANDMARKS[0]
    ]

    noah = robot.Robot()

    while landmark_order != []:
        landmark = landmark_order.pop(0)
        print(f"looking for landmark {landmark}")

        while True:
            # Search: up to 24 rotation steps (~a full revolution); if the
            # marker never appears, wander for 3 s and search again.
            while True:
                for _ in range(24):
                    arucos = find_aruco(noah.take_photo())
                    if landmark in arucos:
                        break
                    noah.go_diff(POWER, POWER, 0, 1)
                    time.sleep((20 * TURN_T) / 1000)
                    noah.stop()

                if landmark in arucos:
                    break

                drunk_drive(3, noah)

            # Marker pose: translation vector [x, y, z] in cm.
            position = cv2.aruco.estimatePoseSingleMarkers(
                np.array([arucos[landmark]]), 14.5, CAMERA_MATRIX, DIST_COEF
            )[1][0][0]

            # -5 degree trim and a 100 cm stand-off -- presumably hand
            # calibrated for this robot; confirm before changing.
            angle = np.rad2deg(np.arctan(position[0] / position[2])) - 5
            drive_distance = np.sqrt(position[0] ** 2 + position[2] ** 2) - 100

            if angle < 0:
                noah.go_diff(POWER, POWER, 0, 1)
                time.sleep((abs(angle) * TURN_T) / 1000)
                noah.stop()
            else:
                noah.go_diff(POWER, POWER, 1, 0)
                time.sleep((abs(angle) * TURN_T * CLOCKWISE_OFFSET) / 1000)
                noah.stop()

            # A clean approach (nothing hit) ends with a final short hop to
            # ~25 cm from the marker; otherwise re-run the search loop.
            if not careful_forward((drive_distance * DRIVE_T) / 1000, noah, 400):
                arucos = find_aruco(noah.take_photo())
                if landmark in arucos:
                    position = cv2.aruco.estimatePoseSingleMarkers(
                        np.array([arucos[landmark]]), 14.5, CAMERA_MATRIX, DIST_COEF
                    )[1][0][0]
                    drive_distance = np.sqrt(position[0] ** 2 + position[2] ** 2) - 25
                    noah.go_diff(POWER, POWER, 1, 1)
                    time.sleep((drive_distance * DRIVE_T) / 1000)
                    noah.stop()

                break


if __name__ == "__main__":
    start = time.time()
    main()
    print(f"Drove for {time.time() - start} seconds")
|
314
rally2.py
Normal file
314
rally2.py
Normal file
@ -0,0 +1,314 @@
|
||||
import time
|
||||
from turtle import right
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
from selflocalization import particle, camera
|
||||
|
||||
import robot
|
||||
|
||||
LANDMARKS = [6,8,9,7]
|
||||
|
||||
LANDMARK_POSITIONS = {
|
||||
6: [0, 0],
|
||||
8: [300, 0],
|
||||
9: [0, 400],
|
||||
7: [300, 400]
|
||||
}
|
||||
|
||||
POWER = 70
|
||||
|
||||
TURN_T = 9.6 # 1 degree
|
||||
DRIVE_T = 22 # 1 centimeter
|
||||
|
||||
RIGHT_WHEEL_OFFSET = 4
|
||||
|
||||
CLOCKWISE_OFFSET = 0.96
|
||||
|
||||
FOCAL_LENGTH = 1691
|
||||
|
||||
CAMERA_MATRIX = np.array(
|
||||
[[FOCAL_LENGTH, 0, 512], [0, FOCAL_LENGTH, 360], [0, 0, 1]],
|
||||
dtype=np.float32
|
||||
)
|
||||
DIST_COEF = np.array([0, 0, 0, 0, 0], dtype=np.float32)
|
||||
|
||||
SIGMA = 3
|
||||
SIGMA_THETA = 0.3
|
||||
|
||||
NUM_PARTICLES = 1000
|
||||
|
||||
|
||||
def look_around(noah, particles, cam, est_pose):
    """Do a full scan in 24 rotation steps, updating the particle filter
    with every landmark observation.

    Returns (particles, est_pose) after the scan.
    """
    for _ in range(24):
        time.sleep(0.2)
        # Fetch the next frame (read several to flush stale buffers).
        for _ in range(10):
            colour = cam.get_next_frame()
        landmark_values = []

        # Detect objects in the current frame.
        objectIDs, dists, angles = cam.detect_aruco_objects(colour)
        if not isinstance(objectIDs, type(None)):

            # Collect observations of known landmarks; +22.5 cm offset is
            # presumably marker face vs. box centre -- TODO confirm.
            for i in range(len(objectIDs)):
                print(
                    "Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
                if objectIDs[i] in LANDMARKS:
                    landmark_values.append((objectIDs[i], dists[i] + 22.5, angles[i]))

            # Measurement update: reweight every particle.
            for p in particles:
                calc_weight(p, landmark_values)

            # Resample proportionally to the new weights.
            particles = particle.resample(particles)

            # Draw detected objects (visualisation side effect only).
            cam.draw_aruco_objects(colour)

        else:
            # No observation - reset weights to uniform distribution.
            for p in particles:
                p.setWeight(1.0/NUM_PARTICLES)

        particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
        # Current pose estimate; its weight is refreshed too.
        est_pose = particle.estimate_pose(particles)
        calc_weight(est_pose, landmark_values)

        # Rotate ~15 degrees anticlockwise and mirror that in the cloud.
        noah.go_diff(POWER, POWER, 0, 1)
        time.sleep((15 * TURN_T)/1000)
        noah.stop()
        for p in particles:
            p.setTheta(p.theta - np.deg2rad(15))

    # Final pose estimate after the full scan.
    est_pose = particle.estimate_pose(particles)
    return particles, est_pose
|
||||
|
||||
|
||||
def turn_towards_landmark(noah, particles, est_pose, landmark):
    """Rotate the robot (and the particle cloud) to face ``landmark``.

    Returns (particles, est_pose) with added rotation uncertainty.
    """
    current_position = np.array([est_pose.x, est_pose.y])
    current_theta = est_pose.theta
    landmark_position = np.array(LANDMARK_POSITIONS[landmark])

    relative_pos = landmark_position - current_position
    # NOTE(review): arctan (not arctan2) loses the quadrant when
    # relative_pos[0] < 0; together with the -180 offset below this looks
    # hand calibrated for this arena -- confirm before reuse.
    angle = np.rad2deg(np.arctan(relative_pos[1] / relative_pos[0]))
    turn_angle = np.mod(angle - (np.rad2deg(current_theta) - 180), 360)

    # Apply the rotation to the particle cloud first.
    for p in particles:
        p.setTheta(p.theta - np.deg2rad(turn_angle))

    if turn_angle < 180:
        # Shorter to turn anticlockwise.
        noah.go_diff(POWER, POWER, 0, 1)
        time.sleep((abs(turn_angle) * TURN_T) / 1000)
    else:
        # Shorter to turn clockwise (with clockwise calibration factor).
        noah.go_diff(POWER, POWER, 1, 0)
        time.sleep((abs(360 - turn_angle) * TURN_T * CLOCKWISE_OFFSET) / 1000)
    noah.stop()

    particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
    # The estimate of the robot's current pose after the turn.
    est_pose = particle.estimate_pose(particles)
    return particles, est_pose
|
||||
|
||||
|
||||
def time_to_landmark(est_pose, landmark):
    """Return the drive time (seconds) needed to get within 1 m of
    ``landmark`` from the estimated pose."""
    delta = np.array(LANDMARK_POSITIONS[landmark]) - np.array([est_pose.x, est_pose.y])
    # Stop 100 cm short of the landmark itself.
    drive_distance = np.sqrt(delta[0] ** 2 + delta[1] ** 2) - 100
    return (DRIVE_T * drive_distance) / 1000
|
||||
|
||||
|
||||
def drive_until_stopped(noah, particles):
    """Drive straight until a sonar reports an obstacle or 5 s elapse,
    then dead-reckon the driven distance into every particle.

    Returns (time_driven, est_pose, particles).
    """
    noah.go_diff(POWER, POWER+RIGHT_WHEEL_OFFSET, 1, 1)
    start = time.time()
    while True:
        # Sensors are polled one at a time so an early hit skips the
        # remaining reads; preserve that order (front, left, right).
        if noah.read_front_ping_sensor() < 1000:
            noah.stop()
            break
        if noah.read_left_ping_sensor() < 400:
            noah.stop()
            break
        if noah.read_right_ping_sensor() < 400:
            noah.stop()
            break
        if time.time() - start > 5:
            noah.stop()
            break
        time.sleep(0.01)

    time_driven = time.time() - start
    # Convert drive time back to centimeters via the drive calibration.
    distance_driven = (time_driven * 1000) / DRIVE_T
    for p in particles:
        p.x = p.x + np.cos(p.theta) * distance_driven
        p.y = p.y + np.sin(p.theta) * distance_driven

    particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
    est_pose = particle.estimate_pose(particles)

    return time_driven, est_pose, particles
|
||||
|
||||
|
||||
def drunk_drive(noah):
    """Wander for two seconds, steering away from obstacles -- used to
    escape a spot the localisation is stuck in."""
    deadline = time.time() + 2
    turning = None
    while time.time() < deadline:
        front = noah.read_front_ping_sensor()
        right = noah.read_right_ping_sensor()
        left = noah.read_left_ping_sensor()

        if front > 600 and all(d > 400 for d in [right, left]):
            # Clear ahead: drive straight and forget any turn in progress.
            turning = None
            noah.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
        elif turning == "R" or (front > 600 and right > 400 and turning is None):
            noah.go_diff(POWER, POWER, 1, 0)
            turning = "R"
        else:
            noah.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 0, 1)
            turning = "L"

        time.sleep(0.01)
|
||||
|
||||
|
||||
def inch_closer(noah, particles):
    """Drive forward until the front sonar reads under 300, then fold the
    driven distance into every particle.

    Returns (est_pose, particles).
    """
    start = time.time()
    noah.go_diff(POWER, POWER+RIGHT_WHEEL_OFFSET, 1, 1)
    while True:
        forward_dist = noah.read_front_ping_sensor()
        if forward_dist < 300:
            noah.stop()
            break
        # Fix: poll at ~100 Hz like the other drive loops instead of
        # busy-waiting and hammering the sensor.
        time.sleep(0.01)

    # NOTE(review): unlike drive_until_stopped there is no timeout here; if
    # nothing ever comes within 300 the robot drives forever -- confirm.
    end = time.time()
    time_driven = end - start
    distance_driven = (time_driven*1000)/DRIVE_T
    for p in particles:
        x = np.cos(p.theta) * distance_driven
        y = np.sin(p.theta) * distance_driven
        p.x = p.x + x
        p.y = p.y + y

    particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
    est_pose = particle.estimate_pose(particles)
    return est_pose, particles
|
||||
|
||||
|
||||
def initialize_particles(num_particles):
    """Spawn ``num_particles`` particles at uniformly random poses.

    x in [-100, 500), y in [-250, 350), heading in [0, 2*pi); every
    particle starts with uniform weight 1/num_particles.
    """
    uniform_weight = 1.0 / num_particles
    particles = []
    for _ in range(num_particles):
        # Random starting pose (same draw order as before: x, y, theta).
        px = 600.0 * np.random.ranf() - 100.0
        py = 600.0 * np.random.ranf() - 250.0
        heading = np.mod(2.0 * np.pi * np.random.ranf(), 2.0 * np.pi)
        particles.append(particle.Particle(px, py, heading, uniform_weight))

    return particles
|
||||
|
||||
|
||||
def dist(particle, landmark):
    """Euclidean distance from ``particle`` (has .x/.y) to ``landmark``
    given as an [x, y] pair."""
    dx = landmark[0] - particle.x
    dy = landmark[1] - particle.y
    return np.sqrt(dx ** 2 + dy ** 2)
|
||||
|
||||
|
||||
def calc_angle(particle, landmark, dist):
    """Signed bearing (radians) from the particle's heading to ``landmark``.

    ``dist`` is the precomputed particle-to-landmark distance; positive
    angles are counterclockwise of the heading.
    """
    heading = np.array([np.cos(particle.theta), np.sin(particle.theta)]).T
    to_landmark = (
        np.array([landmark[0] - particle.x, landmark[1] - particle.y]).T) / dist
    heading_normal = np.array([-np.sin(particle.theta), np.cos(particle.theta)]).T
    # Magnitude from the dot with the heading, sign from the dot with the
    # heading's left-hand normal.
    return np.sign(np.dot(to_landmark, heading_normal)) * np.arccos(np.dot(to_landmark, heading))
|
||||
|
||||
|
||||
def normal(x, mean, std):
    """Gaussian pdf N(mean, std**2) evaluated at ``x``."""
    exponent = -(((x - mean) ** 2) / (2 * std ** 2))
    return np.exp(exponent) / (np.sqrt(2 * np.pi) * std)
|
||||
|
||||
|
||||
def calc_weight(particle, landmark_values):
    """Set ``particle``'s weight from (id, distance, angle) observations.

    The weight is the product over observations of the distance likelihood
    (Gaussian, std SIGMA) and the bearing likelihood (Gaussian, std
    SIGMA_THETA). Leaves the weight untouched when the list is empty.
    """
    if landmark_values == []:
        return
    weights = []
    for values in landmark_values:
        dist_to_landmark = dist(particle, LANDMARK_POSITIONS[values[0]])
        dist_weight = normal(values[1], dist_to_landmark, SIGMA)

        angle_to_landmark = calc_angle(
            particle, LANDMARK_POSITIONS[values[0]], dist_to_landmark)
        angle_weight = normal(values[2], angle_to_landmark, SIGMA_THETA)
        weights.append(dist_weight * angle_weight)

    # Fix: np.product was removed in NumPy 2.0; np.prod is the supported
    # spelling (identical behavior).
    particle.setWeight(np.prod(weights))
|
||||
|
||||
def turn_90_degrees(noah, est_pose, particles):
    """Turn ~90 degrees towards the middle of the landmark rectangle,
    mirror the turn in the particle cloud, then drive until stopped.

    Returns (est_pose, particles).
    """
    x_values = [i[0] for i in LANDMARK_POSITIONS.values()]
    y_values = [i[1] for i in LANDMARK_POSITIONS.values()]
    middle = np.array([max(x_values)/2, max(y_values)/2])

    current_position = np.array([est_pose.x, est_pose.y])
    relative_pos = middle - current_position
    angle = np.arctan(relative_pos[1]/relative_pos[0])
    relative_angle = np.mod(angle - est_pose.theta, 2.0*np.pi)

    # Bug fix: relative_angle is in radians (range [0, 2*pi)), so the old
    # ``> 180`` comparison was never true and the robot always turned
    # anticlockwise. Compare against pi instead.
    clockwise = relative_angle > np.pi

    if clockwise:
        noah.go_diff(POWER, POWER, 1, 0)
        time.sleep((90 * TURN_T * CLOCKWISE_OFFSET)/1000)
    else:
        noah.go_diff(POWER, POWER, 0, 1)
        time.sleep((90 * TURN_T)/1000)

    noah.stop()

    # Apply the same 90 degree turn to every particle.
    for p in particles:
        if clockwise:
            p.setTheta(p.theta - np.deg2rad(90))
        else:
            p.setTheta(p.theta + np.deg2rad(90))
    particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
    _, est_pose, particles = drive_until_stopped(noah, particles)
    return est_pose, particles
|
||||
|
||||
|
||||
def main(noah):
    """Drive the landmark rally using particle-filter localisation.

    Visits every landmark in LANDMARKS order, returning to the first one.
    For each landmark: localise by scanning, turn towards it, drive, and
    re-localise whenever the drive was cut short by an obstacle.
    """
    # Visit every landmark, then come back to the first.
    landmark_order = LANDMARKS + [
        LANDMARKS[0]
    ]
    cam = camera.Camera(0, 'arlo', useCaptureThread=True)
    particles = initialize_particles(NUM_PARTICLES)

    # The estimate of the robot's current pose.
    est_pose = particle.estimate_pose(particles)

    for landmark in landmark_order:
        print(f"Going to landmark {landmark}")
        while True:
            particles, est_pose = look_around(noah, particles, cam, est_pose)
            print(est_pose)
            particles, est_pose = turn_towards_landmark(noah, particles, est_pose, landmark)
            drive_time = time_to_landmark(est_pose, landmark)
            time_driven, est_pose, particles = drive_until_stopped(noah, particles)

            # Stopped well short of the plan: something was in the way, so
            # reposition and localise again before retrying this landmark.
            if not abs(time_driven - drive_time) < 0.5:
                # drunk_drive(noah)
                est_pose, particles = turn_90_degrees(noah, est_pose, particles)
                continue

            est_pose, particles = inch_closer(noah, particles)
            break


if __name__ == "__main__":
    noah = robot.Robot()  # Noah is our robot's name
    try:
        main(noah)
    except KeyboardInterrupt:
        noah.stop()
|
@ -1,2 +1 @@
|
||||
from .robot import Robot
|
||||
from .arlo import Arlo
|
||||
|
@ -1,21 +1,27 @@
|
||||
from time import sleep
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from .robot import Robot
|
||||
|
||||
START_VALUES = [60, 60]
|
||||
THRESHOLD = 1.05
|
||||
SLEEP_TIME = 2
|
||||
TESTING_SLEEP_TIME = 2
|
||||
DEFAULT_CALIBRATION_CODE = "40.40-40.40-40.40_ff-ff-ff-ff"
|
||||
|
||||
FOCAL_LENGTH = 1691
|
||||
|
||||
DEFAULT_CALIBRATION_CODE = "40.40-40.40-40.40_ff-ff-ff-ff"
|
||||
CAMERA_MATRIX = np.array(
|
||||
[[FOCAL_LENGTH,0,512],[0,FOCAL_LENGTH,360],[0,0,1]],
|
||||
dtype=np.float32
|
||||
)
|
||||
DIST_COEF = np.array([0,0,0,0,0], dtype=np.float32)
|
||||
|
||||
def test_forward(arlo, l_power, r_power):
|
||||
arlo.reset_encoder_counts()
|
||||
arlo.go_diff(l_power, r_power, 1, 1)
|
||||
sleep(SLEEP_TIME)
|
||||
sleep(TESTING_SLEEP_TIME)
|
||||
arlo.stop()
|
||||
return (
|
||||
abs(int(arlo.read_left_wheel_encoder())),
|
||||
@ -25,7 +31,7 @@ def test_forward(arlo, l_power, r_power):
|
||||
def test_back(arlo, l_power, r_power):
|
||||
arlo.reset_encoder_counts()
|
||||
arlo.go_diff(l_power, r_power, 0, 0)
|
||||
sleep(SLEEP_TIME)
|
||||
sleep(TESTING_SLEEP_TIME)
|
||||
arlo.stop()
|
||||
return (
|
||||
abs(int(arlo.read_left_wheel_encoder())),
|
||||
@ -35,7 +41,7 @@ def test_back(arlo, l_power, r_power):
|
||||
def test_clockwise(arlo, l_power, r_power):
|
||||
arlo.reset_encoder_counts()
|
||||
arlo.go_diff(l_power, r_power, 1, 0)
|
||||
sleep(SLEEP_TIME)
|
||||
sleep(TESTING_SLEEP_TIME)
|
||||
arlo.stop()
|
||||
return (
|
||||
abs(int(arlo.read_left_wheel_encoder())),
|
||||
@ -45,7 +51,7 @@ def test_clockwise(arlo, l_power, r_power):
|
||||
def test_anticlockwise(arlo, l_power, r_power):
|
||||
arlo.reset_encoder_counts()
|
||||
arlo.go_diff(l_power, r_power, 0, 1)
|
||||
sleep(SLEEP_TIME)
|
||||
sleep(TESTING_SLEEP_TIME)
|
||||
arlo.stop()
|
||||
return (
|
||||
abs(int(arlo.read_left_wheel_encoder())),
|
||||
@ -57,7 +63,7 @@ class Arlo():
|
||||
self.robot = Robot()
|
||||
if calibration_code is None:
|
||||
self._calibrate()
|
||||
elif calibration_code is None:
|
||||
elif calibration_code == False:
|
||||
self.calibration_code = DEFAULT_CALIBRATION_CODE
|
||||
self._decode_calibration_code()
|
||||
else:
|
||||
@ -89,17 +95,21 @@ class Arlo():
|
||||
self.robot.stop()
|
||||
|
||||
def estimate_distance(self, bounding_box):
|
||||
avg_left = (bounding_box[0][0] + bounding_box[3][0])/2
|
||||
avg_right = (bounding_box[1][0] + bounding_box[2][0])/2
|
||||
avg_up = (bounding_box[0][1] + bounding_box[1][1])/2
|
||||
avg_down = (bounding_box[2][1] + bounding_box[3][1])/2
|
||||
# avg_left = (bounding_box[0][0] + bounding_box[3][0])/2
|
||||
# avg_right = (bounding_box[1][0] + bounding_box[2][0])/2
|
||||
# avg_up = (bounding_box[0][1] + bounding_box[1][1])/2
|
||||
# avg_down = (bounding_box[2][1] + bounding_box[3][1])/2
|
||||
|
||||
avg_width = avg_right - avg_left
|
||||
avg_height = avg_down - avg_up
|
||||
# avg_width = avg_right - avg_left
|
||||
# avg_height = avg_down - avg_up
|
||||
|
||||
avg_size = (avg_width + avg_height)/2
|
||||
# avg_size = (avg_width + avg_height)/2
|
||||
|
||||
return (FOCAL_LENGTH * 15)/avg_size
|
||||
# return (FOCAL_LENGTH * 14.5)/avg_size
|
||||
|
||||
return cv2.aruco.estimatePoseSingleMarkers(
|
||||
np.array([bounding_box]), 14.5, CAMERA_MATRIX, DIST_COEF
|
||||
)[1][0][0][2]
|
||||
|
||||
def find_aruco(self, image):
|
||||
aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
|
||||
@ -110,7 +120,10 @@ class Arlo():
|
||||
parameters=aruco_params
|
||||
)
|
||||
|
||||
return zip(corners, ids) if corners is not None else []
|
||||
if corners is None:
|
||||
return []
|
||||
|
||||
return [(box[0], ids[i]) for i, box in enumerate(corners)]
|
||||
|
||||
def draw_arucos(self, image, bounding_boxes):
|
||||
for bounding, n in bounding_boxes:
|
||||
@ -168,15 +181,15 @@ class Arlo():
|
||||
else:
|
||||
values[i][1] += 1
|
||||
|
||||
cps[i] = wheels[0]/SLEEP_TIME
|
||||
cps[i] = wheels[0]/TESTING_SLEEP_TIME
|
||||
|
||||
time = [0 for _ in range(4)]
|
||||
|
||||
cpc = 144/(3.14159*15) # wheel counts per cm
|
||||
|
||||
# milliseconds per 10cm forward
|
||||
# milliseconds per 1cm forward
|
||||
time[0] = int((1000 * cpc)/cps[0])
|
||||
# milliseconds per 10cm backwards
|
||||
# milliseconds per 1cm backwards
|
||||
time[1] = int((1000 * cpc)/cps[1])
|
||||
|
||||
cpr = 3.1415*38 # 1 rotation in cm
|
||||
|
522
selflocalization/camera.py
Normal file
522
selflocalization/camera.py
Normal file
@ -0,0 +1,522 @@
|
||||
import cv2 # Import the OpenCV library
|
||||
import numpy as np
|
||||
import time
|
||||
import sys
|
||||
import threading
|
||||
from selflocalization import framebuffer
|
||||
from pkg_resources import parse_version
|
||||
|
||||
|
||||
gstreamerCameraFound = False
|
||||
piCameraFound = False
|
||||
try:
|
||||
import picamera
|
||||
from picamera.array import PiRGBArray
|
||||
piCameraFound = True
|
||||
except ImportError:
|
||||
print("Camera.py: picamera module not available - using OpenCV interface instead")
|
||||
|
||||
def isRunningOnArlo():
    """Return True if we are running on Arlo, otherwise False."""
    # TODO: gstreamerCameraFound is only set after a Camera object has been
    # instantiated, so calling this too early can report a false negative.
    return piCameraFound or gstreamerCameraFound
|
||||
|
||||
# Black magic in order to handle differences in namespace names in OpenCV 2.4 and 3.0
|
||||
OPCV3 = parse_version(cv2.__version__) >= parse_version('3')
|
||||
|
||||
def capPropId(prop):
    """Return the OpenCV VideoCapture property id for a name like "FPS".

    Bridges the naming difference between the OpenCV 2.4 and 3.0+ Python
    interfaces (``cv2.cv.CV_CAP_PROP_*`` vs ``cv2.CAP_PROP_*``).
    """
    namespace = cv2 if OPCV3 else cv2.cv
    prefix = "" if OPCV3 else "CV_"
    return getattr(namespace, prefix + "CAP_PROP_" + prop)
|
||||
|
||||
|
||||
def gstreamer_pipeline(capture_width=1280, capture_height=720, framerate=30):
    """Build the gstreamer pipeline string for the libcamera source."""
    template = (
        "libcamerasrc !"
        "videobox autocrop=true !"
        "video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
        "videoconvert ! "
        "appsink"
    )
    return template % (capture_width, capture_height, framerate)
|
||||
|
||||
|
||||
|
||||
class CaptureThread(threading.Thread):
    """Internal worker thread that captures frames from the camera and
    pushes them into a framebuffer."""

    def __init__(self, cam, framebuffer):
        threading.Thread.__init__(self)
        self.cam = cam
        self.framebuffer = framebuffer
        self.terminateThreadEvent = threading.Event()

    def run(self):
        """Capture frames in a loop until stop() is called."""
        while not self.terminateThreadEvent.is_set():
            if piCameraFound:
                # picamera path: capture into a pre-allocated buffer.
                if sys.version_info[0] > 2:
                    # Python 3.x: capture straight into a (h, w, 3) array.
                    image = np.empty(
                        (self.cam.resolution[1], self.cam.resolution[0], 3),
                        dtype=np.uint8)
                else:
                    # Python 2.x: flat buffer, reshaped after capture.
                    image = np.empty(
                        (self.cam.resolution[1] * self.cam.resolution[0] * 3,),
                        dtype=np.uint8)

                self.cam.capture(image, format="bgr", use_video_port=True)

                if sys.version_info[0] < 3:
                    image = image.reshape(
                        (self.cam.resolution[1], self.cam.resolution[0], 3))
            else:
                # OpenCV path.
                retval, image = self.cam.read()
                if not retval:
                    print("CaptureThread: Could not read next frame")
                    exit(-1)

            # Publish the frame to the shared framebuffer.
            self.framebuffer.new_frame(image)

    def stop(self):
        """Terminate the worker thread"""
        self.terminateThreadEvent.set()
|
||||
|
||||
|
||||
class Camera(object):
    """This class is responsible for doing the image processing. It detects known landmarks and
    measures distances and orientations to these.

    Frames come from a Raspberry Pi camera (picamera), a gstreamer pipeline,
    or a generic OpenCV capture device - optionally grabbed continuously by
    a background CaptureThread into a FrameBuffer."""

    def __init__(self, camidx, robottype='arlo', useCaptureThread=False):
        """Constructor:
        camidx - index of camera
        robottype - specify which robot you are using in order to use the correct camera calibration.
                    Supported types: arlo, frindo, scribbler, macbookpro
        useCaptureThread - if True, frames are grabbed continuously by a worker thread"""
        # BUGFIX: gstreamerCameraFound is a module-level flag read by
        # isRunningOnArlo(); without this declaration the assignment further
        # down only created a local variable and the flag never changed.
        global gstreamerCameraFound

        print("robottype =", robottype)
        self.useCaptureThread = useCaptureThread

        # Set camera calibration info: 3x3 intrinsic matrix (focal lengths and
        # principal point, in pixels) plus OpenCV distortion coefficients.
        if robottype == 'arlo':
            focal_length = 1050
            self.imageSize = (1024, 720)
            self.intrinsic_matrix = np.asarray(
                [focal_length, 0., self.imageSize[0] / 2.0,
                 0., focal_length, self.imageSize[1] / 2.0,
                 0., 0., 1.], dtype=np.float64)
            self.intrinsic_matrix.shape = (3, 3)
            self.distortion_coeffs = np.asarray(
                [0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.],
                dtype=np.float64)
        elif robottype == 'frindo':
            self.imageSize = (640, 480)
            self.intrinsic_matrix = np.asarray(
                [500, 0., 3.0757300398967601e+02,
                 0., 500, 2.8935674612358201e+02,
                 0., 0., 1.], dtype=np.float64)
            self.intrinsic_matrix.shape = (3, 3)
            self.distortion_coeffs = np.asarray(
                [0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.],
                dtype=np.float64)
        elif robottype == 'scribbler':
            # Unknown calibration - just using the Frindo calibration
            self.imageSize = (640, 480)
            self.intrinsic_matrix = np.asarray(
                [7.1305391967046853e+02, 0., 3.1172820723774367e+02,
                 0., 7.0564929862291285e+02, 2.5634470978315028e+02,
                 0., 0., 1.], dtype=np.float64)
            self.intrinsic_matrix.shape = (3, 3)
            self.distortion_coeffs = np.asarray(
                [1.1911006165076067e-01, -1.0003366233413549e+00,
                 1.9287903277399834e-02, -2.3728201444308114e-03,
                 -2.8137265581326476e-01], dtype=np.float64)
        elif robottype == 'macbookpro':
            self.imageSize = (1280, 720)
            self.intrinsic_matrix = np.asarray(
                [9.4328095162920715e+02, 0., self.imageSize[0] / 2.0,
                 0., 9.4946668595979077e+02, self.imageSize[1] / 2.0,
                 0., 0., 1.], dtype=np.float64)
            self.intrinsic_matrix.shape = (3, 3)
            self.distortion_coeffs = np.asarray(
                [0., 0., -1.6169374082976234e-02, 8.7657653170062459e-03, 0.],
                dtype=np.float64)
        else:
            print("Camera.__init__: Unknown robot type")
            exit(-1)

        # Open a camera device for capturing
        if piCameraFound:
            # piCamera is available so we use this
            self.cam = picamera.PiCamera(camera_num=camidx, resolution=self.imageSize, framerate=30)

            if not self.useCaptureThread:
                self.rawCapture = PiRGBArray(self.cam, size=self.cam.resolution)

            time.sleep(2)  # wait for the camera

            # Freeze white balance and exposure so measurements are stable
            # across frames.
            gain = self.cam.awb_gains
            self.cam.awb_mode = 'off'
            self.cam.awb_gains = gain

            self.cam.shutter_speed = self.cam.exposure_speed
            self.cam.exposure_mode = 'off'

            print("shutter_speed = ", self.cam.shutter_speed)
            print("awb_gains = ", self.cam.awb_gains)

            print("Camera width = ", self.cam.resolution[0])
            print("Camera height = ", self.cam.resolution[1])
            print("Camera FPS = ", self.cam.framerate)
        else:  # Use OpenCV interface
            # First try the gstreamer interface ...
            self.cam = cv2.VideoCapture(
                gstreamer_pipeline(capture_width=self.imageSize[0],
                                   capture_height=self.imageSize[1]),
                apiPreference=cv2.CAP_GSTREAMER)
            if not self.cam.isOpened():  # Did not work
                # ... then fall back to the generic auto-detect interface
                self.cam = cv2.VideoCapture(camidx)
                if not self.cam.isOpened():  # Error
                    print("Camera.__init__: Could not open camera")
                    exit(-1)
                else:
                    print("Camera.__init__: Using OpenCV with auto-detect interface")
            else:
                gstreamerCameraFound = True
                print("Camera.__init__: Using OpenCV with gstreamer")

            time.sleep(1)  # wait for camera

            # Set camera properties
            self.cam.set(capPropId("FRAME_WIDTH"), self.imageSize[0])
            self.cam.set(capPropId("FRAME_HEIGHT"), self.imageSize[1])
            self.cam.set(capPropId("FPS"), 30)

            time.sleep(1)

            # Get camera properties
            print("Camera width = ", int(self.cam.get(capPropId("FRAME_WIDTH"))))
            print("Camera height = ", int(self.cam.get(capPropId("FRAME_HEIGHT"))))
            print("Camera FPS = ", int(self.cam.get(capPropId("FPS"))))

        # Initialize chessboard pattern parameters
        self.patternFound = False
        self.patternSize = (3, 4)
        self.patternUnit = 50.0  # mm (size of one checker square)
        self.corners = []

        # Initialize aruco detector
        self.arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
        # Set the correct physical marker size here
        self.arucoMarkerLength = 0.15  # [m] actual size of aruco markers (in object coordinate system)

        # Initialize worker thread and framebuffer
        if self.useCaptureThread:
            print("Using capture thread")
            self.framebuffer = framebuffer.FrameBuffer()
            self.capturethread = CaptureThread(self.cam, self.framebuffer)
            self.capturethread.start()
            time.sleep(0.75)  # give the thread time to deliver a first frame

    def __del__(self):
        # Only picamera needs an explicit close; cv2.VideoCapture releases
        # its device when garbage collected.
        if piCameraFound:
            self.cam.close()

    def terminateCaptureThread(self):
        """Stop and join the background capture thread (no-op when not used)."""
        if self.useCaptureThread:
            self.capturethread.stop()
            self.capturethread.join()

    def get_capture(self):
        """Access to the internal camera object for advanced control of the camera."""
        return self.cam

    def get_colour(self):
        """OBSOLETE - use instead get_next_frame"""
        print("OBSOLETE get_colour - use instead get_next_frame")
        return self.get_next_frame()

    def get_next_frame(self):
        """Gets the next available image frame from the camera.

        Returns a BGR image (numpy uint8 array of shape (height, width, 3))."""
        if self.useCaptureThread:
            img = self.framebuffer.get_frame()
            if img is None:
                # No frame captured yet - return a black frame of the right
                # shape. BUGFIX: the original used np.array((w, h, 3)) which
                # builds a 3-element vector, not an image.
                img = np.zeros((self.imageSize[1], self.imageSize[0], 3), dtype=np.uint8)
        elif piCameraFound:
            # Use piCamera
            self.cam.capture(self.rawCapture, format="bgr", use_video_port=True)
            img = self.rawCapture.array

            # clear the stream in preparation for the next frame
            self.rawCapture.truncate(0)
        else:  # Use OpenCV
            retval, img = self.cam.read()  # Read frame

            if not retval:  # Error
                # BUGFIX: error message named the obsolete get_colour method.
                print("Camera.get_next_frame: Could not read next frame")
                exit(-1)

        return img

    # ArUco object detector
    def detect_aruco_objects(self, img):
        """Detect objects in the form of a binary ArUco code and return object IDs, distances (in cm) and
        angles (in radians) to detected ArUco codes. The angle is computed as the signed angle between
        translation vector to detected object projected onto the x-z plane and the z-axis (pointing out
        of the camera). This corresponds to that the angle is measuring location along the horizontal x-axis.

        If no object is detected, the returned variables are set to None."""
        self.aruco_corners, self.ids, rejectedImgPoints = cv2.aruco.detectMarkers(img, self.arucoDict)
        self.rvecs, self.tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(
            self.aruco_corners, self.arucoMarkerLength, self.intrinsic_matrix, self.distortion_coeffs)

        if self.ids is None:
            return None, None, None

        dists = np.linalg.norm(self.tvecs, axis=len(self.tvecs.shape) - 1) * 100
        # Make sure we always return properly shaped arrays
        dists = dists.reshape((dists.shape[0],))
        ids = self.ids.reshape((self.ids.shape[0],))

        # Compute angles
        angles = np.zeros(dists.shape, dtype=dists.dtype)
        for i in range(dists.shape[0]):
            tobj = self.tvecs[i] * 100 / dists[i]  # unit vector towards the marker
            zaxis = np.zeros(tobj.shape, dtype=tobj.dtype)
            zaxis[0, -1] = 1

            # We want the horizontal angle so project tobj onto the x-z plane
            tobj_xz = tobj
            tobj_xz[0, 1] = 0
            # Should the sign be clockwise or counter-clockwise (left or right)?
            # In this version it is positive to the left as seen from the camera.
            direction = -1 * np.sign(tobj_xz[0, 0])
            angles[i] = direction * np.arccos(np.dot(tobj_xz, zaxis.T))

        return ids, dists, angles

    def draw_aruco_objects(self, img):
        """Draws detected objects and their orientations on the image given in img."""
        if self.ids is None:
            return img

        outimg = cv2.aruco.drawDetectedMarkers(img, self.aruco_corners, self.ids)
        for i in range(self.ids.shape[0]):
            outimg = cv2.drawFrameAxes(outimg, self.intrinsic_matrix, self.distortion_coeffs,
                                       self.rvecs[i], self.tvecs[i], self.arucoMarkerLength)
        return outimg

    # Chessboard object detector
    def get_object(self, img):
        """Detect object and return object type, distance (in cm), angle (in radians) and
        colour probability table in the order (R,G,B)"""
        objectType = 'none'
        colourProb = np.ones((3,)) / 3.0
        distance = 0.0
        angle = 0.0
        self.patternFound = False

        self.get_corners(img)

        if not self.patternFound:
            return objectType, distance, angle, colourProb

        # Determine if the object is horizontal or vertical from the
        # orientation of the first checker row.
        delta_x = abs(self.corners[0, 0, 0] - self.corners[2, 0, 0])
        delta_y = abs(self.corners[0, 0, 1] - self.corners[2, 0, 1])
        horizontal = (delta_y > delta_x)
        if horizontal:
            objectType = 'horizontal'
        else:
            objectType = 'vertical'

        # Compute distance from the apparent pattern height (pinhole model).
        if horizontal:
            height = ((abs(self.corners[0, 0, 1] - self.corners[2, 0, 1]) +
                       abs(self.corners[9, 0, 1] - self.corners[11, 0, 1])) / 2.0)
            patternHeight = (self.patternSize[0] - 1.0) * self.patternUnit
        else:
            height = (abs(self.corners[0, 0, 1] - self.corners[9, 0, 1]) +
                      abs(self.corners[2, 0, 1] - self.corners[11, 0, 1])) / 2.0
            patternHeight = (self.patternSize[1] - 1.0) * self.patternUnit

        distance = self.intrinsic_matrix[1, 1] * patternHeight / (height * 10.0)

        # Bearing of the pattern centre relative to the optical axis.
        center = (self.corners[0, 0, 0] + self.corners[2, 0, 0] +
                  self.corners[9, 0, 0] + self.corners[11, 0, 0]) / 4.0
        angle = -np.arctan2(center - self.intrinsic_matrix[0, 2], self.intrinsic_matrix[0, 0])

        #### Classify object by colour

        # Extract rectangle corners
        points = np.array(
            [
                self.corners[0],
                self.corners[2],
                self.corners[9],
                self.corners[11]
            ]
        )
        points.shape = (4, 2)
        points = np.int32(points)

        # Compute region of interest
        mask = np.zeros((self.imageSize[1], self.imageSize[0]), dtype=np.uint8)
        cv2.fillConvexPoly(mask, points, 255)

        # Compute mean colour inside region of interest
        mean_colour = cv2.mean(img, mask)  # NOTE: has produced a segfault under Python 3

        red = mean_colour[2]
        green = mean_colour[1]
        blue = mean_colour[0]
        # Renamed from 'sum' to avoid shadowing the builtin.
        colour_sum = red + green + blue

        colourProb[0] = red / colour_sum
        colourProb[1] = green / colour_sum
        colourProb[2] = blue / colour_sum

        return objectType, distance, angle, colourProb

    def get_corners(self, img):
        """Detect corners - this is an auxillary method and should not be used directly.

        Sets self.patternFound / self.corners as a side effect and returns both."""
        # Convert to gray scale and log-normalize to even out lighting.
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        loggray = cv2.log(gray + 1.0)
        cv2.normalize(loggray, loggray, 0, 255, cv2.NORM_MINMAX)
        gray = cv2.convertScaleAbs(loggray)

        retval, self.corners = cv2.findChessboardCorners(gray, self.patternSize, cv2.CALIB_CB_FAST_CHECK)
        if retval > 0:
            self.patternFound = True

        return self.patternFound, self.corners

    def draw_object(self, img):
        """Draw the object if found into img"""
        cv2.drawChessboardCorners(img, self.patternSize, self.corners, self.patternFound)
|
||||
|
||||
|
||||
|
||||
if (__name__=='__main__'):
    # Simple manual test: open the camera, continuously show frames with any
    # detected ArUco markers drawn in, and print their measurements.
    # Press 'q' in the window to quit.
    print("Opening and initializing camera")

    #cam = Camera(0, 'macbookpro', useCaptureThread=True)
    #cam = Camera(0, 'macbookpro', useCaptureThread = False)
    cam = Camera(0, robottype='arlo', useCaptureThread=True)

    # Open a window
    WIN_RF1 = "Camera view"
    cv2.namedWindow(WIN_RF1)
    cv2.moveWindow(WIN_RF1, 50, 50)

    while True:
        # Poll the keyboard (this also lets HighGUI process window events)
        action = cv2.waitKey(10)

        if action == ord('q'): # Quit
            break

        # Fetch next frame
        colour = cam.get_next_frame()

        # Detect objects and print one line per detected marker
        IDs, dists, angles = cam.detect_aruco_objects(colour)
        if not isinstance(IDs, type(None)):
            for i in range(len(IDs)):
                print("Object ID = ", IDs[i], ", Distance = ", dists[i], ", angles = ", angles[i])

        # Draw detected objects
        cam.draw_aruco_objects(colour)

        # Show frames
        cv2.imshow(WIN_RF1, colour)

    # Close all windows
    cv2.destroyAllWindows()

    # Clean-up capture thread
    cam.terminateCaptureThread()
|
33
selflocalization/framebuffer.py
Normal file
33
selflocalization/framebuffer.py
Normal file
@ -0,0 +1,33 @@
|
||||
# import copy
|
||||
import threading
|
||||
|
||||
|
||||
class FrameBuffer(object):
    """This class represents a framebuffer with a front and back buffer storing frames.
    Access to the class is thread safe (controlled via an internal lock).

    The writer fills the back buffer and then atomically flips the index,
    so readers always see the most recently completed frame."""

    def __init__(self):
        # Two slots (front/back); None until the first frame arrives.
        self.frameBuffer = [None, None]
        # Index of the buffer readers should currently see.
        self.currentBufferIndex = 0
        self.lock = threading.Lock()

    def get_frame(self):
        """Return the latest frame, or None if no frame has been stored yet."""
        with self.lock:
            return self.frameBuffer[self.currentBufferIndex]

    def new_frame(self, frame):
        """Store *frame* in the back buffer and publish it to readers."""
        back_index = 1 - self.currentBufferIndex
        self.frameBuffer[back_index] = frame

        # Flip the index under the lock so readers never see a half-written slot.
        with self.lock:
            self.currentBufferIndex = back_index
|
45
selflocalization/makelandmark.py
Normal file
45
selflocalization/makelandmark.py
Normal file
@ -0,0 +1,45 @@
|
||||
import cv2 # Import the OpenCV library
import numpy as np

# Generate a printable A4 landmark sheet: a single ArUco marker
# (DICT_6X6_250) centred on the page with its numeric ID printed above it.
# The image is written to disk and also shown in a window.

# TODO make this configurable from the command line using parser
markerID = 10 # Try 1 - 4

# Define some relevant constants
dpi = 72 # dots (pixels) per inch [inch^(-1)]
inch2mm = 25.4 # [mm / inch]
dpmm = dpi / inch2mm # Dots per mm [mm^(-1)]

# Define size of output image to fit A4 paper
a4width = 210.0 # [mm]
a4height = 297.0 # [mm]

# Size of output image
width = np.uint(np.round(a4width * dpmm)) # [pixels]
height = np.uint(np.round(a4height * dpmm)) # [pixels]

# Size of ArUco marker
markerPhysicalSize = 150 # [mm]
markerSize = np.uint(np.round(markerPhysicalSize * dpmm)) # [pixels]

# Create landmark image (all white gray scale image; rows = height, cols = width)
landmarkImage = np.ones((height, width), dtype=np.uint8) * np.uint8(255)

# Initialize the ArUco dictionary
arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)

# Draw marker centred on the page.
# NOTE(review): cv2.aruco.drawMarker was renamed generateImageMarker in
# OpenCV >= 4.7 - confirm against the installed OpenCV version.
startWidth = int(np.round((width-markerSize)/2))
startHeight = int(np.round((height-markerSize)/2))
landmarkImage[startHeight:int(startHeight+markerSize), startWidth:int(startWidth+markerSize)] = cv2.aruco.drawMarker(arucoDict, markerID, markerSize, 1)
cv2.putText(landmarkImage, str(markerID), (startWidth, startHeight - 60), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (0,0,0), 2)


# Save image
cv2.imwrite("../../../data/landmark" + str(markerID) + ".png", landmarkImage)

# Show image
cv2.namedWindow("Landmark")
cv2.imshow("Landmark", landmarkImage)
cv2.waitKey()
|
99
selflocalization/particle.py
Normal file
99
selflocalization/particle.py
Normal file
@ -0,0 +1,99 @@
|
||||
import random
|
||||
import numpy as np
|
||||
from selflocalization import random_numbers as rn
|
||||
|
||||
class Particle(object):
    """Data structure for storing particle information (state and weight).

    The state is a 2D pose (x, y, theta); theta is always stored wrapped
    into [0, 2*pi)."""

    def __init__(self, x=0.0, y=0.0, theta=0.0, weight=0.0):
        self.x = x
        self.y = y
        self.theta = np.mod(theta, 2.0 * np.pi)
        self.weight = weight

    def getX(self):
        """Return the x-coordinate."""
        return self.x

    def getY(self):
        """Return the y-coordinate."""
        return self.y

    def getTheta(self):
        """Return the orientation in [0, 2*pi)."""
        return self.theta

    def getWeight(self):
        """Return the importance weight."""
        return self.weight

    def setX(self, val):
        """Set the x-coordinate."""
        self.x = val

    def setY(self, val):
        """Set the y-coordinate."""
        self.y = val

    def setTheta(self, val):
        """Set the orientation; the value is wrapped into [0, 2*pi)."""
        self.theta = np.mod(val, 2.0 * np.pi)

    def setWeight(self, val):
        """Set the importance weight."""
        self.weight = val

    def copy(self):
        """Return an independent copy of this particle."""
        return Particle(self.x, self.y, self.theta, self.weight)

    def __str__(self) -> str:
        return f"({self.x},{self.y},{self.theta})"
|
||||
|
||||
|
||||
def estimate_pose(particles_list):
    """Estimate the pose from particles by computing the average position and orientation over all particles.
    This is not done using the particle weights, but just the sample distribution.

    The orientation is averaged on the unit circle (mean of cos/sin) so that
    angles near 0 and 2*pi average correctly."""
    x_sum = sum(p.getX() for p in particles_list)
    y_sum = sum(p.getY() for p in particles_list)
    cos_sum = sum(np.cos(p.getTheta()) for p in particles_list)
    sin_sum = sum(np.sin(p.getTheta()) for p in particles_list)

    count = len(particles_list)
    if count != 0:
        return Particle(x_sum / count, y_sum / count,
                        np.arctan2(sin_sum / count, cos_sum / count))
    # Empty list: all sums are zero, heading defaults to 0.
    return Particle(x_sum, y_sum, 0.0)
|
||||
|
||||
|
||||
def move_particle(particle, delta_x, delta_y, delta_theta):
    """Move the particle by (delta_x, delta_y, delta_theta) in place."""
    new_x = particle.getX() + delta_x
    new_y = particle.getY() + delta_y
    new_theta = particle.getTheta() + delta_theta
    particle.setX(new_x)
    particle.setY(new_y)
    particle.setTheta(new_theta)
|
||||
|
||||
|
||||
def add_uncertainty(particles_list, sigma, sigma_theta):
    """Add some noise to each particle in the list. Sigma and sigma_theta is the noise
    variances for position and angle noise.

    Positions get Gaussian noise; headings get Gaussian noise and are
    re-wrapped into [0, 2*pi)."""
    two_pi = 2.0 * np.pi
    for p in particles_list:
        p.x += rn.randn(0.0, sigma)
        p.y += rn.randn(0.0, sigma)
        p.theta = np.mod(p.theta + rn.randn(0.0, sigma_theta), two_pi)
|
||||
|
||||
|
||||
def add_uncertainty_von_mises(particles_list, sigma, theta_kappa):
    """Add some noise to each particle in the list. Sigma and theta_kappa is the noise
    variances for position and angle noise.

    Positions get Gaussian noise; headings are resampled from a Von Mises
    distribution centred on the current heading and wrapped into [-pi, pi)."""
    for particle in particles_list:
        particle.x += rn.randn(0.0, sigma)
        particle.y += rn.randn(0.0, sigma)
        # BUGFIX: the original computed np.mod(theta, 2*pi) - pi, which does
        # not wrap the sampled angle - it rotates every heading by pi
        # (compare add_uncertainty, which wraps with a plain modulo).
        # Shifting by pi before the modulo preserves the angle while
        # wrapping it into [-pi, pi).
        particle.theta = np.mod(rn.rand_von_mises(particle.theta, theta_kappa) + np.pi,
                                2.0 * np.pi) - np.pi
|
||||
|
||||
def resample(particle_list):
    """Draw a new particle set of the same size, sampling with replacement
    proportionally to particle weight.

    Falls back to uniform weights when all weights are zero. The returned
    particles are copies, so mutating them does not alias the input list."""
    weights = [p.weight for p in particle_list]
    if sum(weights) == 0:
        # Degenerate case: no information in the weights - sample uniformly.
        uniform = 1 / len(particle_list)
        weights = [uniform for _ in particle_list]
    chosen = random.choices(particle_list, weights, k=len(particle_list))
    return [p.copy() for p in chosen]
|
54
selflocalization/random_numbers.py
Normal file
54
selflocalization/random_numbers.py
Normal file
@ -0,0 +1,54 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
def randn(mu, sigma):
    """Normal random number generator.

    Returns one sample from N(mu, sigma^2) - mu is the mean and sigma the
    standard deviation."""
    return mu + sigma * np.random.randn()
|
||||
|
||||
|
||||
def rand_von_mises(mu, kappa):
    """Generate random samples from the Von Mises distribution.

    mu    - mean direction [radians]
    kappa - concentration; larger values concentrate samples around mu.
    Uses the Best & Fisher rejection sampler for kappa >= 1e-6.
    """
    if kappa < 1e-6:
        # kappa is small: sample uniformly on circle
        theta = 2.0 * np.pi * np.random.ranf() - np.pi
    else:
        # Set-up constants of the rejection sampler.
        a = 1.0 + np.sqrt(1.0 + 4.0 * kappa * kappa)
        b = (a - np.sqrt(2.0 * a)) / (2.0 * kappa)
        r = (1.0 + b*b) / (2.0 * b)

        not_done = True
        while not_done:
            u1 = np.random.ranf()
            u2 = np.random.ranf()

            z = np.cos(np.pi * u1)
            f = (1.0 + r * z) / (r + z)
            c = kappa * (r - f)

            # Accept when u2 < c*(2-c) (quick test) or when
            # log(c) - log(u2) + 1 - c >= 0; otherwise keep sampling.
            not_done = (u2 >= c * (2.0 - c)) and (np.log(c) - np.log(u2) + 1.0 - c < 0.0)

        # A third uniform picks the side of mu at random (the distribution
        # is symmetric around the mean direction).
        u3 = np.random.ranf()
        theta = mu + np.sign(u3 - 0.5) * np.arccos(f)

    return theta
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Tests
    # Quick sanity check of the two samplers: draw 1000 samples each and
    # compare the estimated moments against the requested parameters.

    print("Gaussian distribution:")
    r = np.zeros(1000)
    for i in range(1000):
        r[i] = randn(1.0, 2.0)

    print("True mean 1.0 == Estimated mean ", np.mean(r))
    print("True std 2.0 == Estimated std ", np.std(r))

    print("Von Mises distribution:")

    t = np.zeros(1000)
    for i in range(1000):
        t[i] = rand_von_mises(1.0, 8)

    print("True mean 1.0 == Estimated mean ", np.mean(t))
|
292
selflocalization/selflocalize.py
Normal file
292
selflocalization/selflocalize.py
Normal file
@ -0,0 +1,292 @@
|
||||
import cv2
|
||||
import particle as particle
|
||||
import camera as camera
|
||||
import numpy as np
|
||||
from time import sleep
|
||||
from timeit import default_timer as timer
|
||||
import sys
|
||||
|
||||
|
||||
# Flags
showGUI = False  # Whether or not to open GUI windows
onRobot = True  # Whether or not we are running on the Arlo robot
# (onRobot may be forced to False below when the robot module is missing)
|
||||
|
||||
|
||||
def isRunningOnArlo():
    """Return True if we are running on Arlo, otherwise False.
    You can use this flag to switch the code from running on your laptop to Arlo - you need to do the programming here!
    """
    # Reflects the module-level onRobot flag set above (and possibly forced
    # to False by the robot import fallback below this function).
    return onRobot
|
||||
|
||||
|
||||
if isRunningOnArlo():
    # Make the robot module (located one directory up) importable.
    sys.path.append("..")


# Try to import the robot interface; fall back to "not on robot" when the
# module is unavailable (e.g. when developing on a laptop).
try:
    import robot
    onRobot = True
except ImportError:
    print("selflocalize.py: robot module not present - forcing not running on Arlo!")
    onRobot = False
|
||||
|
||||
|
||||
POWER = 70  # motor power used for all turns and straight drives


TURN_T = 7.9  # ms of turning per 1 degree (used as sleep = angle*TURN_T/1000)
DRIVE_T = 22  # ms of driving per 1 centimeter

RIGHT_WHEEL_OFFSET = 4  # extra right-wheel power added when driving straight

CLOCKWISE_OFFSET = 0.96  # timing correction factor applied to clockwise turns


# Some color constants in BGR format
CRED = (0, 0, 255)
CGREEN = (0, 255, 0)
CBLUE = (255, 0, 0)
CCYAN = (255, 255, 0)
CYELLOW = (0, 255, 255)
CMAGENTA = (255, 0, 255)
CWHITE = (255, 255, 255)
CBLACK = (0, 0, 0)

# Landmarks.
# The robot knows the position of 2 landmarks. Their coordinates are in the unit centimeters [cm].
landmarkIDs = [1, 5]
landmarks = {
    1: (0.0, 0.0),  # Coordinates for landmark 1
    5: (300.0, 0.0)  # Coordinates for landmark 2
}
landmark_colors = [CRED, CGREEN]  # Colors used when drawing the landmarks


# Measurement-model noise scales used in calc_weight:
SIGMA = 3  # distance standard deviation [cm]
SIGMA_THETA = 0.3  # angle standard deviation [radians]
|
||||
|
||||
|
||||
def normal(x, mean, std):
    """Probability density of the normal distribution N(mean, std^2)
    evaluated at x (std is the standard deviation)."""
    z = (x - mean) / std
    return np.exp(-0.5 * z * z) / (np.sqrt(2 * np.pi) * std)
|
||||
|
||||
|
||||
def jet(x):
    """Colour map for drawing particles. This function determines the colour of
    a particle from its weight.

    Maps a scalar x (a weight normalised to [0, 1]) to an (R, G, B) tuple
    with components in [0, 255], following the jet colour map's
    piecewise-linear ramp/plateau/ramp profile per channel."""
    def red_profile(v):
        if 3.0/8.0 <= v < 5.0/8.0:
            return 4.0 * v - 3.0/2.0
        if 5.0/8.0 <= v < 7.0/8.0:
            return 1.0
        if v >= 7.0/8.0:
            return -4.0 * v + 9.0/2.0
        return 0.0

    def green_profile(v):
        if 1.0/8.0 <= v < 3.0/8.0:
            return 4.0 * v - 1.0/2.0
        if 3.0/8.0 <= v < 5.0/8.0:
            return 1.0
        if 5.0/8.0 <= v < 7.0/8.0:
            return -4.0 * v + 7.0/2.0
        return 0.0

    def blue_profile(v):
        if v < 1.0/8.0:
            return 4.0 * v + 1.0/2.0
        if 1.0/8.0 <= v < 3.0/8.0:
            return 1.0
        if 3.0/8.0 <= v < 5.0/8.0:
            return -4.0 * v + 5.0/2.0
        return 0.0

    return (255.0 * red_profile(x), 255.0 * green_profile(x), 255.0 * blue_profile(x))
|
||||
|
||||
def draw_world(est_pose, particles, world):
    """Visualization.
    This functions draws robots position in the world coordinate system.

    est_pose  - estimated robot pose (Particle-like, accessed via getters)
    particles - particle list; each is coloured by its normalised weight
    world     - BGR image (numpy array) that is cleared and drawn into
    """

    # Fix the origin of the coordinate system
    offsetX = 100
    offsetY = 250

    # Constant needed for transforming from world coordinates to screen coordinates (flip the y-axis)
    ymax = world.shape[0]

    world[:] = CWHITE  # Clear background to white

    # Find largest weight (used to normalise the particle colours)
    max_weight = 0
    for particle in particles:
        max_weight = max(max_weight, particle.getWeight())

    # Draw particles: a dot plus a short line indicating the heading
    for particle in particles:
        x = int(particle.getX() + offsetX)
        y = ymax - (int(particle.getY() + offsetY))
        colour = jet(particle.getWeight() / max_weight)
        cv2.circle(world, (x, y), 2, colour, 2)
        b = (int(particle.getX() + 15.0 * np.cos(particle.getTheta())) + offsetX,
             ymax - (int(particle.getY() + 15.0 * np.sin(particle.getTheta())) + offsetY))
        cv2.line(world, (x, y), b, colour, 2)

    # Draw landmarks
    for i in range(len(landmarkIDs)):
        ID = landmarkIDs[i]
        lm = (int(landmarks[ID][0] + offsetX), int(ymax - (landmarks[ID][1] + offsetY)))
        cv2.circle(world, lm, 5, landmark_colors[i], 2)

    # Draw estimated robot pose (magenta dot plus heading line)
    a = (int(est_pose.getX()) + offsetX, ymax - (int(est_pose.getY()) + offsetY))
    b = (int(est_pose.getX() + 15.0 * np.cos(est_pose.getTheta())) + offsetX,
         ymax - (int(est_pose.getY() + 15.0 * np.sin(est_pose.getTheta())) + offsetY))
    cv2.circle(world, a, 5, CMAGENTA, 2)
    cv2.line(world, a, b, CMAGENTA, 2)
|
||||
|
||||
|
||||
|
||||
def initialize_particles(num_particles):
    """Create num_particles particles at uniformly random poses.

    Positions are drawn from x in [-100, 500), y in [-250, 350); headings
    are uniform over [0, 2*pi); every particle starts with equal weight
    1/num_particles."""
    uniform_weight = 1.0 / num_particles
    return [
        particle.Particle(600.0 * np.random.ranf() - 100.0,
                          600.0 * np.random.ranf() - 250.0,
                          np.mod(2.0 * np.pi * np.random.ranf(), 2.0 * np.pi),
                          uniform_weight)
        for _ in range(num_particles)
    ]
|
||||
|
||||
def dist(particle, landmark):
    """Euclidean distance from a particle's position to a landmark (x, y) tuple."""
    dx = landmark[0] - particle.x
    dy = landmark[1] - particle.y
    return np.sqrt(dx**2 + dy**2)
|
||||
|
||||
def calc_angle(particle, landmark, dist):
    """Signed bearing (radians) from the particle's heading to a landmark.

    Positive when the landmark lies counter-clockwise of the heading,
    negative when clockwise. `dist` is the precomputed particle-to-landmark
    distance, used to normalise the direction vector.
    """
    heading = np.array([np.cos(particle.theta), np.sin(particle.theta)]).T
    # Unit vector orthogonal to the heading; its dot product gives the sign.
    heading_normal = np.array([-np.sin(particle.theta), np.cos(particle.theta)]).T
    to_landmark = (np.array([landmark[0] - particle.x, landmark[1] - particle.y]).T) / dist
    magnitude = np.arccos(np.dot(to_landmark, heading))
    return np.sign(np.dot(to_landmark, heading_normal)) * magnitude
|
||||
|
||||
def calc_weight(particle, landmark_values):
    """Set the particle's weight from the observed landmarks.

    landmark_values is a list of (landmarkID, measured_distance,
    measured_angle) tuples. Each observation contributes the product of a
    Gaussian likelihood of the measured distance (std SIGMA) and of the
    measured bearing (std SIGMA_THETA); the particle's weight is the
    product over all observations. With no observations the weight is left
    untouched.
    """
    if not landmark_values:
        return
    weights = []
    for values in landmark_values:
        # Distance likelihood: how well the measured range matches this
        # particle's predicted range to the landmark.
        dist_to_landmark = dist(particle, landmarks[values[0]])
        dist_weight = normal(values[1], dist_to_landmark, SIGMA)

        # Bearing likelihood for the same landmark.
        angle_to_landmark = calc_angle(particle, landmarks[values[0]], dist_to_landmark)
        angle_weight = normal(values[2], angle_to_landmark, SIGMA_THETA)
        weights.append(dist_weight * angle_weight)

    # np.prod replaces np.product, which was deprecated and removed in NumPy 2.0.
    particle.setWeight(np.prod(weights))
|
||||
|
||||
def drive_towards_middle(est_pose, arlo):
    """Turn the robot towards the point (150, 0) and drive to it.

    Uses the estimated pose to compute the turn angle and drive distance,
    then executes them open-loop via timed differential-drive commands
    (POWER/TURN_T/DRIVE_T and the wheel-offset calibration constants).
    """
    middle = np.array([150, 0])

    position = np.array([est_pose.x, est_pose.y])
    theta = est_pose.theta
    print(np.rad2deg(theta))

    relative_pos = middle - position
    print("relative_pos", relative_pos)
    # arctan2 handles all four quadrants and relative_pos[0] == 0; plain
    # arctan(dy/dx) loses the quadrant and blows up when the target is
    # directly above/below the robot.
    angle = np.rad2deg(np.arctan2(relative_pos[1], relative_pos[0]))
    print(angle)
    turn_angle = np.mod(angle - (np.rad2deg(theta) - 180), 360)
    print(turn_angle)
    # Drive the length of the vector to the target, not the robot's
    # distance from the world origin (the original used `position` here).
    drive_distance = np.sqrt(relative_pos[0]**2 + relative_pos[1]**2)
    if turn_angle < 180:
        # Counter-clockwise turn is shorter.
        arlo.go_diff(POWER, POWER, 0, 1)
        sleep((abs(turn_angle) * TURN_T)/1000)
        arlo.stop()
    else:
        # Clockwise turn is shorter; apply the clockwise calibration factor.
        arlo.go_diff(POWER, POWER, 1, 0)
        sleep((abs(360 - turn_angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
        arlo.stop()

    # Drive straight; RIGHT_WHEEL_OFFSET compensates drift between wheels.
    arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
    sleep((drive_distance * DRIVE_T)/1000)
    arlo.stop()
|
||||
|
||||
# Main program #
# Self-localisation loop: run the particle filter on camera observations of
# ArUco landmarks until the estimated pose is trusted, then drive towards a
# fixed target point. try/finally guarantees window/camera cleanup.
try:
    if showGUI:
        # Open windows
        WIN_RF1 = "Robot view"
        cv2.namedWindow(WIN_RF1)
        cv2.moveWindow(WIN_RF1, 50, 50)

        WIN_World = "World view"
        cv2.namedWindow(WIN_World)
        cv2.moveWindow(WIN_World, 500, 50)


    # Initialize particles
    num_particles = 10000
    particles = initialize_particles(num_particles)

    est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose

    # Driving parameters (unused in the loop below; kept for compatibility)
    velocity = 0.0 # cm/sec
    angular_velocity = 0.0 # radians/sec

    arlo = robot.Robot()

    # Allocate space for world map
    world = np.zeros((500,500,3), dtype=np.uint8)

    # Draw map
    draw_world(est_pose, particles, world)

    print("Opening and initializing camera")
    if camera.isRunningOnArlo():
        cam = camera.Camera(0, 'arlo', useCaptureThread = True)
    else:
        cam = camera.Camera(0, 'macbookpro', useCaptureThread = True)

    # Filter loop: one iteration per camera frame.
    while True:

        # Fetch next frame
        colour = cam.get_next_frame()
        landmark_values = []

        # Detect objects
        objectIDs, dists, angles = cam.detect_aruco_objects(colour)
        if not isinstance(objectIDs, type(None)):

            # List detected objects; keep only detections of known landmarks.
            for i in range(len(objectIDs)):
                print("Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
                if objectIDs[i] in landmarkIDs:
                    landmark_values.append((objectIDs[i], dists[i], angles[i]))

            # Compute particle weights
            for p in particles:
                calc_weight(p, landmark_values)

            # Resampling
            particles = particle.resample(particles)

            # Draw detected objects
            cam.draw_aruco_objects(colour)

        else:
            # No observation - reset weights to uniform distribution
            for p in particles:
                p.setWeight(1.0/num_particles)

        # Diffuse the particles so the filter keeps exploring.
        particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
        est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose
        # Score the pose estimate itself against the current observations.
        calc_weight(est_pose, landmark_values)
        est_weight = est_pose.weight * 10**5
        print("est_weight:", est_weight)
        # Convergence test: 10**5 scaling and the >1 threshold look hand-tuned
        # — NOTE(review): confirm against the SIGMA/SIGMA_THETA calibration.
        if est_weight > 1:
            draw_world(est_pose, particles, world)
            cv2.imwrite("test.png", world)
            break
        # else:
        #     arlo.go_diff(POWER, POWER, 0, 1)
        #     sleep((15 * TURN_T)/1000)
        #     arlo.stop()
        #     for p in particles:
        #         p.setTheta(p.theta + np.deg2rad(15))

        if showGUI:
            # Draw map
            draw_world(est_pose, particles, world)

            # Show frame
            cv2.imshow(WIN_RF1, colour)

            # Show world
            cv2.imshow(WIN_World, world)

    # NOTE(review): indentation reconstructed from a mangled extraction —
    # these two statements are placed after the loop (run once, on
    # convergence); verify against the original file.
    print(est_pose.x, est_pose.y, est_pose.theta)
    drive_towards_middle(est_pose, arlo)


finally:
    # Make sure to clean up even if an exception occurred

    # Close all windows
    cv2.destroyAllWindows()

    # Clean-up capture thread
    # NOTE(review): if an exception is raised before the camera is
    # constructed, `cam` is undefined here and this raises NameError —
    # consider guarding with `if 'cam' in dir(): ...` or initialising
    # cam = None up front.
    cam.terminateCaptureThread()
|
||||
|
Reference in New Issue
Block a user