Source code for qvl.qbot_platform
from qvl.qlabs import CommModularContainer
from qvl.actor import QLabsActor
import cv2
import numpy as np
import math
import struct
######################### MODULAR CONTAINER CLASS #########################
[docs]class QLabsQBotPlatform(QLabsActor):
"""This class is for spawning QBotPlatforms."""
ID_QBOT_PLATFORM = 23
FCN_QBOT_PLATFORM_COMMAND_AND_REQUEST_STATE = 10
FCN_QBOT_PLATFORM_COMMAND_AND_REQUEST_STATE_RESPONSE = 11
FCN_QBOT_PLATFORM_SET_TRANSFORM = 14
FCN_QBOT_PLATFORM_SET_TRANSFORM_RESPONSE = 15
FCN_QBOT_PLATFORM_POSSESS = 20
FCN_QBOT_PLATFORM_POSSESS_ACK = 21
FCN_QBOT_PLATFORM_IMAGE_REQUEST = 100
FCN_QBOT_PLATFORM_IMAGE_RESPONSE = 101
FCN_QBOT_PLATFORM_LIDAR_DATA_REQUEST = 120
FCN_QBOT_PLATFORM_LIDAR_DATA_RESPONSE = 121
VIEWPOINT_RGB = 0
VIEWPOINT_DEPTH = 1
VIEWPOINT_DOWNWARD = 2
VIEWPOINT_TRAILING = 3
CAMERA_RGB = 0
CAMERA_DEPTH = 1
CAMERA_DOWNWARD = 2
[docs] def __init__(self, qlabs, verbose=False):
""" Constructor Method
:param qlabs: A QuanserInteractiveLabs object
:param verbose: (Optional) Print error information to the console.
:type qlabs: object
:type verbose: boolean
"""
self._qlabs = qlabs
self._verbose = verbose
self.classID = self.ID_QBOT_PLATFORM
return
[docs] def possess(self, camera):
"""
Possess (take control of) a QBot in QLabs with the selected camera.
:param camera: Pre-defined camera constant. See CAMERA constants for available options. Default is the trailing camera.
:type camera: uint32
:return:
- **status** - `True` if possessing the camera was successful, `False` otherwise
:rtype: boolean
"""
c = CommModularContainer()
c.classID = self.ID_QBOT_PLATFORM
c.actorNumber = self.actorNumber
c.actorFunction = self.FCN_QBOT_PLATFORM_POSSESS
c.payload = bytearray(struct.pack(">B", camera))
c.containerSize = c.BASE_CONTAINER_SIZE + len(c.payload)
self._qlabs.flush_receive()
if (self._qlabs.send_container(c)):
c = self._qlabs.wait_for_container(self.ID_QBOT_PLATFORM, self.actorNumber, self.FCN_QBOT_PLATFORM_POSSESS_ACK)
if (c == None):
return False
else:
return True
else:
return False
[docs] def command_and_request_state(self, rightWheelSpeed, leftWheelSpeed, leftLED=[1,0,0], rightLED=[1,0,0]):
"""Sets the wheel speeds and LED colors.
:param rightWheelSpeed: Speed in m/s
:param leftWheelSpeed: Speed in m/s
:param leftLED: Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:param rightLED: Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:type rightWheelSpeed: float
:type leftWheelSpeed: float
:type leftLED: float array[3]
:type rightLED: float array[3]
:return:
- **status** - `True` if successful, `False` otherwise
- **location** - world location in m
- **forward vector** - unit scale vector
- **up vector** - unit scale vector
- **front bumper hit** - true if in contact with a collision object, False otherwise
- **left bumper hit** - true if in contact with a collision object, False otherwise
- **right bumper hit** - true if in contact with a collision object, False otherwise
- **gyro** - turn rate in rad/s
- **heading** - angle in rad
- **encoder left** - in counts
- **encoder right** - in counts
:rtype: boolean, float array[3], float array[3], float array[3], boolean, boolean, boolean, float, float, uint32, uint32
"""
c = CommModularContainer()
c.classID = self.ID_QBOT_PLATFORM
c.actorNumber = self.actorNumber
c.actorFunction = self.FCN_QBOT_PLATFORM_COMMAND_AND_REQUEST_STATE
c.payload = bytearray(struct.pack(">ffffffff", rightWheelSpeed, leftWheelSpeed, leftLED[0], leftLED[1], leftLED[2], rightLED[0], rightLED[1], rightLED[2]))
c.containerSize = c.BASE_CONTAINER_SIZE + len(c.payload)
location = [0,0,0]
forward = [0,0,0]
up = [0,0,0]
frontHit = False
leftHit = False
rightHit = False
gyro = 0
heading = 0
encoderLeft = 0
encoderRight = 0
self._qlabs.flush_receive()
if (self._qlabs.send_container(c)):
c = self._qlabs.wait_for_container(self.ID_QBOT_PLATFORM, self.actorNumber, self.FCN_QBOT_PLATFORM_COMMAND_AND_REQUEST_STATE_RESPONSE)
if (c == None):
return False, location, forward, up, frontHit, leftHit, rightHit, gyro, heading, encoderLeft, encoderRight
if len(c.payload) == 55:
location[0], location[1], location[2], forward[0], forward[1], forward[2], up[0], up[1], up[2], frontHit, leftHit, rightHit, gyro, heading, encoderLeft, encoderRight, = struct.unpack(">fffffffff???ffII", c.payload[0:55])
return True, location, forward, up, frontHit, leftHit, rightHit, gyro, heading, encoderLeft, encoderRight
else:
return False, location, forward, up, frontHit, leftHit, rightHit, gyro, heading, encoderLeft, encoderRight
else:
return False, location, forward, up, frontHit, leftHit, rightHit, gyro, heading, encoderLeft, encoderRight
[docs] def set_transform(self, location=[0,0,0], rotation=[0,0,0], scale=[1,1,1], leftLED=[1,0,0], rightLED=[1,0,0], enableDynamics=True, waitForConfirmation=True):
"""Sets the transform, LED colors, and enabling/disabling of physics dynamics
:param location: (Optional) An array of floats for x, y and z coordinates
:param rotation: (Optional) An array of floats for the roll, pitch, and yaw in radians
:param scale: (Optional) An array of floats for the scale in the x, y, and z directions. Scale values of 0.0 should not be used.
:param leftLED: (Optional) Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:param rightLED: (Optional) Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:param enableDynamics: (default True) Enables or disables gravity for set transform requests.
:param waitForConfirmation: (Optional) Wait for confirmation before proceeding. This makes the method a blocking operation. NOTE: Return data will only be valid if waitForConfirmation is True.
:type location: float array[3]
:type rotation: float array[3]
:type scale: float array[3]
:type leftLED: float array[3]
:type rightLED: float array[3
:type enableDynamics: boolean
:type waitForConfirmation: boolean
:return:
- **status** - True if successful or False otherwise
- **location** - world location in m
- **forward vector** - unit scale vector
- **up vector** - unit scale vector
:rtype: boolean, float array[3], float array[3], float array[3]
"""
c = CommModularContainer()
c.classID = self.ID_QBOT_PLATFORM
c.actorNumber = self.actorNumber
c.actorFunction = self.FCN_QBOT_PLATFORM_SET_TRANSFORM
c.payload = bytearray(struct.pack(">fffffffffffffffB", location[0], location[1], location[2], rotation[0], rotation[1], rotation[2], scale[0], scale[1], scale[2], leftLED[0], leftLED[1], leftLED[2], rightLED[0], rightLED[1], rightLED[2], enableDynamics))
c.containerSize = c.BASE_CONTAINER_SIZE + len(c.payload)
location = [0,0,0]
forward = [0,0,0]
up = [0,0,0]
if waitForConfirmation:
self._qlabs.flush_receive()
if (self._qlabs.send_container(c)):
if waitForConfirmation:
c = self._qlabs.wait_for_container(self.ID_QBOT_PLATFORM, self.actorNumber, self.FCN_QBOT_PLATFORM_SET_TRANSFORM_RESPONSE)
if (c == None):
return False, location, forward, up
if len(c.payload) == 36:
location[0], location[1], location[2], forward[0], forward[1], forward[2], up[0], up[1], up[2], = struct.unpack(">fffffffff", c.payload[0:36])
return True, location, forward, up
else:
return False, location, forward, up
else:
return True, location, forward, up
else:
return False, location, forward, up
[docs] def set_transform_degrees(self, location=[0,0,0], rotation=[0,0,0], scale=[1,1,1], leftLED=[1,0,0], rightLED=[1,0,0], enableDynamics=True, waitForConfirmation=True):
"""Sets the transform, LED colors, and enabling/disabling of physics dynamics
:param location: (Optional) An array of floats for x, y and z coordinates
:param rotation: (Optional) An array of floats for the roll, pitch, and yaw in degrees
:param scale: (Optional) An array of floats for the scale in the x, y, and z directions. Scale values of 0.0 should not be used.
:param leftLED: (Optional) Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:param rightLED: (Optional) Red, Green, Blue components of the RGB color on a 0.0 to 1.0 scale.
:param enableDynamics: (default True) Enables or disables gravity for set transform requests.
:param waitForConfirmation: (Optional) Wait for confirmation before proceeding. This makes the method a blocking operation. NOTE: Return data will only be valid if waitForConfirmation is True.
:type location: float array[3]
:type rotation: float array[3]
:type scale: float array[3]
:type leftLED: float array[3]
:type rightLED: float array[3
:type enableDynamics: boolean
:type waitForConfirmation: boolean
:return:
- **status** - True if successful or False otherwise
- **location** - world location in m
- **forward vector** - unit scale vector
- **up vector** - unit scale vector
:rtype: boolean, float array[3], float array[3], float array[3]
"""
success, location, forward, up = self.set_transform(location, [rotation[0]/180*math.pi, rotation[1]/180*math.pi, rotation[2]/180*math.pi], scale, leftLED, rightLED, enableDynamics, waitForConfirmation)
return success, location, forward, up
[docs] def get_image(self, camera):
"""
Request a JPG image from the QBot camera.
:param camera: Camera number to view from.
:type camera: byte
:return:
- **status** - `True` and image data if successful, `False` and empty otherwise
- **imageData** - Image in a JPG format
:rtype: boolean, byte array with jpg data
"""
if (not self._is_actor_number_valid()):
return False, None
c = CommModularContainer()
c.classID = self.ID_QBOT_PLATFORM
c.actorNumber = self.actorNumber
c.actorFunction = self.FCN_QBOT_PLATFORM_IMAGE_REQUEST
c.payload = bytearray(struct.pack(">B", camera))
c.containerSize = c.BASE_CONTAINER_SIZE + len(c.payload)
self._qlabs.flush_receive()
if (self._qlabs.send_container(c)):
c = self._qlabs.wait_for_container(self.ID_QBOT_PLATFORM, self.actorNumber, self.FCN_QBOT_PLATFORM_IMAGE_RESPONSE)
if (c == None):
return False, None
imageData = cv2.imdecode(np.frombuffer(bytearray(c.payload[8:len(c.payload)]), dtype=np.uint8, count=-1, offset=0), 1)
return True, imageData
else:
return False, None
[docs] def get_lidar(self, samplePoints=400):
"""
Request LIDAR data from a QBotPlatform.
:param samplePoints: (Optional) Change the number of points per revolution of the LIDAR.
:type samplePoints: uint32
:return: `True`, angles in radians, and distances in m if successful, `False`, none, and none otherwise
:rtype: boolean, float array, float array
"""
if (not self._is_actor_number_valid()):
if (self._verbose):
print('actorNumber object variable None. Use a spawn function to assign an actor or manually assign the actorNumber variable.')
return False, None, None
LIDAR_SAMPLES = 4096
LIDAR_RANGE = 8
# The LIDAR is simulated by using 4 orthogonal virtual cameras that are 1 pixel high. The
# lens distortion of these cameras must be removed to accurately calculate the XY position
# of the depth samples.
quarter_angle = np.linspace(0, 45, int(LIDAR_SAMPLES/8))
lens_curve = -0.0077*quarter_angle*quarter_angle + 1.3506*quarter_angle
lens_curve_rad = lens_curve/180*np.pi
angles = np.concatenate((np.pi*4/2-1*np.flip(lens_curve_rad), \
lens_curve_rad, \
(np.pi/2 - 1*np.flip(lens_curve_rad)), \
(np.pi/2 + lens_curve_rad), \
(np.pi - 1*np.flip(lens_curve_rad)), \
(np.pi + lens_curve_rad), \
(np.pi*3/2 - 1*np.flip(lens_curve_rad)), \
(np.pi*3/2 + lens_curve_rad)))
c = CommModularContainer()
c.classID = self.ID_QBOT_PLATFORM
c.actorNumber = self.actorNumber
c.actorFunction = self.FCN_QBOT_PLATFORM_LIDAR_DATA_REQUEST
c.payload = bytearray()
c.containerSize = c.BASE_CONTAINER_SIZE + len(c.payload)
self._qlabs.flush_receive()
if (self._qlabs.send_container(c)):
c = self._qlabs.wait_for_container(self.ID_QBOT_PLATFORM, self.actorNumber, self.FCN_QBOT_PLATFORM_LIDAR_DATA_RESPONSE)
if (c == None):
if (self._verbose):
print('Failed to receive return container.')
return False, None, None
if ((len(c.payload)-4)/2 != LIDAR_SAMPLES):
if (self._verbose):
print("Received {} bytes, expected {}".format(len(c.payload), LIDAR_SAMPLES*2))
return False, None, None
distance = np.linspace(0,0,LIDAR_SAMPLES)
for count in range(LIDAR_SAMPLES-1):
# clamp any value at 65535 to 0
raw_value = ((c.payload[4+count*2] * 256 + c.payload[5+count*2] )) %65535
# scale to LIDAR range
distance[count] = (raw_value/65535)*LIDAR_RANGE
# Resample the data using a linear radial distribution to the desired number of points
# and realign the first index to be 0 (forward)
sampled_angles = np.linspace(0,2*np.pi, num=samplePoints, endpoint=False)
sampled_distance = np.linspace(0,0, samplePoints)
index_raw = 512
for count in range(samplePoints):
while (angles[index_raw] < sampled_angles[count]):
index_raw = (index_raw + 1) % 4096
if index_raw != 0:
if (angles[index_raw]-angles[index_raw-1]) == 0:
sampled_distance[count] = distance[index_raw]
else:
sampled_distance[count] = (distance[index_raw]-distance[index_raw-1])*(sampled_angles[count]-angles[index_raw-1])/(angles[index_raw]-angles[index_raw-1]) + distance[index_raw-1]
else:
sampled_distance[count] = distance[index_raw]
return True, sampled_angles, sampled_distance
else:
if (self._verbose):
print('Communications request for LIDAR data failed.')
return False, None, None