Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 151 additions & 89 deletions racecar_autopilot/racecar_autopilot/wall_estimator.py
Original file line number Diff line number Diff line change
@@ -1,144 +1,206 @@
#!/usr/bin/env python3

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

import numpy as np
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan

import numpy as np


class WallEstimator(Node):

def __init__(self):
super().__init__("wall_estimator")
super().__init__("estimator")

# Bounds for the LiDAR's measurements used for estimate
self.LEFT_REFERENCE_ANGLE = -90.0 # deg
self.RIGHT_REFERENCE_ANGLE = 90.0 # deg
self.REFERENCE_ANGLE_DELTA = 15.0 # deg
# Init subscribers
self.sub_ref = self.create_subscription(LaserScan, "scan", self.read_scan, 1)

# Init publishers
self.pub_y = self.create_publisher(Twist, "car_position", 1)

# Parameters
self.cone_angle_deg = 20.0 # deg # Angle around left/right reference to consider for the estimation

# Outputs
self.y_estimate = float(0)
self.dy_estimate = float(0)
self.theta_estimate = float(0)
self.y_estimation = 0.0
self.dy_estimation = 0.0
self.theta_estimation = 0.0

# Memory
self.y_left = float(0)
self.y_right = float(0)
self.theta_left = float(0)
self.theta_right = float(0)
self.nb_lidar_point = int(0)
self.y_left = 0.0
self.y_right = 0.0
self.theta_left = 0.0
self.theta_right = 0.0

self.lidar_subscription = self.create_subscription(LaserScan, "racecar/scan", self.read_scan, 1)
self.pos_publisher = self.create_publisher(Twist, "car_position", 1)
# Info about LiDAR read from the first scan
self.lidar_index_to_compute = True
self.index_left_start = 0 # 140
self.index_left_end = 0 # 220
self.index_right_start = 0 # 500
self.index_right_end = 0 # 580

def read_scan(self, scan_msg: LaserScan):
ranges = np.array(scan_msg.ranges, dtype=np.float32)
angle_min_deg: float = np.degrees(scan_msg.angle_min)
angle_increment_deg: float = np.degrees(scan_msg.angle_increment)
########################################
def read_scan(self, scan_msg):

left_lower_bound = self.LEFT_REFERENCE_ANGLE - self.REFERENCE_ANGLE_DELTA
left_upper_bound = self.LEFT_REFERENCE_ANGLE + self.REFERENCE_ANGLE_DELTA
if self.lidar_index_to_compute:
self.compute_lidar_ranges(scan_msg)
self.lidar_index_to_compute = False

right_lower_bound = self.RIGHT_REFERENCE_ANGLE - self.REFERENCE_ANGLE_DELTA
right_upper_bound = self.RIGHT_REFERENCE_ANGLE + self.REFERENCE_ANGLE_DELTA
self.estimate_car_position(scan_msg)

# Precompute angles in degress for all indices
angles_deg = angle_min_deg + np.arange(len(ranges), dtype=np.float32) * angle_increment_deg
########################################
def compute_lidar_ranges(self, scan_msg):

half_length = len(scan_msg.ranges) // 2
angle_min_deg = np.degrees(scan_msg.angle_min)
angle_increment_deg = np.degrees(scan_msg.angle_increment)
ranges = np.array(scan_msg.ranges, dtype=np.float32)
angles_deg = (
angle_min_deg
+ np.arange(len(ranges), dtype=np.float32) * angle_increment_deg
)

left_min_deg = -90.0 - self.cone_angle_deg
left_max_deg = -90.0 + self.cone_angle_deg
right_min_deg = 90.0 - self.cone_angle_deg
right_max_deg = 90.0 + self.cone_angle_deg

# Find indices corresponding to left and right min/max angles
self.index_left_start = np.searchsorted(angles_deg, left_min_deg, side="left")
self.index_left_end = (
np.searchsorted(angles_deg, left_max_deg, side="right") - 1
)
self.index_right_start = np.searchsorted(angles_deg, right_min_deg, side="left")
self.index_right_end = (
np.searchsorted(angles_deg, right_max_deg, side="right") - 1
)

