NikolajDanger
2022-09-28 10:55:21 +02:00
parent 01ab491b61
commit f6ce6456b8
6 changed files with 968 additions and 0 deletions

selflocalization/camera.py Normal file

@@ -0,0 +1,518 @@
import cv2 # Import the OpenCV library
import numpy as np
import time
import sys
import threading
import framebuffer
from pkg_resources import parse_version
gstreamerCameraFound = False
piCameraFound = False
try:
import picamera
from picamera.array import PiRGBArray
piCameraFound = True
except ImportError:
print("Camera.py: picamera module not available - using OpenCV interface instead")
def isRunningOnArlo():
"""Return True if we are running on Arlo, otherwise False."""
return piCameraFound or gstreamerCameraFound
# Black magic in order to handle differences in namespace names in OpenCV 2.4 and 3.0
OPCV3 = parse_version(cv2.__version__) >= parse_version('3')
def capPropId(prop):
"""returns OpenCV VideoCapture property id given, e.g., "FPS
This is needed because of differences in the Python interface in OpenCV 2.4 and 3.0
"""
return getattr(cv2 if OPCV3 else cv2.cv, ("" if OPCV3 else "CV_") + "CAP_PROP_" + prop)
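# For example, capPropId("FPS") resolves to cv2.CAP_PROP_FPS on OpenCV >= 3 and to
# cv2.cv.CV_CAP_PROP_FPS on OpenCV 2.4, so callers can write cam.set(capPropId("FPS"), 30)
# regardless of the installed version.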
def gstreamer_pipeline(capture_width=1280, capture_height=720, framerate=30):
"""Utility function for setting parameters for the gstreamer camera pipeline"""
return (
"libcamerasrc !"
"videobox autocrop=true !"
"video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
"videoconvert ! "
"appsink"
% (
capture_width,
capture_height,
framerate,
)
)
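# A minimal usage sketch (assumes a GStreamer-enabled OpenCV build and a libcamera device):
#   cap = cv2.VideoCapture(gstreamer_pipeline(capture_width=640, capture_height=480, framerate=15),
#                          apiPreference=cv2.CAP_GSTREAMER)
#   retval, frame = cap.read()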
class CaptureThread(threading.Thread):
"""Internal worker thread that captures frames from the camera"""
def __init__(self, cam, framebuffer):
threading.Thread.__init__(self)
self.cam = cam
self.framebuffer = framebuffer
self.terminateThreadEvent = threading.Event()
def run(self):
while not self.terminateThreadEvent.is_set():
if piCameraFound:
# Use piCamera
#self.cam.capture(self.rawCapture, format="bgr", use_video_port=True)
#image = self.rawCapture.array
# clear the stream in preparation for the next frame
#self.rawCapture.truncate(0)
if sys.version_info[0] > 2:
# Python 3.x
image = np.empty((self.cam.resolution[1], self.cam.resolution[0], 3), dtype=np.uint8)
else:
# Python 2.x
image = np.empty((self.cam.resolution[1] * self.cam.resolution[0] * 3,), dtype=np.uint8)
self.cam.capture(image, format="bgr", use_video_port=True)
if sys.version_info[0] < 3:
# Python 2.x
image = image.reshape((self.cam.resolution[1], self.cam.resolution[0], 3))
else: # Use OpenCV
retval, image = self.cam.read() # Read frame
if not retval: # Error
print("CaptureThread: Could not read next frame")
exit(-1)
# Update framebuffer
self.framebuffer.new_frame(image)
def stop(self):
"""Terminate the worker thread"""
self.terminateThreadEvent.set()
class Camera(object):
"""This class is responsible for doing the image processing. It detects known landmarks and
measures distances and orientations to these."""
def __init__(self, camidx, robottype='arlo', useCaptureThread = False):
"""Constructor:
camidx - index of camera
robottype - specify which robot you are using in order to use the correct camera calibration.
Supported types: arlo, frindo, scribbler, macbookpro"""
self.useCaptureThread = useCaptureThread
# Set camera calibration info
if robottype == 'arlo':
self.imageSize = (1280, 720)
#self.intrinsic_matrix = np.asarray([ 7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
# 7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1. ], dtype = np.float64)
#self.intrinsic_matrix = np.asarray([ 6.0727040957659040e+02, 0., 3.0757300398967601e+02, 0.,
# 6.0768864690145904e+02, 2.8935674612358201e+02, 0., 0., 1. ], dtype = np.float64)
self.intrinsic_matrix = np.asarray([500, 0., self.imageSize[0] / 2.0, 0.,
500, self.imageSize[1] / 2.0, 0., 0., 1.], dtype = np.float64)
self.intrinsic_matrix.shape = (3, 3)
#self.distortion_coeffs = np.asarray([ 1.1911006165076067e-01, -1.0003366233413549e+00,
# 1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01 ], dtype = np.float64)
self.distortion_coeffs = np.asarray([0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.], dtype = np.float64)
        elif robottype == 'frindo':  # elif - otherwise 'arlo' would fall through to the final else and exit
self.imageSize = (640, 480)
#self.intrinsic_matrix = np.asarray([ 7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
# 7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1. ], dtype = np.float64)
#self.intrinsic_matrix = np.asarray([ 6.0727040957659040e+02, 0., 3.0757300398967601e+02, 0.,
# 6.0768864690145904e+02, 2.8935674612358201e+02, 0., 0., 1. ], dtype = np.float64)
self.intrinsic_matrix = np.asarray([500, 0., 3.0757300398967601e+02, 0.,
500, 2.8935674612358201e+02, 0., 0., 1.], dtype = np.float64)
self.intrinsic_matrix.shape = (3, 3)
#self.distortion_coeffs = np.asarray([ 1.1911006165076067e-01, -1.0003366233413549e+00,
# 1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01 ], dtype = np.float64)
self.distortion_coeffs = np.asarray([0., 0., 2.0546093607192093e-02, -3.5538453075048249e-03, 0.], dtype = np.float64)
elif robottype == 'scribbler':
# Unknown calibration - just using the Frindo calibration
self.imageSize = (640, 480)
self.intrinsic_matrix = np.asarray([7.1305391967046853e+02, 0., 3.1172820723774367e+02, 0.,
7.0564929862291285e+02, 2.5634470978315028e+02, 0., 0., 1.], dtype = np.float64)
self.intrinsic_matrix.shape = (3, 3)
self.distortion_coeffs = np.asarray([1.1911006165076067e-01, -1.0003366233413549e+00,
1.9287903277399834e-02, -2.3728201444308114e-03, -2.8137265581326476e-01], dtype = np.float64)
elif robottype == 'macbookpro':
self.imageSize = (1280, 720)
#self.imageSize = (1080, 720)
#self.intrinsic_matrix = np.asarray([ 8.6955302212233869e+02, 0., 5.2076864848745902e+02, 0.,
# 8.7317664932843684e+02, 4.0331768178896669e+02, 0., 0., 1. ], dtype = np.float64)
self.intrinsic_matrix = np.asarray([9.4328095162920715e+02, 0., self.imageSize[0] / 2.0, 0.,
9.4946668595979077e+02, self.imageSize[1] / 2.0, 0., 0., 1.], dtype = np.float64)
self.intrinsic_matrix.shape = (3, 3)
#self.distortion_coeffs = np.asarray([ -1.2844325433988565e-01, -1.3646926538980573e+00,
# -5.7263071423202944e-03, 5.7422957803983802e-03, 5.9722099836744738e+00 ], dtype = np.float64)
self.distortion_coeffs = np.asarray([0., 0., -1.6169374082976234e-02, 8.7657653170062459e-03, 0.], dtype = np.float64)
else:
print("Camera.__init__: Unknown robot type")
exit(-1)
# Open a camera device for capturing
if piCameraFound:
# piCamera is available so we use this
#self.cam = picamera.PiCamera(camidx)
self.cam = picamera.PiCamera(camera_num=camidx, resolution=self.imageSize, framerate=30)
if not self.useCaptureThread:
self.rawCapture = PiRGBArray(self.cam, size=self.cam.resolution)
time.sleep(2) # wait for the camera
# Too slow code - instead just use cam.capture
#self.capture_generator = self.cam.capture_continuous(self.rawCapture, format="bgr", use_video_port=True)
gain = self.cam.awb_gains
self.cam.awb_mode='off'
self.cam.awb_gains = gain
self.cam.shutter_speed = self.cam.exposure_speed
self.cam.exposure_mode = 'off'
print("shutter_speed = ", self.cam.shutter_speed)
print("awb_gains = ", self.cam.awb_gains)
print("Camera width = ", self.cam.resolution[0])
print("Camera height = ", self.cam.resolution[1])
print("Camera FPS = ", self.cam.framerate)
else: # Use OpenCV interface
            # Try the gstreamer interface first
self.cam = cv2.VideoCapture(gstreamer_pipeline(
capture_width=self.imageSize[0], capture_height=self.imageSize[1]),
apiPreference=cv2.CAP_GSTREAMER)
if not self.cam.isOpened(): # Did not work
                # Fall back to the generic auto-detect interface
self.cam = cv2.VideoCapture(camidx)
if not self.cam.isOpened(): # Error
print("Camera.__init__: Could not open camera")
exit(-1)
else:
print("Camera.__init__: Using OpenCV with auto-detect interface")
            else:
                # 'global' is required here - otherwise this assignment would create a local
                # variable and isRunningOnArlo() would never see that gstreamer is in use.
                global gstreamerCameraFound
                gstreamerCameraFound = True
                print("Camera.__init__: Using OpenCV with gstreamer")
time.sleep(1) # wait for camera
# Set camera properties
self.cam.set(capPropId("FRAME_WIDTH"), self.imageSize[0])
self.cam.set(capPropId("FRAME_HEIGHT"), self.imageSize[1])
#self.cam.set(capPropId("BUFFERSIZE"), 1) # Does not work
#self.cam.set(capPropId("FPS"), 15)
self.cam.set(capPropId("FPS"), 30)
time.sleep(1)
# Get camera properties
print("Camera width = ", int(self.cam.get(capPropId("FRAME_WIDTH"))))
print("Camera height = ", int(self.cam.get(capPropId("FRAME_HEIGHT"))))
print("Camera FPS = ", int(self.cam.get(capPropId("FPS"))))
# Initializing the camera distortion maps
#self.mapx, self.mapy = cv2.initUndistortRectifyMap(self.intrinsic_matrix, self.distortion_coeffs, np.eye(3,3), np.eye(3,3), self.imageSize, cv2.CV_32FC1)
        # Not needed - we use the cv2.undistort function instead
# Initialize chessboard pattern parameters
self.patternFound = False
self.patternSize = (3,4)
self.patternUnit = 50.0 # mm (size of one checker square)
self.corners = []
# Initialize aruco detector
self.arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
# Set the correct physical marker size here
self.arucoMarkerLength = 0.15 # [m] actual size of aruco markers (in object coordinate system)
# Initialize worker thread and framebuffer
if self.useCaptureThread:
print("Using capture thread")
self.framebuffer = framebuffer.FrameBuffer()
self.capturethread = CaptureThread(self.cam, self.framebuffer)
self.capturethread.start()
time.sleep(0.75)
def __del__(self):
if piCameraFound:
self.cam.close()
def terminateCaptureThread(self):
if self.useCaptureThread:
self.capturethread.stop()
self.capturethread.join()
def get_capture(self):
"""Access to the internal camera object for advanced control of the camera."""
return self.cam
def get_colour(self):
"""OBSOLETE - use instead get_next_frame"""
print("OBSOLETE get_colour - use instead get_next_frame")
return self.get_next_frame()
def get_next_frame(self):
"""Gets the next available image frame from the camera."""
if self.useCaptureThread:
img = self.framebuffer.get_frame()
            if img is None:
                # No frame available yet - return a black image with the correct
                # (height, width, channels) shape, not np.array of the size tuple.
                img = np.zeros((self.imageSize[1], self.imageSize[0], 3), dtype=np.uint8)
else:
if piCameraFound:
# Use piCamera
self.cam.capture(self.rawCapture, format="bgr", use_video_port=True)
img = self.rawCapture.array
# clear the stream in preparation for the next frame
self.rawCapture.truncate(0)
else: # Use OpenCV
retval, img = self.cam.read() # Read frame
if not retval: # Error
print("Camera.get_colour: Could not read next frame")
exit(-1)
return img
# ArUco object detector
def detect_aruco_objects(self, img):
"""Detect objects in the form of a binary ArUco code and return object IDs, distances (in cm) and
angles (in radians) to detected ArUco codes. The angle is computed as the signed angle between
translation vector to detected object projected onto the x-z plabe and the z-axis (pointing out
of the camera). This corresponds to that the angle is measuring location along the horizontal x-axis.
If no object is detected, the returned variables are set to None."""
self.aruco_corners, self.ids, rejectedImgPoints = cv2.aruco.detectMarkers(img, self.arucoDict)
self.rvecs, self.tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(self.aruco_corners, self.arucoMarkerLength, self.intrinsic_matrix, self.distortion_coeffs)
if not isinstance(self.ids, type(None)):
dists = np.linalg.norm(self.tvecs, axis=len(self.tvecs.shape) - 1) * 100
# Make sure we always return properly shaped arrays
dists = dists.reshape((dists.shape[0],))
ids = self.ids.reshape((self.ids.shape[0],))
# Compute angles
angles = np.zeros(dists.shape, dtype=dists.dtype)
for i in range(dists.shape[0]):
tobj = self.tvecs[i] * 100 / dists[i]
zaxis = np.zeros(tobj.shape, dtype=tobj.dtype)
zaxis[0,-1] = 1
xaxis = np.zeros(tobj.shape, dtype=tobj.dtype)
xaxis[0,0] = 1
                # We want the horizontal angle, so project tobj onto the x-z plane.
                # Use a copy so the projection does not silently modify tobj through aliasing.
                tobj_xz = tobj.copy()
                tobj_xz[0,1] = 0
# Should the sign be clockwise or counter-clockwise (left or right)?
# In this version it is positive to the left as seen from the camera.
direction = -1*np.sign(tobj_xz[0,0]) # The same as np.sign(np.dot(tobj, xaxis.T))
angles[i] = direction * np.arccos(np.dot(tobj_xz, zaxis.T))
else:
dists = None
ids = None
angles = None
return ids, dists, angles
def draw_aruco_objects(self, img):
"""Draws detected objects and their orientations on the image given in img."""
if not isinstance(self.ids, type(None)):
outimg = cv2.aruco.drawDetectedMarkers(img, self.aruco_corners, self.ids)
for i in range(self.ids.shape[0]):
outimg = cv2.drawFrameAxes(outimg, self.intrinsic_matrix, self.distortion_coeffs,
self.rvecs[i], self.tvecs[i], self.arucoMarkerLength)
else:
outimg = img
return outimg
# Chessboard object detector
def get_object(self, img):
"""Detect object and return object type, distance (in cm), angle (in radians) and
colour probability table in the order (R,G,B)"""
objectType = 'none'
colourProb = np.ones((3,)) / 3.0
distance = 0.0
angle = 0.0
self.patternFound = False
#patternFound, corners = self.get_corners(img)
self.get_corners(img)
if (self.patternFound):
# Determine if the object is horizontal or vertical
delta_x = abs(self.corners[0, 0, 0] - self.corners[2, 0, 0])
delta_y = abs(self.corners[0, 0, 1] - self.corners[2, 0, 1])
horizontal = (delta_y > delta_x)
if (horizontal):
objectType = 'horizontal'
else:
objectType = 'vertical'
# Compute distances and angles
if (horizontal):
height = ((abs (self.corners[0, 0, 1] - self.corners[2, 0, 1]) +
abs (self.corners[9, 0, 1] - self.corners[11, 0, 1])) / 2.0)
patternHeight = (self.patternSize[0]-1.0) * self.patternUnit
else:
height = (abs (self.corners[0, 0, 1] - self.corners[9, 0, 1]) +
abs (self.corners[2, 0, 1] - self.corners[11, 0, 1])) / 2.0
patternHeight = (self.patternSize[1]-1.0) * self.patternUnit
#distance = 1.0 / (0.0001 * height);
distance = self.intrinsic_matrix[1, 1] * patternHeight / (height * 10.0)
center = (self.corners[0, 0, 0] + self.corners[2, 0, 0] +
self.corners[9, 0, 0] + self.corners[11, 0, 0]) / 4.0
#angle = 0.0018 * center - 0.6425;
#angle *= -1.0;
angle = -np.arctan2(center - self.intrinsic_matrix[0, 2], self.intrinsic_matrix[0, 0])
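            # The formula above uses the pinhole model: a point at horizontal pixel position
            # 'center' is seen at angle arctan((center - c_x) / f_x) from the optical axis;
            # the minus sign makes the angle positive to the left as seen from the camera,
            # matching the sign convention in detect_aruco_objects.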
#### Classify object by colour
# Extract rectangle corners
points = np.array(
[
self.corners[0],
self.corners[2],
self.corners[9],
self.corners[11]
]
)
points.shape = (4, 2)
points = np.int32(points)
# Compute region of interest
mask = np.zeros((self.imageSize[1], self.imageSize[0]), dtype = np.uint8)
#cv2.fillConvexPoly (mask, points, (255, 255, 255))
cv2.fillConvexPoly (mask, points, 255)
# Compute mean colour inside region of interest
mean_colour = cv2.mean(img, mask) # There is a bug here in Python 3 - it produces a segfault
red = mean_colour[2]
green = mean_colour[1]
blue = mean_colour[0]
            colour_sum = red + green + blue  # avoid shadowing the built-in sum()
            colourProb[0] = red / colour_sum
            colourProb[1] = green / colour_sum
            colourProb[2] = blue / colour_sum
return objectType, distance, angle, colourProb
def get_corners(self, img):
"""Detect corners - this is an auxillary method and should not be used directly"""
# Convert to gray scale
gray = cv2.cvtColor( img, cv2.COLOR_BGR2GRAY )
loggray = cv2.log(gray + 1.0)
cv2.normalize(loggray,loggray,0,255,cv2.NORM_MINMAX)
gray = cv2.convertScaleAbs(loggray)
#retval, self.corners = cv2.findChessboardCorners(gray, self.patternSize, cv2.CALIB_CB_NORMALIZE_IMAGE | cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_FAST_CHECK)
retval, self.corners = cv2.findChessboardCorners(gray, self.patternSize, cv2.CALIB_CB_FAST_CHECK)
if (retval > 0):
self.patternFound = True
#cv2.cornerSubPix(gray, self.corners, (5,5), (-1,-1), (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 3, 0.0))
return self.patternFound, self.corners
def draw_object(self, img):
"""Draw the object if found into img"""
cv2.drawChessboardCorners(img, self.patternSize, self.corners, self.patternFound)
if (__name__=='__main__'):
print("Opening and initializing camera")
cam = Camera(0, 'macbookpro', useCaptureThread = True)
#cam = Camera(0, 'macbookpro', useCaptureThread = False)
#cam = Camera(0, 'arlo', useCaptureThread = True)
# Open a window
WIN_RF1 = "Camera view"
cv2.namedWindow(WIN_RF1)
cv2.moveWindow(WIN_RF1, 50, 50)
#WIN_RF3 = "Camera view - gray"
#cv2.namedWindow(WIN_RF3)
#cv2.moveWindow(WIN_RF3, 550, 50)
while True:
action = cv2.waitKey(10)
if action == ord('q'): # Quit
break
# Fetch next frame
#colour = cam.get_colour()
colour = cam.get_next_frame()
# Convert to gray scale
#gray = cv2.cvtColor(colour, cv2.COLOR_BGR2GRAY )
#loggray = cv2.log(gray + 1.0)
#cv2.normalize(loggray, loggray, 0, 255, cv2.NORM_MINMAX)
#gray = cv2.convertScaleAbs(loggray)
# Detect objects
objectType, distance, angle, colourProb = cam.get_object(colour)
if objectType != 'none':
print("Object type = ", objectType, ", distance = ", distance, ", angle = ", angle, ", colourProb = ", colourProb)
# Draw detected pattern
cam.draw_object(colour)
IDs, dists, angles = cam.detect_aruco_objects(colour)
if not isinstance(IDs, type(None)):
for i in range(len(IDs)):
print("Object ID = ", IDs[i], ", Distance = ", dists[i], ", angles = ", angles[i])
# Draw detected objects
cam.draw_aruco_objects(colour)
# Show frames
cv2.imshow(WIN_RF1, colour)
# Show frames
#cv2.imshow(WIN_RF3, gray)
# Close all windows
cv2.destroyAllWindows()
# Clean-up capture thread
cam.terminateCaptureThread()

selflocalization/framebuffer.py Normal file

@@ -0,0 +1,33 @@
# import copy
import threading
class FrameBuffer(object):
"""This class represents a framebuffer with a front and back buffer storing frames.
Access to the class is thread safe (controlled via an internal lock)."""
def __init__(self):
self.frameBuffer = [None, None] # Initialize the framebuffer with None references
self.currentBufferIndex = 0
self.lock = threading.Lock()
def get_frame(self):
"""Return latest frame from the framebuffer"""
# Obtain lock and release it when done
with self.lock:
if self.frameBuffer[self.currentBufferIndex] is not None:
# return copy.deepcopy(self.frameBuffer[self.currentBufferIndex])
return self.frameBuffer[self.currentBufferIndex]
else:
return None
def new_frame(self, frame):
"""Add a new frame to the frame buffer"""
# self.frameBuffer[int(not self.currentBufferIndex)] = copy.deepcopy(frame)
self.frameBuffer[int(not self.currentBufferIndex)] = frame
# Obtain lock and release it when done
with self.lock:
# Switch buffer index
self.currentBufferIndex = int(not self.currentBufferIndex)
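
Because new_frame writes into the back buffer and only the index swap takes the lock, a producer thread can deposit frames while a consumer polls get_frame. A minimal sketch of this pattern (the producer below is illustrative, not part of the module):

import threading
import time

import numpy as np

import framebuffer

fb = framebuffer.FrameBuffer()

def producer():
    # Deposit ten dummy frames; each new_frame call flips the front/back buffer index.
    for i in range(10):
        fb.new_frame(np.full((2, 2), i, dtype=np.uint8))
        time.sleep(0.01)

t = threading.Thread(target=producer)
t.start()
time.sleep(0.05)
print(fb.get_frame())  # latest completed frame, or None if nothing has arrived yet
t.join()

Note that get_frame returns a reference rather than a copy (the copy.deepcopy calls are commented out), so a consumer should be done with a frame before the producer has written two more.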


@@ -0,0 +1,45 @@
import cv2 # Import the OpenCV library
import numpy as np
# TODO: make this configurable from the command line using an argument parser
markerID = 10 # ID of the ArUco marker to generate (e.g., try 1 - 4)
# Define some relevant constants
dpi = 72 # dots (pixels) per inch [inch^(-1)]
inch2mm = 25.4 # [mm / inch]
dpmm = dpi / inch2mm # Dots per mm [mm^(-1)]
# Define size of output image to fit A4 paper
a4width = 210.0 # [mm]
a4height = 297.0 # [mm]
# Size of output image
width = np.uint(np.round(a4width * dpmm)) # [pixels]
height = np.uint(np.round(a4height * dpmm)) # [pixels]
# Size of ArUco marker
markerPhysicalSize = 150 # [mm]
markerSize = np.uint(np.round(markerPhysicalSize * dpmm)) # [pixels]
# Create landmark image (all white gray scale image)
#landmarkImage = np.ones((width, height), dtype=np.uint8) * np.uint8(255)
landmarkImage = np.ones((height, width), dtype=np.uint8) * np.uint8(255)
# Initialize the ArUco dictionary
arucoDict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
# Draw marker
startWidth = int(np.round((width-markerSize)/2))
startHeight = int(np.round((height-markerSize)/2))
landmarkImage[startHeight:int(startHeight+markerSize), startWidth:int(startWidth+markerSize)] = cv2.aruco.drawMarker(arucoDict, markerID, markerSize, 1)
cv2.putText(landmarkImage, str(markerID), (startWidth, startHeight - 60), cv2.FONT_HERSHEY_SIMPLEX, 2.0, (0,0,0), 2)
# Save image
cv2.imwrite("../../../data/landmark" + str(markerID) + ".png", landmarkImage)
# Show image
cv2.namedWindow("Landmark")
cv2.imshow("Landmark", landmarkImage)
cv2.waitKey()
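
As a sanity check on the unit conversions: at 72 dpi there are 72 / 25.4 ≈ 2.835 dots per mm, so the A4 canvas comes out at roughly 595 x 842 pixels and the 150 mm marker at roughly 425 x 425 pixels. The marker only has its true physical size if the image is printed at exactly 72 dpi, i.e., without "fit to page" scaling:

# Sanity check of the dpi arithmetic used above (rounded as in the script).
dpi = 72
dpmm = dpi / 25.4                                # ~2.835 dots per mm
print(round(210.0 * dpmm), round(297.0 * dpmm))  # A4 canvas: 595 842
print(round(150.0 * dpmm))                       # marker side: 425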

selflocalization/particle.py Normal file

@@ -0,0 +1,85 @@
import numpy as np
import random_numbers as rn
class Particle(object):
"""Data structure for storing particle information (state and weight)"""
def __init__(self, x=0.0, y=0.0, theta=0.0, weight=0.0):
self.x = x
self.y = y
self.theta = np.mod(theta, 2.0*np.pi)
self.weight = weight
def getX(self):
return self.x
def getY(self):
return self.y
def getTheta(self):
return self.theta
def getWeight(self):
return self.weight
def setX(self, val):
self.x = val
def setY(self, val):
self.y = val
def setTheta(self, val):
self.theta = np.mod(val, 2.0*np.pi)
def setWeight(self, val):
self.weight = val
def estimate_pose(particles_list):
"""Estimate the pose from particles by computing the average position and orientation over all particles.
This is not done using the particle weights, but just the sample distribution."""
x_sum = 0.0
y_sum = 0.0
cos_sum = 0.0
sin_sum = 0.0
for particle in particles_list:
x_sum += particle.getX()
y_sum += particle.getY()
cos_sum += np.cos(particle.getTheta())
sin_sum += np.sin(particle.getTheta())
flen = len(particles_list)
if flen != 0:
x = x_sum / flen
y = y_sum / flen
theta = np.arctan2(sin_sum/flen, cos_sum/flen)
else:
x = x_sum
y = y_sum
theta = 0.0
return Particle(x, y, theta)
def move_particle(particle, delta_x, delta_y, delta_theta):
"""Move the particle by (delta_x, delta_y, delta_theta)"""
print("particle.py: move_particle not implemented. You should do this.")
def add_uncertainty(particles_list, sigma, sigma_theta):
"""Add some noise to each particle in the list. Sigma and sigma_theta is the noise
variances for position and angle noise."""
for particle in particles_list:
particle.x += rn.randn(0.0, sigma)
particle.y += rn.randn(0.0, sigma)
particle.theta = np.mod(particle.theta + rn.randn(0.0, sigma_theta), 2.0 * np.pi)
def add_uncertainty_von_mises(particles_list, sigma, theta_kappa):
"""Add some noise to each particle in the list. Sigma and theta_kappa is the noise
variances for position and angle noise."""
for particle in particles_list:
particle.x += rn.randn(0.0, sigma)
particle.y += rn.randn(0.0, sigma)
        particle.theta = np.mod(rn.rand_von_mises(particle.theta, theta_kappa) + np.pi, 2.0 * np.pi) - np.pi  # wrap to [-pi, pi)
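
move_particle is deliberately left as an exercise above. For illustration only, a minimal sketch of one possible solution, assuming delta_x and delta_y are given in the world frame and delta_theta in radians (a robot-frame motion model would first rotate the deltas by the particle's theta):

def move_particle_sketch(particle, delta_x, delta_y, delta_theta):
    """Illustrative sketch only - not the official solution to the exercise."""
    particle.setX(particle.getX() + delta_x)
    particle.setY(particle.getY() + delta_y)
    particle.setTheta(particle.getTheta() + delta_theta)  # setTheta wraps to [0, 2*pi)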

selflocalization/random_numbers.py Normal file

@@ -0,0 +1,54 @@
import numpy as np
def randn(mu, sigma):
"""Normal random number generator
mean mu, standard deviation sigma"""
return sigma * np.random.randn() + mu
def rand_von_mises(mu, kappa):
"""Generate random samples from the Von Mises distribution"""
if kappa < 1e-6:
# kappa is small: sample uniformly on circle
theta = 2.0 * np.pi * np.random.ranf() - np.pi
else:
a = 1.0 + np.sqrt(1.0 + 4.0 * kappa * kappa)
b = (a - np.sqrt(2.0 * a)) / (2.0 * kappa)
r = (1.0 + b*b) / (2.0 * b)
not_done = True
while not_done:
u1 = np.random.ranf()
u2 = np.random.ranf()
z = np.cos(np.pi * u1)
f = (1.0 + r * z) / (r + z)
c = kappa * (r - f)
not_done = (u2 >= c * (2.0 - c)) and (np.log(c) - np.log(u2) + 1.0 - c < 0.0)
u3 = np.random.ranf()
theta = mu + np.sign(u3 - 0.5) * np.arccos(f)
return theta
if __name__ == '__main__':
# Tests
print("Gaussian distribution:")
r = np.zeros(1000)
for i in range(1000):
r[i] = randn(1.0, 2.0)
print("True mean 1.0 == Estimated mean ", np.mean(r))
print("True std 2.0 == Estimated std ", np.std(r))
print("Von Mises distribution:")
t = np.zeros(1000)
for i in range(1000):
t[i] = rand_von_mises(1.0, 8)
print("True mean 1.0 == Estimated mean ", np.mean(t))

selflocalization/selflocalize.py Normal file

@@ -0,0 +1,233 @@
import cv2
import particle
import camera
import numpy as np
import time
from timeit import default_timer as timer
import sys
# Flags
showGUI = True # Whether or not to open GUI windows
onRobot = True # Whether or not we are running on the Arlo robot
def isRunningOnArlo():
"""Return True if we are running on Arlo, otherwise False.
You can use this flag to switch the code from running on you laptop to Arlo - you need to do the programming here!
"""
return onRobot
if isRunningOnArlo():
# XXX: You need to change this path to point to where your robot.py file is located
sys.path.append("../../../../Arlo/python")
try:
import robot
onRobot = True
except ImportError:
print("selflocalize.py: robot module not present - forcing not running on Arlo!")
onRobot = False
# Some color constants in BGR format
CRED = (0, 0, 255)
CGREEN = (0, 255, 0)
CBLUE = (255, 0, 0)
CCYAN = (255, 255, 0)
CYELLOW = (0, 255, 255)
CMAGENTA = (255, 0, 255)
CWHITE = (255, 255, 255)
CBLACK = (0, 0, 0)
# Landmarks.
# The robot knows the position of 2 landmarks. Their coordinates are in the unit centimeters [cm].
landmarkIDs = [1, 2]
landmarks = {
1: (0.0, 0.0), # Coordinates for landmark 1
2: (300.0, 0.0) # Coordinates for landmark 2
}
landmark_colors = [CRED, CGREEN] # Colors used when drawing the landmarks
def jet(x):
"""Colour map for drawing particles. This function determines the colour of
a particle from its weight."""
r = (x >= 3.0/8.0 and x < 5.0/8.0) * (4.0 * x - 3.0/2.0) + (x >= 5.0/8.0 and x < 7.0/8.0) + (x >= 7.0/8.0) * (-4.0 * x + 9.0/2.0)
g = (x >= 1.0/8.0 and x < 3.0/8.0) * (4.0 * x - 1.0/2.0) + (x >= 3.0/8.0 and x < 5.0/8.0) + (x >= 5.0/8.0 and x < 7.0/8.0) * (-4.0 * x + 7.0/2.0)
b = (x < 1.0/8.0) * (4.0 * x + 1.0/2.0) + (x >= 1.0/8.0 and x < 3.0/8.0) + (x >= 3.0/8.0 and x < 5.0/8.0) * (-4.0 * x + 5.0/2.0)
return (255.0*r, 255.0*g, 255.0*b)
def draw_world(est_pose, particles, world):
"""Visualization.
This functions draws robots position in the world coordinate system."""
# Fix the origin of the coordinate system
offsetX = 100
offsetY = 250
# Constant needed for transforming from world coordinates to screen coordinates (flip the y-axis)
ymax = world.shape[0]
world[:] = CWHITE # Clear background to white
# Find largest weight
max_weight = 0
for particle in particles:
max_weight = max(max_weight, particle.getWeight())
# Draw particles
for particle in particles:
x = int(particle.getX() + offsetX)
y = ymax - (int(particle.getY() + offsetY))
colour = jet(particle.getWeight() / max_weight)
cv2.circle(world, (x,y), 2, colour, 2)
b = (int(particle.getX() + 15.0*np.cos(particle.getTheta()))+offsetX,
ymax - (int(particle.getY() + 15.0*np.sin(particle.getTheta()))+offsetY))
cv2.line(world, (x,y), b, colour, 2)
# Draw landmarks
for i in range(len(landmarkIDs)):
ID = landmarkIDs[i]
lm = (int(landmarks[ID][0] + offsetX), int(ymax - (landmarks[ID][1] + offsetY)))
cv2.circle(world, lm, 5, landmark_colors[i], 2)
# Draw estimated robot pose
a = (int(est_pose.getX())+offsetX, ymax-(int(est_pose.getY())+offsetY))
b = (int(est_pose.getX() + 15.0*np.cos(est_pose.getTheta()))+offsetX,
ymax-(int(est_pose.getY() + 15.0*np.sin(est_pose.getTheta()))+offsetY))
cv2.circle(world, a, 5, CMAGENTA, 2)
cv2.line(world, a, b, CMAGENTA, 2)
def initialize_particles(num_particles):
particles = []
for i in range(num_particles):
# Random starting points.
p = particle.Particle(600.0*np.random.ranf() - 100.0, 600.0*np.random.ranf() - 250.0, np.mod(2.0*np.pi*np.random.ranf(), 2.0*np.pi), 1.0/num_particles)
particles.append(p)
return particles
# Main program #
try:
if showGUI:
# Open windows
WIN_RF1 = "Robot view"
cv2.namedWindow(WIN_RF1)
cv2.moveWindow(WIN_RF1, 50, 50)
WIN_World = "World view"
cv2.namedWindow(WIN_World)
cv2.moveWindow(WIN_World, 500, 50)
# Initialize particles
num_particles = 1000
particles = initialize_particles(num_particles)
    est_pose = particle.estimate_pose(particles) # The estimate of the robot's current pose
# Driving parameters
velocity = 0.0 # cm/sec
angular_velocity = 0.0 # radians/sec
# Initialize the robot (XXX: You do this)
# Allocate space for world map
world = np.zeros((500,500,3), dtype=np.uint8)
# Draw map
draw_world(est_pose, particles, world)
print("Opening and initializing camera")
if camera.isRunningOnArlo():
cam = camera.Camera(0, 'arlo', useCaptureThread = True)
else:
cam = camera.Camera(0, 'macbookpro', useCaptureThread = True)
while True:
# Move the robot according to user input (only for testing)
action = cv2.waitKey(10)
if action == ord('q'): # Quit
break
if not isRunningOnArlo():
if action == ord('w'): # Forward
velocity += 4.0
elif action == ord('x'): # Backwards
velocity -= 4.0
elif action == ord('s'): # Stop
velocity = 0.0
angular_velocity = 0.0
elif action == ord('a'): # Left
angular_velocity += 0.2
elif action == ord('d'): # Right
angular_velocity -= 0.2
# Use motor controls to update particles
# XXX: Make the robot drive
# XXX: You do this
# Fetch next frame
colour = cam.get_next_frame()
# Detect objects
objectIDs, dists, angles = cam.detect_aruco_objects(colour)
if not isinstance(objectIDs, type(None)):
# List detected objects
for i in range(len(objectIDs)):
print("Object ID = ", objectIDs[i], ", Distance = ", dists[i], ", angle = ", angles[i])
# XXX: Do something for each detected object - remember, the same ID may appear several times
# Compute particle weights
# XXX: You do this
# Resampling
# XXX: You do this
# Draw detected objects
cam.draw_aruco_objects(colour)
else:
# No observation - reset weights to uniform distribution
for p in particles:
p.setWeight(1.0/num_particles)
        est_pose = particle.estimate_pose(particles) # The estimate of the robot's current pose
if showGUI:
# Draw map
draw_world(est_pose, particles, world)
# Show frame
cv2.imshow(WIN_RF1, colour)
# Show world
cv2.imshow(WIN_World, world)
finally:
    # Make sure to clean up even if an exception occurred
    # Close all windows
    cv2.destroyAllWindows()
    # Clean up the capture thread (only if the camera was actually opened;
    # otherwise a NameError here would mask the original exception)
    if 'cam' in locals():
        cam.terminateCaptureThread()
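
The XXX markers above are the course exercises. For orientation only, a rough sketch of how the measurement update and resampling steps could look, assuming a Gaussian likelihood on the measured distance to a single landmark with an illustrative noise level SIGMA_D (a full solution would also use the measured angles and handle repeated IDs):

import numpy as np
import particle

SIGMA_D = 10.0  # [cm] assumed distance-measurement noise - illustrative value only

def weight_particles_sketch(particles, landmark, measured_dist):
    # Weight each particle by the Gaussian likelihood of the measured distance
    # to one known landmark, then normalize the weights.
    for p in particles:
        predicted = np.hypot(landmark[0] - p.getX(), landmark[1] - p.getY())
        p.setWeight(np.exp(-0.5 * ((measured_dist - predicted) / SIGMA_D) ** 2))
    total = sum(p.getWeight() for p in particles)  # (a robust version would guard total == 0)
    for p in particles:
        p.setWeight(p.getWeight() / total)

def resample_sketch(particles):
    # Draw a new particle set with probability proportional to the weights.
    weights = [p.getWeight() for p in particles]
    indices = np.random.choice(len(particles), size=len(particles), p=weights)
    return [particle.Particle(particles[i].getX(), particles[i].getY(),
                              particles[i].getTheta(), 1.0 / len(particles))
            for i in indices]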