Skip to content
22 changes: 17 additions & 5 deletions src/autonomous_nav/autonomous_nav/navigation_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ def updateNavigation(self) -> None:
elif self.global_costmap != None and len(self.path.poses) == 0:
self.get_logger().warn("Running test")
self.planPath(self.global_costmap)
self.get_logger().info(f"plotted to goal successfully")
self.path_pub.publish(self.path)
self.get_logger().info(f"total indices queried: {self.index_count}")
return
self.publishStatus(f"En route to waypoint ({goal_x:.2f}, {goal_y:.2f})")
Expand Down Expand Up @@ -276,6 +278,7 @@ def planPath(
"""
This algorithm looks at the global occupancy grid in order to plan a path through it for the rover using an A* style search algorithm.
"""
publish_count = 0
lowest_cost_position = self.current_position
previous_position = lowest_cost_position
distance_to_goal = self.distance_2d(
Expand All @@ -292,7 +295,8 @@ def planPath(
# we have found no suitable indicies
# pop the current position from the path
self.get_logger().info(f"backtracking")
self.path.poses.pop()
if len(self.path.poses) != 0:
self.path.poses.pop()
# make the new lowest position the item at the top of the stack
lowest_cost_position = (
self.path.poses[-1].pose.position.x,
Expand All @@ -303,22 +307,30 @@ def planPath(

lowest_cost_node = self.find_lowest_cost_node(target_area, grid)
lowest_cost_position = self.index_to_position(grid, lowest_cost_node)
self.append_path(lowest_cost_position)
if publish_count % 10 == 0:
self.get_logger().info(
f"current lowest cost position is {lowest_cost_position[0]},{lowest_cost_position[1]}"
)
publish_count = publish_count + 1
self.append_path(lowest_cost_position)
distance_to_goal = self.distance_2d(
lowest_cost_position[0],
lowest_cost_position[1],
self.end_goal_waypoint[0],
self.end_goal_waypoint[1],
)
self.get_logger().info(f"plotted to goal successfully")
self.path_pub.publish(self.path)

def find_lowest_cost_node(self, target_area: list[Tuple[int, int]], grid: OccupancyGrid) -> int:
self.get_logger().info(f"target area is this large: {len(target_area)}")
minimum_cost = sys.float_info.max
for item in target_area:
item_cost = self.distance_between_indicies(grid, item[1], self.end_goal_index)
if item_cost < minimum_cost and item_cost != 100 and item_cost != -1:
if (
item_cost < minimum_cost
and item_cost != 100
and item_cost != -1
and item_cost != 50
):
minimum_cost = item_cost
minimum_index = item[1]

Expand Down
226 changes: 226 additions & 0 deletions src/autonomous_nav/autonomous_nav/stuck_detection_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import sys
import time
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.subscription import Subscription
from std_msgs.msg import Bool, Float32, String, UInt8

from lib.color_codes import ColorCodes, colorStr
from lib.configs import MoteusMotorConfig, MotorConfigs
from lib.moteus_motor_state import MoteusMotorState, MoteusRunSettings

NODE_NAME = "stuck_detection_node"
DRIVE_MOTOR_CONFIGS = [
MotorConfigs.FRONT_LEFT_DRIVE_MOTOR,
MotorConfigs.FRONT_RIGHT_DRIVE_MOTOR,
MotorConfigs.MID_LEFT_DRIVE_MOTOR,
MotorConfigs.MID_RIGHT_DRIVE_MOTOR,
MotorConfigs.REAR_LEFT_DRIVE_MOTOR,
MotorConfigs.REAR_RIGHT_DRIVE_MOTOR,
]
RMX8_25_RATED_TORQUE = 10 # N/m


@dataclass
class StuckReason:
ros_message: str
""" The message sent to the ROS topic. """
stuck_duration: float
""" How many seconds tripped before marked as stuck. """
mask: int
""" How to encode/decode this reason. """

def __hash__(self) -> int:
return self.mask


class StuckReasons:
"""Reasons for why we are stuck."""

WHEELS_CANT_SPIN = StuckReason("wheels-cant-spin", 1.0, 1)
FREE_SPINNING = StuckReason("free-spinning", 1.0, 1 << 1)


class StuckDetectionNode(Node):
"""
Determines whether the rover is currently "stuck".

Publications:
- /stuck_detection/is_stuck [Bool]
- /stuck_detection/reason [UInt8]
- Reasons are encoded by a bitmask (see StuckReasons for usage).
"""

DEBUG = True

# Initialization

def __init__(self) -> None:
super().__init__(NODE_NAME)

for config in DRIVE_MOTOR_CONFIGS:
self.subscribe_to_motor(config)

# TODO: Add lifetimes to the values
self.target_states: Dict[int, MoteusRunSettings] = {}
self.current_states: Dict[int, MoteusMotorState] = {}

self.stuck_start: Dict[StuckReason, float] = {}

self.stuck_publisher = self.create_publisher(Bool, "/stuck_detection/is_stuck", 10)
self.stuck_reason_publisher = self.create_publisher(UInt8, "/stuck_detection/reason", 10)

if self.DEBUG:
self.target_velocity_ratio_publisher = self.create_publisher(
Float32, "/stuck_detection/debug/target_velocity_ratio", 10
)
self.rated_torque_ratio_publisher = self.create_publisher(
Float32, "/stuck_detection/debug/rated_torque_ratio", 10
)

self.timer = self.create_timer(0.1, self.update)

def subscribe_to_motor(
self, motor_config: MoteusMotorConfig
) -> Tuple[Subscription, Subscription]:
"""
Subscribes to the motor's target state and current state.
"""

def get_target_callback() -> Callable[[String], None]:
return lambda msg: self.motor_target_state_callback(
motor_config.can_id, MoteusRunSettings.fromJsonMsg(msg)
)

def get_current_callback() -> Callable[[String], None]:
return lambda msg: self.motor_current_state_callback(
motor_config.can_id, MoteusMotorState.fromJsonMsg(msg)
)

return (
self.create_subscription(
String,
motor_config.getInterfaceTopicName(),
get_target_callback(),
10,
),
self.create_subscription(
String,
motor_config.getCanTopicName(),
get_current_callback(),
10,
),
)

def motor_current_state_callback(self, can_id: int, state: MoteusMotorState) -> None:
self.current_states[can_id] = state

def motor_target_state_callback(self, can_id: int, settings: MoteusRunSettings) -> None:
self.target_states[can_id] = settings

# Detection

def update_stuck_reason(self, reason: StuckReason, is_active: bool) -> bool:
now = time.time()
if is_active:
if reason in self.stuck_start:
return now - self.stuck_start[reason] > reason.stuck_duration
else:
self.stuck_start[reason] = now
else:
self.stuck_start.pop(reason, 0.0)

return False

def calculate_per_motor_effort(self) -> list[StuckReason]:
FREE_SPINNING_THRESH = (0.2, 0.9) # <t, >v
STUCK_THRESH = (0.5, 0.2) # >t, <v
STUCK_WHEEL_MIN_COUNT = 3

cant_spin_count = 0
free_spinning_count = 0

for config in DRIVE_MOTOR_CONFIGS:
target = self.target_states.get(config.can_id)
state = self.current_states.get(config.can_id)

if target is None or state is None:
continue

if state.velocity is None or target.velocity is None:
continue

if state.torque is None:
# TODO: Fallback to q_current
continue

if abs(target.velocity) < 1e-3:
continue

v_norm = abs(state.velocity / target.velocity)
v_norm = min(v_norm, 1.0)

t_norm = abs(state.torque) / RMX8_25_RATED_TORQUE

if self.DEBUG:
self.target_velocity_ratio_publisher.publish(Float32(data=v_norm))
self.rated_torque_ratio_publisher.publish(Float32(data=t_norm))

if t_norm < FREE_SPINNING_THRESH[0] and v_norm > FREE_SPINNING_THRESH[1]:
free_spinning_count += 1

if t_norm > STUCK_THRESH[0] and v_norm < STUCK_THRESH[1]:
cant_spin_count += 1

stuck = []
if self.update_stuck_reason(
StuckReasons.WHEELS_CANT_SPIN, cant_spin_count >= STUCK_WHEEL_MIN_COUNT
):
stuck.append(StuckReasons.WHEELS_CANT_SPIN)
if self.update_stuck_reason(
StuckReasons.FREE_SPINNING, free_spinning_count >= STUCK_WHEEL_MIN_COUNT
):
stuck.append(StuckReasons.FREE_SPINNING)
return stuck

# Update

def update(self) -> None:
stuck = self.calculate_per_motor_effort()

if len(stuck) > 0:
self.stuck_publisher.publish(Bool(data=True))
encoded = 0
for reason in stuck:
encoded |= reason.mask
self.stuck_reason_publisher.publish(UInt8(data=encoded))
else:
self.stuck_publisher.publish(Bool(data=False))


def main(args: list[str] | None = None) -> None:
rclpy.init(args=args)
stuck_detection_node = None
try:
stuck_detection_node = StuckDetectionNode()
rclpy.spin(stuck_detection_node)
except KeyboardInterrupt:
pass
except ExternalShutdownException:
if stuck_detection_node is not None:
stuck_detection_node.get_logger().info(
colorStr(f"Shutting down {NODE_NAME}", ColorCodes.BLUE_OK)
)
finally:
if stuck_detection_node is not None:
stuck_detection_node.destroy_node()
rclpy.shutdown()
sys.exit(0)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import math
import sys
from pathlib import Path as filePath
from typing import Tuple
from typing import Generator, Tuple

import numpy as np
import pytest
Expand All @@ -10,7 +10,7 @@


@pytest.fixture(scope="session", autouse=True)
def ros2_init_shutdown():
def ros2_init_shutdown() -> Generator:
rclpy.init()
yield
rclpy.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import math
import sys
from pathlib import Path
from typing import List, Tuple

import numpy as np

# from pathlib import Path
from nav_msgs.msg import Path
# from nav_msgs.msg import Path as PathMsg

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
Expand Down
Loading
Loading