293 lines
8.9 KiB
Python
293 lines
8.9 KiB
Python
import cv2
|
|
import particle as particle
|
|
import camera as camera
|
|
import numpy as np
|
|
from time import sleep
|
|
from timeit import default_timer as timer
|
|
import sys
|
|
|
|
|
|
# Flags
|
|
showGUI = False # Whether or not to open GUI windows
|
|
onRobot = True # Whether or not we are running on the Arlo robot
|
|
|
|
|
|
def isRunningOnArlo():
|
|
"""Return True if we are running on Arlo, otherwise False.
|
|
You can use this flag to switch the code from running on you laptop to Arlo - you need to do the programming here!
|
|
"""
|
|
return onRobot
|
|
|
|
|
|
if isRunningOnArlo():
|
|
sys.path.append("..")
|
|
|
|
|
|
try:
|
|
import robot
|
|
onRobot = True
|
|
except ImportError:
|
|
print("selflocalize.py: robot module not present - forcing not running on Arlo!")
|
|
onRobot = False
|
|
|
|
|
|
POWER = 70
|
|
|
|
TURN_T = 7.9 # 1 degree
|
|
DRIVE_T = 22 # 1 centimeter
|
|
|
|
RIGHT_WHEEL_OFFSET = 4
|
|
|
|
CLOCKWISE_OFFSET = 0.96
|
|
|
|
|
|
# Some color constants in BGR format
|
|
CRED = (0, 0, 255)
|
|
CGREEN = (0, 255, 0)
|
|
CBLUE = (255, 0, 0)
|
|
CCYAN = (255, 255, 0)
|
|
CYELLOW = (0, 255, 255)
|
|
CMAGENTA = (255, 0, 255)
|
|
CWHITE = (255, 255, 255)
|
|
CBLACK = (0, 0, 0)
|
|
|
|
# Landmarks.
|
|
# The robot knows the position of 2 landmarks. Their coordinates are in the unit centimeters [cm].
|
|
landmarkIDs = [1, 5]
|
|
landmarks = {
|
|
1: (0.0, 0.0), # Coordinates for landmark 1
|
|
5: (300.0, 0.0) # Coordinates for landmark 2
|
|
}
|
|
landmark_colors = [CRED, CGREEN] # Colors used when drawing the landmarks
|
|
|
|
|
|
SIGMA = 3
|
|
SIGMA_THETA = 0.3
|
|
|
|
|
|
def normal(x, mean, std):
|
|
return (
|
|
(np.exp(-(((x - mean)**2)/(2 * std**2))))/(np.sqrt(2 * np.pi) * std)
|
|
)
|
|
|
|
|
|
def jet(x):
|
|
"""Colour map for drawing particles. This function determines the colour of
|
|
a particle from its weight."""
|
|
r = (x >= 3.0/8.0 and x < 5.0/8.0) * (4.0 * x - 3.0/2.0) + (x >= 5.0/8.0 and x < 7.0/8.0) + (x >= 7.0/8.0) * (-4.0 * x + 9.0/2.0)
|
|
g = (x >= 1.0/8.0 and x < 3.0/8.0) * (4.0 * x - 1.0/2.0) + (x >= 3.0/8.0 and x < 5.0/8.0) + (x >= 5.0/8.0 and x < 7.0/8.0) * (-4.0 * x + 7.0/2.0)
|
|
b = (x < 1.0/8.0) * (4.0 * x + 1.0/2.0) + (x >= 1.0/8.0 and x < 3.0/8.0) + (x >= 3.0/8.0 and x < 5.0/8.0) * (-4.0 * x + 5.0/2.0)
|
|
|
|
return (255.0*r, 255.0*g, 255.0*b)
|
|
|
|
def draw_world(est_pose, particles, world):
|
|
"""Visualization.
|
|
This functions draws robots position in the world coordinate system."""
|
|
|
|
# Fix the origin of the coordinate system
|
|
offsetX = 100
|
|
offsetY = 250
|
|
|
|
# Constant needed for transforming from world coordinates to screen coordinates (flip the y-axis)
|
|
ymax = world.shape[0]
|
|
|
|
world[:] = CWHITE # Clear background to white
|
|
|
|
# Find largest weight
|
|
max_weight = 0
|
|
for particle in particles:
|
|
max_weight = max(max_weight, particle.getWeight())
|
|
|
|
# Draw particles
|
|
for particle in particles:
|
|
x = int(particle.getX() + offsetX)
|
|
y = ymax - (int(particle.getY() + offsetY))
|
|
colour = jet(particle.getWeight() / max_weight)
|
|
cv2.circle(world, (x,y), 2, colour, 2)
|
|
b = (int(particle.getX() + 15.0*np.cos(particle.getTheta()))+offsetX,
|
|
ymax - (int(particle.getY() + 15.0*np.sin(particle.getTheta()))+offsetY))
|
|
cv2.line(world, (x,y), b, colour, 2)
|
|
|
|
# Draw landmarks
|
|
for i in range(len(landmarkIDs)):
|
|
ID = landmarkIDs[i]
|
|
lm = (int(landmarks[ID][0] + offsetX), int(ymax - (landmarks[ID][1] + offsetY)))
|
|
cv2.circle(world, lm, 5, landmark_colors[i], 2)
|
|
|
|
# Draw estimated robot pose
|
|
a = (int(est_pose.getX())+offsetX, ymax-(int(est_pose.getY())+offsetY))
|
|
b = (int(est_pose.getX() + 15.0*np.cos(est_pose.getTheta()))+offsetX,
|
|
ymax-(int(est_pose.getY() + 15.0*np.sin(est_pose.getTheta()))+offsetY))
|
|
cv2.circle(world, a, 5, CMAGENTA, 2)
|
|
cv2.line(world, a, b, CMAGENTA, 2)
|
|
|
|
|
|
|
|
def initialize_particles(num_particles):
|
|
particles = []
|
|
for i in range(num_particles):
|
|
# Random starting points.
|
|
p = particle.Particle(600.0*np.random.ranf() - 100.0, 600.0*np.random.ranf() - 250.0, np.mod(2.0*np.pi*np.random.ranf(), 2.0*np.pi), 1.0/num_particles)
|
|
particles.append(p)
|
|
|
|
return particles
|
|
|
|
def dist(particle, landmark):
|
|
return np.sqrt((landmark[0]-particle.x)**2+(landmark[1]-particle.y)**2)
|
|
|
|
def calc_angle(particle, landmark, dist):
|
|
e_theta = np.array([np.cos(particle.theta), np.sin(particle.theta)]).T
|
|
e_landmark = (np.array([landmark[0]-particle.x, landmark[1]-particle.y]).T)/dist
|
|
e_hat_theta = np.array([-np.sin(particle.theta), np.cos(particle.theta)]).T
|
|
return np.sign(np.dot(e_landmark, e_hat_theta)) * np.arccos(np.dot(e_landmark, e_theta))
|
|
|
|
def calc_weight(particle, landmark_values):
|
|
if landmark_values == []:
|
|
return
|
|
weights = []
|
|
for values in landmark_values:
|
|
dist_to_landmark = dist(particle, landmarks[values[0]])
|
|
dist_weight = normal(values[1], dist_to_landmark, SIGMA)
|
|
|
|
angle_to_landmark = calc_angle(particle, landmarks[values[0]], dist_to_landmark)
|
|
angle_weight = normal(values[2], angle_to_landmark, SIGMA_THETA)
|
|
weights.append(dist_weight * angle_weight)
|
|
|
|
particle.setWeight(np.product(weights))
|
|
|
|
def drive_towards_middle(est_pose, arlo):
|
|
middle = np.array([150, 0])
|
|
|
|
position = np.array([est_pose.x, est_pose.y])
|
|
theta = est_pose.theta
|
|
print(np.rad2deg(theta))
|
|
|
|
relative_pos = middle - position
|
|
print("relative_pos", relative_pos)
|
|
angle = np.rad2deg(np.arctan(relative_pos[1]/relative_pos[0]))
|
|
print(angle)
|
|
turn_angle = np.mod(angle - (np.rad2deg(theta) - 180), 360)
|
|
print(turn_angle)
|
|
drive_distance = np.sqrt(position[0]**2 + position[1]**2)
|
|
if turn_angle < 180:
|
|
arlo.go_diff(POWER, POWER, 0, 1)
|
|
sleep((abs(turn_angle) * TURN_T)/1000)
|
|
arlo.stop()
|
|
else:
|
|
arlo.go_diff(POWER, POWER, 1, 0)
|
|
sleep((abs(360 - turn_angle) * TURN_T * CLOCKWISE_OFFSET)/1000)
|
|
arlo.stop()
|
|
|
|
arlo.go_diff(POWER, POWER + RIGHT_WHEEL_OFFSET, 1, 1)
|
|
sleep((drive_distance * DRIVE_T)/1000)
|
|
arlo.stop()
|
|
|
|
# Main program #
|
|
try:
|
|
if showGUI:
|
|
# Open windows
|
|
WIN_RF1 = "Robot view"
|
|
cv2.namedWindow(WIN_RF1)
|
|
cv2.moveWindow(WIN_RF1, 50, 50)
|
|
|
|
WIN_World = "World view"
|
|
cv2.namedWindow(WIN_World)
|
|
cv2.moveWindow(WIN_World, 500, 50)
|
|
|
|
|
|
# Initialize particles
|
|
num_particles = 10000
|
|
particles = initialize_particles(num_particles)
|
|
|
|
est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose
|
|
|
|
# Driving parameters
|
|
velocity = 0.0 # cm/sec
|
|
angular_velocity = 0.0 # radians/sec
|
|
|
|
arlo = robot.Robot()
|
|
|
|
# Allocate space for world map
|
|
world = np.zeros((500,500,3), dtype=np.uint8)
|
|
|
|
# Draw map
|
|
draw_world(est_pose, particles, world)
|
|
|
|
print("Opening and initializing camera")
|
|
if camera.isRunningOnArlo():
|
|
cam = camera.Camera(0, 'arlo', useCaptureThread = True)
|
|
else:
|
|
cam = camera.Camera(0, 'macbookpro', useCaptureThread = True)
|
|
|
|
|
|
while True:
|
|
|
|
# Fetch next frame
|
|
colour = cam.get_next_frame()
|
|
landmark_values = []
|
|
|
|
# Detect objects
|
|
objectIDs, dists, angles = cam.detect_aruco_objects(colour)
|
|
if not isinstance(objectIDs, type(None)):
|
|
|
|
# List detected objects
|
|
for i in range(len(objectIDs)):
|
|
print("Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
|
|
if objectIDs[i] in landmarkIDs:
|
|
landmark_values.append((objectIDs[i], dists[i], angles[i]))
|
|
|
|
# Compute particle weights
|
|
for p in particles:
|
|
calc_weight(p, landmark_values)
|
|
|
|
# Resampling
|
|
particles = particle.resample(particles)
|
|
|
|
# Draw detected objects
|
|
cam.draw_aruco_objects(colour)
|
|
|
|
else:
|
|
# No observation - reset weights to uniform distribution
|
|
for p in particles:
|
|
p.setWeight(1.0/num_particles)
|
|
|
|
particle.add_uncertainty(particles, SIGMA, SIGMA_THETA)
|
|
est_pose = particle.estimate_pose(particles) # The estimate of the robots current pose
|
|
calc_weight(est_pose, landmark_values)
|
|
est_weight = est_pose.weight * 10**5
|
|
print("est_weight:", est_weight)
|
|
if est_weight > 1:
|
|
draw_world(est_pose, particles, world)
|
|
cv2.imwrite("test.png", world)
|
|
break
|
|
# else:
|
|
# arlo.go_diff(POWER, POWER, 0, 1)
|
|
# sleep((15 * TURN_T)/1000)
|
|
# arlo.stop()
|
|
# for p in particles:
|
|
# p.setTheta(p.theta + np.deg2rad(15))
|
|
|
|
if showGUI:
|
|
# Draw map
|
|
draw_world(est_pose, particles, world)
|
|
|
|
# Show frame
|
|
cv2.imshow(WIN_RF1, colour)
|
|
|
|
# Show world
|
|
cv2.imshow(WIN_World, world)
|
|
|
|
print(est_pose.x, est_pose.y, est_pose.theta)
|
|
drive_towards_middle(est_pose, arlo)
|
|
|
|
|
|
finally:
|
|
# Make sure to clean up even if an exception occurred
|
|
|
|
# Close all windows
|
|
cv2.destroyAllWindows()
|
|
|
|
# Clean-up capture thread
|
|
cam.terminateCaptureThread()
|
|
|