# Left side: only use the first half of the ranges
left_mask = (
(angles_deg[:half_length] >= left_lower_bound)
& (angles_deg[:half_length] <= left_upper_bound)
& (scan_msg.range_min < ranges[:half_length])
& (ranges[:half_length] < scan_msg.range_max)
self.get_logger().info(
f"LiDAR left indices: {self.index_left_start} to {self.index_left_end}"
)
self.get_logger().info(
f"LiDAR right indices: {self.index_right_start} to {self.index_right_end}"
)
left_distances = ranges[:half_length][left_mask]
left_thetas = angles_deg[:half_length][left_mask]

if len(left_distances) > 2:
left_distances = np.array(left_distances, dtype=np.float32)
left_thetas = np.radians(left_thetas)
########################################
def estimate_line_from_points(self, d, theta):

y = left_distances * np.sin(left_thetas)
x = left_distances * np.cos(left_thetas)
# Convert to Cartesian
y = d * np.sin(theta)
x = d * np.cos(theta)

A = np.column_stack((x, np.ones(x.shape, dtype=np.float32)))
# Formulate A matrix of the linear system
ones = np.ones(x.shape)
A = np.column_stack((x, ones))

# Least Squares method
estimate = np.linalg.lstsq(A, y, rcond=None)[0] # (A'A)^-1 * A'y
# Least square solution
estimation = np.linalg.lstsq(A, y, rcond=None)[0] # (ATA)^-1 ATy

slope = estimate[0]
offset = estimate[1]
m = estimation[0] # slope
b = estimation[1] # offset

self.theta_left = np.arctan(slope)
self.y_left = offset
# Return angle and offset
theta = np.arctan(m)
y = b

# Right side: only use the second half of the ranges
right_mask = (
(angles_deg[half_length:] >= right_lower_bound)
& (angles_deg[half_length:] <= right_upper_bound)
& (scan_msg.range_min < ranges[half_length:])
& (ranges[half_length:] < scan_msg.range_max)
)
right_distances = ranges[half_length:][right_mask]
right_thetas = angles_deg[half_length:][right_mask]
return theta, y

########################################
def estimate_car_position(self, scan_msg):

if len(right_distances) > 2:
right_distances = np.array(right_distances)
right_thetas = np.radians(right_thetas)
ranges = np.array(scan_msg.ranges)

y = right_distances * np.sin(right_thetas)
x = right_distances * np.cos(right_thetas)
# Left side
d_data = []
theta_data = []
n_good_scan = 0

A = np.column_stack((x, np.ones(x.shape, dtype=np.float32)))
# Get data in the left cone
for i in range(self.index_left_start, self.index_left_end + 1):

# Least Squares method
estimate = np.linalg.lstsq(A, y, rcond=None)[0] # (ATA)^-1 ATy
scan_is_good = (ranges[i] > scan_msg.range_min) & (
ranges[i] < scan_msg.range_max
)

slope = estimate[0]
offset = estimate[1]
if scan_is_good:
d_data.append(ranges[i])
theta_data.append(scan_msg.angle_min + i * scan_msg.angle_increment)
n_good_scan = n_good_scan + 1

self.theta_right = np.arctan(slope)
self.y_right = offset
if n_good_scan > 2:

self.theta_estimate = (self.theta_left + self.theta_right) / 2
self.y_estimate = (self.y_left + self.y_right) / 2
# Estimate left wall line from points using least squares
d = np.array(d_data)
theta = np.array(theta_data)
self.theta_left, self.y_left = self.estimate_line_from_points(d, theta)

self.publish_estimate()
else:

def convert_angle_to_index(self, angle: float):
return round(self.nb_lidar_point * angle / 360)
self.get_logger().warning("Not enough good scans on the left side.")

# Right side
d_data = []
theta_data = []
n_good_scan = 0

# Get data in the right cone
for i in range(self.index_right_start, self.index_right_end + 1):

scan_is_good = (ranges[i] > scan_msg.range_min) & (
ranges[i] < scan_msg.range_max
)

if scan_is_good:

d_data.append(ranges[i])
theta_data.append(scan_msg.angle_min + i * scan_msg.angle_increment)

n_good_scan = n_good_scan + 1

if n_good_scan > 2:

# Estimate left wall line from points
d = np.array(d_data)
theta = np.array(theta_data)
self.theta_right, self.y_right = self.estimate_line_from_points(d, theta)

else:

self.get_logger().warning("Not enough good scans on the right side.")

# Combine left and right estimates
self.theta_estimation = 0.5 * (self.theta_left + self.theta_right)
self.y_estimation = 0.5 * (self.y_left + self.y_right)

self.pub_estimate()

########################################
def pub_estimate(self):

def publish_estimate(self):
msg = Twist()

msg.linear.y = float(self.y_estimate)
msg.angular.z = float(self.theta_estimate)
msg.linear.y = float(self.y_estimation)
msg.angular.z = float(self.theta_estimation)

# debug

# These are for debugging purposes
msg.linear.x = float(self.y_left)
msg.linear.z = float(self.y_right)

msg.angular.x = float(self.theta_left)
msg.angular.y = float(self.theta_right)

self.pos_publisher.publish(msg)
self.pub_y.publish(msg)


def main(args=None):
try:
rclpy.init(args=args)
node = WallEstimator()
rclpy.spin(node)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
rclpy.shutdown()
rclpy.init(args=args)
node = WallEstimator()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()


##############################################

if __name__ == "__main__":
main()
60 changes: 60 additions & 0 deletions racecar_bringup/launch/autopilot.launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
"""
Minimalist launch file for the racecar's teleoperation.
"""
return LaunchDescription(
[
Node(
package="joy",
executable="joy_node",
name="joy",
parameters=[{"deadzone": 0.05}],
arguments=["dev", "/dev/input/js0"],
output="screen",
),
Node(
package="racecar_teleop",
executable="slash_teleop",
name="teleop",
output="screen",
),
Node(
package="racecar_autopilot",
executable="slash_controller",
name="controller",
output="screen",
),
Node(
package="pb2roscpp",
executable="pb2roscpp",
name="arduino",
output="screen",
),
Node(
name="lidar",
package="rplidar_ros",
executable="rplidar_composition",
output="screen",
parameters=[
{
"serial_port": "/dev/ttyUSB0",
"serial_baudrate": 115200,
"frame_id": "racecar/base_laser",
"inverted": False,
"angle_compensate": True,
}
],
remappings=[("/scan", "/racecar/scan")],
),
Node(
package="racecar_autopilot",
executable="wall_estimator",
name="wall_estimator",
remappings=[("/scan", "/racecar/scan")],
),
]
)
2 changes: 1 addition & 1 deletion racecar_teleop/racecar_teleop/slash_teleop.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def joy_callback( self, joy_msg ):
# Closed-loop velocity, Open-loop steering
self.cmd_msg.linear.x = propulsion_user_input * self.max_vel #[m/s]
self.cmd_msg.angular.z = steering_user_input * self.cmd2rad
self.cmd_msg.linear.z = 7.0 # Control mode
self.cmd_msg.linear.z = 0.0 # Control mode

# Deadman is un-pressed
else:
Expand Down