# Arlo Robot Controller import ctypes import io import os import sys import tempfile from contextlib import contextmanager from time import sleep import serial import cv2 libc = ctypes.CDLL(None) c_stderr = ctypes.c_void_p.in_dll(libc, 'stderr') @contextmanager def stderr_redirector(stream): original_stderr_fd = sys.stderr.fileno() def _redirect_stderr(to_fd): libc.fflush(c_stderr) sys.stderr.close() os.dup2(to_fd, original_stderr_fd) sys.stderr = io.TextIOWrapper(os.fdopen(original_stderr_fd, 'wb')) saved_stderr_fd = os.dup(original_stderr_fd) try: tfile = tempfile.TemporaryFile(mode='w+b') _redirect_stderr(tfile.fileno()) yield _redirect_stderr(saved_stderr_fd) tfile.flush() tfile.seek(0, io.SEEK_SET) stream.write(tfile.read().decode()) finally: tfile.close() os.close(saved_stderr_fd) class Robot(object): """ Defines the Arlo robot API DISCLAIMER: This code does not contain error checking - it is the responsibility of the caller to ensure proper parameters and not to send commands to the Arduino too frequently (give it time to process the command by adding a short sleep wait statement). Failure to do some may lead to strange robot behaviour. In case you experience trouble - consider using only commands that do not use the wheel encoders. """ def __init__(self, port: str = '/dev/ttyACM0'): """ The constructor port parameter can be changed from default value if you want to control the robot directly from your labtop (instead of from the on-board raspberry pi). The value of port should point to the USB port on which the robot Arduino is connected. """ self.port = port # 1 sec. timeout, wait until data is received or until timeout # self.serialRead = serial.Serial(self.port,9600, timeout=1) # No timeout, wait forever or until data is received self.serial_read = serial.Serial(self.port,9600, timeout=None) # Wait if serial port is not open yet while not self.serial_read.isOpen(): sleep(1) print("Waiting for serial port connection ...") sleep(2) print("Running ...") def __del__(self): print("Shutting down the robot ...") sleep(0.05) print(self.stop()) sleep(0.1) cmd='k\n' print((self.send_command(cmd))) self.serial_read.close() def _init_camera(self): def gstreamer_pipeline( capture_width=1280, capture_height=720, framerate=30): """ Utility function for setting parameters for the gstreamer camera pipeline """ return ( "libcamerasrc !" "video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! " "videoconvert ! " "appsink" % ( capture_width, capture_height, framerate, ) ) return cv2.VideoCapture( gstreamer_pipeline(), apiPreference=cv2.CAP_GSTREAMER ) def take_photo(self): with stderr_redirector(io.StringIO()): cap = self._init_camera() _, photo = cap.read() cap.release() return photo def send_command(self, cmd : str, sleep_ms: float=0.0): """Sends a command to the Arduino robot controller""" self.serial_read.write(cmd.encode('ascii')) sleep(sleep_ms) str_val=self.serial_read.readline() return str_val def _power_checker(self, power: int): """ Checks if a power value is in the set {0, [40;127]}. This is an internal utility function. """ return (power == 0) or (power >=40 and power <=127) def go_diff(self, power_left : int, power_right : int, dir_left : int, dir_right : int): """ Start left motor with motor power powerLeft (in {0, [40;127]} and the numbers must be integer) and direction dirLeft (0=reverse, 1=forward) and right motor with motor power powerRight (in {0, [40;127]} and the numbers must be integer) and direction dirRight (0=reverse, 1=forward). The Arlo robot may blow a fuse if you run the motors at less than 40 in motor power, therefore choose either power = 0 or 40 < power <= 127. This does NOT use wheel encoders. """ if ( (not self._power_checker(power_left)) or (not self._power_checker(power_right)) ): print("WARNING: Read the docstring of Robot.go_diff()!") return "" pl_int = int(power_left) pr_int = int(power_right) dl_int = int(dir_left) dr_int = int(dir_right) cmd = f"d{pl_int},{pr_int},{dl_int},{dr_int}\n" return self.send_command(cmd) def stop(self): """ Send a stop command to stop motors. Sets the motor power on both wheels to zero. This does NOT use wheel encoders. """ cmd='s\n' return self.send_command(cmd) def read_sensor(self, sensorid : int): """ Send a read sensor command with sensorid and return sensor value. Will return -1, if error occurs. """ cmd=str(sensorid) + '\n' str_val=self.send_command(cmd) if len(str_val) > 0: return int(str_val) else: return -1 def read_front_ping_sensor(self): """ Read the front sonar ping sensor and return the measured range in milimeters [mm]. """ return self.read_sensor(0) def read_back_ping_sensor(self): """ Read the back sonar ping sensor and return the measured range in milimeters [mm]. """ return self.read_sensor(1) def read_left_ping_sensor(self): """ Read the left sonar ping sensor and return the measured range in milimeters [mm]. """ return self.read_sensor(2) def read_right_ping_sensor(self): """ Read the right sonar ping sensor and return the measured range in milimeters [mm]. """ return self.read_sensor(3) def read_left_wheel_encoder(self): """ Reads the left wheel encoder counts since last reset_encoder_counts command. The encoder has 144 counts for one complete wheel revolution. """ cmd='e0\n' return self.send_command(cmd, 0.045) def read_right_wheel_encoder(self): """ Reads the right wheel encoder counts since last clear reset_encoder_counts command. The encoder has 144 counts for one complete wheel revolution. """ cmd='e1\n' return self.send_command(cmd, 0.045) def reset_encoder_counts(self): """Reset the wheel encoder counts.""" cmd='c\n' return self.send_command(cmd)